Skip to content

Commit 64c169f

Browse files
feat(sdk): add config driven sink selection (#176)
## Summary Added config-driven observability sink selection across the SDK and server. Introduced shared telemetry sink-selection primitives so observability can use the default backend, registered SDK sinks, or named custom sink factories. Expanded tests and README examples to cover the new sink-selection behavior and lifecycle handling. Fixed the SDK mypy issue in shutdown handling by wrapping sink lifecycle awaitables in a concrete coroutine before passing them to `asyncio.run()`. ## Scope **User-facing/API changes:** - SDK `agent_control.init()` now accepts `observability_sink_name` and `observability_sink_config` - SDK exports new sink registration helpers for external/custom observability sinks - Server observability config now supports sink selection and sink config via environment/settings - README includes a new example for registering and selecting an external sink **Internal changes:** - Added shared sink-selection models/registry in `telemetry/` - Refactored SDK observability to resolve active sinks from config rather than always using the built-in batcher - Refactored server startup to resolve an observability backend from sink selection - Added server/SDK/telemetry tests for sink selection, lifecycle, and event delivery paths - Adjusted SDK shutdown typing to satisfy mypy **Out of scope:** - No changes to core control evaluation logic - No new built-in third-party sink implementation is included in this branch - No UI work or rollout automation is included ## Risk and Rollout **Risk level:** medium **Rollback plan:** Revert this branch to restore the previous default-only observability path. As a partial mitigation, deployments can keep `observability_sink_name=default` to stay on the legacy behavior even with this code merged. ## Testing - [x] Added or updated automated tests - [ ] Ran `make check` — could not run in this environment because `uv` was not available in the shell - [x] Manually verified behavior ## Checklist - [ ] Linked issue/spec (if applicable) - [x] Updated docs/examples for user-facing changes - [ ] Included any required follow-up tasks **Follow-up tasks:** - Run `make check` in a full local/CI environment with `uv` available - Confirm any downstream custom sink integrations against the new SDK/server registration APIs
1 parent 30356a2 commit 64c169f

17 files changed

Lines changed: 1276 additions & 89 deletions

File tree

README.md

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ if __name__ == "__main__":
149149
Use `agent_control.shutdown()` or `await agent_control.ashutdown()` before process exit so short-lived scripts flush pending observability events cleanly.
150150

151151
External integrations can register a sink for the same finalized
152-
control-event payloads:
152+
control-event payloads and then select it through observability config:
153153

154154
```python
155155
from agent_control import (
@@ -169,14 +169,21 @@ class MyControlEventSink(BaseControlEventSink):
169169
sink = MyControlEventSink()
170170
register_control_event_sink(sink)
171171

172+
# Select registered sinks instead of the default SDK -> server sink.
173+
agent_control.init(
174+
agent_name="awesome_bot_3000",
175+
observability_enabled=True,
176+
observability_sink_name="registered",
177+
)
178+
172179
# Later, when tearing down the integration:
173180
unregister_control_event_sink(sink)
174181
```
175182

176183
Registered sinks receive the same local, server, and merged control-execution
177-
events the SDK emits through its normal event-construction flow. If no
178-
external sink is registered, the default OSS delivery path is unchanged. If one
179-
or more sinks are registered, they replace the default built-in delivery path.
184+
events the SDK emits through its normal event-construction flow. The default
185+
SDK sink remains the OSS path to the Agent Control server. To use registered
186+
or named custom sinks, set `observability_sink_name` explicitly.
180187

181188
Next, create a control in Step 4, then run the setup and agent scripts in
182189
order to see blocking in action.

sdks/python/src/agent_control/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ async def handle_input(user_message: str) -> str:
6565
EvaluationResult,
6666
EvaluatorResult,
6767
EvaluatorSpec,
68+
JSONObject,
6869
Step,
6970
StepSchema,
7071
TemplateControlInput,
@@ -98,14 +99,17 @@ async def handle_input(user_message: str) -> str:
9899
get_event_sink,
99100
get_log_config,
100101
get_logger,
102+
get_registered_control_event_sink_factory_names,
101103
get_registered_control_event_sinks,
102104
init_observability,
103105
is_observability_enabled,
104106
log_control_evaluation,
105107
register_control_event_sink,
108+
register_control_event_sink_factory,
106109
shutdown_observability,
107110
sync_shutdown_observability,
108111
unregister_control_event_sink,
112+
unregister_control_event_sink_factory,
109113
write_events,
110114
)
111115
from .tracing import (
@@ -415,6 +419,8 @@ def init(
415419
steps: list[StepSchemaDict] | None = None,
416420
conflict_mode: Literal["strict", "overwrite"] = "overwrite",
417421
observability_enabled: bool | None = None,
422+
observability_sink_name: str | None = None,
423+
observability_sink_config: JSONObject | None = None,
418424
log_config: dict[str, Any] | None = None,
419425
policy_refresh_interval_seconds: int = 60,
420426
**kwargs: object
@@ -443,6 +449,9 @@ def init(
443449
conflict_mode: Conflict handling mode for initAgent registration.
444450
Defaults to "overwrite" in SDK flows.
445451
observability_enabled: Optional bool to enable/disable observability (defaults to env var)
452+
observability_sink_name: Optional sink selection name. Use "default" to preserve
453+
SDK -> server OSS delivery or "registered" / a named sink factory for custom sinks.
454+
observability_sink_config: Optional JSON config payload for the selected sink.
446455
log_config: Optional logging configuration dict:
447456
{"enabled": True, "span_start": True, "span_end": True, "control_eval": True}
448457
policy_refresh_interval_seconds: Interval for background policy refresh loop.
@@ -636,6 +645,8 @@ def run_in_thread() -> None:
636645
server_url=state.server_url,
637646
api_key=state.api_key,
638647
enabled=observability_enabled,
648+
sink_name=observability_sink_name,
649+
sink_config=observability_sink_config,
639650
)
640651
if batcher:
641652
logger.info("Observability enabled")
@@ -1371,9 +1382,12 @@ async def main():
13711382
"is_observability_enabled",
13721383
"get_event_batcher",
13731384
"get_event_sink",
1385+
"get_registered_control_event_sink_factory_names",
13741386
"get_registered_control_event_sinks",
13751387
"register_control_event_sink",
1388+
"register_control_event_sink_factory",
13761389
"unregister_control_event_sink",
1390+
"unregister_control_event_sink_factory",
13771391
"configure_logging",
13781392
"get_log_config",
13791393
"log_control_evaluation",

0 commit comments

Comments
 (0)