Skip to content

Commit 6a9f496

Browse files
committed
fix(tracing): bound span queue + count drops + bounded transient retry
Under load, span export failures were silently dropped and unbounded queue growth was invisible. Add observability and a narrow retry: - Bound the queue (AGENTEX_SPAN_QUEUE_MAX_SIZE, 0=unbounded default) and expose a dropped_spans counter + depth so span loss is measurable instead of silent. - Re-enqueue only transient HTTP failures {429,500,502,503,504} up to AGENTEX_SPAN_QUEUE_MAX_RETRIES (default 1 = no retry). Auth/4xx (e.g. the 401s seen in the load test) and non-HTTP exceptions are dropped and counted, never retried, preserving the drain-continues-on-error contract. Defaults preserve prior behavior (unbounded, no retry).
1 parent 9bb4ae6 commit 6a9f496

2 files changed

Lines changed: 295 additions & 42 deletions

File tree

src/agentex/lib/core/tracing/span_queue.py

Lines changed: 156 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,46 @@
1515

1616
_DEFAULT_BATCH_SIZE = 50
1717
_DEFAULT_LINGER_MS = 100
18+
# 0 == unbounded (preserves prior behavior). A bound makes backpressure
19+
# visible (dropped spans are counted) and caps worst-case memory.
20+
_DEFAULT_MAX_SIZE = 0
21+
# Total attempts per batch for a *transient* failure (1 == no retry).
22+
_DEFAULT_MAX_RETRIES = 1
23+
# HTTP statuses worth retrying at the queue level. These are explicit
24+
# backpressure / transient signals; everything else (esp. 401/403/4xx auth and
25+
# validation errors) is a permanent failure that re-enqueuing cannot fix. Note
26+
# the underlying SGP client already retries these internally, so queue-level
27+
# retry only helps when its budget is exhausted by a longer blip.
28+
_RETRYABLE_STATUS_CODES = frozenset({429, 500, 502, 503, 504})
29+
30+
31+
def _read_int_env(name: str, default: int, *, minimum: int = 0) -> int:
32+
"""Read a non-negative int from the environment, clamping to ``minimum``
33+
and falling back to ``default`` when unset or unparseable."""
34+
raw = os.environ.get(name)
35+
if raw is None:
36+
return default
37+
try:
38+
return max(minimum, int(raw))
39+
except ValueError:
40+
logger.warning("Ignoring invalid %s=%r; using default %d", name, raw, default)
41+
return default
1842

1943

2044
def _read_linger_ms_env() -> int:
2145
"""Read AGENTEX_SPAN_QUEUE_LINGER_MS from the environment, falling back to
2246
_DEFAULT_LINGER_MS when unset or unparseable. Negative values are clamped
2347
to 0 (i.e. "drain immediately, no linger")."""
24-
raw = os.environ.get("AGENTEX_SPAN_QUEUE_LINGER_MS")
25-
if raw is None:
26-
return _DEFAULT_LINGER_MS
27-
try:
28-
return max(0, int(raw))
29-
except ValueError:
30-
logger.warning(
31-
"Ignoring invalid AGENTEX_SPAN_QUEUE_LINGER_MS=%r; using default %d ms",
32-
raw,
33-
_DEFAULT_LINGER_MS,
34-
)
35-
return _DEFAULT_LINGER_MS
48+
return _read_int_env("AGENTEX_SPAN_QUEUE_LINGER_MS", _DEFAULT_LINGER_MS)
49+
50+
51+
def _is_retryable_exc(exc: BaseException) -> bool:
52+
"""A failure is retryable only when it carries an HTTP ``status_code`` in
53+
the retryable set. Connection/timeout errors (no status_code) have already
54+
been retried by the SGP client, and bare exceptions (programming bugs) must
55+
never be retried — re-enqueuing them would spin forever."""
56+
status_code = getattr(exc, "status_code", None)
57+
return isinstance(status_code, int) and status_code in _RETRYABLE_STATUS_CODES
3658

3759

3860
class SpanEventType(str, Enum):
@@ -45,6 +67,9 @@ class _SpanQueueItem:
4567
event_type: SpanEventType
4668
span: Span
4769
processors: list[AsyncTracingProcessor]
70+
# Number of times this item has already been dispatched. Used to bound
71+
# re-enqueue on transient failures.
72+
attempts: int = 0
4873

4974

5075
class AsyncSpanQueue:
@@ -60,18 +85,64 @@ class AsyncSpanQueue:
6085
``linger_ms`` waiting for more items to coalesce into the same batch.
6186
Without the linger the drain almost always returned size-1 batches under
6287
real agent workloads, because spans typically arrive a few ms apart.
88+
89+
Reliability:
90+
- ``max_size`` bounds the queue. When full, new events are dropped and
91+
counted (see ``dropped_spans``) rather than growing memory without limit.
92+
``0`` keeps the queue unbounded.
93+
- A batch that fails with a *transient* HTTP status (429/5xx) is
94+
re-enqueued up to ``max_retries`` total attempts. Permanent failures
95+
(auth/validation/bugs) are dropped and counted immediately.
6396
"""
6497

6598
def __init__(
6699
self,
67100
batch_size: int = _DEFAULT_BATCH_SIZE,
68101
linger_ms: int | None = None,
102+
max_size: int | None = None,
103+
max_retries: int | None = None,
69104
) -> None:
70-
self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue()
105+
resolved_max_size = (
106+
_read_int_env("AGENTEX_SPAN_QUEUE_MAX_SIZE", _DEFAULT_MAX_SIZE) if max_size is None else max(0, max_size)
107+
)
108+
self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue(maxsize=resolved_max_size)
71109
self._drain_task: asyncio.Task[None] | None = None
72110
self._stopping = False
73111
self._batch_size = batch_size
74112
self._linger_ms = _read_linger_ms_env() if linger_ms is None else max(0, linger_ms)
113+
self._max_retries = (
114+
_read_int_env("AGENTEX_SPAN_QUEUE_MAX_RETRIES", _DEFAULT_MAX_RETRIES, minimum=1)
115+
if max_retries is None
116+
else max(1, max_retries)
117+
)
118+
# Total spans dropped for any reason (full queue, shutdown, permanent
119+
# failure, exhausted retries). Surfaced for metrics/observability so
120+
# span loss stops being silent.
121+
self._dropped_spans = 0
122+
123+
@property
124+
def dropped_spans(self) -> int:
125+
"""Cumulative count of spans dropped (never delivered)."""
126+
return self._dropped_spans
127+
128+
@property
129+
def depth(self) -> int:
130+
"""Current number of items waiting in the queue."""
131+
return self._queue.qsize()
132+
133+
def _record_drop(self, count: int, reason: str) -> None:
134+
if count <= 0:
135+
return
136+
self._dropped_spans += count
137+
# Warn on the first drop and then sparsely, so a drop storm is visible
138+
# without flooding the log.
139+
if self._dropped_spans == count or self._dropped_spans % 100 < count:
140+
logger.warning(
141+
"Span queue dropped %d span(s) (%s); %d dropped in total",
142+
count,
143+
reason,
144+
self._dropped_spans,
145+
)
75146

76147
def enqueue(
77148
self,
@@ -80,10 +151,13 @@ def enqueue(
80151
processors: list[AsyncTracingProcessor],
81152
) -> None:
82153
if self._stopping:
83-
logger.warning("Span queue is shutting down, dropping %s event for span %s", event_type.value, span.id)
154+
self._record_drop(1, "queue shutting down")
84155
return
85156
self._ensure_drain_running()
86-
self._queue.put_nowait(_SpanQueueItem(event_type=event_type, span=span, processors=processors))
157+
try:
158+
self._queue.put_nowait(_SpanQueueItem(event_type=event_type, span=span, processors=processors))
159+
except asyncio.QueueFull:
160+
self._record_drop(1, "queue full")
87161

88162
def _ensure_drain_running(self) -> None:
89163
if self._drain_task is None or self._drain_task.done():
@@ -111,9 +185,7 @@ async def _drain_loop(self) -> None:
111185
if remaining <= 0:
112186
break
113187
try:
114-
batch.append(
115-
await asyncio.wait_for(self._queue.get(), timeout=remaining)
116-
)
188+
batch.append(await asyncio.wait_for(self._queue.get(), timeout=remaining))
117189
except asyncio.TimeoutError:
118190
break
119191
else:
@@ -141,8 +213,7 @@ async def _drain_loop(self) -> None:
141213
# Release span data for GC.
142214
batch.clear()
143215

144-
@staticmethod
145-
async def _process_items(items: list[_SpanQueueItem]) -> None:
216+
async def _process_items(self, items: list[_SpanQueueItem]) -> None:
146217
"""Dispatch a batch of same-event-type items to each processor in one call.
147218
148219
Groups spans by processor so each processor sees its full slice of the
@@ -157,26 +228,78 @@ async def _process_items(items: list[_SpanQueueItem]) -> None:
157228
"_process_items requires all items to share the same event_type; "
158229
"callers must split START and END batches before dispatching."
159230
)
160-
by_processor: dict[AsyncTracingProcessor, list[Span]] = {}
231+
by_processor: dict[AsyncTracingProcessor, list[_SpanQueueItem]] = {}
161232
for item in items:
162233
for p in item.processors:
163-
by_processor.setdefault(p, []).append(item.span)
234+
by_processor.setdefault(p, []).append(item)
164235

165-
async def _handle(p: AsyncTracingProcessor, spans: list[Span]) -> None:
166-
try:
167-
if event_type == SpanEventType.START:
168-
await p.on_spans_start(spans)
169-
else:
170-
await p.on_spans_end(spans)
171-
except Exception:
172-
logger.exception(
173-
"Tracing processor %s failed handling %d spans during %s",
236+
await asyncio.gather(*[self._handle(p, batch, event_type) for p, batch in by_processor.items()])
237+
238+
async def _handle(
239+
self,
240+
p: AsyncTracingProcessor,
241+
items: list[_SpanQueueItem],
242+
event_type: SpanEventType,
243+
) -> None:
244+
spans = [item.span for item in items]
245+
try:
246+
if event_type == SpanEventType.START:
247+
await p.on_spans_start(spans)
248+
else:
249+
await p.on_spans_end(spans)
250+
except Exception as exc:
251+
self._handle_failure(p, items, event_type, exc)
252+
253+
def _handle_failure(
254+
self,
255+
p: AsyncTracingProcessor,
256+
items: list[_SpanQueueItem],
257+
event_type: SpanEventType,
258+
exc: Exception,
259+
) -> None:
260+
# Re-enqueue transient failures, drop everything else. Re-enqueue is
261+
# bounded by max_retries, so even during shutdown the queue's join()
262+
# still terminates after a finite number of passes.
263+
if _is_retryable_exc(exc):
264+
retriable = [item for item in items if item.attempts + 1 < self._max_retries]
265+
exhausted = len(items) - len(retriable)
266+
if exhausted:
267+
self._record_drop(exhausted, f"{type(p).__name__} retries exhausted during {event_type.value}")
268+
for item in retriable:
269+
self._reenqueue(item, p)
270+
if retriable:
271+
logger.warning(
272+
"Tracing processor %s failed handling %d spans during %s (%s); re-enqueued %d for retry",
174273
type(p).__name__,
175-
len(spans),
274+
len(items),
176275
event_type.value,
276+
type(exc).__name__,
277+
len(retriable),
177278
)
279+
return
280+
281+
self._record_drop(len(items), f"{type(p).__name__} permanent failure during {event_type.value}")
282+
logger.exception(
283+
"Tracing processor %s failed handling %d spans during %s",
284+
type(p).__name__,
285+
len(items),
286+
event_type.value,
287+
)
178288

179-
await asyncio.gather(*[_handle(p, spans) for p, spans in by_processor.items()])
289+
def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None:
290+
"""Put a single failed item back on the queue, scoped to the processor
291+
that failed, with an incremented attempt count."""
292+
try:
293+
self._queue.put_nowait(
294+
_SpanQueueItem(
295+
event_type=item.event_type,
296+
span=item.span,
297+
processors=[p],
298+
attempts=item.attempts + 1,
299+
)
300+
)
301+
except asyncio.QueueFull:
302+
self._record_drop(1, "queue full on retry")
180303

181304
# ------------------------------------------------------------------
182305
# Shutdown

0 commit comments

Comments
 (0)