diff --git a/src/openarmature/checkpoint/backends/memory.py b/src/openarmature/checkpoint/backends/memory.py index f6dc348..b1d08cb 100644 --- a/src/openarmature/checkpoint/backends/memory.py +++ b/src/openarmature/checkpoint/backends/memory.py @@ -19,24 +19,26 @@ @dataclass(frozen=True) class FanOutInternalSaveBatching: - """Per-Checkpointer-instance configuration for §10.11.4 fan-out - internal save batching. + """Per-Checkpointer-instance configuration for fan-out internal + save batching. Applies ONLY to fan-out instance internal saves. Outermost-graph, subgraph-internal, and fan-out node completion saves remain - synchronous per §10.3. + synchronous. - ``flush_every``: flush the buffer every N buffered saves. ``0`` / negative means batching is disabled (every save flushes immediately). The buffered save count resets at each flush. - Buffered-but-unflushed saves are LOST on crash per §10.11.4; - on resume, instances whose completed state was buffered-only - revert to ``in_flight`` / ``not_started`` and re-run. The §10.11.1 - reducer correctness holds because their contributions hadn't - durably committed. + Buffered-but-unflushed saves are LOST on crash; on resume, + instances whose completed state was buffered-only revert to + ``in_flight`` / ``not_started`` and re-run. Reducer correctness + holds because their contributions hadn't durably committed. """ + # Spec pipeline-utilities §10.11.4 (fan-out internal save batching); + # §10.3 synchronous saves; §10.11.1 reducer correctness. + flush_every: int = 0 @@ -53,23 +55,22 @@ class InMemoryCheckpointer: from :meth:`load`; no serialization round-trip. (This is the feature: tests can assert on the saved state's identity.) - **State-migration eligibility:** none. Per spec §10.12.1, a - backend supports migration only when it can expose a structural - intermediate form of the loaded state independent of the current + **State-migration eligibility:** none. A backend supports + migration only when it can expose a structural intermediate form + of the loaded state independent of the current state class. This backend holds live typed instances by reference, so a version mismatch on resume raises ``CheckpointRecordInvalid`` rather than consulting the migration registry. - **Fan-out internal save batching** (per spec §10.11.4): optional - via the ``fan_out_internal_save_batching`` constructor parameter. + **Fan-out internal save batching**: optional via the + ``fan_out_internal_save_batching`` constructor parameter. Default is no batching (every save flushes immediately). When enabled, fan-out instance internal saves buffer in memory and flush every ``flush_every`` saves. Outermost-graph, subgraph-internal, and fan-out node completion saves bypass the buffer entirely (they remain synchronous). On crash, buffered - saves are lost — by design, per §10.11.4's documented cost - trade-off. + saves are lost — by design, a documented cost trade-off. """ # Per spec §10.12.1: in-memory storage holds live typed-state @@ -104,8 +105,8 @@ async def save(self, invocation_id: str, record: CheckpointRecord) -> None: previous record for the same id. Not durable across process restarts. - Per §10.11.4: outermost-graph, subgraph-internal, and - fan-out node completion saves are synchronous regardless of + Outermost-graph, subgraph-internal, and fan-out node + completion saves are synchronous regardless of the batching configuration. The engine routes fan-out instance internal saves through :meth:`save_fan_out_internal` instead; this method bypasses the buffer. @@ -121,8 +122,8 @@ async def save(self, invocation_id: str, record: CheckpointRecord) -> None: self._records[invocation_id] = record async def save_fan_out_internal(self, invocation_id: str, record: CheckpointRecord) -> None: - """Buffer a fan-out instance internal save under the §10.11.4 - batching policy. When batching is disabled (default), behaves + """Buffer a fan-out instance internal save under the batching + policy. When batching is disabled (default), behaves identically to :meth:`save` — every save is synchronously durable. When ``flush_every`` is positive, the save is buffered; the buffer flushes when the count reaches the @@ -142,10 +143,10 @@ async def save_fan_out_in_flight_failure( invocation_id: str, record: CheckpointRecord, ) -> None: - """Buffer an "instance failed mid-execution" save under §10.11.4 - batching. The failure save records the in_flight state of an - instance whose terminal inner node raised; this save closes the - in_flight observability gap (per §10.11) for instances whose + """Buffer an "instance failed mid-execution" save under the + batching policy. The failure save records the in_flight state + of an instance whose terminal inner node raised; this save + closes the in_flight observability gap for instances whose subgraphs have no sibling-completed save to piggyback on. Under batching, this save buffers BUT does NOT count toward @@ -184,8 +185,8 @@ def _flush_invocation_buffer_locked(self, invocation_id: str) -> None: async def load(self, invocation_id: str) -> CheckpointRecord | None: """Return the saved record for ``invocation_id`` or ``None`` - if nothing has been saved under that id. Per §10.11.4: - buffered-but-unflushed fan-out internal saves are NOT visible + if nothing has been saved under that id. Buffered-but-unflushed + fan-out internal saves are NOT visible to ``load`` — that's the crash-loses-buffered contract. To simulate a crash before the buffer flushes, drop the Checkpointer reference; the buffer is in-memory only. diff --git a/src/openarmature/checkpoint/backends/sqlite.py b/src/openarmature/checkpoint/backends/sqlite.py index 22849a6..5b2b04d 100644 --- a/src/openarmature/checkpoint/backends/sqlite.py +++ b/src/openarmature/checkpoint/backends/sqlite.py @@ -281,8 +281,8 @@ def _decode(self, blob: bytes, recorded_mode: str, invocation_id: str) -> Any: async def save(self, invocation_id: str, record: CheckpointRecord) -> None: """Upsert ``record`` under ``invocation_id``. The state, - completed positions, parent-state stack, and (per proposal 0009) - per-fan-out-node progress are serialized via the configured + completed positions, parent-state stack, and per-fan-out-node + progress are serialized via the configured :class:`SerializationMode` and written in a single statement. Writes are durable on return (WAL mode, per-write fsync at the SQLite layer).""" diff --git a/src/openarmature/checkpoint/errors.py b/src/openarmature/checkpoint/errors.py index cf51654..38786d4 100644 --- a/src/openarmature/checkpoint/errors.py +++ b/src/openarmature/checkpoint/errors.py @@ -60,14 +60,13 @@ class CheckpointRecordInvalid(CheckpointError): """Raised when ``Checkpointer.load(X)`` returns a record whose schema is incompatible with the current graph: state shape mismatch, missing required fields, OR a post-migration state - that fails to deserialize against the current state class (per - spec §10.12.4). Non-transient. + that fails to deserialize against the current state class. + Non-transient. Note: raw ``schema_version`` mismatches no longer route here. They now flow through ``CheckpointStateMigrationMissing`` (no chain registered) or ``CheckpointStateMigrationFailed`` (chain - application raised) per spec §10.10's three-way category - distinction. + application raised) — a three-way category distinction. """ category = "checkpoint_record_invalid" @@ -80,8 +79,8 @@ def __init__(self, invocation_id: str, message: str) -> None: class CheckpointStateMigrationMissing(CheckpointError): """Raised on resume when the saved record's ``schema_version`` does not match the current state class's ``schema_version`` AND - no chain of registered migrations bridges the two. Non-transient - per spec §10.10; the user MUST register a migration (or pin + no chain of registered migrations bridges the two. Non-transient; + the user MUST register a migration (or pin their state to the saved version) for the resume to succeed. Carries the saved-from / current-to versions and a description @@ -112,18 +111,16 @@ def __init__( class CheckpointStateMigrationChainAmbiguous(CheckpointError): - """Raised when the registered migration graph is ambiguous per - spec §10.10 / §10.12 (proposal 0018, spec v0.16.0): + """Raised when the registered migration graph is ambiguous: - - Duplicate-pair case (§10.12.1): two migrations register with the - same ``(from_version, to_version)`` pair. Raised at registration + - Duplicate-pair case: two migrations register with the same + ``(from_version, to_version)`` pair. Raised at registration time so the user sees the ambiguity before any resume attempt. - - Multi-shortest-path case (§10.12.2): the registered migration - graph has multiple distinct shortest paths between the saved - and current versions (e.g., a diamond ``v1→v2→v4`` + ``v1→v3→v4``). - Spec accepts either compile-time detection (recommended) or - load-time detection (this impl runs the check inside BFS at - resume time). + - Multi-shortest-path case: the registered migration graph has + multiple distinct shortest paths between the saved and current + versions (e.g., a diamond ``v1→v2→v4`` + ``v1→v3→v4``). Either + compile-time detection (recommended) or load-time detection is + acceptable (this impl runs the check inside BFS at resume time). Non-transient: retrying without changing the migration graph will not succeed. Carries ``from_version`` / ``to_version`` when @@ -149,7 +146,7 @@ def __init__( class CheckpointStateMigrationFailed(CheckpointError): """Raised on resume when a registered migration function raises - during chain application (per spec §10.12.2). The migration's + during chain application. The migration's exception is preserved as ``__cause__``. Non-transient by default: a buggy migration is deterministic, so retrying without changing the migration code will not succeed. diff --git a/src/openarmature/checkpoint/migration.py b/src/openarmature/checkpoint/migration.py index 4244c89..23516d2 100644 --- a/src/openarmature/checkpoint/migration.py +++ b/src/openarmature/checkpoint/migration.py @@ -1,12 +1,12 @@ """State migration types and registry. -Realizes pipeline-utilities §10.12 (proposal 0014). A -``StateMigration`` describes one edge in the migration graph; +A ``StateMigration`` describes one edge in the migration graph; ``MigrationRegistry`` holds the ordered set and resolves chains via BFS. Ambiguity (duplicate ``(from, to)`` pairs OR multiple distinct shortest paths between the same source/sink) is a -configuration-style error per §10.12.1 / §10.12.2. +configuration-style error. """ +# Realizes pipeline-utilities §10.12 (proposal 0014). from __future__ import annotations @@ -29,9 +29,9 @@ class StateMigration: chain (or for final deserialization into the current state class). Migrations MUST be pure: deterministic, no I/O, no implicit - state. The framework does not police purity per spec §10.12.2 - ("the contract is documented, not policed"); violating it - risks non-deterministic resume. + state. The framework does not police purity (the contract is + documented, not policed); violating it risks non-deterministic + resume. """ from_version: str @@ -46,15 +46,14 @@ class MigrationRegistry: - Two migrations with the same ``from_version`` AND ``to_version`` raise ``CheckpointStateMigrationChainAmbiguous`` - directly per spec §10.10 (proposal 0018) so the canonical - category surfaces at the registration boundary without any - wrapping by the builder. + directly so the canonical category surfaces at the registration + boundary without any wrapping by the builder. - Two migrations with the same ``from_version`` and different ``to_version`` are permitted (branched migration graph; chain resolution picks a path or raises ambiguity if multiple shortest paths exist). - Resolution-time semantics (per §10.12.2): + Resolution-time semantics: - BFS from ``record.schema_version`` to ``current.schema_version``. BFS naturally finds the shortest @@ -133,8 +132,7 @@ def resolve_chain( Raises ``CheckpointStateMigrationChainAmbiguous`` if multiple distinct shortest paths exist between - ``from_version`` and ``to_version`` (ambiguous chain per - spec §10.10 / §10.12.2; proposal 0018 / spec v0.16.0). + ``from_version`` and ``to_version`` (an ambiguous chain). Same canonical category as the duplicate-pair detection in ``register``; one type for chain ambiguity regardless of when it surfaces. diff --git a/src/openarmature/checkpoint/protocol.py b/src/openarmature/checkpoint/protocol.py index 426e36f..1804d49 100644 --- a/src/openarmature/checkpoint/protocol.py +++ b/src/openarmature/checkpoint/protocol.py @@ -23,12 +23,13 @@ produce records in the shipping version (atomic-restart contract). ``CheckpointRecord.schema_version`` carries the user-facing -state-schema identifier per spec §10.2 (proposal 0014 repurposes -the field from the original backend-internal record-shape role). -The framework reads ``type(state).schema_version`` at save time; -on load, version mismatches route through the migration registry -(per §10.12) rather than a strict equality check. +state-schema identifier. The framework reads +``type(state).schema_version`` at save time; on load, version +mismatches route through the migration registry rather than a strict +equality check. """ +# Spec pipeline-utilities §10.2 (proposal 0014): schema_version is the +# state-schema identifier; §10.12 migration registry on mismatch. from __future__ import annotations @@ -98,14 +99,14 @@ class FanOutInstanceProgress: correctness contract: an instance marked ``completed`` MUST have its contribution recorded into the accumulator AND that contribution MUST be reflected in ``result``. Reducer composition - rules (§10.11.1) depend on this exactly-once guarantee. + rules depend on this exactly-once guarantee. - ``result``: for ``completed`` instances, the durable contribution to the fan-out accumulator (a success value for the ``target_field`` bucket, or under ``collect`` error policy an error entry for the ``errors_field`` bucket). Typed per the parent state schema's ``target_field`` / ``errors_field`` - (representation is implementation-defined per §10.11; Python - stores as ``Any`` since dynamic typing absorbs the variance). + (representation is implementation-defined; Python stores as + ``Any`` since dynamic typing absorbs the variance). Unused for ``in_flight`` and ``not_started``. - ``result_is_error``: boolean discriminator for ``completed`` entries — ``True`` when the contribution is a ``collect``-mode @@ -113,8 +114,8 @@ class FanOutInstanceProgress: when the contribution is a success value that rolls forward into ``target_field``. MUST be ``False`` for ``in_flight`` and ``not_started`` (the value of ``result`` is ignored for those). - Per proposal 0027 (spec v0.21.0): implementations MUST consult - this field on resume rather than inferring routing from + Implementations MUST consult this field on resume rather than + inferring routing from ``result``'s shape — heuristic inspection would misclassify user state values that happen to match the engine's error-record shape. @@ -148,7 +149,7 @@ class FanOutProgress: the fan-out (empty for outermost-graph fan-outs). Disambiguates fan-outs of the same name in different nested-subgraph contexts. - ``instance_count``: the resolved instance count for this fan-out - (per pipeline-utilities §9 count or items_field mode). + (count or items_field mode). - ``instances``: a tuple of per-instance entries indexed by ``fan_out_index`` (``instances[i]`` is the entry for ``fan_out_index=i``). Length equals ``instance_count``. @@ -167,7 +168,7 @@ class CheckpointRecord: Frozen: backends MUST treat the record as immutable. The engine builds a fresh record per ``completed`` event rather than mutating - a shared one. The ``fan_out_progress`` field (per §10.11) carries + a shared one. The ``fan_out_progress`` field carries per-fan-out-node entries when one or more fan-outs are in flight at save time; an empty tuple means no fan-out progress to record. """ @@ -234,8 +235,8 @@ class Checkpointer(Protocol): plain dict, JSON tree, or similar) that is independent of the current state class. JSON-encoded backends naturally satisfy this; backends that store live typed state instances or use - class-bound serialization (pickle) cannot. Per spec §10.12.1, - backends that cannot expose the intermediate MUST raise + class-bound serialization (pickle) cannot. Backends that cannot + expose the intermediate MUST raise ``CheckpointRecordInvalid`` on version mismatch even when migrations are registered; the registry has no chance to bridge. diff --git a/src/openarmature/graph/builder.py b/src/openarmature/graph/builder.py index 48d95ee..b71b663 100644 --- a/src/openarmature/graph/builder.py +++ b/src/openarmature/graph/builder.py @@ -306,11 +306,11 @@ def add_parallel_branches_node( errors_field: str | None = None, middleware: Iterable[Middleware] | None = None, ) -> Self: - """Register a parallel-branches node per pipeline-utilities §11. + """Register a parallel-branches node. ``branches`` is a mapping from non-empty branch name to a :class:`BranchSpec`. Insertion order is preserved and is - the dispatch + merge order per §11.8. + the dispatch + merge order. Validates at registration: @@ -397,7 +397,7 @@ def with_state_migration( to_version: str, migrate: Callable[[Any], Any], ) -> Self: - """Register one state migration per pipeline-utilities §10.12. + """Register one state migration. On resume, when the saved record's ``schema_version`` does not match the current state class's ``schema_version``, the engine @@ -406,15 +406,14 @@ def with_state_migration( ``parent_states``) before deserialization. Migrations MUST be pure: deterministic, no I/O, no implicit - state. The framework does not police purity (per §10.12.2), - but violating it risks non-deterministic resume. + state. The framework does not police purity, but violating it + risks non-deterministic resume. Raises ``CheckpointStateMigrationChainAmbiguous`` at registration if the ``(from_version, to_version)`` pair is - already registered (per spec §10.10 / §10.12.1; proposal - 0018 / spec v0.16.0). Also raises ``ValueError`` if + already registered. Also raises ``ValueError`` if ``to_version`` is the empty-string sentinel (the un-declared - marker per §10.2 is not a valid chain target). + marker is not a valid chain target). """ self._migration_registry.register( StateMigration( diff --git a/src/openarmature/graph/compiled.py b/src/openarmature/graph/compiled.py index 8b65a6e..9036b96 100644 --- a/src/openarmature/graph/compiled.py +++ b/src/openarmature/graph/compiled.py @@ -240,15 +240,14 @@ def _merge_partial[StateT: State]( class _StepResult[StateT: State]: """Return shape of the per-step dispatchers (``_step_function_node`` / ``_step_subgraph_node`` / - ``_step_fan_out_node``) under the proposal-0012 v0.9.0 swap. + ``_step_fan_out_node``). - Spec graph-engine §3 step 3 (revised) requires the - ``completed`` event for the just-completed node to fire AFTER + The ``completed`` event for the just-completed node fires AFTER edge evaluation completes — so that edge-resolution failures (``routing_error``, ``edge_exception``) land on the preceding node's completed event with ``error`` populated, sharing the started/completed pair rather than producing a separate event - pair (§6 revised). + pair. The step dispatchers can't call ``_dispatch_completed`` for the success path themselves anymore, because the outcome @@ -268,9 +267,9 @@ class _StepResult[StateT: State]: For ``_step_subgraph_node``, the wrapper is transparent per fixture 013 (no started/completed pair); ``finalize_completed`` is a no-op closure so edge errors after a subgraph wrapper - propagate silently per proposal 0012's "preceding unit's - pair" framing applied to a unit that never had one. Same for - middleware that short-circuits without invoking ``next``. + propagate silently — the "preceding unit's pair" framing applied + to a unit that never had one. Same for middleware that short- + circuits without invoking ``next``. """ state: StateT @@ -282,7 +281,7 @@ def _no_op_finalize(_edge_error: RuntimeGraphError | None) -> None: didn't dispatch a started/completed pair — subgraph wrappers (transparent per fixture 013) and middleware that short- circuits without invoking ``next``. Edge errors propagate - silently per proposal 0012 + fixture 013.""" + silently per fixture 013.""" # Helpers for the proposal 0009 per-instance fan-out resume contract. @@ -332,7 +331,7 @@ def _project_fan_out_progress( """Project the engine-internal mutable per-fan-out state into the frozen :class:`FanOutProgress` shape on a saved record. - Per §10.11's snapshot semantics, a save fires with ALL concurrent + Per the snapshot semantics, a save fires with ALL concurrent fan-out instances' states captured at the moment of the save — not just the one whose ``completed`` event triggered the save. This projection enumerates the whole dict; the engine save site @@ -373,18 +372,17 @@ def _restore_fan_out_progress_state( into the mutable per-fan-out tracking dict that ``FanOutNode`` consults to decide which instances to skip vs re-run. - Extra-output state isn't preserved across resume — the spec models - ``result`` as a single accumulator entry and is silent on + Extra-output state isn't preserved across resume: ``result`` is + modeled as a single accumulator entry, with nothing recorded for ``extra_outputs``. Reconstructing them would require either - serializing them on the record (a spec change) or recomputing them + serializing them on the record (a record-format change) or recomputing them (defeating the point of skip-on-resume). Fixtures don't exercise ``extra_outputs`` on the resume path; if a future workload needs them, surface as a follow-on. ``result_is_error`` is read verbatim from the saved record's - explicit field per spec §10.11 (proposal 0027). The pre-0027 - structural-pattern heuristic is gone — the spec mandates the - explicit field as the authoritative discriminator because the + explicit field. The earlier structural-pattern heuristic is gone: + the explicit field is the authoritative discriminator because the user's state schema can legitimately contain values that match the engine's canonical error-record shape, and a heuristic would misclassify them. @@ -420,8 +418,8 @@ async def _save_fan_out_internal( """Route a fan-out-internal save through the checkpointer's optional batching seam. - Per spec §10.11.4, Checkpointer backends MAY support batching - scoped to fan-out internal saves. When the backend exposes a + Checkpointer backends MAY support batching scoped to fan-out + internal saves. When the backend exposes a ``save_fan_out_internal`` coroutine, route there so it can buffer or flush per its configuration. Otherwise, fall back to the standard ``save`` — non-batching backends see no behavioral change. @@ -439,8 +437,8 @@ async def _save_fan_out_in_flight_failure( # pyright: ignore[reportUnusedFuncti record: CheckpointRecord, ) -> None: """Route an "instance failed mid-execution" save through the - checkpointer's failure-save seam (§10.11.4 + the in_flight - observability gap §10.11). + checkpointer's failure-save seam (closing the in_flight + observability gap). Backends that expose ``save_fan_out_in_flight_failure`` get the save directly; under batching, the typical implementation @@ -461,8 +459,8 @@ class _MigrationSummary: """Per-resume migration-chain metadata threaded out of ``_migrate_record`` so the engine can dispatch an ``openarmature.checkpoint.migrate`` observer event after the - invocation context is built (per spec §6 cross-ref in proposal - 0014). Carried on the synthetic ``NodeEvent.pre_state`` + invocation context is built. Carried on the synthetic + ``NodeEvent.pre_state`` payload for ``phase="checkpoint_migrated"``; the OTel observer reads it to emit the span. """ @@ -479,8 +477,8 @@ def _apply_migration_step( ) -> Any: """Apply one migration step to one value (outer state or one parent-state entry). Wraps the user-supplied migration function's - raise as ``CheckpointStateMigrationFailed`` per spec §10.12.2. - The original exception rides ``__cause__``. + raise as ``CheckpointStateMigrationFailed``. The original + exception rides ``__cause__``. """ try: return migration.migrate(value) @@ -618,19 +616,18 @@ async def _migrate_record( has ``state`` + ``parent_states`` mapped through the chain. ``summary`` carries the chain's metadata so the caller can dispatch a ``checkpoint_migrated`` observer event after the - invocation context exists (per the spec §6 cross-ref in - proposal 0014). + invocation context exists. Caller is responsible for the post-migration deserialization - step (§10.12.4): if the migrated state cannot deserialize - against the current state class, the resulting failure - surfaces as ``CheckpointRecordInvalid``. - - Spec §10.12.2 says "parent states MUST be treated as carrying - the same ``schema_version`` as the outer record." We apply - the same chain to every entry in ``parent_states`` lockstep - with the outer state. Future per-parent versioning would - need a spec follow-on. + step: if the migrated state cannot deserialize against the + current state class, the resulting failure surfaces as + ``CheckpointRecordInvalid``. + + Parent states MUST be treated as carrying the same + ``schema_version`` as the outer record, so we apply the same + chain to every entry in ``parent_states`` lockstep with the + outer state. Future per-parent versioning would need a + follow-on. """ # Eligibility check first per §10.12.1: backends that hold # typed in-memory state or class-bound serialization cannot @@ -904,7 +901,7 @@ async def invoke( - ``correlation_id`` is the per-invocation cross-backend join key. Caller-supplied or auto-generated UUIDv4 when absent. Preserved unchanged across ``resume_invocation``. - - ``invocation_id`` (proposal 0039) is the per-attempt id. + - ``invocation_id`` is the per-attempt id. Caller-supplied or auto-generated UUIDv4 when absent; a caller value MAY be any non-empty URL-safe string. Applies to the fresh-invocation path only — a ``resume_invocation`` @@ -925,7 +922,7 @@ async def invoke( own retry logic if transient backend failures should be reattempted. - **Caller-supplied invocation metadata (proposal 0034).** + **Caller-supplied invocation metadata.** - ``metadata`` is an optional mapping of arbitrary ``key → value`` entries the framework propagates to every @@ -935,7 +932,7 @@ async def invoke( the ``openarmature.*`` or ``gen_ai.*`` reserved namespaces. Validation runs synchronously at the API boundary; rule violations raise ``ValueError`` BEFORE any work begins. - - Per spec §5.6 the OTel observer emits each entry as an + - The OTel observer emits each entry as an ``openarmature.user.`` cross-cutting span attribute on every span and OTel log record. The Langfuse observer merges each entry into ``trace.metadata`` AND every @@ -1397,7 +1394,7 @@ async def _step_function_node( ) -> _StepResult[StateT]: """Run one function-node step through the middleware chain. - Per pipeline-utilities §3, the runtime chain composes: + The runtime chain composes: [per_graph...] -> [per_node...] -> innermost @@ -1406,15 +1403,15 @@ async def _step_function_node( to ``innermost`` is one attempt; middleware that calls ``next`` repeatedly (e.g., retry) produces multiple attempts and therefore multiple started/completed event pairs from the engine, each - tagged with an incrementing ``attempt_index`` (graph-engine §6). + tagged with an incrementing ``attempt_index``. - Per proposal-0012 v0.9.0: the success-case ``completed`` event - for the FINAL successful attempt fires AFTER edge eval, not + The success-case ``completed`` event for the FINAL successful + attempt fires AFTER edge eval, not inside ``innermost``. Failure-case dispatches (``node_exception`` / ``reducer_error`` / ``state_validation_error``) stay inline in ``innermost`` — those errors short-circuit before edge eval can run, so the - spec's "before the failure propagates" MUST is preserved by + "before the failure propagates" requirement is preserved by the inline dispatch. Returns a :class:`_StepResult` carrying the merged state + @@ -1523,7 +1520,7 @@ async def innermost(s: Any) -> Mapping[str, Any]: innermost, ) - # Spec observability §3 / Phase 6 LLM-span hook: capability + # Spec observability §3 LLM-span hook: capability # backends emitting from inside a node body (the # llm-provider span instrumentation in OpenAIProvider) need # to find the observers active for THIS invocation, which @@ -1640,7 +1637,7 @@ async def _step_subgraph_node( ) -> _StepResult[StateT]: """Run one subgraph-as-node step through the parent's middleware chain. - Per pipeline-utilities §4: the parent's per-graph middleware plus + The parent's per-graph middleware plus any per-node middleware on the SubgraphNode wraps the subgraph dispatch as a single atomic call. The subgraph's INTERNAL nodes get their own middleware via the subgraph's own CompiledGraph; @@ -1650,12 +1647,11 @@ async def _step_subgraph_node( events come from the subgraph's internal node executions (per fixture 013). - Per proposal-0012 v0.9.0 + spec coordination: edge errors - AFTER a transparent subgraph wrapper propagate to the caller - as ``RuntimeGraphError`` per §4 WITHOUT an associated + Edge errors AFTER a transparent subgraph wrapper propagate to + the caller as ``RuntimeGraphError`` WITHOUT an associated completed event — the wrapper has no started/completed pair - to share, and proposal 0012's "preceding node's pair" MUST - is vacuous (not violated) when the preceding unit emitted + to share, and the "preceding node's pair" MUST is vacuous + (not violated) when the preceding unit emitted no pair. The :class:`_StepResult` returned here uses :func:`_no_op_finalize` so the outer ``_invoke`` call to ``finalize_completed(edge_error)`` is a no-op. @@ -1720,7 +1716,7 @@ async def _step_fan_out_node( ) -> _StepResult[StateT]: """Run one fan-out-as-node step through the parent's middleware chain. - Per pipeline-utilities §9.6: the parent's per-graph + per-node + The parent's per-graph + per-node middleware wraps the fan-out as a SINGLE dispatch — one started event before the fan-out begins, one completed event after all instances complete and fan-in is done. Per-instance events @@ -1728,10 +1724,10 @@ async def _step_fan_out_node( post_state shape is the inner subgraph's state, and they carry ``fan_out_index`` populated. - Raw exceptions escaping the chain become NodeException per §4. + Raw exceptions escaping the chain become NodeException. - Per proposal-0012 v0.9.0: the fan-out's success-case - completed event fires AFTER edge eval (mirrors + The fan-out's success-case completed event fires AFTER edge + eval (mirrors ``_step_function_node``). Failure-path dispatches stay inline; the success-case is deferred via the returned :class:`_StepResult`. @@ -2084,13 +2080,12 @@ async def _step_parallel_branches_node( """Run one parallel-branches-as-node step through the parent's middleware chain. - Per pipeline-utilities §11.6: the parent's per-graph + + The parent's per-graph + per-node middleware wraps the parallel-branches dispatch as a SINGLE unit — one started event before dispatch begins, one completed event after all branches complete and fan-in is done. Per-branch internal events come from - the branches' subgraph executions and carry ``branch_name`` - per graph-engine §6. + the branches' subgraph executions and carry ``branch_name``. Mirrors ``_step_fan_out_node`` minus the eager count/concurrency resolution (parallel branches has no @@ -2364,8 +2359,7 @@ async def _maybe_save_checkpoint( """Fire a checkpoint save for the just-completed node, if a backend is registered. - Per spec pipeline-utilities §10.3 (revised by proposal 0009 / - spec v0.18.0): + Save policy: - Save fires for outermost-graph nodes, subgraph-internal nodes, fan-out instance internal nodes, AND the fan-out @@ -2378,27 +2372,27 @@ async def _maybe_save_checkpoint( ``fan_out_progress`` field projects this shared dict so all concurrent instances' snapshots are captured atomically. - Atomicity contract (§10.11): the save-call site below + Atomicity contract: the save-call site below completes the "produce contribution + record into accumulator - + save" sequence the spec mandates. ``FanOutNode.run_with_context`` + + save" sequence. ``FanOutNode.run_with_context`` flips an instance's state to ``completed`` and stashes its ``result`` BEFORE invoking the save that durably records the transition. A crash between that state mutation and the save below leaves the in-memory dict updated but the persisted record showing ``in_flight``, so resume re-runs the instance and the append/last_write_wins/merge reducer's exactly-once - guarantee per §10.11.1 holds. + guarantee holds. Save also enumerates ALL concurrent fan-out instances when building ``fan_out_progress`` (not just the one whose ``completed`` event triggered this save) — the per-instance - snapshot is consistent across siblings, matching §10.11's - "captured when a sibling instance's ``completed`` event - triggers a save during this instance's execution" wording. + snapshot is consistent across siblings, captured when a + sibling instance's ``completed`` event triggers a save during + this instance's execution. After ``Checkpointer.save`` returns, dispatch a - ``checkpoint_saved`` observer event (per §10.8 SHOULD-level - guidance) so observability backends can surface saves as spans. + ``checkpoint_saved`` observer event so observability backends + can surface saves as spans. Save failures raise ``CheckpointSaveFailed`` to the caller of ``invoke()`` immediately; saves are NOT retried by the engine. @@ -2508,7 +2502,7 @@ async def _maybe_save_checkpoint( # before/after distinction for a save like there is for a # node attempt. The field is repurposed because a save # event represents "the state was persisted" rather than - # "the state transitioned." Phase 6 OTel mapping reads + # "the state transitioned." The OTel mapping reads # ``pre_state`` as the save's state. _dispatch( context, diff --git a/src/openarmature/graph/errors.py b/src/openarmature/graph/errors.py index 7f56a3c..06c5ac0 100644 --- a/src/openarmature/graph/errors.py +++ b/src/openarmature/graph/errors.py @@ -145,8 +145,7 @@ def __init__(self, node_name: str, collect_field: str) -> None: class ParallelBranchesNoBranches(CompileError): """Raised at registration when a parallel-branches node's - ``branches`` mapping is empty. Per pipeline-utilities §11.9 - / proposal 0011. Non-transient.""" + ``branches`` mapping is empty. Non-transient.""" category = "parallel_branches_no_branches" @@ -184,18 +183,16 @@ def __init__(self, node_name: str, cause: BaseException, recoverable_state: Any) class ParallelBranchesBranchFailed(NodeException): """Raised when a branch's subgraph raises under - ``error_policy: 'fail_fast'``. Per pipeline-utilities §11.9 / - proposal 0011. - - Subtype of :class:`NodeException` (per §11.9: "a - ``node_exception`` subtype attached at the parallel-branches - node's level"). The existing NodeException-classifier path - handles transient classification from ``__cause__`` per §6.1: - non-transient by default, inheriting transient classification - from the wrapped exception. - - Carries ``branch_name`` as a structured field per §11.9; the - inner exception rides ``__cause__``. + ``error_policy: 'fail_fast'``. + + Subtype of :class:`NodeException` (a ``node_exception`` subtype + attached at the parallel-branches node's level). The existing + NodeException-classifier path handles transient classification + from ``__cause__``: non-transient by default, inheriting transient + classification from the wrapped exception. + + Carries ``branch_name`` as a structured field; the inner exception + rides ``__cause__``. """ category = "parallel_branches_branch_failed" diff --git a/src/openarmature/graph/events.py b/src/openarmature/graph/events.py index 462c883..09b3b50 100644 --- a/src/openarmature/graph/events.py +++ b/src/openarmature/graph/events.py @@ -112,11 +112,10 @@ class ParallelBranchesEventConfig: - ``branch_names``: non-empty ordered tuple of strings. The branch identifiers in declaration / dispatch order, as configured on - the parallel-branches node (pipeline-utilities §11.1). + the parallel-branches node. - ``branch_count``: positive int. Equals ``len(branch_names)``. Surfaced explicitly so observers don't have to derive it. - - ``error_policy``: one of ``"fail_fast"`` or ``"collect"`` (per - pipeline-utilities §11.5). + - ``error_policy``: one of ``"fail_fast"`` or ``"collect"``. - ``parent_node_name``: the parallel-branches node's name in the parent graph. Carried here for caching by backend observers when attributing per-branch dispatch spans. @@ -180,8 +179,7 @@ class NodeEvent: :class:`FanOutEventConfig`. ``None`` on every other event. - ``branch_name`` is the non-empty string name of the parallel-branches branch this event came from. ``None`` for - nodes outside any branch. Per graph-engine §6 / pipeline- - utilities §11, the combination of ``namespace``, + nodes outside any branch. The combination of ``namespace``, ``branch_name``, ``fan_out_index``, ``attempt_index``, and ``phase`` jointly uniquely identifies an event source. ``branch_name`` and ``fan_out_index`` are independent; both @@ -195,9 +193,9 @@ class NodeEvent: - On ``completed`` events, exactly one of ``post_state`` and ``error`` is populated. - **Synthetic phases.** ``"checkpoint_saved"`` (pipeline-utilities - §10.8) and ``"checkpoint_migrated"`` (proposal 0014 §6 - cross-ref) repurpose this dataclass for non-node events. Both + **Synthetic phases.** ``"checkpoint_saved"`` and + ``"checkpoint_migrated"`` repurpose this dataclass for non-node + events. Both are opt-in via ``phases={...}`` on observer registration; default subscriptions are ``{"started", "completed"}`` only, so legacy observers never see them. Conventions on synthetic @@ -332,14 +330,14 @@ class MetadataAugmentationEvent: Distinct from :class:`NodeEvent` because there is no node phase, no pre/post state, and no error: this event reports a side-channel - augmentation, not a node-attempt boundary. Per graph-engine §6 the - event is NOT subject to the observer ``phases`` filter (which only - governs ``NodeEvent`` phases); the delivery worker forwards it to - every subscribed observer. Observers that handle it iterate their - open observations whose lineage is an ancestor of (or equal to) - the augmenting context's lineage and apply the entries as - ``openarmature.user.`` (OTel, §5.6) / - ``metadata.`` (Langfuse, §8.4.1+§8.4.2). + augmentation, not a node-attempt boundary. The event is NOT + subject to the observer ``phases`` filter (which only governs + ``NodeEvent`` phases); the delivery worker forwards it to every + subscribed observer. Observers that handle it iterate their open + observations whose lineage is an ancestor of (or equal to) the + augmenting context's lineage and apply the entries as + ``openarmature.user.`` (OTel) / ``metadata.`` + (Langfuse). """ entries: Mapping[str, AttributeValue] @@ -373,21 +371,21 @@ class InvocationStartedEvent: Emitted once per invocation, before any node fires. Observers that populate Trace-level input fields (the Langfuse observer, today) consume it to resolve ``trace.input`` per the three-lever decision - tree in observability §8.4.1. Observers without a Trace-level - input concept (the OTel observer) treat it as a no-op. + tree. Observers without a Trace-level input concept (the OTel + observer) treat it as a no-op. Carries: - ``initial_state``: the raw state object the engine constructed from ``invoke()``'s arguments (the typed-state instance). - ``invocation_id``: the invocation id (caller-supplied or - framework-generated per proposal 0039). - - ``correlation_id``: the §3 correlation id when present. + framework-generated). + - ``correlation_id``: the correlation id when present. - ``entry_node``: the outermost-graph entry node name. - Per graph-engine §6 the event is NOT subject to the observer - ``phases`` filter (which only governs ``NodeEvent`` phases); the - delivery worker forwards it to every subscribed observer. + The event is NOT subject to the observer ``phases`` filter (which + only governs ``NodeEvent`` phases); the delivery worker forwards it + to every subscribed observer. """ initial_state: Any @@ -410,8 +408,8 @@ class InvocationCompletedEvent: after a failure boundary on the failure path). Observers that populate Trace-level output fields (the Langfuse observer, today) consume it to resolve ``trace.output`` per the three-lever - decision tree in observability §8.4.1. Observers without a - Trace-level output concept (the OTel observer) treat it as a no-op. + decision tree. Observers without a Trace-level output concept (the + OTel observer) treat it as a no-op. Carries: @@ -424,11 +422,10 @@ class InvocationCompletedEvent: - ``final_node``: the name of the node whose execution preceded the END-reached transition on the success path, or the node that raised on the failure path. - - ``invocation_id`` / ``correlation_id``: the §3 / §5.1 ids. + - ``invocation_id`` / ``correlation_id``: the run + correlation ids. - Per graph-engine §6 the event is NOT subject to the observer - ``phases`` filter; the delivery worker forwards it to every - subscribed observer. + The event is NOT subject to the observer ``phases`` filter; the + delivery worker forwards it to every subscribed observer. """ final_state: Any @@ -539,9 +536,9 @@ class LlmCompletionEvent: lifetime, unique within the run. Distinct from ``response_id``. - ``caller_invocation_metadata``: optional snapshot of caller- - supplied invocation metadata at LLM-call time. Spec-defined as - OPTIONAL; the python OpenAIProvider populates it by default so - the bundled OTel/Langfuse observers can emit the §5.6 + supplied invocation metadata at LLM-call time. OPTIONAL; the + python OpenAIProvider populates it by default so + the bundled OTel/Langfuse observers can emit the ``openarmature.user.`` span-attribute family without an extra opt-in. Pass ``populate_caller_metadata=False`` to suppress the snapshot. Future non-OpenAI providers MAY default to @@ -606,7 +603,7 @@ class LlmFailedEvent: """A typed LLM provider call failure event delivered to observers. Carries identity, scoping, and failure-context data for an LLM - call that raised a llm-provider §7 category exception. Observer + call that raised a llm-provider category exception. Observer code filters by type discrimination (``isinstance(event, LlmFailedEvent)``) rather than by the impl-current sentinel- namespace string match. @@ -619,8 +616,8 @@ class LlmFailedEvent: Failure-specific fields: - - ``error_category``: the llm-provider §7 normative error - category the call raised. One of the 9 canonical strings + - ``error_category``: the llm-provider normative error category + the call raised. One of the 9 canonical strings (``provider_authentication``, ``provider_unavailable``, ``provider_invalid_model``, ``provider_model_not_loaded``, ``provider_rate_limit``, ``provider_invalid_response``, @@ -628,7 +625,7 @@ class LlmFailedEvent: ``provider_unsupported_content_block``, ``structured_output_invalid``). Always present. - ``error_type``: OPTIONAL impl-level / vendor-specific error - type or code. Two acceptable styles per spec: + type or code. Two acceptable styles: vendor error code (e.g. ``"rate_limit_exceeded"``) OR upstream exception class name (e.g. ``"RateLimitError"``). ``None`` when no impl-side type is available. diff --git a/src/openarmature/graph/fan_out.py b/src/openarmature/graph/fan_out.py index 48d545b..f5d3223 100644 --- a/src/openarmature/graph/fan_out.py +++ b/src/openarmature/graph/fan_out.py @@ -10,7 +10,7 @@ This is the single place in the engine where multiple subgraph executions overlap in time within a single invocation; everywhere else -(graph-engine §3) execution is single-threaded. +execution is single-threaded. The module contains: @@ -137,20 +137,20 @@ async def run_with_context( fan-in collected/extra fields, write count_field and errors_field if configured. - Per proposal 0009 / §10.11 per-instance resume contract: this - method registers a per-fan-out tracking entry on the shared + Per the per-instance resume contract: this method registers a + per-fan-out tracking entry on the shared ``context.fan_out_progress_state`` dict before dispatching, flips each instance's state through ``not_started -> in_flight -> completed`` as the instance progresses, and fires an explicit "instance completed" save after the per-instance contribution has been recorded into - the accumulator. The atomicity contract from §10.11 is + the accumulator. The atomicity contract is observed: the per-instance state mutation precedes the save, so a crash after mutation but before save leaves the saved record showing ``in_flight`` (resume re-runs the instance). - ``pre_resolved_count`` / ``pre_resolved_concurrency`` are the - proposal-0013 v0.10.0 hooks: when the engine has already + ``pre_resolved_count`` / ``pre_resolved_concurrency`` are + hooks: when the engine has already resolved the config eagerly to populate ``NodeEvent.fan_out_config`` for the fan-out node's events, it passes the resolved values in so callable resolvers @@ -469,12 +469,12 @@ def _build_instance_states( ) -> list[Any]: """Project parent state to per-instance subgraph states. - Per spec §9.1: + By mode: - items_field mode: one instance per item, item_field gets the item - count mode: ``count`` instances, item_field absent - both modes: inputs map parent fields onto subgraph state fields - ``pre_resolved_count`` (proposal-0013 hook): if the engine has + ``pre_resolved_count``: if the engine has already resolved ``cfg.count`` to populate ``NodeEvent.fan_out_config.item_count``, the resolved value is passed in here so the callable resolver isn't invoked twice. @@ -526,7 +526,7 @@ def _build_instance_states( def _resolve_count(node_name: str, cfg: FanOutConfig, parent_state: Any) -> int: - """Resolve the ``count`` config to an int. Spec §9.1.""" + """Resolve the ``count`` config to an int.""" raw = cfg.count if callable(raw): resolved = raw(parent_state) @@ -545,7 +545,7 @@ def _resolve_count(node_name: str, cfg: FanOutConfig, parent_state: Any) -> int: def _resolve_concurrency(node_name: str, cfg: FanOutConfig, parent_state: Any) -> int | None: - """Resolve the ``concurrency`` config. Spec §9.2.""" + """Resolve the ``concurrency`` config.""" raw = cfg.concurrency if callable(raw): resolved = raw(parent_state) @@ -647,7 +647,7 @@ async def _save_instance_in_flight( save only fires on successful merge (failure path skips it). Routes through the checkpointer's ``save_fan_out_in_flight_failure`` - seam (when present) per §10.11.4. Batching backends typically + seam (when present). Batching backends typically buffer this save WITHOUT triggering a flush — the "crash" the failure represents would lose the buffer, including this save, in a real-world scenario. Non-batching backends route it through @@ -692,17 +692,17 @@ async def _save_instance_completed( parent_state: Any, context: _InvocationContext, ) -> None: - """Fire the explicit "instance completed" save closing the §10.11 + """Fire the explicit "instance completed" save closing the atomicity gap. The per-instance state has already been flipped to ``completed`` with ``result`` populated; this save durably records that transition so resume can skip the instance. - Routed through the fan-out-internal batching seam per §10.11.4 — + Routed through the fan-out-internal batching seam — backends opting into batching may buffer the save; non-batching backends call ``save`` directly. On crash with buffered-but- unflushed saves, the instance reverts to ``in_flight`` / ``not_started`` on resume and re-runs (contributing for the first - time, no double-merge per §10.11.1). + time, no double-merge). """ # Lazy imports: ``compiled`` and ``checkpoint.protocol`` would # create textual cycles at module-load. Function-scope keeps the @@ -773,8 +773,8 @@ def _fan_in_fail_fast( ) -> dict[str, Any]: """Merge per-instance partials into a single fan-out partial under the fail_fast policy. All ``results`` succeeded (otherwise gather - would have raised), so the count is just ``len(results)``. Spec - §9.3 + §9.4: instance-index order.""" + would have raised), so the count is just ``len(results)``; + instance-index order.""" # §9.4 projection: read each instance's subgraph-space partial by # subgraph field name and collect into the parent field. ``.get`` keeps # an omitted collect_field (a callable degrade that doesn't set it, §9.3) @@ -796,7 +796,7 @@ def _fan_in_collect( ) -> dict[str, Any]: """Merge per-instance results under the collect policy. Failures contribute nothing to target_field; if errors_field is configured, - failed instances' exceptions are recorded there. Spec §9.5.""" + failed instances' exceptions are recorded there.""" successes: list[Mapping[str, Any]] = [] error_records: list[dict[str, str]] = [] for idx, r in enumerate(raw_results): diff --git a/src/openarmature/graph/middleware/_core.py b/src/openarmature/graph/middleware/_core.py index 732a8a7..3f0d3a4 100644 --- a/src/openarmature/graph/middleware/_core.py +++ b/src/openarmature/graph/middleware/_core.py @@ -109,7 +109,7 @@ def compose_chain( ``CompiledGraph._step_function_node``, producing one closure layer per middleware on every node step. For typical workloads (single-digit middleware × hundreds of node activations) this is - negligible. Under heavy fan-out (Phase 3+), e.g. 10K instances × 5 + negligible. Under heavy fan-out, e.g. 10K instances × 5 inner nodes × 3 middlewares = 150K closure constructions per invocation; worth measuring with realistic workloads when the fan-out runtime lands. The optimization shape (cache the chain at diff --git a/src/openarmature/graph/observer.py b/src/openarmature/graph/observer.py index 283c76f..ce5da36 100644 --- a/src/openarmature/graph/observer.py +++ b/src/openarmature/graph/observer.py @@ -22,7 +22,7 @@ - `_dispatch`: enqueues an event for the worker to deliver. - `deliver_loop`: the worker coroutine. Reads items from the queue and calls each observer in order, filtering by subscribed phase and - isolating exceptions via `warnings.warn` per spec. + isolating exceptions via `warnings.warn`. """ from __future__ import annotations @@ -110,16 +110,16 @@ async def log_observer(event: NodeEvent | MetadataAugmentationEvent) -> None: ``fan_out_index``, ``branch_name``) so rich backends can update their open observations in place (``span.set_attribute(openarmature.user., v)`` for OTel, - ``observation.update(metadata=...)`` for Langfuse). Per spec §6 - this variant is NOT subject to the ``phases`` filter — every + ``observation.update(metadata=...)`` for Langfuse). This variant + is NOT subject to the ``phases`` filter — every subscribed observer sees it and isinstance-narrows to decide whether to act. Simple user observers typically early-return after ``isinstance(event, NodeEvent)`` checks. - :class:`InvocationStartedEvent` — emitted once per invocation before any node fires. Carries the engine-constructed ``initial_state`` so Trace-level backends (Langfuse) can - populate ``trace.input`` via the proposal 0043 three-lever - decision tree. NOT subject to the ``phases`` filter; OTel-only + populate ``trace.input`` via the three-lever decision tree. NOT + subject to the ``phases`` filter; OTel-only observers ignore it via the isinstance gate. - :class:`InvocationCompletedEvent` — emitted once per invocation after the last node fires (on both the success path and the @@ -283,10 +283,10 @@ class _QueuedItem: without the worker needing to know the graph topology. ``event`` is the union of ``NodeEvent`` (started / completed / - checkpoint phases), ``MetadataAugmentationEvent`` (proposal 0040, - side-channel augmentation), and the two invocation-boundary - events ``InvocationStartedEvent`` / ``InvocationCompletedEvent`` - (proposal 0043, Trace-level input/output sourcing). The delivery + checkpoint phases), ``MetadataAugmentationEvent`` (side-channel + augmentation), and the two invocation-boundary events + ``InvocationStartedEvent`` / ``InvocationCompletedEvent`` + (Trace-level input/output sourcing). The delivery worker branches by type to apply the right delivery contract (phase-filter for ``NodeEvent``, no filter for the other three). """ @@ -344,10 +344,10 @@ class DrainSummary: delivered to every subscribed observer before cancellation, and `timeout_reached is True`. - The spec-mandated minimum is these two fields. Implementations MAY + These two fields are the required minimum. Implementations MAY extend the shape with diagnostic detail (per-observer counts, - sampled event metadata) in subsequent versions; v0.19.0 ships the - minimum. + sampled event metadata) in subsequent versions; this version ships + the minimum. """ undelivered_count: int @@ -367,17 +367,17 @@ class _FanOutInstanceState: not_started -> in_flight -> completed. - ``result`` holds the per-instance contribution to the fan-out - accumulator, set when ``state == "completed"``. Per spec - §10.11 this is "the value contributed to the ``target_field`` - bucket" (success path) or "the error entry contributed to the + accumulator, set when ``state == "completed"``: "the value + contributed to the ``target_field`` bucket" (success path) or + "the error entry contributed to the ``errors_field`` bucket" (collect-mode failure). The harness projects this into the frozen ``FanOutInstanceProgress.result`` verbatim. - ``result_is_error`` distinguishes success contributions (``False``) from collect-mode error contributions (``True``). Internal flag — not exposed on the public - ``FanOutInstanceProgress`` shape because the spec presents - ``result`` as a single typed entry per the parent state schema. + ``FanOutInstanceProgress`` shape because ``result`` is exposed + as a single typed entry per the parent state schema. ``FanOutNode.run_with_context`` consults this on resume to route the rolled-forward contribution through the ``errors_field`` bucket rather than ``target_field``. @@ -385,14 +385,14 @@ class _FanOutInstanceState: ``extra_outputs`` mapping (parent-field -> sub-field) so that per-instance resume preserves the FULL per-instance contribution (not just the ``target_field`` slice). Internal — not exposed on - the public ``FanOutInstanceProgress`` shape because the spec - describes ``result`` as a single accumulator entry. + the public ``FanOutInstanceProgress`` shape because ``result`` + is a single accumulator entry. - ``completed_inner_positions`` accumulates ``NodePosition`` entries from inner nodes that complete inside this instance's subgraph execution. Captures the instance's progress for observational purposes when an in_flight save snapshot fires; not used as a resume re-entry point (the instance re-enters at its subgraph's - declared entry node per §10.7). + declared entry node). """ state: Literal["completed", "in_flight", "not_started"] = "not_started" @@ -570,8 +570,8 @@ class _InvocationContext: def full_observers(self) -> tuple[SubscribedObserver, ...]: """Return the ordered observer list to deliver for events from - this depth. Per spec §6: graph-attached (outermost → innermost), - then invocation-scoped (passed to the outermost invoke).""" + this depth: graph-attached (outermost → innermost), then + invocation-scoped (passed to the outermost invoke).""" return self.graph_attached + self.invocation_scoped def descend_into_subgraph( @@ -593,7 +593,7 @@ def descend_into_subgraph( Checkpointing fields propagate unchanged: subgraph-internal nodes save to the same backend with the same invocation_id - (per spec §10.3; one save per inner-node completion). + (one save per inner-node completion). """ return _InvocationContext( queue=self.queue, @@ -638,10 +638,10 @@ def descend_into_fan_out_instance( Same shape as ``descend_into_subgraph`` but stamps the fan-out index onto the new context so every inner-node event carries it. - Per spec §9 the index is the instance's 0-based position. + The index is the instance's 0-based position. - Per pipeline-utilities §10.3 (revised by proposal 0009): fan-out - instance internal nodes DO produce checkpoint saves. The + Fan-out instance internal nodes DO produce checkpoint saves. + The checkpointer reference propagates unchanged so an inner node's ``completed`` event triggers a save; the engine's save path projects the shared ``fan_out_progress_state`` into the record's @@ -695,8 +695,8 @@ def descend_into_parallel_branch( """Build the context for one parallel-branches branch's subgraph invocation. - Per pipeline-utilities §11.6 the parallel-branches node looks - to outer middleware like a single dispatch; inner-branch + The parallel-branches node looks to outer middleware like a + single dispatch; inner-branch events come from the branch's subgraph execution. Stamps the namespace prefix with the parallel-branches node name so inner events nest under it (mirrors @@ -706,11 +706,11 @@ def descend_into_parallel_branch( the ``observability.correlation._branch_name_var`` ContextVar — set inside the branch's task closure so ``copy_context`` inherits it through the subgraph's execution. The PER-DEPTH - ``branch_name_chain`` (proposal 0045) is extended here on the + ``branch_name_chain`` is extended here on the context so the engine can drive the chain ContextVar at every inner-node execution site. - Per §11.9 / §10.7 atomic-restart: drops the checkpointer + Atomic-restart: drops the checkpointer and pending_resume_states (a crash mid-dispatch re-runs the whole parallel-branches node from scratch on resume; the branches' inner saves wouldn't be useful). @@ -776,18 +776,18 @@ def _dispatch( engine-task scope (e.g., the OTel observer setting ``current_active_observer_span`` for the engine to attach into the OTel context) can do so before the node body runs. - - :class:`MetadataAugmentationEvent` (proposal 0040): a side- - channel augmentation event emitted by + - :class:`MetadataAugmentationEvent`: a side-channel augmentation + event emitted by ``set_invocation_metadata`` mid-invocation. Bypasses the ``prepare_sync`` branch entirely — the sync-prep contract is anchored on ``"started"``, which only ``NodeEvent`` carries. Queued onto the same serial worker so observers see it in strict order with the surrounding node events. - :class:`InvocationStartedEvent` / - :class:`InvocationCompletedEvent` (proposal 0043): invocation- - boundary events the engine enqueues at invocation entry / exit - so Trace-level backends can populate ``trace.input`` / - ``trace.output`` via the §8.4.1 three-lever decision tree. + :class:`InvocationCompletedEvent`: invocation-boundary events the + engine enqueues at invocation entry / exit so Trace-level + backends can populate ``trace.input`` / ``trace.output`` via the + three-lever decision tree. Bypass ``prepare_sync`` (same rationale as ``MetadataAugmentationEvent``: not a node-phase event). @@ -883,10 +883,10 @@ async def deliver_loop( the event's phase do NOT receive it. Phase filter applies at delivery, not dispatch; the engine still produces both events for every attempt. - - For :class:`MetadataAugmentationEvent` (proposal 0040) and the - two invocation-boundary events :class:`InvocationStartedEvent` - / :class:`InvocationCompletedEvent` (proposal 0043), the - ``phases`` filter is bypassed entirely — none of those are + - For :class:`MetadataAugmentationEvent` and the two + invocation-boundary events :class:`InvocationStartedEvent` / + :class:`InvocationCompletedEvent`, the ``phases`` filter is + bypassed entirely — none of those are node-phase events, so every subscribed observer receives them regardless of ``phases``. Observers ``isinstance``-narrow on the first line and choose whether to act. diff --git a/src/openarmature/graph/parallel_branches.py b/src/openarmature/graph/parallel_branches.py index f1251a0..8f8aac7 100644 --- a/src/openarmature/graph/parallel_branches.py +++ b/src/openarmature/graph/parallel_branches.py @@ -11,16 +11,15 @@ subgraph (with potentially different state schema, middleware, topology), its own ``inputs`` / ``outputs`` projection mappings, and its own optional ``middleware`` wrapping the whole branch -invocation as a unit (§11.7). +invocation as a unit. -Buffer-then-apply semantics per §11.4: contributions are -collected during dispatch and merged deterministically once at -node completion, using the parent's reducer for each output -field. Branch insertion order determines both dispatch order -(§11.8) and merge tie-breaking when two branches write the same -parent field. +Buffer-then-apply semantics: contributions are collected during +dispatch and merged deterministically once at node completion, +using the parent's reducer for each output field. Branch insertion +order determines both dispatch order and merge tie-breaking when +two branches write the same parent field. -Error policies per §11.5: +Error policies: - ``fail_fast``: first failure cancels still-running branches; the buffered contributions are discarded; the parallel-branches @@ -31,6 +30,8 @@ branches' contributions merge; failed branches' errors land in the optional ``errors_field``. """ +# Spec pipeline-utilities §11 (parallel branches): §11.4 buffer-then- +# apply, §11.5 error policies, §11.7 branch middleware, §11.8 order. from __future__ import annotations @@ -61,10 +62,10 @@ class BranchSpec[ChildT: State]: """One entry in a :class:`ParallelBranchesNode`'s branch mapping. - Branches are heterogeneous: each spec MAY reference a different + Branches are heterogeneous: each branch may reference a different compiled subgraph with a different state schema. ``inputs`` / ``outputs`` follow the same shape as subgraph projection - mappings (proposal 0002). + mappings. Validation lives on the builder side (``GraphBuilder.add_parallel_branches_node``): @@ -83,7 +84,7 @@ class BranchSpec[ChildT: State]: @dataclass(frozen=True) class ParallelBranchesNode[ParentT: State]: """A node that dispatches M heterogeneous compiled subgraphs - concurrently per spec §11. + concurrently. The Node Protocol contract requires ``name``, ``middleware``, and ``run``. Like :class:`FanOutNode`, the engine recognizes @@ -229,7 +230,7 @@ async def _fail_fast( tasks: list[tuple[str, asyncio.Task[Mapping[str, Any]]]], contributions: dict[str, Mapping[str, Any]], ) -> Mapping[str, Any]: - """Fail-fast policy per spec §11.5. + """Fail-fast policy. Wait for all branches; on first failure, cancel the rest and raise ``ParallelBranchesBranchFailed`` with the failing @@ -309,7 +310,7 @@ async def _collect( contributions: dict[str, Mapping[str, Any]], errors: list[dict[str, str]], ) -> Mapping[str, Any]: - """Collect policy per spec §11.5. + """Collect policy. All branches run to completion regardless of individual failures. Successful branches' contributions go to the @@ -346,8 +347,8 @@ def _merge_contributions( ) -> dict[str, Any]: """Flatten per-branch contributions into a single partial. - Per §11.4 + §11.8: contributions apply in branch insertion - order, using each parent field's reducer. The actual reducer + Contributions apply in branch insertion order, using each + parent field's reducer. The actual reducer application happens at ``_merge_partial`` in compiled.py when the engine merges this partial into parent state. Here we just flatten the per-branch contributions into a dict @@ -386,7 +387,7 @@ class _MultiContribution: """Sentinel for ``_merge_partial`` indicating that multiple branches contributed to the same parent field. The engine applies the parent's reducer to each value in sequence, - preserving branch insertion order per §11.8. + preserving branch insertion order. """ values: tuple[Any, ...] diff --git a/src/openarmature/llm/messages.py b/src/openarmature/llm/messages.py index 9bc32a8..778391d 100644 --- a/src/openarmature/llm/messages.py +++ b/src/openarmature/llm/messages.py @@ -77,10 +77,10 @@ class Tool(BaseModel): class ForceTool(BaseModel): """Force the model to call exactly the named tool. - Use the record form of the §5 `tool_choice` discriminated union - when you need the model to call a specific tool by name. ``type`` - is the spec-level discriminator (``"tool"``); the wire mapping - (§8.1.1) renames it to ``"function"`` for the OpenAI body. The + Use the record form of the `tool_choice` discriminated union when + you need the model to call a specific tool by name. ``type`` is the + discriminator (``"tool"``); the wire mapping renames it + to ``"function"`` for the OpenAI body. The ``name`` MUST match a ``Tool.name`` in the supplied ``tools`` list; ``validate_tool_choice`` enforces this at pre-send time and raises ``ProviderInvalidRequest`` on violation. diff --git a/src/openarmature/llm/provider.py b/src/openarmature/llm/provider.py index 71e03ac..59ae808 100644 --- a/src/openarmature/llm/provider.py +++ b/src/openarmature/llm/provider.py @@ -100,8 +100,8 @@ async def complete( supplied, the implementation constrains the model's output to the schema and populates ``Response.parsed`` with the validated value. - tool_choice: Optional tool-choice constraint (spec §5). One - of ``"auto"``, ``"required"``, ``"none"``, or a + tool_choice: Optional tool-choice constraint. One of + ``"auto"``, ``"required"``, ``"none"``, or a :class:`ForceTool` record. When ``None`` (the default) the wire ``tool_choice`` field is omitted and the provider's own default applies. Pre-send validation @@ -213,9 +213,9 @@ def validate_tool_choice( tool_choice: ToolChoice | None, tools: Sequence[Tool] | None, ) -> None: - """Validate ``tool_choice`` against ``tools`` per spec §5. + """Validate ``tool_choice`` against ``tools``. - Raises :class:`ProviderInvalidRequest` (the §7 + Raises :class:`ProviderInvalidRequest` (the ``provider_invalid_request`` category) on: - ``tool_choice`` supplied as a string that is not one of @@ -229,11 +229,12 @@ def validate_tool_choice( - ``tool_choice=ForceTool(name=X)`` supplied with ``X`` not in the supplied tools list. - No-op when ``tool_choice`` is ``None`` (the default — preserves - pre-0025 behavior; the wire field is omitted and the provider's - own default applies). ``tool_choice="auto"`` and - ``tool_choice="none"`` have no ``tools``-related preconditions. + No-op when ``tool_choice`` is ``None`` (the default — the wire + field is omitted and the provider's own default applies). + ``tool_choice="auto"`` and ``tool_choice="none"`` have no + ``tools``-related preconditions. """ + # Spec llm-provider §5 (tool_choice) / §7 (provider_invalid_request). if tool_choice is None: return # Two-layer type defense at the API boundary. Pyright catches the diff --git a/src/openarmature/llm/providers/openai.py b/src/openarmature/llm/providers/openai.py index b1551fb..8c86cb4 100644 --- a/src/openarmature/llm/providers/openai.py +++ b/src/openarmature/llm/providers/openai.py @@ -370,7 +370,7 @@ async def complete( class; when supplied as a JSON Schema dict, ``Response.parsed`` is the deserialized dict. - ``tool_choice`` is validated against ``tools`` per spec §5: + ``tool_choice`` is validated against ``tools``: ``"required"`` and the ``ForceTool`` record both demand non-empty ``tools``, and ``ForceTool.name`` must appear in the supplied list. Violations raise ``provider_invalid_request`` @@ -589,7 +589,7 @@ def _build_llm_completion_event( Sources identity / scoping fields from the calling-node ContextVars and outcome fields from the response. Request-side - fields (per proposal 0057) are passed through from the + fields are passed through from the provider's complete() local state — serialized message list, the gen_ai.request.* parameter mapping, the RuntimeConfig extras, the prompt-context snapshots taken at dispatch time, @@ -669,17 +669,17 @@ def _build_llm_failed_event( """Construct the typed LlmFailedEvent for the failure path. Sources identity / scoping fields from the calling-node - ContextVars and failure fields from the raised §7 exception. + ContextVars and failure fields from the raised exception. Field set mirrors LlmCompletionEvent (identity + request-side) - plus the three failure-specific fields per proposal 0058. + plus the three failure-specific fields. ``error_type`` defaults to the exception class name — falls into the "upstream exception class name" style documented in - the spec field table. Providers that have a vendor error code + the field table. Providers that have a vendor error code available (e.g. ``rate_limit_exceeded`` for OpenAI) can - override with vendor-specific detail in a future spec - proposal; for now the class name is the safest default since - every LlmProviderError subclass carries one. + override with vendor-specific detail in a future proposal; for + now the class name is the safest default since every + LlmProviderError subclass carries one. """ namespace = current_namespace_prefix() @@ -1178,7 +1178,8 @@ def _augment_messages_with_schema_directive( def _message_to_wire(msg: Message) -> dict[str, Any]: - """Spec §8.1.1 request mapping for one message.""" + """Request mapping for one message.""" + # Spec llm-provider §8.1.1. if isinstance(msg, SystemMessage): return {"role": "system", "content": msg.content} if isinstance(msg, UserMessage): @@ -1299,14 +1300,14 @@ def _tool_to_wire(tool: Tool) -> dict[str, Any]: def _wire_to_assistant_message(wire: dict[str, Any], *, lenient_args: bool) -> AssistantMessage: - """Parse OpenAI-shaped assistant message into spec §3 form. + """Parse an OpenAI-shaped assistant message into canonical form. When ``lenient_args=True`` (i.e. ``finish_reason == "error"``), tool calls with unparseable JSON arguments populate - ``arguments=None`` instead of raising. Per spec §3 "Validation - under finish_reason: error" — degraded responses surface what - they can; repair is a caller concern. + ``arguments=None`` instead of raising — degraded responses surface + what they can; repair is a caller concern. """ + # Spec llm-provider §3: validation under finish_reason "error". content_raw = wire.get("content") or "" content: str = content_raw if isinstance(content_raw, str) else "" raw_tool_calls = cast("list[Any]", wire.get("tool_calls") or []) diff --git a/src/openarmature/observability/correlation.py b/src/openarmature/observability/correlation.py index af515d7..7b6660f 100644 --- a/src/openarmature/observability/correlation.py +++ b/src/openarmature/observability/correlation.py @@ -139,14 +139,16 @@ def _reset_invocation_id(token: Token[str | None]) -> None: def validate_invocation_id(value: object) -> str: """Validate a caller-supplied ``invocation_id`` and return it. - Per observability §5.1 a caller-supplied id MAY be any non-empty - URL-safe string. Rejects empty / non-string / non-URL-safe values - at the ``invoke()`` boundary so the violation surfaces - synchronously to the caller rather than as a downstream trace-id - derivation failure. Typed ``object`` (like - :func:`validate_invocation_metadata`) so the boundary check guards - against untyped callers. Raises :class:`ValueError`. + A caller-supplied id MAY be any non-empty URL-safe string. Rejects + empty / non-string / non-URL-safe values at the ``invoke()`` + boundary so the violation surfaces synchronously to the caller + rather than as a downstream trace-id derivation failure. Typed + ``object`` (like :func:`validate_invocation_metadata`) so the + boundary check guards against untyped callers. Raises + :class:`ValueError`. """ + # Spec observability §5.1: a caller-supplied invocation_id MAY be + # any non-empty URL-safe string. if not isinstance(value, str): raise ValueError(f"invocation_id must be a string; got {type(value).__name__}") if not value: @@ -161,7 +163,7 @@ def validate_invocation_id(value: object) -> str: # --------------------------------------------------------------------------- # Active observer set — for capability backends emitting from outside the -# engine's per-step path (llm-provider span hook in Phase 6, future +# engine's per-step path (llm-provider span hook, future # Langfuse/Datadog backends, user-written instrumented capabilities). # --------------------------------------------------------------------------- @@ -204,7 +206,7 @@ def _reset_active_observers(token: Token[tuple[SubscribedObserver, ...]]) -> Non # Active dispatch hook — queue-mediated event emission from outside the # engine's per-step path. The engine sets this ContextVar to a closure # over the current invocation's delivery queue + observer chain; -# capability backends (the LLM provider span hook in Phase 6, future +# capability backends (the LLM provider span hook, future # Langfuse/Datadog instrumentations) call ``current_dispatch()(event)`` # to enqueue an event for the same delivery worker the engine uses. # diff --git a/src/openarmature/observability/langfuse/__init__.py b/src/openarmature/observability/langfuse/__init__.py index 5657655..92e35c3 100644 --- a/src/openarmature/observability/langfuse/__init__.py +++ b/src/openarmature/observability/langfuse/__init__.py @@ -16,7 +16,7 @@ Public surface: - :class:`LangfuseObserver` — observer-driven Langfuse Trace + - Observation emission per spec observability §8. + Observation emission. - :class:`LangfuseClient` — Protocol the observer calls. Satisfied by the bundled :class:`InMemoryLangfuseClient` and (structurally) by the real ``langfuse.Langfuse`` SDK class. diff --git a/src/openarmature/observability/langfuse/client.py b/src/openarmature/observability/langfuse/client.py index 8091b76..276498b 100644 --- a/src/openarmature/observability/langfuse/client.py +++ b/src/openarmature/observability/langfuse/client.py @@ -11,7 +11,7 @@ """Langfuse client Protocol + in-memory recorder. -The :class:`LangfuseObserver` consumes the §6 OA event stream and +The :class:`LangfuseObserver` consumes the OA event stream and emits Langfuse Trace + Observation entities through a :class:`LangfuseClient`. The Protocol is intentionally narrow: it declares only the methods the observer calls. Concrete sinks: @@ -55,7 +55,7 @@ class LangfuseObservation: Carries the observation's type-discriminated shape — Spans hold timing + metadata; Generations add model/parameters/usage/input/ output/prompt-entity link; Events are point-in-time markers - (reserved per spec §8.2 — not used by this version of the mapping). + (reserved, not used by this version of the mapping). """ id: str @@ -161,7 +161,7 @@ class LangfuseClient(Protocol): closes. The Protocol does NOT define `event(...)` — Event observations - are reserved by §8.2 but not used in v0.23.0 of the mapping. + are reserved but not used by this mapping. """ def trace( @@ -173,10 +173,11 @@ def trace( ) -> None: """Create a new Trace. - The Trace `id` MUST be the OA invocation_id verbatim (§8.4.1). + The Trace `id` MUST be the OA invocation_id verbatim. Implementations track Traces internally; observation calls pass `trace_id` to associate. """ + # Spec §8.4.1: the Trace id is the OA invocation_id verbatim. ... # The current observer doesn't invoke this method — it sets the @@ -199,10 +200,10 @@ def update_trace( """Update an existing Trace's mutable fields after creation. Used by the observer when the caller-supplied invocation - label (§8.6) lands later than the Trace's open call, when - additional metadata becomes available mid-invocation, or - when the proposal 0043 invocation-boundary events populate - ``trace.input`` / ``trace.output``. + label lands later than the Trace's open call, when additional + metadata becomes available mid-invocation, or when the + invocation-boundary events populate ``trace.input`` / + ``trace.output``. """ ... diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 384fc6c..587641f 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -97,10 +97,10 @@ def _read_implementation_version() -> str: class _OpenObservation: """An in-flight Langfuse observation pinned in the observer's state. - Per proposal 0045: carries the observation's own - ``fan_out_index_chain`` and ``branch_name_chain`` so the - augmentation walk can apply §3.4's lineage-aware boundary rule - (mirror of the OTel observer's ``_OpenSpan``).""" + Carries the observation's own ``fan_out_index_chain`` and + ``branch_name_chain`` so the augmentation walk can apply the + lineage-aware boundary rule (mirror of the OTel observer's + ``_OpenSpan``).""" handle: LangfuseSpanHandle | LangfuseGenerationHandle fan_out_index_chain: tuple[int | None, ...] = () @@ -138,22 +138,23 @@ def _empty_str_frozenset() -> frozenset[str]: def _apply_caller_metadata(metadata: dict[str, Any], caller_metadata: Mapping[str, Any]) -> None: """Merge caller-supplied invocation metadata into a Trace's or - Observation's metadata bag at top level per observability §8.4.1 - + §8.4.2 (proposal 0034). + Observation's metadata bag at top level. - Top-level placement is by spec: Langfuse UI filters on + Top-level placement lets the Langfuse UI filter on ``metadata.`` directly, so caller-supplied entries become siblings to ``correlation_id`` / ``entry_node`` rather than nested under a ``user`` sub-object. - Reserved-key collision with §8.4.1 / §8.4.2 keys + Reserved-key collision with the OA-emitted keys (``correlation_id``, ``entry_node``, ``spec_version``, - ``namespace``, etc.) is not currently checked here: the spec - permits the rejection to happen at either boundary, and the - ``invoke()`` API-boundary validation already rejects - ``openarmature.*`` / ``gen_ai.*`` prefixed keys. Per-Langfuse- - backend collision rejection is queued as a follow-up. + ``namespace``, etc.) is not currently checked here: the rejection + may happen at either boundary, and the ``invoke()`` API-boundary + validation already rejects ``openarmature.*`` / ``gen_ai.*`` + prefixed keys. Per-Langfuse-backend collision rejection is queued + as a follow-up. """ + # Spec observability §8.4.1 / §8.4.2 (proposal 0034): top-level + # placement of caller-supplied metadata on the Trace / Observation. for key, value in caller_metadata.items(): metadata[key] = value @@ -163,15 +164,16 @@ def _subgraph_identity_at(event: NodeEvent, depth: int) -> str: given 1-based namespace depth, or the empty string when no identity is tracked at that depth. - Per observability §5.3 + the coord-thread - ``clarify-subgraph-name-semantics`` resolution: the empty-string - fallback matches the spec's "if the implementation tracks one" - clause for implementations / direct ``SubgraphNode(...)`` callers - that don't wire an identity through. Conformance fixtures - 031/032/033 lock identity as the required value; the empty-string - path keeps direct callers conformant with §5.3 but failing those - fixtures. + The empty-string fallback is the "no identity tracked" case, for + implementations / direct ``SubgraphNode(...)`` callers that don't + wire an identity through. + Conformance fixtures 031/032/033 lock identity as the required + value; the empty-string path keeps direct callers conformant but + failing those fixtures. """ + # Spec observability §5.3 (coord thread + # clarify-subgraph-name-semantics): empty-string fallback is + # conformant for callers that don't track a subgraph identity. idx = depth - 1 if 0 <= idx < len(event.subgraph_identities): identity = event.subgraph_identities[idx] @@ -254,12 +256,12 @@ class _InvState: @dataclass class LangfuseObserver: - """Observer-driven Langfuse mapping per spec observability §8. + """Observer-driven Langfuse mapping. Construct with a :class:`LangfuseClient` — the bundled :class:`InMemoryLangfuseClient` for tests, or a real ``langfuse.Langfuse()`` instance for production. The observer - handles the §6 event stream and emits Trace + Observation entities + handles the event stream and emits Trace + Observation entities through the client. Constructor knobs: @@ -267,34 +269,34 @@ class LangfuseObserver: - ``client``: the Langfuse sink (Protocol-typed). - ``disable_llm_spans``: when ``True`` the observer skips Generation observations on LLM provider events. - - ``disable_provider_payload``: default ``True`` per §8.9's "symmetric - privacy posture" with the OTel observer. Gates + - ``disable_provider_payload``: default ``True`` for a symmetric + privacy posture with the OTel observer. Gates ``generation.input`` / ``output`` / ``metadata.request_extras`` emission. The name carries the broadened provider-payload scope; LLM completion is OA's only provider-call payload today. - ``payload_byte_cap``: per-attribute byte cap on the source payload string before parse-back. Mirrors the OTel observer's ``payload_max_bytes`` semantic — emission preserves the raw - truncated string when the §5.5.5 marker is present (per §8.7). - Default 64 KiB; same minimum (256 bytes) applies. + truncated string when the truncation marker is present. Default + 64 KiB; same minimum (256 bytes) applies. - ``detached_subgraphs``: set of subgraph wrapper node names that - run in their own Langfuse Trace per §8.5. Each such subgraph - gets a fresh trace_id; the main Trace's dispatch observation - surfaces the link via ``metadata.detached_child_trace_ids``. + run in their own Langfuse Trace. Each such subgraph gets a fresh + trace_id; the main Trace's dispatch observation surfaces the link + via ``metadata.detached_child_trace_ids``. - ``detached_fan_outs``: set of fan-out node names whose instances each get their own Langfuse Trace. Same link mechanism on the fan-out node observation: each per-instance detached trace_id lands in the array. - - ``disable_state_payload``: default ``True`` per §8.4.1 *Trace - input/output sourcing* (proposal 0043). When ``True`` the - observer does NOT serialize ``initial_state`` / final state - directly onto ``trace.input`` / ``trace.output``; the minimal - stub applies unless ``trace_input_from_state`` / - ``trace_output_from_state`` overrides. When ``False`` the raw - state object is serialized to the Trace fields, subject to - ``payload_byte_cap`` truncation. Independent of - ``disable_provider_payload`` — the two payloads carry distinct - threat models (LLM-call transcript vs. application state). + - ``disable_state_payload``: default ``True`` (Trace input/output + sourcing). When ``True`` the observer does NOT serialize + ``initial_state`` / final state directly onto ``trace.input`` / + ``trace.output``; the minimal stub applies unless + ``trace_input_from_state`` / ``trace_output_from_state`` + overrides. When ``False`` the raw state object is serialized to + the Trace fields, subject to ``payload_byte_cap`` truncation. + Independent of ``disable_provider_payload`` — the two payloads + carry distinct threat models (LLM-call transcript vs. + application state). - ``trace_input_from_state``: optional caller hook returning the value to use as ``trace.input``. Called once per invocation at the ``InvocationStartedEvent``. Returning ``None`` falls @@ -309,8 +311,8 @@ class LangfuseObserver: parameterization. - ``implementation_version``: string surfaced as ``trace.metadata.implementation_version`` on every Trace. - Defaults to ``openarmature.__version__``. Always-emit invariant - inherited from §5.1 — not gated by ``disable_state_payload``, + Defaults to ``openarmature.__version__``. Always emitted — + not gated by ``disable_state_payload``, ``disable_provider_payload``, or any other privacy knob. The observer reads the spec version from the package at @@ -319,6 +321,11 @@ class LangfuseObserver: state isolation keys all internal maps by invocation_id. """ + # Spec observability §8 (Langfuse backend mapping). Knob spec + # basis: §8.9 privacy posture; §8.4.1 Trace input/output sourcing + # (proposal 0043); §8.5 detached traces; §5.1 always-emit + # attribution invariant. + client: LangfuseClient disable_llm_spans: bool = False disable_provider_payload: bool = True @@ -1451,9 +1458,9 @@ def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: def _handle_typed_llm_failed(self, event: LlmFailedEvent) -> None: """Open + close an ERROR-level Generation observation from the - typed LlmFailedEvent (failure path, proposal 0058). Same shape - as the success path with ERROR level + error_category as the - Generation observation's statusMessage.""" + typed LlmFailedEvent (failure path). Same shape as the success + path with ERROR level + error_category as the Generation + observation's statusMessage.""" from openarmature.observability.correlation import ( current_correlation_id, current_invocation_id, @@ -1600,8 +1607,9 @@ def _typed_event_metadata( return metadata def _usage_from_typed_event(self, event: LlmCompletionEvent) -> LangfuseUsage | None: - """Map the typed event's Usage onto the Langfuse Usage record - per §8.4.3. Returns None when no usage was reported.""" + """Map the typed event's Usage onto the Langfuse Usage record. + Returns None when no usage was reported.""" + # Spec observability §8.4.3 (Langfuse usage mapping). usage = event.usage if usage is None: return None @@ -1614,8 +1622,9 @@ def _usage_from_typed_event(self, event: LlmCompletionEvent) -> LangfuseUsage | ) def _resolve_prompt_link_from_typed_event(self, event: LlmCompletionEvent | LlmFailedEvent) -> Any: - """§8.4.4 case discrimination on the typed event's active_prompt + """Case discrimination on the typed event's active_prompt snapshot.""" + # Spec observability §8.4.4. active_prompt = event.active_prompt if active_prompt is None: return None diff --git a/src/openarmature/observability/metadata.py b/src/openarmature/observability/metadata.py index e2c381b..bb856ea 100644 --- a/src/openarmature/observability/metadata.py +++ b/src/openarmature/observability/metadata.py @@ -7,7 +7,7 @@ # at the ``invoke()`` boundary and at mid-invocation augmentation # via ``set_invocation_metadata``. -"""Caller-supplied invocation metadata (proposal 0034). +"""Caller-supplied invocation metadata. Two surfaces: @@ -27,11 +27,11 @@ - Keys MUST be strings. - Keys MUST NOT start with ``openarmature.`` or ``gen_ai.`` (reserved - for spec-normative attribute namespaces; collisions would silently + attribute namespaces; collisions would silently overwrite OA-emitted state at the observer layer). - Keys MUST NOT exactly match a reserved OA-emitted top-level metadata - key name (the §8.4 Langfuse set plus ``invocation_id``; proposal - 0041) for the same collision reason. + key name (the Langfuse set plus ``invocation_id``) for the same + collision reason. - Values MUST be OTel-attribute-compatible scalars: ``str``, ``int``, ``float``, ``bool``, or a homogeneous list/tuple of those types. ``None``, nested objects, and mixed-type arrays are rejected. @@ -123,11 +123,10 @@ def current_invocation_metadata() -> MappingProxyType[str, AttributeValue]: callers MUST NOT mutate it. Use :func:`set_invocation_metadata` to add entries. - Aliased as :func:`get_invocation_metadata` per spec §3.4 (proposal - 0048, v0.40.0); the alias is the canonical spec-idiomatic name - paralleling :func:`set_invocation_metadata`. Both names point at - the same function — pick whichever reads naturally at the call - site. + Aliased as :func:`get_invocation_metadata`; the alias is the + canonical idiomatic name paralleling :func:`set_invocation_metadata`. + Both names point at the same function — pick whichever reads + naturally at the call site. """ return _invocation_metadata_var.get() @@ -146,10 +145,10 @@ def set_invocation_metadata(**entries: AttributeValue) -> None: metadata. Additive: existing keys with the same names are overwritten; other keys are preserved. - Per spec §3.4: affects spans / observations emitted AFTER the - call returns. Open observations whose lineage covers the calling - context ARE updated in place per proposal 0040 — implementations - enqueue a :class:`~openarmature.graph.events.MetadataAugmentationEvent` + Affects spans / observations emitted AFTER the call returns. Open + observations whose lineage covers the calling context ARE updated + in place: implementations enqueue a + :class:`~openarmature.graph.events.MetadataAugmentationEvent` on the engine's serial observer-delivery queue carrying the delta + the calling context's lineage tuple (namespace, attempt_index, fan_out_index, branch_name); observers correlate @@ -169,10 +168,11 @@ def set_invocation_metadata(**entries: AttributeValue) -> None: symmetry; users typically call this from inside a node body, middleware, or observer where an invocation is already in flight. - Symmetric with :func:`get_invocation_metadata` (proposal 0048, - spec §3.4 v0.40.0) which returns an immutable snapshot of the - current async context's view. + Symmetric with :func:`get_invocation_metadata`, which returns an + immutable snapshot of the current async context's view. """ + # Spec observability §3.4: additive merge, affecting only spans / + # observations emitted after this call returns. if not entries: return for key, value in entries.items(): @@ -226,13 +226,14 @@ def validate_invocation_metadata(mapping: object) -> MappingProxyType[str, Attri read-only view the engine stashes on the ContextVar. Public so the engine (`CompiledGraph.invoke`) calls this at the - boundary BEFORE any work begins; per spec §3.4 the rejection - surfaces as a synchronous error to the caller of ``invoke()`` - rather than as a backend-emission failure. + boundary BEFORE any work begins; the rejection surfaces as a + synchronous error to the caller of ``invoke()`` rather than as a + backend-emission failure. Returns the validated read-only mapping. Raises :class:`ValueError` on any rule violation (with a message naming the offending key). """ + # Spec observability §3.4: boundary validation, synchronous rejection. if mapping is None: return _EMPTY_METADATA if not isinstance(mapping, dict): diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 39c3832..35a2929 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -166,8 +166,7 @@ def _read_implementation_version() -> str: def _apply_caller_metadata(attrs: dict[str, Any], metadata: Mapping[str, Any]) -> None: """Merge caller-supplied invocation metadata into a span's - attribute dict as ``openarmature.user.`` entries per - observability §5.6. + attribute dict as ``openarmature.user.`` entries. Called at every span-emission site so the metadata family is cross-cutting (invocation span, every node span, subgraph @@ -187,12 +186,12 @@ def _subgraph_identity_at(event: NodeEvent, depth: int) -> str: given 1-based namespace depth, or the empty string when no identity is tracked at that depth. - Per observability §5.3 + the coord-thread - ``clarify-subgraph-name-semantics`` resolution: empty-string - fallback matches the spec's "if the implementation tracks one" - clause for callers using ``SubgraphNode(name=..., compiled=...)`` - without supplying ``subgraph_identity``. + The empty-string fallback is the "no identity tracked" case, for + callers using ``SubgraphNode(name=..., compiled=...)`` without + supplying ``subgraph_identity``. """ + # Spec observability §5.3 (coord thread + # clarify-subgraph-name-semantics). idx = depth - 1 if 0 <= idx < len(event.subgraph_identities): identity = event.subgraph_identities[idx] @@ -214,10 +213,10 @@ class _OpenSpan: single event handler's scope, so no token needs to live across events. - Per proposal 0045: carries the span's own ``fan_out_index_chain`` - and ``branch_name_chain`` so the augmentation walk can apply - §3.4's lineage-aware boundary rule without re-deriving the chain - from successive events.""" + Carries the span's own ``fan_out_index_chain`` and + ``branch_name_chain`` so the augmentation walk can apply the + lineage-aware boundary rule without re-deriving the chain from + successive events.""" span: Span fan_out_index_chain: tuple[int | None, ...] = () @@ -231,7 +230,7 @@ def _span_chain_on_path( ) -> bool: """Return True iff ``open_span``'s chain is a prefix-match of the augmenter's chain — i.e., the span sits on the augmenter's - call-stack ancestor path. Per proposal 0045 §3.4: + call-stack ancestor path: - A span shorter than the augmenter (chain prefix-matches) is an ancestor on the path. @@ -240,6 +239,7 @@ def _span_chain_on_path( - A span deeper than the augmenter, OR with a position-mismatch anywhere, is a sibling and MUST NOT be updated. """ + # Spec observability §3.4 (proposal 0045): lineage-aware boundary. span_fi = open_span.fan_out_index_chain span_bn = open_span.branch_name_chain if len(span_fi) > len(aug_fi_chain): @@ -385,7 +385,7 @@ class _InvState: @dataclass class OTelObserver: - """Observer-driven OTel span lifecycle per spec observability §6. + """Observer-driven OTel span lifecycle. Construct with a :class:`SpanProcessor` (typically a :class:`BatchSpanProcessor` wrapping a real exporter, or a @@ -443,6 +443,7 @@ class OTelObserver: event handler's scope. """ + # Spec observability §6 (observer-driven span lifecycle). # span_processor accepts a single processor or a sequence per # observability friction-roundup #5. The dataclass field type is # the union; ``__post_init__`` normalizes to a tuple internally. @@ -764,7 +765,7 @@ def _open_started_span(self, event: NodeEvent) -> None: ) def _handle_completed(self, event: NodeEvent) -> None: - """Close the matching span, applying §4.2 status mapping.""" + """Close the matching span, applying the status mapping.""" from openarmature.observability.correlation import current_invocation_id invocation_id = current_invocation_id() @@ -942,8 +943,7 @@ def _collect_augmentation_targets( self, invocation_id: str, event: MetadataAugmentationEvent ) -> list[Span]: """Collect open spans on the augmenter's call-stack ancestor - chain per proposal 0045 §3.4. Three-step boundary decision - tree per open span: + chain. Three-step boundary decision tree per open span: 1. Same context as augmenter (or descendant sharing the mutated mapping) — update. @@ -1060,17 +1060,17 @@ def _collect_augmentation_targets( # ------------------------------------------------------------------ def _emit_checkpoint_migrate_span(self, event: NodeEvent) -> None: - """Spec pipeline-utilities §6 cross-ref (proposal 0014): emit a - zero-duration ``openarmature.checkpoint.migrate`` span when - a versioned resume's migration chain runs. The synthetic - event carries ``_MigrationSummary`` on ``pre_state``; this - handler reads ``from_version`` / ``to_version`` / + """Emit a zero-duration ``openarmature.checkpoint.migrate`` + span when a versioned resume's migration chain runs. The + synthetic event carries ``_MigrationSummary`` on ``pre_state``; + this handler reads ``from_version`` / ``to_version`` / ``chain_length`` from the summary onto the span. Emitted under the invocation's root span (no parent-node context — the migration runs before any node fires), so trace UIs surface it as the first child of the invocation. """ + # Spec pipeline-utilities §6 cross-ref (proposal 0014). from openarmature.graph.compiled import _MigrationSummary from openarmature.observability.correlation import ( current_correlation_id, @@ -1117,10 +1117,10 @@ def _emit_checkpoint_migrate_span(self, event: NodeEvent) -> None: span.end() def _emit_checkpoint_save_span(self, event: NodeEvent) -> None: - """Spec pipeline-utilities §10.8 + observability §4.5: emit a - zero-duration ``openarmature.checkpoint.save`` span attached - to the most-recently-opened node span (the node whose + """Emit a zero-duration ``openarmature.checkpoint.save`` span + attached to the most-recently-opened node span (the node whose completed event triggered the save).""" + # Spec pipeline-utilities §10.8 + observability §4.5. from openarmature.observability.correlation import ( current_correlation_id, current_invocation_id, @@ -1313,8 +1313,8 @@ def _handle_typed_llm_completion(self, event: LlmCompletionEvent) -> None: def _handle_typed_llm_failed(self, event: LlmFailedEvent) -> None: """Open + close the ``openarmature.llm.complete`` span from the - typed LlmFailedEvent (failure path, proposal 0058). Same span - shape as the success path with ERROR status + + typed LlmFailedEvent (failure path). Same span shape as the + success path with ERROR status + ``openarmature.error.category`` attribute attached.""" from openarmature.observability.correlation import ( current_correlation_id, @@ -1605,16 +1605,17 @@ def _sync_subgraph_spans( correlation_id: str | None, event: NodeEvent, ) -> None: - """Open any synthetic subgraph dispatch spans we need (per - observability §4.5: subgraph wrapper MUST emit a span); close - any subgraph spans whose prefix is no longer an ancestor of - the current event's namespace. + """Open any synthetic subgraph dispatch spans we need (the + subgraph wrapper MUST emit a span); close any subgraph spans + whose prefix is no longer an ancestor of the current event's + namespace. Called from ``_open_started_span`` BEFORE opening the leaf node span. Detached-mode entries (subgraph or fan-out instance) are registered as detached roots so their inner spans live in a fresh trace. """ + # Spec observability §4.5: the subgraph wrapper emits a span. namespace = event.namespace # 1. Close any open subgraph spans that aren't ancestors of # the current namespace — we've left those subgraphs. @@ -2015,10 +2016,9 @@ def _open_fan_out_instance_dispatch_span( prefix: tuple[str, ...], event: NodeEvent, ) -> None: - """Per-instance dispatch span for a non-detached fan-out - (per spec §5.4 + proposal 0013, v0.10.0). Mirror of - ``_open_detached_fan_out_instance_root`` but lives in the - parent trace (no fresh trace_id). + """Per-instance dispatch span for a non-detached fan-out. + Mirror of ``_open_detached_fan_out_instance_root`` but lives in + the parent trace (no fresh trace_id). Parents under the fan-out node span at ``prefix``. Span name is the fan-out node's name; attributes are @@ -2084,9 +2084,8 @@ def _open_parallel_branches_branch_dispatch_span( prefix: tuple[str, ...], event: NodeEvent, ) -> None: - """Per-branch dispatch span for a parallel-branches NODE (per - observability §5.7 + proposal 0044, v0.36.0). Mirror of - ``_open_fan_out_instance_dispatch_span``. + """Per-branch dispatch span for a parallel-branches NODE. + Mirror of ``_open_fan_out_instance_dispatch_span``. Parents under the parallel-branches node span at ``prefix``. Span name is the branch's identifier (``event.branch_name``). @@ -2209,7 +2208,7 @@ def _find_fan_out_node_span(self, inv_state: _InvState, prefix: tuple[str, ...]) return None def _node_attrs(self, event: NodeEvent, correlation_id: str | None) -> dict[str, Any]: - """Build the §5 attribute set for a node span.""" + """Build the attribute set for a node span.""" attrs: dict[str, Any] = { "openarmature.node.name": event.node_name, "openarmature.node.namespace": list(event.namespace), @@ -2283,7 +2282,7 @@ def close_invocation(self, invocation_id: str) -> None: invocation in one call without needing to track ids externally. A first-class engine-level signal that lets observers auto-drain per-invocation state on completion is - tracked as Phase 6.1+ follow-up work in + tracked as follow-up work in ``openarmature-coord/docs/phase-6-1-conformance-fillin.md``. """ inv_state = self._inv_states.pop(invocation_id, None) diff --git a/src/openarmature/prompts/backends/filesystem.py b/src/openarmature/prompts/backends/filesystem.py index 0e0ef48..8277056 100644 --- a/src/openarmature/prompts/backends/filesystem.py +++ b/src/openarmature/prompts/backends/filesystem.py @@ -21,9 +21,9 @@ class FilesystemPromptBackend: - ``layout="per-label"`` (default): ``/