Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions src/openarmature/graph/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
if TYPE_CHECKING:
from openarmature.llm.messages import ToolCall
from openarmature.llm.response import Usage
from openarmature.retrieval.response import EmbeddingUsage

# Sentinel empty metadata mapping for events constructed without a
# live caller-metadata snapshot (test helpers, synthetic events).
Expand Down Expand Up @@ -749,6 +750,125 @@ class LlmRetryAttemptEvent:
output_tool_calls: list["ToolCall"] = field(default_factory=list["ToolCall"])


# Spec: realizes graph-engine §6 + observability §5.5.9 -- the typed
# EmbeddingEvent / EmbeddingFailedEvent pair (proposal 0059,
# retrieval-provider capability). Dispatched on the observer delivery
# queue per EmbeddingProvider.embed() call: the success variant after the
# response is parsed + validated, the failure variant alongside a raised
# §7 category exception (mutually exclusive per call). Scalar
# fan_out_index / branch_name only; the lineage chains arrive uniformly
# across the provider events with proposal 0084 (v0.81.0). input_strings /
# request_extras are payload-bearing, populated unconditionally; observer-
# side privacy gates (OTel disable_provider_payload, Langfuse equivalents)
# apply at rendering, symmetric with LlmCompletionEvent.
@dataclass(frozen=True)
class EmbeddingEvent:
"""A typed embedding provider call event delivered to observers.

Carries identity, scoping, and outcome data for a successful
``EmbeddingProvider.embed()`` call. Observer code filters by type
discrimination (``isinstance(event, EmbeddingEvent)``).

The identity / scoping / request-side fields mirror
``LlmCompletionEvent``'s convention; the outcome fields are
embedding-specific:

- ``input_strings``: the input strings the call was made with;
non-nullable, populated unconditionally (privacy gating is
observer-side at rendering).
- ``input_count``: ``len(input_strings)``; a convenience field.
- ``dimensions``: the output vector dimensionality from the response;
``None`` when the response surfaced no determinate dimensionality.
- ``response_model`` / ``response_id``: the provider-returned model
and response identifiers; ``None`` when the provider returned none.
- ``usage``: the embedding token record; ``None`` when the call
returned no usage.
- ``request_params``: the embedding request parameters the caller
supplied (e.g. ``dimensions``). Absence-is-meaningful: only supplied
keys appear; an empty mapping when none.
- ``request_extras``: the runtime-config extras pass-through bag.
- ``active_prompt`` / ``active_prompt_group``: prompt-context
snapshots at embed-call time; ``None`` outside a binding.
- ``call_id``: a per-call disambiguator, always present, freshly
minted per ``embed()`` call.
"""

invocation_id: str
correlation_id: str | None
node_name: str
namespace: tuple[str, ...]
attempt_index: int
fan_out_index: int | None
branch_name: str | None
provider: str
model: str
response_id: str | None
response_model: str | None
# EmbeddingUsage is a string-typed forward reference per the
# TYPE_CHECKING import -- keeps the runtime import direction
# graph -> retrieval off the module-load path.
usage: "EmbeddingUsage | None"
latency_ms: float | None
input_strings: list[str]
input_count: int
dimensions: int | None
request_params: Mapping[str, Any]
request_extras: Mapping[str, Any]
active_prompt: Any
active_prompt_group: Any
call_id: str
caller_invocation_metadata: Mapping[str, AttributeValue] | None = None


# Spec: the failure sibling of EmbeddingEvent (proposal 0059). Dispatched
# whenever EmbeddingProvider.embed() raises a §7 category exception --
# covers both the provider-caught path and the pre-send validation raise
# (provider_invalid_request on an empty input list). Dispatched ALONGSIDE
# the exception, not in place of it; mutually exclusive with EmbeddingEvent
# on the same call. The response-side fields are absent (no response).
@dataclass(frozen=True)
class EmbeddingFailedEvent:
"""A typed embedding provider call failure event delivered to observers.

Carries identity, scoping, and failure-context data for an ``embed()``
call that raised a retrieval-provider category exception. Observer code
filters by type discrimination
(``isinstance(event, EmbeddingFailedEvent)``).

The identity / scoping / request-side field set mirrors
``EmbeddingEvent``; the response-side fields are absent. Failure-
specific fields:

- ``error_category``: the error category the call raised (one of the
embedding-applicable provider categories). Always present.
- ``error_type``: an optional impl-level / vendor-specific type or
code; ``None`` when unavailable.
- ``error_message``: a human-readable message; always present (the
empty string when the exception carried no message).
"""

invocation_id: str
correlation_id: str | None
node_name: str
namespace: tuple[str, ...]
attempt_index: int
fan_out_index: int | None
branch_name: str | None
provider: str
model: str
latency_ms: float | None
input_strings: list[str]
request_params: Mapping[str, Any]
request_extras: Mapping[str, Any]
active_prompt: Any
active_prompt_group: Any
call_id: str
error_category: str
error_message: str
error_type: str | None = None
caller_invocation_metadata: Mapping[str, AttributeValue] | None = None


# Spec: realizes pipeline-utilities §6.3 failure-isolation middleware
# (proposal 0050). Emitted by FailureIsolationMiddleware when it
# catches an exception escaping the inner chain and substitutes a
Expand Down Expand Up @@ -905,6 +1025,8 @@ class ToolCallFailedEvent:


__all__ = [
"EmbeddingEvent",
"EmbeddingFailedEvent",
"FailureIsolatedEvent",
"FanOutEventConfig",
"InvocationCompletedEvent",
Expand Down
18 changes: 12 additions & 6 deletions src/openarmature/graph/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
from typing import Any, Literal, Protocol

from .events import (
EmbeddingEvent,
EmbeddingFailedEvent,
FailureIsolatedEvent,
InvocationCompletedEvent,
InvocationStartedEvent,
Expand Down Expand Up @@ -63,7 +65,9 @@
# retry to drive the per-attempt OTel span surface),
# and FailureIsolatedEvent (proposal 0050 §6.3 framework-emitted event,
# dispatched by FailureIsolationMiddleware when it catches an exception
# escaping the inner chain and substitutes a degraded partial update).
# escaping the inner chain and substitutes a degraded partial update);
# and EmbeddingEvent / EmbeddingFailedEvent (proposal 0059 typed embedding
# provider call events, dispatched on every EmbeddingProvider.embed()).
ObserverEvent = (
NodeEvent
| MetadataAugmentationEvent
Expand All @@ -75,6 +79,8 @@
| FailureIsolatedEvent
| ToolCallEvent
| ToolCallFailedEvent
| EmbeddingEvent
| EmbeddingFailedEvent
)


Expand All @@ -85,7 +91,7 @@ class Observer(Protocol):
signature qualifies, no subclass required. Plain functions, bound
methods, and class instances with `__call__` all work::

async def log_observer(event: NodeEvent | MetadataAugmentationEvent) -> None:
async def log_observer(event: ObserverEvent) -> None:
if isinstance(event, NodeEvent):
print(event.node_name, event.phase)

Expand All @@ -104,9 +110,9 @@ async def log_observer(event: NodeEvent | MetadataAugmentationEvent) -> None:
conformance doesn't pin you to that name; any of `event`, `_event`,
`e`, etc. matches.

Seven event variants reach observers. The signature is the union;
observers ``isinstance``-narrow on the first line and choose which
variants they handle.
The variants reaching observers are the :data:`ObserverEvent` members.
The signature is that union; observers ``isinstance``-narrow on the
first line and choose which variants they handle.
Comment thread
chris-colinsky marked this conversation as resolved.

- :class:`NodeEvent` — the started/completed/checkpoint phase
events. Subject to the ``phases`` filter on
Expand Down Expand Up @@ -778,7 +784,7 @@ def _dispatch(
) -> None:
"""Enqueue an event for the delivery worker.

Handles four event variants:
Handles the :data:`ObserverEvent` variants. The principal ones:

- :class:`NodeEvent`: the started/completed/checkpoint pair model.
For ``"started"``-phase events, also calls any subscribed
Expand Down
120 changes: 15 additions & 105 deletions src/openarmature/observability/correlation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,13 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from openarmature.graph.events import (
FailureIsolatedEvent,
InvocationCompletedEvent,
InvocationStartedEvent,
LlmCompletionEvent,
LlmFailedEvent,
LlmRetryAttemptEvent,
MetadataAugmentationEvent,
NodeEvent,
ToolCallEvent,
ToolCallFailedEvent,
)
from openarmature.graph.observer import SubscribedObserver
from openarmature.graph.observer import ObserverEvent, SubscribedObserver

Check notice

Code scanning / CodeQL

Cyclic import Note

Import of module
openarmature.graph.observer
begins an import cycle.
Comment thread
chris-colinsky marked this conversation as resolved.

# The dispatch callable accepts the full ObserverEvent union (the single
# source of truth, defined in graph.observer): a new capability event
# widens the contract in one place rather than across every dispatch
# accessor here.
_DispatchFn = Callable[[ObserverEvent], None]


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -221,44 +215,12 @@
# ---------------------------------------------------------------------------


_active_dispatch_var: ContextVar[
Callable[
[
NodeEvent
| MetadataAugmentationEvent
| InvocationStartedEvent
| InvocationCompletedEvent
| LlmCompletionEvent
| LlmFailedEvent
| LlmRetryAttemptEvent
| FailureIsolatedEvent
| ToolCallEvent
| ToolCallFailedEvent
],
None,
]
| None
] = ContextVar("openarmature.active_dispatch", default=None)


def current_dispatch() -> (
Callable[
[
NodeEvent
| MetadataAugmentationEvent
| InvocationStartedEvent
| InvocationCompletedEvent
| LlmCompletionEvent
| LlmFailedEvent
| LlmRetryAttemptEvent
| FailureIsolatedEvent
| ToolCallEvent
| ToolCallFailedEvent
],
None,
]
| None
):
_active_dispatch_var: ContextVar[_DispatchFn | None] = ContextVar(
"openarmature.active_dispatch", default=None
)


def current_dispatch() -> _DispatchFn | None:
"""Return the engine's dispatch callable for the current invocation,
or ``None`` outside any invocation.

Expand All @@ -272,65 +234,13 @@
return _active_dispatch_var.get()


def _set_active_dispatch(
dispatch: Callable[
[
NodeEvent
| MetadataAugmentationEvent
| InvocationStartedEvent
| InvocationCompletedEvent
| LlmCompletionEvent
| LlmFailedEvent
| LlmRetryAttemptEvent
| FailureIsolatedEvent
| ToolCallEvent
| ToolCallFailedEvent
],
None,
],
) -> Token[
Callable[
[
NodeEvent
| MetadataAugmentationEvent
| InvocationStartedEvent
| InvocationCompletedEvent
| LlmCompletionEvent
| LlmFailedEvent
| LlmRetryAttemptEvent
| FailureIsolatedEvent
| ToolCallEvent
| ToolCallFailedEvent
],
None,
]
| None
]:
def _set_active_dispatch(dispatch: _DispatchFn) -> Token[_DispatchFn | None]:
"""Set the engine's dispatch callable in scope. Internal —
engine-only."""
return _active_dispatch_var.set(dispatch)


def _reset_active_dispatch(
token: Token[
Callable[
[
NodeEvent
| MetadataAugmentationEvent
| InvocationStartedEvent
| InvocationCompletedEvent
| LlmCompletionEvent
| LlmFailedEvent
| LlmRetryAttemptEvent
| FailureIsolatedEvent
| ToolCallEvent
| ToolCallFailedEvent
],
None,
]
| None
],
) -> None:
def _reset_active_dispatch(token: Token[_DispatchFn | None]) -> None:
_active_dispatch_var.reset(token)


Expand Down
22 changes: 10 additions & 12 deletions src/openarmature/observability/langfuse/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from typing import Any, cast

from openarmature.graph.events import (
EmbeddingEvent,
EmbeddingFailedEvent,
FailureIsolatedEvent,
InvocationCompletedEvent,
InvocationStartedEvent,
Expand All @@ -41,6 +43,7 @@
ToolCallEvent,
ToolCallFailedEvent,
)
from openarmature.graph.observer import ObserverEvent
from openarmature.observability.lineage import is_strict_prefix

from .client import (
Expand Down Expand Up @@ -432,18 +435,7 @@ def __post_init__(self) -> None:

async def __call__(
self,
event: (
NodeEvent
| MetadataAugmentationEvent
| InvocationStartedEvent
| InvocationCompletedEvent
| LlmCompletionEvent
| LlmFailedEvent
| LlmRetryAttemptEvent
| FailureIsolatedEvent
| ToolCallEvent
| ToolCallFailedEvent
),
event: ObserverEvent,
) -> None:
if isinstance(event, InvocationStartedEvent):
self._handle_invocation_started(event)
Expand Down Expand Up @@ -484,6 +476,12 @@ async def __call__(
if isinstance(event, MetadataAugmentationEvent):
self._handle_metadata_augmentation(event)
return
# Proposal 0059 embedding events: the bundled Langfuse Embedding
# observation is a follow-up. Until it lands the events are safely
# ignored here rather than falling through to the NodeEvent phase
# dispatch (which would AttributeError on the absent ``phase``).
if isinstance(event, EmbeddingEvent | EmbeddingFailedEvent):
return
if event.phase == "started":
self._open_started_observation(event)
elif event.phase == "completed":
Expand Down
Loading