Skip to content

Commit 82e2f8e

Browse files
feat(tracing): log disabled-state warning once at init plus 4 edge-case tests
Two related changes addressing review feedback: 1. Restore observability for disabled processors. When the rewrite moved the per-event 'SGP is disabled, skipping span upsert' warning out of on_span_start it left the disabled state silent. The original per-event log was spammy (one entry per span event at agent throughput) and inconsistent (only on_span_start had it, on_span_end did not). Replace it with a single warning at __init__ time. 2. Edge-case tests in TestSGPAsyncTracingProcessorEdgeCases: - test_disabled_processor_never_enqueues_or_calls_upsert: confirms a disabled processor builds no client, no queue, no worker, and shutdown is a no-op. - test_shutdown_is_safe_when_called_multiple_times: idempotency regression. Second shutdown after the worker has already exited does not re-flush or raise. - test_shutdown_before_any_event_is_noop: shutdown invoked before any span event must early-return without spinning up a worker. - test_apierror_triggers_retry_then_drops_batch_on_exhaustion: APIError is retried up to DEFAULT_RETRIES, batch is dropped after exhaustion. asyncio.sleep is patched to keep the test fast.
1 parent e6ea039 commit 82e2f8e

2 files changed

Lines changed: 117 additions & 0 deletions

File tree

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,13 @@ def __init__(self, config: SGPTracingProcessorConfig):
149149
self._shutdown_event: Optional[asyncio.Event] = None
150150
self._flush_event: Optional[asyncio.Event] = None
151151

152+
if self.disabled:
153+
# Log once at init rather than on every span event, which would
154+
# flood logs at agent throughput.
155+
logger.warning(
156+
"SGP tracing is disabled (sgp_api_key or sgp_account_id missing); span events will be ignored"
157+
)
158+
152159
def _add_source_to_span(self, span: Span) -> None:
153160
if span.data is None:
154161
span.data = {}

tests/lib/core/tracing/processors/test_sgp_tracing_processor.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,3 +321,113 @@ async def test_worker_continues_after_unexpected_exception_in_one_batch(self):
321321
f"Worker should have made a second upsert attempt after the first failed; "
322322
f"got {client.spans.upsert_batch.call_count}"
323323
)
324+
325+
326+
# ---------------------------------------------------------------------------
327+
# Edge-case correctness tests
328+
# ---------------------------------------------------------------------------
329+
330+
331+
class TestSGPAsyncTracingProcessorEdgeCases:
332+
async def test_disabled_processor_never_enqueues_or_calls_upsert(self):
333+
"""When the config has no api_key / account_id, the processor must
334+
be a no-op: no client constructed, no worker spun up, no upsert
335+
calls. Only span tracking in `_spans` is preserved (matches the
336+
sync processor's contract)."""
337+
env_mock = MagicMock(refresh=MagicMock(return_value=MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)))
338+
with patch(f"{MODULE}.EnvironmentVariables", env_mock), patch(
339+
f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()
340+
), patch(f"{MODULE}.AsyncSGPClient") as mock_client_cls:
341+
from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
342+
SGPAsyncTracingProcessor,
343+
)
344+
345+
disabled_config = SGPTracingProcessorConfig(sgp_api_key="", sgp_account_id="")
346+
processor = SGPAsyncTracingProcessor(disabled_config)
347+
348+
assert processor.disabled is True
349+
assert processor.sgp_async_client is None, "Disabled processor must not construct a client"
350+
mock_client_cls.assert_not_called()
351+
352+
span = _make_span()
353+
await processor.on_span_start(span)
354+
span.end_time = datetime.now(UTC)
355+
await processor.on_span_end(span)
356+
357+
# No worker, no queue.
358+
assert processor._worker is None
359+
assert processor._queue is None
360+
361+
# Shutdown is also a no-op.
362+
await processor.shutdown()
363+
364+
async def test_shutdown_is_safe_when_called_multiple_times(self):
365+
"""Shutdown must be idempotent: a second call after the worker has
366+
already exited cleanly should not raise, double-flush, or hang."""
367+
processor, client = TestSGPAsyncTracingProcessorBatching._make_processor()
368+
369+
with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()):
370+
span = _make_span()
371+
await processor.on_span_start(span)
372+
span.end_time = datetime.now(UTC)
373+
await processor.on_span_end(span)
374+
375+
await processor.shutdown()
376+
first_call_count = client.spans.upsert_batch.call_count
377+
assert first_call_count == 1
378+
379+
# Second shutdown: worker is already done; should not raise or
380+
# produce additional upserts since _spans is already cleared and
381+
# the queue has been drained.
382+
await processor.shutdown()
383+
assert client.spans.upsert_batch.call_count == first_call_count, (
384+
"Calling shutdown twice must not produce extra upserts"
385+
)
386+
387+
async def test_shutdown_before_any_event_is_noop(self):
388+
"""If shutdown runs before any span event, the worker was never
389+
started; it must early-return without spinning anything up just to
390+
tear it down."""
391+
env_mock = MagicMock(refresh=MagicMock(return_value=MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)))
392+
with patch(f"{MODULE}.EnvironmentVariables", env_mock), patch(f"{MODULE}.AsyncSGPClient") as mock_client_cls:
393+
mock_client_cls.return_value = MagicMock(spans=MagicMock(upsert_batch=AsyncMock()))
394+
395+
from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
396+
SGPAsyncTracingProcessor,
397+
)
398+
399+
processor = SGPAsyncTracingProcessor(_make_config())
400+
assert processor._worker is None
401+
402+
await processor.shutdown()
403+
404+
assert processor._worker is None, "Shutdown must not spin up a worker just to tear it down"
405+
406+
async def test_apierror_triggers_retry_then_drops_batch_on_exhaustion(self):
407+
"""`APIError` must be retried up to DEFAULT_RETRIES times. After
408+
exhaustion, the batch is dropped and the worker continues."""
409+
from scale_gp_beta._exceptions import APIError
410+
411+
processor, client = TestSGPAsyncTracingProcessorBatching._make_processor()
412+
413+
# Make every attempt raise APIError so we exhaust the retry budget.
414+
api_error = APIError(message="boom", request=MagicMock(), body=None)
415+
client.spans.upsert_batch.side_effect = api_error
416+
417+
# Patch sleep so retries don't block the test on real backoff timing.
418+
with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()), patch(
419+
"asyncio.sleep", new=AsyncMock()
420+
):
421+
span = _make_span()
422+
await processor.on_span_start(span)
423+
span.end_time = datetime.now(UTC)
424+
await processor.on_span_end(span)
425+
426+
await processor.shutdown()
427+
428+
# 4 attempts, all failed. Batch dropped. Importantly, no fifth call.
429+
from agentex.lib.core.tracing.processors.sgp_tracing_processor import DEFAULT_RETRIES
430+
431+
assert client.spans.upsert_batch.call_count == DEFAULT_RETRIES, (
432+
f"Expected exactly {DEFAULT_RETRIES} attempts before dropping; got {client.spans.upsert_batch.call_count}"
433+
)

0 commit comments

Comments
 (0)