Skip to content

Commit 1fea01a

Browse files
committed
clean up code
1 parent 90bfc52 commit 1fea01a

File tree

2 files changed

+56
-52
lines changed

2 files changed

+56
-52
lines changed

pkg/estimator/monitor.go

Lines changed: 55 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ import (
2121
)
2222

2323
const (
24-
regExp = "^[.0-9]+\\s+\\S+\\s+(\\d+)\\s+\\w+\\s+(W|R)\\s+\\d+\\s+(\\d+)\\s+[.0-9]+$"
25-
countMatches = 4
26-
expectedMappingParts = 2
24+
regExp = "^[.0-9]+\\s+\\S+\\s+(\\d+)\\s+\\w+\\s+(W|R)\\s+\\d+\\s+(\\d+)\\s+[.0-9]+$"
25+
countMatches = 4
26+
expectedMappingParts = 2
27+
procDir = "host_proc"
28+
parallelWorkerCmdline = "parallel worker for PID "
2729
)
2830

2931
var (
@@ -51,7 +53,7 @@ func NewMonitor(pid int, container string, profiler *Profiler) *Monitor {
5153

5254
// InspectIOBlocks counts physically read blocks.
5355
func (m *Monitor) InspectIOBlocks(ctx context.Context) error {
54-
log.Dbg("Run read physical")
56+
log.Dbg("Start IO inspection")
5557

5658
cmd := exec.Command("biosnoop")
5759

@@ -70,7 +72,7 @@ func (m *Monitor) InspectIOBlocks(ctx context.Context) error {
7072

7173
<-m.profiler.exitChan
7274

73-
log.Dbg("End read physical")
75+
log.Dbg("Finish IO inspection")
7476

7577
return nil
7678
}
@@ -97,11 +99,10 @@ func (m *Monitor) scanOutput(ctx context.Context, r io.Reader) {
9799

98100
pid, ok := m.pidMapping[bytesEntry.pid]
99101
if !ok {
100-
hostPID, err := m.filterPID(bytesEntry.pid)
102+
hostPID, err := m.detectReferencedPID(bytesEntry.pid)
101103
m.pidMapping[bytesEntry.pid] = hostPID
102104

103105
if err != nil {
104-
// log.Dbg("failed to get PID mapping: ", err)
105106
continue
106107
}
107108

@@ -112,87 +113,54 @@ func (m *Monitor) scanOutput(ctx context.Context, r io.Reader) {
112113
continue
113114
}
114115

115-
log.Dbg("read bytes: ", bytesEntry.totalBytes)
116-
117116
atomic.AddUint64(&m.profiler.readBytes, bytesEntry.totalBytes)
118117

119118
select {
120119
case <-ctx.Done():
121-
log.Dbg("context")
120+
log.Dbg(ctx.Err().Error())
122121
return
123122

124123
case <-m.profiler.exitChan:
125-
log.Dbg("exit chan")
124+
log.Dbg("finish to scan IO entries")
126125
return
127126

128127
default:
129128
}
130129
}
131130
}
132131

133-
func getContainerHash(pid int) (string, error) {
134-
procParallel, err := exec.Command("cat", fmt.Sprintf("/host_proc/%d/cgroup", pid)).Output()
135-
if err != nil {
136-
return "", err
137-
}
138-
139-
return isInside(procParallel), nil
140-
}
141-
142-
func isInside(procParallel []byte) string {
143-
sc := bufio.NewScanner(bytes.NewBuffer(procParallel))
144-
145-
for sc.Scan() {
146-
line := sc.Bytes()
147-
148-
if !bytes.HasPrefix(line, []byte("1:name")) {
149-
continue
150-
}
151-
152-
res := bytes.SplitN(line, []byte("/docker/"), 2)
153-
154-
if len(res) == 1 {
155-
return ""
156-
}
157-
158-
return string(res[1])
159-
}
160-
161-
return ""
162-
}
163-
164-
func (m *Monitor) isValidContainer(hash string) bool {
165-
return m.container == hash
166-
}
167-
168-
func (m *Monitor) filterPID(pid int) (int, error) {
132+
func (m *Monitor) detectReferencedPID(pid int) (int, error) {
169133
hash, err := getContainerHash(pid)
170134
if err != nil {
171135
return 0, err
172136
}
173137

174-
if !m.isValidContainer(hash) {
138+
if hash == "" || !m.isAppropriateContainer(hash) {
175139
return 0, nil
176140
}
177141

178-
procParallel, err := exec.Command("cat", fmt.Sprintf("/host_proc/%d/cmdline", pid)).Output()
142+
procParallel, err := exec.Command("cat", fmt.Sprintf("/%s/%d/cmdline", procDir, pid)).Output()
179143
if err != nil {
180144
return 0, err
181145
}
182146

183147
if bytes.Contains(procParallel, []byte("postgres")) &&
184-
bytes.Contains(procParallel, []byte("parallel worker for PID "+strconv.Itoa(m.pid))) {
148+
bytes.Contains(procParallel, []byte(parallelWorkerCmdline+strconv.Itoa(m.pid))) {
185149
return m.pid, nil
186150
}
187151

188-
procStatus, err := exec.Command("cat", fmt.Sprintf("/host_proc/%d/status", pid)).Output()
152+
procStatus, err := exec.Command("cat", fmt.Sprintf("/%s/%d/status", procDir, pid)).Output()
189153
if err != nil {
190154
return 0, err
191155
}
192156

193157
return m.parsePIDMapping(procStatus)
194158
}
195159

160+
func (m *Monitor) isAppropriateContainer(hash string) bool {
161+
return m.container == hash
162+
}
163+
196164
func (m *Monitor) parsePIDMapping(procStatus []byte) (int, error) {
197165
sc := bufio.NewScanner(bytes.NewBuffer(procStatus))
198166

@@ -241,3 +209,39 @@ func (m *Monitor) parseReadBytes(line []byte) *bytesEntry {
241209
totalBytes: totalBytes,
242210
}
243211
}
212+
213+
func getContainerHash(pid int) (string, error) {
214+
procParallel, err := exec.Command("cat", fmt.Sprintf("/%s/%d/cgroup", procDir, pid)).Output()
215+
if err != nil {
216+
return "", err
217+
}
218+
219+
return detectContainerHash(procParallel), nil
220+
}
221+
222+
const (
223+
procNamePrefix = "1:name"
224+
procDockerEntry = "/docker/"
225+
)
226+
227+
func detectContainerHash(procParallel []byte) string {
228+
sc := bufio.NewScanner(bytes.NewBuffer(procParallel))
229+
230+
for sc.Scan() {
231+
line := sc.Bytes()
232+
233+
if !bytes.HasPrefix(line, []byte(procNamePrefix)) {
234+
continue
235+
}
236+
237+
procNameLine := bytes.SplitN(line, []byte(procDockerEntry), 2)
238+
239+
if len(procNameLine) == 1 {
240+
return ""
241+
}
242+
243+
return string(procNameLine[1])
244+
}
245+
246+
return ""
247+
}

pkg/estimator/monitor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ const procCgroup = `
143143
`
144144

145145
func TestIsInside(t *testing.T) {
146-
containerHash := isInside([]byte(procCgroup))
146+
containerHash := detectContainerHash([]byte(procCgroup))
147147

148148
assert.Equal(t, "ad63ab82fdb32dd384ac76ab5a9d20fb7cb48f53be4d4cac52924e920c4a967b", containerHash)
149149
}

0 commit comments

Comments
 (0)