Skip to content

Commit 26306bc

Browse files
fix(tracing): batch span events in SGPAsyncTracingProcessor
Builds on the per-loop client work in #341 by adding the second half of the OVE-2 fix: replace the per-event upsert_batch(items=[one]) calls with the buffer-plus-flush model the synchronous TraceQueueManager already uses.

- asyncio.Queue (max 4000) plus an asyncio.Task worker that drains it in batches of up to 50, every 4s or when 200+ events are queued.
- Retry policy mirrors the SDK: 4 attempts with exponential backoff (0.4s -> 20s capped). Unexpected exceptions in upsert are logged and the batch is dropped so they don't kill the worker.
- Per-iteration try/except in the worker so one bad batch doesn't break subsequent flushes.
- shutdown() drains the queue, signals the worker, and joins with a 10s timeout; spans whose end was never recorded are re-enqueued so they aren't silently lost.

Behavior preserved:

- on_span_start / on_span_end / shutdown signatures unchanged.
- _spans dict still tracks SGPSpan objects between start and end.
- Externally injected sgp_async_client is still respected.
- All 16 tests from part 1 of the split pass unchanged.

New tests:

- test_span_event_does_not_trigger_immediate_upsert: a single event must not hit the network.
- test_shutdown_flushes_queued_spans_in_one_batch: 5 lifecycles (10 events) coalesce into one upsert call.

Adjusted test_owned_client_recreated_after_loop_swap to also reset _worker and _loop, because the new short-circuit (loop matches AND worker alive) needs both to be invalidated to trigger re-init.

Closes OVE-2.
1 parent e83e080 commit 26306bc

2 files changed

Lines changed: 236 additions & 44 deletions

File tree

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py

Lines changed: 164 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import scale_gp_beta.lib.tracing as tracing
77
from scale_gp_beta import SGPClient, AsyncSGPClient
8+
from scale_gp_beta._exceptions import APIError
89
from scale_gp_beta.lib.tracing import create_span, flush_queue
910
from scale_gp_beta.lib.tracing.span import Span as SGPSpan
1011

@@ -20,6 +21,19 @@
2021
logger = make_logger(__name__)
2122

2223

24+
# Mirrored from scale_gp_beta.lib.tracing.trace_queue_manager defaults so the
25+
# async processor batches and retries the same way the sync (daemon-thread)
26+
# path does.
27+
# Capacity of the span buffer (asyncio.Queue maxsize); when the buffer is
# full, _enqueue drops the span and logs a warning.
DEFAULT_MAX_QUEUE_SIZE = 4_000
28+
# Queue size at which _enqueue sets the flush event to wake the worker early.
DEFAULT_TRIGGER_QUEUE_SIZE = 200
29+
# Seconds the worker waits on the flush event before flushing anyway.
DEFAULT_TRIGGER_CADENCE = 4.0
30+
# Maximum number of span payloads sent in a single upsert_batch call.
DEFAULT_MAX_BATCH_SIZE = 50
31+
# Total upsert attempts per batch (first try included) before it is dropped.
DEFAULT_RETRIES = 4
32+
# Seconds slept after the first failed attempt; doubled after each retry.
INITIAL_BACKOFF = 0.4
33+
# Upper bound, in seconds, on the doubled retry backoff.
MAX_BACKOFF = 20.0
34+
# Seconds shutdown() waits for the worker task to finish its final drain.
SHUTDOWN_DRAIN_TIMEOUT = 10.0
35+
36+
2337
def _get_span_type(span: Span) -> str:
2438
"""Read span_type from span.data['__span_type__'], defaulting to STANDALONE."""
2539
if isinstance(span.data, dict):
@@ -93,19 +107,19 @@ def shutdown(self) -> None:
93107

94108

95109
class SGPAsyncTracingProcessor(AsyncTracingProcessor):
96-
"""Async tracing processor.
97-
98-
The HTTP client is lazy-initialized on the running event loop on first
99-
use, and re-created when the loop changes. This avoids the "bound to a
100-
different event loop" errors that the previous implementation worked
101-
around by disabling HTTP keepalive (`max_keepalive_connections=0`),
102-
which paid a TCP+TLS handshake on every span event. With per-loop
103-
construction, the underlying httpx client can keep its default
104-
connection pool, so repeated calls on the same loop reuse connections.
105-
106-
Note: span events are still upserted per-event in this version. Batching
107-
is intentionally not part of this change so it can be reviewed
108-
independently.
110+
"""Async tracing processor that buffers spans and flushes them in batches.
111+
112+
Mirrors the buffer-plus-flush behavior of the SDK's synchronous
113+
`TraceQueueManager`, but uses asyncio primitives so it works inside an
114+
asyncio event loop without blocking it.
115+
116+
The HTTP client, queue, and worker task are lazy-initialized on the
117+
running event loop the first time a span event is recorded. This avoids
118+
the "bound to a different event loop" errors that occur when the
119+
processor is constructed on one loop but used on another (e.g. a worker
120+
that creates a fresh loop per request) and lets us re-enable HTTP
121+
keepalive on the underlying httpx client without paying a TCP+TLS
122+
handshake on every span event.
109123
"""
110124

111125
def __init__(self, config: SGPTracingProcessorConfig):
@@ -114,12 +128,20 @@ def __init__(self, config: SGPTracingProcessorConfig):
114128
self._spans: dict[str, SGPSpan] = {}
115129
self.env_vars = EnvironmentVariables.refresh()
116130

117-
# Lazy-initialized on the running loop on first use.
131+
# Lazy-initialized on the running loop on first use. Holding these
132+
# as attributes (rather than constructing eagerly in __init__) is
133+
# what lets the processor survive the loop on which it was created
134+
# being replaced — a common pattern in sync-ACP / per-request loops.
135+
self._loop: Optional[asyncio.AbstractEventLoop] = None
118136
self.sgp_async_client: Optional[AsyncSGPClient] = None
119-
# Loop the *processor-owned* client was constructed on. Stays None
120-
# when the client was injected externally (e.g. by a test); in that
121-
# case we never replace it.
137+
# Loop the *processor-owned* client was constructed on. Remains
138+
# None when the client was injected externally (e.g. by a test);
139+
# in that case we never replace it.
122140
self._client_owned_at_loop: Optional[asyncio.AbstractEventLoop] = None
141+
self._queue: Optional[asyncio.Queue[SGPSpan]] = None
142+
self._worker: Optional[asyncio.Task[None]] = None
143+
self._shutdown_event: Optional[asyncio.Event] = None
144+
self._flush_event: Optional[asyncio.Event] = None
123145

124146
def _add_source_to_span(self, span: Span) -> None:
125147
if span.data is None:
@@ -133,15 +155,25 @@ def _add_source_to_span(self, span: Span) -> None:
133155
if self.env_vars.AGENT_ID is not None:
134156
span.data["__agent_id__"] = self.env_vars.AGENT_ID
135157

136-
def _ensure_client(self) -> None:
137-
"""Initialize or recreate the AsyncSGPClient on the running loop.
158+
def _ensure_started(self) -> None:
159+
"""Initialize per-loop state on first use, or after a loop swap.
138160
139161
Must be called from inside an async method so `get_running_loop()`
140162
is safe.
141163
"""
142164
if self.disabled:
143165
return
144166
loop = asyncio.get_running_loop()
167+
if self._loop is loop and self._worker is not None and not self._worker.done():
168+
return
169+
170+
self._loop = loop
171+
# We construct an httpx-backed client lazily on the running loop so
172+
# connection pooling and keepalive can be left at httpx defaults
173+
# without hitting "bound to a different event loop" errors when the
174+
# processor outlives its original loop. An externally injected
175+
# client (e.g. a test mock) is left alone — _client_owned_at_loop
176+
# stays None for those.
145177
if self.sgp_async_client is None:
146178
self.sgp_async_client = AsyncSGPClient(
147179
api_key=self._config.sgp_api_key,
@@ -150,17 +182,22 @@ def _ensure_client(self) -> None:
150182
)
151183
self._client_owned_at_loop = loop
152184
elif self._client_owned_at_loop is not None and self._client_owned_at_loop is not loop:
153-
# Owned client was bound to a now-stale loop. Replace it.
185+
# We previously created a client on a now-stale loop. Replace it.
154186
self.sgp_async_client = AsyncSGPClient(
155187
api_key=self._config.sgp_api_key,
156188
account_id=self._config.sgp_account_id,
157189
base_url=self._config.sgp_base_url,
158190
)
159191
self._client_owned_at_loop = loop
192+
self._queue = asyncio.Queue(maxsize=DEFAULT_MAX_QUEUE_SIZE)
193+
self._shutdown_event = asyncio.Event()
194+
self._flush_event = asyncio.Event()
195+
self._worker = loop.create_task(self._run())
160196

161197
@override
162198
async def on_span_start(self, span: Span) -> None:
163199
self._add_source_to_span(span)
200+
164201
sgp_span = create_span(
165202
name=span.name,
166203
span_type=_get_span_type(span),
@@ -172,17 +209,13 @@ async def on_span_start(self, span: Span) -> None:
172209
metadata=span.data,
173210
)
174211
sgp_span.start_time = span.start_time.isoformat() # type: ignore[union-attr]
212+
self._spans[span.id] = sgp_span
175213

176214
if self.disabled:
177-
logger.warning("SGP is disabled, skipping span upsert")
178215
return
179216

180-
self._ensure_client()
181-
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
182-
items=[sgp_span.to_request_params()]
183-
)
184-
185-
self._spans[span.id] = sgp_span
217+
self._ensure_started()
218+
self._enqueue(sgp_span)
186219

187220
@override
188221
async def on_span_end(self, span: Span) -> None:
@@ -199,27 +232,117 @@ async def on_span_end(self, span: Span) -> None:
199232
if self.disabled:
200233
return
201234

202-
self._ensure_client()
203-
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
204-
items=[sgp_span.to_request_params()]
205-
)
235+
self._ensure_started()
236+
self._enqueue(sgp_span)
206237

207238
@override
208239
async def shutdown(self) -> None:
209-
if self.disabled or self.sgp_async_client is None:
240+
# Fast path when the processor was never started (disabled, or
241+
# shutdown called before any span event). Avoid spinning up a
242+
# worker just to tear it down.
243+
if self._worker is None:
210244
self._spans.clear()
211245
return
212246

213-
items: list[dict] = []
214-
for sgp_span in self._spans.values():
215-
try:
216-
items.append(sgp_span.to_request_params())
217-
except Exception:
218-
logger.exception("Failed to build span params during shutdown; dropping span")
247+
# Re-enqueue any spans whose end was never recorded so they aren't
248+
# silently lost. They were already enqueued at start, but on_span_end
249+
# is what mutates output / metadata / end_timestamp; without a
250+
# second enqueue, the server only sees the start payload for them.
251+
for sgp_span in list(self._spans.values()):
252+
self._enqueue(sgp_span)
219253
self._spans.clear()
220254

221-
if items:
255+
assert self._shutdown_event is not None
256+
self._shutdown_event.set()
257+
if self._flush_event is not None:
258+
self._flush_event.set()
259+
260+
try:
261+
await asyncio.wait_for(self._worker, timeout=SHUTDOWN_DRAIN_TIMEOUT)
262+
except asyncio.TimeoutError:
263+
logger.warning(f"Async tracing worker did not exit within {SHUTDOWN_DRAIN_TIMEOUT}s; cancelling")
264+
self._worker.cancel()
265+
266+
def _enqueue(self, sgp_span: SGPSpan) -> None:
267+
if self._queue is None:
268+
return
269+
try:
270+
self._queue.put_nowait(sgp_span)
271+
except asyncio.QueueFull:
272+
logger.warning(f"Tracing queue full; dropping span {sgp_span.span_id}")
273+
return
274+
if self._flush_event is not None and self._queue.qsize() >= DEFAULT_TRIGGER_QUEUE_SIZE:
275+
self._flush_event.set()
276+
277+
async def _run(self) -> None:
278+
try:
279+
while not (self._shutdown_event and self._shutdown_event.is_set()):
280+
# Wake on either an early-flush signal or the cadence timer.
281+
assert self._flush_event is not None
282+
try:
283+
await asyncio.wait_for(self._flush_event.wait(), timeout=DEFAULT_TRIGGER_CADENCE)
284+
except asyncio.TimeoutError:
285+
pass
286+
self._flush_event.clear()
287+
# Per-iteration guard: an unexpected error during one drain
288+
# must not kill the worker, otherwise queued items stay
289+
# unflushed until shutdown.
290+
try:
291+
await self._drain()
292+
except asyncio.CancelledError:
293+
raise
294+
except Exception:
295+
logger.exception("Tracing worker iteration failed; continuing")
296+
297+
# Final drain on shutdown.
298+
try:
299+
await self._drain()
300+
except Exception:
301+
logger.exception("Final tracing drain failed; some spans may be lost")
302+
except asyncio.CancelledError:
303+
raise
304+
except Exception:
305+
logger.exception("Async tracing worker crashed")
306+
307+
async def _drain(self) -> None:
308+
if self._queue is None or self.sgp_async_client is None:
309+
return
310+
while not self._queue.empty():
311+
batch: list[dict] = []
312+
while len(batch) < DEFAULT_MAX_BATCH_SIZE and not self._queue.empty():
313+
try:
314+
sgp_span = self._queue.get_nowait()
315+
except asyncio.QueueEmpty:
316+
break
317+
try:
318+
batch.append(sgp_span.to_request_params())
319+
except Exception:
320+
logger.exception("Failed to build span params; dropping span")
321+
if not batch:
322+
continue
323+
await self._upsert_with_retry(batch)
324+
325+
async def _upsert_with_retry(self, batch: list[dict]) -> None:
326+
if self.sgp_async_client is None:
327+
return
328+
backoff = INITIAL_BACKOFF
329+
for attempt in range(DEFAULT_RETRIES):
222330
try:
223-
await self.sgp_async_client.spans.upsert_batch(items=items) # type: ignore[arg-type]
331+
await self.sgp_async_client.spans.upsert_batch(items=batch) # type: ignore[arg-type]
332+
return
333+
except APIError as exc:
334+
if attempt == DEFAULT_RETRIES - 1:
335+
logger.error(f"Failed to export {len(batch)} spans after {DEFAULT_RETRIES} attempts: {exc.message}")
336+
return
337+
logger.warning(f"Span export failed ({exc.message}); retrying in {backoff:.1f}s")
338+
await asyncio.sleep(backoff)
339+
backoff = min(backoff * 2, MAX_BACKOFF)
340+
except asyncio.CancelledError:
341+
raise
224342
except Exception:
225-
logger.exception("Final span flush failed during shutdown")
343+
# Unexpected error (not APIError, not cancellation): log and
344+
# drop the batch. We deliberately do not retry because we
345+
# don't know whether the request reached the server, and
346+
# the SDK already surfaces transport failures as APIError.
347+
logger.exception(f"Unexpected error exporting {len(batch)} spans; dropping batch")
348+
return

tests/lib/core/tracing/processors/test_sgp_tracing_processor.py

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,14 +216,19 @@ async def test_owned_client_recreated_after_loop_swap(self):
216216
assert processor.sgp_async_client is first
217217
assert mock_client_cls.call_count == 1
218218

219-
# Simulate a loop swap: the processor's tracked loop is stale.
220-
# The next call must recreate the client.
221-
processor._client_owned_at_loop = MagicMock()
219+
# Simulate a loop swap: pretend a previous run left state
220+
# behind that's all bound to a now-stale loop.
221+
stale_loop = MagicMock()
222+
processor._loop = stale_loop
223+
processor._client_owned_at_loop = stale_loop
224+
processor._worker = None # force re-init
222225

223226
await processor.on_span_start(_make_span())
224227
assert processor.sgp_async_client is second, "Owned client must be recreated after loop swap"
225228
assert mock_client_cls.call_count == 2
226229

230+
await processor.shutdown()
231+
227232
async def test_injected_client_preserved(self):
228233
"""A client assigned externally (test mock or caller-built) must
229234
never be replaced by the processor. Contract:
@@ -251,3 +256,67 @@ async def test_injected_client_preserved(self):
251256
assert mock_client_cls.call_count == 0, (
252257
"Injected client must not be replaced (no AsyncSGPClient construction)"
253258
)
259+
260+
261+
# ---------------------------------------------------------------------------
262+
# Async processor batching tests
263+
#
264+
# Previously, on_span_start and on_span_end each issued an awaited
265+
# upsert_batch(items=[one]) call on the agent's hot path. The processor now
266+
# buffers events and flushes them in batches from a background asyncio.Task,
267+
# mirroring the SDK's TraceQueueManager.
268+
# ---------------------------------------------------------------------------
269+
270+
271+
class TestSGPAsyncTracingProcessorBatching:
    """Batching behavior of SGPAsyncTracingProcessor (buffer + background flush)."""

    @staticmethod
    def _make_processor():
        """Build a processor wired to a mocked async client.

        Returns ``(processor, mock_async_client)``. The client's
        ``spans.upsert_batch`` is an ``AsyncMock`` so tests can count calls
        and inspect the submitted items.
        """
        mock_env = MagicMock()
        mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)

        mock_async_client = MagicMock()
        mock_async_client.spans.upsert_batch = AsyncMock()

        with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch(
            f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()
        ), patch(f"{MODULE}.AsyncSGPClient", return_value=mock_async_client):
            from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
                SGPAsyncTracingProcessor,
            )

            processor = SGPAsyncTracingProcessor(_make_config())

        processor.sgp_async_client = mock_async_client
        return processor, mock_async_client

    async def test_span_event_does_not_trigger_immediate_upsert(self):
        """Regression: a single span event must not result in an upsert call
        on the hot path. Events must be enqueued and flushed by the worker."""
        processor, client = self._make_processor()

        try:
            with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()):
                span = _make_span()
                await processor.on_span_start(span)

            assert client.spans.upsert_batch.call_count == 0, (
                "on_span_start should enqueue, not trigger a network call"
            )
        finally:
            # on_span_start started the background worker task; stop it so
            # the test does not leak a pending task, even if the assertion
            # above fails.
            await processor.shutdown()

    async def test_shutdown_flushes_queued_spans_in_one_batch(self):
        """Many span events should be coalesced into a single upsert_batch
        call when the buffer fits under MAX_BATCH_SIZE (50)."""
        processor, client = self._make_processor()

        with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()):
            for _ in range(5):
                span = _make_span()
                await processor.on_span_start(span)
                span.end_time = datetime.now(UTC)
                await processor.on_span_end(span)

        await processor.shutdown()

        assert client.spans.upsert_batch.call_count == 1, (
            f"Expected a single batched upsert, got {client.spans.upsert_batch.call_count}"
        )
        items = client.spans.upsert_batch.call_args.kwargs["items"]
        # 5 starts + 5 ends = 10 enqueued items, well under MAX_BATCH_SIZE.
        assert len(items) == 10, f"Expected 10 items in the batch, got {len(items)}"

0 commit comments

Comments
 (0)