Skip to content

Commit 2438c7b

Browse files
gnurizenclaude
andcommitted
Symbolize CUDA traces before GPU timing fixup
Move CUDA symbolization earlier in the pipeline: ConvertTrace now handles CUDA frames directly, and parcagpu.Start returns a TraceInterceptor instead of a filtered channel. The interceptor diverts symbolized CUDA traces into the GPU fixer post-ConvertTrace, and completed traces (with timing and kernel name) are reported directly. This eliminates the Symbolize method on the CUDA interpreter in favor of demangling in prepTrace. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 135641a commit 2438c7b

3 files changed

Lines changed: 181 additions & 124 deletions

File tree

interpreter/gpu/cuda.go

Lines changed: 114 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package gpu // import "go.opentelemetry.io/ebpf-profiler/interpreter/gpu"
22

33
import (
44
"bytes"
5-
"errors"
65
"fmt"
6+
"maps"
77
"strconv"
88
"sync"
9+
"unique"
910
"unsafe"
1011

1112
"github.com/ianlancetaylor/demangle"
@@ -16,6 +17,8 @@ import (
1617
"go.opentelemetry.io/ebpf-profiler/libpf/pfelf"
1718
"go.opentelemetry.io/ebpf-profiler/metrics"
1819
"go.opentelemetry.io/ebpf-profiler/remotememory"
20+
"go.opentelemetry.io/ebpf-profiler/reporter/samples"
21+
"go.opentelemetry.io/ebpf-profiler/traceutil"
1922
)
2023

2124
const (
@@ -31,6 +34,22 @@ var (
3134
gpuFixers sync.Map
3235
)
3336

37+
// SymbolizedCudaTrace holds a symbolized trace awaiting GPU timing information.
38+
// The CPU frames are already symbolized; only the CUDA kernel frame (frame[0])
39+
// needs the kernel name from the timing event.
40+
type SymbolizedCudaTrace struct {
41+
Trace libpf.Trace
42+
Meta samples.TraceEventMeta
43+
CorrelationID uint32
44+
CBID int32
45+
}
46+
47+
// CudaTraceOutput is a fully completed CUDA trace ready for reporting.
48+
type CudaTraceOutput struct {
49+
Trace libpf.Trace
50+
Meta samples.TraceEventMeta
51+
}
52+
3453
// gpuTraceFixer matches traces with timing information for a specific PID.
3554
// We use a single fixer per PID because CUDA correlation IDs are unique per process
3655
// across all devices and streams.
@@ -40,9 +59,9 @@ var (
4059
// that launched the kernel."
4160
type gpuTraceFixer struct {
4261
mu sync.Mutex
43-
timesAwaitingTraces map[uint32][]CuptiTimingEvent // keyed by correlation ID
44-
tracesAwaitingTimes map[uint32]*host.Trace // keyed by correlation ID
45-
maxCorrelationId uint32 // track highest ID for threshold-based clearing
62+
timesAwaitingTraces map[uint32][]CuptiTimingEvent // keyed by correlation ID
63+
tracesAwaitingTimes map[uint32]*SymbolizedCudaTrace // keyed by correlation ID
64+
maxCorrelationId uint32 // track highest ID for threshold-based clearing
4665
}
4766

4867
type data struct {
@@ -152,7 +171,7 @@ func (d *data) Attach(ebpf interpreter.EbpfHandler, pid libpf.PID, _ libpf.Addre
152171
// Create and register fixer for this PID
153172
fixer := &gpuTraceFixer{
154173
timesAwaitingTraces: make(map[uint32][]CuptiTimingEvent),
155-
tracesAwaitingTimes: make(map[uint32]*host.Trace),
174+
tracesAwaitingTimes: make(map[uint32]*SymbolizedCudaTrace),
156175
}
157176

158177
gpuFixers.Store(pid, fixer)
@@ -190,62 +209,56 @@ func isGraphLaunch(cbid int32) bool {
190209
return false
191210
}
192211

193-
// addTrace is called when a CUDA trace is received, to match it with timing info.
194-
// Sends completed traces directly to the output channel (may be multiple for graph launches).
195-
func (f *gpuTraceFixer) addTrace(trace *host.Trace, traceOutChan chan<- *host.Trace) error {
196-
if len(trace.Frames) == 0 {
197-
return errors.New("no frames in trace")
198-
}
199-
frame := trace.Frames[0]
200-
if frame.Type != libpf.CUDAKernelFrame {
201-
return errors.New("first frame is not a CUDA kernel frame")
202-
}
203-
correlationId := uint32(frame.Lineno)
204-
cbid := int32(frame.Lineno >> 32)
205-
206-
log.Debugf("[cuda] adding trace with id %d cbid %d (0x%x) for pid %d", correlationId, int(cbid), uint32(cbid), trace.PID)
212+
// addTrace is called when a symbolized CUDA trace is received, to match it with timing info.
213+
// Returns completed traces (may be multiple for graph launches).
214+
func (f *gpuTraceFixer) addTrace(st *SymbolizedCudaTrace) []CudaTraceOutput {
215+
log.Debugf("[cuda] adding trace with id %d cbid %d (0x%x) for pid %d",
216+
st.CorrelationID, int(st.CBID), uint32(st.CBID), st.Meta.PID)
207217
f.mu.Lock()
208218
defer f.mu.Unlock()
209219

210220
// Update max, detecting wrap-around (new ID much smaller than max means wrap)
211-
if correlationId > f.maxCorrelationId || f.maxCorrelationId-correlationId > 1<<31 {
212-
f.maxCorrelationId = correlationId
221+
if st.CorrelationID > f.maxCorrelationId || f.maxCorrelationId-st.CorrelationID > 1<<31 {
222+
f.maxCorrelationId = st.CorrelationID
213223
}
214224

215-
evs, ok := f.timesAwaitingTraces[correlationId]
225+
var outputs []CudaTraceOutput
226+
227+
evs, ok := f.timesAwaitingTraces[st.CorrelationID]
216228
if ok && len(evs) > 0 {
217229
// Process any timing events that arrived before this trace
218230
for idx := range evs {
219231
log.Debugf("[cuda] gpu trace completed id %d cbid %d (0x%x) for pid %d",
220-
correlationId, int(cbid), uint32(cbid), trace.PID)
221-
traceOutChan <- f.prepTrace(trace, &evs[idx])
232+
st.CorrelationID, int(st.CBID), uint32(st.CBID), st.Meta.PID)
233+
outputs = append(outputs, f.prepTrace(st, &evs[idx]))
222234
}
223235
// Always delete the key to avoid nil entries accumulating
224-
delete(f.timesAwaitingTraces, correlationId)
236+
delete(f.timesAwaitingTraces, st.CorrelationID)
225237
// For non-graph launches, we've matched the only timing event, done
226-
if !isGraphLaunch(cbid) {
227-
return nil
238+
if !isGraphLaunch(st.CBID) {
239+
return outputs
228240
}
229241
}
230242
// Store trace for future timing events
231-
f.tracesAwaitingTimes[correlationId] = trace
232-
return nil
243+
f.tracesAwaitingTimes[st.CorrelationID] = st
244+
return outputs
233245
}
234246

235247
// addTime is called when timing info is received from eBPF, to match it with a trace.
236248
// Caller must hold f.mu.
237-
func (f *gpuTraceFixer) addTime(ev *CuptiTimingEvent) *host.Trace {
249+
func (f *gpuTraceFixer) addTime(ev *CuptiTimingEvent) *CudaTraceOutput {
238250
// Update max, detecting wrap-around (new ID much smaller than max means wrap)
239251
if ev.Id > f.maxCorrelationId || f.maxCorrelationId-ev.Id > 1<<31 {
240252
f.maxCorrelationId = ev.Id
241253
}
242254

243-
trace, ok := f.tracesAwaitingTimes[ev.Id]
255+
st, ok := f.tracesAwaitingTimes[ev.Id]
244256
if ok {
245257
if ev.Graph == 0 {
246258
delete(f.tracesAwaitingTimes, ev.Id)
247259
}
248-
return f.prepTrace(trace, ev)
260+
out := f.prepTrace(st, ev)
261+
return &out
249262
}
250263
f.timesAwaitingTraces[ev.Id] = append(f.timesAwaitingTraces[ev.Id], *ev)
251264
return nil
@@ -294,57 +307,77 @@ func (f *gpuTraceFixer) maybeClear() fixerStats {
294307
return stats
295308
}
296309

297-
// prepTrace prepares a trace with timing information and kernel name.
298-
func (f *gpuTraceFixer) prepTrace(tr *host.Trace, ev *CuptiTimingEvent) *host.Trace {
310+
// prepTrace attaches timing information and the demangled kernel name to a symbolized
311+
// CUDA trace, producing a CudaTraceOutput ready for reporting.
312+
func (f *gpuTraceFixer) prepTrace(st *SymbolizedCudaTrace, ev *CuptiTimingEvent) CudaTraceOutput {
313+
out := CudaTraceOutput{
314+
Trace: st.Trace,
315+
Meta: st.Meta,
316+
}
317+
299318
if ev.Graph != 0 {
300-
// Graphs can have many kernels with same correlation ID
301-
clone := *tr
302-
tr = &clone
319+
// Graphs can have many kernels with same correlation ID.
320+
// Clone Frames and CustomLabels to avoid aliasing.
321+
out.Trace.Frames = make(libpf.Frames, len(st.Trace.Frames))
322+
copy(out.Trace.Frames, st.Trace.Frames)
323+
out.Trace.CustomLabels = maps.Clone(st.Trace.CustomLabels)
303324
}
304-
tr.OffTime = int64(ev.End - ev.Start)
305-
if tr.CustomLabels == nil {
306-
tr.CustomLabels = make(map[string]string)
325+
326+
out.Meta.OffTime = int64(ev.End - ev.Start)
327+
if out.Trace.CustomLabels == nil {
328+
out.Trace.CustomLabels = make(map[string]string)
307329
}
308330

309-
tr.CustomLabels["cuda_device"] = strconv.FormatUint(uint64(ev.Dev), 10)
331+
out.Trace.CustomLabels["cuda_device"] = strconv.FormatUint(uint64(ev.Dev), 10)
310332
if ev.Stream != 0 {
311-
tr.CustomLabels["cuda_stream"] = strconv.FormatUint(uint64(ev.Stream), 10)
333+
out.Trace.CustomLabels["cuda_stream"] = strconv.FormatUint(uint64(ev.Stream), 10)
312334
}
313335
if ev.Graph != 0 {
314-
tr.CustomLabels["cuda_graph"] = strconv.FormatUint(uint64(ev.Graph), 10)
315-
tr.CustomLabels["cuda_id"] = strconv.FormatUint(uint64(ev.Id), 10)
336+
out.Trace.CustomLabels["cuda_graph"] = strconv.FormatUint(uint64(ev.Graph), 10)
337+
out.Trace.CustomLabels["cuda_id"] = strconv.FormatUint(uint64(ev.Id), 10)
316338
}
317-
if len(ev.KernelName) > 0 {
318-
// Store the raw (mangled) kernel name - demangling happens in Symbolize
319-
// Use unsafe.String to avoid allocation - Intern/unique.Make will copy if new
320-
nameBytes := ev.KernelName[:]
321-
if idx := bytes.IndexByte(nameBytes, 0); idx >= 0 {
322-
nameBytes = nameBytes[:idx]
323-
}
324-
istr := libpf.Intern(unsafe.String(unsafe.SliceData(nameBytes), len(nameBytes)))
325-
// See collect_trace where we always make the first frame a CUDA kernel frame.
326-
if tr.Frames[0].Type != libpf.CUDAKernelFrame {
327-
panic("first frame is not a CUDA kernel frame")
339+
340+
// Extract kernel name from timing event, demangle, and update frame[0]
341+
nameBytes := ev.KernelName[:]
342+
if idx := bytes.IndexByte(nameBytes, 0); idx >= 0 {
343+
nameBytes = nameBytes[:idx]
344+
}
345+
if len(nameBytes) > 0 {
346+
mangledStr := libpf.Intern(unsafe.String(unsafe.SliceData(nameBytes), len(nameBytes)))
347+
funcName := mangledStr
348+
if demStr, err := demangle.ToString(
349+
mangledStr.String(), demangle.NoParams, demangle.NoEnclosingParams); err == nil {
350+
funcName = libpf.Intern(demStr)
328351
}
329-
tr.Frames[0].File = host.FileID(*(*uint64)(unsafe.Pointer(&istr)))
352+
353+
currentFrame := out.Trace.Frames[0].Value()
354+
out.Trace.Frames[0] = unique.Make(libpf.Frame{
355+
Type: currentFrame.Type,
356+
AddressOrLineno: currentFrame.AddressOrLineno,
357+
FunctionName: funcName,
358+
})
330359
}
331-
return tr
360+
361+
// Recompute trace hash since we modified frame[0]
362+
out.Trace.Hash = traceutil.HashTrace(&out.Trace)
363+
364+
return out
332365
}
333366

334367
// AddTrace is a static function that delegates to the appropriate fixer for the PID.
335-
// Completed traces are sent directly to traceOutChan.
336-
func AddTrace(trace *host.Trace, traceOutChan chan<- *host.Trace) error {
337-
pid := trace.PID
368+
func AddTrace(st *SymbolizedCudaTrace) []CudaTraceOutput {
369+
pid := st.Meta.PID
338370
value, ok := gpuFixers.Load(pid)
339371
if !ok {
340-
return fmt.Errorf("no GPU fixer found for PID %d", pid)
372+
log.Warnf("no GPU fixer found for PID %d", pid)
373+
return nil
341374
}
342375
fixer := value.(*gpuTraceFixer)
343-
return fixer.addTrace(trace, traceOutChan)
376+
return fixer.addTrace(st)
344377
}
345378

346-
// AddTime is a static function that delegates to the appropriate fixer for the PID.
347-
func AddTime(ev *CuptiTimingEvent) *host.Trace {
379+
// addTimeSingle is a static function that delegates to the appropriate fixer for the PID.
380+
func addTimeSingle(ev *CuptiTimingEvent) *CudaTraceOutput {
348381
pid := libpf.PID(ev.Pid)
349382
value, ok := gpuFixers.Load(pid)
350383
if !ok {
@@ -358,16 +391,19 @@ func AddTime(ev *CuptiTimingEvent) *host.Trace {
358391
}
359392

360393
// AddTimes processes a batch of timing events, taking the lock once per PID.
361-
func AddTimes(events []CuptiTimingEvent, out chan<- *host.Trace) {
394+
// Returns all completed traces.
395+
func AddTimes(events []CuptiTimingEvent) []CudaTraceOutput {
362396
if len(events) == 0 {
363-
return
397+
return nil
364398
}
365399

400+
var outputs []CudaTraceOutput
401+
366402
// Fast path: assume all events from same PID (common case)
367403
pid := libpf.PID(events[0].Pid)
368404
value, ok := gpuFixers.Load(pid)
369405
if !ok {
370-
return
406+
return nil
371407
}
372408
fixer := value.(*gpuTraceFixer)
373409

@@ -379,18 +415,20 @@ func AddTimes(events []CuptiTimingEvent, out chan<- *host.Trace) {
379415
otherPID = append(otherPID, *ev)
380416
continue
381417
}
382-
if trace := fixer.addTime(ev); trace != nil {
383-
out <- trace
418+
if out := fixer.addTime(ev); out != nil {
419+
outputs = append(outputs, *out)
384420
}
385421
}
386422
fixer.mu.Unlock()
387423

388424
// Handle rare events from other PIDs
389425
for i := range otherPID {
390-
if trace := AddTime(&otherPID[i]); trace != nil {
391-
out <- trace
426+
if out := addTimeSingle(&otherPID[i]); out != nil {
427+
outputs = append(outputs, *out)
392428
}
393429
}
430+
431+
return outputs
394432
}
395433

396434
// MaybeClearAll periodically clears all fixers and reports aggregated metrics.
@@ -416,27 +454,11 @@ func MaybeClearAll() {
416454
}
417455
}
418456

419-
func (i *Instance) Symbolize(f *host.Frame, frames *libpf.Frames) error {
420-
if f.Type != libpf.CUDAKernelFrame {
421-
return interpreter.ErrMismatchInterpreterType
422-
}
423-
// Extract the libpf.String directly from the uint64 field (both are 8 bytes)
424-
fileIDAsUint64 := uint64(f.File)
425-
mangledStr := *(*libpf.String)(unsafe.Pointer(&fileIDAsUint64))
426-
427-
// Demangle the kernel name
428-
funcName := mangledStr
429-
if demStr, err := demangle.ToString(
430-
mangledStr.String(), demangle.NoParams, demangle.NoEnclosingParams); err == nil {
431-
funcName = libpf.Intern(demStr)
432-
}
433-
434-
frames.Append(&libpf.Frame{
435-
Type: libpf.CUDAKernelFrame,
436-
AddressOrLineno: f.Lineno,
437-
FunctionName: funcName,
438-
})
439-
return nil
457+
// Symbolize is a stub — ConvertTrace handles CUDA frames directly via `case libpf.CUDA`,
458+
// so this should never be called in normal operation.
459+
func (i *Instance) Symbolize(f *host.Frame, _ *libpf.Frames) error {
460+
return fmt.Errorf("CUDA Symbolize called unexpectedly for frame type %d: %w",
461+
f.Type, interpreter.ErrMismatchInterpreterType)
440462
}
441463

442464
func (d *data) Unload(ebpf interpreter.EbpfHandler) {

0 commit comments

Comments
 (0)