diff --git a/internal/controller/controller.go b/internal/controller/controller.go index b827696e0..975803a14 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -181,7 +181,7 @@ func startTraceHandling(ctx context.Context, rep reporter.TraceReporter, } _, err := tracehandler.Start(ctx, rep, trc.TraceProcessor(), - traceCh, intervals, cacheSize) + traceCh, intervals, cacheSize, nil) return err } diff --git a/interpreter/gpu/cuda.go b/interpreter/gpu/cuda.go index 89a5a610f..fc41bc31a 100644 --- a/interpreter/gpu/cuda.go +++ b/interpreter/gpu/cuda.go @@ -2,10 +2,10 @@ package gpu // import "go.opentelemetry.io/ebpf-profiler/interpreter/gpu" import ( "bytes" - "errors" "fmt" "strconv" "sync" + "unique" "unsafe" "github.com/ianlancetaylor/demangle" @@ -16,6 +16,8 @@ import ( "go.opentelemetry.io/ebpf-profiler/libpf/pfelf" "go.opentelemetry.io/ebpf-profiler/metrics" "go.opentelemetry.io/ebpf-profiler/remotememory" + "go.opentelemetry.io/ebpf-profiler/reporter/samples" + "go.opentelemetry.io/ebpf-profiler/traceutil" ) const ( @@ -31,6 +33,24 @@ var ( gpuFixers sync.Map ) +// SymbolizedCudaTrace holds a symbolized trace awaiting GPU timing information. +// The CPU frames are already symbolized; only the CUDA kernel frame (frame[0]) +// needs the kernel name from the timing event. +type SymbolizedCudaTrace struct { + Trace *libpf.Trace + Meta *samples.TraceEventMeta + CorrelationID uint32 + CBID int32 +} + +// CudaTraceOutput is a fully completed CUDA trace ready for reporting. +// For non-graph launches the pointers alias the SymbolizedCudaTrace directly. +// For graph launches they point to copies since the original is reused. +type CudaTraceOutput struct { + Trace *libpf.Trace + Meta *samples.TraceEventMeta +} + // gpuTraceFixer matches traces with timing information for a specific PID. // We use a single fixer per PID because CUDA correlation IDs are unique per process // across all devices and streams. 
@@ -40,9 +60,9 @@ var ( // that launched the kernel." type gpuTraceFixer struct { mu sync.Mutex - timesAwaitingTraces map[uint32][]CuptiTimingEvent // keyed by correlation ID - tracesAwaitingTimes map[uint32]*host.Trace // keyed by correlation ID - maxCorrelationId uint32 // track highest ID for threshold-based clearing + timesAwaitingTraces map[uint32][]CuptiTimingEvent // keyed by correlation ID + tracesAwaitingTimes map[uint32]*SymbolizedCudaTrace // keyed by correlation ID + maxCorrelationId uint32 // track highest ID for threshold-based clearing } type data struct { @@ -152,7 +172,7 @@ func (d *data) Attach(ebpf interpreter.EbpfHandler, pid libpf.PID, _ libpf.Addre // Create and register fixer for this PID fixer := &gpuTraceFixer{ timesAwaitingTraces: make(map[uint32][]CuptiTimingEvent), - tracesAwaitingTimes: make(map[uint32]*host.Trace), + tracesAwaitingTimes: make(map[uint32]*SymbolizedCudaTrace), } gpuFixers.Store(pid, fixer) @@ -190,65 +210,58 @@ func isGraphLaunch(cbid int32) bool { return false } -// addTrace is called when a CUDA trace is received, to match it with timing info. -// Sends completed traces directly to the output channel (may be multiple for graph launches). -func (f *gpuTraceFixer) addTrace(trace *host.Trace, traceOutChan chan<- *host.Trace) error { - if len(trace.Frames) == 0 { - return errors.New("no frames in trace") - } - frame := trace.Frames[0] - if frame.Type != libpf.CUDAKernelFrame { - return errors.New("first frame is not a CUDA kernel frame") - } - correlationId := uint32(frame.Lineno) - cbid := int32(frame.Lineno >> 32) - - log.Debugf("[cuda] adding trace with id %d cbid %d (0x%x) for pid %d", correlationId, int(cbid), uint32(cbid), trace.PID) +// addTrace is called when a symbolized CUDA trace is received, to match it with timing info. +// Returns completed traces (may be multiple for graph launches). 
+func (f *gpuTraceFixer) addTrace(st *SymbolizedCudaTrace) []CudaTraceOutput { + log.Debugf("[cuda] adding trace with id %d cbid %d (0x%x) for pid %d", + st.CorrelationID, int(st.CBID), uint32(st.CBID), st.Meta.PID) f.mu.Lock() defer f.mu.Unlock() // Update max, detecting wrap-around (new ID much smaller than max means wrap) - if correlationId > f.maxCorrelationId || f.maxCorrelationId-correlationId > 1<<31 { - f.maxCorrelationId = correlationId + if st.CorrelationID > f.maxCorrelationId || f.maxCorrelationId-st.CorrelationID > 1<<31 { + f.maxCorrelationId = st.CorrelationID } - evs, ok := f.timesAwaitingTraces[correlationId] + var outputs []CudaTraceOutput + + evs, ok := f.timesAwaitingTraces[st.CorrelationID] if ok && len(evs) > 0 { // Process any timing events that arrived before this trace for idx := range evs { log.Debugf("[cuda] gpu trace completed id %d cbid %d (0x%x) for pid %d", - correlationId, int(cbid), uint32(cbid), trace.PID) - traceOutChan <- f.prepTrace(trace, &evs[idx]) + st.CorrelationID, int(st.CBID), uint32(st.CBID), st.Meta.PID) + outputs = append(outputs, f.prepTrace(st, &evs[idx])) } // Always delete the key to avoid nil entries accumulating - delete(f.timesAwaitingTraces, correlationId) + delete(f.timesAwaitingTraces, st.CorrelationID) // For non-graph launches, we've matched the only timing event, done - if !isGraphLaunch(cbid) { - return nil + if !isGraphLaunch(st.CBID) { + return outputs } } // Store trace for future timing events - f.tracesAwaitingTimes[correlationId] = trace - return nil + f.tracesAwaitingTimes[st.CorrelationID] = st + return outputs } // addTime is called when timing info is received from eBPF, to match it with a trace. // Caller must hold f.mu. 
-func (f *gpuTraceFixer) addTime(ev *CuptiTimingEvent) *host.Trace { +func (f *gpuTraceFixer) addTime(ev *CuptiTimingEvent) (CudaTraceOutput, bool) { // Update max, detecting wrap-around (new ID much smaller than max means wrap) if ev.Id > f.maxCorrelationId || f.maxCorrelationId-ev.Id > 1<<31 { f.maxCorrelationId = ev.Id } - trace, ok := f.tracesAwaitingTimes[ev.Id] + st, ok := f.tracesAwaitingTimes[ev.Id] if ok { if ev.Graph == 0 { delete(f.tracesAwaitingTimes, ev.Id) } - return f.prepTrace(trace, ev) + return f.prepTrace(st, ev), true } f.timesAwaitingTraces[ev.Id] = append(f.timesAwaitingTraces[ev.Id], *ev) - return nil + return CudaTraceOutput{}, false } // fixerStats holds statistics from a single fixer for aggregation. @@ -294,62 +307,88 @@ func (f *gpuTraceFixer) maybeClear() fixerStats { return stats } -// prepTrace prepares a trace with timing information and kernel name. -func (f *gpuTraceFixer) prepTrace(tr *host.Trace, ev *CuptiTimingEvent) *host.Trace { +// prepTrace attaches timing information and the demangled kernel name to a symbolized +// CUDA trace, producing a CudaTraceOutput ready for reporting. +func (f *gpuTraceFixer) prepTrace(st *SymbolizedCudaTrace, ev *CuptiTimingEvent) CudaTraceOutput { + out := CudaTraceOutput{ + Trace: st.Trace, + Meta: st.Meta, + } + if ev.Graph != 0 { - // Graphs can have many kernels with same correlation ID - clone := *tr - tr = &clone + // Graphs can have many kernels with same correlation ID. + // Copy Trace (Frames differ per kernel, Hash differs) and Meta (OffTime differs) + // since the original st stays in the map for future timing events. + // CustomLabels are NOT copied: all events for the same correlation ID share + // identical cuda_device/cuda_stream/cuda_graph values. 
+ traceCopy := *st.Trace + traceCopy.Frames = make(libpf.Frames, len(st.Trace.Frames)) + copy(traceCopy.Frames, st.Trace.Frames) + out.Trace = &traceCopy + metaCopy := *st.Meta + out.Meta = &metaCopy } - tr.OffTime = int64(ev.End - ev.Start) - if tr.CustomLabels == nil { - tr.CustomLabels = make(map[string]string) + + out.Meta.OffTime = int64(ev.End - ev.Start) + if out.Trace.CustomLabels == nil { + out.Trace.CustomLabels = make(map[string]string) } - tr.CustomLabels["cuda_device"] = strconv.FormatUint(uint64(ev.Dev), 10) + out.Trace.CustomLabels["cuda_device"] = strconv.FormatUint(uint64(ev.Dev), 10) if ev.Stream != 0 { - tr.CustomLabels["cuda_stream"] = strconv.FormatUint(uint64(ev.Stream), 10) + out.Trace.CustomLabels["cuda_stream"] = strconv.FormatUint(uint64(ev.Stream), 10) } if ev.Graph != 0 { - tr.CustomLabels["cuda_graph"] = strconv.FormatUint(uint64(ev.Graph), 10) - tr.CustomLabels["cuda_id"] = strconv.FormatUint(uint64(ev.Id), 10) + out.Trace.CustomLabels["cuda_graph"] = strconv.FormatUint(uint64(ev.Graph), 10) + out.Trace.CustomLabels["cuda_id"] = strconv.FormatUint(uint64(ev.Id), 10) } - if len(ev.KernelName) > 0 { - // Store the raw (mangled) kernel name - demangling happens in Symbolize - // Use unsafe.String to avoid allocation - Intern/unique.Make will copy if new - nameBytes := ev.KernelName[:] - if idx := bytes.IndexByte(nameBytes, 0); idx >= 0 { - nameBytes = nameBytes[:idx] - } - istr := libpf.Intern(unsafe.String(unsafe.SliceData(nameBytes), len(nameBytes))) - // See collect_trace where we always make the first frame a CUDA kernel frame. 
- if tr.Frames[0].Type != libpf.CUDAKernelFrame { - panic("first frame is not a CUDA kernel frame") + + // Extract kernel name from timing event, demangle, and update frame[0] + nameBytes := ev.KernelName[:] + if idx := bytes.IndexByte(nameBytes, 0); idx >= 0 { + nameBytes = nameBytes[:idx] + } + if len(nameBytes) > 0 { + mangledStr := libpf.Intern(unsafe.String(unsafe.SliceData(nameBytes), len(nameBytes))) + funcName := mangledStr + if demStr, err := demangle.ToString( + mangledStr.String(), demangle.NoParams, demangle.NoEnclosingParams); err == nil { + funcName = libpf.Intern(demStr) } - tr.Frames[0].File = host.FileID(*(*uint64)(unsafe.Pointer(&istr))) + + currentFrame := out.Trace.Frames[0].Value() + out.Trace.Frames[0] = unique.Make(libpf.Frame{ + Type: currentFrame.Type, + AddressOrLineno: currentFrame.AddressOrLineno, + FunctionName: funcName, + }) } - return tr + + // Recompute trace hash since we modified frame[0] + out.Trace.Hash = traceutil.HashTrace(out.Trace) + + return out } // AddTrace is a static function that delegates to the appropriate fixer for the PID. -// Completed traces are sent directly to traceOutChan. -func AddTrace(trace *host.Trace, traceOutChan chan<- *host.Trace) error { - pid := trace.PID +func AddTrace(st *SymbolizedCudaTrace) []CudaTraceOutput { + pid := st.Meta.PID value, ok := gpuFixers.Load(pid) if !ok { - return fmt.Errorf("no GPU fixer found for PID %d", pid) + log.Warnf("no GPU fixer found for PID %d", pid) + return nil } fixer := value.(*gpuTraceFixer) - return fixer.addTrace(trace, traceOutChan) + return fixer.addTrace(st) } -// AddTime is a static function that delegates to the appropriate fixer for the PID. -func AddTime(ev *CuptiTimingEvent) *host.Trace { +// addTimeSingle is a static function that delegates to the appropriate fixer for the PID. 
+func addTimeSingle(ev *CuptiTimingEvent) (CudaTraceOutput, bool) { pid := libpf.PID(ev.Pid) value, ok := gpuFixers.Load(pid) if !ok { log.Warnf("no GPU fixer found for PID %d", pid) - return nil + return CudaTraceOutput{}, false } fixer := value.(*gpuTraceFixer) fixer.mu.Lock() @@ -358,16 +397,19 @@ func AddTime(ev *CuptiTimingEvent) *host.Trace { } // AddTimes processes a batch of timing events, taking the lock once per PID. -func AddTimes(events []CuptiTimingEvent, out chan<- *host.Trace) { +// Returns all completed traces. +func AddTimes(events []CuptiTimingEvent) []CudaTraceOutput { if len(events) == 0 { - return + return nil } + var outputs []CudaTraceOutput + // Fast path: assume all events from same PID (common case) pid := libpf.PID(events[0].Pid) value, ok := gpuFixers.Load(pid) if !ok { - return + return nil } fixer := value.(*gpuTraceFixer) @@ -379,18 +421,20 @@ func AddTimes(events []CuptiTimingEvent, out chan<- *host.Trace) { otherPID = append(otherPID, *ev) continue } - if trace := fixer.addTime(ev); trace != nil { - out <- trace + if out, ok := fixer.addTime(ev); ok { + outputs = append(outputs, out) } } fixer.mu.Unlock() // Handle rare events from other PIDs for i := range otherPID { - if trace := AddTime(&otherPID[i]); trace != nil { - out <- trace + if out, ok := addTimeSingle(&otherPID[i]); ok { + outputs = append(outputs, out) } } + + return outputs } // MaybeClearAll periodically clears all fixers and reports aggregated metrics. 
@@ -416,27 +460,11 @@ func MaybeClearAll() { } } -func (i *Instance) Symbolize(f *host.Frame, frames *libpf.Frames) error { - if f.Type != libpf.CUDAKernelFrame { - return interpreter.ErrMismatchInterpreterType - } - // Extract the libpf.String directly from the uint64 field (both are 8 bytes) - fileIDAsUint64 := uint64(f.File) - mangledStr := *(*libpf.String)(unsafe.Pointer(&fileIDAsUint64)) - - // Demangle the kernel name - funcName := mangledStr - if demStr, err := demangle.ToString( - mangledStr.String(), demangle.NoParams, demangle.NoEnclosingParams); err == nil { - funcName = libpf.Intern(demStr) - } - - frames.Append(&libpf.Frame{ - Type: libpf.CUDAKernelFrame, - AddressOrLineno: f.Lineno, - FunctionName: funcName, - }) - return nil +// Symbolize is a stub — ConvertTrace handles CUDA frames directly via `case libpf.CUDA`, +// so this should never be called in normal operation. +func (i *Instance) Symbolize(f *host.Frame, _ *libpf.Frames) error { + return fmt.Errorf("CUDA Symbolize called unexpectedly for frame type %d: %w", + f.Type, interpreter.ErrMismatchInterpreterType) } func (d *data) Unload(ebpf interpreter.EbpfHandler) { diff --git a/parcagpu/parcagpu.go b/parcagpu/parcagpu.go index be12de02a..f28520e50 100644 --- a/parcagpu/parcagpu.go +++ b/parcagpu/parcagpu.go @@ -11,40 +11,20 @@ import ( "go.opentelemetry.io/ebpf-profiler/host" "go.opentelemetry.io/ebpf-profiler/interpreter/gpu" + "go.opentelemetry.io/ebpf-profiler/libpf" + "go.opentelemetry.io/ebpf-profiler/reporter" + "go.opentelemetry.io/ebpf-profiler/reporter/samples" "go.opentelemetry.io/ebpf-profiler/support" "go.opentelemetry.io/ebpf-profiler/tracer" + "go.opentelemetry.io/ebpf-profiler/tracehandler" ) -// Start starts two goroutines that filter traces coming from ebpf and match them up with timing -// information coming from the parcagpu usdt probes. 
-func Start(ctx context.Context, traceInCh <-chan *host.Trace, - tr *tracer.Tracer) chan *host.Trace { +// Start starts a goroutine that reads GPU timing events and returns a TraceInterceptor +// that diverts CUDA traces (post-symbolization) into the GPU fixer. +// Completed CUDA traces are reported directly via rep. +func Start(ctx context.Context, tr *tracer.Tracer, + rep reporter.TraceReporter) tracehandler.TraceInterceptor { gpuTimingEvents := tr.GetEbpfMaps()["cuda_timing_events"] - traceOutChan := make(chan *host.Trace, 1024) - - // Read traces coming from ebpf and send normal traces through - go func() { - timer := time.NewTicker(1 * time.Second) - defer timer.Stop() - - for { - select { - case <-timer.C: - // Periodically clean up all GPU trace fixers and report metrics - gpu.MaybeClearAll() - case <-ctx.Done(): - return - case t := <-traceInCh: - if t != nil && t.Origin == support.TraceOriginCuda { - if err := gpu.AddTrace(t, traceOutChan); err != nil { - log.Errorf("[parcagpu] failed to add trace for PID %d: %v", t.PID, err) - } - } else { - traceOutChan <- t - } - } - } - }() // Per-CPU buffer size for timing events. CuptiTimingEvent is ~300 bytes, // so 1MB allows ~3400 events per CPU before overflow. @@ -55,9 +35,14 @@ func Start(ctx context.Context, traceInCh <-chan *host.Trace, var lostEventsCount, readErrorCount, noDataCount atomic.Uint64 - // processBatch processes a batch of timing events in parallel. + // processBatch processes a batch of timing events and reports completed traces. 
processBatch := func(batch []gpu.CuptiTimingEvent) { - gpu.AddTimes(batch, traceOutChan) + outputs := gpu.AddTimes(batch) + for i := range outputs { + if err := rep.ReportTraceEvent(outputs[i].Trace, outputs[i].Meta); err != nil { + log.Errorf("[parcagpu] failed to report CUDA trace: %v", err) + } + } } const batchSize = 100 @@ -67,6 +52,10 @@ func Start(ctx context.Context, traceInCh <-chan *host.Trace, logTicker := time.NewTicker(5 * time.Second) defer logTicker.Stop() + + clearTicker := time.NewTicker(1 * time.Second) + defer clearTicker.Stop() + for { select { case <-logTicker.C: @@ -77,6 +66,9 @@ func Start(ctx context.Context, traceInCh <-chan *host.Trace, log.Warnf("[cuda] timing event reader: lost=%d readErrors=%d noData=%d", lost, readErr, noData) } + case <-clearTicker.C: + // Periodically clean up all GPU trace fixers and report metrics + gpu.MaybeClearAll() case <-ctx.Done(): return default: @@ -103,5 +95,44 @@ func Start(ctx context.Context, traceInCh <-chan *host.Trace, } }() - return traceOutChan + // Return the interceptor function that diverts CUDA traces post-symbolization. 
+ return func(trace *libpf.Trace, meta *samples.TraceEventMeta, + rawTrace *host.Trace) bool { + if meta.Origin != support.TraceOriginCuda { + return false + } + + // Find the CUDA kernel frame in the symbolized trace + cudaFrameIdx := -1 + for i, uniqueFrame := range trace.Frames { + if uniqueFrame.Value().Type == libpf.CUDAKernelFrame { + cudaFrameIdx = i + break + } + } + if cudaFrameIdx < 0 { + log.Errorf("[parcagpu] CUDA trace has no CUDAKernelFrame") + return false + } + + frame := trace.Frames[cudaFrameIdx].Value() + correlationID := uint32(frame.AddressOrLineno) + cbid := int32(frame.AddressOrLineno >> 32) + + st := &gpu.SymbolizedCudaTrace{ + Trace: trace, + Meta: meta, + CorrelationID: correlationID, + CBID: cbid, + } + + outputs := gpu.AddTrace(st) + for i := range outputs { + if err := rep.ReportTraceEvent(outputs[i].Trace, outputs[i].Meta); err != nil { + log.Errorf("[parcagpu] failed to report CUDA trace: %v", err) + } + } + + return true // consumed + } } diff --git a/processmanager/manager.go b/processmanager/manager.go index 6ea93e301..66d491306 100644 --- a/processmanager/manager.go +++ b/processmanager/manager.go @@ -286,6 +286,10 @@ func (pm *ProcessManager) ConvertTrace(trace *host.Trace) (newTrace *libpf.Trace newTrace.AppendFrameFull(frame.Type, fileID, relativeRIP, mappingStart, mappingEnd, fileOffset) + case libpf.CUDA: + // CUDA kernel frames are symbolized later when GPU timing arrives. + // Preserve AddressOrLineno which encodes correlation ID | (CBID << 32). 
+ newTrace.AppendFrame(frame.Type, libpf.UnsymbolizedFileID, frame.Lineno) default: err := pm.symbolizeFrame(i, trace, &newTrace.Frames) if err != nil { diff --git a/support/ebpf/extmaps.h b/support/ebpf/extmaps.h index 7b51dd5ac..3e4435013 100644 --- a/support/ebpf/extmaps.h +++ b/support/ebpf/extmaps.h @@ -21,13 +21,6 @@ extern bpf_map_def trace_events; extern bpf_map_def go_labels_procs; extern bpf_map_def cl_procs; extern bpf_map_def v8_procs; - -#if defined(TESTING_COREDUMP) - -// References to maps in alphabetical order that -// are needed only for testing. - -extern bpf_map_def apm_int_procs; extern bpf_map_def exe_id_to_8_stack_deltas; extern bpf_map_def exe_id_to_9_stack_deltas; extern bpf_map_def exe_id_to_10_stack_deltas; @@ -44,14 +37,21 @@ extern bpf_map_def exe_id_to_20_stack_deltas; extern bpf_map_def exe_id_to_21_stack_deltas; extern bpf_map_def exe_id_to_22_stack_deltas; extern bpf_map_def exe_id_to_23_stack_deltas; +extern bpf_map_def stack_delta_page_to_info; +extern bpf_map_def unwind_info_array; + +#if defined(TESTING_COREDUMP) + +// References to maps in alphabetical order that +// are needed only for testing. 
+ +extern bpf_map_def apm_int_procs; extern bpf_map_def hotspot_procs; extern bpf_map_def dotnet_procs; extern bpf_map_def perl_procs; extern bpf_map_def php_procs; extern bpf_map_def py_procs; extern bpf_map_def ruby_procs; -extern bpf_map_def stack_delta_page_to_info; -extern bpf_map_def unwind_info_array; extern bpf_map_def luajit_procs; #endif // TESTING_COREDUMP diff --git a/support/ebpf/native_stack_trace.ebpf.c b/support/ebpf/native_stack_trace.ebpf.c index af015e9d1..f725fa1b4 100644 --- a/support/ebpf/native_stack_trace.ebpf.c +++ b/support/ebpf/native_stack_trace.ebpf.c @@ -36,10 +36,6 @@ STACK_DELTA_BUCKET(21); STACK_DELTA_BUCKET(22); STACK_DELTA_BUCKET(23); -// Unwind info value for invalid stack delta -#define STACK_DELTA_INVALID (STACK_DELTA_COMMAND_FLAG | UNWIND_COMMAND_INVALID) -#define STACK_DELTA_STOP (STACK_DELTA_COMMAND_FLAG | UNWIND_COMMAND_STOP) - // An array of unwind info contains the all the different UnwindInfo instances // needed system wide. Individual stack delta entries refer to this array. bpf_map_def SEC("maps") unwind_info_array = { @@ -52,9 +48,6 @@ bpf_map_def SEC("maps") unwind_info_array = { .max_entries = 16384, }; -// The number of native frames to unwind per frame-unwinding eBPF program. -#define NATIVE_FRAMES_PER_PROGRAM 8 - // The decision whether to unwind native stacks or interpreter stacks is made by checking if a given // PC address falls into the "interpreter loop" of an interpreter. This map helps identify such // loops: The keys are those executable section IDs that contain interpreter loops, the values @@ -84,520 +77,7 @@ bpf_map_def SEC("maps") kernel_stackmap = { .max_entries = 16 * 1024, }; -// Record a native frame -static EBPF_INLINE ErrorCode push_native(Trace *trace, u64 file, u64 line, bool return_address) -{ - return _push_with_return_address(trace, file, line, FRAME_MARKER_NATIVE, return_address); -} - -// A single step for the bsearch into the big_stack_deltas array. 
This is really a textbook bsearch -// step, built in a way to update the value of *lo and *hi. This function will be called repeatedly -// (since we cannot do loops). The return value signals whether the bsearch came to an end / found -// the right element or whether it needs to continue. -static EBPF_INLINE bool bsearch_step(void *inner_map, u32 *lo, u32 *hi, u16 page_offset) -{ - u32 pivot = (*lo + *hi) >> 1; - StackDelta *delta = bpf_map_lookup_elem(inner_map, &pivot); - if (!delta) { - *hi = 0; - return false; - } - if (page_offset >= delta->addrLow) { - *lo = pivot + 1; - } else { - *hi = pivot; - } - return *lo < *hi; -} - -// Get the outer map based on the number of stack delta entries. -static EBPF_INLINE void *get_stack_delta_map(int mapID) -{ - switch (mapID) { - case 8: return &exe_id_to_8_stack_deltas; - case 9: return &exe_id_to_9_stack_deltas; - case 10: return &exe_id_to_10_stack_deltas; - case 11: return &exe_id_to_11_stack_deltas; - case 12: return &exe_id_to_12_stack_deltas; - case 13: return &exe_id_to_13_stack_deltas; - case 14: return &exe_id_to_14_stack_deltas; - case 15: return &exe_id_to_15_stack_deltas; - case 16: return &exe_id_to_16_stack_deltas; - case 17: return &exe_id_to_17_stack_deltas; - case 18: return &exe_id_to_18_stack_deltas; - case 19: return &exe_id_to_19_stack_deltas; - case 20: return &exe_id_to_20_stack_deltas; - case 21: return &exe_id_to_21_stack_deltas; - case 22: return &exe_id_to_22_stack_deltas; - case 23: return &exe_id_to_23_stack_deltas; - default: return NULL; - } -} - -// Get the stack offset of the given instruction. -static EBPF_INLINE ErrorCode get_stack_delta(UnwindState *state, int *addrDiff, u32 *unwindInfo) -{ - u64 exe_id = state->text_section_id; - - // Look up the stack delta page information for this address. 
- StackDeltaPageKey key = {}; - key.fileID = state->text_section_id; - key.page = state->text_section_offset & ~STACK_DELTA_PAGE_MASK; - DEBUG_PRINT( - "Look up stack delta for %lx:%lx", - (unsigned long)state->text_section_id, - (unsigned long)state->text_section_offset); - StackDeltaPageInfo *info = bpf_map_lookup_elem(&stack_delta_page_to_info, &key); - if (!info) { - DEBUG_PRINT( - "Failure to look up stack delta page fileID %lx, page %lx", - (unsigned long)key.fileID, - (unsigned long)key.page); - state->error_metric = metricID_UnwindNativeErrLookupTextSection; - return ERR_NATIVE_LOOKUP_TEXT_SECTION; - } - - void *outer_map = get_stack_delta_map(info->mapID); - if (!outer_map) { - DEBUG_PRINT( - "Failure to look up outer map for text section %lx in mapID %d", - (unsigned long)exe_id, - (int)info->mapID); - state->error_metric = metricID_UnwindNativeErrLookupStackDeltaOuterMap; - return ERR_NATIVE_LOOKUP_STACK_DELTA_OUTER_MAP; - } - - void *inner_map = bpf_map_lookup_elem(outer_map, &exe_id); - if (!inner_map) { - DEBUG_PRINT("Failure to look up inner map for text section %lx", (unsigned long)exe_id); - state->error_metric = metricID_UnwindNativeErrLookupStackDeltaInnerMap; - return ERR_NATIVE_LOOKUP_STACK_DELTA_INNER_MAP; - } - - // Preinitialize the idx for the index to use for page without any deltas. - u32 idx = info->firstDelta; - u16 page_offset = state->text_section_offset & STACK_DELTA_PAGE_MASK; - if (info->numDeltas) { - // Page has deltas, so find the correct one to use using binary search. - u32 lo = info->firstDelta; - u32 hi = lo + info->numDeltas; - - DEBUG_PRINT( - "Intervals should be from %lu to %lu (mapID %d)", - (unsigned long)lo, - (unsigned long)hi, - (int)info->mapID); - - // Do the binary search, up to 16 iterations. Deltas are paged to 64kB pages. - // They can contain at most 64kB deltas even if everything is single byte opcodes. 
- int i; - UNROLL for (i = 0; i < 16; i++) - { - if (!bsearch_step(inner_map, &lo, &hi, page_offset)) { - break; - } - } - if (i >= 16 || hi == 0) { - DEBUG_PRINT("Failed bsearch in 16 steps. Corrupt data?"); - state->error_metric = metricID_UnwindNativeErrLookupIterations; - return ERR_NATIVE_EXCEEDED_DELTA_LOOKUP_ITERATIONS; - } - // After bsearch, 'hi' points to the first entry greater than the requested. - idx = hi; - } - - // The code above found the first entry with greater address than requested, - // so it needs to be decremented by one to get the entry with equal-or-less. - // This makes also the logic work cross-pages: if the first entry in within - // the page is too large, this actually gets the entry from the previous page. - idx--; - - StackDelta *delta = bpf_map_lookup_elem(inner_map, &idx); - if (!delta) { - state->error_metric = metricID_UnwindNativeErrLookupRange; - return ERR_NATIVE_LOOKUP_RANGE; - } - - DEBUG_PRINT( - "delta index %d, addrLow 0x%x, unwindInfo %d", idx, delta->addrLow, delta->unwindInfo); - - // Calculate PC delta from stack delta for merged delta comparison - int deltaOffset = (int)page_offset - (int)delta->addrLow; - if (idx < info->firstDelta) { - // PC is below the first delta of the corresponding page. This means that - // delta->addrLow contains address relative to one page before the page_offset. - // Fix up the deltaOffset with this difference of base pages. 
- deltaOffset += 1 << STACK_DELTA_PAGE_BITS; - } - - *addrDiff = deltaOffset; - *unwindInfo = delta->unwindInfo; - - if (delta->unwindInfo == STACK_DELTA_INVALID) { - state->error_metric = metricID_UnwindNativeErrStackDeltaInvalid; - return ERR_NATIVE_STACK_DELTA_INVALID; - } - if (delta->unwindInfo == STACK_DELTA_STOP) { - increment_metric(metricID_UnwindNativeStackDeltaStop); - } - - return ERR_OK; -} - -// unwind_register_address calculates the given expression ('opcode'/'param') to get -// the CFA (canonical frame address, to recover PC and be used in further calculations), -// or the address where a register is stored (FP currently), so that the value of -// the register can be recovered. -// -// Currently the following expressions are supported: -// 1. Not recoverable -> NULL is returned. -// 2. When UNWIND_OPCODEF_DEREF is not set: -// BASE + param -// 3. When UNWIND_OPCODEF_DEREF is set: -// *(BASE + preDeref) + postDeref -static EBPF_INLINE u64 unwind_register_address(UnwindState *state, u64 cfa, u8 opcode, s32 param) -{ - unsigned long addr, val; - s32 preDeref = param, postDeref = 0; - - if (opcode & UNWIND_OPCODEF_DEREF) { - // For expressions that dereference the base expression, the parameter is constructed - // of pre-dereference and post-derefence operands. Unpack those. - preDeref &= ~UNWIND_DEREF_MASK; - postDeref = (param & UNWIND_DEREF_MASK) * UNWIND_DEREF_MULTIPLIER; - } - - // Resolve the 'BASE' register, and fetch the CFA/FP/SP value. 
- switch (opcode & ~UNWIND_OPCODEF_DEREF) { - case UNWIND_OPCODE_BASE_CFA: addr = cfa; break; - case UNWIND_OPCODE_BASE_FP: addr = state->fp; break; - case UNWIND_OPCODE_BASE_SP: addr = state->sp; break; -#if defined(__aarch64__) - case UNWIND_OPCODE_BASE_LR: - DEBUG_PRINT("unwind: lr"); - - if (state->lr == 0) { - increment_metric(metricID_UnwindNativeLr0); - DEBUG_PRINT("Failure to unwind frame: zero LR at %llx", state->pc); - return 0; - } - - return state->lr; -#endif -#if defined(__x86_64__) - case UNWIND_OPCODE_BASE_REG: - val = (param & ~UNWIND_REG_MASK) >> 1; - DEBUG_PRINT("unwind: r%d+%lu", param & UNWIND_REG_MASK, val); - switch (param & UNWIND_REG_MASK) { - case 0: // rax - addr = state->rax; - break; - case 9: // r9 - addr = state->r9; - break; - case 11: // r11 - addr = state->r11; - break; - case 15: // r15 - addr = state->r15; - break; - default: return 0; - } - return addr + val; -#endif - default: return 0; - } - -#ifdef OPTI_DEBUG - switch (opcode) { - case UNWIND_OPCODE_BASE_CFA: DEBUG_PRINT("unwind: cfa+%d", preDeref); break; - case UNWIND_OPCODE_BASE_FP: DEBUG_PRINT("unwind: fp+%d", preDeref); break; - case UNWIND_OPCODE_BASE_SP: DEBUG_PRINT("unwind: sp+%d", preDeref); break; - case UNWIND_OPCODE_BASE_CFA | UNWIND_OPCODEF_DEREF: - DEBUG_PRINT("unwind: *(cfa+%d)+%d", preDeref, postDeref); - break; - case UNWIND_OPCODE_BASE_FP | UNWIND_OPCODEF_DEREF: - DEBUG_PRINT("unwind: *(fp+%d)+%d", preDeref, postDeref); - break; - case UNWIND_OPCODE_BASE_SP | UNWIND_OPCODEF_DEREF: - DEBUG_PRINT("unwind: *(sp+%d)+%d", preDeref, postDeref); - break; - } -#endif - - // Adjust based on parameter / preDereference adder. - addr += preDeref; - if ((opcode & UNWIND_OPCODEF_DEREF) == 0) { - // All done: return "BASE + param" - return addr; - } - - // Dereference, and add the postDereference adder. 
- if (bpf_probe_read_user(&val, sizeof(val), (void *)addr)) { - DEBUG_PRINT("unwind failed to dereference address 0x%lx", addr); - return 0; - } - // Return: "*(BASE + preDeref) + postDeref" - return val + postDeref; -} - -// Stack unwinding in the absence of frame pointers can be a bit involved, so -// this comment explains what the following code does. -// -// One begins unwinding a frame somewhere in the middle of execution. -// On x86_64, registers RIP (PC), RSP (SP), and RBP (FP) are available. -// -// This function resolves a "stack delta" command from from our internal maps. -// This stack delta refers to a rule on how to unwind the state. In the simple -// case it just provides SP delta and potentially offset from where to recover -// FP value. See unwind_register_address() on the expressions supported. -// -// The function sets the bool pointed to by the given `stop` pointer to `false` -// if the main ebpf unwinder should exit. This is the case if the current PC -// is marked with UNWIND_COMMAND_STOP which marks entry points (main function, -// thread spawn function, signal handlers, ...). -#if defined(__x86_64__) -static EBPF_INLINE ErrorCode unwind_one_frame(PerCPURecord *record, bool *stop) -{ - UnwindState *state = &record->state; - *stop = false; - - u32 unwindInfo = 0; - int addrDiff = 0; - u64 cfa = 0; - - // The relevant executable is compiled with frame pointer omission, so - // stack deltas need to be retrieved from the relevant map. - ErrorCode error = get_stack_delta(state, &addrDiff, &unwindInfo); - if (error) { - return error; - } - - if (unwindInfo & STACK_DELTA_COMMAND_FLAG) { - switch (unwindInfo & ~STACK_DELTA_COMMAND_FLAG) { - case UNWIND_COMMAND_PLT: - // The toolchains routinely emit a fixed DWARF expression to unwind the full - // PLT table with one expression to reduce .eh_frame size. - // This is the hard coded implementation of this expression. For further details, - // see https://hal.inria.fr/hal-02297690/document, page 4. 
(DOI: 10.1145/3360572) - cfa = state->sp + 8 + ((((state->pc & 15) >= 11) ? 1 : 0) << 3); - DEBUG_PRINT("PLT, cfa=0x%lx", (unsigned long)cfa); - break; - case UNWIND_COMMAND_SIGNAL: { - // Use the PerCPURecord scratch union instead of a stack-local buffer to avoid - // exceeding the 512-byte BPF stack limit when inlined into interpreters. - u64 *rt_regs = record->rt_regs; - // The rt_sigframe is defined at: - // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/arch/x86/include/asm/sigframe.h?h=v6.4#n59 - // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/arch/x86/include/uapi/asm/sigcontext.h?h=v6.4#n238 - // offsetof(struct rt_sigframe, uc.uc_mcontext) = 40 - if (bpf_probe_read_user(rt_regs, 18 * sizeof(u64), (void *)(state->sp + 40))) { - goto err_native_pc_read; - } - state->rax = rt_regs[13]; - state->r9 = rt_regs[1]; - state->r11 = rt_regs[3]; - state->r13 = rt_regs[5]; - state->r15 = rt_regs[7]; - state->fp = rt_regs[10]; - state->sp = rt_regs[15]; - state->pc = rt_regs[16]; - state->return_address = false; - DEBUG_PRINT("signal frame"); - goto frame_ok; - } - case UNWIND_COMMAND_STOP: *stop = true; return ERR_OK; - case UNWIND_COMMAND_FRAME_POINTER: - if (!unwinder_unwind_frame_pointer(state)) { - goto err_native_pc_read; - } - goto frame_ok; - case UNWIND_COMMAND_GO_MORESTACK: - if (!unwinder_unwind_go_morestack(record)) { - goto err_native_pc_read; - } - goto frame_ok; - default: return ERR_UNREACHABLE; - } - } else { - UnwindInfo *info = bpf_map_lookup_elem(&unwind_info_array, &unwindInfo); - if (!info) { - increment_metric(metricID_UnwindNativeErrBadUnwindInfoIndex); - return ERR_NATIVE_BAD_UNWIND_INFO_INDEX; - } - - s32 param = info->param; - if (info->mergeOpcode) { - DEBUG_PRINT("AddrDiff %d, merged delta %#02x", addrDiff, info->mergeOpcode); - if (addrDiff >= (info->mergeOpcode & ~MERGEOPCODE_NEGATIVE)) { - param += (info->mergeOpcode & MERGEOPCODE_NEGATIVE) ? 
-8 : 8; - DEBUG_PRINT("Merged delta match: cfaDelta=%d", unwindInfo); - } - } - - // Resolve the frame's CFA (previous PC is fixed to CFA) address, and - // the previous FP address if any. - cfa = unwind_register_address(state, 0, info->opcode, param); - u64 fpa = unwind_register_address(state, cfa, info->fpOpcode, info->fpParam); - - if (fpa) { - bpf_probe_read_user(&state->fp, sizeof(state->fp), (void *)fpa); - } else if (info->opcode == UNWIND_OPCODE_BASE_FP) { - // FP used for recovery, but no new FP value received, clear FP - state->fp = 0; - } - } - - if (!cfa || bpf_probe_read_user(&state->pc, sizeof(state->pc), (void *)(cfa - 8))) { - err_native_pc_read: - increment_metric(metricID_UnwindNativeErrPCRead); - return ERR_NATIVE_PC_READ; - } - state->sp = cfa; - unwinder_mark_nonleaf_frame(state); -frame_ok: - increment_metric(metricID_UnwindNativeFrames); - return ERR_OK; -} -#elif defined(__aarch64__) -static EBPF_INLINE ErrorCode unwind_one_frame(struct PerCPURecord *record, bool *stop) -{ - UnwindState *state = &record->state; - *stop = false; - - u32 unwindInfo = 0; - int addrDiff = 0; - u64 cfa = 0; - - // The relevant executable is compiled with frame pointer omission, so - // stack deltas need to be retrieved from the relevant map. - ErrorCode error = get_stack_delta(state, &addrDiff, &unwindInfo); - if (error) { - return error; - } - - if (unwindInfo & STACK_DELTA_COMMAND_FLAG) { - switch (unwindInfo & ~STACK_DELTA_COMMAND_FLAG) { - case UNWIND_COMMAND_SIGNAL: { - // Use the PerCPURecord scratch union instead of a stack-local buffer to avoid - // exceeding the 512-byte BPF stack limit when inlined into interpreters. 
- u64 *rt_regs = record->rt_regs; - // On aarch64 the struct rt_sigframe is at: - // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/arch/arm64/kernel/signal.c?h=v6.4#n39 - // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/arch/arm64/include/uapi/asm/sigcontext.h?h=v6.4#n28 - // offsetof(struct rt_sigframe, uc.uc_mcontext.regs[0]) = 312 - // offsetof(struct rt_sigframe, uc) 128 + - // offsetof(struct ucontext, uc_mcontext) 176 + - // offsetof(struct sigcontext, regs[0]) 8 - if (bpf_probe_read_user(rt_regs, 34 * sizeof(u64), (void *)(state->sp + 312))) { - goto err_native_pc_read; - } - state->pc = normalize_pac_ptr(rt_regs[32]); - state->sp = rt_regs[31]; - state->fp = rt_regs[29]; - state->lr = normalize_pac_ptr(rt_regs[30]); - state->r7 = rt_regs[7]; - state->r22 = rt_regs[22]; - state->r28 = rt_regs[28]; - state->return_address = false; - state->lr_invalid = false; - DEBUG_PRINT("signal frame"); - goto frame_ok; - } - case UNWIND_COMMAND_STOP: *stop = true; return ERR_OK; - case UNWIND_COMMAND_FRAME_POINTER: - if (!unwinder_unwind_frame_pointer(state)) { - goto err_native_pc_read; - } - goto frame_ok; - case UNWIND_COMMAND_GO_MORESTACK: - if (!unwinder_unwind_go_morestack(record)) { - goto err_native_pc_read; - } - goto frame_ok; - default: return ERR_UNREACHABLE; - } - } - - UnwindInfo *info = bpf_map_lookup_elem(&unwind_info_array, &unwindInfo); - if (!info) { - increment_metric(metricID_UnwindNativeErrBadUnwindInfoIndex); - DEBUG_PRINT("Giving up due to invalid unwind info array index"); - return ERR_NATIVE_BAD_UNWIND_INFO_INDEX; - } - - s32 param = info->param; - if (info->mergeOpcode) { - DEBUG_PRINT("AddrDiff %d, merged delta %#02x", addrDiff, info->mergeOpcode); - if (addrDiff >= (info->mergeOpcode & ~MERGEOPCODE_NEGATIVE)) { - param += (info->mergeOpcode & MERGEOPCODE_NEGATIVE) ? 
-8 : 8; - DEBUG_PRINT("Merged delta match: cfaDelta=%d", unwindInfo); - } - } - - // Resolve the frame CFA (previous PC is fixed to CFA) address - cfa = unwind_register_address(state, 0, info->opcode, param); - - // Resolve Return Address, it is either the value of link register or - // stack address where RA is stored - u64 ra = unwind_register_address(state, cfa, info->fpOpcode, info->fpParam); - if (ra) { - if (info->fpOpcode == UNWIND_OPCODE_BASE_LR) { - // Allow LR unwinding only if it's known to be valid: either because - // it's the topmost user-mode frame, or recovered by signal trampoline. - if (state->lr_invalid) { - increment_metric(metricID_UnwindNativeErrLrUnwindingMidTrace); - return ERR_NATIVE_LR_UNWINDING_MID_TRACE; - } - - // set return address location to link register - state->pc = ra; - } else { - DEBUG_PRINT("RA: %016llX", (u64)ra); - - // read the value of RA from stack - if (bpf_probe_read_user(&state->pc, sizeof(state->pc), (void *)ra)) { - // error reading memory, mark RA as invalid - ra = 0; - } - } - - state->pc = normalize_pac_ptr(state->pc); - } - - if (!ra) { - err_native_pc_read: - // report failure to resolve RA and stop unwinding - increment_metric(metricID_UnwindNativeErrPCRead); - DEBUG_PRINT("Giving up due to failure to resolve RA"); - return ERR_NATIVE_PC_READ; - } - - // Try to resolve frame pointer - // simple heuristic for FP based frames - // the GCC compiler usually generates stack frame records in such a way, - // so that FP/RA pair is at the bottom of a stack frame (stack frame - // record at lower addresses is followed by stack vars at higher ones) - // this implies that if no other changes are applied to the stack such - // as alloca(), following the prolog SP/FP points to the frame record - // itself, in such a case FP offset will be equal to 8 - if (info->fpParam == 8) { - // we can assume the presence of frame pointers - if (info->fpOpcode != UNWIND_OPCODE_BASE_LR) { - // FP precedes the RA on the stack (Aarch64 ABI 
requirement) - bpf_probe_read_user(&state->fp, sizeof(state->fp), (void *)(ra - 8)); - } - } - - state->sp = cfa; - unwinder_mark_nonleaf_frame(state); -frame_ok: - increment_metric(metricID_UnwindNativeFrames); - return ERR_OK; -} -#else - #error unsupported architecture -#endif +#include "native_stack_trace.h" // unwind_native is the tail call destination for PROG_UNWIND_NATIVE. static EBPF_INLINE int unwind_native(struct pt_regs *ctx) diff --git a/support/ebpf/native_stack_trace.h b/support/ebpf/native_stack_trace.h new file mode 100644 index 000000000..50fc4c5bd --- /dev/null +++ b/support/ebpf/native_stack_trace.h @@ -0,0 +1,526 @@ +#ifndef OPTI_NATIVE_STACK_TRACE_H +#define OPTI_NATIVE_STACK_TRACE_H + +// Unwind info value for invalid stack delta +#define STACK_DELTA_INVALID (STACK_DELTA_COMMAND_FLAG | UNWIND_COMMAND_INVALID) +#define STACK_DELTA_STOP (STACK_DELTA_COMMAND_FLAG | UNWIND_COMMAND_STOP) + +// The number of native frames to unwind per frame-unwinding eBPF program. +#define NATIVE_FRAMES_PER_PROGRAM 8 + +// Record a native frame +static EBPF_INLINE ErrorCode push_native(Trace *trace, u64 file, u64 line, bool return_address) +{ + return _push_with_return_address(trace, file, line, FRAME_MARKER_NATIVE, return_address); +} + +// A single step for the bsearch into the big_stack_deltas array. This is really a textbook bsearch +// step, built in a way to update the value of *lo and *hi. This function will be called repeatedly +// (since we cannot do loops). The return value signals whether the bsearch came to an end / found +// the right element or whether it needs to continue. 
+static EBPF_INLINE bool bsearch_step(void *inner_map, u32 *lo, u32 *hi, u16 page_offset) +{ + u32 pivot = (*lo + *hi) >> 1; + StackDelta *delta = bpf_map_lookup_elem(inner_map, &pivot); + if (!delta) { + *hi = 0; + return false; + } + if (page_offset >= delta->addrLow) { + *lo = pivot + 1; + } else { + *hi = pivot; + } + return *lo < *hi; +} + +// Get the outer map based on the number of stack delta entries. +static EBPF_INLINE void *get_stack_delta_map(int mapID) +{ + switch (mapID) { + case 8: return &exe_id_to_8_stack_deltas; + case 9: return &exe_id_to_9_stack_deltas; + case 10: return &exe_id_to_10_stack_deltas; + case 11: return &exe_id_to_11_stack_deltas; + case 12: return &exe_id_to_12_stack_deltas; + case 13: return &exe_id_to_13_stack_deltas; + case 14: return &exe_id_to_14_stack_deltas; + case 15: return &exe_id_to_15_stack_deltas; + case 16: return &exe_id_to_16_stack_deltas; + case 17: return &exe_id_to_17_stack_deltas; + case 18: return &exe_id_to_18_stack_deltas; + case 19: return &exe_id_to_19_stack_deltas; + case 20: return &exe_id_to_20_stack_deltas; + case 21: return &exe_id_to_21_stack_deltas; + case 22: return &exe_id_to_22_stack_deltas; + case 23: return &exe_id_to_23_stack_deltas; + default: return NULL; + } +} + +// Get the stack offset of the given instruction. +static EBPF_INLINE ErrorCode get_stack_delta(UnwindState *state, int *addrDiff, u32 *unwindInfo) +{ + u64 exe_id = state->text_section_id; + + // Look up the stack delta page information for this address. 
+ StackDeltaPageKey key = {}; + key.fileID = state->text_section_id; + key.page = state->text_section_offset & ~STACK_DELTA_PAGE_MASK; + DEBUG_PRINT( + "Look up stack delta for %lx:%lx", + (unsigned long)state->text_section_id, + (unsigned long)state->text_section_offset); + StackDeltaPageInfo *info = bpf_map_lookup_elem(&stack_delta_page_to_info, &key); + if (!info) { + DEBUG_PRINT( + "Failure to look up stack delta page fileID %lx, page %lx", + (unsigned long)key.fileID, + (unsigned long)key.page); + state->error_metric = metricID_UnwindNativeErrLookupTextSection; + return ERR_NATIVE_LOOKUP_TEXT_SECTION; + } + + void *outer_map = get_stack_delta_map(info->mapID); + if (!outer_map) { + DEBUG_PRINT( + "Failure to look up outer map for text section %lx in mapID %d", + (unsigned long)exe_id, + (int)info->mapID); + state->error_metric = metricID_UnwindNativeErrLookupStackDeltaOuterMap; + return ERR_NATIVE_LOOKUP_STACK_DELTA_OUTER_MAP; + } + + void *inner_map = bpf_map_lookup_elem(outer_map, &exe_id); + if (!inner_map) { + DEBUG_PRINT("Failure to look up inner map for text section %lx", (unsigned long)exe_id); + state->error_metric = metricID_UnwindNativeErrLookupStackDeltaInnerMap; + return ERR_NATIVE_LOOKUP_STACK_DELTA_INNER_MAP; + } + + // Preinitialize the idx for the index to use for page without any deltas. + u32 idx = info->firstDelta; + u16 page_offset = state->text_section_offset & STACK_DELTA_PAGE_MASK; + if (info->numDeltas) { + // Page has deltas, so find the correct one to use using binary search. + u32 lo = info->firstDelta; + u32 hi = lo + info->numDeltas; + + DEBUG_PRINT( + "Intervals should be from %lu to %lu (mapID %d)", + (unsigned long)lo, + (unsigned long)hi, + (int)info->mapID); + + // Do the binary search, up to 16 iterations. Deltas are paged to 64kB pages. + // They can contain at most 64kB deltas even if everything is single byte opcodes. 
+ int i; + UNROLL for (i = 0; i < 16; i++) + { + if (!bsearch_step(inner_map, &lo, &hi, page_offset)) { + break; + } + } + if (i >= 16 || hi == 0) { + DEBUG_PRINT("Failed bsearch in 16 steps. Corrupt data?"); + state->error_metric = metricID_UnwindNativeErrLookupIterations; + return ERR_NATIVE_EXCEEDED_DELTA_LOOKUP_ITERATIONS; + } + // After bsearch, 'hi' points to the first entry greater than the requested. + idx = hi; + } + + // The code above found the first entry with greater address than requested, + // so it needs to be decremented by one to get the entry with equal-or-less. + // This also makes the logic work across pages: if the first entry within + // the page is too large, this actually gets the entry from the previous page. + idx--; + + StackDelta *delta = bpf_map_lookup_elem(inner_map, &idx); + if (!delta) { + state->error_metric = metricID_UnwindNativeErrLookupRange; + return ERR_NATIVE_LOOKUP_RANGE; + } + + DEBUG_PRINT( + "delta index %d, addrLow 0x%x, unwindInfo %d", idx, delta->addrLow, delta->unwindInfo); + + // Calculate PC delta from stack delta for merged delta comparison + int deltaOffset = (int)page_offset - (int)delta->addrLow; + if (idx < info->firstDelta) { + // PC is below the first delta of the corresponding page. This means that + // delta->addrLow contains address relative to one page before the page_offset. + // Fix up the deltaOffset with this difference of base pages. 
+ deltaOffset += 1 << STACK_DELTA_PAGE_BITS; + } + + *addrDiff = deltaOffset; + *unwindInfo = delta->unwindInfo; + + if (delta->unwindInfo == STACK_DELTA_INVALID) { + state->error_metric = metricID_UnwindNativeErrStackDeltaInvalid; + return ERR_NATIVE_STACK_DELTA_INVALID; + } + if (delta->unwindInfo == STACK_DELTA_STOP) { + increment_metric(metricID_UnwindNativeStackDeltaStop); + } + + return ERR_OK; +} + +// unwind_register_address calculates the given expression ('opcode'/'param') to get +// the CFA (canonical frame address, to recover PC and be used in further calculations), +// or the address where a register is stored (FP currently), so that the value of +// the register can be recovered. +// +// Currently the following expressions are supported: +// 1. Not recoverable -> NULL is returned. +// 2. When UNWIND_OPCODEF_DEREF is not set: +// BASE + param +// 3. When UNWIND_OPCODEF_DEREF is set: +// *(BASE + preDeref) + postDeref +static EBPF_INLINE u64 unwind_register_address(UnwindState *state, u64 cfa, u8 opcode, s32 param) +{ + unsigned long addr, val; + s32 preDeref = param, postDeref = 0; + + if (opcode & UNWIND_OPCODEF_DEREF) { + // For expressions that dereference the base expression, the parameter is constructed + // of pre-dereference and post-dereference operands. Unpack those. + preDeref &= ~UNWIND_DEREF_MASK; + postDeref = (param & UNWIND_DEREF_MASK) * UNWIND_DEREF_MULTIPLIER; + } + + // Resolve the 'BASE' register, and fetch the CFA/FP/SP value. 
+ switch (opcode & ~UNWIND_OPCODEF_DEREF) { + case UNWIND_OPCODE_BASE_CFA: addr = cfa; break; + case UNWIND_OPCODE_BASE_FP: addr = state->fp; break; + case UNWIND_OPCODE_BASE_SP: addr = state->sp; break; +#if defined(__aarch64__) + case UNWIND_OPCODE_BASE_LR: + DEBUG_PRINT("unwind: lr"); + + if (state->lr == 0) { + increment_metric(metricID_UnwindNativeLr0); + DEBUG_PRINT("Failure to unwind frame: zero LR at %llx", state->pc); + return 0; + } + + return state->lr; +#endif +#if defined(__x86_64__) + case UNWIND_OPCODE_BASE_REG: + val = (param & ~UNWIND_REG_MASK) >> 1; + DEBUG_PRINT("unwind: r%d+%lu", param & UNWIND_REG_MASK, val); + switch (param & UNWIND_REG_MASK) { + case 0: // rax + addr = state->rax; + break; + case 9: // r9 + addr = state->r9; + break; + case 11: // r11 + addr = state->r11; + break; + case 15: // r15 + addr = state->r15; + break; + default: return 0; + } + return addr + val; +#endif + default: return 0; + } + +#ifdef OPTI_DEBUG + switch (opcode) { + case UNWIND_OPCODE_BASE_CFA: DEBUG_PRINT("unwind: cfa+%d", preDeref); break; + case UNWIND_OPCODE_BASE_FP: DEBUG_PRINT("unwind: fp+%d", preDeref); break; + case UNWIND_OPCODE_BASE_SP: DEBUG_PRINT("unwind: sp+%d", preDeref); break; + case UNWIND_OPCODE_BASE_CFA | UNWIND_OPCODEF_DEREF: + DEBUG_PRINT("unwind: *(cfa+%d)+%d", preDeref, postDeref); + break; + case UNWIND_OPCODE_BASE_FP | UNWIND_OPCODEF_DEREF: + DEBUG_PRINT("unwind: *(fp+%d)+%d", preDeref, postDeref); + break; + case UNWIND_OPCODE_BASE_SP | UNWIND_OPCODEF_DEREF: + DEBUG_PRINT("unwind: *(sp+%d)+%d", preDeref, postDeref); + break; + } +#endif + + // Adjust based on parameter / preDereference adder. + addr += preDeref; + if ((opcode & UNWIND_OPCODEF_DEREF) == 0) { + // All done: return "BASE + param" + return addr; + } + + // Dereference, and add the postDereference adder. 
+ if (bpf_probe_read_user(&val, sizeof(val), (void *)addr)) { + DEBUG_PRINT("unwind failed to dereference address 0x%lx", addr); + return 0; + } + // Return: "*(BASE + preDeref) + postDeref" + return val + postDeref; +} + +// Stack unwinding in the absence of frame pointers can be a bit involved, so +// this comment explains what the following code does. +// +// One begins unwinding a frame somewhere in the middle of execution. +// On x86_64, registers RIP (PC), RSP (SP), and RBP (FP) are available. +// +// This function resolves a "stack delta" command from our internal maps. +// This stack delta refers to a rule on how to unwind the state. In the simple +// case it just provides SP delta and potentially offset from where to recover +// FP value. See unwind_register_address() on the expressions supported. +// +// The function sets the bool pointed to by the given `stop` pointer to `true` +// if the main ebpf unwinder should exit. This is the case if the current PC +// is marked with UNWIND_COMMAND_STOP which marks entry points (main function, +// thread spawn function, signal handlers, ...). +#if defined(__x86_64__) +static EBPF_INLINE ErrorCode unwind_one_frame(PerCPURecord *record, bool *stop) +{ + UnwindState *state = &record->state; + *stop = false; + + u32 unwindInfo = 0; + int addrDiff = 0; + u64 cfa = 0; + + // The relevant executable is compiled with frame pointer omission, so + // stack deltas need to be retrieved from the relevant map. + ErrorCode error = get_stack_delta(state, &addrDiff, &unwindInfo); + if (error) { + return error; + } + + if (unwindInfo & STACK_DELTA_COMMAND_FLAG) { + switch (unwindInfo & ~STACK_DELTA_COMMAND_FLAG) { + case UNWIND_COMMAND_PLT: + // The toolchains routinely emit a fixed DWARF expression to unwind the full + // PLT table with one expression to reduce .eh_frame size. + // This is the hard coded implementation of this expression. For further details, + // see https://hal.inria.fr/hal-02297690/document, page 4. 
(DOI: 10.1145/3360572) + cfa = state->sp + 8 + ((((state->pc & 15) >= 11) ? 1 : 0) << 3); + DEBUG_PRINT("PLT, cfa=0x%lx", (unsigned long)cfa); + break; + case UNWIND_COMMAND_SIGNAL: { + // Use the PerCPURecord scratch union instead of a stack-local buffer to avoid + // exceeding the 512-byte BPF stack limit when inlined into interpreters. + u64 *rt_regs = record->rt_regs; + // The rt_sigframe is defined at: + // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/arch/x86/include/asm/sigframe.h?h=v6.4#n59 + // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/arch/x86/include/uapi/asm/sigcontext.h?h=v6.4#n238 + // offsetof(struct rt_sigframe, uc.uc_mcontext) = 40 + if (bpf_probe_read_user(rt_regs, 18 * sizeof(u64), (void *)(state->sp + 40))) { + goto err_native_pc_read; + } + state->rax = rt_regs[13]; + state->r9 = rt_regs[1]; + state->r11 = rt_regs[3]; + state->r13 = rt_regs[5]; + state->r15 = rt_regs[7]; + state->fp = rt_regs[10]; + state->sp = rt_regs[15]; + state->pc = rt_regs[16]; + state->return_address = false; + DEBUG_PRINT("signal frame"); + goto frame_ok; + } + case UNWIND_COMMAND_STOP: *stop = true; return ERR_OK; + case UNWIND_COMMAND_FRAME_POINTER: + if (!unwinder_unwind_frame_pointer(state)) { + goto err_native_pc_read; + } + goto frame_ok; + case UNWIND_COMMAND_GO_MORESTACK: + if (!unwinder_unwind_go_morestack(record)) { + goto err_native_pc_read; + } + goto frame_ok; + default: return ERR_UNREACHABLE; + } + } else { + UnwindInfo *info = bpf_map_lookup_elem(&unwind_info_array, &unwindInfo); + if (!info) { + increment_metric(metricID_UnwindNativeErrBadUnwindInfoIndex); + return ERR_NATIVE_BAD_UNWIND_INFO_INDEX; + } + + s32 param = info->param; + if (info->mergeOpcode) { + DEBUG_PRINT("AddrDiff %d, merged delta %#02x", addrDiff, info->mergeOpcode); + if (addrDiff >= (info->mergeOpcode & ~MERGEOPCODE_NEGATIVE)) { + param += (info->mergeOpcode & MERGEOPCODE_NEGATIVE) ? 
-8 : 8; + DEBUG_PRINT("Merged delta match: cfaDelta=%d", unwindInfo); + } + } + + // Resolve the frame's CFA (previous PC is fixed to CFA) address, and + // the previous FP address if any. + cfa = unwind_register_address(state, 0, info->opcode, param); + u64 fpa = unwind_register_address(state, cfa, info->fpOpcode, info->fpParam); + + if (fpa) { + bpf_probe_read_user(&state->fp, sizeof(state->fp), (void *)fpa); + } else if (info->opcode == UNWIND_OPCODE_BASE_FP) { + // FP used for recovery, but no new FP value received, clear FP + state->fp = 0; + } + } + + if (!cfa || bpf_probe_read_user(&state->pc, sizeof(state->pc), (void *)(cfa - 8))) { + err_native_pc_read: + increment_metric(metricID_UnwindNativeErrPCRead); + return ERR_NATIVE_PC_READ; + } + state->sp = cfa; + unwinder_mark_nonleaf_frame(state); +frame_ok: + increment_metric(metricID_UnwindNativeFrames); + return ERR_OK; +} +#elif defined(__aarch64__) +static EBPF_INLINE ErrorCode unwind_one_frame(struct PerCPURecord *record, bool *stop) +{ + UnwindState *state = &record->state; + *stop = false; + + u32 unwindInfo = 0; + int addrDiff = 0; + u64 cfa = 0; + + // The relevant executable is compiled with frame pointer omission, so + // stack deltas need to be retrieved from the relevant map. + ErrorCode error = get_stack_delta(state, &addrDiff, &unwindInfo); + if (error) { + return error; + } + + if (unwindInfo & STACK_DELTA_COMMAND_FLAG) { + switch (unwindInfo & ~STACK_DELTA_COMMAND_FLAG) { + case UNWIND_COMMAND_SIGNAL: { + // Use the PerCPURecord scratch union instead of a stack-local buffer to avoid + // exceeding the 512-byte BPF stack limit when inlined into interpreters. 
+ u64 *rt_regs = record->rt_regs; + // On aarch64 the struct rt_sigframe is at: + // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/arch/arm64/kernel/signal.c?h=v6.4#n39 + // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/arch/arm64/include/uapi/asm/sigcontext.h?h=v6.4#n28 + // offsetof(struct rt_sigframe, uc.uc_mcontext.regs[0]) = 312 + // offsetof(struct rt_sigframe, uc) 128 + + // offsetof(struct ucontext, uc_mcontext) 176 + + // offsetof(struct sigcontext, regs[0]) 8 + if (bpf_probe_read_user(rt_regs, 34 * sizeof(u64), (void *)(state->sp + 312))) { + goto err_native_pc_read; + } + state->pc = normalize_pac_ptr(rt_regs[32]); + state->sp = rt_regs[31]; + state->fp = rt_regs[29]; + state->lr = normalize_pac_ptr(rt_regs[30]); + state->r7 = rt_regs[7]; + state->r22 = rt_regs[22]; + state->r28 = rt_regs[28]; + state->return_address = false; + state->lr_invalid = false; + DEBUG_PRINT("signal frame"); + goto frame_ok; + } + case UNWIND_COMMAND_STOP: *stop = true; return ERR_OK; + case UNWIND_COMMAND_FRAME_POINTER: + if (!unwinder_unwind_frame_pointer(state)) { + goto err_native_pc_read; + } + goto frame_ok; + case UNWIND_COMMAND_GO_MORESTACK: + if (!unwinder_unwind_go_morestack(record)) { + goto err_native_pc_read; + } + goto frame_ok; + default: return ERR_UNREACHABLE; + } + } + + UnwindInfo *info = bpf_map_lookup_elem(&unwind_info_array, &unwindInfo); + if (!info) { + increment_metric(metricID_UnwindNativeErrBadUnwindInfoIndex); + DEBUG_PRINT("Giving up due to invalid unwind info array index"); + return ERR_NATIVE_BAD_UNWIND_INFO_INDEX; + } + + s32 param = info->param; + if (info->mergeOpcode) { + DEBUG_PRINT("AddrDiff %d, merged delta %#02x", addrDiff, info->mergeOpcode); + if (addrDiff >= (info->mergeOpcode & ~MERGEOPCODE_NEGATIVE)) { + param += (info->mergeOpcode & MERGEOPCODE_NEGATIVE) ? 
-8 : 8; + DEBUG_PRINT("Merged delta match: cfaDelta=%d", unwindInfo); + } + } + + // Resolve the frame CFA (previous PC is fixed to CFA) address + cfa = unwind_register_address(state, 0, info->opcode, param); + + // Resolve Return Address, it is either the value of link register or + // stack address where RA is stored + u64 ra = unwind_register_address(state, cfa, info->fpOpcode, info->fpParam); + if (ra) { + if (info->fpOpcode == UNWIND_OPCODE_BASE_LR) { + // Allow LR unwinding only if it's known to be valid: either because + // it's the topmost user-mode frame, or recovered by signal trampoline. + if (state->lr_invalid) { + increment_metric(metricID_UnwindNativeErrLrUnwindingMidTrace); + return ERR_NATIVE_LR_UNWINDING_MID_TRACE; + } + + // set return address location to link register + state->pc = ra; + } else { + DEBUG_PRINT("RA: %016llX", (u64)ra); + + // read the value of RA from stack + if (bpf_probe_read_user(&state->pc, sizeof(state->pc), (void *)ra)) { + // error reading memory, mark RA as invalid + ra = 0; + } + } + + state->pc = normalize_pac_ptr(state->pc); + } + + if (!ra) { + err_native_pc_read: + // report failure to resolve RA and stop unwinding + increment_metric(metricID_UnwindNativeErrPCRead); + DEBUG_PRINT("Giving up due to failure to resolve RA"); + return ERR_NATIVE_PC_READ; + } + + // Try to resolve frame pointer + // simple heuristic for FP based frames + // the GCC compiler usually generates stack frame records in such a way, + // so that FP/RA pair is at the bottom of a stack frame (stack frame + // record at lower addresses is followed by stack vars at higher ones) + // this implies that if no other changes are applied to the stack such + // as alloca(), following the prolog SP/FP points to the frame record + // itself, in such a case FP offset will be equal to 8 + if (info->fpParam == 8) { + // we can assume the presence of frame pointers + if (info->fpOpcode != UNWIND_OPCODE_BASE_LR) { + // FP precedes the RA on the stack (Aarch64 ABI 
requirement) + bpf_probe_read_user(&state->fp, sizeof(state->fp), (void *)(ra - 8)); + } + } + + state->sp = cfa; + unwinder_mark_nonleaf_frame(state); +frame_ok: + increment_metric(metricID_UnwindNativeFrames); + return ERR_OK; +} +#else + #error unsupported architecture +#endif + +#endif // OPTI_NATIVE_STACK_TRACE_H diff --git a/support/ebpf/python_tracer.ebpf.c b/support/ebpf/python_tracer.ebpf.c index b07f2fb47..53b44f2b3 100644 --- a/support/ebpf/python_tracer.ebpf.c +++ b/support/ebpf/python_tracer.ebpf.c @@ -2,15 +2,11 @@ #include "bpfdefs.h" #include "errors.h" +#include "stackdeltatypes.h" #include "tracemgmt.h" #include "tsd.h" #include "types.h" -// The number of Python frames to unwind per frame-unwinding eBPF program. If -// we start running out of instructions in the walk_python_stack program, one -// option is to adjust this number downwards. -#define FRAMES_PER_WALK_PYTHON_STACK 12 - // Forward declaration to avoid warnings like // "declaration of 'struct pt_regs' will not be visible outside of this function [-Wvisibility]". struct pt_regs; @@ -127,10 +123,17 @@ static EBPF_INLINE ErrorCode process_python_frame( } // Read PyCodeObject - if (bpf_probe_read_user(pss->code, sizeof(pss->code), py_codeobject)) { - DEBUG_PRINT("Failed to read PyCodeObject at 0x%lx", (unsigned long)(py_codeobject)); + long pycode_err = bpf_probe_read_user(pss->code, sizeof(pss->code), py_codeobject); + if (pycode_err) { + DEBUG_PRINT( + "Failed to read PyCodeObject at 0x%lx err=%ld", (unsigned long)(py_codeobject), pycode_err); increment_metric(metricID_UnwindPythonErrBadCodeObjectArgCountAddr); - return ERR_PYTHON_BAD_CODE_OBJECT_ADDR; + // Push the frame with the code object address so the agent can try to + // read it via /proc/pid/mem (which supports page faults unlike BPF). + // codeobject_id=0 distinguishes this from a successful read. 
+ file_id = (u64)py_codeobject; + lineno = py_encode_lineno(0, (u32)py_f_lasti); + goto push_frame; } int py_argcount = *(int *)(&pss->code[pyinfo->PyCodeObject_co_argcount]); @@ -155,40 +158,6 @@ static EBPF_INLINE ErrorCode process_python_frame( return ERR_OK; } -static EBPF_INLINE ErrorCode -walk_python_stack(PerCPURecord *record, const PyProcInfo *pyinfo, int *unwinder) -{ - void *py_frame = record->pythonUnwindState.py_frame; - ErrorCode error = ERR_OK; - *unwinder = PROG_UNWIND_STOP; - - UNROLL for (u32 i = 0; i < FRAMES_PER_WALK_PYTHON_STACK; ++i) - { - bool continue_with_next; - error = process_python_frame(record, pyinfo, &py_frame, &continue_with_next); - if (error) { - goto stop; - } - if (continue_with_next) { - *unwinder = get_next_unwinder_after_interpreter(); - goto stop; - } - if (!py_frame) { - goto stop; - } - } - - *unwinder = PROG_UNWIND_PYTHON; - -stop: - // Set up the state for the next invocation of this unwinding program. - if (error || !py_frame) { - unwinder_mark_done(record, PROG_UNWIND_PYTHON); - } - record->pythonUnwindState.py_frame = py_frame; - return error; -} - // get_PyThreadState retrieves the PyThreadState* for the current thread. // // Python sets the thread_state using pthread_setspecific with the key @@ -276,6 +245,64 @@ static EBPF_INLINE ErrorCode get_PyFrame(const PyProcInfo *pyinfo, void **frame) return ERR_OK; } +#include "native_stack_trace.h" + +// Number of loop iterations in unwind_python. Each iteration handles either +// one Python frame or one native frame depending on the current unwinder state. +// This bounds the BPF verifier instruction count. +#define PYTHON_NATIVE_LOOP_ITERS 9 + +// step_python processes one Python frame and updates *unwinder to indicate +// what should happen next: PROG_UNWIND_NATIVE to unwind a native boundary +// frame, PROG_UNWIND_PYTHON to tail-call back (more frames but budget +// exhausted), or PROG_UNWIND_STOP when all Python frames are done. 
+static EBPF_INLINE ErrorCode +step_python(PerCPURecord *record, const PyProcInfo *pyinfo, void **py_frame, int *unwinder) +{ + bool continue_with_next; + ErrorCode error = process_python_frame(record, pyinfo, py_frame, &continue_with_next); + if (error) { + *unwinder = PROG_UNWIND_STOP; + return error; + } + if (continue_with_next) { + *unwinder = get_next_unwinder_after_interpreter(); + } else if (!*py_frame) { + *unwinder = PROG_UNWIND_STOP; + } else { + // More Python frames but loop budget will be exhausted; tail call to self. + *unwinder = PROG_UNWIND_PYTHON; + } + return ERR_OK; +} + +// step_native processes one native frame at an interpreter boundary and +// updates *unwinder: PROG_UNWIND_PYTHON when we've crossed back into Python, +// or whatever get_next_unwinder_after_native_frame returns otherwise. +static EBPF_INLINE ErrorCode step_native(PerCPURecord *record, int *unwinder) +{ + Trace *trace = &record->trace; + *unwinder = PROG_UNWIND_STOP; + + increment_metric(metricID_UnwindNativeAttempts); + ErrorCode error = push_native( + trace, + record->state.text_section_id, + record->state.text_section_offset, + record->state.return_address); + if (error) { + return error; + } + + bool stop; + error = unwind_one_frame(record, &stop); + if (error || stop) { + return error; + } + + return get_next_unwinder_after_native_frame(record, unwinder); +} + // unwind_python is the entry point for tracing when invoked from the native tracer // or interpreter dispatcher. It does not reset the trace object and will append the // Python stack frames to the trace object for the current CPU. 
@@ -286,7 +313,7 @@ static EBPF_INLINE int unwind_python(struct pt_regs *ctx) return -1; ErrorCode error = ERR_OK; - int unwinder = get_next_unwinder_after_interpreter(); + int unwinder = PROG_UNWIND_PYTHON; Trace *trace = &record->trace; u32 pid = trace->pid; @@ -314,7 +341,26 @@ static EBPF_INLINE int unwind_python(struct pt_regs *ctx) goto exit; } - error = walk_python_stack(record, pyinfo, &unwinder); + { + void *py_frame = record->pythonUnwindState.py_frame; + + for (int t = 0; t < PYTHON_NATIVE_LOOP_ITERS; t++) { + switch (unwinder) { + case PROG_UNWIND_PYTHON: error = step_python(record, pyinfo, &py_frame, &unwinder); break; + case PROG_UNWIND_NATIVE: error = step_native(record, &unwinder); break; + default: goto done; + } + if (error) { + goto done; + } + } + + done: + if (error || !py_frame) { + unwinder_mark_done(record, PROG_UNWIND_PYTHON); + } + record->pythonUnwindState.py_frame = py_frame; + } exit: record->state.unwind_error = error; diff --git a/support/ebpf/tracer.ebpf.amd64 b/support/ebpf/tracer.ebpf.amd64 index e364a10b2..3b68f7aa1 100644 Binary files a/support/ebpf/tracer.ebpf.amd64 and b/support/ebpf/tracer.ebpf.amd64 differ diff --git a/support/ebpf/tracer.ebpf.arm64 b/support/ebpf/tracer.ebpf.arm64 index 326535360..892039c0e 100644 Binary files a/support/ebpf/tracer.ebpf.arm64 and b/support/ebpf/tracer.ebpf.arm64 differ diff --git a/tracehandler/tracehandler.go b/tracehandler/tracehandler.go index 09cf1d507..e16a62239 100644 --- a/tracehandler/tracehandler.go +++ b/tracehandler/tracehandler.go @@ -34,6 +34,12 @@ type Times interface { // symbolization efforts. var traceCacheLifetime = 5 * time.Minute +// TraceInterceptor is called after ConvertTrace with the symbolized trace, +// metadata, and original BPF trace. Return true to consume the trace +// (skip caching and reporting), false to proceed normally. 
+type TraceInterceptor func(trace *libpf.Trace, meta *samples.TraceEventMeta, + rawTrace *host.Trace) bool + // TraceProcessor is an interface used by traceHandler to convert traces // from a form received from eBPF to the form we wish to dispatch to the // collection agent. @@ -72,12 +78,17 @@ type traceHandler struct { // reporter instance to use to send out traces. reporter reporter.TraceReporter + // interceptor, if set, is called after ConvertTrace on cache-miss. + // If it returns true the trace is consumed and not cached or reported. + interceptor TraceInterceptor + times Times } // newTraceHandler creates a new traceHandler func newTraceHandler(ctx context.Context, rep reporter.TraceReporter, - traceProcessor TraceProcessor, intervals Times, cacheSize uint32) (*traceHandler, error) { + traceProcessor TraceProcessor, intervals Times, cacheSize uint32, + interceptor TraceInterceptor) (*traceHandler, error) { traceCache, err := lru.NewSynced[host.TraceHash, libpf.Trace]( cacheSize, func(k host.TraceHash) uint32 { return uint32(k) }) if err != nil { @@ -111,6 +122,7 @@ func newTraceHandler(ctx context.Context, rep reporter.TraceReporter, traceProcessor: traceProcessor, traceCache: traceCache, reporter: rep, + interceptor: interceptor, times: intervals, }, nil } @@ -150,6 +162,13 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) { panic(err) } log.Debugf("Trace hash remap 0x%x -> 0x%x", bpfTrace.Hash, umTrace.Hash) + + // If an interceptor is set and consumes the trace, skip caching and reporting. + // CUDA traces are always intercepted here and never cached. + if m.interceptor != nil && m.interceptor(umTrace, meta, bpfTrace) { + return + } + m.traceCache.Add(bpfTrace.Hash, *umTrace) meta.APMServiceName = m.traceProcessor.MaybeNotifyAPMAgent(bpfTrace, umTrace.Hash, 1) @@ -164,9 +183,10 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) { // to exit after a cancellation through the context. 
func Start(ctx context.Context, rep reporter.TraceReporter, traceProcessor TraceProcessor, traceInChan <-chan *host.Trace, intervals Times, cacheSize uint32, + interceptor TraceInterceptor, ) (workerExited <-chan libpf.Void, err error) { handler, err := - newTraceHandler(ctx, rep, traceProcessor, intervals, cacheSize) + newTraceHandler(ctx, rep, traceProcessor, intervals, cacheSize, interceptor) if err != nil { return nil, fmt.Errorf("failed to create traceHandler: %v", err) } diff --git a/tracehandler/tracehandler_test.go b/tracehandler/tracehandler_test.go index c3cf98508..b3a0d10a4 100644 --- a/tracehandler/tracehandler_test.go +++ b/tracehandler/tracehandler_test.go @@ -67,6 +67,100 @@ func (m *mockReporter) ReportTraceEvent(trace *libpf.Trace, _ *samples.TraceEven return nil } +func TestTraceInterceptor(t *testing.T) { + tests := map[string]struct { + // interceptReturn controls what the interceptor returns for each trace. + // true = consumed (should NOT appear in reporter), false = pass through. 
+ interceptReturn map[host.TraceHash]bool + input []arguments + expectedEvents map[libpf.TraceHash]uint16 + }{ + "interceptor consumes trace": { + interceptReturn: map[host.TraceHash]bool{ + host.TraceHash(0xaa): true, + }, + input: []arguments{ + {trace: &host.Trace{Hash: host.TraceHash(0xaa)}}, + }, + expectedEvents: nil, // consumed, nothing reported + }, + "interceptor passes through": { + interceptReturn: map[host.TraceHash]bool{ + host.TraceHash(0xbb): false, + }, + input: []arguments{ + {trace: &host.Trace{Hash: host.TraceHash(0xbb)}}, + }, + expectedEvents: map[libpf.TraceHash]uint16{ + libpf.NewTraceHash(0xbb, 0xbb): 1, + }, + }, + "mix of consumed and passed": { + interceptReturn: map[host.TraceHash]bool{ + host.TraceHash(1): true, // consumed + host.TraceHash(2): false, // passed + host.TraceHash(3): true, // consumed + }, + input: []arguments{ + {trace: &host.Trace{Hash: host.TraceHash(1)}}, + {trace: &host.Trace{Hash: host.TraceHash(2)}}, + {trace: &host.Trace{Hash: host.TraceHash(3)}}, + }, + expectedEvents: map[libpf.TraceHash]uint16{ + libpf.NewTraceHash(2, 2): 1, + }, + }, + "consumed trace not cached on repeat": { + interceptReturn: map[host.TraceHash]bool{ + host.TraceHash(0xcc): true, + }, + input: []arguments{ + {trace: &host.Trace{Hash: host.TraceHash(0xcc)}}, + {trace: &host.Trace{Hash: host.TraceHash(0xcc)}}, + }, + expectedEvents: nil, // both consumed, nothing reported + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + r := &mockReporter{ + t: t, + reports: make(map[libpf.TraceHash]uint16), + } + + var interceptCalls int + interceptor := func(trace *libpf.Trace, meta *samples.TraceEventMeta, + rawTrace *host.Trace) bool { + interceptCalls++ + return test.interceptReturn[rawTrace.Hash] + } + + traceChan := make(chan *host.Trace) + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + exitNotify, err := tracehandler.Start(ctx, r, &fakeTraceProcessor{}, + traceChan, defaultTimes(), 128, interceptor) 
+ require.NoError(t, err) + + for _, input := range test.input { + traceChan <- input.trace + } + + cancel() + <-exitNotify + + expected := test.expectedEvents + if expected == nil { + expected = map[libpf.TraceHash]uint16{} + } + if !maps.Equal(r.reports, expected) { + t.Fatalf("Expected %#v but got %#v", expected, r.reports) + } + }) + } +} + func TestTraceHandler(t *testing.T) { tests := map[string]struct { input []arguments @@ -108,7 +202,7 @@ func TestTraceHandler(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) defer cancel() exitNotify, err := tracehandler.Start(ctx, r, &fakeTraceProcessor{}, - traceChan, defaultTimes(), 128) + traceChan, defaultTimes(), 128, nil) require.NoError(t, err) for _, input := range test.input {