Skip to content

Commit 3e6aa8b

Browse files
committed
parse proc status to get PID mappings
1 parent 31e7ac0 commit 3e6aa8b

File tree

2 files changed

+114
-17
lines changed

2 files changed

+114
-17
lines changed

pkg/estimator/profile.go

Lines changed: 80 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package estimator
77

88
import (
99
"bufio"
10+
"bytes"
1011
"context"
1112
"database/sql"
1213
"fmt"
@@ -99,6 +100,7 @@ type Profiler struct {
99100
readBytes uint64
100101
startReadBlocks uint64
101102
blockSize uint64
103+
pidMapping map[string]int
102104
once sync.Once
103105
exitChan chan struct{}
104106
}
@@ -120,6 +122,7 @@ func NewProfiler(conn pgxtype.Querier, opts TraceOptions) *Profiler {
120122
waitEventPercents: make(map[string]float64),
121123
exitChan: make(chan struct{}),
122124
blockSize: defaultBlockSize,
125+
pidMapping: make(map[string]int),
123126
}
124127
}
125128

@@ -284,14 +287,31 @@ func (p *Profiler) scanOutput(ctx context.Context, r io.Reader) {
284287
scanner := bufio.NewScanner(r)
285288

286289
for scanner.Scan() {
287-
parsedReadBytes := p.parseReadBytes(scanner.Bytes())
288-
if parsedReadBytes == 0 {
290+
bytesEntry := p.parseReadBytes(scanner.Bytes())
291+
if bytesEntry == nil || bytesEntry.totalBytes == 0 {
289292
continue
290293
}
291294

292-
log.Dbg("read bytes: ", parsedReadBytes)
295+
pid, ok := p.pidMapping[bytesEntry.pid]
296+
if !ok {
297+
hostPID, err := p.filterPID(bytesEntry.pid)
298+
p.pidMapping[bytesEntry.pid] = hostPID
293299

294-
atomic.AddUint64(&p.readBytes, parsedReadBytes)
300+
if err != nil {
301+
// log.Err("failed to get PID mapping")
302+
continue
303+
}
304+
305+
pid = hostPID
306+
}
307+
308+
if pid != p.opts.Pid {
309+
continue
310+
}
311+
312+
log.Dbg("read bytes: ", bytesEntry.totalBytes)
313+
314+
atomic.AddUint64(&p.readBytes, bytesEntry.totalBytes)
295315

296316
select {
297317
case <-ctx.Done():
@@ -308,29 +328,72 @@ func (p *Profiler) scanOutput(ctx context.Context, r io.Reader) {
308328
}
309329

310330
const (
311-
regExp = "^[.0-9]+\\s+\\S+\\s+(\\d+)\\s+\\w+\\s+(W|R)\\s+\\d+\\s+(\\d+)\\s+[.0-9]+$"
312-
countMatches = 4
331+
regExp = "^[.0-9]+\\s+\\S+\\s+(\\d+)\\s+\\w+\\s+(W|R)\\s+\\d+\\s+(\\d+)\\s+[.0-9]+$"
332+
countMatches = 4
333+
expectedMappingParts = 2
313334
)
314335

315-
var r = regexp.MustCompile(regExp)
336+
var (
337+
r = regexp.MustCompile(regExp)
338+
nsPrefix = []byte("NSpid:")
339+
)
316340

317-
func (p *Profiler) parseReadBytes(line []byte) uint64 {
318-
submatch := r.FindSubmatch(line)
319-
if len(submatch) != countMatches {
320-
return 0
341+
type bytesEntry struct {
342+
pid string
343+
totalBytes uint64
344+
}
345+
346+
func (p *Profiler) filterPID(pid string) (int, error) {
347+
procStatus, err := exec.Command("cat", "/host_proc/"+pid+"/status").Output()
348+
if err != nil {
349+
return 0, err
350+
}
351+
352+
return p.parsePIDMapping(procStatus)
353+
}
354+
355+
func (p *Profiler) parsePIDMapping(procStatus []byte) (int, error) {
356+
sc := bufio.NewScanner(bytes.NewBuffer(procStatus))
357+
358+
for sc.Scan() {
359+
line := sc.Bytes()
360+
if !bytes.HasPrefix(line, nsPrefix) {
361+
continue
362+
}
363+
364+
val := bytes.TrimSpace(bytes.TrimPrefix(line, nsPrefix))
365+
366+
pidValues := bytes.SplitN(val, []byte(" "), expectedMappingParts)
367+
if len(pidValues) < expectedMappingParts {
368+
return 0, nil
369+
}
370+
371+
hostPID, err := strconv.Atoi(string(pidValues[1]))
372+
if err != nil {
373+
return 0, err
374+
}
375+
376+
return hostPID, nil
321377
}
322378

323-
pid, err := strconv.Atoi(string(submatch[1]))
324-
if err != nil || p.opts.Pid != pid {
325-
return 0
379+
return 0, nil
380+
}
381+
382+
func (p *Profiler) parseReadBytes(line []byte) *bytesEntry {
383+
submatch := r.FindSubmatch(line)
384+
if len(submatch) != countMatches {
385+
return nil
326386
}
327387

328-
bytes, err := strconv.ParseUint(string(submatch[3]), 10, 64)
388+
totalBytes, err := strconv.ParseUint(string(submatch[3]), 10, 64)
329389
if err != nil {
330-
return 0
390+
return nil
331391
}
332392

333-
return bytes
393+
return &bytesEntry{
394+
pid: string(submatch[1]),
395+
totalBytes: totalBytes,
396+
}
334397
}
335398

336399
// resetCounters deletes all entries from the maps.

pkg/estimator/profile_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package estimator
33
import (
44
"bytes"
55
"context"
6+
"strconv"
67
"testing"
78

89
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
911
)
1012

1113
const sample = `
@@ -87,6 +89,9 @@ func TestOutputScanner(t *testing.T) {
8789
for _, tc := range testCases {
8890
r := bytes.NewReader([]byte(sample))
8991
p := Profiler{
92+
pidMapping: map[string]int{
93+
strconv.Itoa(tc.pid): tc.pid,
94+
},
9095
opts: TraceOptions{
9196
Pid: tc.pid,
9297
},
@@ -97,3 +102,32 @@ func TestOutputScanner(t *testing.T) {
97102
assert.Equal(t, tc.readBytes, p.readBytes)
98103
}
99104
}
105+
106+
const procStatus = `
107+
Name: postgres
108+
Umask: 0077 State: S (sleeping)
109+
Tgid: 2752157
110+
Ngid: 0
111+
Pid: 2752157
112+
PPid: 2747061
113+
TracerPid: 0
114+
Uid: 999 999 999 999
115+
Gid: 999 999 999 999
116+
FDSize: 64
117+
Groups: 101
118+
NStgid: 2752157 674
119+
NSpid: 2752157 674
120+
NSpgid: 2752157 674
121+
NSsid: 2752157 674
122+
VmPeak: 2316996 kB
123+
VmSize: 2315104 kB
124+
`
125+
126+
func TestProcStatParsing(t *testing.T) {
127+
p := Profiler{}
128+
129+
hostPID, err := p.parsePIDMapping([]byte(procStatus))
130+
131+
require.Nil(t, err)
132+
assert.Equal(t, 674, hostPID)
133+
}

0 commit comments

Comments
 (0)