Skip to content

Commit ae1c7ca

Browse files
perf(tracing): skip span-start upsert by default (end-only ingest) (#394)
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 9825dba commit ae1c7ca

2 files changed

Lines changed: 141 additions & 6 deletions

File tree

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import os
34
import asyncio
45
import weakref
56
from typing import cast, override
@@ -22,6 +23,28 @@
2223
logger = make_logger(__name__)
2324

2425

26+
_SKIP_SPAN_START_ENV = "AGENTEX_TRACING_SKIP_SPAN_START"


def _skip_span_start_enabled() -> bool:
    """Report whether span-start writes are suppressed (end-only ingest).

    Without this toggle every span is written twice: a start row with no
    ``end_time`` and then an end row that overwrites it moments later.
    Persisting the start row doubles span-ingest write volume and, on the SGP
    backend, costs a non-HOT UPDATE (tsvector/GIN recompute + index churn)
    plus a dead tuple per span. With the start write skipped, the end write
    is a single INSERT.

    The toggle is read from ``AGENTEX_TRACING_SKIP_SPAN_START`` on every call
    and defaults to ON. Only ``0``, ``false``, ``no`` or ``off`` (any case,
    surrounding whitespace ignored) switch it off and restore the start
    write — useful when in-flight spans must be visible before they complete,
    or when spans that never end (process crash) still need to be persisted.

    Returns:
        ``True`` if the span-start write should be skipped, ``False`` if it
        should be emitted.
    """
    configured = os.environ.get(_SKIP_SPAN_START_ENV)
    if configured is None:
        # Unset means the default: skip the start write.
        return True
    return configured.strip().lower() not in {"0", "false", "no", "off"}
46+
47+
2548
def _get_span_type(span: Span) -> str:
2649
"""Read span_type from span.data['__span_type__'], defaulting to STANDALONE."""
2750
if isinstance(span.data, dict):
@@ -75,9 +98,18 @@ def __init__(self, config: SGPTracingProcessorConfig):
7598
disabled=disabled,
7699
)
77100
self.env_vars = EnvironmentVariables.refresh()
101+
logger.info(
102+
"SGP tracing span-start upsert %s (%s)",
103+
"disabled — end-only ingest" if _skip_span_start_enabled() else "enabled",
104+
_SKIP_SPAN_START_ENV,
105+
)
78106

79107
@override
80108
def on_span_start(self, span: Span) -> None:
109+
# End-only ingest: by default the start write is skipped (see
110+
# _skip_span_start_enabled) so each span is persisted once, on end.
111+
if _skip_span_start_enabled():
112+
return
81113
sgp_span = _build_sgp_span(span, self.env_vars)
82114
sgp_span.flush(blocking=False)
83115

@@ -107,6 +139,11 @@ def __init__(self, config: SGPTracingProcessorConfig):
107139
asyncio.AbstractEventLoop, AsyncSGPClient
108140
] = weakref.WeakKeyDictionary()
109141
self.env_vars = EnvironmentVariables.refresh()
142+
logger.info(
143+
"SGP tracing span-start upsert %s (%s)",
144+
"disabled — end-only ingest" if _skip_span_start_enabled() else "enabled",
145+
_SKIP_SPAN_START_ENV,
146+
)
110147

111148
def _build_client(self) -> AsyncSGPClient:
112149
import httpx
@@ -150,6 +187,10 @@ async def on_span_end(self, span: Span) -> None:
150187

151188
@override
152189
async def on_spans_start(self, spans: list[Span]) -> None:
190+
# End-only ingest: by default the start write is skipped (see
191+
# _skip_span_start_enabled) so each span is persisted once, on end.
192+
if _skip_span_start_enabled():
193+
return
153194
if not spans:
154195
return
155196

tests/lib/core/tracing/processors/test_sgp_tracing_processor.py

Lines changed: 100 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from datetime import UTC, datetime
66
from unittest.mock import AsyncMock, MagicMock, patch
77

8+
import pytest
9+
810
from agentex.types.span import Span
911
from agentex.lib.types.tracing import SGPTracingProcessorConfig
1012

@@ -65,8 +67,9 @@ def test_processor_holds_no_per_span_state(self):
6567
processor, _ = self._make_processor()
6668
assert not hasattr(processor, "_spans")
6769

68-
def test_span_lifecycle_produces_two_flushes(self):
69-
"""Each span produces one flush on start and one on end."""
70+
def test_span_lifecycle_produces_two_flushes(self, monkeypatch):
71+
"""With start writes enabled, each span produces one flush on start and one on end."""
72+
monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0")
7073
processor, _ = self._make_processor()
7174

7275
with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()) as mock_cs:
@@ -105,6 +108,38 @@ def capture_create_span(**kwargs):
105108
assert captured_spans[0].start_time is not None
106109
assert captured_spans[0].end_time is not None
107110

111+
def test_span_start_skipped_by_default(self, monkeypatch):
    """End-only default: ``on_span_start`` writes nothing; ``on_span_end`` writes once."""
    # Drop any inherited setting so the built-in default is what gets exercised.
    monkeypatch.delenv("AGENTEX_TRACING_SKIP_SPAN_START", raising=False)
    processor, _ = self._make_processor()

    def fresh_sgp_span(**kw):
        return _make_mock_sgp_span()

    with patch(f"{MODULE}.create_span", side_effect=fresh_sgp_span) as create_span_mock:
        span = _make_span()
        processor.on_span_start(span)
        # Start is skipped, so nothing has been built or flushed yet.
        assert create_span_mock.call_count == 0
        span.end_time = datetime.now(UTC)
        processor.on_span_end(span)

    # The end write is the only one.
    assert create_span_mock.call_count == 1
124+
125+
def test_span_start_emitted_when_skip_disabled(self, monkeypatch):
    """Opting out of the skip makes ``on_span_start`` build and flush an SGP span."""
    monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0")
    processor, _ = self._make_processor()

    built: list[MagicMock] = []

    def record_created_span(**kwargs):
        # Keep a handle on every mock SGP span handed to the processor.
        mock_span = _make_mock_sgp_span()
        built.append(mock_span)
        return mock_span

    with patch(f"{MODULE}.create_span", side_effect=record_created_span):
        processor.on_span_start(_make_span())

    assert len(built) == 1
    assert built[0].flush.called
142+
108143

109144
# ---------------------------------------------------------------------------
110145
# Async processor tests
@@ -141,8 +176,9 @@ def test_processor_holds_no_per_span_state(self):
141176
processor, _, _ = self._make_processor()
142177
assert not hasattr(processor, "_spans")
143178

144-
async def test_span_lifecycle_produces_two_upserts(self):
145-
"""Each span produces one upsert_batch call on start and one on end."""
179+
async def test_span_lifecycle_produces_two_upserts(self, monkeypatch):
180+
"""With start writes enabled, each span produces one upsert on start and one on end."""
181+
monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0")
146182
processor, _, mock_client = self._make_processor()
147183

148184
with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()):
@@ -153,6 +189,31 @@ async def test_span_lifecycle_produces_two_upserts(self):
153189

154190
assert mock_client.spans.upsert_batch.call_count == 2
155191

192+
async def test_spans_start_skipped_by_default(self, monkeypatch):
    """End-only default: ``on_spans_start`` sends no upsert; ``on_spans_end`` sends one."""
    # Drop any inherited setting so the built-in default is what gets exercised.
    monkeypatch.delenv("AGENTEX_TRACING_SKIP_SPAN_START", raising=False)
    processor, _, mock_client = self._make_processor()
    upsert_batch = mock_client.spans.upsert_batch

    def fresh_sgp_span(**kw):
        return _make_mock_sgp_span()

    with patch(f"{MODULE}.create_span", side_effect=fresh_sgp_span):
        spans = [_make_span() for _ in range(3)]
        await processor.on_spans_start(spans)
        # Start is skipped, so no batch has been sent yet.
        assert upsert_batch.call_count == 0
        for pending in spans:
            pending.end_time = datetime.now(UTC)
        await processor.on_spans_end(spans)

    # The end write is the only one.
    assert upsert_batch.call_count == 1
206+
207+
async def test_spans_start_emitted_when_skip_disabled(self, monkeypatch):
    """Opting out of the skip makes ``on_spans_start`` issue one ``upsert_batch`` call."""
    monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0")
    processor, _, mock_client = self._make_processor()

    def fresh_sgp_span(**kw):
        return _make_mock_sgp_span()

    with patch(f"{MODULE}.create_span", side_effect=fresh_sgp_span):
        single_span_batch = [_make_span()]
        await processor.on_spans_start(single_span_batch)

    assert mock_client.spans.upsert_batch.call_count == 1
216+
156217
async def test_span_end_without_prior_start_still_upserts(self):
157218
"""Cross-pod Temporal case: END activity lands on a pod that never saw START.
158219
@@ -171,8 +232,9 @@ async def test_span_end_without_prior_start_still_upserts(self):
171232
items = mock_client.spans.upsert_batch.call_args.kwargs["items"]
172233
assert len(items) == 1
173234

174-
async def test_sgp_span_input_and_output_propagated_on_end(self):
235+
async def test_sgp_span_input_and_output_propagated_on_end(self, monkeypatch):
175236
"""on_span_end should send the span's current input and output via upsert_batch."""
237+
monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0")
176238
processor, _, mock_client = self._make_processor()
177239

178240
captured: list[MagicMock] = []
@@ -207,8 +269,9 @@ def capture_create_span(**kwargs):
207269
assert end_call_kwargs["input"]["messages"][-1]["role"] == "assistant"
208270
assert end_call_kwargs["output"] == {"response": "hi"}
209271

210-
async def test_on_spans_start_sends_single_upsert_for_batch(self):
272+
async def test_on_spans_start_sends_single_upsert_for_batch(self, monkeypatch):
211273
"""Given N spans at once, on_spans_start should make ONE upsert_batch HTTP call."""
274+
monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0")
212275
processor, _, mock_client = self._make_processor()
213276

214277
n = 10
@@ -224,6 +287,7 @@ async def test_on_spans_start_sends_single_upsert_for_batch(self):
224287

225288
async def test_on_spans_start_records_export_success_metrics(self, monkeypatch):
226289
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
290+
monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0")
227291
import agentex.lib.core.observability.tracing_metrics_recording as recording
228292

229293
recording._metrics_enabled = None
@@ -400,3 +464,33 @@ async def test_on_spans_end_sends_single_upsert_for_batch(self):
400464
)
401465
items = mock_client.spans.upsert_batch.call_args.kwargs["items"]
402466
assert len(items) == n
467+
468+
469+
# ---------------------------------------------------------------------------
470+
# AGENTEX_TRACING_SKIP_SPAN_START env parsing
471+
# ---------------------------------------------------------------------------
472+
473+
474+
class TestSkipSpanStartEnv:
    """Parsing of the ``AGENTEX_TRACING_SKIP_SPAN_START`` toggle."""

    @staticmethod
    def _fn():
        """Return the module's ``_skip_span_start_enabled`` function."""
        from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
            _skip_span_start_enabled as skip_enabled,
        )

        return skip_enabled

    def test_default_is_skip_enabled(self, monkeypatch):
        """With the variable unset, the span-start write is skipped (end-only default)."""
        monkeypatch.delenv("AGENTEX_TRACING_SKIP_SPAN_START", raising=False)
        skip_enabled = self._fn()
        assert skip_enabled() is True

    @pytest.mark.parametrize("val", ["0", "false", "no", "off", "FALSE", "Off", " no "])
    def test_falsy_values_restore_span_start(self, monkeypatch, val):
        """Falsy spellings, in any case or padded with whitespace, turn the skip off."""
        monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", val)
        skip_enabled = self._fn()
        assert skip_enabled() is False

    @pytest.mark.parametrize("val", ["1", "true", "yes", "on", "anything"])
    def test_other_values_keep_skip_enabled(self, monkeypatch, val):
        """Any value outside the falsy set leaves the skip on."""
        monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", val)
        skip_enabled = self._fn()
        assert skip_enabled() is True

0 commit comments

Comments
 (0)