Skip to content

Commit 59e5795

Browse files
christos68k, gnurizen
authored and committed
tracer: Add limit to number of trace events processed in one batch (open-telemetry#596)
1 parent d096854 commit 59e5795

1 file changed

Lines changed: 45 additions & 4 deletions

File tree

tracer/events.go

Lines changed: 45 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,12 @@ const (
3434
// so that the hostagent startup phase can wait on most PID notifications
3535
// to be processed before starting the tracer.
3636
pidEventBufferSize = 10
37+
// Maximum number of trace events to process in one batch. This is used as a
38+
// safe threshold for when off-cpu profiling is enabled, as the kernel can generate
39+
// enough events to completely monopolize userspace processing. If more than maxEvents
40+
// events are produced by the kernel between two polling intervals, the queue from bpf
41+
// to userspace will fill up and the kernel will start dropping events.
42+
maxEvents = 4096
3743
)
3844

3945
// StartPIDEventProcessor spawns a goroutine to process PID events.
@@ -151,28 +157,55 @@ func (t *Tracer) startTraceEventMonitor(ctx context.Context,
151157
go func() {
152158
var data perf.Record
153159
var oldKTime, minKTime times.KTime
160+
var eventCount int
154161

155162
pollTicker := time.NewTicker(t.intervals.TracePollInterval())
156163
defer pollTicker.Stop()
157-
158164
PollLoop:
159165
for {
166+
// We use two selects to avoid starvation in scenarios where the kernel
167+
// is generating a lot of events.
160168
select {
161-
case <-pollTicker.C:
162-
// Continue execution below.
169+
// Always check for context cancellation in each iteration
170+
case <-ctx.Done():
171+
break PollLoop
172+
default:
173+
// Continue below
174+
}
175+
176+
select {
177+
// This context cancellation check may not execute in a timely manner
163178
case <-ctx.Done():
164179
break PollLoop
180+
case <-pollTicker.C:
181+
// Continue execution below
165182
}
166183

184+
eventCount = 0
167185
minKTime = 0
168-
// Eagerly read events until the buffer is exhausted.
186+
187+
// Eagerly read events until the buffer is exhausted or we reach maxEvents
169188
for {
170189
if err = eventReader.ReadInto(&data); err != nil {
171190
if !errors.Is(err, os.ErrDeadlineExceeded) {
172191
readErrorCount.Add(1)
173192
}
174193
break
175194
}
195+
196+
// There's a theoretical possibility that this inner loop never exits if the
197+
// following two error cases are continuously being hit. In practice this would
198+
// mean that userspace doesn't manage to make ANY progress when reading events
199+
// (eventCount never reaching maxEvents and underlying buffers never being empty),
200+
// something that should not happen even with off-cpu at maximum sampling rates:
201+
// probabilistically, there should always be some events read per X iterations.
202+
// We could add a secondary fallback (ideally deterministic, e.g. maximum time
203+
// elapsed) to guard against that possibility if we see it as a concern (currently
204+
// not done).
205+
//
206+
// Regardless, the current data transmission architecture from kernel to user and
207+
// the -serial- event processing pipeline in the rest of the agent is not designed
208+
// for the data volumes that off-cpu profiling can generate and should be revisited.
176209
if data.LostSamples != 0 {
177210
lostEventsCount.Add(data.LostSamples)
178211
continue
@@ -182,12 +215,20 @@ func (t *Tracer) startTraceEventMonitor(ctx context.Context,
182215
continue
183216
}
184217

218+
eventCount++
219+
185220
// Keep track of min KTime seen in this batch processing loop
186221
trace := t.loadBpfTrace(data.RawSample, data.CPU)
187222
if minKTime == 0 || trace.KTime < minKTime {
188223
minKTime = trace.KTime
189224
}
225+
// TODO: This per-event channel send couples event processing in the rest of
226+
// the agent with event reading from the perf buffers slowing down the latter.
190227
traceOutChan <- trace
228+
if eventCount == maxEvents {
229+
// Break this inner loop to ensure ProcessedUntil logic executes
230+
break
231+
}
191232
}
192233
// After we've received and processed all trace events, call
193234
// ProcessedUntil if there is a pending oldKTime that we

0 commit comments

Comments
 (0)