1515
1616logger = make_logger (__name__ )
1717
18- _DEFAULT_BATCH_SIZE = 50
19- _DEFAULT_LINGER_MS = 100
18+ # Max spans coalesced into one ``upsert_batch`` HTTP call (one
19+ # ``INSERT ... ON CONFLICT`` statement server-side). Larger batches amortize
20+ # the per-request round trip and the per-statement parse/plan + index
21+ # maintenance overhead, which dominates at high span volume. Kept well under
22+ # the EGP backend's 1000-row cap; tune per-deploy via
23+ # ``AGENTEX_SPAN_QUEUE_BATCH_SIZE``.
24+ _DEFAULT_BATCH_SIZE = 200
25+ # Max time the drain lingers after the first span to let a batch fill. Spans
26+ # typically arrive a few ms apart, so a longer linger fills the larger batch
27+ # above rather than shipping near-size-1 batches; bounded so worst-case ingest
28+ # latency (and the in-flight loss window) stays sub-second.
29+ _DEFAULT_LINGER_MS = 250
2030# 0 == unbounded (preserves prior behavior). A bound makes backpressure
2131# visible (dropped spans are counted) and caps worst-case memory.
2232_DEFAULT_MAX_SIZE = 0
@@ -114,7 +124,7 @@ class AsyncSpanQueue:
114124
115125 def __init__ (
116126 self ,
117- batch_size : int = _DEFAULT_BATCH_SIZE ,
127+ batch_size : int | None = None ,
118128 linger_ms : int | None = None ,
119129 max_size : int | None = None ,
120130 max_retries : int | None = None ,
@@ -126,7 +136,11 @@ def __init__(
126136 self ._queue : asyncio .Queue [_SpanQueueItem ] = asyncio .Queue (maxsize = resolved_max_size )
127137 self ._drain_task : asyncio .Task [None ] | None = None
128138 self ._stopping = False
129- self ._batch_size = batch_size
139+ self ._batch_size = (
140+ _read_int_env ("AGENTEX_SPAN_QUEUE_BATCH_SIZE" , _DEFAULT_BATCH_SIZE , minimum = 1 )
141+ if batch_size is None
142+ else max (1 , batch_size )
143+ )
130144 self ._linger_ms = _read_linger_ms_env () if linger_ms is None else max (0 , linger_ms )
131145 self ._max_retries = (
132146 _read_int_env ("AGENTEX_SPAN_QUEUE_MAX_RETRIES" , _DEFAULT_MAX_RETRIES , minimum = 1 )
0 commit comments