Skip to content

Commit bb6f046

Browse files
committed
fix(tracing): make SGP processor stateless to stop dropping span closes
In multi-replica Temporal deployments, START_SPAN and END_SPAN run as separate Temporal activities and can land on different worker pods. The SGP processor previously stored started SGPSpans in a per-process dict; on END the lookup would miss on the foreign pod and the close upsert was silently dropped, leaving spans stuck on "Running" in the SGP UI. Drop the _spans cache entirely. Both on_spans_start and on_spans_end build a fresh SGPSpan from the agentex Span fields; SGP's upsert_batch is idempotent on span_id, so re-sending start fields on close is safe. Side benefit: also eliminates a START-pod memory leak that the Slack-proposed fall-back variant would have left in place (dict entries on pod A are never reclaimed when END lands on pod B). Reported by TASMU (Government of Qatar). Confirmed on agentex-sdk 0.9.6 and 0.11.0.
1 parent 73eca7a commit bb6f046

2 files changed

Lines changed: 128 additions & 132 deletions

File tree

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py

Lines changed: 47 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22

3-
from typing import override
3+
from typing import cast, override
44

55
import scale_gp_beta.lib.tracing as tracing
66
from scale_gp_beta import SGPClient, AsyncSGPClient
@@ -38,7 +38,6 @@ def __init__(self, config: SGPTracingProcessorConfig):
3838
),
3939
disabled=disabled,
4040
)
41-
self._spans: dict[str, SGPSpan] = {}
4241
self.env_vars = EnvironmentVariables.refresh()
4342

4443
def _add_source_to_span(self, span: Span) -> None:
@@ -53,48 +52,44 @@ def _add_source_to_span(self, span: Span) -> None:
5352
if self.env_vars.AGENT_ID is not None:
5453
span.data["__agent_id__"] = self.env_vars.AGENT_ID
5554

56-
@override
57-
def on_span_start(self, span: Span) -> None:
55+
def _build_sgp_span(self, span: Span) -> SGPSpan:
56+
"""Build an SGPSpan from an agentex Span. Idempotent on span_id at the SGP backend."""
5857
self._add_source_to_span(span)
59-
60-
sgp_span = create_span(
61-
name=span.name,
62-
span_type=_get_span_type(span),
63-
span_id=span.id,
64-
parent_id=span.parent_id,
65-
trace_id=span.trace_id,
66-
input=span.input,
67-
output=span.output,
68-
metadata=span.data,
58+
sgp_span = cast(
59+
SGPSpan,
60+
create_span(
61+
name=span.name,
62+
span_type=_get_span_type(span),
63+
span_id=span.id,
64+
parent_id=span.parent_id,
65+
trace_id=span.trace_id,
66+
input=span.input,
67+
output=span.output,
68+
metadata=span.data,
69+
),
6970
)
7071
sgp_span.start_time = span.start_time.isoformat() # type: ignore[union-attr]
71-
sgp_span.flush(blocking=False)
72+
return sgp_span
7273

73-
self._spans[span.id] = sgp_span
74+
@override
75+
def on_span_start(self, span: Span) -> None:
76+
sgp_span = self._build_sgp_span(span)
77+
sgp_span.flush(blocking=False)
7478

7579
@override
7680
def on_span_end(self, span: Span) -> None:
77-
sgp_span = self._spans.pop(span.id, None)
78-
if sgp_span is None:
79-
logger.warning(f"Span {span.id} not found in stored spans, skipping span end")
80-
return
81-
82-
self._add_source_to_span(span)
83-
sgp_span.output = span.output # type: ignore[assignment]
84-
sgp_span.metadata = span.data # type: ignore[assignment]
81+
sgp_span = self._build_sgp_span(span)
8582
sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr]
8683
sgp_span.flush(blocking=False)
8784

8885
@override
8986
def shutdown(self) -> None:
90-
self._spans.clear()
9187
flush_queue()
9288

9389

9490
class SGPAsyncTracingProcessor(AsyncTracingProcessor):
9591
def __init__(self, config: SGPTracingProcessorConfig):
9692
self.disabled = config.sgp_api_key == "" or config.sgp_account_id == ""
97-
self._spans: dict[str, SGPSpan] = {}
9893
import httpx
9994

10095
# Disable keepalive so each HTTP call gets a fresh TCP connection,
@@ -125,6 +120,25 @@ def _add_source_to_span(self, span: Span) -> None:
125120
if self.env_vars.AGENT_ID is not None:
126121
span.data["__agent_id__"] = self.env_vars.AGENT_ID
127122

123+
def _build_sgp_span(self, span: Span) -> SGPSpan:
124+
"""Build an SGPSpan from an agentex Span. Idempotent on span_id at the SGP backend."""
125+
self._add_source_to_span(span)
126+
sgp_span = cast(
127+
SGPSpan,
128+
create_span(
129+
name=span.name,
130+
span_type=_get_span_type(span),
131+
span_id=span.id,
132+
parent_id=span.parent_id,
133+
trace_id=span.trace_id,
134+
input=span.input,
135+
output=span.output,
136+
metadata=span.data,
137+
),
138+
)
139+
sgp_span.start_time = span.start_time.isoformat() # type: ignore[union-attr]
140+
return sgp_span
141+
128142
@override
129143
async def on_span_start(self, span: Span) -> None:
130144
await self.on_spans_start([span])
@@ -138,22 +152,7 @@ async def on_spans_start(self, spans: list[Span]) -> None:
138152
if not spans:
139153
return
140154

141-
sgp_spans: list[SGPSpan] = []
142-
for span in spans:
143-
self._add_source_to_span(span)
144-
sgp_span = create_span(
145-
name=span.name,
146-
span_type=_get_span_type(span),
147-
span_id=span.id,
148-
parent_id=span.parent_id,
149-
trace_id=span.trace_id,
150-
input=span.input,
151-
output=span.output,
152-
metadata=span.data,
153-
)
154-
sgp_span.start_time = span.start_time.isoformat() # type: ignore[union-attr]
155-
self._spans[span.id] = sgp_span
156-
sgp_spans.append(sgp_span)
155+
sgp_spans = [self._build_sgp_span(span) for span in spans]
157156

158157
if self.disabled:
159158
logger.warning("SGP is disabled, skipping span upsert")
@@ -167,29 +166,18 @@ async def on_spans_end(self, spans: list[Span]) -> None:
167166
if not spans:
168167
return
169168

170-
to_upsert: list[SGPSpan] = []
169+
sgp_spans: list[SGPSpan] = []
171170
for span in spans:
172-
sgp_span = self._spans.pop(span.id, None)
173-
if sgp_span is None:
174-
logger.warning(f"Span {span.id} not found in stored spans, skipping span end")
175-
continue
176-
177-
self._add_source_to_span(span)
178-
sgp_span.input = span.input # type: ignore[assignment]
179-
sgp_span.output = span.output # type: ignore[assignment]
180-
sgp_span.metadata = span.data # type: ignore[assignment]
171+
sgp_span = self._build_sgp_span(span)
181172
sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr]
182-
to_upsert.append(sgp_span)
173+
sgp_spans.append(sgp_span)
183174

184-
if self.disabled or not to_upsert:
175+
if self.disabled:
185176
return
186177
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
187-
items=[s.to_request_params() for s in to_upsert]
178+
items=[s.to_request_params() for s in sgp_spans]
188179
)
189180

190181
@override
191182
async def shutdown(self) -> None:
192-
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
193-
items=[sgp_span.to_request_params() for sgp_span in self._spans.values()]
194-
)
195-
self._spans.clear()
183+
pass

0 commit comments

Comments
 (0)