Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func startTraceHandling(ctx context.Context, rep reporter.TraceReporter,
}

_, err := tracehandler.Start(ctx, rep, trc.TraceProcessor(),
traceCh, intervals, cacheSize)
traceCh, intervals, cacheSize, nil)
return err
}

Expand Down
216 changes: 122 additions & 94 deletions interpreter/gpu/cuda.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package gpu // import "go.opentelemetry.io/ebpf-profiler/interpreter/gpu"

import (
"bytes"
"errors"
"fmt"
"strconv"
"sync"
"unique"
"unsafe"

"github.com/ianlancetaylor/demangle"
Expand All @@ -16,6 +16,8 @@ import (
"go.opentelemetry.io/ebpf-profiler/libpf/pfelf"
"go.opentelemetry.io/ebpf-profiler/metrics"
"go.opentelemetry.io/ebpf-profiler/remotememory"
"go.opentelemetry.io/ebpf-profiler/reporter/samples"
"go.opentelemetry.io/ebpf-profiler/traceutil"
)

const (
Expand All @@ -31,6 +33,24 @@ var (
gpuFixers sync.Map
)

// SymbolizedCudaTrace holds a symbolized trace awaiting GPU timing information.
// The CPU frames are already symbolized; only the CUDA kernel frame (frame[0])
// needs the kernel name from the timing event.
type SymbolizedCudaTrace struct {
// Trace is the symbolized trace. prepTrace rewrites Frames[0] with the kernel
// name, adds cuda_* entries to CustomLabels and recomputes Hash.
Trace *libpf.Trace
// Meta is the event metadata for the trace. Meta.PID selects the per-PID fixer
// in AddTrace, and prepTrace stores the kernel duration in Meta.OffTime.
Meta *samples.TraceEventMeta
// CorrelationID is the key used to pair this trace with CuptiTimingEvent.Id
// in the fixer's timesAwaitingTraces and tracesAwaitingTimes maps.
CorrelationID uint32
// CBID is passed to isGraphLaunch to decide whether the trace stays stored for
// further timing events after a match.
// NOTE(review): presumably the CUPTI callback ID of the launch API — confirm.
CBID int32
}

// CudaTraceOutput is a fully completed CUDA trace ready for reporting.
// For non-graph launches the pointers alias the SymbolizedCudaTrace directly.
// For graph launches they point to copies since the original is reused.
// The graph-launch Trace copy duplicates only the struct and its Frames slice;
// a CustomLabels map that was already non-nil stays shared with the original.
type CudaTraceOutput struct {
// Trace is the trace after prepTrace has recomputed its Hash. Frames[0] carries
// the kernel name (demangled when demangling succeeds) if the timing event
// supplied one.
Trace *libpf.Trace
// Meta is the event metadata with OffTime set to the timing event's
// End - Start.
Meta *samples.TraceEventMeta
}

// gpuTraceFixer matches traces with timing information for a specific PID.
// We use a single fixer per PID because CUDA correlation IDs are unique per process
// across all devices and streams.
Expand All @@ -40,9 +60,9 @@ var (
// that launched the kernel."
type gpuTraceFixer struct {
mu sync.Mutex
timesAwaitingTraces map[uint32][]CuptiTimingEvent // keyed by correlation ID
tracesAwaitingTimes map[uint32]*host.Trace // keyed by correlation ID
maxCorrelationId uint32 // track highest ID for threshold-based clearing
timesAwaitingTraces map[uint32][]CuptiTimingEvent // keyed by correlation ID
tracesAwaitingTimes map[uint32]*SymbolizedCudaTrace // keyed by correlation ID
maxCorrelationId uint32 // track highest ID for threshold-based clearing
}

type data struct {
Expand Down Expand Up @@ -152,7 +172,7 @@ func (d *data) Attach(ebpf interpreter.EbpfHandler, pid libpf.PID, _ libpf.Addre
// Create and register fixer for this PID
fixer := &gpuTraceFixer{
timesAwaitingTraces: make(map[uint32][]CuptiTimingEvent),
tracesAwaitingTimes: make(map[uint32]*host.Trace),
tracesAwaitingTimes: make(map[uint32]*SymbolizedCudaTrace),
}

gpuFixers.Store(pid, fixer)
Expand Down Expand Up @@ -190,65 +210,58 @@ func isGraphLaunch(cbid int32) bool {
return false
}

// addTrace is called when a CUDA trace is received, to match it with timing info.
// Sends completed traces directly to the output channel (may be multiple for graph launches).
func (f *gpuTraceFixer) addTrace(trace *host.Trace, traceOutChan chan<- *host.Trace) error {
if len(trace.Frames) == 0 {
return errors.New("no frames in trace")
}
frame := trace.Frames[0]
if frame.Type != libpf.CUDAKernelFrame {
return errors.New("first frame is not a CUDA kernel frame")
}
correlationId := uint32(frame.Lineno)
cbid := int32(frame.Lineno >> 32)

log.Debugf("[cuda] adding trace with id %d cbid %d (0x%x) for pid %d", correlationId, int(cbid), uint32(cbid), trace.PID)
// addTrace is called when a symbolized CUDA trace is received, to match it with timing info.
// Returns completed traces (may be multiple for graph launches).
func (f *gpuTraceFixer) addTrace(st *SymbolizedCudaTrace) []CudaTraceOutput {
log.Debugf("[cuda] adding trace with id %d cbid %d (0x%x) for pid %d",
st.CorrelationID, int(st.CBID), uint32(st.CBID), st.Meta.PID)
f.mu.Lock()
defer f.mu.Unlock()

// Update max, detecting wrap-around (new ID much smaller than max means wrap)
if correlationId > f.maxCorrelationId || f.maxCorrelationId-correlationId > 1<<31 {
f.maxCorrelationId = correlationId
if st.CorrelationID > f.maxCorrelationId || f.maxCorrelationId-st.CorrelationID > 1<<31 {
f.maxCorrelationId = st.CorrelationID
}

evs, ok := f.timesAwaitingTraces[correlationId]
var outputs []CudaTraceOutput

evs, ok := f.timesAwaitingTraces[st.CorrelationID]
if ok && len(evs) > 0 {
// Process any timing events that arrived before this trace
for idx := range evs {
log.Debugf("[cuda] gpu trace completed id %d cbid %d (0x%x) for pid %d",
correlationId, int(cbid), uint32(cbid), trace.PID)
traceOutChan <- f.prepTrace(trace, &evs[idx])
st.CorrelationID, int(st.CBID), uint32(st.CBID), st.Meta.PID)
outputs = append(outputs, f.prepTrace(st, &evs[idx]))
}
// Always delete the key to avoid nil entries accumulating
delete(f.timesAwaitingTraces, correlationId)
delete(f.timesAwaitingTraces, st.CorrelationID)
// For non-graph launches, we've matched the only timing event, done
if !isGraphLaunch(cbid) {
return nil
if !isGraphLaunch(st.CBID) {
return outputs
}
}
// Store trace for future timing events
f.tracesAwaitingTimes[correlationId] = trace
return nil
f.tracesAwaitingTimes[st.CorrelationID] = st
return outputs
}

// addTime is called when timing info is received from eBPF, to match it with a trace.
// Caller must hold f.mu.
func (f *gpuTraceFixer) addTime(ev *CuptiTimingEvent) *host.Trace {
func (f *gpuTraceFixer) addTime(ev *CuptiTimingEvent) (CudaTraceOutput, bool) {
// Update max, detecting wrap-around (new ID much smaller than max means wrap)
if ev.Id > f.maxCorrelationId || f.maxCorrelationId-ev.Id > 1<<31 {
f.maxCorrelationId = ev.Id
}

trace, ok := f.tracesAwaitingTimes[ev.Id]
st, ok := f.tracesAwaitingTimes[ev.Id]
if ok {
if ev.Graph == 0 {
delete(f.tracesAwaitingTimes, ev.Id)
}
return f.prepTrace(trace, ev)
return f.prepTrace(st, ev), true
}
f.timesAwaitingTraces[ev.Id] = append(f.timesAwaitingTraces[ev.Id], *ev)
return nil
return CudaTraceOutput{}, false
}

// fixerStats holds statistics from a single fixer for aggregation.
Expand Down Expand Up @@ -294,62 +307,88 @@ func (f *gpuTraceFixer) maybeClear() fixerStats {
return stats
}

// prepTrace prepares a trace with timing information and kernel name.
func (f *gpuTraceFixer) prepTrace(tr *host.Trace, ev *CuptiTimingEvent) *host.Trace {
// prepTrace attaches timing information and the demangled kernel name to a symbolized
// CUDA trace, producing a CudaTraceOutput ready for reporting.
func (f *gpuTraceFixer) prepTrace(st *SymbolizedCudaTrace, ev *CuptiTimingEvent) CudaTraceOutput {
out := CudaTraceOutput{
Trace: st.Trace,
Meta: st.Meta,
}

if ev.Graph != 0 {
// Graphs can have many kernels with same correlation ID
clone := *tr
tr = &clone
// Graphs can have many kernels with same correlation ID.
// Copy Trace (Frames differ per kernel, Hash differs) and Meta (OffTime differs)
// since the original st stays in the map for future timing events.
// CustomLabels are NOT copied: all events for the same correlation ID share
// identical cuda_device/cuda_stream/cuda_graph values.
traceCopy := *st.Trace
traceCopy.Frames = make(libpf.Frames, len(st.Trace.Frames))
copy(traceCopy.Frames, st.Trace.Frames)
out.Trace = &traceCopy
metaCopy := *st.Meta
out.Meta = &metaCopy
}
tr.OffTime = int64(ev.End - ev.Start)
if tr.CustomLabels == nil {
tr.CustomLabels = make(map[string]string)

out.Meta.OffTime = int64(ev.End - ev.Start)
if out.Trace.CustomLabels == nil {
out.Trace.CustomLabels = make(map[string]string)
}

tr.CustomLabels["cuda_device"] = strconv.FormatUint(uint64(ev.Dev), 10)
out.Trace.CustomLabels["cuda_device"] = strconv.FormatUint(uint64(ev.Dev), 10)
if ev.Stream != 0 {
tr.CustomLabels["cuda_stream"] = strconv.FormatUint(uint64(ev.Stream), 10)
out.Trace.CustomLabels["cuda_stream"] = strconv.FormatUint(uint64(ev.Stream), 10)
}
if ev.Graph != 0 {
tr.CustomLabels["cuda_graph"] = strconv.FormatUint(uint64(ev.Graph), 10)
tr.CustomLabels["cuda_id"] = strconv.FormatUint(uint64(ev.Id), 10)
out.Trace.CustomLabels["cuda_graph"] = strconv.FormatUint(uint64(ev.Graph), 10)
out.Trace.CustomLabels["cuda_id"] = strconv.FormatUint(uint64(ev.Id), 10)
}
if len(ev.KernelName) > 0 {
// Store the raw (mangled) kernel name - demangling happens in Symbolize
// Use unsafe.String to avoid allocation - Intern/unique.Make will copy if new
nameBytes := ev.KernelName[:]
if idx := bytes.IndexByte(nameBytes, 0); idx >= 0 {
nameBytes = nameBytes[:idx]
}
istr := libpf.Intern(unsafe.String(unsafe.SliceData(nameBytes), len(nameBytes)))
// See collect_trace where we always make the first frame a CUDA kernel frame.
if tr.Frames[0].Type != libpf.CUDAKernelFrame {
panic("first frame is not a CUDA kernel frame")

// Extract kernel name from timing event, demangle, and update frame[0]
nameBytes := ev.KernelName[:]
if idx := bytes.IndexByte(nameBytes, 0); idx >= 0 {
nameBytes = nameBytes[:idx]
}
if len(nameBytes) > 0 {
mangledStr := libpf.Intern(unsafe.String(unsafe.SliceData(nameBytes), len(nameBytes)))
funcName := mangledStr
if demStr, err := demangle.ToString(
mangledStr.String(), demangle.NoParams, demangle.NoEnclosingParams); err == nil {
funcName = libpf.Intern(demStr)
}
tr.Frames[0].File = host.FileID(*(*uint64)(unsafe.Pointer(&istr)))

currentFrame := out.Trace.Frames[0].Value()
out.Trace.Frames[0] = unique.Make(libpf.Frame{
Type: currentFrame.Type,
AddressOrLineno: currentFrame.AddressOrLineno,
FunctionName: funcName,
})
}
return tr

// Recompute trace hash since we modified frame[0]
out.Trace.Hash = traceutil.HashTrace(out.Trace)

return out
}

// AddTrace is a static function that delegates to the appropriate fixer for the PID.
// Completed traces are sent directly to traceOutChan.
func AddTrace(trace *host.Trace, traceOutChan chan<- *host.Trace) error {
pid := trace.PID
func AddTrace(st *SymbolizedCudaTrace) []CudaTraceOutput {
pid := st.Meta.PID
value, ok := gpuFixers.Load(pid)
if !ok {
return fmt.Errorf("no GPU fixer found for PID %d", pid)
log.Warnf("no GPU fixer found for PID %d", pid)
return nil
}
fixer := value.(*gpuTraceFixer)
return fixer.addTrace(trace, traceOutChan)
return fixer.addTrace(st)
}

// AddTime is a static function that delegates to the appropriate fixer for the PID.
func AddTime(ev *CuptiTimingEvent) *host.Trace {
// addTimeSingle is a static function that delegates to the appropriate fixer for the PID.
func addTimeSingle(ev *CuptiTimingEvent) (CudaTraceOutput, bool) {
pid := libpf.PID(ev.Pid)
value, ok := gpuFixers.Load(pid)
if !ok {
log.Warnf("no GPU fixer found for PID %d", pid)
return nil
return CudaTraceOutput{}, false
}
fixer := value.(*gpuTraceFixer)
fixer.mu.Lock()
Expand All @@ -358,16 +397,19 @@ func AddTime(ev *CuptiTimingEvent) *host.Trace {
}

// AddTimes processes a batch of timing events, taking the lock once per PID.
func AddTimes(events []CuptiTimingEvent, out chan<- *host.Trace) {
// Returns all completed traces.
func AddTimes(events []CuptiTimingEvent) []CudaTraceOutput {
if len(events) == 0 {
return
return nil
}

var outputs []CudaTraceOutput

// Fast path: assume all events from same PID (common case)
pid := libpf.PID(events[0].Pid)
value, ok := gpuFixers.Load(pid)
if !ok {
return
return nil
}
fixer := value.(*gpuTraceFixer)

Expand All @@ -379,18 +421,20 @@ func AddTimes(events []CuptiTimingEvent, out chan<- *host.Trace) {
otherPID = append(otherPID, *ev)
continue
}
if trace := fixer.addTime(ev); trace != nil {
out <- trace
if out, ok := fixer.addTime(ev); ok {
outputs = append(outputs, out)
}
}
fixer.mu.Unlock()

// Handle rare events from other PIDs
for i := range otherPID {
if trace := AddTime(&otherPID[i]); trace != nil {
out <- trace
if out, ok := addTimeSingle(&otherPID[i]); ok {
outputs = append(outputs, out)
}
}

return outputs
}

// MaybeClearAll periodically clears all fixers and reports aggregated metrics.
Expand All @@ -416,27 +460,11 @@ func MaybeClearAll() {
}
}

func (i *Instance) Symbolize(f *host.Frame, frames *libpf.Frames) error {
if f.Type != libpf.CUDAKernelFrame {
return interpreter.ErrMismatchInterpreterType
}
// Extract the libpf.String directly from the uint64 field (both are 8 bytes)
fileIDAsUint64 := uint64(f.File)
mangledStr := *(*libpf.String)(unsafe.Pointer(&fileIDAsUint64))

// Demangle the kernel name
funcName := mangledStr
if demStr, err := demangle.ToString(
mangledStr.String(), demangle.NoParams, demangle.NoEnclosingParams); err == nil {
funcName = libpf.Intern(demStr)
}

frames.Append(&libpf.Frame{
Type: libpf.CUDAKernelFrame,
AddressOrLineno: f.Lineno,
FunctionName: funcName,
})
return nil
// Symbolize is a deliberate stub that always returns an error. ConvertTrace is
// expected to handle CUDA frames itself (via its `case libpf.CUDA` branch), so
// reaching this method means an unexpected call path. The error reports the
// offending frame type and wraps interpreter.ErrMismatchInterpreterType so that
// callers can still match it with errors.Is.
func (i *Instance) Symbolize(f *host.Frame, _ *libpf.Frames) error {
	frameType := f.Type
	return fmt.Errorf("CUDA Symbolize called unexpectedly for frame type %d: %w",
		frameType, interpreter.ErrMismatchInterpreterType)
}

func (d *data) Unload(ebpf interpreter.EbpfHandler) {
Expand Down
Loading
Loading