Skip to content

Commit 9530368

Browse files
feat(sdk): add otel support (#177)
## Summary Added configurable observability sink selection for the Python SDK and server so control events can be routed to the default backend, registered custom sinks, or named sink factories. Added a built-in OpenTelemetry sink for the SDK, including OTLP configuration via settings/environment variables. Updated docs and exports so custom sink registration and OTEL usage are available as public integration points. ## Scope **User-facing/API changes:** - Added `observability_sink_name` and `observability_sink_config` to SDK initialization/configuration - Exposed sink registration helpers and OTEL conversion utilities from the Python SDK public API - Added documented support for the SDK `otel` extra and OTEL-related environment variables - Added server observability sink selection/config support for named backends **Internal changes:** - Introduced shared sink-selection models/registry helpers in `telemetry/` - Refactored SDK observability to resolve active sinks dynamically and support named/custom sinks - Refactored server startup/shutdown to resolve observability backends through sink selection - Added test coverage for sink selection, lifecycle handling, OTEL sink behavior, and re-init/policy refresh interactions **Out of scope:** - No changes to core control evaluation semantics - No UI changes - No new non-OTEL external sink implementation included beyond registration hooks ## Risk and Rollout **Risk level:** medium **Rollback plan:** Revert the sink-selection/OTEL changeset to restore the previous default SDK-to-server observability path only. If needed, disable custom routing by keeping `observability_sink_name=default` and not installing/configuring the `otel` extra. ## Testing - [x] Added or updated automated tests - [ ] Ran `make check` — typecheck was attempted but full local `make check` was not run because `uv` was unavailable in this environment - [x] Manually verified behavior ## Checklist - [ ] Linked issue/spec (if applicable) - [x] Updated docs/examples for user-facing changes - [ ] Included any required follow-up tasks
1 parent b4a9a0a commit 9530368

11 files changed

Lines changed: 979 additions & 27 deletions

File tree

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,24 @@ events the SDK emits through its normal event-construction flow. The default
185185
SDK sink remains the OSS path to the Agent Control server. To use registered
186186
or named custom sinks, set `observability_sink_name` explicitly.
187187

188+
The SDK also includes a built-in OpenTelemetry sink. Install the OTEL extra,
189+
select the `otel` sink, and configure the OTLP exporter through Agent Control
190+
settings or environment variables:
191+
192+
```bash
193+
uv pip install "agent-control-sdk[otel]"
194+
export AGENT_CONTROL_OBSERVABILITY_SINK_NAME=otel
195+
export AGENT_CONTROL_OTEL_ENABLED=true
196+
export AGENT_CONTROL_OTEL_ENDPOINT=http://localhost:4318/v1/traces
197+
export AGENT_CONTROL_OTEL_HEADERS='{"authorization":"Bearer demo-token"}'
198+
export AGENT_CONTROL_OTEL_SERVICE_NAME=awesome-bot
199+
```
200+
201+
If the `otel` sink is selected without an OTLP endpoint/exporter configured,
202+
the OTEL path stays inert and the default OSS SDK-to-server behavior still
203+
remains unchanged unless `observability_sink_name` is explicitly switched away
204+
from `default`.
205+
188206
Next, create a control in Step 4, then run the setup and agent scripts in
189207
order to see blocking in action.
190208

sdks/python/pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ Repository = "https://github.com/yourusername/agent-control"
3838
[project.optional-dependencies]
3939
strands-agents = ["strands-agents>=1.26.0"]
4040
google-adk = ["google-adk>=1.0.0"]
41+
otel = [
42+
"opentelemetry-api>=1.24.0",
43+
"opentelemetry-sdk>=1.24.0",
44+
"opentelemetry-exporter-otlp-proto-http>=1.24.0",
45+
]
4146
galileo = ["agent-control-evaluator-galileo>=7.5.0"]
4247

4348
[dependency-groups]

sdks/python/src/agent_control/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ async def handle_input(user_message: str) -> str:
112112
unregister_control_event_sink_factory,
113113
write_events,
114114
)
115+
from .otel_sink import control_event_to_otel_span
115116
from .tracing import (
116117
get_current_span_id,
117118
get_current_trace_id,
@@ -1421,6 +1422,7 @@ async def main():
14211422
"write_events",
14221423
"shutdown_observability",
14231424
"is_observability_enabled",
1425+
"control_event_to_otel_span",
14241426
"get_event_batcher",
14251427
"get_event_sink",
14261428
"get_registered_control_event_sink_factory_names",

sdks/python/src/agent_control/control_decorators.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ async def chat(message: str) -> str:
3636
from typing import Any, TypeVar
3737

3838
from agent_control_models import Step, normalize_action
39+
from agent_control_telemetry import get_trace_context_from_provider
3940

4041
from agent_control import AgentControlClient
4142
from agent_control.evaluation import check_evaluation_with_local
@@ -53,6 +54,25 @@ async def chat(message: str) -> str:
5354
F = TypeVar("F", bound=Callable[..., Any])
5455

5556

57+
def _resolve_control_trace_context() -> tuple[str, str]:
58+
"""Resolve trace/span IDs for a decorated control site.
59+
60+
External providers, such as the Galileo bridge, are authoritative because
61+
they may reserve the concrete span ID that the eventual LLM/tool call will
62+
use. Without a provider, keep the existing behavior: share an active trace
63+
but create a fresh function span for this decorated call.
64+
"""
65+
provider_context = get_trace_context_from_provider()
66+
if provider_context is not None:
67+
return provider_context["trace_id"], provider_context["span_id"]
68+
69+
existing_trace_id = get_current_trace_id()
70+
if existing_trace_id:
71+
return existing_trace_id, _generate_span_id()
72+
73+
return get_trace_and_span_ids()
74+
75+
5676
@dataclass
5777
class ControlContext:
5878
"""
@@ -697,14 +717,7 @@ async def _execute_with_control(
697717
# Get cached controls for local evaluation support
698718
controls = _get_server_controls()
699719

700-
# Get trace context: inherit trace_id if set, always generate new span_id
701-
# This allows multiple @control() calls to share the same trace but have unique spans
702-
existing_trace_id = get_current_trace_id()
703-
if existing_trace_id:
704-
trace_id = existing_trace_id
705-
span_id = _generate_span_id() # New span for this function
706-
else:
707-
trace_id, span_id = get_trace_and_span_ids() # New trace and span
720+
trace_id, span_id = _resolve_control_trace_context()
708721

709722
ctx = ControlContext(
710723
agent_name=agent.agent_name,

sdks/python/src/agent_control/observability.py

Lines changed: 84 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@
2828
Configuration (Environment Variables):
2929
# Observability (event batching)
3030
AGENT_CONTROL_OBSERVABILITY_ENABLED: Enable observability (default: true)
31+
AGENT_CONTROL_OBSERVABILITY_SINK_NAME: Selected control-event sink (default: default)
32+
AGENT_CONTROL_OBSERVABILITY_SINK_CONFIG: JSON config for the selected sink
33+
AGENT_CONTROL_OTEL_ENABLED: Enable the built-in OTEL sink (default: false)
34+
AGENT_CONTROL_OTEL_ENDPOINT: OTLP HTTP endpoint for exported control-event spans
35+
AGENT_CONTROL_OTEL_HEADERS: JSON object of OTLP exporter headers
36+
AGENT_CONTROL_OTEL_SERVICE_NAME: OTEL service.name for emitted spans
3137
AGENT_CONTROL_BATCH_SIZE: Max events per batch (default: 100)
3238
AGENT_CONTROL_FLUSH_INTERVAL: Seconds between flushes (default: 5.0)
3339
AGENT_CONTROL_SHUTDOWN_JOIN_TIMEOUT: Seconds to wait for worker shutdown (default: 5.0)
@@ -75,6 +81,11 @@
7581
if TYPE_CHECKING:
7682
from agent_control_models import ControlExecutionEvent
7783

84+
from .otel_sink import (
85+
OTEL_CONTROL_EVENT_SINK_NAME,
86+
create_otel_control_event_sink,
87+
)
88+
7889
# =============================================================================
7990
# Logger Setup - Standard Library Pattern
8091
# =============================================================================
@@ -816,6 +827,19 @@ def get_stats(self) -> dict:
816827
_configured_named_event_sink: ControlEventSink | None = None
817828
_configured_named_event_sink_selection: ControlEventSinkSelection | None = None
818829
_configured_named_event_sink_lock = threading.Lock()
830+
_used_custom_event_sinks: list[ControlEventSink] = []
831+
_used_custom_event_sinks_lock = threading.Lock()
832+
833+
834+
def _register_builtin_control_event_sink_factories() -> None:
835+
"""Ensure built-in named sink factories are available."""
836+
_named_event_sink_factories.register(
837+
OTEL_CONTROL_EVENT_SINK_NAME,
838+
create_otel_control_event_sink,
839+
)
840+
841+
842+
_register_builtin_control_event_sink_factories()
819843

820844

821845
class _BatcherControlEventSink(BaseControlEventSink):
@@ -907,12 +931,37 @@ def get_registered_control_event_sink_factory_names() -> tuple[str, ...]:
907931
return _named_event_sink_factories.registered_names()
908932

909933

934+
def _remember_custom_control_event_sinks(sinks: Sequence[ControlEventSink]) -> None:
935+
"""Track custom sink instances that should be cleaned up on shutdown."""
936+
with _used_custom_event_sinks_lock:
937+
for sink in sinks:
938+
if not any(remembered_sink is sink for remembered_sink in _used_custom_event_sinks):
939+
_used_custom_event_sinks.append(sink)
940+
941+
942+
def _sink_is_active(sink: ControlEventSink) -> bool:
943+
"""Return whether a sink instance is currently able to deliver events."""
944+
is_active = getattr(sink, "is_active", None)
945+
if callable(is_active):
946+
return bool(is_active())
947+
return True
948+
949+
910950
def _get_sink_selection() -> ControlEventSinkSelection:
911951
"""Build the current sink-selection model from SDK settings."""
912952
settings = get_settings()
953+
config: JSONObject = dict(settings.observability_sink_config or {})
954+
if settings.observability_sink_name == OTEL_CONTROL_EVENT_SINK_NAME:
955+
# Materialize OTEL-specific settings into the selection so that
956+
# changes to otel_endpoint / otel_headers / otel_service_name /
957+
# otel_enabled invalidate the cached sink instance.
958+
config.setdefault("enabled", settings.otel_enabled)
959+
config.setdefault("endpoint", settings.otel_endpoint)
960+
config.setdefault("headers", dict(settings.otel_headers))
961+
config.setdefault("service_name", settings.otel_service_name)
913962
return ControlEventSinkSelection(
914963
name=settings.observability_sink_name,
915-
config=settings.observability_sink_config,
964+
config=config,
916965
)
917966

918967

@@ -967,13 +1016,19 @@ def _get_active_control_event_sinks() -> tuple[ControlEventSink, ...]:
9671016

9681017
selection = _get_sink_selection()
9691018
if selection.name == DEFAULT_CONTROL_EVENT_SINK_NAME:
970-
return (_event_sink,) if _event_sink is not None else ()
1019+
if _event_sink is None or not _sink_is_active(_event_sink):
1020+
return ()
1021+
return (_event_sink,)
9711022
if selection.name == REGISTERED_CONTROL_EVENT_SINK_NAME:
972-
return get_registered_control_event_sinks()
1023+
sinks = get_registered_control_event_sinks()
1024+
sinks = tuple(sink for sink in sinks if _sink_is_active(sink))
1025+
_remember_custom_control_event_sinks(sinks)
1026+
return sinks
9731027

9741028
named_sink = _get_or_create_named_control_event_sink(selection)
975-
if named_sink is None:
1029+
if named_sink is None or not _sink_is_active(named_sink):
9761030
return ()
1031+
_remember_custom_control_event_sinks((named_sink,))
9771032
return (named_sink,)
9781033

9791034

@@ -987,6 +1042,20 @@ def _shutdown_built_in_event_sink() -> None:
9871042
_event_sink = None
9881043

9891044

1045+
def _shutdown_configured_named_event_sink() -> None:
1046+
"""Stop and clear the cached configured named sink if it is active."""
1047+
global _configured_named_event_sink, _configured_named_event_sink_selection
1048+
1049+
configured_named_sink: ControlEventSink | None = None
1050+
with _configured_named_event_sink_lock:
1051+
configured_named_sink = _configured_named_event_sink
1052+
_configured_named_event_sink = None
1053+
_configured_named_event_sink_selection = None
1054+
1055+
if configured_named_sink is not None:
1056+
_shutdown_custom_control_event_sink(configured_named_sink)
1057+
1058+
9901059
def _shutdown_custom_control_event_sink(sink: ControlEventSink) -> None:
9911060
"""Flush and close a custom sink when it exposes lifecycle hooks."""
9921061
flush = getattr(sink, "flush", None)
@@ -1020,6 +1089,12 @@ async def _run_awaitable_during_shutdown(result: Awaitable[Any]) -> None:
10201089
await result
10211090

10221091

1092+
def _get_custom_control_event_sinks_to_shutdown() -> tuple[ControlEventSink, ...]:
1093+
"""Collect custom sink instances that should be cleaned up on shutdown."""
1094+
with _used_custom_event_sinks_lock:
1095+
return tuple(_used_custom_event_sinks)
1096+
1097+
10231098
def init_observability(
10241099
server_url: str | None = None,
10251100
api_key: str | None = None,
@@ -1046,6 +1121,8 @@ def init_observability(
10461121

10471122
settings_updates: dict[str, object] = {}
10481123
current_settings = get_settings()
1124+
if enabled is not None:
1125+
settings_updates["observability_enabled"] = enabled
10491126
if sink_name is not None:
10501127
settings_updates["observability_sink_name"] = sink_name
10511128
if (
@@ -1058,11 +1135,12 @@ def init_observability(
10581135
if settings_updates:
10591136
configure_settings(**settings_updates)
10601137

1061-
is_enabled = enabled if enabled is not None else get_settings().observability_enabled
1138+
is_enabled = get_settings().observability_enabled
10621139

10631140
if not is_enabled:
10641141
logger.debug("Observability disabled")
10651142
_shutdown_built_in_event_sink()
1143+
_shutdown_configured_named_event_sink()
10661144
return None
10671145

10681146
selection = _get_sink_selection()
@@ -1137,15 +1215,8 @@ def write_events(events: Sequence[ControlExecutionEvent]) -> SinkResult:
11371215

11381216
def sync_shutdown_observability() -> None:
11391217
"""Synchronously shut down observability and flush remaining events."""
1140-
global _configured_named_event_sink, _configured_named_event_sink_selection
11411218
_shutdown_built_in_event_sink()
1142-
configured_named_sink: ControlEventSink | None = None
1143-
with _configured_named_event_sink_lock:
1144-
configured_named_sink = _configured_named_event_sink
1145-
_configured_named_event_sink = None
1146-
_configured_named_event_sink_selection = None
1147-
if configured_named_sink is not None:
1148-
_shutdown_custom_control_event_sink(configured_named_sink)
1219+
_shutdown_configured_named_event_sink()
11491220

11501221

11511222
async def shutdown_observability() -> None:

0 commit comments

Comments
 (0)