Commit c5761bb

observability: phase 6.1 PR-C.3 — prepare_sync + fixture 010 (#27)

* prepare-sync: add active-span ContextVar + protocol docstring

Step 1 of PR-C.3. Adds the engine-readable ``current_active_observer_span`` ContextVar in ``observability/correlation.py``, with the inverted-directionality docstring spec recommended (``observer→engine`` flow vs. PR-A's ``engine→observer`` set). Typed as ``object | None`` so the base package stays free of an OpenTelemetry import: the OTel observer writes ``Span`` instances; the engine treats the value opaquely and delegates the actual attach to a try-imported OTel helper.

Also extends the ``Observer`` Protocol docstring to document the optional ``prepare_sync(event) -> None`` extension method: opt-in via ``hasattr``, no subclass or runtime_checkable Protocol required, engine calls only for ``"started"``-phase events with the same isolation contract as the async path.

Engine wiring + OTel observer refactor land in subsequent steps.

* prepare-sync: engine wiring + phase-gated forward in _dispatch

Step 2 of PR-C.3. ``_dispatch`` now calls each subscribed observer's optional ``prepare_sync(event)`` synchronously BEFORE queueing for ``"started"``-phase events, with the same isolation contract as the async path: ``warnings.warn`` on exception, doesn't block queueing or subsequent events.

Phase-gated: forwarding to ``prepare_sync`` only fires when ``"started"`` is in the subscribed observer's ``phases`` set, which mirrors how ``deliver_loop`` filters async dispatch. A user who explicitly subscribes only to ``{"completed"}`` gets neither the sync prep nor the async started events, so the wrapper acts as a uniform phase shield across both axes.

Hook is opt-in via ``hasattr``; observers without ``prepare_sync`` are unaffected. OTel observer's ``prepare_sync`` method lands in step 3.

* prepare-sync: OTelObserver sync core + ContextVar publish

Step 3 of PR-C.3.
Renames ``_handle_started`` → ``_open_started_span`` and bakes idempotency into it: a short-circuit at the top returns early if a span already exists for the event's ``_StackKey``. That covers the common case where ``prepare_sync`` opened the span synchronously in the engine task and the async ``__call__`` later re-fires for the same event; the second call becomes a true no-op rather than opening a duplicate span. Observer-attached-late and test paths that bypass ``prepare_sync`` still get the span opened via ``__call__``'s fall-through.

Adds the public ``prepare_sync(event)`` method. Routing-gated (only ``"started"``-phase non-LLM events qualify), it calls ``_open_started_span`` and then publishes the just-opened span via ``_set_active_observer_span``. The engine's ``innermost`` reads the ContextVar in step 4 to attach the span into the OTel context so logs emitted from inside the node body (even on the first line, before any ``await``) pick up the right trace_id/span_id via the OTel ``LoggingHandler``.

The Token returned by ``_set_active_observer_span`` is discarded on purpose: last-writer-wins is the documented contract. The next ``prepare_sync`` call overwrites, and the task-local context dies with the invocation task.

* prepare-sync: engine OTel attach around node bodies

Step 4 of PR-C.3. Adds a try-imported OTel attach helper pair in compiled.py: ``_attach_active_observer_span`` reads ``current_active_observer_span`` (set synchronously by an observer's ``prepare_sync`` before queueing) and splices the span into the OTel context via ``opentelemetry.context.attach(set_span_in_context(span))``; ``_detach_active_observer_span`` pairs the detach in ``finally``.

Both ``_step_function_node``'s and ``_step_fan_out_node``'s ``innermost`` closures now attach right after ``_dispatch_started`` returns and detach in a ``finally`` around the ``await node.run(...)`` / ``await node.run_with_context(...)`` call.
That puts the attach scope around exactly the user-code window, so logs emitted on the FIRST line of a node body, before any ``await``, pick up the right ``trace_id``/``span_id`` via OTel's ``LoggingHandler``, and the detach fires before ``_dispatch_completed`` queues the completed event or the merge runs.

The except branch binds the OTel names to ``None`` so pyright narrows on ``if _otel_attach is None: ...`` rather than flagging "possibly unbound."

Engine stays no-OTel-dep at runtime: installs without ``[otel]`` get a no-op attach/detach, the ContextVar stays ``None``, and nothing changes. Drives the load-bearing log-correlation cases landing in steps 5 and 6.

* prepare-sync: drive fixture 010 (log correlation)

Step 5 of PR-C.3. Promotes ``010-otel-log-correlation`` from ``_DEFERRED_FIXTURES`` to ``_SUPPORTED_FIXTURES`` and adds a hand-built driver covering both YAML sub-cases.

Driver is hand-built rather than going through the conformance adapter: fixture 010's ``emits_log:`` directive isn't an adapter primitive (the adapter recognizes ``update_pure``, ``subgraph``, etc., and silently ignores anything else), and the sub-cases are small enough that hand-built python is clearer than threading a new directive through the adapter.

Sub-case 1 (``log_records_carry_trace_span_correlation_ids``): two nodes ``a`` → ``b``, both emit a log on the FIRST line of their body (before any ``await``; the load-bearing case ``prepare_sync`` exists to cover). Asserts all logs share a trace_id, each log's span_id matches the active node span at emission, and all carry the invocation's correlation_id.

Sub-case 2 (``detached_subgraph_log_uses_detached_trace_id...``): outer invocation has a detached subgraph; logs across the boundary land in different traces but share the correlation_id. Outer log fires from per-node middleware on the SubgraphNode wrapper (SubgraphNode wrappers don't get ``prepare_sync`` per spec; the inner detached node handles attach for itself).
Asserts trace_ids differ + correlation_id flows unchanged.

Helpers ``_setup_isolated_log_bridge`` and ``_restore_log_state`` snapshot/restore root-logger handler+filter+factory state so the process-global ``install_log_bridge`` mutations don't bleed into neighboring tests. ``_enable_test_logger_at_info`` walks the fixture-010 logger up to ``INFO`` so YAML's ``level: INFO`` records actually flow through Python's logger-level filter to the bridge handler (undone on exit).

* prepare-sync: load-bearing first-line-log unit test

Step 6 of PR-C.3. Adds ``test_log_on_first_line_of_node_body_carries_node_span`` under ``tests/unit/test_observability_otel.py``: a focused single-node test that emits a log on the FIRST line of a node body (before any ``await``) and asserts the resulting log record carries the node span's ``trace_id`` AND ``span_id``.

This is the regression target ``prepare_sync`` exists to cover. Without the synchronous engine-task observer prep:

- The engine queues the started event for async dispatch.
- The node body runs immediately in the engine task.
- A log emitted on the first line, before any ``await``, runs before the OTel observer's ``__call__`` has fired on the worker task, so the span isn't open yet, OTel's ``get_current()`` returns an invalid span, and the log lands with ``trace_id=0`` / ``span_id=0``.

With ``prepare_sync``, the observer creates the span synchronously in the engine task BEFORE queueing, publishes it via the ``current_active_observer_span`` ContextVar, and the engine attaches it to OTel context around the body. The first-line log sees the right span.

Lives in unit/ (not just buried in fixture 010's driver) so a regression jumps straight to ``prepare_sync``-related code. Snapshot/restore the root logger's handlers, filters, factory, and the test logger's level so process-global ``install_log_bridge`` state doesn't bleed into other tests.

* prepare-sync: clear active-span ContextVar after detach

PR-C.3 review fixup.
The cleared spec lifecycle reasoning at the coord thread covered only the happy path: "ContextVar gets overwritten on the next prepare_sync." If a subsequent prepare_sync raises or early-returns without publishing, for any reason, the engine reads the previous node's span and attaches it around the new node's body, producing wrong log correlation.

Bound the "ContextVar is set" window to the node-body scope by clearing it to None in innermost's finally right after the OTel detach (both _step_function_node's and _step_fan_out_node's paths). Between dispatches and during merge / completed-event dispatch the ContextVar is now None, so a failing or early-returning prepare_sync can't reveal a stale span when the engine reads.

Lifecycle ownership stays with the attach/detach scope rather than fanning out across observers in _dispatch. Updated current_active_observer_span's docstring to reflect the narrower lifecycle.

* prepare-sync: detect & warn on async user implementations

PR-C.3 review fixup. The opt-in-via-hasattr contract means pyright doesn't catch a user signature mismatch when a developer assumes "all observer methods are async" and defines ``async def prepare_sync(...)``. Today the call silently returns an unawaited coroutine: the prep work never runs and Python emits a delayed "coroutine was never awaited" RuntimeWarning at GC time, breaking log correlation in a way that's hard to trace back to the observer.

In ``_dispatch``, after each ``prepare_sync(event)`` returns, check ``inspect.isawaitable(result)``. On hit: close the awaitable (suppresses the secondary RuntimeWarning) and emit an explicit ``warnings.warn`` naming the misconfiguration so it fails loudly at the call site.

Post-call detection catches the common ``async def`` case AND the rarer lambda-returning-coroutine / ``functools.partial``-of-async cases: one check, all forms covered.

* prepare-sync: warn on close-cleanup failure (codeql)

PR-C.3 review fixup.
The ``except Exception: pass`` after the best-effort ``close_method()`` call tripped CodeQL's ``py/empty-except`` rule on two surfaces (code-quality + advanced security). Cleanup is intentionally best-effort (a raise here MUST NOT propagate or break sibling observers' dispatch), but swallowing silently makes the rare cleanup-failure case invisible.

Replace the empty pass with ``except Exception as close_error:`` followed by a ``warnings.warn`` mentioning the cleanup-failure. Same isolation contract preserved (no propagation, no sibling-blocking) but the swallow is now observable. CodeQL ``py/empty-except`` cleared on both surfaces.

* prepare-sync: precise warn text + spec-derived test bodies

PR-C.3 review fixup.

- observer.py: rewrite the awaitable-from-prepare_sync warning. The old text claimed "did NOT run", but a user returning an ``asyncio.Task`` / ``Future`` may have work in flight on the loop, just not awaited at the prepare_sync call site. The contract violation is "no guarantee the prep completes before the node body," not "definitely doesn't run." Reworded to that shape and included ``type(result).__name__`` so the user can see which awaitable they returned at a glance.
- tests/conformance/test_observability.py: sub-case 1's driver hardcoded the YAML message bodies ("node a executing" / "node b executing") for record filtering and lookup, even though it had already read ``emits_log.message`` from the spec to drive the node body. That duplicated spec data and made the test brittle to fixture wording changes. Derive a ``node_emit_messages`` map from ``nodes_spec`` up front; use the values for both record filtering and ``by_body`` indexing. Sub-case 2 already worked this way (uses ``outer_emit`` / ``inner_emit`` derived from spec); sub-case 1 now matches.
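
The first-line race described in step 6 can be reproduced with the standard library alone. The following is a minimal sketch, not the project's code: ``active_span``, ``run_node`` and the string "span" values are hypothetical stand-ins for the real ContextVar, engine and OTel span. It shows that a queued event is not processed by the worker task until the engine task first yields, so only a synchronous publish is visible on the first line of the body.

```python
from __future__ import annotations

import asyncio
import contextvars

# Hypothetical stand-in for the "active span" slot; not the project's real API.
active_span: contextvars.ContextVar[str | None] = contextvars.ContextVar(
    "active_span", default=None
)


async def run_node(use_sync_prep: bool) -> tuple[str | None, bool]:
    """Return (span seen on the first line, whether the worker had already run)."""
    queue: asyncio.Queue[str] = asyncio.Queue()
    handled: list[str] = []

    async def worker() -> None:
        # Async observer path: only runs once the event loop schedules this task.
        handled.append(await queue.get())

    task = asyncio.create_task(worker())

    if use_sync_prep:
        # Sync prep: publish the span in the engine task, before queueing.
        active_span.set("span-for-node-a")
    queue.put_nowait("node-a started")

    # "First line of the node body": this task has not awaited anything yet.
    seen_on_first_line = active_span.get()
    worker_already_ran = bool(handled)

    await task  # let the worker drain the queue before returning
    active_span.set(None)
    return seen_on_first_line, worker_already_ran


if __name__ == "__main__":
    print(asyncio.run(run_node(use_sync_prep=False)))
    print(asyncio.run(run_node(use_sync_prep=True)))
```

In both runs the worker has not yet handled the event when the first line executes; the only difference is whether a span was published synchronously beforehand.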
1 parent 9a29795 commit c5761bb

6 files changed

Lines changed: 783 additions & 73 deletions

src/openarmature/graph/compiled.py

Lines changed: 126 additions & 44 deletions
@@ -64,12 +64,14 @@
     _reset_invocation_id,
     _reset_namespace_prefix,
     _set_active_dispatch,
+    _set_active_observer_span,
     _set_active_observers,
     _set_attempt_index,
     _set_correlation_id,
     _set_fan_out_index,
     _set_invocation_id,
     _set_namespace_prefix,
+    current_active_observer_span,
 )

 from .edges import END, ConditionalEdge, EndSentinel, StaticEdge
@@ -99,6 +101,54 @@
 from .state import State
 from .subgraph import SubgraphNode

+# Try-import OpenTelemetry attach primitives so the engine can splice an
+# observer-published span into the OTel context for the duration of a
+# node body. The engine treats the span value opaquely (writes by an
+# observer's ``prepare_sync``, reads via ``current_active_observer_span``)
+# and only touches OTel when both: (a) the extras are installed, and
+# (b) an observer actually published a span. Installs without ``[otel]``
+# get a no-op attach/detach pair; the observer ContextVar stays
+# ``None`` and nothing changes.
+#
+# The names are bound to ``None`` in the except branch so pyright
+# narrows correctly at call sites (``if _otel_attach is None: ...``)
+# rather than flagging "possibly unbound."
+try:
+    from opentelemetry.context import attach as _otel_attach
+    from opentelemetry.context import detach as _otel_detach
+    from opentelemetry.trace.propagation import set_span_in_context as _otel_set_span_in_context
+except ImportError:  # pragma: no cover — exercised only in non-otel installs
+    _otel_attach = None  # type: ignore[assignment]
+    _otel_detach = None  # type: ignore[assignment]
+    _otel_set_span_in_context = None  # type: ignore[assignment]
+
+
+def _attach_active_observer_span() -> object | None:
+    """Read ``current_active_observer_span``; if an observer published
+    one and OTel is installed, attach the span into the OTel context
+    so that any logs emitted from the next user-code scope (a node
+    body) pick up the right ``trace_id``/``span_id`` via OTel's
+    ``LoggingHandler``.
+
+    Returns the OTel context token to hand back to
+    :func:`_detach_active_observer_span` in ``finally``, or ``None``
+    if no attach happened (no observer, no OTel, or both).
+    """
+    if _otel_attach is None or _otel_set_span_in_context is None:
+        return None
+    span = current_active_observer_span()
+    if span is None:
+        return None
+    return _otel_attach(_otel_set_span_in_context(cast("Any", span)))
+
+
+def _detach_active_observer_span(token: object | None) -> None:
+    """Pair to :func:`_attach_active_observer_span`. No-op when no
+    attach was performed (token is ``None``)."""
+    if token is None or _otel_detach is None:
+        return
+    _otel_detach(cast("Any", token))
+

 def _merge_partial[StateT: State](
     prior: StateT,
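
The attach/detach pairing that these helpers wrap can be tried in isolation. This is a minimal sketch, assuming the ``opentelemetry-api`` package may or may not be installed; ``demo_attach_detach`` and the made-up trace/span IDs are illustrative only, and a ``NonRecordingSpan`` stands in for a span an observer would have opened.

```python
from __future__ import annotations

try:
    from opentelemetry import context as otel_context
    from opentelemetry import trace as otel_trace
except ImportError:  # OTel not installed: the demo degrades to a no-op.
    otel_context = None
    otel_trace = None


def demo_attach_detach() -> tuple[int, bool] | None:
    """Return (span_id visible inside the attach scope, span validity after
    detach), or None when OpenTelemetry is not installed."""
    if otel_context is None or otel_trace is None:
        return None
    # Made-up IDs; a non-recording span stands in for an observer-opened span.
    span = otel_trace.NonRecordingSpan(
        otel_trace.SpanContext(trace_id=0x1, span_id=0x2, is_remote=False)
    )
    token = otel_context.attach(otel_trace.set_span_in_context(span))
    try:
        # Code running here (for example a log call) sees the attached span.
        inside = otel_trace.get_current_span().get_span_context().span_id
    finally:
        otel_context.detach(token)
    # After detach the current span falls back to the invalid default span.
    after = otel_trace.get_current_span().get_span_context().is_valid
    return inside, after


if __name__ == "__main__":
    print(demo_attach_detach())
```

Keeping the detach in ``finally`` mirrors the commit's intent: the span is active only for the wrapped scope, whether or not that scope raises.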
@@ -690,20 +740,35 @@ async def innermost(s: Any) -> Mapping[str, Any]:
             try:
                 self._dispatch_started(context, current, namespace, step, s, attempt_index=attempt_index)

+                # Splice the observer-published span (if any) into the
+                # OTel context so logs emitted from the FIRST line of
+                # the node body — before any ``await`` — pick up the
+                # right trace_id/span_id via OTel's LoggingHandler.
+                # Detach in ``finally`` so retries / merge / completed
+                # dispatch don't run with the span still active, and
+                # clear ``current_active_observer_span`` to ``None`` so
+                # the next dispatch that raises or early-returns from
+                # ``prepare_sync`` can't reveal this node's span as a
+                # stale value to the engine's read.
+                otel_token = _attach_active_observer_span()
                 try:
-                    partial = await node.run(s)
-                except Exception as e:
-                    wrapped = NodeException(node_name=current, cause=e, recoverable_state=s)
-                    self._dispatch_completed(
-                        context,
-                        current,
-                        namespace,
-                        step,
-                        s,
-                        error=wrapped,
-                        attempt_index=attempt_index,
-                    )
-                    raise
+                    try:
+                        partial = await node.run(s)
+                    except Exception as e:
+                        wrapped = NodeException(node_name=current, cause=e, recoverable_state=s)
+                        self._dispatch_completed(
+                            context,
+                            current,
+                            namespace,
+                            step,
+                            s,
+                            error=wrapped,
+                            attempt_index=attempt_index,
+                        )
+                        raise
+                finally:
+                    _detach_active_observer_span(otel_token)
+                    _set_active_observer_span(None)

                 try:
                     merged = _merge_partial(s, partial, self.reducers, current)
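
Why the ``finally`` also clears the ContextVar can be seen in a small standard-library sketch. The names here (``_active_span``, ``run_node``, ``scenario``) are hypothetical and the "span" is just a string; the point is only the lifecycle: when a later node's prep publishes nothing, the engine's read returns the previous node's value unless that value was cleared after the body.

```python
from __future__ import annotations

import contextvars

# Hypothetical stand-in for the active-span ContextVar.
_active_span: contextvars.ContextVar[object | None] = contextvars.ContextVar(
    "_active_span", default=None
)


def run_node(name: str, prep_publishes: bool, clear_after: bool) -> object | None:
    """Return what the engine would read as the active span for this node."""
    if prep_publishes:
        _active_span.set(f"span:{name}")  # the observer's sync prep publishes
    seen = _active_span.get()  # the engine reads just before the body
    try:
        pass  # the node body would run here
    finally:
        if clear_after:
            _active_span.set(None)  # bound the "set" window to the body
    return seen


def scenario(clear_after: bool) -> list[object | None]:
    """Node "a" publishes a span; node "b"'s prep fails to publish one."""

    def go() -> list[object | None]:
        return [
            run_node("a", prep_publishes=True, clear_after=clear_after),
            run_node("b", prep_publishes=False, clear_after=clear_after),
        ]

    # Run in a copied context so the demo does not leak state to the caller.
    return contextvars.copy_context().run(go)


if __name__ == "__main__":
    print(scenario(clear_after=False))  # node "b" sees node "a"'s stale span
    print(scenario(clear_after=True))   # node "b" sees no span
```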
@@ -1045,38 +1110,55 @@ async def innermost(s: Any) -> Mapping[str, Any]:
                     attempt_index=attempt_index,
                     fan_out_config=fan_out_event_config,
                 )
+                # Same OTel attach pattern as ``_step_function_node``'s
+                # ``innermost`` — splice the observer-published span
+                # into the OTel context so logs emitted from inside
+                # the fan-out node's own scope (middleware bodies,
+                # the dispatch machinery) carry the right
+                # trace_id/span_id. Per-instance bodies get their own
+                # attach inside their ``_step_function_node``
+                # innermost when the recursive invocation hits leaf
+                # nodes. ``finally`` clears the ContextVar so a later
+                # dispatch whose ``prepare_sync`` raises or early-
+                # returns can't reveal this fan-out's span as a stale
+                # value to the engine's read.
+                otel_token = _attach_active_observer_span()
                 try:
-                    partial = await node.run_with_context(
-                        s,
-                        context,
-                        pre_resolved_count=item_count,
-                        pre_resolved_concurrency=(concurrency_resolved,),
-                    )
-                except RuntimeGraphError as e:
-                    self._dispatch_completed(
-                        context,
-                        current,
-                        namespace,
-                        step,
-                        s,
-                        error=e,
-                        attempt_index=attempt_index,
-                        fan_out_config=fan_out_event_config,
-                    )
-                    raise
-                except Exception as e:
-                    wrapped = NodeException(node_name=current, cause=e, recoverable_state=s)
-                    self._dispatch_completed(
-                        context,
-                        current,
-                        namespace,
-                        step,
-                        s,
-                        error=wrapped,
-                        attempt_index=attempt_index,
-                        fan_out_config=fan_out_event_config,
-                    )
-                    raise wrapped from e
+                    try:
+                        partial = await node.run_with_context(
+                            s,
+                            context,
+                            pre_resolved_count=item_count,
+                            pre_resolved_concurrency=(concurrency_resolved,),
+                        )
+                    except RuntimeGraphError as e:
+                        self._dispatch_completed(
+                            context,
+                            current,
+                            namespace,
+                            step,
+                            s,
+                            error=e,
+                            attempt_index=attempt_index,
+                            fan_out_config=fan_out_event_config,
+                        )
+                        raise
+                    except Exception as e:
+                        wrapped = NodeException(node_name=current, cause=e, recoverable_state=s)
+                        self._dispatch_completed(
+                            context,
+                            current,
+                            namespace,
+                            step,
+                            s,
+                            error=wrapped,
+                            attempt_index=attempt_index,
+                            fan_out_config=fan_out_event_config,
+                        )
+                        raise wrapped from e
+                finally:
+                    _detach_active_observer_span(otel_token)
+                    _set_active_observer_span(None)

                 try:
                     merged = _merge_partial(s, partial, self.reducers, current)

src/openarmature/graph/observer.py

Lines changed: 93 additions & 0 deletions
@@ -28,6 +28,7 @@
 from __future__ import annotations

 import asyncio
+import inspect
 import warnings
 from collections.abc import Iterable
 from dataclasses import dataclass, field
@@ -60,6 +61,25 @@ async def log_observer(event: NodeEvent) -> None:
     The event parameter is positional-only (`event, /`) so structural
     conformance doesn't pin you to that name — any of `event`, `_event`,
     `e`, etc. matches.
+
+    Optional ``prepare_sync`` extension
+    -----------------------------------
+    An observer MAY additionally define a synchronous method::
+
+        def prepare_sync(self, event: NodeEvent, /) -> None: ...
+
+    that the engine calls IN THE ENGINE TASK, BEFORE queueing the
+    event for the async ``__call__``. This exists for observers that
+    need to set up state — e.g., open a span and stash a handle in
+    a ContextVar — that the engine itself must read synchronously
+    before running the node body (otherwise logs emitted on the
+    first line of the body wouldn't see the right span).
+
+    ``prepare_sync`` is **opt-in via ``hasattr``** — no subclass or
+    Protocol method required. Observers that don't define it skip
+    the synchronous prep entirely; observers that do define it run
+    only for ``"started"``-phase events, errors warned not propagated
+    (same isolation contract as the async path per spec §6).
     """

     async def __call__(self, event: NodeEvent, /) -> None: ...
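
The opt-in contract in this docstring can be illustrated with a self-contained sketch. Everything below is hypothetical scaffolding (``Event``, ``Subscribed``, ``run_sync_prep`` and the three observer classes are not the project's types); it only demonstrates the documented behaviour: the hook is looked up by attribute, fires only for ``"started"`` events that the subscription covers, and a raising hook is reported via ``warnings.warn`` without blocking its siblings.

```python
from __future__ import annotations

import warnings
from dataclasses import dataclass


@dataclass
class Event:  # hypothetical stand-in for NodeEvent
    node: str
    phase: str


@dataclass
class Subscribed:  # hypothetical stand-in for the engine's subscription wrapper
    observer: object
    phases: frozenset[str] = frozenset({"started", "completed"})


class PlainObserver:
    """Defines no prepare_sync, so the sync prep skips it entirely."""

    async def __call__(self, event: Event, /) -> None: ...


class PreparingObserver:
    def __init__(self) -> None:
        self.prepared: list[str] = []

    def prepare_sync(self, event: Event, /) -> None:
        self.prepared.append(event.node)

    async def __call__(self, event: Event, /) -> None: ...


class FailingObserver:
    def prepare_sync(self, event: Event, /) -> None:
        raise RuntimeError("boom")

    async def __call__(self, event: Event, /) -> None: ...


def run_sync_prep(subscriptions: list[Subscribed], event: Event) -> None:
    """Call each opted-in observer's prepare_sync for "started" events."""
    if event.phase != "started":
        return
    for sub in subscriptions:
        if "started" not in sub.phases:
            continue  # phase gate: this subscription excludes "started"
        hook = getattr(sub.observer, "prepare_sync", None)
        if hook is None:
            continue  # observer did not opt in
        try:
            hook(event)
        except Exception as exc:  # isolate: warn, then keep going
            warnings.warn(f"prepare_sync raised {type(exc).__name__}: {exc}", stacklevel=2)
```

A subscription restricted to ``{"completed"}`` never has its hook called, even if the observer defines one, which is the "uniform phase shield" the commit message describes.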
@@ -344,12 +364,85 @@ def take_step(self) -> int:
 def _dispatch(context: _InvocationContext, event: NodeEvent) -> None:
     """Enqueue a node event for the delivery worker.

+    For ``"started"``-phase events, also call any subscribed observer's
+    optional ``prepare_sync(event)`` synchronously — in the engine task,
+    BEFORE queueing — so observers that need to publish per-event state
+    the engine itself reads in the same engine-task scope (e.g., the
+    OTel observer setting ``current_active_observer_span`` for the
+    engine to attach into the OTel context) can do so before the node
+    body runs.
+
+    Phase-gated forwarding: ``prepare_sync`` only fires when ``"started"``
+    is in the subscribed observer's ``phases`` set, mirroring how the
+    async ``deliver_loop`` filters dispatch. A user who explicitly
+    subscribes only to ``{"completed"}`` doesn't get the synchronous
+    prep — the wrapper acts as a uniform phase shield across both
+    sync prep and async dispatch.
+
+    Errors from ``prepare_sync`` follow the same isolation contract as
+    the async path per spec §6: don't propagate, don't break siblings,
+    don't block the queueing or subsequent events. Reported via
+    ``warnings.warn``.
+
     No-op when no observers exist for this depth — avoids paying the queue
     overhead for graphs that don't observe anything.
     """
     observers = context.full_observers()
     if not observers:
         return
+    if event.phase == "started":
+        for subscribed in observers:
+            if "started" not in subscribed.phases:
+                continue
+            prepare_sync = getattr(subscribed.observer, "prepare_sync", None)
+            if prepare_sync is None:
+                continue
+            try:
+                result = prepare_sync(event)
+            except Exception as e:
+                warnings.warn(
+                    f"observer prepare_sync raised {type(e).__name__}: {e}",
+                    stacklevel=2,
+                )
+                continue
+            if inspect.isawaitable(result):
+                # ``prepare_sync`` is opt-in via ``hasattr`` (not a
+                # Protocol method) so pyright can't catch a user's
+                # ``async def prepare_sync`` signature drift up front.
+                # The call here would silently return an unawaited
+                # coroutine — the prep work wouldn't run AND Python
+                # would emit a delayed "coroutine was never awaited"
+                # warning at GC time. Close the awaitable to suppress
+                # that secondary noise and surface the misconfiguration
+                # via our own explicit warn so it fails loudly at the
+                # call site. ``getattr`` rather than ``hasattr``+method
+                # access keeps pyright's strict-mode happy on the
+                # ``Awaitable`` type (``.close`` lives on
+                # ``Coroutine``, not the broader ``Awaitable``).
+                close_method = getattr(result, "close", None)
+                if close_method is not None:
+                    try:
+                        close_method()
+                    except Exception as close_error:
+                        # Cleanup is best-effort: a raise here MUST NOT
+                        # propagate or block sibling observers. Surface
+                        # via ``warnings.warn`` so the swallow is at
+                        # least observable if it ever fires (CodeQL
+                        # py/empty-except clears on this surface too).
+                        warnings.warn(
+                            f"observer prepare_sync close cleanup raised "
+                            f"{type(close_error).__name__}: {close_error}",
+                            stacklevel=2,
+                        )
+                warnings.warn(
+                    f"observer prepare_sync returned an awaitable "
+                    f"({type(result).__name__}); prepare_sync MUST be sync "
+                    f"(define as `def`, not `async def`). The returned "
+                    f"awaitable will not be awaited and is NOT guaranteed "
+                    f"to complete before the node body starts; log "
+                    f"correlation may miss this node's span.",
+                    stacklevel=2,
+                )
     context.queue.put_nowait(_QueuedItem(event=event, observers=observers))
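
The post-call awaitable check can be exercised on its own. This is a reduced sketch rather than the engine's ``_dispatch``: ``call_sync_hook``, ``sync_hook`` and ``async_hook`` are hypothetical names, and the warning text is shortened. It relies only on ``inspect.isawaitable`` and on the fact that closing a never-started coroutine prevents Python's later "was never awaited" RuntimeWarning.

```python
from __future__ import annotations

import inspect
import warnings
from collections.abc import Callable
from typing import Any


def call_sync_hook(hook: Callable[[Any], Any], event: Any) -> bool:
    """Call a hook that is supposed to be synchronous.

    Returns True when it behaved synchronously, False when it returned an
    awaitable (for example because it was declared with ``async def``).
    """
    result = hook(event)
    if not inspect.isawaitable(result):
        return True
    # Coroutines have .close(); other awaitables (Futures, Tasks) may not.
    close = getattr(result, "close", None)
    if close is not None:
        close()  # suppresses the delayed "never awaited" RuntimeWarning
    warnings.warn(
        f"hook returned an awaitable ({type(result).__name__}); "
        "define it with `def`, not `async def`",
        stacklevel=2,
    )
    return False


def sync_hook(event: Any) -> None:
    return None


async def async_hook(event: Any) -> None:
    return None


if __name__ == "__main__":
    print(call_sync_hook(sync_hook, "event"))
    print(call_sync_hook(async_hook, "event"))
```

Checking the return value after the call, rather than inspecting the function beforehand, is what lets one test cover ``async def`` methods, lambdas that return coroutines, and ``functools.partial`` wrappers alike.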