Skip to content

Commit 9ae9b5a

Browse files
NiteshDhanpalclaude
andcommitted
perf(tracing): larger span batch (50→200) + linger (100→250ms) for high-volume ingest
Coalesce more spans into each upsert_batch HTTP call to amortize the per-request round trip and the per-statement parse/plan + index-maintenance overhead that dominates at high span volume. batch_size is now env-tunable via AGENTEX_SPAN_QUEUE_BATCH_SIZE (default 200), and linger rises to 250ms so batches fill rather than shipping near-size-1. Split out of #394 (end-only ingest) so each change ships and is reviewed independently. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 4d7cf48 commit 9ae9b5a

2 files changed

Lines changed: 51 additions & 5 deletions

File tree

src/agentex/lib/core/tracing/span_queue.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,18 @@
1515

1616
logger = make_logger(__name__)
1717

18-
_DEFAULT_BATCH_SIZE = 50
19-
_DEFAULT_LINGER_MS = 100
18+
# Max spans coalesced into one ``upsert_batch`` HTTP call (one
19+
# ``INSERT ... ON CONFLICT`` statement server-side). Larger batches amortize
20+
# the per-request round trip and the per-statement parse/plan + index
21+
# maintenance overhead, which dominates at high span volume. Kept well under
22+
# the EGP backend's 1000-row cap; tune per-deploy via
23+
# ``AGENTEX_SPAN_QUEUE_BATCH_SIZE``.
24+
_DEFAULT_BATCH_SIZE = 200
25+
# Max time the drain lingers after the first span to let a batch fill. Spans
26+
# typically arrive a few ms apart, so a longer linger fills the larger batch
27+
# above rather than shipping near-size-1 batches; bounded so worst-case ingest
28+
# latency (and the in-flight loss window) stays sub-second.
29+
_DEFAULT_LINGER_MS = 250
2030
# 0 == unbounded (preserves prior behavior). A bound makes backpressure
2131
# visible (dropped spans are counted) and caps worst-case memory.
2232
_DEFAULT_MAX_SIZE = 0
@@ -114,7 +124,7 @@ class AsyncSpanQueue:
114124

115125
def __init__(
116126
self,
117-
batch_size: int = _DEFAULT_BATCH_SIZE,
127+
batch_size: int | None = None,
118128
linger_ms: int | None = None,
119129
max_size: int | None = None,
120130
max_retries: int | None = None,
@@ -126,7 +136,11 @@ def __init__(
126136
self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue(maxsize=resolved_max_size)
127137
self._drain_task: asyncio.Task[None] | None = None
128138
self._stopping = False
129-
self._batch_size = batch_size
139+
self._batch_size = (
140+
_read_int_env("AGENTEX_SPAN_QUEUE_BATCH_SIZE", _DEFAULT_BATCH_SIZE, minimum=1)
141+
if batch_size is None
142+
else max(1, batch_size)
143+
)
130144
self._linger_ms = _read_linger_ms_env() if linger_ms is None else max(0, linger_ms)
131145
self._max_retries = (
132146
_read_int_env("AGENTEX_SPAN_QUEUE_MAX_RETRIES", _DEFAULT_MAX_RETRIES, minimum=1)

tests/lib/core/tracing/test_span_queue.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@
88
from unittest.mock import AsyncMock, MagicMock, patch
99

1010
from agentex.types.span import Span
11-
from agentex.lib.core.tracing.span_queue import SpanEventType, AsyncSpanQueue
11+
from agentex.lib.core.tracing.span_queue import (
12+
_DEFAULT_BATCH_SIZE,
13+
SpanEventType,
14+
AsyncSpanQueue,
15+
)
1216

1317

1418
def _make_span(span_id: str | None = None) -> Span:
@@ -859,3 +863,31 @@ async def test_enqueue_overhead_with_metrics_disabled(self, monkeypatch):
859863

860864
assert elapsed < 0.05, f"disabled metrics enqueue too slow: {elapsed:.3f}s"
861865
mock_get.assert_not_called()
866+
867+
868+
class TestAsyncSpanQueueBatchSizeConfig:
869+
"""batch_size resolution: explicit arg > AGENTEX_SPAN_QUEUE_BATCH_SIZE env > default."""
870+
871+
async def test_default_batch_size(self, monkeypatch):
872+
monkeypatch.delenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", raising=False)
873+
assert AsyncSpanQueue()._batch_size == _DEFAULT_BATCH_SIZE
874+
875+
async def test_explicit_arg_overrides_default(self, monkeypatch):
876+
monkeypatch.delenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", raising=False)
877+
assert AsyncSpanQueue(batch_size=10)._batch_size == 10
878+
879+
async def test_explicit_arg_clamped_to_min_one(self, monkeypatch):
880+
monkeypatch.delenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", raising=False)
881+
assert AsyncSpanQueue(batch_size=0)._batch_size == 1
882+
883+
async def test_env_used_when_arg_is_none(self, monkeypatch):
884+
monkeypatch.setenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", "500")
885+
assert AsyncSpanQueue()._batch_size == 500
886+
887+
async def test_explicit_arg_beats_env(self, monkeypatch):
888+
monkeypatch.setenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", "500")
889+
assert AsyncSpanQueue(batch_size=7)._batch_size == 7
890+
891+
async def test_invalid_env_falls_back_to_default(self, monkeypatch):
892+
monkeypatch.setenv("AGENTEX_SPAN_QUEUE_BATCH_SIZE", "not-an-int")
893+
assert AsyncSpanQueue()._batch_size == _DEFAULT_BATCH_SIZE

0 commit comments

Comments
 (0)