@@ -21,9 +21,11 @@ import (
21
21
)
22
22
23
23
const (
24
- regExp = "^[.0-9]+\\ s+\\ S+\\ s+(\\ d+)\\ s+\\ w+\\ s+(W|R)\\ s+\\ d+\\ s+(\\ d+)\\ s+[.0-9]+$"
25
- countMatches = 4
26
- expectedMappingParts = 2
24
+ regExp = "^[.0-9]+\\ s+\\ S+\\ s+(\\ d+)\\ s+\\ w+\\ s+(W|R)\\ s+\\ d+\\ s+(\\ d+)\\ s+[.0-9]+$"
25
+ countMatches = 4
26
+ expectedMappingParts = 2
27
+ procDir = "host_proc"
28
+ parallelWorkerCmdline = "parallel worker for PID "
27
29
)
28
30
29
31
var (
@@ -51,7 +53,7 @@ func NewMonitor(pid int, container string, profiler *Profiler) *Monitor {
51
53
52
54
// InspectIOBlocks counts physically read blocks.
53
55
func (m * Monitor ) InspectIOBlocks (ctx context.Context ) error {
54
- log .Dbg ("Run read physical " )
56
+ log .Dbg ("Start IO inspection " )
55
57
56
58
cmd := exec .Command ("biosnoop" )
57
59
@@ -70,7 +72,7 @@ func (m *Monitor) InspectIOBlocks(ctx context.Context) error {
70
72
71
73
<- m .profiler .exitChan
72
74
73
- log .Dbg ("End read physical " )
75
+ log .Dbg ("Finish IO inspection " )
74
76
75
77
return nil
76
78
}
@@ -97,11 +99,10 @@ func (m *Monitor) scanOutput(ctx context.Context, r io.Reader) {
97
99
98
100
pid , ok := m .pidMapping [bytesEntry .pid ]
99
101
if ! ok {
100
- hostPID , err := m .filterPID (bytesEntry .pid )
102
+ hostPID , err := m .detectReferencedPID (bytesEntry .pid )
101
103
m .pidMapping [bytesEntry .pid ] = hostPID
102
104
103
105
if err != nil {
104
- // log.Dbg("failed to get PID mapping: ", err)
105
106
continue
106
107
}
107
108
@@ -112,87 +113,54 @@ func (m *Monitor) scanOutput(ctx context.Context, r io.Reader) {
112
113
continue
113
114
}
114
115
115
- log .Dbg ("read bytes: " , bytesEntry .totalBytes )
116
-
117
116
atomic .AddUint64 (& m .profiler .readBytes , bytesEntry .totalBytes )
118
117
119
118
select {
120
119
case <- ctx .Done ():
121
- log .Dbg ("context" )
120
+ log .Dbg (ctx . Err (). Error () )
122
121
return
123
122
124
123
case <- m .profiler .exitChan :
125
- log .Dbg ("exit chan " )
124
+ log .Dbg ("finish to scan IO entries " )
126
125
return
127
126
128
127
default :
129
128
}
130
129
}
131
130
}
132
131
133
- func getContainerHash (pid int ) (string , error ) {
134
- procParallel , err := exec .Command ("cat" , fmt .Sprintf ("/host_proc/%d/cgroup" , pid )).Output ()
135
- if err != nil {
136
- return "" , err
137
- }
138
-
139
- return isInside (procParallel ), nil
140
- }
141
-
142
- func isInside (procParallel []byte ) string {
143
- sc := bufio .NewScanner (bytes .NewBuffer (procParallel ))
144
-
145
- for sc .Scan () {
146
- line := sc .Bytes ()
147
-
148
- if ! bytes .HasPrefix (line , []byte ("1:name" )) {
149
- continue
150
- }
151
-
152
- res := bytes .SplitN (line , []byte ("/docker/" ), 2 )
153
-
154
- if len (res ) == 1 {
155
- return ""
156
- }
157
-
158
- return string (res [1 ])
159
- }
160
-
161
- return ""
162
- }
163
-
164
- func (m * Monitor ) isValidContainer (hash string ) bool {
165
- return m .container == hash
166
- }
167
-
168
- func (m * Monitor ) filterPID (pid int ) (int , error ) {
132
+ func (m * Monitor ) detectReferencedPID (pid int ) (int , error ) {
169
133
hash , err := getContainerHash (pid )
170
134
if err != nil {
171
135
return 0 , err
172
136
}
173
137
174
- if ! m .isValidContainer (hash ) {
138
+ if hash == "" || ! m .isAppropriateContainer (hash ) {
175
139
return 0 , nil
176
140
}
177
141
178
- procParallel , err := exec .Command ("cat" , fmt .Sprintf ("/host_proc /%d/cmdline" , pid )).Output ()
142
+ procParallel , err := exec .Command ("cat" , fmt .Sprintf ("/%s /%d/cmdline" , procDir , pid )).Output ()
179
143
if err != nil {
180
144
return 0 , err
181
145
}
182
146
183
147
if bytes .Contains (procParallel , []byte ("postgres" )) &&
184
- bytes .Contains (procParallel , []byte ("parallel worker for PID " + strconv .Itoa (m .pid ))) {
148
+ bytes .Contains (procParallel , []byte (parallelWorkerCmdline + strconv .Itoa (m .pid ))) {
185
149
return m .pid , nil
186
150
}
187
151
188
- procStatus , err := exec .Command ("cat" , fmt .Sprintf ("/host_proc /%d/status" , pid )).Output ()
152
+ procStatus , err := exec .Command ("cat" , fmt .Sprintf ("/%s /%d/status" , procDir , pid )).Output ()
189
153
if err != nil {
190
154
return 0 , err
191
155
}
192
156
193
157
return m .parsePIDMapping (procStatus )
194
158
}
195
159
160
+ func (m * Monitor ) isAppropriateContainer (hash string ) bool {
161
+ return m .container == hash
162
+ }
163
+
196
164
func (m * Monitor ) parsePIDMapping (procStatus []byte ) (int , error ) {
197
165
sc := bufio .NewScanner (bytes .NewBuffer (procStatus ))
198
166
@@ -241,3 +209,39 @@ func (m *Monitor) parseReadBytes(line []byte) *bytesEntry {
241
209
totalBytes : totalBytes ,
242
210
}
243
211
}
212
+
213
+ func getContainerHash (pid int ) (string , error ) {
214
+ procParallel , err := exec .Command ("cat" , fmt .Sprintf ("/%s/%d/cgroup" , procDir , pid )).Output ()
215
+ if err != nil {
216
+ return "" , err
217
+ }
218
+
219
+ return detectContainerHash (procParallel ), nil
220
+ }
221
+
222
+ const (
223
+ procNamePrefix = "1:name"
224
+ procDockerEntry = "/docker/"
225
+ )
226
+
227
+ func detectContainerHash (procParallel []byte ) string {
228
+ sc := bufio .NewScanner (bytes .NewBuffer (procParallel ))
229
+
230
+ for sc .Scan () {
231
+ line := sc .Bytes ()
232
+
233
+ if ! bytes .HasPrefix (line , []byte (procNamePrefix )) {
234
+ continue
235
+ }
236
+
237
+ procNameLine := bytes .SplitN (line , []byte (procDockerEntry ), 2 )
238
+
239
+ if len (procNameLine ) == 1 {
240
+ return ""
241
+ }
242
+
243
+ return string (procNameLine [1 ])
244
+ }
245
+
246
+ return ""
247
+ }
0 commit comments