Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ if __name__ == "__main__":
Use `agent_control.shutdown()` or `await agent_control.ashutdown()` before process exit so short-lived scripts flush pending observability events cleanly.

External integrations can register a sink for the same finalized
control-event payloads:
control-event payloads and then select it through observability config:

```python
from agent_control import (
Expand All @@ -169,14 +169,21 @@ class MyControlEventSink(BaseControlEventSink):
sink = MyControlEventSink()
register_control_event_sink(sink)

# Select registered sinks instead of the default SDK -> server sink.
agent_control.init(
agent_name="awesome_bot_3000",
observability_enabled=True,
observability_sink_name="registered",
)

# Later, when tearing down the integration:
unregister_control_event_sink(sink)
```

Registered sinks receive the same local, server, and merged control-execution
events the SDK emits through its normal event-construction flow. If no
external sink is registered, the default OSS delivery path is unchanged. If one
or more sinks are registered, they replace the default built-in delivery path.
events the SDK emits through its normal event-construction flow. The default
SDK sink remains the OSS path to the Agent Control server. To use registered
or named custom sinks, set `observability_sink_name` explicitly.

Next, create a control in Step 4, then run the setup and agent scripts in
order to see blocking in action.
Expand Down
14 changes: 14 additions & 0 deletions sdks/python/src/agent_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ async def handle_input(user_message: str) -> str:
EvaluationResult,
EvaluatorResult,
EvaluatorSpec,
JSONObject,
Step,
StepSchema,
TemplateControlInput,
Expand Down Expand Up @@ -98,14 +99,17 @@ async def handle_input(user_message: str) -> str:
get_event_sink,
get_log_config,
get_logger,
get_registered_control_event_sink_factory_names,
get_registered_control_event_sinks,
init_observability,
is_observability_enabled,
log_control_evaluation,
register_control_event_sink,
register_control_event_sink_factory,
shutdown_observability,
sync_shutdown_observability,
unregister_control_event_sink,
unregister_control_event_sink_factory,
write_events,
)
from .tracing import (
Expand Down Expand Up @@ -415,6 +419,8 @@ def init(
steps: list[StepSchemaDict] | None = None,
conflict_mode: Literal["strict", "overwrite"] = "overwrite",
observability_enabled: bool | None = None,
observability_sink_name: str | None = None,
observability_sink_config: JSONObject | None = None,
log_config: dict[str, Any] | None = None,
policy_refresh_interval_seconds: int = 60,
**kwargs: object
Expand Down Expand Up @@ -443,6 +449,9 @@ def init(
conflict_mode: Conflict handling mode for initAgent registration.
Defaults to "overwrite" in SDK flows.
observability_enabled: Optional bool to enable/disable observability (defaults to env var)
observability_sink_name: Optional sink selection name. Use "default" to preserve
SDK -> server OSS delivery or "registered" / a named sink factory for custom sinks.
observability_sink_config: Optional JSON config payload for the selected sink.
log_config: Optional logging configuration dict:
{"enabled": True, "span_start": True, "span_end": True, "control_eval": True}
policy_refresh_interval_seconds: Interval for background policy refresh loop.
Expand Down Expand Up @@ -636,6 +645,8 @@ def run_in_thread() -> None:
server_url=state.server_url,
api_key=state.api_key,
enabled=observability_enabled,
sink_name=observability_sink_name,
sink_config=observability_sink_config,
)
if batcher:
logger.info("Observability enabled")
Expand Down Expand Up @@ -1371,9 +1382,12 @@ async def main():
"is_observability_enabled",
"get_event_batcher",
"get_event_sink",
"get_registered_control_event_sink_factory_names",
"get_registered_control_event_sinks",
"register_control_event_sink",
"register_control_event_sink_factory",
"unregister_control_event_sink",
"unregister_control_event_sink_factory",
"configure_logging",
"get_log_config",
"log_control_evaluation",
Expand Down
Loading
Loading