@@ -7,6 +7,7 @@ package estimator
7
7
8
8
import (
9
9
"bufio"
10
+ "bytes"
10
11
"context"
11
12
"database/sql"
12
13
"fmt"
@@ -99,6 +100,7 @@ type Profiler struct {
99
100
readBytes uint64
100
101
startReadBlocks uint64
101
102
blockSize uint64
103
+ pidMapping map [string ]int
102
104
once sync.Once
103
105
exitChan chan struct {}
104
106
}
@@ -120,6 +122,7 @@ func NewProfiler(conn pgxtype.Querier, opts TraceOptions) *Profiler {
120
122
waitEventPercents : make (map [string ]float64 ),
121
123
exitChan : make (chan struct {}),
122
124
blockSize : defaultBlockSize ,
125
+ pidMapping : make (map [string ]int ),
123
126
}
124
127
}
125
128
@@ -284,14 +287,31 @@ func (p *Profiler) scanOutput(ctx context.Context, r io.Reader) {
284
287
scanner := bufio .NewScanner (r )
285
288
286
289
for scanner .Scan () {
287
- parsedReadBytes := p .parseReadBytes (scanner .Bytes ())
288
- if parsedReadBytes == 0 {
290
+ bytesEntry := p .parseReadBytes (scanner .Bytes ())
291
+ if bytesEntry == nil || bytesEntry . totalBytes == 0 {
289
292
continue
290
293
}
291
294
292
- log .Dbg ("read bytes: " , parsedReadBytes )
295
+ pid , ok := p .pidMapping [bytesEntry .pid ]
296
+ if ! ok {
297
+ hostPID , err := p .filterPID (bytesEntry .pid )
298
+ p .pidMapping [bytesEntry .pid ] = hostPID
293
299
294
- atomic .AddUint64 (& p .readBytes , parsedReadBytes )
300
+ if err != nil {
301
+ // log.Err("failed to get PID mapping")
302
+ continue
303
+ }
304
+
305
+ pid = hostPID
306
+ }
307
+
308
+ if pid != p .opts .Pid {
309
+ continue
310
+ }
311
+
312
+ log .Dbg ("read bytes: " , bytesEntry .totalBytes )
313
+
314
+ atomic .AddUint64 (& p .readBytes , bytesEntry .totalBytes )
295
315
296
316
select {
297
317
case <- ctx .Done ():
@@ -308,29 +328,72 @@ func (p *Profiler) scanOutput(ctx context.Context, r io.Reader) {
308
328
}
309
329
310
330
const (
311
- regExp = "^[.0-9]+\\ s+\\ S+\\ s+(\\ d+)\\ s+\\ w+\\ s+(W|R)\\ s+\\ d+\\ s+(\\ d+)\\ s+[.0-9]+$"
312
- countMatches = 4
331
+ regExp = "^[.0-9]+\\ s+\\ S+\\ s+(\\ d+)\\ s+\\ w+\\ s+(W|R)\\ s+\\ d+\\ s+(\\ d+)\\ s+[.0-9]+$"
332
+ countMatches = 4
333
+ expectedMappingParts = 2
313
334
)
314
335
315
- var r = regexp .MustCompile (regExp )
336
+ var (
337
+ r = regexp .MustCompile (regExp )
338
+ nsPrefix = []byte ("NSpid:" )
339
+ )
316
340
317
- func (p * Profiler ) parseReadBytes (line []byte ) uint64 {
318
- submatch := r .FindSubmatch (line )
319
- if len (submatch ) != countMatches {
320
- return 0
341
+ type bytesEntry struct {
342
+ pid string
343
+ totalBytes uint64
344
+ }
345
+
346
+ func (p * Profiler ) filterPID (pid string ) (int , error ) {
347
+ procStatus , err := exec .Command ("cat" , "/host_proc/" + pid + "/status" ).Output ()
348
+ if err != nil {
349
+ return 0 , err
350
+ }
351
+
352
+ return p .parsePIDMapping (procStatus )
353
+ }
354
+
355
+ func (p * Profiler ) parsePIDMapping (procStatus []byte ) (int , error ) {
356
+ sc := bufio .NewScanner (bytes .NewBuffer (procStatus ))
357
+
358
+ for sc .Scan () {
359
+ line := sc .Bytes ()
360
+ if ! bytes .HasPrefix (line , nsPrefix ) {
361
+ continue
362
+ }
363
+
364
+ val := bytes .TrimSpace (bytes .TrimPrefix (line , nsPrefix ))
365
+
366
+ pidValues := bytes .SplitN (val , []byte (" " ), expectedMappingParts )
367
+ if len (pidValues ) < expectedMappingParts {
368
+ return 0 , nil
369
+ }
370
+
371
+ hostPID , err := strconv .Atoi (string (pidValues [1 ]))
372
+ if err != nil {
373
+ return 0 , err
374
+ }
375
+
376
+ return hostPID , nil
321
377
}
322
378
323
- pid , err := strconv .Atoi (string (submatch [1 ]))
324
- if err != nil || p .opts .Pid != pid {
325
- return 0
379
+ return 0 , nil
380
+ }
381
+
382
+ func (p * Profiler ) parseReadBytes (line []byte ) * bytesEntry {
383
+ submatch := r .FindSubmatch (line )
384
+ if len (submatch ) != countMatches {
385
+ return nil
326
386
}
327
387
328
- bytes , err := strconv .ParseUint (string (submatch [3 ]), 10 , 64 )
388
+ totalBytes , err := strconv .ParseUint (string (submatch [3 ]), 10 , 64 )
329
389
if err != nil {
330
- return 0
390
+ return nil
331
391
}
332
392
333
- return bytes
393
+ return & bytesEntry {
394
+ pid : string (submatch [1 ]),
395
+ totalBytes : totalBytes ,
396
+ }
334
397
}
335
398
336
399
// resetCounters deletes all entries from the maps.
0 commit comments