Skip to content

Commit 3f35336

Browse files
committed
Add TraceInterceptor to tracehandler
Add a TraceInterceptor callback that is invoked after ConvertTrace on cache-miss. When the interceptor returns true the trace is consumed (skipped for caching and reporting), allowing callers like the GPU subsystem to divert specific traces for further processing. Includes tests covering consume, pass-through, mixed, and non-caching behavior.
1 parent d13351c commit 3f35336

3 files changed

Lines changed: 118 additions & 4 deletions

File tree

internal/controller/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ func startTraceHandling(ctx context.Context, rep reporter.TraceReporter,
181181
}
182182

183183
_, err := tracehandler.Start(ctx, rep, trc.TraceProcessor(),
184-
traceCh, intervals, cacheSize)
184+
traceCh, intervals, cacheSize, nil)
185185
return err
186186
}
187187

tracehandler/tracehandler.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ type Times interface {
3434
// symbolization efforts.
3535
var traceCacheLifetime = 5 * time.Minute
3636

37+
// TraceInterceptor is called after ConvertTrace, on a cache miss only, with the symbolized
38+
// trace, its metadata, and the original BPF trace. Return true to consume the trace (skip
39+
// caching and reporting), false to proceed normally. A nil interceptor is never called.
40+
type TraceInterceptor func(trace *libpf.Trace, meta *samples.TraceEventMeta,
41+
rawTrace *host.Trace) bool
42+
3743
// TraceProcessor is an interface used by traceHandler to convert traces
3844
// from a form received from eBPF to the form we wish to dispatch to the
3945
// collection agent.
@@ -72,12 +78,17 @@ type traceHandler struct {
7278
// reporter instance to use to send out traces.
7379
reporter reporter.TraceReporter
7480

81+
// interceptor, if set, is called after ConvertTrace on cache-miss.
82+
// If it returns true the trace is consumed and not cached or reported.
83+
interceptor TraceInterceptor
84+
7585
times Times
7686
}
7787

7888
// newTraceHandler creates a new traceHandler
7989
func newTraceHandler(ctx context.Context, rep reporter.TraceReporter,
80-
traceProcessor TraceProcessor, intervals Times, cacheSize uint32) (*traceHandler, error) {
90+
traceProcessor TraceProcessor, intervals Times, cacheSize uint32,
91+
interceptor TraceInterceptor) (*traceHandler, error) {
8192
traceCache, err := lru.NewSynced[host.TraceHash, libpf.Trace](
8293
cacheSize, func(k host.TraceHash) uint32 { return uint32(k) })
8394
if err != nil {
@@ -111,6 +122,7 @@ func newTraceHandler(ctx context.Context, rep reporter.TraceReporter,
111122
traceProcessor: traceProcessor,
112123
traceCache: traceCache,
113124
reporter: rep,
125+
interceptor: interceptor,
114126
times: intervals,
115127
}, nil
116128
}
@@ -150,6 +162,13 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) {
150162
panic(err)
151163
}
152164
log.Debugf("Trace hash remap 0x%x -> 0x%x", bpfTrace.Hash, umTrace.Hash)
165+
166+
// If an interceptor is set and consumes the trace, skip caching and reporting.
167+
// A consumed trace is never added to the cache, so each later occurrence reaches the interceptor again.
168+
if m.interceptor != nil && m.interceptor(umTrace, meta, bpfTrace) {
169+
return
170+
}
171+
153172
m.traceCache.Add(bpfTrace.Hash, *umTrace)
154173

155174
meta.APMServiceName = m.traceProcessor.MaybeNotifyAPMAgent(bpfTrace, umTrace.Hash, 1)
@@ -164,9 +183,10 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) {
164183
// to exit after a cancellation through the context.
165184
func Start(ctx context.Context, rep reporter.TraceReporter, traceProcessor TraceProcessor,
166185
traceInChan <-chan *host.Trace, intervals Times, cacheSize uint32,
186+
interceptor TraceInterceptor,
167187
) (workerExited <-chan libpf.Void, err error) {
168188
handler, err :=
169-
newTraceHandler(ctx, rep, traceProcessor, intervals, cacheSize)
189+
newTraceHandler(ctx, rep, traceProcessor, intervals, cacheSize, interceptor)
170190
if err != nil {
171191
return nil, fmt.Errorf("failed to create traceHandler: %v", err)
172192
}

tracehandler/tracehandler_test.go

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,100 @@ func (m *mockReporter) ReportTraceEvent(trace *libpf.Trace, _ *samples.TraceEven
6767
return nil
6868
}
6969

70+
func TestTraceInterceptor(t *testing.T) {
71+
tests := map[string]struct {
72+
// interceptReturn controls what the interceptor returns for each trace.
73+
// true = consumed (should NOT appear in reporter), false = pass through.
74+
interceptReturn map[host.TraceHash]bool
75+
input []arguments
76+
expectedEvents map[libpf.TraceHash]uint16
77+
}{
78+
"interceptor consumes trace": {
79+
interceptReturn: map[host.TraceHash]bool{
80+
host.TraceHash(0xaa): true,
81+
},
82+
input: []arguments{
83+
{trace: &host.Trace{Hash: host.TraceHash(0xaa)}},
84+
},
85+
expectedEvents: nil, // consumed, nothing reported
86+
},
87+
"interceptor passes through": {
88+
interceptReturn: map[host.TraceHash]bool{
89+
host.TraceHash(0xbb): false,
90+
},
91+
input: []arguments{
92+
{trace: &host.Trace{Hash: host.TraceHash(0xbb)}},
93+
},
94+
expectedEvents: map[libpf.TraceHash]uint16{
95+
libpf.NewTraceHash(0xbb, 0xbb): 1,
96+
},
97+
},
98+
"mix of consumed and passed": {
99+
interceptReturn: map[host.TraceHash]bool{
100+
host.TraceHash(1): true, // consumed
101+
host.TraceHash(2): false, // passed
102+
host.TraceHash(3): true, // consumed
103+
},
104+
input: []arguments{
105+
{trace: &host.Trace{Hash: host.TraceHash(1)}},
106+
{trace: &host.Trace{Hash: host.TraceHash(2)}},
107+
{trace: &host.Trace{Hash: host.TraceHash(3)}},
108+
},
109+
expectedEvents: map[libpf.TraceHash]uint16{
110+
libpf.NewTraceHash(2, 2): 1,
111+
},
112+
},
113+
"consumed trace not cached on repeat": {
114+
interceptReturn: map[host.TraceHash]bool{
115+
host.TraceHash(0xcc): true,
116+
},
117+
input: []arguments{
118+
{trace: &host.Trace{Hash: host.TraceHash(0xcc)}},
119+
{trace: &host.Trace{Hash: host.TraceHash(0xcc)}},
120+
},
121+
expectedEvents: nil, // both consumed, nothing reported
122+
},
123+
}
124+
125+
for name, test := range tests {
126+
t.Run(name, func(t *testing.T) {
127+
r := &mockReporter{
128+
t: t,
129+
reports: make(map[libpf.TraceHash]uint16),
130+
}
131+
132+
var interceptCalls int
133+
interceptor := func(trace *libpf.Trace, meta *samples.TraceEventMeta,
134+
rawTrace *host.Trace) bool {
135+
interceptCalls++
136+
return test.interceptReturn[rawTrace.Hash]
137+
}
138+
139+
traceChan := make(chan *host.Trace)
140+
ctx, cancel := context.WithCancel(t.Context())
141+
defer cancel()
142+
exitNotify, err := tracehandler.Start(ctx, r, &fakeTraceProcessor{},
143+
traceChan, defaultTimes(), 128, interceptor)
144+
require.NoError(t, err)
145+
146+
for _, input := range test.input {
147+
traceChan <- input.trace
148+
}
149+
150+
cancel()
151+
<-exitNotify
152+
153+
expected := test.expectedEvents
154+
if expected == nil {
155+
expected = map[libpf.TraceHash]uint16{}
156+
}
157+
if !maps.Equal(r.reports, expected) {
158+
t.Fatalf("Expected %#v but got %#v", expected, r.reports)
159+
}
160+
})
161+
}
162+
}
163+
70164
func TestTraceHandler(t *testing.T) {
71165
tests := map[string]struct {
72166
input []arguments
@@ -108,7 +202,7 @@ func TestTraceHandler(t *testing.T) {
108202
ctx, cancel := context.WithCancel(t.Context())
109203
defer cancel()
110204
exitNotify, err := tracehandler.Start(ctx, r, &fakeTraceProcessor{},
111-
traceChan, defaultTimes(), 128)
205+
traceChan, defaultTimes(), 128, nil)
112206
require.NoError(t, err)
113207

114208
for _, input := range test.input {

0 commit comments

Comments
 (0)