Skip to content

Commit eedb090

Browse files
feat(tracing): add OTel span queue and SGP export metrics
Emit queue depth, batch lag, drain timing, and export success/failure counters for async span processing. Failures include bounded HTTP status labels; disable SDK recording with AGENTEX_TRACING_METRICS=0. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 9bb4ae6 commit eedb090

9 files changed

Lines changed: 682 additions & 3 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## Unreleased
4+
5+
### Features
6+
7+
* **tracing:** emit OTel metrics for async span queue depth, batch drain, and SGP export success/failure (HTTP status labels). Disable SDK-side recording with ``AGENTEX_TRACING_METRICS=0``.
8+
39
## 0.11.2 (2026-05-13)
410

511
Full Changelog: [v0.11.1...v0.11.2](https://github.com/scaleapi/scale-agentex-python/compare/v0.11.1...v0.11.2)
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
"""Tests for ``agentex.lib.core.observability.tracing_metrics``."""
2+
3+
from __future__ import annotations
4+
5+
import agentex.lib.core.observability.tracing_metrics as tracing_metrics
6+
from agentex.lib.core.observability.tracing_metrics import (
7+
TracingMetrics,
8+
classify_export_error,
9+
get_tracing_metrics,
10+
processor_label,
11+
)
12+
13+
14+
class TestClassifyExportError:
    """Exercise ``classify_export_error`` with exceptions identified by class name and message."""

    @staticmethod
    def _make_exc(class_name: str, message: str) -> Exception:
        """Build an exception instance whose type is named ``class_name``."""
        return type(class_name, (Exception,), {})(message)

    def test_scale_gp_authentication_error(self):
        error = self._make_exc(
            "AuthenticationError",
            "Error code: 401 - {'message': 'Not authorized to access Account'}",
        )
        assert classify_export_error(error) == ("authentication", "401")

    def test_rate_limit_code(self):
        error = self._make_exc("APIError", "Error code: 429 - rate limited")
        assert classify_export_error(error) == ("rate_limit", "429")

    def test_server_error_code(self):
        # Any 5xx status is expected to collapse into the shared "5xx" label.
        error = self._make_exc("APIError", "Error code: 503 - unavailable")
        assert classify_export_error(error) == ("server_error", "5xx")

    def test_timeout_by_name(self):
        # No status code in the message: the class name alone decides the outcome.
        error = self._make_exc("APITimeoutError", "slow")
        assert classify_export_error(error) == ("timeout", "timeout")

    def test_unknown_error(self):
        error = self._make_exc("WeirdError", "boom")
        assert classify_export_error(error) == ("other_error", "unknown")
47+
48+
49+
class TestProcessorLabel:
    """Check the label that ``processor_label`` assigns to processor instances."""

    def test_sgp_async_processor(self):
        # An empty class with the expected name is enough to stand in for the real processor.
        sgp_like = type("SGPAsyncTracingProcessor", (), {})()
        assert processor_label(sgp_like) == "sgp"

    def test_other_processor(self):
        non_sgp = type("AgentexAsyncTracingProcessor", (), {})()
        assert processor_label(non_sgp) == "other"
61+
62+
63+
class TestGetTracingMetrics:
    """Cover the ``get_tracing_metrics`` singleton accessor and the instruments it exposes."""

    def test_returns_tracing_metrics_instance(self, monkeypatch):
        # Clear the cached singleton so the accessor has to build a new object.
        monkeypatch.setattr(tracing_metrics, "_tracing_metrics", None)
        assert isinstance(get_tracing_metrics(), TracingMetrics)

    def test_singleton_returns_same_instance(self, monkeypatch):
        monkeypatch.setattr(tracing_metrics, "_tracing_metrics", None)
        earlier = get_tracing_metrics()
        later = get_tracing_metrics()
        assert earlier is later

    def test_instruments_exist(self, monkeypatch):
        monkeypatch.setattr(tracing_metrics, "_tracing_metrics", None)
        created = get_tracing_metrics()
        expected_instruments = (
            "span_events_enqueued",
            "span_events_dropped",
            "queue_depth",
            "queue_lag_ms",
            "batch_items",
            "batch_size",
            "batch_drain_duration_ms",
            "export_batches",
            "export_spans",
            "export_batch_failures",
            "export_spans_failed",
            "shutdown_timeouts",
            "shutdown_remaining_items",
        )
        for name in expected_instruments:
            assert hasattr(created, name), f"missing instrument: {name}"
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
"""Tests for ``agentex.lib.core.observability.tracing_metrics_recording``."""
2+
3+
from __future__ import annotations
4+
5+
from unittest.mock import MagicMock, patch
6+
7+
import agentex.lib.core.observability.tracing_metrics_recording as recording
8+
9+
10+
class _Item:
    """Bare stand-in for a queued batch item; exposes only ``enqueued_at``."""

    def __init__(self, enqueued_at: float | None) -> None:
        # NOTE(review): presumably a ``time.monotonic()`` reading in seconds
        # (``None`` when no timestamp was captured) - confirm against the queue code.
        self.enqueued_at = enqueued_at
13+
14+
15+
class TestIsMetricsEnabled:
    """Check how ``AGENTEX_TRACING_METRICS`` drives ``recording.is_metrics_enabled``."""

    def setup_method(self) -> None:
        # Start from an unset module flag.
        # NOTE(review): assumes ``_metrics_enabled`` caches the env lookup and
        # that ``None`` means "not resolved yet" - confirm in the recording module.
        recording._metrics_enabled = None

    def teardown_method(self) -> None:
        # ``monkeypatch`` restores the environment variable but not the module
        # flag, so reset it here to keep a value resolved in this test (for
        # example the disabled state) from leaking into later tests.
        recording._metrics_enabled = None

    def test_enabled_by_default(self, monkeypatch):
        monkeypatch.delenv("AGENTEX_TRACING_METRICS", raising=False)
        assert recording.is_metrics_enabled() is True

    def test_disabled_by_zero(self, monkeypatch):
        monkeypatch.setenv("AGENTEX_TRACING_METRICS", "0")
        recording._metrics_enabled = None
        assert recording.is_metrics_enabled() is False
27+
28+
29+
class TestRecordingHelpers:
    """Exercise the ``record_*`` helpers with ``get_tracing_metrics`` patched out."""

    def setup_method(self) -> None:
        # Start from an unset module flag.
        # NOTE(review): assumes ``_metrics_enabled`` caches the env lookup and
        # that ``None`` means "not resolved yet" - confirm in the recording module.
        recording._metrics_enabled = None

    def teardown_method(self) -> None:
        # ``monkeypatch`` restores the environment variable but not the module
        # flag; reset it so the value resolved in one test cannot leak into
        # tests that run afterwards.
        recording._metrics_enabled = None

    def test_record_span_enqueued_when_disabled_does_not_load_metrics(self, monkeypatch):
        monkeypatch.setenv("AGENTEX_TRACING_METRICS", "0")
        recording._metrics_enabled = None
        with patch(
            "agentex.lib.core.observability.tracing_metrics.get_tracing_metrics"
        ) as mock_get:
            recording.record_span_enqueued("start")
            # With the kill switch on, the metrics singleton must never be requested.
            mock_get.assert_not_called()

    def test_record_span_enqueued_when_enabled(self, monkeypatch):
        monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
        recording._metrics_enabled = None
        mock_metrics = MagicMock()
        with patch(
            "agentex.lib.core.observability.tracing_metrics.get_tracing_metrics",
            return_value=mock_metrics,
        ):
            recording.record_span_enqueued("end")
            mock_metrics.span_events_enqueued.add.assert_called_once_with(1, {"event_type": "end"})

    def test_monotonic_if_enabled_respects_kill_switch(self, monkeypatch):
        monkeypatch.setenv("AGENTEX_TRACING_METRICS", "0")
        recording._metrics_enabled = None
        assert recording.monotonic_if_enabled() is None

    def test_record_batch_coalesced_records_lag(self, monkeypatch):
        monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
        recording._metrics_enabled = None
        mock_metrics = MagicMock()
        with patch(
            "agentex.lib.core.observability.tracing_metrics.get_tracing_metrics",
            return_value=mock_metrics,
        ), patch("agentex.lib.core.observability.tracing_metrics_recording.time.monotonic", return_value=10.0):
            recording.record_batch_coalesced(
                queue_depth=3,
                batch_items=[_Item(9.5), _Item(9.0)],
            )
            mock_metrics.queue_depth.record.assert_called_once_with(3)
            mock_metrics.batch_items.record.assert_called_once_with(2)
            # The clock is pinned at 10.0 and the oldest item was enqueued at 9.0,
            # so the expected lag is 1000.0.
            mock_metrics.queue_lag_ms.record.assert_called_once_with(1000.0)

    def test_record_export_failure(self, monkeypatch):
        monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
        recording._metrics_enabled = None
        mock_metrics = MagicMock()

        class AuthenticationError(Exception):
            pass

        exc = AuthenticationError("Error code: 401 - denied")
        processor = type("SGPAsyncTracingProcessor", (), {})()

        with patch(
            "agentex.lib.core.observability.tracing_metrics.get_tracing_metrics",
            return_value=mock_metrics,
        ):
            recording.record_export_failure(
                processor=processor,
                event_type="start",
                span_count=5,
                exc=exc,
            )

        mock_metrics.export_batch_failures.add.assert_called_once()
        mock_metrics.export_spans_failed.add.assert_called_once_with(
            5,
            {
                "processor": "sgp",
                "event_type": "start",
                "http_code": "401",
                "error_class": "authentication",
            },
        )

    def test_record_export_success(self, monkeypatch):
        monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
        recording._metrics_enabled = None
        mock_metrics = MagicMock()
        with patch(
            "agentex.lib.core.observability.tracing_metrics.get_tracing_metrics",
            return_value=mock_metrics,
        ):
            recording.record_export_success(event_type="end", span_count=12)

        mock_metrics.export_batches.add.assert_called_once_with(
            1,
            {"processor": "sgp", "event_type": "end", "outcome": "success"},
        )
        mock_metrics.export_spans.add.assert_called_once_with(
            12,
            {"processor": "sgp", "event_type": "end", "outcome": "success"},
        )
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
"""OTel metrics for async span queue and SGP export telemetry.
2+
3+
Single source of truth for span-queue / export instrumentation. Import
4+
``get_tracing_metrics()`` or the ``record_*`` helpers in
5+
``tracing_metrics_recording`` from hot paths — never configure a
6+
``MeterProvider`` here.
7+
8+
The meter is no-op when the application has not configured a
9+
``MeterProvider``. Set ``AGENTEX_TRACING_METRICS=0`` to skip recording
10+
entirely (see ``tracing_metrics_recording.is_metrics_enabled``).
11+
12+
Cardinality is bounded:
13+
- ``event_type``: ``start`` | ``end``
14+
- ``processor``: ``sgp`` | ``other``
15+
- ``outcome``: ``success`` | ``failure`` (export counters only)
16+
- ``http_code``: small fixed set from ``classify_export_error``
17+
- ``error_class``: small fixed set from ``classify_export_error``
18+
- ``reason``: ``shutdown`` (drops only)
19+
- ``phase``: ``start`` | ``end`` (batch drain histograms)
20+
21+
Resource attributes (``service.name``, ``k8s.*``, etc.) come from the
22+
host application's OTel resource configuration.
23+
"""
24+
25+
from __future__ import annotations
26+
27+
import re
28+
from typing import Optional
29+
30+
from opentelemetry import metrics
31+
32+
# Captures the numeric status from exception messages shaped like
# "Error code: 401 - ..."; only the digits after the prefix are captured.
_HTTP_CODE_RE = re.compile(r"Error code:\s*(\d+)")
33+
34+
35+
class TracingMetrics:
    """OTel instruments describing the span queue and export pipeline.

    Every instrument is created from the ``agentex.tracing`` meter when the
    object is constructed and is exposed as a plain attribute.
    """

    def __init__(self) -> None:
        meter = metrics.get_meter("agentex.tracing")
        counter = meter.create_counter
        histogram = meter.create_histogram

        # Enqueue / drop accounting.
        self.span_events_enqueued = counter(
            name="agentex.tracing.span_events.enqueued",
            unit="1",
            description="Span queue START/END events accepted by enqueue()",
        )
        self.span_events_dropped = counter(
            name="agentex.tracing.span_events.dropped",
            unit="1",
            description="Span queue events dropped (e.g. shutdown)",
        )

        # Queue state sampled when a drain batch starts.
        self.queue_depth = histogram(
            name="agentex.tracing.queue.depth",
            unit="1",
            description="asyncio queue depth at the start of a drain batch",
        )
        self.queue_lag_ms = histogram(
            name="agentex.tracing.queue.lag_ms",
            unit="ms",
            description="Max time from enqueue to drain-batch start for items in the batch",
        )

        # Batch shape and drain timing.
        self.batch_items = histogram(
            name="agentex.tracing.batch.items",
            unit="1",
            description="Total span events coalesced in one linger/drain batch",
        )
        self.batch_size = histogram(
            name="agentex.tracing.batch.size",
            unit="1",
            description="Span events in one START or END dispatch phase",
        )
        self.batch_drain_duration_ms = histogram(
            name="agentex.tracing.batch.drain_duration_ms",
            unit="ms",
            description="Wall time for one START or END _process_items dispatch",
        )

        # Export outcomes.
        self.export_batches = counter(
            name="agentex.tracing.export.batches",
            unit="1",
            description="HTTP export batch attempts tagged with outcome",
        )
        self.export_spans = counter(
            name="agentex.tracing.export.spans",
            unit="1",
            description="Spans included in HTTP export batches tagged with outcome",
        )
        self.export_batch_failures = counter(
            name="agentex.tracing.export.batch_failures",
            unit="1",
            description="Failed HTTP export batches by processor and HTTP status",
        )
        self.export_spans_failed = counter(
            name="agentex.tracing.export.spans_failed",
            unit="1",
            description="Spans in failed HTTP export batches by processor and HTTP status",
        )

        # Shutdown behaviour.
        self.shutdown_timeouts = counter(
            name="agentex.tracing.shutdown.timeouts",
            unit="1",
            description="Span queue shutdown calls that hit the join timeout",
        )
        self.shutdown_remaining_items = histogram(
            name="agentex.tracing.shutdown.remaining_items",
            unit="1",
            description="Queue depth when span queue shutdown times out",
        )
105+
106+
107+
# Module-level cache for the singleton; stays ``None`` until the first
# ``get_tracing_metrics()`` call builds the instance.
_tracing_metrics: Optional[TracingMetrics] = None


def get_tracing_metrics() -> TracingMetrics:
    """Return the shared ``TracingMetrics`` object, building it on the first call."""
    global _tracing_metrics
    cached = _tracing_metrics
    if cached is None:
        cached = TracingMetrics()
        _tracing_metrics = cached
    return cached
116+
117+
118+
def processor_label(processor: object) -> str:
    """Return the low-cardinality ``processor`` label for a tracing processor.

    The decision is based solely on the class name of ``processor``:
    ``"sgp"`` when it is exactly ``SGPAsyncTracingProcessor``, ``"other"``
    for anything else.
    """
    is_sgp = type(processor).__name__ == "SGPAsyncTracingProcessor"
    return "sgp" if is_sgp else "other"
123+
124+
125+
def classify_export_error(exc: BaseException) -> tuple[str, str]:
    """Categorize an export failure into ``(error_class, http_code_label)``.

    Checks run in this order and the first match wins:

    1. Class name contains ``Timeout``: ``("timeout", "timeout")``.
    2. Class name contains ``Connect``: ``("network_error", "network")``.
    3. Message contains ``Error code: <digits>``: classified by the status.
    4. Otherwise well-known fragments of the class name are tried.
    5. Fallback: ``("other_error", "unknown")``.

    ``http_code_label`` is always drawn from a small fixed set so it is safe
    to use as a metric label: ``401``, ``403``, ``429``, ``1xx``-``5xx``,
    ``timeout``, ``network``, ``other`` or ``unknown``.

    Args:
        exc: The exception raised by the export call.

    Returns:
        A ``(error_class, http_code_label)`` tuple.
    """
    name = type(exc).__name__
    message = str(exc)

    if "Timeout" in name:
        return "timeout", "timeout"
    # "Connect" also matches class names containing "Connection".
    if "Connect" in name:
        return "network_error", "network"

    match = _HTTP_CODE_RE.search(message)
    if match:
        code = int(match.group(1))
        if code == 401:
            return "authentication", "401"
        if code == 403:
            return "authentication", "403"
        if code == 429:
            return "rate_limit", "429"
        if 400 <= code < 500:
            return "client_error", "4xx"
        if 500 <= code < 600:
            return "server_error", "5xx"
        # Never echo the raw number: an arbitrary integer parsed from the
        # message would create an unbounded set of label values.
        if 100 <= code < 400:
            return "other_error", f"{code // 100}xx"
        return "other_error", "other"

    # No status code in the message: fall back to class-name fragments.
    if any(s in name for s in ("Authentication", "Permission")):
        return "authentication", "unknown"
    if "RateLimit" in name:
        return "rate_limit", "429"
    if any(s in name for s in ("ServerError", "InternalServer", "ServiceUnavailable", "BadGateway")):
        return "server_error", "5xx"
    if any(s in name for s in ("BadRequest", "NotFound", "Conflict", "UnprocessableEntity")):
        return "client_error", "4xx"

    return "other_error", "unknown"

0 commit comments

Comments
 (0)