Skip to content

Commit 0779bed

Browse files
committed
Add relabeling to probes
1 parent aa15bef commit 0779bed

3 files changed

Lines changed: 46 additions & 13 deletions

File tree

probes/bpf/README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# probes BPF
2+
3+
`probe.bpf.c` is the kernel-side uprobe program. The compiled outputs
4+
`probe.bpf.amd64` and `probe.bpf.arm64` are produced by `make probes-bpf`
5+
in the parca-agent repository root and are git-ignored.
6+
7+
This README is committed so `//go:embed bpf` in `../loader.go` always has
8+
a valid embed target even before the BPF objects are compiled.
9+
10+
Build dependencies: clang (>= 14, with the bpf target) and the libbpf
11+
headers (`bpf/bpf_helpers.h` etc., from libbpf-dev / libbpf-devel).

reporter/parca_reporter.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func (r *ParcaReporter) ReportTraceEvent(trace *libpf.Trace,
243243
r.stacks.Add(trace.Hash, trace.Frames)
244244
}
245245

246-
labelRetrievalResult := r.labelsForTID(meta.TID, meta.PID, meta.Comm, meta.CPU, meta.EnvVars)
246+
labelRetrievalResult := r.labelsForTID(meta.TID, meta.PID, meta.Comm, meta.CPU, meta.Origin, meta.EnvVars)
247247

248248
if !labelRetrievalResult.keep {
249249
r.skippedByRelabeling.Inc()
@@ -595,7 +595,7 @@ func (r *ParcaReporter) addMetadataForPID(ctx context.Context, pid libpf.PID, lb
595595
return cache
596596
}
597597

598-
func (r *ParcaReporter) labelsForTID(tid, pid libpf.PID, comm libpf.String, cpu int, envVars map[libpf.String]libpf.String) labelRetrievalResult {
598+
func (r *ParcaReporter) labelsForTID(tid, pid libpf.PID, comm libpf.String, cpu int, origin libpf.Origin, envVars map[libpf.String]libpf.String) labelRetrievalResult {
599599
cached, hit := r.labels.Get(pid)
600600

601601
if !hit {
@@ -638,8 +638,15 @@ func (r *ParcaReporter) labelsForTID(tid, pid libpf.PID, comm libpf.String, cpu
638638
return cached
639639
}
640640

641-
// No per-sample labels to patch — return cached entry as-is.
642-
if r.disableCPULabel && r.disableThreadIDLabel && r.disableThreadCommLabel {
641+
// Probe samples additionally run through a per-sample relabel pass so
642+
// rules can derive custom labels (or drop the sample) from per-sample fields. We
643+
// gate this on probe origin only — CPU/off-CPU/memory/cuda samples
644+
// keep the cheap "patch and ship" path (see commit 34c9ed7a).
645+
perSampleRelabel := origin == support.TraceOriginProbe && len(r.relabelConfigs) > 0
646+
647+
// Nothing per-sample to do: no patches and no per-sample relabel.
648+
if r.disableCPULabel && r.disableThreadIDLabel && r.disableThreadCommLabel &&
649+
!perSampleRelabel {
643650
return cached
644651
}
645652

@@ -655,9 +662,23 @@ func (r *ParcaReporter) labelsForTID(tid, pid libpf.PID, comm libpf.String, cpu
655662
lb.Set("thread_name", comm.String())
656663
}
657664

665+
// Per-sample relabel pass for probe samples. The per-PID pass already
666+
// ran against cached metadata; here the relabeler additionally sees
667+
// the final label names (thread_id, thread_name, cpu). Rules that only
668+
// consume per-PID inputs are idempotent across the two passes.
669+
keep := true
670+
if perSampleRelabel {
671+
keep = relabel.ProcessBuilder(lb, r.relabelConfigs...)
672+
lb.Range(func(l labels.Label) {
673+
if strings.HasPrefix(l.Name, model.MetaLabelPrefix) {
674+
lb.Del(l.Name)
675+
}
676+
})
677+
}
678+
658679
return labelRetrievalResult{
659680
labels: lb.Labels(),
660-
keep: true,
681+
keep: keep,
661682
}
662683
}
663684

reporter/parca_reporter_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
lru "github.com/elastic/go-freelru"
99
"github.com/stretchr/testify/require"
1010
"go.opentelemetry.io/ebpf-profiler/libpf"
11+
"go.opentelemetry.io/ebpf-profiler/support"
1112
)
1213

1314
const (
@@ -68,13 +69,13 @@ func TestLabelsForTID_CPUCacheMismatch(t *testing.T) {
6869
pid := libpf.PID(1000)
6970

7071
// First call: TID 1234 on CPU 1 — cache miss, labels built fresh.
71-
result1 := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), 1, nil)
72+
result1 := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), 1, support.TraceOriginSampling, nil)
7273
require.True(t, result1.keep)
7374
require.Equal(t, "1", result1.labels.Get("cpu"),
7475
"first call should set cpu=1")
7576

7677
// Second call: same TID on CPU 3 — should return cpu=3, not stale cpu=1.
77-
result2 := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), 3, nil)
78+
result2 := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), 3, support.TraceOriginSampling, nil)
7879
require.True(t, result2.keep)
7980
require.Equal(t, "3", result2.labels.Get("cpu"),
8081
"same TID on different CPU must return the actual cpu value")
@@ -91,7 +92,7 @@ func TestLabelsForTID_ThreadMigrationPattern(t *testing.T) {
9192
cpuSequence := []int{0, 1, 0, 3, 2, 1, 3, 0}
9293

9394
for i, cpu := range cpuSequence {
94-
result := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), cpu, nil)
95+
result := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), cpu, support.TraceOriginSampling, nil)
9596
require.Equal(t, fmt.Sprint(cpu), result.labels.Get("cpu"),
9697
"tick %d: thread on cpu %d must get cpu=%d in labels", i, cpu, cpu)
9798
}
@@ -103,7 +104,7 @@ func TestLabelsForTID_DisableFlags(t *testing.T) {
103104

104105
t.Run("all enabled", func(t *testing.T) {
105106
r := newTestReporterWithFlags(t, false, false, false)
106-
res := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), 2, nil)
107+
res := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), 2, support.TraceOriginSampling, nil)
107108
require.True(t, res.keep)
108109
require.Equal(t, "2", res.labels.Get("cpu"))
109110
require.Equal(t, "1234", res.labels.Get("thread_id"))
@@ -112,7 +113,7 @@ func TestLabelsForTID_DisableFlags(t *testing.T) {
112113

113114
t.Run("cpu disabled", func(t *testing.T) {
114115
r := newTestReporterWithFlags(t, true, false, false)
115-
res := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), 2, nil)
116+
res := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), 2, support.TraceOriginSampling, nil)
116117
require.True(t, res.keep)
117118
require.Equal(t, "", res.labels.Get("cpu"))
118119
require.Equal(t, "1234", res.labels.Get("thread_id"))
@@ -121,7 +122,7 @@ func TestLabelsForTID_DisableFlags(t *testing.T) {
121122

122123
t.Run("thread_id disabled", func(t *testing.T) {
123124
r := newTestReporterWithFlags(t, false, true, false)
124-
res := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), 2, nil)
125+
res := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), 2, support.TraceOriginSampling, nil)
125126
require.True(t, res.keep)
126127
require.Equal(t, "2", res.labels.Get("cpu"))
127128
require.Equal(t, "", res.labels.Get("thread_id"))
@@ -130,7 +131,7 @@ func TestLabelsForTID_DisableFlags(t *testing.T) {
130131

131132
t.Run("thread_name disabled", func(t *testing.T) {
132133
r := newTestReporterWithFlags(t, false, false, true)
133-
res := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), 2, nil)
134+
res := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), 2, support.TraceOriginSampling, nil)
134135
require.True(t, res.keep)
135136
require.Equal(t, "2", res.labels.Get("cpu"))
136137
require.Equal(t, "1234", res.labels.Get("thread_id"))
@@ -139,7 +140,7 @@ func TestLabelsForTID_DisableFlags(t *testing.T) {
139140

140141
t.Run("all disabled", func(t *testing.T) {
141142
r := newTestReporterWithFlags(t, true, true, true)
142-
res := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), 2, nil)
143+
res := r.labelsForTID(tid, pid, libpf.Intern("myprocess"), 2, support.TraceOriginSampling, nil)
143144
require.True(t, res.keep)
144145
require.Equal(t, "", res.labels.Get("cpu"))
145146
require.Equal(t, "", res.labels.Get("thread_id"))

0 commit comments

Comments
 (0)