Skip to content

Commit 81e2021

Browse files
fix(tracing): batch span events in SGPAsyncTracingProcessor
`SGPAsyncTracingProcessor` previously `await client.spans.upsert_batch(items=[one_span])` on every `on_span_start` and every `on_span_end`, awaited synchronously on the agent's hot path. Under burst load this surfaces as start-span timeouts. The synchronous processor inherits the SDK's `TraceQueueManager` and is fine; this change brings the async one in line. Replace the per-event upsert with a buffer-plus-flush model in asyncio: - asyncio.Queue (max 4000) plus an asyncio.Task worker that drains in batches of up to 50, every 4s or when 200+ events queued. Constants mirror scale_gp_beta.lib.tracing.trace_queue_manager. - Retry policy mirrors the SDK: 4 attempts with exponential backoff (0.4s -> 20s capped). Unexpected exceptions are dropped, not retried. - Per-iteration try / except in the worker so one bad batch doesn't break subsequent flushes. - shutdown() drains the queue, signals the worker, joins with a 10s timeout. Spans whose end was never recorded are re-enqueued so they aren't silently lost. Behavior preserved: - on_span_start / on_span_end / shutdown signatures unchanged. - _spans dict still tracks SGPSpan objects between start and end. - Externally injected sgp_async_client is still respected. - The existing AsyncSGPClient construction in __init__, with the max_keepalive_connections=0 workaround, is kept untouched. Closes OVE-4.
1 parent ed6fd5e commit 81e2021

2 files changed

Lines changed: 251 additions & 31 deletions

File tree

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py

Lines changed: 165 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1-
from typing import override
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from typing import Optional, override
25

36
import scale_gp_beta.lib.tracing as tracing
47
from scale_gp_beta import SGPClient, AsyncSGPClient
8+
from scale_gp_beta._exceptions import APIError
59
from scale_gp_beta.lib.tracing import create_span, flush_queue
610
from scale_gp_beta.lib.tracing.span import Span as SGPSpan
711

@@ -17,6 +21,19 @@
1721
logger = make_logger(__name__)
1822

1923

24+
# Mirrored from scale_gp_beta.lib.tracing.trace_queue_manager defaults so the
25+
# async processor batches and retries the same way the sync (daemon-thread)
26+
# path does.
27+
DEFAULT_MAX_QUEUE_SIZE = 4_000
28+
DEFAULT_TRIGGER_QUEUE_SIZE = 200
29+
DEFAULT_TRIGGER_CADENCE = 4.0
30+
DEFAULT_MAX_BATCH_SIZE = 50
31+
DEFAULT_RETRIES = 4
32+
INITIAL_BACKOFF = 0.4
33+
MAX_BACKOFF = 20.0
34+
SHUTDOWN_DRAIN_TIMEOUT = 10.0
35+
36+
2037
def _get_span_type(span: Span) -> str:
2138
"""Read span_type from span.data['__span_type__'], defaulting to STANDALONE."""
2239
if isinstance(span.data, dict):
@@ -90,6 +107,18 @@ def shutdown(self) -> None:
90107

91108

92109
class SGPAsyncTracingProcessor(AsyncTracingProcessor):
110+
"""Async tracing processor that buffers spans and flushes them in batches.
111+
112+
Mirrors the buffer-plus-flush behavior of the SDK's synchronous
113+
`TraceQueueManager`, but uses asyncio primitives so it works inside an
114+
asyncio event loop without blocking it.
115+
116+
Spans are enqueued on `on_span_start` and `on_span_end`; a background
117+
`asyncio.Task` worker drains the queue into batches and posts them via
118+
`client.spans.upsert_batch`. The worker is lazy-initialized on the
119+
running event loop on first use.
120+
"""
121+
93122
def __init__(self, config: SGPTracingProcessorConfig):
94123
self.disabled = config.sgp_api_key == "" or config.sgp_account_id == ""
95124
self._spans: dict[str, SGPSpan] = {}
@@ -111,6 +140,15 @@ def __init__(self, config: SGPTracingProcessorConfig):
111140
)
112141
self.env_vars = EnvironmentVariables.refresh()
113142

143+
# Lazy-initialized on the running loop on first use. Re-created if
144+
# the loop changes (e.g. sync-ACP / per-request loops) so the worker
145+
# is always bound to the loop currently consuming it.
146+
self._loop: Optional[asyncio.AbstractEventLoop] = None
147+
self._queue: Optional[asyncio.Queue[SGPSpan]] = None
148+
self._worker: Optional[asyncio.Task[None]] = None
149+
self._shutdown_event: Optional[asyncio.Event] = None
150+
self._flush_event: Optional[asyncio.Event] = None
151+
114152
def _add_source_to_span(self, span: Span) -> None:
115153
if span.data is None:
116154
span.data = {}
@@ -123,6 +161,23 @@ def _add_source_to_span(self, span: Span) -> None:
123161
if self.env_vars.AGENT_ID is not None:
124162
span.data["__agent_id__"] = self.env_vars.AGENT_ID
125163

164+
def _ensure_started(self) -> None:
165+
"""Initialize per-loop queue + worker on first use, or after a loop swap.
166+
167+
Must be called from inside an async method so `get_running_loop()`
168+
is safe.
169+
"""
170+
if self.disabled:
171+
return
172+
loop = asyncio.get_running_loop()
173+
if self._loop is loop and self._worker is not None and not self._worker.done():
174+
return
175+
self._loop = loop
176+
self._queue = asyncio.Queue(maxsize=DEFAULT_MAX_QUEUE_SIZE)
177+
self._shutdown_event = asyncio.Event()
178+
self._flush_event = asyncio.Event()
179+
self._worker = loop.create_task(self._run())
180+
126181
@override
127182
async def on_span_start(self, span: Span) -> None:
128183
self._add_source_to_span(span)
@@ -137,18 +192,13 @@ async def on_span_start(self, span: Span) -> None:
137192
metadata=span.data,
138193
)
139194
sgp_span.start_time = span.start_time.isoformat() # type: ignore[union-attr]
195+
self._spans[span.id] = sgp_span
140196

141197
if self.disabled:
142-
logger.warning("SGP is disabled, skipping span upsert")
143198
return
144-
# TODO(AGX1-198): Batch multiple spans into a single upsert_batch call
145-
# instead of one span per HTTP request.
146-
# https://linear.app/scale-epd/issue/AGX1-198/actually-use-sgp-batching-for-spans
147-
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
148-
items=[sgp_span.to_request_params()]
149-
)
150199

151-
self._spans[span.id] = sgp_span
200+
self._ensure_started()
201+
self._enqueue(sgp_span)
152202

153203
@override
154204
async def on_span_end(self, span: Span) -> None:
@@ -158,20 +208,119 @@ async def on_span_end(self, span: Span) -> None:
158208
return
159209

160210
self._add_source_to_span(span)
161-
sgp_span.input = span.input # type: ignore[assignment]
162211
sgp_span.output = span.output # type: ignore[assignment]
163212
sgp_span.metadata = span.data # type: ignore[assignment]
164213
sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr]
165214

166215
if self.disabled:
167216
return
168-
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
169-
items=[sgp_span.to_request_params()]
170-
)
217+
218+
self._ensure_started()
219+
self._enqueue(sgp_span)
171220

172221
@override
173222
async def shutdown(self) -> None:
174-
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
175-
items=[sgp_span.to_request_params() for sgp_span in self._spans.values()]
176-
)
223+
# Fast path when the processor was never started (disabled, or
224+
# shutdown called before any span event).
225+
if self._worker is None:
226+
self._spans.clear()
227+
return
228+
229+
# Re-enqueue any spans whose end was never recorded so they aren't
230+
# silently lost. They were already enqueued at start, but on_span_end
231+
# is what mutates output / metadata / end_timestamp; without a second
232+
# enqueue, the server only sees the start payload for them.
233+
for sgp_span in list(self._spans.values()):
234+
self._enqueue(sgp_span)
177235
self._spans.clear()
236+
237+
assert self._shutdown_event is not None
238+
self._shutdown_event.set()
239+
if self._flush_event is not None:
240+
self._flush_event.set()
241+
242+
try:
243+
await asyncio.wait_for(self._worker, timeout=SHUTDOWN_DRAIN_TIMEOUT)
244+
except asyncio.TimeoutError:
245+
logger.warning(f"Async tracing worker did not exit within {SHUTDOWN_DRAIN_TIMEOUT}s; cancelling")
246+
self._worker.cancel()
247+
248+
def _enqueue(self, sgp_span: SGPSpan) -> None:
249+
if self._queue is None:
250+
return
251+
try:
252+
self._queue.put_nowait(sgp_span)
253+
except asyncio.QueueFull:
254+
logger.warning(f"Tracing queue full; dropping span {sgp_span.span_id}")
255+
return
256+
if self._flush_event is not None and self._queue.qsize() >= DEFAULT_TRIGGER_QUEUE_SIZE:
257+
self._flush_event.set()
258+
259+
async def _run(self) -> None:
260+
try:
261+
while not (self._shutdown_event and self._shutdown_event.is_set()):
262+
# Wake on either an early-flush signal or the cadence timer.
263+
assert self._flush_event is not None
264+
try:
265+
await asyncio.wait_for(self._flush_event.wait(), timeout=DEFAULT_TRIGGER_CADENCE)
266+
except asyncio.TimeoutError:
267+
pass
268+
self._flush_event.clear()
269+
# Per-iteration guard: an unexpected error during one drain
270+
# must not kill the worker, otherwise queued items stay
271+
# unflushed until shutdown.
272+
try:
273+
await self._drain()
274+
except asyncio.CancelledError:
275+
raise
276+
except Exception:
277+
logger.exception("Tracing worker iteration failed; continuing")
278+
279+
# Final drain on shutdown.
280+
try:
281+
await self._drain()
282+
except Exception:
283+
logger.exception("Final tracing drain failed; some spans may be lost")
284+
except asyncio.CancelledError:
285+
raise
286+
except Exception:
287+
logger.exception("Async tracing worker crashed")
288+
289+
async def _drain(self) -> None:
290+
if self._queue is None or self.sgp_async_client is None:
291+
return
292+
while not self._queue.empty():
293+
batch: list[dict] = []
294+
while len(batch) < DEFAULT_MAX_BATCH_SIZE and not self._queue.empty():
295+
try:
296+
sgp_span = self._queue.get_nowait()
297+
except asyncio.QueueEmpty:
298+
break
299+
try:
300+
batch.append(sgp_span.to_request_params())
301+
except Exception:
302+
logger.exception("Failed to build span params; dropping span")
303+
if not batch:
304+
continue
305+
await self._upsert_with_retry(batch)
306+
307+
async def _upsert_with_retry(self, batch: list[dict]) -> None:
308+
if self.sgp_async_client is None:
309+
return
310+
backoff = INITIAL_BACKOFF
311+
for attempt in range(DEFAULT_RETRIES):
312+
try:
313+
await self.sgp_async_client.spans.upsert_batch(items=batch) # type: ignore[arg-type]
314+
return
315+
except APIError as exc:
316+
if attempt == DEFAULT_RETRIES - 1:
317+
logger.error(f"Failed to export {len(batch)} spans after {DEFAULT_RETRIES} attempts: {exc.message}")
318+
return
319+
logger.warning(f"Span export failed ({exc.message}); retrying in {backoff:.1f}s")
320+
await asyncio.sleep(backoff)
321+
backoff = min(backoff * 2, MAX_BACKOFF)
322+
except asyncio.CancelledError:
323+
raise
324+
except Exception:
325+
logger.exception(f"Unexpected error exporting {len(batch)} spans; dropping batch")
326+
return

tests/lib/core/tracing/processors/test_sgp_tracing_processor.py

Lines changed: 86 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,9 @@ def _make_processor():
4848
mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)
4949
mock_create_span = MagicMock(side_effect=lambda **kwargs: _make_mock_sgp_span())
5050

51-
with patch(f"{MODULE}.EnvironmentVariables", mock_env), \
52-
patch(f"{MODULE}.SGPClient"), \
53-
patch(f"{MODULE}.tracing"), \
54-
patch(f"{MODULE}.flush_queue"), \
55-
patch(f"{MODULE}.create_span", mock_create_span):
51+
with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch(f"{MODULE}.SGPClient"), patch(
52+
f"{MODULE}.tracing"
53+
), patch(f"{MODULE}.flush_queue"), patch(f"{MODULE}.create_span", mock_create_span):
5654
from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
5755
SGPSyncTracingProcessor,
5856
)
@@ -113,9 +111,9 @@ def _make_processor():
113111
mock_async_client = MagicMock()
114112
mock_async_client.spans.upsert_batch = AsyncMock()
115113

116-
with patch(f"{MODULE}.EnvironmentVariables", mock_env), \
117-
patch(f"{MODULE}.create_span", mock_create_span), \
118-
patch(f"{MODULE}.AsyncSGPClient", return_value=mock_async_client):
114+
with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch(f"{MODULE}.create_span", mock_create_span), patch(
115+
f"{MODULE}.AsyncSGPClient", return_value=mock_async_client
116+
):
119117
from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
120118
SGPAsyncTracingProcessor,
121119
)
@@ -164,7 +162,9 @@ async def test_span_end_for_unknown_span_is_noop(self):
164162
assert len(processor._spans) == 0
165163

166164
async def test_sgp_span_input_updated_on_end(self):
167-
"""on_span_end should update sgp_span.input from the incoming span."""
165+
"""on_span_end should mutate the tracked SGP span and enqueue it.
166+
With batched flushing, the upsert happens once on shutdown, with the
167+
final state of the span after both start and end have run."""
168168
processor, _ = self._make_processor()
169169

170170
with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()):
@@ -175,16 +175,87 @@ async def test_sgp_span_input_updated_on_end(self):
175175
assert len(processor._spans) == 1
176176

177177
# Simulate modified input at end time
178-
updated_input: dict[str, object] = {"messages": [
179-
{"role": "user", "content": "hello"},
180-
{"role": "assistant", "content": "hi"},
181-
]}
178+
updated_input: dict[str, object] = {
179+
"messages": [
180+
{"role": "user", "content": "hello"},
181+
{"role": "assistant", "content": "hi"},
182+
]
183+
}
182184
span.input = updated_input
183185
span.output = {"response": "hi"}
184186
span.end_time = datetime.now(UTC)
185187
await processor.on_span_end(span)
186188

187189
# Span should be removed after end
188190
assert len(processor._spans) == 0
189-
# The end upsert should have been called
190-
assert processor.sgp_async_client.spans.upsert_batch.call_count == 2 # start + end
191+
192+
# No upsert on the hot path; the worker batches and flushes asynchronously.
193+
assert processor.sgp_async_client.spans.upsert_batch.call_count == 0
194+
195+
# Shutdown drains the queue and produces a single batched upsert.
196+
await processor.shutdown()
197+
assert processor.sgp_async_client.spans.upsert_batch.call_count == 1
198+
199+
200+
# ---------------------------------------------------------------------------
201+
# Async processor batching tests
202+
#
203+
# Before this change, on_span_start and on_span_end each issued an awaited
204+
# upsert_batch(items=[one]) call on the agent's hot path. The processor now
205+
# buffers events and flushes them in batches from a background asyncio.Task,
206+
# mirroring the SDK's TraceQueueManager.
207+
# ---------------------------------------------------------------------------
208+
209+
210+
class TestSGPAsyncTracingProcessorBatching:
211+
@staticmethod
212+
def _make_processor():
213+
mock_env = MagicMock()
214+
mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)
215+
216+
mock_async_client = MagicMock()
217+
mock_async_client.spans.upsert_batch = AsyncMock()
218+
219+
with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch(
220+
f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()
221+
), patch(f"{MODULE}.AsyncSGPClient", return_value=mock_async_client):
222+
from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
223+
SGPAsyncTracingProcessor,
224+
)
225+
226+
processor = SGPAsyncTracingProcessor(_make_config())
227+
228+
processor.sgp_async_client = mock_async_client
229+
return processor, mock_async_client
230+
231+
async def test_span_event_does_not_trigger_immediate_upsert(self):
232+
"""Regression: a single span event must not result in an upsert call
233+
on the hot path. Events must be enqueued and flushed by the worker."""
234+
processor, client = self._make_processor()
235+
236+
with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()):
237+
span = _make_span()
238+
await processor.on_span_start(span)
239+
240+
assert client.spans.upsert_batch.call_count == 0, "on_span_start should enqueue, not trigger a network call"
241+
242+
async def test_shutdown_flushes_queued_spans_in_one_batch(self):
243+
"""Many span events should be coalesced into a single upsert_batch
244+
call when the buffer fits under MAX_BATCH_SIZE (50)."""
245+
processor, client = self._make_processor()
246+
247+
with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()):
248+
for _ in range(5):
249+
span = _make_span()
250+
await processor.on_span_start(span)
251+
span.end_time = datetime.now(UTC)
252+
await processor.on_span_end(span)
253+
254+
await processor.shutdown()
255+
256+
assert client.spans.upsert_batch.call_count == 1, (
257+
f"Expected a single batched upsert, got {client.spans.upsert_batch.call_count}"
258+
)
259+
items = client.spans.upsert_batch.call_args.kwargs["items"]
260+
# 5 starts + 5 ends = 10 enqueued items, well under MAX_BATCH_SIZE.
261+
assert len(items) == 10, f"Expected 10 items in the batch, got {len(items)}"

0 commit comments

Comments
 (0)