Skip to content

Commit 958320a

Browse files
committed
perf(tracing): span queue linger + per-loop httpx keepalive
Two compounding causes of slow SGP trace export under load: - The async drain loop returned size-1 batches almost every time because there was no time window for spans to accumulate. Adds a 100ms linger (tunable via AGENTEX_SPAN_QUEUE_LINGER_MS) so concurrently-emitted spans coalesce into one upsert_batch call. - httpx keepalive was disabled (max_keepalive_connections=0) in SGPAsyncTracingProcessor, AgentexAsyncTracingProcessor, and the ADK TracingModule to avoid "bound to a different event loop" errors in sync-ACP. Each span paid a full TLS handshake. Replaced with a per-event-loop client cache keyed on id(asyncio.get_running_loop()); connections are reused within a loop and cross-loop safety is preserved. Tests cover linger coalescing, batch-size cap interaction, per-loop client caching, a keepalive-enabled regression guard, and disabled-processor null-client behavior.
1 parent 781dfe1 commit 958320a

6 files changed

Lines changed: 340 additions & 65 deletions

File tree

src/agentex/lib/adk/_modules/tracing.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,13 @@ def _tracing_service(self) -> TracingService:
6767
if self._tracing_service_lazy is None or (loop_id is not None and loop_id != self._bound_loop_id):
6868
import httpx
6969

70-
# Disable keepalive so each span HTTP call gets a fresh TCP
71-
# connection. Reused connections carry asyncio primitives bound
72-
# to the event loop that created them; in sync-ACP / streaming
73-
# contexts the loop context can shift between calls, causing
74-
# "bound to a different event loop" RuntimeErrors.
70+
# Keepalive ON: connections are reused within a single event
71+
# loop, eliminating the TLS-handshake-per-span penalty under
72+
# load. Cross-loop safety is preserved by rebuilding the
73+
# client whenever loop_id changes (the conditional above).
7574
agentex_client = create_async_agentex_client(
7675
http_client=httpx.AsyncClient(
77-
limits=httpx.Limits(max_keepalive_connections=0),
76+
limits=httpx.Limits(max_keepalive_connections=20),
7877
),
7978
)
8079
tracer = AsyncTracer(agentex_client)

src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from typing import Any, Dict, override
1+
import asyncio
2+
from typing import TYPE_CHECKING, Any, Dict, override
23

34
from agentex import Agentex
45
from agentex.types.span import Span
@@ -9,6 +10,9 @@
910
AsyncTracingProcessor,
1011
)
1112

13+
if TYPE_CHECKING:
14+
from agentex import AsyncAgentex
15+
1216

1317
class AgentexSyncTracingProcessor(SyncTracingProcessor):
1418
def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002
@@ -67,19 +71,35 @@ def shutdown(self) -> None:
6771

6872
class AgentexAsyncTracingProcessor(AsyncTracingProcessor):
6973
def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002
74+
# Per-event-loop client cache. httpx.AsyncClient is bound to the
75+
# loop that created it, so in sync-ACP / streaming contexts (where
76+
# the active loop can change between requests) we keep one client
77+
# per loop instead of disabling keepalive entirely.
78+
self._clients_by_loop_id: dict[int, "AsyncAgentex"] = {}
79+
80+
def _build_client(self) -> "AsyncAgentex":
7081
import httpx
7182

72-
# Disable keepalive so each span HTTP call gets a fresh TCP connection.
73-
# Reused connections carry asyncio primitives bound to the event loop
74-
# that created them; in sync-ACP / streaming contexts the loop context
75-
# can shift between calls, causing "bound to a different event loop"
76-
# RuntimeErrors.
77-
self.client = create_async_agentex_client(
83+
# Keepalive ON: connections are reused within a single event loop,
84+
# eliminating the TLS-handshake-per-span penalty under load.
85+
return create_async_agentex_client(
7886
http_client=httpx.AsyncClient(
79-
limits=httpx.Limits(max_keepalive_connections=0),
87+
limits=httpx.Limits(max_keepalive_connections=20),
8088
),
8189
)
8290

91+
@property
92+
def client(self) -> "AsyncAgentex":
93+
try:
94+
loop_id = id(asyncio.get_running_loop())
95+
except RuntimeError:
96+
return self._build_client()
97+
client = self._clients_by_loop_id.get(loop_id)
98+
if client is None:
99+
client = self._build_client()
100+
self._clients_by_loop_id[loop_id] = client
101+
return client
102+
83103
# TODO(AGX1-199): Add batch create/update endpoints to Agentex API and use
84104
# them here instead of one HTTP call per span.
85105
# https://linear.app/scale-epd/issue/AGX1-199/add-agentex-batch-endpoint-for-traces

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import asyncio
34
from typing import cast, override
45

56
import scale_gp_beta.lib.tracing as tracing
@@ -92,23 +93,47 @@ def shutdown(self) -> None:
9293
class SGPAsyncTracingProcessor(AsyncTracingProcessor):
9394
def __init__(self, config: SGPTracingProcessorConfig):
9495
self.disabled = config.sgp_api_key == "" or config.sgp_account_id == ""
96+
self._config = config
97+
# Per-event-loop client cache. httpx.AsyncClient ties its connection
98+
# pool to the loop it was created on; in sync-ACP / streaming contexts
99+
# the active loop can change between requests. Keying by loop id lets
100+
# us keep keepalive on within each loop while staying safe across
101+
# loops. The construction can also happen at module import time when
102+
# no loop is running, so we have to defer it until the first call.
103+
self._clients_by_loop_id: dict[int, AsyncSGPClient] = {}
104+
self.env_vars = EnvironmentVariables.refresh()
105+
106+
def _build_client(self) -> AsyncSGPClient:
95107
import httpx
96108

97-
# Disable keepalive so each HTTP call gets a fresh TCP connection,
98-
# avoiding "bound to a different event loop" errors in sync-ACP.
99-
self.sgp_async_client = (
100-
AsyncSGPClient(
101-
api_key=config.sgp_api_key,
102-
account_id=config.sgp_account_id,
103-
base_url=config.sgp_base_url,
104-
http_client=httpx.AsyncClient(
105-
limits=httpx.Limits(max_keepalive_connections=0),
106-
),
107-
)
108-
if not self.disabled
109-
else None
109+
return AsyncSGPClient(
110+
api_key=self._config.sgp_api_key,
111+
account_id=self._config.sgp_account_id,
112+
base_url=self._config.sgp_base_url,
113+
# Keepalive ON: connections are reused within a single event loop,
114+
# which removes the TLS-handshake-per-span penalty observed under
115+
# load. Cross-loop safety is preserved by the per-loop cache.
116+
http_client=httpx.AsyncClient(
117+
limits=httpx.Limits(max_keepalive_connections=20),
118+
),
110119
)
111-
self.env_vars = EnvironmentVariables.refresh()
120+
121+
def _get_client(self) -> AsyncSGPClient | None:
122+
"""Return the AsyncSGPClient bound to the current event loop, creating
123+
one on first use. Returns None when the processor is disabled."""
124+
if self.disabled:
125+
return None
126+
try:
127+
loop_id = id(asyncio.get_running_loop())
128+
except RuntimeError:
129+
# Called from outside an event loop — should not happen on the
130+
# hot path, but build a one-off client rather than crashing.
131+
return self._build_client()
132+
client = self._clients_by_loop_id.get(loop_id)
133+
if client is None:
134+
client = self._build_client()
135+
self._clients_by_loop_id[loop_id] = client
136+
return client
112137

113138
@override
114139
async def on_span_start(self, span: Span) -> None:
@@ -123,31 +148,29 @@ async def on_spans_start(self, spans: list[Span]) -> None:
123148
if not spans:
124149
return
125150

126-
sgp_spans = [_build_sgp_span(span, self.env_vars) for span in spans]
127-
128-
if self.disabled:
151+
client = self._get_client()
152+
if client is None:
129153
logger.warning("SGP is disabled, skipping span upsert")
130154
return
131-
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
132-
items=[s.to_request_params() for s in sgp_spans]
133-
)
155+
156+
sgp_spans = [_build_sgp_span(span, self.env_vars) for span in spans]
157+
await client.spans.upsert_batch(items=[s.to_request_params() for s in sgp_spans])
134158

135159
@override
136160
async def on_spans_end(self, spans: list[Span]) -> None:
137161
if not spans:
138162
return
139163

164+
client = self._get_client()
165+
if client is None:
166+
return
167+
140168
sgp_spans: list[SGPSpan] = []
141169
for span in spans:
142170
sgp_span = _build_sgp_span(span, self.env_vars)
143171
sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr]
144172
sgp_spans.append(sgp_span)
145-
146-
if self.disabled:
147-
return
148-
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
149-
items=[s.to_request_params() for s in sgp_spans]
150-
)
173+
await client.spans.upsert_batch(items=[s.to_request_params() for s in sgp_spans])
151174

152175
@override
153176
async def shutdown(self) -> None:

src/agentex/lib/core/tracing/span_queue.py

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import os
34
import asyncio
45
from enum import Enum
56
from dataclasses import dataclass
@@ -13,6 +14,25 @@
1314
logger = make_logger(__name__)
1415

1516
_DEFAULT_BATCH_SIZE = 50
17+
_DEFAULT_LINGER_MS = 100
18+
19+
20+
def _read_linger_ms_env() -> int:
21+
"""Read AGENTEX_SPAN_QUEUE_LINGER_MS from the environment, falling back to
22+
_DEFAULT_LINGER_MS when unset or unparseable. Negative values are clamped
23+
to 0 (i.e. "drain immediately, no linger")."""
24+
raw = os.environ.get("AGENTEX_SPAN_QUEUE_LINGER_MS")
25+
if raw is None:
26+
return _DEFAULT_LINGER_MS
27+
try:
28+
return max(0, int(raw))
29+
except ValueError:
30+
logger.warning(
31+
"Ignoring invalid AGENTEX_SPAN_QUEUE_LINGER_MS=%r; using default %d ms",
32+
raw,
33+
_DEFAULT_LINGER_MS,
34+
)
35+
return _DEFAULT_LINGER_MS
1636

1737

1838
class SpanEventType(str, Enum):
@@ -35,13 +55,23 @@ class AsyncSpanQueue:
3555
batch are flushed concurrently, then all END events, so that per-span
3656
start-before-end ordering is preserved while HTTP calls for independent
3757
spans execute in parallel.
58+
59+
Once the drain loop picks up the first item, it lingers up to
60+
``linger_ms`` waiting for more items to coalesce into the same batch.
61+
Without the linger the drain almost always returned size-1 batches under
62+
real agent workloads, because spans typically arrive a few ms apart.
3863
"""
3964

40-
def __init__(self, batch_size: int = _DEFAULT_BATCH_SIZE) -> None:
65+
def __init__(
66+
self,
67+
batch_size: int = _DEFAULT_BATCH_SIZE,
68+
linger_ms: int | None = None,
69+
) -> None:
4170
self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue()
4271
self._drain_task: asyncio.Task[None] | None = None
4372
self._stopping = False
4473
self._batch_size = batch_size
74+
self._linger_ms = _read_linger_ms_env() if linger_ms is None else max(0, linger_ms)
4575

4676
def enqueue(
4777
self,
@@ -69,12 +99,30 @@ async def _drain_loop(self) -> None:
6999
first = await self._queue.get()
70100
batch: list[_SpanQueueItem] = [first]
71101

72-
# Opportunistically grab more ready items (non-blocking).
73-
while len(batch) < self._batch_size:
74-
try:
75-
batch.append(self._queue.get_nowait())
76-
except asyncio.QueueEmpty:
77-
break
102+
# Linger briefly so spans emitted within the window coalesce into
103+
# one batch. Stop early when the batch fills, when the linger
104+
# window elapses, or as soon as the queue is briefly empty *after*
105+
# the deadline.
106+
if self._linger_ms > 0 and not self._stopping:
107+
loop = asyncio.get_running_loop()
108+
deadline = loop.time() + (self._linger_ms / 1000.0)
109+
while len(batch) < self._batch_size:
110+
remaining = deadline - loop.time()
111+
if remaining <= 0:
112+
break
113+
try:
114+
batch.append(
115+
await asyncio.wait_for(self._queue.get(), timeout=remaining)
116+
)
117+
except asyncio.TimeoutError:
118+
break
119+
else:
120+
# No linger — drain whatever is already queued and stop.
121+
while len(batch) < self._batch_size:
122+
try:
123+
batch.append(self._queue.get_nowait())
124+
except asyncio.QueueEmpty:
125+
break
78126

79127
try:
80128
# Separate START and END events. Processing all STARTs before

0 commit comments

Comments
 (0)