Skip to content

Commit 6791d73

Browse files
feat(server): refactor server ingestion to sink (#165)
## Summary Refactored the server observability write path to use the shared `ControlEventSink` abstraction while preserving the existing `/api/v1/observability/events` behavior and Postgres-backed OSS storage. Added a default OSS server sink that adapts the existing `EventStore` write path into the shared sink contract. Kept the current `DirectEventIngestor` usage stable by allowing existing store-based construction to continue working, while routing writes through sink semantics internally. ## Scope ### User-facing / API changes - No intended user-facing or HTTP API changes. - `/api/v1/observability/events` request/response behavior remains unchanged. - `IngestResult` and API response accounting semantics are preserved. ### Internal changes - Updated the shared sink contract to support both sync and async sink writes. - Added a server-side default sink backed by the existing `EventStore`/Postgres path. - Refactored `DirectEventIngestor` to write through `ControlEventSink` internally. - Preserved existing server wiring by wrapping `EventStore` inputs into the default sink internally. - Added test coverage proving the ingestor can accept a sink directly. ### Out of scope - Config-driven sink selection. - Alternate server sinks such as ClickHouse, OTEL, Kafka, or vendor-specific sinks. - Changes to server query/stats read-path behavior. - Changes to SDK behavior beyond the minimal shared sink contract compatibility needed for server support. ## Risk and Rollout **Risk level:** Medium **Rollback plan:** 1. Revert the shared sink contract async compatibility change. 2. Remove the new server sink adapter. 3. Restore `DirectEventIngestor` to writing directly to `EventStore.store(...)`. 4. Keep the server endpoint and startup wiring unchanged. ## Testing - [x] Added or updated automated tests - [x] Ran `make check` (or explained why not) > Validation has not been run by me in this branch flow; recommended focused server tests should be run locally/CI. - [ ] Manually verified behavior ## Checklist - [ ] Linked issue/spec (if applicable) - [x] Updated docs/examples for user-facing changes > No docs/examples updates were needed because this story is internal-only and preserves existing API behavior. - [ ] Included any required follow-up tasks
1 parent 07ba22f commit 6791d73

File tree

6 files changed

+90
-12
lines changed

6 files changed

+90
-12
lines changed

examples/agent_control_demo/demo_agent.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ def initialize_demo_agent() -> bool:
154154
agent_name=AGENT_NAME,
155155
agent_description="Demo chatbot for testing controls",
156156
server_url=SERVER_URL,
157+
observability_enabled=True
157158
)
158159
logger.info("Agent initialized successfully")
159160
return True

sdks/python/src/agent_control/observability.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,11 @@
5454
from typing import TYPE_CHECKING, Any
5555

5656
import httpx
57-
from agent_control_telemetry.sinks import BaseControlEventSink, ControlEventSink, SinkResult
57+
from agent_control_telemetry.sinks import (
58+
BaseControlEventSink,
59+
ControlEventSink,
60+
SinkResult,
61+
)
5862

5963
from agent_control.settings import configure_settings, get_settings
6064

server/src/agent_control_server/observability/ingest/direct.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
"""Direct event ingestor implementation.
22
33
This module provides the DirectEventIngestor, which processes events
4-
immediately (synchronously) by storing them directly to the EventStore.
4+
immediately by writing them to an async control-event sink. Existing
5+
store-based callers are preserved by wrapping EventStore instances in the
6+
default EventStoreControlEventSink internally.
57
68
For high-throughput scenarios, users can implement their own buffered
79
ingestor (e.g., QueuedEventIngestor, RedisEventIngestor).
@@ -11,39 +13,48 @@
1113
import logging
1214

1315
from agent_control_models.observability import ControlExecutionEvent
16+
from agent_control_telemetry.sinks import AsyncControlEventSink
1417

18+
from ..sinks import EventStoreControlEventSink
1519
from ..store.base import EventStore
1620
from .base import EventIngestor, IngestResult
1721

1822
logger = logging.getLogger(__name__)
1923

2024

2125
class DirectEventIngestor(EventIngestor):
22-
"""Processes events immediately by storing them to the EventStore.
26+
"""Processes events immediately by writing them to an async control-event sink.
2327
24-
This is the simplest ingestor implementation. Events are stored
25-
directly to the database, adding ~5-20ms latency per batch.
28+
This is the simplest ingestor implementation. Events are written
29+
directly to the configured sink, adding ~5-20ms latency per batch.
2630
2731
For use cases that require lower latency or higher throughput,
2832
implement a custom buffered ingestor (e.g., QueuedEventIngestor).
2933
3034
Attributes:
31-
store: The EventStore to write events to
35+
sink: The AsyncControlEventSink used to write events
3236
log_to_stdout: Whether to log events as structured JSON
3337
"""
3438

35-
def __init__(self, store: EventStore, log_to_stdout: bool = False):
39+
def __init__(
40+
self,
41+
store: EventStore | AsyncControlEventSink,
42+
log_to_stdout: bool = False,
43+
):
3644
"""Initialize the ingestor.
3745
3846
Args:
39-
store: The EventStore to write events to
47+
store: Either an EventStore or an AsyncControlEventSink implementation
4048
log_to_stdout: Whether to log events as structured JSON (default: False)
4149
"""
42-
self.store = store
50+
if isinstance(store, EventStore):
51+
self.sink: AsyncControlEventSink = EventStoreControlEventSink(store)
52+
else:
53+
self.sink = store
4354
self.log_to_stdout = log_to_stdout
4455

4556
async def ingest(self, events: list[ControlExecutionEvent]) -> IngestResult:
46-
"""Ingest events by storing them directly to the EventStore.
57+
"""Ingest events by writing them directly to the configured sink.
4758
4859
Args:
4960
events: List of control execution events to ingest
@@ -59,8 +70,9 @@ async def ingest(self, events: list[ControlExecutionEvent]) -> IngestResult:
5970
dropped = 0
6071

6172
try:
62-
# Store events
63-
processed = await self.store.store(events)
73+
sink_result = await self.sink.write_events(events)
74+
processed = sink_result.accepted
75+
dropped = sink_result.dropped
6476

6577
# Log to stdout if enabled
6678
if self.log_to_stdout:
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
"""Server-side sink implementations for observability event delivery."""
2+
3+
from __future__ import annotations
4+
5+
from collections.abc import Sequence
6+
7+
from agent_control_models.observability import ControlExecutionEvent
8+
from agent_control_telemetry.sinks import SinkResult
9+
10+
from .store.base import EventStore
11+
12+
13+
class EventStoreControlEventSink:
14+
"""Write events through an EventStore-backed sink."""
15+
16+
def __init__(self, store: EventStore):
17+
self.store = store
18+
19+
async def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult:
20+
"""Write events to the underlying store and report accepted/dropped counts."""
21+
stored = await self.store.store(list(events))
22+
dropped = max(len(events) - stored, 0)
23+
return SinkResult(accepted=stored, dropped=dropped)

server/tests/test_observability_direct_ingest.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from uuid import uuid4
88

99
from agent_control_models.observability import ControlExecutionEvent
10+
from agent_control_telemetry.sinks import SinkResult
1011
from agent_control_server.observability.ingest.direct import DirectEventIngestor
1112
from agent_control_server.observability.store.base import EventStore
1213

@@ -37,6 +38,15 @@ async def query_events(self, query): # pragma: no cover - not used
3738
raise NotImplementedError
3839

3940

41+
class CountingSink:
42+
def __init__(self) -> None:
43+
self.calls: list[list[ControlExecutionEvent]] = []
44+
45+
async def write_events(self, events: list[ControlExecutionEvent]) -> SinkResult:
46+
self.calls.append(events)
47+
return SinkResult(accepted=len(events), dropped=0)
48+
49+
4050
@pytest.mark.asyncio
4151
async def test_direct_ingestor_drops_on_store_error() -> None:
4252
# Given: an ingestor with a failing store
@@ -117,3 +127,30 @@ async def test_direct_ingestor_flush_noop() -> None:
117127

118128
# Then: no error is raised
119129
assert True
130+
131+
132+
@pytest.mark.asyncio
133+
async def test_direct_ingestor_accepts_control_event_sink() -> None:
134+
sink = CountingSink()
135+
ingestor = DirectEventIngestor(sink)
136+
events = [
137+
ControlExecutionEvent(
138+
trace_id="a" * 32,
139+
span_id="b" * 16,
140+
agent_name="agent-test-01",
141+
control_id=1,
142+
control_name="c",
143+
check_stage="pre",
144+
applies_to="llm_call",
145+
action="observe",
146+
matched=True,
147+
confidence=0.9,
148+
)
149+
]
150+
151+
result = await ingestor.ingest(events)
152+
153+
assert result.received == 1
154+
assert result.processed == 1
155+
assert result.dropped == 0
156+
assert sink.calls == [events]

telemetry/tests/test_sinks.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from datetime import UTC, datetime
66

77
from agent_control_models import ControlExecutionEvent
8+
89
from agent_control_telemetry import (
910
BaseAsyncControlEventSink,
1011
BaseControlEventSink,

0 commit comments

Comments
 (0)