Commit 9a29795

observability: phase 6.1 PR-C.2 — proposal 0013 + v0.10.0 (#26)
* observability: phase 6.1 PR-C.2 — proposal 0013 + v0.10.0

Implement spec graph-engine + observability v0.10.0 (proposal 0013):
non-detached fan-out instances now synthesize per-instance dispatch
spans nested between the fan-out node span and the inner-node spans
(mirroring the detached path's layout, but landing in the parent trace
rather than a fresh trace_id). The fan-out node span carries the four
§5.4 attributes (item_count / concurrency / error_policy /
parent_node_name) sourced from the new ``NodeEvent.fan_out_config``
field.

Mechanism: typed ``FanOutEventConfig`` dataclass on the canonical event
payload (events.py). Engine populates eagerly at fan-out entry in
``_step_fan_out_node`` — resolves item_count (from ``count`` mode or
``len(items_field)``) + concurrency once, threads through every
dispatch site (started, all four error-path completed, deferred
success-case completed). Pre-resolved values flow into
``FanOutNode.run_with_context`` via new ``pre_resolved_count`` /
``pre_resolved_concurrency`` kwargs so callable resolvers fire at most
once per fan-out attempt scope (retry middleware re-uses the
outer-scope resolution).

The four contract corrections from PR #34's CoPilot review:

1. ``concurrency: int | None`` on the canonical payload (positive int
   or None for unbounded per pipeline-utilities §9.2). The
   ``None → 0`` translation lives only at the OTel attribute layer in
   ``_node_attrs`` per observability §5.4's bare-int sentinel
   convention.
2. All four ``FanOutEventConfig`` fields structurally required (frozen
   dataclass; partial resolution would raise on construction). The
   implementation-defined corner from spec-thread 03 (config resolution
   itself failing) doesn't surface in current fixtures; natural error
   propagation covers it.
3. Retried fan-out attempts carry ``fan_out_config``: the resolved
   config is constructed in the outer scope of ``_step_fan_out_node``;
   retry middleware re-enters ``innermost`` and re-uses the same
   outer-scope reference. No per-attempt re-resolution; partition is by
   node type, not event category.
4. ``parent_node_name`` is on per-instance spans only; the observer
   caches it from the fan-out node's ``started`` event in
   ``_InvState.fan_out_parent_node_name`` keyed by namespace prefix,
   applies at per-instance dispatch span synthesis, and clears at the
   fan-out node's ``completed`` event.

Observer changes (``observer.py``):

- ``_InvState`` gains ``fan_out_instance_spans`` (per-instance dispatch
  span store, keyed by ``prefix + (str(fan_out_index),)``) and
  ``fan_out_parent_node_name`` (cache).
- ``_handle_started`` populates the cache when fan-out node events
  land.
- ``_handle_completed`` closes per-instance dispatch spans on the
  fan-out node's own completion (children-before-parents ordering) and
  clears the cache entry.
- ``_sync_subgraph_spans`` routes non-detached fan-out instance
  namespaces through the new ``_open_fan_out_instance_dispatch_span``
  helper instead of opening a shared subgraph span at the prefix.
- ``_resolve_parent_context`` finds the per-instance dispatch span
  before falling through, so inner-node events parent under their own
  per-instance dispatch span (not the shared fan-out node span).
- ``_node_attrs`` populates the three §5.4 fan-out node span attributes
  from ``event.fan_out_config`` when present.
- ``_drain_inv_state`` extended to drain ``fan_out_instance_spans`` in
  child→parent order.

Pin sites bumped to v0.10.0 (three-place sync per CLAUDE.md):
``openarmature-spec`` submodule pointer to ``ff86945``,
``pyproject.toml`` ``tool.openarmature.spec_version``,
``src/openarmature/__init__.py`` ``__spec_version__``,
``tests/test_smoke.py`` drift-guard literal.
``OTelObserver.spec_version`` updates automatically via PR-A's
``_read_spec_version``.

Tests:

- ``tests/conformance/test_observability.py`` removes
  ``006-otel-fan-out-instance-attribution`` from ``_DEFERRED_FIXTURES``
  and adds ``_run_fixture_006`` driver. Verifies the fixture's expected
  tree: 1 fan-out NODE span with item_count/concurrency/error_policy
  attributes; 3 per-instance dispatch spans with fan_out_index 0..2 and
  parent_node_name="process"; 3 compute spans (one per instance)
  parented under their own per-instance dispatch span.
- ``tests/conformance/adapter.py``:
  ``_TracingFanOutNode.run_with_context`` accepts and forwards the new
  pre-resolved kwargs (pyright override-compatibility).
- Existing fan-out tests (resolution-once-per-entry,
  concurrency-callable-once-per-entry, instance-middleware retry,
  fan-in ordering, etc.) stay green — pre-resolved-values plumbing
  didn't introduce double-resolution.

399 tests pass (was 392; net +7 from new fan-out path verification +
fixture 006 going from skip to pass + harness work). 3 skipped (was 4;
only fixture 010 remains, which is PR-C.3's scope). Pyright clean.

PR-C.3 (observer ``prepare_sync`` + fixture 010) sits independently
behind its own architectural piece; lands in either order. Phase 6.1
closes when both merge.

* otel: address PR #26 review

- Move fan_out runtime imports from compiled.py module top into
  function scope to break the textual import cycle CodeQL's
  py/cyclic-import rule flagged. fan_out has a TYPE_CHECKING
  back-reference to compiled (no runtime issue), but the static
  analyzer doesn't see the gate. Lazy-imports inside _invoke
  (FanOutNode for the isinstance check) and _step_fan_out_node
  (_resolve_concurrency / _resolve_count). Type-only FanOutNode
  reference moves to TYPE_CHECKING with
  `from __future__ import annotations` so signatures resolve without
  the runtime import.
- Fix _drain_inv_state ordering: per-instance dispatch spans in
  fan_out_instance_spans are children of the fan-out NODE span (which
  lives in open_spans at depth 1). The previous shape closed all of
  open_spans before draining fan_out_instance_spans, ending the parent
  before its children during shutdown/abandon. Now drains open_spans in
  two phases (deep >= 2 / shallow = 1) with the per-instance drain
  between, so children-before-parents holds.
- Remove dead `by_id` map from _run_fixture_006_case — scaffolding from
  an earlier draft; current assertions don't use it.

* codeql: suppress py/unsafe-cyclic-import

Both compiled.py and fan_out.py reference each other's types only via
TYPE_CHECKING blocks. CodeQL's analyzer flags the textual cycle
regardless of the gate; no runtime cycle exists. Removing either side
breaks pyright's type resolution for generics across the boundary.
Pyright's reportImportCycles already covers genuine runtime cycles at
type-check time, so dropping this CodeQL rule loses no signal.

* fan-out: surface resolver failures with event pair

PR-C.2 hoisted item_count / concurrency resolution out of
``run_with_context`` to step entry so the eager ``FanOutEventConfig``
could ride on every fan-out node event. That moved resolver failures
(callable raises, ``getattr`` on malformed state, items_field non-list)
outside ``innermost``'s ``except Exception → NodeException`` block,
dropping the started/completed event pair and the NodeException wrap
that the inner path used to provide. Re-establish the contract by
wrapping the resolution block: any failure now dispatches a
started+completed pair with ``fan_out_config=None`` (we never built
one) and surfaces as NodeException. Also drops the silent 0-coercion
when ``items_field`` resolves to a non-list — raises with the same
TypeError text ``_build_instance_states`` produces, so fan_out_config
never reports a misleading ``item_count=0``.

* tests: drop redundant Protocol assertion

The ``_: ProjectionStrategy[S, Inner] = BoomProjection()`` line was a
defensive belt-and-suspenders structural-conformance check, but the
call-site ``add_subgraph_node(projection=BoomProjection())`` exercises
the same Protocol check via its parameter annotation — pyright catches
a mismatch there without needing the explicit assignment. Drop the line
and update the comment to point at the call site.

* tests: comment LLM step sentinel + parametrize list factories

- Annotate ``step=-1`` in the synthetic LLM-event observer test to
  point readers at ``OpenAIProvider._llm_event`` (openai.py:643) where
  the same sentinel is minted for production LLM-provider span events
  that aren't tied to graph step sequencing.
- ``_ParentState`` in the fan-out gating test now uses
  ``Field(default_factory=list[int])`` instead of the bare ``[]``
  default, matching the parametrized factory shape used in
  ``test_checkpoint.py``'s ``_ParentState`` and the surrounding
  pyright-strict expectation that field types are fully known.
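
Reviewer note: the `_drain_inv_state` ordering fix in the "address PR #26 review" commit can be pictured with a minimal sketch. Everything here is an assumption made for illustration: `FakeSpan`, `drain`, and the tuple-keyed dicts are hypothetical stand-ins, not the observer's actual data structures, which this page does not show. The sketch only demonstrates the three-pass order (deep spans, then per-instance dispatch spans, then depth-1 spans).

```python
from dataclasses import dataclass, field


@dataclass
class FakeSpan:
    """Hypothetical stand-in for a tracing span; records the order of end() calls."""

    name: str
    log: list[str] = field(default_factory=list)

    def end(self) -> None:
        self.log.append(self.name)


def drain(
    open_spans: dict[tuple[str, ...], FakeSpan],
    instance_spans: dict[tuple[str, ...], FakeSpan],
) -> None:
    """Close every span so that children always end before their parents."""
    by_depth = sorted(open_spans, key=len, reverse=True)
    # Pass 1: inner spans (namespace depth >= 2), deepest first.
    for key in [k for k in by_depth if len(k) >= 2]:
        open_spans.pop(key).end()
    # Pass 2: per-instance dispatch spans, which are children of the
    # fan-out node span that still sits in open_spans at depth 1.
    for key in sorted(instance_spans, key=len, reverse=True):
        instance_spans.pop(key).end()
    # Pass 3: shallow spans (depth 1), including the fan-out node span.
    for key in [k for k in by_depth if len(k) == 1]:
        open_spans.pop(key).end()


log: list[str] = []
open_spans = {
    ("process",): FakeSpan("fan_out_node", log),
    ("process", "0", "compute"): FakeSpan("compute[0]", log),
}
instance_spans = {("process", "0"): FakeSpan("instance[0]", log)}
drain(open_spans, instance_spans)
print(log)
```

Closing all of `open_spans` in a single pass would end `fan_out_node` before `instance[0]`, which is the shutdown ordering the commit message describes as broken.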
1 parent 5775fc5 commit 9a29795

13 files changed

Lines changed: 550 additions & 31 deletions

.github/codeql/codeql-config.yml

Lines changed: 20 additions & 0 deletions
@@ -48,3 +48,23 @@ query-filters:
   # so dropping the CodeQL rule doesn't lose signal.
   - exclude:
       id: py/unused-import
+  # ``py/unsafe-cyclic-import`` flags the textual import shape
+  # without honoring ``if TYPE_CHECKING:`` gates. The two cases
+  # in this codebase are mutual-typing pairs:
+  #
+  # - ``graph/compiled.py`` and ``graph/fan_out.py`` reference
+  #   each other's types only via TYPE_CHECKING blocks
+  #   (``compiled`` imports ``FanOutNode`` for type annotations;
+  #   ``fan_out`` imports ``CompiledGraph`` for the
+  #   ``FanOutConfig.subgraph`` field annotation). Both sides use
+  #   ``from __future__ import annotations`` so annotations are
+  #   strings; no runtime cycle exists.
+  #
+  # The canonical Python workaround for typed-module pairs is
+  # exactly this TYPE_CHECKING gating. Removing either side
+  # breaks pyright's type resolution for generics across the
+  # boundary. Pyright's ``reportImportCycles`` already covers the
+  # genuine runtime-cycle cases at type-check time, so dropping
+  # the CodeQL rule doesn't lose signal.
+  - exclude:
+      id: py/unsafe-cyclic-import
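
Reviewer note: the gating pattern that the config comment relies on can be sketched in a single file. This is an illustration under stated assumptions, not the repository's code: `hypothetical_pkg` is a made-up module name that does not need to exist, and the explicit string annotation stands in for what `from __future__ import annotations` provides module-wide.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by type checkers. This branch never runs, so the
    # (hypothetical) module below is never imported at runtime.
    from hypothetical_pkg.fan_out import FanOutNode


def describe(node: "FanOutNode") -> str:
    """The annotation is a plain string, so no runtime import is needed."""
    return type(node).__name__


def is_fan_out(node: object) -> bool:
    """Runtime use (isinstance) needs the real class: import it lazily."""
    try:
        from hypothetical_pkg.fan_out import FanOutNode  # function-scope import
    except ImportError:
        return False  # the hypothetical package is absent in this sketch
    return isinstance(node, FanOutNode)


print(describe(42), is_fan_out(42))
```

A static analyzer that ignores the `TYPE_CHECKING` gate still sees the import statement textually, which is the false positive the exclusion above is meant to silence.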

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@ Repository = "https://github.com/LunarCommand/openarmature-python"
 Specification = "https://github.com/LunarCommand/openarmature-spec"

 [tool.openarmature]
-spec_version = "0.9.0"
+spec_version = "0.10.0"

 [dependency-groups]
 dev = [

src/openarmature/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
 """OpenArmature — workflow framework for LLM pipelines and tool-calling agents."""

 __version__ = "0.4.0rc0"
-__spec_version__ = "0.9.0"
+__spec_version__ = "0.10.0"

src/openarmature/graph/compiled.py

Lines changed: 154 additions & 7 deletions
@@ -21,12 +21,26 @@
 `cast(MyState, ...)` at the call site.
 """

+from __future__ import annotations
+
 import asyncio
 import time
 import uuid
 from collections.abc import Callable, Iterable, Mapping
 from dataclasses import dataclass, field
-from typing import Any, cast
+from typing import TYPE_CHECKING, Any, cast
+
+if TYPE_CHECKING:
+    # ``FanOutNode`` lives in ``.fan_out`` which has a TYPE_CHECKING
+    # back-reference to ``CompiledGraph`` here. Importing at module
+    # top would create a textual cycle CodeQL's
+    # ``py/cyclic-import`` rule flags (no runtime issue —
+    # ``fan_out``'s ``compiled`` import is itself TYPE_CHECKING-gated
+    # — but the static analyzer doesn't see that). Type annotations
+    # use the string form via ``from __future__ import annotations``;
+    # runtime use (the ``isinstance`` check in ``_invoke``) imports
+    # lazily inside the function.
+    from .fan_out import FanOutNode

 from pydantic import ValidationError

@@ -67,8 +81,7 @@
     RuntimeGraphError,
     StateValidationError,
 )
-from .events import NodeEvent
-from .fan_out import FanOutNode
+from .events import FanOutEventConfig, NodeEvent
 from .middleware import ChainCall, Middleware, compose_chain
 from .nodes import Node
 from .observer import (
@@ -519,6 +532,12 @@ async def _invoke(
                 current = skip_target
                 continue

+            # Lazy import: keeps the textual cycle off the module
+            # graph (``fan_out`` has a TYPE_CHECKING back-reference
+            # to this module). Function-scope import is cheap once
+            # cached; this branch fires once per fan-out step.
+            from .fan_out import FanOutNode  # noqa: PLC0415
+
             if isinstance(node, FanOutNode):
                 # Fan-out nodes are recognized as a distinct node type
                 # per pipeline-utilities §9. Dispatched through
@@ -913,6 +932,100 @@ async def _step_fan_out_node(
         # hardcoded 0.
         attempt_counter: list[int] = [0]

+        # Resolve the fan-out config eagerly so the resolved values
+        # ride on every fan-out node event (per spec proposal 0013,
+        # v0.10.0: ``fan_out_config`` is populated on fan-out node
+        # events including retried attempts). For ``items_field``
+        # mode the count is ``len(parent_state.<items_field>)``; for
+        # ``count`` mode it's ``_resolve_count``. ``_resolve_concurrency``
+        # is pure regardless. Repeating these inside
+        # ``FanOutNode.run_with_context`` is cheap and matches the
+        # values surfaced here.
+        # Lazy import: function-scope to avoid a module-top
+        # textual cycle CodeQL flags. ``fan_out`` has a
+        # TYPE_CHECKING back-reference to this module, so the
+        # static-analyzer view of an importable cycle goes away
+        # when the engine doesn't reach into ``fan_out`` at module
+        # load time. Fires once per fan-out step.
+        from .fan_out import _resolve_concurrency, _resolve_count  # noqa: PLC0415
+
+        # Resolver failures (callable count/concurrency raising,
+        # ``getattr`` on a malformed state, etc.) used to land inside
+        # ``innermost``'s ``except Exception → NodeException`` block
+        # below and produce a started/completed event pair via the
+        # surrounding dispatches. Hoisting resolution out of
+        # ``run_with_context`` for the eager ``FanOutEventConfig``
+        # build moved them past that scope, so re-establish the
+        # contract here: surface a started/completed pair with
+        # ``fan_out_config=None`` (we never built one) and raise as
+        # ``NodeException``.
+        try:
+            if node.config.items_field is not None:
+                items_attr: Any = getattr(state, node.config.items_field, [])
+                if not isinstance(items_attr, list):
+                    raise NodeException(
+                        node_name=current,
+                        cause=TypeError(f"items_field {node.config.items_field!r} is not a list at runtime"),
+                        recoverable_state=state,
+                    )
+                item_count = len(cast("list[Any]", items_attr))
+            else:
+                item_count = _resolve_count(current, node.config, state)
+            concurrency_resolved: int | None = _resolve_concurrency(current, node.config, state)
+            fan_out_event_config = FanOutEventConfig(
+                item_count=item_count,
+                concurrency=concurrency_resolved,
+                error_policy=node.config.error_policy,
+                parent_node_name=current,
+            )
+        except NodeException as resolution_error:
+            self._dispatch_started(
+                context,
+                current,
+                namespace,
+                step,
+                state,
+                attempt_index=0,
+                fan_out_config=None,
+            )
+            self._dispatch_completed(
+                context,
+                current,
+                namespace,
+                step,
+                state,
+                error=resolution_error,
+                attempt_index=0,
+                fan_out_config=None,
+            )
+            raise
+        except Exception as resolution_error:
+            wrapped = NodeException(
+                node_name=current,
+                cause=resolution_error,
+                recoverable_state=state,
+            )
+            self._dispatch_started(
+                context,
+                current,
+                namespace,
+                step,
+                state,
+                attempt_index=0,
+                fan_out_config=None,
+            )
+            self._dispatch_completed(
+                context,
+                current,
+                namespace,
+                step,
+                state,
+                error=wrapped,
+                attempt_index=0,
+                fan_out_config=None,
+            )
+            raise wrapped from resolution_error
+
         # Cell holding the FINAL successful attempt's
         # (attempt_index, pre_state, merged); see same comment in
         # ``_step_function_node``.
@@ -923,12 +1036,32 @@ async def innermost(s: Any) -> Mapping[str, Any]:
             attempt_counter[0] += 1
             attempt_token = _set_attempt_index(attempt_index)
             try:
-                self._dispatch_started(context, current, namespace, step, s, attempt_index=attempt_index)
+                self._dispatch_started(
+                    context,
+                    current,
+                    namespace,
+                    step,
+                    s,
+                    attempt_index=attempt_index,
+                    fan_out_config=fan_out_event_config,
+                )
                 try:
-                    partial = await node.run_with_context(s, context)
+                    partial = await node.run_with_context(
+                        s,
+                        context,
+                        pre_resolved_count=item_count,
+                        pre_resolved_concurrency=(concurrency_resolved,),
+                    )
                 except RuntimeGraphError as e:
                     self._dispatch_completed(
-                        context, current, namespace, step, s, error=e, attempt_index=attempt_index
+                        context,
+                        current,
+                        namespace,
+                        step,
+                        s,
+                        error=e,
+                        attempt_index=attempt_index,
+                        fan_out_config=fan_out_event_config,
                     )
                     raise
                 except Exception as e:
@@ -941,14 +1074,22 @@ async def innermost(s: Any) -> Mapping[str, Any]:
                         s,
                         error=wrapped,
                         attempt_index=attempt_index,
+                        fan_out_config=fan_out_event_config,
                     )
                     raise wrapped from e

                 try:
                     merged = _merge_partial(s, partial, self.reducers, current)
                 except (ReducerError, StateValidationError) as e:
                     self._dispatch_completed(
-                        context, current, namespace, step, s, error=e, attempt_index=attempt_index
+                        context,
+                        current,
+                        namespace,
+                        step,
+                        s,
+                        error=e,
+                        attempt_index=attempt_index,
+                        fan_out_config=fan_out_event_config,
                     )
                     raise

@@ -1021,6 +1162,7 @@ def finalize_completed(edge_error: RuntimeGraphError | None) -> None:
                     final_pre_state,
                     post_state=final_merged,
                     attempt_index=final_attempt_index,
+                    fan_out_config=fan_out_event_config,
                 )
             else:
                 self._dispatch_completed(
@@ -1031,6 +1173,7 @@ def finalize_completed(edge_error: RuntimeGraphError | None) -> None:
                     final_pre_state,
                     error=edge_error,
                     attempt_index=final_attempt_index,
+                    fan_out_config=fan_out_event_config,
                 )

         return _StepResult(state=merged_outer, finalize_completed=finalize_completed)
@@ -1044,6 +1187,7 @@ def _dispatch_started(
         pre_state: State,
         *,
         attempt_index: int = 0,
+        fan_out_config: FanOutEventConfig | None = None,
     ) -> None:
         _dispatch(
             context,
@@ -1058,6 +1202,7 @@ def _dispatch_started(
                 parent_states=context.parent_states_prefix,
                 attempt_index=attempt_index,
                 fan_out_index=context.fan_out_index,
+                fan_out_config=fan_out_config,
             ),
         )

@@ -1072,6 +1217,7 @@ def _dispatch_completed(
         post_state: State | None = None,
         error: RuntimeGraphError | None = None,
         attempt_index: int = 0,
+        fan_out_config: FanOutEventConfig | None = None,
     ) -> None:
         _dispatch(
             context,
@@ -1086,6 +1232,7 @@ def _dispatch_completed(
                 parent_states=context.parent_states_prefix,
                 attempt_index=attempt_index,
                 fan_out_index=context.fan_out_index,
+                fan_out_config=fan_out_config,
             ),
         )

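
Reviewer note: the resolver-failure contract restored in `_step_fan_out_node` (emit a started+completed pair with `fan_out_config=None`, then surface a `NodeException`) can be isolated in a small sketch. The names below are hypothetical stand-ins: this `NodeException` has a simplified constructor, and `resolve_with_event_pair` and the dict-shaped events are invented for illustration. The real engine dispatches `NodeEvent` objects through `_dispatch_started` and `_dispatch_completed`.

```python
from typing import Any, Callable


class NodeException(Exception):
    """Simplified, hypothetical stand-in for the engine's NodeException."""

    def __init__(self, node_name: str, cause: BaseException) -> None:
        super().__init__(f"{node_name}: {cause!r}")
        self.node_name = node_name
        self.cause = cause


def resolve_with_event_pair(
    node_name: str,
    resolve: Callable[[], int],
    events: list[dict[str, Any]],
) -> int:
    """Run ``resolve``; on failure emit started+completed, then raise."""
    try:
        return resolve()
    except NodeException as err:
        failure = err  # already the right type, surface it unchanged
    except Exception as err:
        failure = NodeException(node_name, err)
        failure.__cause__ = err  # same effect as ``raise ... from err``
    # No config was ever built, so both events carry fan_out_config=None.
    events.append({"phase": "started", "fan_out_config": None, "error": None})
    events.append({"phase": "completed", "fan_out_config": None, "error": failure})
    raise failure


def bad_resolver() -> int:
    raise ValueError("count resolver failed")


events: list[dict[str, Any]] = []
caught = None
try:
    resolve_with_event_pair("process", bad_resolver, events)
except NodeException as exc:
    caught = exc
print([e["phase"] for e in events], type(caught).__name__)
```

Observers in this sketch always see a balanced pair even when the fan-out never starts its instances, which is the property the commit message says was lost when resolution was hoisted out of `innermost`.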

src/openarmature/graph/events.py

Lines changed: 51 additions & 1 deletion
@@ -16,6 +16,52 @@
 from .state import State


+@dataclass(frozen=True)
+class FanOutEventConfig:
+    """Spec §6 + §5.4 (per spec proposal 0013, v0.10.0):
+    fan-out node events carry the resolved configuration so backend
+    observers can attribute the fan-out node span (``item_count`` /
+    ``concurrency`` / ``error_policy``) and synthesize per-instance
+    spans with the right ``parent_node_name``.
+
+    Populated ONLY on ``started`` and ``completed`` events for a
+    fan-out node itself (partition by node type, not event category —
+    INCLUDES retried attempts of a fan-out node when retry middleware
+    wraps it). All other events leave ``NodeEvent.fan_out_config``
+    null.
+
+    Field shapes:
+
+    - ``item_count`` — non-negative int. The resolved instance count
+      per pipeline-utilities §9 (matches ``count_field`` value when
+      configured; matches ``len(items_field)`` in items_field mode).
+    - ``concurrency`` — positive int OR ``None`` (unbounded). Per
+      pipeline-utilities §9.2: zero or negative is rejected at config
+      resolution time as ``fan_out_invalid_concurrency``. Backend
+      mappings translate ``None`` to a sentinel at the attribute layer
+      (e.g., ``openarmature.fan_out.concurrency = 0`` per
+      observability §5.4) — that translation is observer-internal,
+      not engine-internal.
+    - ``error_policy`` — one of ``"fail_fast"`` or ``"collect"`` per
+      pipeline-utilities §9.4.
+    - ``parent_node_name`` — the fan-out node's name in the parent
+      graph. Carried here for caching by backend observers when
+      attributing per-instance spans (§5.4 mandates
+      ``openarmature.fan_out.parent_node_name`` on per-instance spans;
+      the engine surfaces the name once on the fan-out node's started
+      event, the observer caches and applies on every per-instance
+      span it synthesizes).
+
+    All four fields MUST be present when ``fan_out_config`` is
+    populated. Only ``concurrency`` is nullable.
+    """
+
+    item_count: int
+    concurrency: int | None
+    error_policy: str
+    parent_node_name: str
+
+
 @dataclass(frozen=True)
 class NodeEvent:
     """A single node-boundary event delivered to observers.
@@ -55,6 +101,9 @@ class NodeEvent:
       retries. `0` for nodes not wrapped by retry middleware.
     - `fan_out_index` is the 0-based index of this fan-out instance among
       its siblings. `None` for nodes not inside a fan-out.
+    - `fan_out_config` carries resolved fan-out configuration on events
+      from a fan-out NODE itself (per spec proposal 0013, v0.10.0). See
+      :class:`FanOutEventConfig`. ``None`` on every other event.

     Invariants:
     - On `started` events, `post_state` and `error` MUST both be None.
@@ -72,6 +121,7 @@ class NodeEvent:
     parent_states: tuple[State, ...]
     attempt_index: int = 0
     fan_out_index: int | None = None
+    fan_out_config: FanOutEventConfig | None = None


-__all__ = ["NodeEvent"]
+__all__ = ["FanOutEventConfig", "NodeEvent"]
