Skip to content

Commit e6ea039

Browse files
refactor(tracing): split _run worker into named helpers and expand tests
Code clarity changes (no behavior change): - Split SGPAsyncTracingProcessor._run into _is_shutting_down, _wait_for_flush_signal, and _safe_drain helpers so the loop reads top-to-bottom: "while not shutting down, wait for trigger, drain." - Add docstrings on _enqueue, _ensure_started, _drain, and _upsert_with_retry covering inputs, side effects, and dropped-batch semantics. New tests (regression coverage that was missing): - test_drain_splits_into_multiple_batches_above_max_batch_size: 80 enqueued events split into multiple upsert_batch calls, each batch capped at MAX_BATCH_SIZE (50). - test_worker_continues_after_unexpected_exception_in_one_batch: a RuntimeError on one upsert drops that batch; the worker keeps flushing and a subsequent batch lands. Exercises the per-iteration try/except in _run.
1 parent 81e2021 commit e6ea039

2 files changed

Lines changed: 112 additions & 25 deletions

File tree

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,10 @@ def _add_source_to_span(self, span: Span) -> None:
164164
def _ensure_started(self) -> None:
165165
"""Initialize per-loop queue + worker on first use, or after a loop swap.
166166
167-
Must be called from inside an async method so `get_running_loop()`
168-
is safe.
167+
Must be called from inside an async method so `get_running_loop()` is
168+
safe. Idempotent on the same loop while the worker is healthy; on a
169+
loop change or worker death, it rebuilds the queue and worker (items
170+
in the previous queue are lost — they were tied to a now-dead loop).
169171
"""
170172
if self.disabled:
171173
return
@@ -246,6 +248,8 @@ async def shutdown(self) -> None:
246248
self._worker.cancel()
247249

248250
def _enqueue(self, sgp_span: SGPSpan) -> None:
251+
"""Push a span onto the queue and signal an early flush if the queue
252+
has crossed `DEFAULT_TRIGGER_QUEUE_SIZE`. Drops the span on overflow."""
249253
if self._queue is None:
250254
return
251255
try:
@@ -256,37 +260,51 @@ def _enqueue(self, sgp_span: SGPSpan) -> None:
256260
if self._flush_event is not None and self._queue.qsize() >= DEFAULT_TRIGGER_QUEUE_SIZE:
257261
self._flush_event.set()
258262

259-
async def _run(self) -> None:
263+
def _is_shutting_down(self) -> bool:
264+
return self._shutdown_event is not None and self._shutdown_event.is_set()
265+
266+
async def _wait_for_flush_signal(self) -> None:
267+
"""Block until either an early-flush signal arrives or the cadence
268+
timer fires. Returns either way; the caller is responsible for
269+
draining."""
270+
assert self._flush_event is not None
260271
try:
261-
while not (self._shutdown_event and self._shutdown_event.is_set()):
262-
# Wake on either an early-flush signal or the cadence timer.
263-
assert self._flush_event is not None
264-
try:
265-
await asyncio.wait_for(self._flush_event.wait(), timeout=DEFAULT_TRIGGER_CADENCE)
266-
except asyncio.TimeoutError:
267-
pass
268-
self._flush_event.clear()
269-
# Per-iteration guard: an unexpected error during one drain
270-
# must not kill the worker, otherwise queued items stay
271-
# unflushed until shutdown.
272-
try:
273-
await self._drain()
274-
except asyncio.CancelledError:
275-
raise
276-
except Exception:
277-
logger.exception("Tracing worker iteration failed; continuing")
272+
await asyncio.wait_for(self._flush_event.wait(), timeout=DEFAULT_TRIGGER_CADENCE)
273+
except asyncio.TimeoutError:
274+
pass
275+
self._flush_event.clear()
278276

279-
# Final drain on shutdown.
280-
try:
281-
await self._drain()
282-
except Exception:
283-
logger.exception("Final tracing drain failed; some spans may be lost")
277+
async def _safe_drain(self, log_label: str) -> None:
278+
"""Run `_drain`, catching unexpected errors so one bad iteration
279+
doesn't kill the worker. CancelledError is always re-raised."""
280+
try:
281+
await self._drain()
282+
except asyncio.CancelledError:
283+
raise
284+
except Exception:
285+
logger.exception(log_label)
286+
287+
async def _run(self) -> None:
288+
"""Background worker. Sleeps until a flush trigger fires, drains the
289+
queue, and repeats. On shutdown signal, does one final drain so
290+
nothing pending is dropped. The outermost try / except keeps a worker
291+
crash from being silent."""
292+
try:
293+
while not self._is_shutting_down():
294+
await self._wait_for_flush_signal()
295+
await self._safe_drain("Tracing worker iteration failed; continuing")
296+
await self._safe_drain("Final tracing drain failed; some spans may be lost")
284297
except asyncio.CancelledError:
285298
raise
286299
except Exception:
287300
logger.exception("Async tracing worker crashed")
288301

289302
async def _drain(self) -> None:
303+
"""Pull spans from the queue and upsert them in batches of up to
304+
`DEFAULT_MAX_BATCH_SIZE`. Stops when the queue is empty.
305+
306+
A span whose `to_request_params()` raises is dropped (logged); the
307+
rest of the batch still goes out. This matches the SDK's exporter."""
290308
if self._queue is None or self.sgp_async_client is None:
291309
return
292310
while not self._queue.empty():
@@ -305,6 +323,13 @@ async def _drain(self) -> None:
305323
await self._upsert_with_retry(batch)
306324

307325
async def _upsert_with_retry(self, batch: list[dict]) -> None:
326+
"""POST a single batch with the SDK's retry policy: 4 attempts with
327+
exponential backoff (`INITIAL_BACKOFF` -> `MAX_BACKOFF` capped).
328+
329+
- `APIError` triggers retry up to `DEFAULT_RETRIES` attempts.
330+
- Anything else is logged and the batch is dropped (we don't know
331+
whether the server saw the request, and the SDK already wraps
332+
transport-level failures as `APIError`)."""
308333
if self.sgp_async_client is None:
309334
return
310335
backoff = INITIAL_BACKOFF

tests/lib/core/tracing/processors/test_sgp_tracing_processor.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import uuid
4+
import asyncio
45
from datetime import UTC, datetime
56
from unittest.mock import AsyncMock, MagicMock, patch
67

@@ -259,3 +260,64 @@ async def test_shutdown_flushes_queued_spans_in_one_batch(self):
259260
items = client.spans.upsert_batch.call_args.kwargs["items"]
260261
# 5 starts + 5 ends = 10 enqueued items, well under MAX_BATCH_SIZE.
261262
assert len(items) == 10, f"Expected 10 items in the batch, got {len(items)}"
263+
264+
async def test_drain_splits_into_multiple_batches_above_max_batch_size(self):
265+
"""Spans beyond MAX_BATCH_SIZE (50) must be split across multiple
266+
upsert_batch calls so a single call never exceeds the cap."""
267+
processor, client = self._make_processor()
268+
269+
with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()):
270+
for _ in range(40):
271+
span = _make_span()
272+
await processor.on_span_start(span)
273+
span.end_time = datetime.now(UTC)
274+
await processor.on_span_end(span)
275+
276+
# 40 starts + 40 ends = 80 enqueued items. With MAX_BATCH_SIZE=50,
277+
# that's at least 2 upsert calls.
278+
await processor.shutdown()
279+
280+
assert client.spans.upsert_batch.call_count >= 2, (
281+
f"Expected ≥2 batched upserts for 80 events, got {client.spans.upsert_batch.call_count}"
282+
)
283+
for call in client.spans.upsert_batch.call_args_list:
284+
items = call.kwargs["items"]
285+
assert len(items) <= 50, f"Batch of {len(items)} exceeds MAX_BATCH_SIZE=50"
286+
total_items = sum(len(call.kwargs["items"]) for call in client.spans.upsert_batch.call_args_list)
287+
assert total_items == 80, f"Expected 80 items across all batches, got {total_items}"
288+
289+
async def test_worker_continues_after_unexpected_exception_in_one_batch(self):
290+
"""A single upsert raising an unexpected (non-APIError) exception
291+
must drop that batch and let the worker keep flushing subsequent
292+
ones. Regression test for the per-iteration try/except in `_run`."""
293+
processor, client = self._make_processor()
294+
295+
# First call raises (unexpected exception → batch dropped),
296+
# subsequent calls succeed.
297+
client.spans.upsert_batch.side_effect = [RuntimeError("boom"), None]
298+
299+
with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()):
300+
# First flush — will raise inside _upsert_with_retry, batch dropped.
301+
span_a = _make_span()
302+
await processor.on_span_start(span_a)
303+
span_a.end_time = datetime.now(UTC)
304+
await processor.on_span_end(span_a)
305+
assert processor._flush_event is not None
306+
processor._flush_event.set()
307+
# Yield so the worker runs the failing flush.
308+
await asyncio.sleep(0)
309+
await asyncio.sleep(0)
310+
311+
# Worker must still be alive and able to handle a second batch.
312+
span_b = _make_span()
313+
await processor.on_span_start(span_b)
314+
span_b.end_time = datetime.now(UTC)
315+
await processor.on_span_end(span_b)
316+
317+
await processor.shutdown()
318+
319+
# First call raised, second succeeded → 2 calls total.
320+
assert client.spans.upsert_batch.call_count == 2, (
321+
f"Worker should have made a second upsert attempt after the first failed; "
322+
f"got {client.spans.upsert_batch.call_count}"
323+
)

0 commit comments

Comments
 (0)