Skip to content

Commit 42bac4a

Browse files
committed
Symbolize CUDA traces before GPU timing fixup
CUDA stack can sit at raw traces for awhile waiting for the fixer to match them with GPU timing information, during this time pointers in the raw traces could grow stale due to functional program GC'ing activation records. Avoid this by doing trace symbolizing before parking traces in the fixer maps. This has the nice side affect of removing some channel indirection and now traces so straight into the fixer maps and when matched they go straight to ReportTraceEvent. Move CUDA symbolization earlier in the pipeline: ConvertTrace now handles CUDA frames directly, and parcagpu.Start returns a TraceInterceptor instead of a filtered channel. The interceptor diverts symbolized CUDA traces into the GPU fixer post-ConvertTrace, and completed traces (with timing and kernel name) are reported directly. This eliminates the Symbolize method on the CUDA interpreter in favor of demangling in prepTrace.
1 parent 3f35336 commit 42bac4a

3 files changed

Lines changed: 189 additions & 126 deletions

File tree

interpreter/gpu/cuda.go

Lines changed: 122 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ package gpu // import "go.opentelemetry.io/ebpf-profiler/interpreter/gpu"
22

33
import (
44
"bytes"
5-
"errors"
65
"fmt"
76
"strconv"
87
"sync"
8+
"unique"
99
"unsafe"
1010

1111
"github.com/ianlancetaylor/demangle"
@@ -16,6 +16,8 @@ import (
1616
"go.opentelemetry.io/ebpf-profiler/libpf/pfelf"
1717
"go.opentelemetry.io/ebpf-profiler/metrics"
1818
"go.opentelemetry.io/ebpf-profiler/remotememory"
19+
"go.opentelemetry.io/ebpf-profiler/reporter/samples"
20+
"go.opentelemetry.io/ebpf-profiler/traceutil"
1921
)
2022

2123
const (
@@ -31,6 +33,24 @@ var (
3133
gpuFixers sync.Map
3234
)
3335

36+
// SymbolizedCudaTrace holds a symbolized trace awaiting GPU timing information.
37+
// The CPU frames are already symbolized; only the CUDA kernel frame (frame[0])
38+
// needs the kernel name from the timing event.
39+
type SymbolizedCudaTrace struct {
40+
Trace *libpf.Trace
41+
Meta *samples.TraceEventMeta
42+
CorrelationID uint32
43+
CBID int32
44+
}
45+
46+
// CudaTraceOutput is a fully completed CUDA trace ready for reporting.
47+
// For non-graph launches the pointers alias the SymbolizedCudaTrace directly.
48+
// For graph launches they point to copies since the original is reused.
49+
type CudaTraceOutput struct {
50+
Trace *libpf.Trace
51+
Meta *samples.TraceEventMeta
52+
}
53+
3454
// gpuTraceFixer matches traces with timing information for a specific PID.
3555
// We use a single fixer per PID because CUDA correlation IDs are unique per process
3656
// across all devices and streams.
@@ -40,9 +60,9 @@ var (
4060
// that launched the kernel."
4161
type gpuTraceFixer struct {
4262
mu sync.Mutex
43-
timesAwaitingTraces map[uint32][]CuptiTimingEvent // keyed by correlation ID
44-
tracesAwaitingTimes map[uint32]*host.Trace // keyed by correlation ID
45-
maxCorrelationId uint32 // track highest ID for threshold-based clearing
63+
timesAwaitingTraces map[uint32][]CuptiTimingEvent // keyed by correlation ID
64+
tracesAwaitingTimes map[uint32]*SymbolizedCudaTrace // keyed by correlation ID
65+
maxCorrelationId uint32 // track highest ID for threshold-based clearing
4666
}
4767

4868
type data struct {
@@ -152,7 +172,7 @@ func (d *data) Attach(ebpf interpreter.EbpfHandler, pid libpf.PID, _ libpf.Addre
152172
// Create and register fixer for this PID
153173
fixer := &gpuTraceFixer{
154174
timesAwaitingTraces: make(map[uint32][]CuptiTimingEvent),
155-
tracesAwaitingTimes: make(map[uint32]*host.Trace),
175+
tracesAwaitingTimes: make(map[uint32]*SymbolizedCudaTrace),
156176
}
157177

158178
gpuFixers.Store(pid, fixer)
@@ -190,65 +210,58 @@ func isGraphLaunch(cbid int32) bool {
190210
return false
191211
}
192212

193-
// addTrace is called when a CUDA trace is received, to match it with timing info.
194-
// Sends completed traces directly to the output channel (may be multiple for graph launches).
195-
func (f *gpuTraceFixer) addTrace(trace *host.Trace, traceOutChan chan<- *host.Trace) error {
196-
if len(trace.Frames) == 0 {
197-
return errors.New("no frames in trace")
198-
}
199-
frame := trace.Frames[0]
200-
if frame.Type != libpf.CUDAKernelFrame {
201-
return errors.New("first frame is not a CUDA kernel frame")
202-
}
203-
correlationId := uint32(frame.Lineno)
204-
cbid := int32(frame.Lineno >> 32)
205-
206-
log.Debugf("[cuda] adding trace with id %d cbid %d (0x%x) for pid %d", correlationId, int(cbid), uint32(cbid), trace.PID)
213+
// addTrace is called when a symbolized CUDA trace is received, to match it with timing info.
214+
// Returns completed traces (may be multiple for graph launches).
215+
func (f *gpuTraceFixer) addTrace(st *SymbolizedCudaTrace) []CudaTraceOutput {
216+
log.Debugf("[cuda] adding trace with id %d cbid %d (0x%x) for pid %d",
217+
st.CorrelationID, int(st.CBID), uint32(st.CBID), st.Meta.PID)
207218
f.mu.Lock()
208219
defer f.mu.Unlock()
209220

210221
// Update max, detecting wrap-around (new ID much smaller than max means wrap)
211-
if correlationId > f.maxCorrelationId || f.maxCorrelationId-correlationId > 1<<31 {
212-
f.maxCorrelationId = correlationId
222+
if st.CorrelationID > f.maxCorrelationId || f.maxCorrelationId-st.CorrelationID > 1<<31 {
223+
f.maxCorrelationId = st.CorrelationID
213224
}
214225

215-
evs, ok := f.timesAwaitingTraces[correlationId]
226+
var outputs []CudaTraceOutput
227+
228+
evs, ok := f.timesAwaitingTraces[st.CorrelationID]
216229
if ok && len(evs) > 0 {
217230
// Process any timing events that arrived before this trace
218231
for idx := range evs {
219232
log.Debugf("[cuda] gpu trace completed id %d cbid %d (0x%x) for pid %d",
220-
correlationId, int(cbid), uint32(cbid), trace.PID)
221-
traceOutChan <- f.prepTrace(trace, &evs[idx])
233+
st.CorrelationID, int(st.CBID), uint32(st.CBID), st.Meta.PID)
234+
outputs = append(outputs, f.prepTrace(st, &evs[idx]))
222235
}
223236
// Always delete the key to avoid nil entries accumulating
224-
delete(f.timesAwaitingTraces, correlationId)
237+
delete(f.timesAwaitingTraces, st.CorrelationID)
225238
// For non-graph launches, we've matched the only timing event, done
226-
if !isGraphLaunch(cbid) {
227-
return nil
239+
if !isGraphLaunch(st.CBID) {
240+
return outputs
228241
}
229242
}
230243
// Store trace for future timing events
231-
f.tracesAwaitingTimes[correlationId] = trace
232-
return nil
244+
f.tracesAwaitingTimes[st.CorrelationID] = st
245+
return outputs
233246
}
234247

235248
// addTime is called when timing info is received from eBPF, to match it with a trace.
236249
// Caller must hold f.mu.
237-
func (f *gpuTraceFixer) addTime(ev *CuptiTimingEvent) *host.Trace {
250+
func (f *gpuTraceFixer) addTime(ev *CuptiTimingEvent) (CudaTraceOutput, bool) {
238251
// Update max, detecting wrap-around (new ID much smaller than max means wrap)
239252
if ev.Id > f.maxCorrelationId || f.maxCorrelationId-ev.Id > 1<<31 {
240253
f.maxCorrelationId = ev.Id
241254
}
242255

243-
trace, ok := f.tracesAwaitingTimes[ev.Id]
256+
st, ok := f.tracesAwaitingTimes[ev.Id]
244257
if ok {
245258
if ev.Graph == 0 {
246259
delete(f.tracesAwaitingTimes, ev.Id)
247260
}
248-
return f.prepTrace(trace, ev)
261+
return f.prepTrace(st, ev), true
249262
}
250263
f.timesAwaitingTraces[ev.Id] = append(f.timesAwaitingTraces[ev.Id], *ev)
251-
return nil
264+
return CudaTraceOutput{}, false
252265
}
253266

254267
// fixerStats holds statistics from a single fixer for aggregation.
@@ -294,62 +307,88 @@ func (f *gpuTraceFixer) maybeClear() fixerStats {
294307
return stats
295308
}
296309

297-
// prepTrace prepares a trace with timing information and kernel name.
298-
func (f *gpuTraceFixer) prepTrace(tr *host.Trace, ev *CuptiTimingEvent) *host.Trace {
310+
// prepTrace attaches timing information and the demangled kernel name to a symbolized
311+
// CUDA trace, producing a CudaTraceOutput ready for reporting.
312+
func (f *gpuTraceFixer) prepTrace(st *SymbolizedCudaTrace, ev *CuptiTimingEvent) CudaTraceOutput {
313+
out := CudaTraceOutput{
314+
Trace: st.Trace,
315+
Meta: st.Meta,
316+
}
317+
299318
if ev.Graph != 0 {
300-
// Graphs can have many kernels with same correlation ID
301-
clone := *tr
302-
tr = &clone
319+
// Graphs can have many kernels with same correlation ID.
320+
// Copy Trace (Frames differ per kernel, Hash differs) and Meta (OffTime differs)
321+
// since the original st stays in the map for future timing events.
322+
// CustomLabels are NOT copied: all events for the same correlation ID share
323+
// identical cuda_device/cuda_stream/cuda_graph values.
324+
traceCopy := *st.Trace
325+
traceCopy.Frames = make(libpf.Frames, len(st.Trace.Frames))
326+
copy(traceCopy.Frames, st.Trace.Frames)
327+
out.Trace = &traceCopy
328+
metaCopy := *st.Meta
329+
out.Meta = &metaCopy
303330
}
304-
tr.OffTime = int64(ev.End - ev.Start)
305-
if tr.CustomLabels == nil {
306-
tr.CustomLabels = make(map[string]string)
331+
332+
out.Meta.OffTime = int64(ev.End - ev.Start)
333+
if out.Trace.CustomLabels == nil {
334+
out.Trace.CustomLabels = make(map[string]string)
307335
}
308336

309-
tr.CustomLabels["cuda_device"] = strconv.FormatUint(uint64(ev.Dev), 10)
337+
out.Trace.CustomLabels["cuda_device"] = strconv.FormatUint(uint64(ev.Dev), 10)
310338
if ev.Stream != 0 {
311-
tr.CustomLabels["cuda_stream"] = strconv.FormatUint(uint64(ev.Stream), 10)
339+
out.Trace.CustomLabels["cuda_stream"] = strconv.FormatUint(uint64(ev.Stream), 10)
312340
}
313341
if ev.Graph != 0 {
314-
tr.CustomLabels["cuda_graph"] = strconv.FormatUint(uint64(ev.Graph), 10)
315-
tr.CustomLabels["cuda_id"] = strconv.FormatUint(uint64(ev.Id), 10)
342+
out.Trace.CustomLabels["cuda_graph"] = strconv.FormatUint(uint64(ev.Graph), 10)
343+
out.Trace.CustomLabels["cuda_id"] = strconv.FormatUint(uint64(ev.Id), 10)
316344
}
317-
if len(ev.KernelName) > 0 {
318-
// Store the raw (mangled) kernel name - demangling happens in Symbolize
319-
// Use unsafe.String to avoid allocation - Intern/unique.Make will copy if new
320-
nameBytes := ev.KernelName[:]
321-
if idx := bytes.IndexByte(nameBytes, 0); idx >= 0 {
322-
nameBytes = nameBytes[:idx]
323-
}
324-
istr := libpf.Intern(unsafe.String(unsafe.SliceData(nameBytes), len(nameBytes)))
325-
// See collect_trace where we always make the first frame a CUDA kernel frame.
326-
if tr.Frames[0].Type != libpf.CUDAKernelFrame {
327-
panic("first frame is not a CUDA kernel frame")
345+
346+
// Extract kernel name from timing event, demangle, and update frame[0]
347+
nameBytes := ev.KernelName[:]
348+
if idx := bytes.IndexByte(nameBytes, 0); idx >= 0 {
349+
nameBytes = nameBytes[:idx]
350+
}
351+
if len(nameBytes) > 0 {
352+
mangledStr := libpf.Intern(unsafe.String(unsafe.SliceData(nameBytes), len(nameBytes)))
353+
funcName := mangledStr
354+
if demStr, err := demangle.ToString(
355+
mangledStr.String(), demangle.NoParams, demangle.NoEnclosingParams); err == nil {
356+
funcName = libpf.Intern(demStr)
328357
}
329-
tr.Frames[0].File = host.FileID(*(*uint64)(unsafe.Pointer(&istr)))
358+
359+
currentFrame := out.Trace.Frames[0].Value()
360+
out.Trace.Frames[0] = unique.Make(libpf.Frame{
361+
Type: currentFrame.Type,
362+
AddressOrLineno: currentFrame.AddressOrLineno,
363+
FunctionName: funcName,
364+
})
330365
}
331-
return tr
366+
367+
// Recompute trace hash since we modified frame[0]
368+
out.Trace.Hash = traceutil.HashTrace(out.Trace)
369+
370+
return out
332371
}
333372

334373
// AddTrace is a static function that delegates to the appropriate fixer for the PID.
335-
// Completed traces are sent directly to traceOutChan.
336-
func AddTrace(trace *host.Trace, traceOutChan chan<- *host.Trace) error {
337-
pid := trace.PID
374+
func AddTrace(st *SymbolizedCudaTrace) []CudaTraceOutput {
375+
pid := st.Meta.PID
338376
value, ok := gpuFixers.Load(pid)
339377
if !ok {
340-
return fmt.Errorf("no GPU fixer found for PID %d", pid)
378+
log.Warnf("no GPU fixer found for PID %d", pid)
379+
return nil
341380
}
342381
fixer := value.(*gpuTraceFixer)
343-
return fixer.addTrace(trace, traceOutChan)
382+
return fixer.addTrace(st)
344383
}
345384

346-
// AddTime is a static function that delegates to the appropriate fixer for the PID.
347-
func AddTime(ev *CuptiTimingEvent) *host.Trace {
385+
// addTimeSingle is a static function that delegates to the appropriate fixer for the PID.
386+
func addTimeSingle(ev *CuptiTimingEvent) (CudaTraceOutput, bool) {
348387
pid := libpf.PID(ev.Pid)
349388
value, ok := gpuFixers.Load(pid)
350389
if !ok {
351390
log.Warnf("no GPU fixer found for PID %d", pid)
352-
return nil
391+
return CudaTraceOutput{}, false
353392
}
354393
fixer := value.(*gpuTraceFixer)
355394
fixer.mu.Lock()
@@ -358,16 +397,19 @@ func AddTime(ev *CuptiTimingEvent) *host.Trace {
358397
}
359398

360399
// AddTimes processes a batch of timing events, taking the lock once per PID.
361-
func AddTimes(events []CuptiTimingEvent, out chan<- *host.Trace) {
400+
// Returns all completed traces.
401+
func AddTimes(events []CuptiTimingEvent) []CudaTraceOutput {
362402
if len(events) == 0 {
363-
return
403+
return nil
364404
}
365405

406+
var outputs []CudaTraceOutput
407+
366408
// Fast path: assume all events from same PID (common case)
367409
pid := libpf.PID(events[0].Pid)
368410
value, ok := gpuFixers.Load(pid)
369411
if !ok {
370-
return
412+
return nil
371413
}
372414
fixer := value.(*gpuTraceFixer)
373415

@@ -379,18 +421,20 @@ func AddTimes(events []CuptiTimingEvent, out chan<- *host.Trace) {
379421
otherPID = append(otherPID, *ev)
380422
continue
381423
}
382-
if trace := fixer.addTime(ev); trace != nil {
383-
out <- trace
424+
if out, ok := fixer.addTime(ev); ok {
425+
outputs = append(outputs, out)
384426
}
385427
}
386428
fixer.mu.Unlock()
387429

388430
// Handle rare events from other PIDs
389431
for i := range otherPID {
390-
if trace := AddTime(&otherPID[i]); trace != nil {
391-
out <- trace
432+
if out, ok := addTimeSingle(&otherPID[i]); ok {
433+
outputs = append(outputs, out)
392434
}
393435
}
436+
437+
return outputs
394438
}
395439

396440
// MaybeClearAll periodically clears all fixers and reports aggregated metrics.
@@ -416,27 +460,11 @@ func MaybeClearAll() {
416460
}
417461
}
418462

419-
func (i *Instance) Symbolize(f *host.Frame, frames *libpf.Frames) error {
420-
if f.Type != libpf.CUDAKernelFrame {
421-
return interpreter.ErrMismatchInterpreterType
422-
}
423-
// Extract the libpf.String directly from the uint64 field (both are 8 bytes)
424-
fileIDAsUint64 := uint64(f.File)
425-
mangledStr := *(*libpf.String)(unsafe.Pointer(&fileIDAsUint64))
426-
427-
// Demangle the kernel name
428-
funcName := mangledStr
429-
if demStr, err := demangle.ToString(
430-
mangledStr.String(), demangle.NoParams, demangle.NoEnclosingParams); err == nil {
431-
funcName = libpf.Intern(demStr)
432-
}
433-
434-
frames.Append(&libpf.Frame{
435-
Type: libpf.CUDAKernelFrame,
436-
AddressOrLineno: f.Lineno,
437-
FunctionName: funcName,
438-
})
439-
return nil
463+
// Symbolize is a stub — ConvertTrace handles CUDA frames directly via `case libpf.CUDA`,
464+
// so this should never be called in normal operation.
465+
func (i *Instance) Symbolize(f *host.Frame, _ *libpf.Frames) error {
466+
return fmt.Errorf("CUDA Symbolize called unexpectedly for frame type %d: %w",
467+
f.Type, interpreter.ErrMismatchInterpreterType)
440468
}
441469

442470
func (d *data) Unload(ebpf interpreter.EbpfHandler) {

0 commit comments

Comments
 (0)