Skip to content

Commit 1753208

Browse files
NiteshDhanpalclaude
andcommitted
refactor(tracing): split span batch/linger tuning out of this PR
Reverts the span_queue.py batch_size/linger_ms changes (and their tests) so this PR stays scoped to end-only ingest (sgp_tracing_processor.py). The batching throughput tuning ships separately in #397. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 7001514 commit 1753208

2 files changed

Lines changed: 5 additions & 51 deletions

File tree

src/agentex/lib/core/tracing/span_queue.py

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,8 @@
1515

1616
logger = make_logger(__name__)
1717

18-
# Max spans coalesced into one ``upsert_batch`` HTTP call (one
19-
# ``INSERT ... ON CONFLICT`` statement server-side). Larger batches amortize
20-
# the per-request round trip and the per-statement parse/plan + index
21-
# maintenance overhead, which dominates at high span volume. Kept well under
22-
# the EGP backend's 1000-row cap; tune per-deploy via
23-
# ``AGENTEX_SPAN_QUEUE_BATCH_SIZE``.
24-
_DEFAULT_BATCH_SIZE = 200
25-
# Max time the drain lingers after the first span to let a batch fill. Spans
26-
# typically arrive a few ms apart, so a longer linger fills the larger batch
27-
# above rather than shipping near-size-1 batches; bounded so worst-case ingest
28-
# latency (and the in-flight loss window) stays sub-second.
29-
_DEFAULT_LINGER_MS = 250
18+
_DEFAULT_BATCH_SIZE = 50
19+
_DEFAULT_LINGER_MS = 100
3020
# 0 == unbounded (preserves prior behavior). A bound makes backpressure
3121
# visible (dropped spans are counted) and caps worst-case memory.
3222
_DEFAULT_MAX_SIZE = 0
@@ -124,7 +114,7 @@ class AsyncSpanQueue:
124114

125115
def __init__(
126116
self,
127-
batch_size: int | None = None,
117+
batch_size: int = _DEFAULT_BATCH_SIZE,
128118
linger_ms: int | None = None,
129119
max_size: int | None = None,
130120
max_retries: int | None = None,
@@ -136,11 +126,7 @@ def __init__(
136126
self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue(maxsize=resolved_max_size)
137127
self._drain_task: asyncio.Task[None] | None = None
138128
self._stopping = False
139-
self._batch_size = (
140-
_read_int_env("AGENTEX_SPAN_QUEUE_BATCH_SIZE", _DEFAULT_BATCH_SIZE, minimum=1)
141-
if batch_size is None
142-
else max(1, batch_size)
143-
)
129+
self._batch_size = batch_size
144130
self._linger_ms = _read_linger_ms_env() if linger_ms is None else max(0, linger_ms)
145131
self._max_retries = (
146132
_read_int_env("AGENTEX_SPAN_QUEUE_MAX_RETRIES", _DEFAULT_MAX_RETRIES, minimum=1)

tests/lib/core/tracing/test_span_queue.py

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,7 @@
88
from unittest.mock import AsyncMock, MagicMock, patch
99

1010
from agentex.types.span import Span
11-
from agentex.lib.core.tracing.span_queue import (
12-
_DEFAULT_BATCH_SIZE,
13-
SpanEventType,
14-
AsyncSpanQueue,
15-
)
11+
from agentex.lib.core.tracing.span_queue import SpanEventType, AsyncSpanQueue
1612

1713

1814
def _make_span(span_id: str | None = None) -> Span:
@@ -863,31 +859,3 @@ async def test_enqueue_overhead_with_metrics_disabled(self, monkeypatch):
863859

864860
assert elapsed < 0.05, f"disabled metrics enqueue too slow: {elapsed:.3f}s"
865861
mock_get.assert_not_called()
866-
867-
868-
class TestAsyncSpanQueueBatchSizeConfig:
869-
"""batch_size resolution: explicit arg > AGENTEX_SPAN_QUEUE_BATCH_SIZE env > default."""
870-
871-
async def test_default_batch_size(self, monkeypatch):
872-
monkeypatch.delenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", raising=False)
873-
assert AsyncSpanQueue()._batch_size == _DEFAULT_BATCH_SIZE
874-
875-
async def test_explicit_arg_overrides_default(self, monkeypatch):
876-
monkeypatch.delenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", raising=False)
877-
assert AsyncSpanQueue(batch_size=10)._batch_size == 10
878-
879-
async def test_explicit_arg_clamped_to_min_one(self, monkeypatch):
880-
monkeypatch.delenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", raising=False)
881-
assert AsyncSpanQueue(batch_size=0)._batch_size == 1
882-
883-
async def test_env_used_when_arg_is_none(self, monkeypatch):
884-
monkeypatch.setenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", "500")
885-
assert AsyncSpanQueue()._batch_size == 500
886-
887-
async def test_explicit_arg_beats_env(self, monkeypatch):
888-
monkeypatch.setenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", "500")
889-
assert AsyncSpanQueue(batch_size=7)._batch_size == 7
890-
891-
async def test_invalid_env_falls_back_to_default(self, monkeypatch):
892-
monkeypatch.setenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", "not-an-int")
893-
assert AsyncSpanQueue()._batch_size == _DEFAULT_BATCH_SIZE

0 commit comments

Comments
 (0)