Merged
51 changes: 26 additions & 25 deletions src/openarmature/checkpoint/backends/memory.py
@@ -19,24 +19,26 @@

@dataclass(frozen=True)
class FanOutInternalSaveBatching:
"""Per-Checkpointer-instance configuration for §10.11.4 fan-out
internal save batching.
"""Per-Checkpointer-instance configuration for fan-out internal
save batching.

Applies ONLY to fan-out instance internal saves. Outermost-graph,
subgraph-internal, and fan-out node completion saves remain
synchronous per §10.3.
synchronous.

- ``flush_every``: flush the buffer every N buffered saves. ``0``
/ negative means batching is disabled (every save flushes
immediately). The buffered save count resets at each flush.

Buffered-but-unflushed saves are LOST on crash per §10.11.4;
on resume, instances whose completed state was buffered-only
revert to ``in_flight`` / ``not_started`` and re-run. The §10.11.1
reducer correctness holds because their contributions hadn't
durably committed.
Buffered-but-unflushed saves are LOST on crash; on resume,
instances whose completed state was buffered-only revert to
``in_flight`` / ``not_started`` and re-run. Reducer correctness
holds because their contributions hadn't durably committed.
"""

# Spec pipeline-utilities §10.11.4 (fan-out internal save batching);
# §10.3 synchronous saves; §10.11.1 reducer correctness.

flush_every: int = 0


@@ -53,23 +55,22 @@ class InMemoryCheckpointer:
from :meth:`load`; no serialization round-trip. (This is the
feature: tests can assert on the saved state's identity.)

**State-migration eligibility:** none. Per spec §10.12.1, a
backend supports migration only when it can expose a structural
intermediate form of the loaded state independent of the current
**State-migration eligibility:** none. A backend supports
migration only when it can expose a structural intermediate form
of the loaded state independent of the current
state class. This backend holds live typed instances by
reference, so a version mismatch on resume raises
``CheckpointRecordInvalid`` rather than consulting the
migration registry.

**Fan-out internal save batching** (per spec §10.11.4): optional
via the ``fan_out_internal_save_batching`` constructor parameter.
**Fan-out internal save batching**: optional via the
``fan_out_internal_save_batching`` constructor parameter.
Default is no batching (every save flushes immediately). When
enabled, fan-out instance internal saves buffer in memory and
flush every ``flush_every`` saves. Outermost-graph,
subgraph-internal, and fan-out node completion saves bypass the
buffer entirely (they remain synchronous). On crash, buffered
saves are lost — by design, per §10.11.4's documented cost
trade-off.
saves are lost — by design, a documented cost trade-off.
"""

# Per spec §10.12.1: in-memory storage holds live typed-state
@@ -104,8 +105,8 @@ async def save(self, invocation_id: str, record: CheckpointRecord) -> None:
previous record for the same id. Not durable across process
restarts.

Per §10.11.4: outermost-graph, subgraph-internal, and
fan-out node completion saves are synchronous regardless of
Outermost-graph, subgraph-internal, and fan-out node
completion saves are synchronous regardless of
the batching configuration. The engine routes fan-out
instance internal saves through :meth:`save_fan_out_internal`
instead; this method bypasses the buffer.
@@ -121,8 +122,8 @@ async def save(self, invocation_id: str, record: CheckpointRecord) -> None:
self._records[invocation_id] = record

async def save_fan_out_internal(self, invocation_id: str, record: CheckpointRecord) -> None:
"""Buffer a fan-out instance internal save under the §10.11.4
batching policy. When batching is disabled (default), behaves
"""Buffer a fan-out instance internal save under the batching
policy. When batching is disabled (default), behaves
identically to :meth:`save` — every save is synchronously
durable. When ``flush_every`` is positive, the save is
buffered; the buffer flushes when the count reaches the
@@ -142,10 +143,10 @@ async def save_fan_out_in_flight_failure(
invocation_id: str,
record: CheckpointRecord,
) -> None:
"""Buffer an "instance failed mid-execution" save under §10.11.4
batching. The failure save records the in_flight state of an
instance whose terminal inner node raised; this save closes the
in_flight observability gap (per §10.11) for instances whose
"""Buffer an "instance failed mid-execution" save under the
batching policy. The failure save records the in_flight state
of an instance whose terminal inner node raised; this save
closes the in_flight observability gap for instances whose
subgraphs have no sibling-completed save to piggyback on.

Under batching, this save buffers BUT does NOT count toward
@@ -184,8 +185,8 @@ def _flush_invocation_buffer_locked(self, invocation_id: str) -> None:

async def load(self, invocation_id: str) -> CheckpointRecord | None:
"""Return the saved record for ``invocation_id`` or ``None``
if nothing has been saved under that id. Per §10.11.4:
buffered-but-unflushed fan-out internal saves are NOT visible
if nothing has been saved under that id. Buffered-but-unflushed
fan-out internal saves are NOT visible
to ``load`` — that's the crash-loses-buffered contract. To
simulate a crash before the buffer flushes, drop the
Checkpointer reference; the buffer is in-memory only.
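
The batching contract described in the memory.py docstrings above can be sketched as a toy model. This is a minimal sketch, not the backend's actual implementation: `BatchingSketch` and its `records` / `buffer` attributes are hypothetical names, and it assumes a flush publishes the most recently buffered record for the invocation.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class BatchingSketch:
    """Hypothetical toy model of fan-out internal save batching."""

    flush_every: int = 0
    # Records visible to load().
    records: dict = field(default_factory=dict)
    # Buffered saves; in-memory only, so they vanish with the object.
    buffer: dict = field(default_factory=dict)

    def save(self, invocation_id: str, record: Any) -> None:
        # Synchronous path: bypasses the buffer entirely.
        self.records[invocation_id] = record

    def save_fan_out_internal(self, invocation_id: str, record: Any) -> None:
        if self.flush_every <= 0:
            # Zero or negative disables batching: behave like save().
            self.save(invocation_id, record)
            return
        pending = self.buffer.setdefault(invocation_id, [])
        pending.append(record)
        if len(pending) >= self.flush_every:
            # Assumption: the latest buffered record becomes visible.
            self.records[invocation_id] = pending[-1]
            pending.clear()  # the buffered count resets at each flush

    def load(self, invocation_id: str) -> Any:
        # Buffered-but-unflushed saves are not visible here.
        return self.records.get(invocation_id)


cp = BatchingSketch(flush_every=2)
cp.save_fan_out_internal("inv", "r1")
first = cp.load("inv")   # still buffered, nothing visible yet
cp.save_fan_out_internal("inv", "r2")
second = cp.load("inv")  # second save reached flush_every and flushed
```

Dropping `cp` after the first call would lose `"r1"`, which mirrors the crash-loses-buffered behaviour the `load` docstring describes.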
4 changes: 2 additions & 2 deletions src/openarmature/checkpoint/backends/sqlite.py
@@ -281,8 +281,8 @@ def _decode(self, blob: bytes, recorded_mode: str, invocation_id: str) -> Any:

async def save(self, invocation_id: str, record: CheckpointRecord) -> None:
"""Upsert ``record`` under ``invocation_id``. The state,
completed positions, parent-state stack, and (per proposal 0009)
per-fan-out-node progress are serialized via the configured
completed positions, parent-state stack, and per-fan-out-node
progress are serialized via the configured
:class:`SerializationMode` and written in a single statement.
Writes are durable on return (WAL mode, per-write fsync at the
SQLite layer)."""
31 changes: 14 additions & 17 deletions src/openarmature/checkpoint/errors.py
@@ -60,14 +60,13 @@ class CheckpointRecordInvalid(CheckpointError):
"""Raised when ``Checkpointer.load(X)`` returns a record whose
schema is incompatible with the current graph: state shape
mismatch, missing required fields, OR a post-migration state
that fails to deserialize against the current state class (per
spec §10.12.4). Non-transient.
that fails to deserialize against the current state class.
Non-transient.

Note: raw ``schema_version`` mismatches no longer route here.
They now flow through ``CheckpointStateMigrationMissing`` (no
chain registered) or ``CheckpointStateMigrationFailed`` (chain
application raised) per spec §10.10's three-way category
distinction.
application raised) — a three-way category distinction.
"""

category = "checkpoint_record_invalid"
@@ -80,8 +79,8 @@ def __init__(self, invocation_id: str, message: str) -> None:
class CheckpointStateMigrationMissing(CheckpointError):
"""Raised on resume when the saved record's ``schema_version``
does not match the current state class's ``schema_version`` AND
no chain of registered migrations bridges the two. Non-transient
per spec §10.10; the user MUST register a migration (or pin
no chain of registered migrations bridges the two. Non-transient;
the user MUST register a migration (or pin
their state to the saved version) for the resume to succeed.

Carries the saved-from / current-to versions and a description
@@ -112,18 +111,16 @@ def __init__(


class CheckpointStateMigrationChainAmbiguous(CheckpointError):
"""Raised when the registered migration graph is ambiguous per
spec §10.10 / §10.12 (proposal 0018, spec v0.16.0):
"""Raised when the registered migration graph is ambiguous:

- Duplicate-pair case (§10.12.1): two migrations register with the
same ``(from_version, to_version)`` pair. Raised at registration
- Duplicate-pair case: two migrations register with the same
``(from_version, to_version)`` pair. Raised at registration
time so the user sees the ambiguity before any resume attempt.
- Multi-shortest-path case (§10.12.2): the registered migration
graph has multiple distinct shortest paths between the saved
and current versions (e.g., a diamond ``v1→v2→v4`` + ``v1→v3→v4``).
Spec accepts either compile-time detection (recommended) or
load-time detection (this impl runs the check inside BFS at
resume time).
- Multi-shortest-path case: the registered migration graph has
multiple distinct shortest paths between the saved and current
versions (e.g., a diamond ``v1→v2→v4`` + ``v1→v3→v4``). Either
compile-time detection (recommended) or load-time detection is
acceptable (this impl runs the check inside BFS at resume time).

Non-transient: retrying without changing the migration graph
will not succeed. Carries ``from_version`` / ``to_version`` when
@@ -149,7 +146,7 @@ def __init__(

class CheckpointStateMigrationFailed(CheckpointError):
"""Raised on resume when a registered migration function raises
during chain application (per spec §10.12.2). The migration's
during chain application. The migration's
exception is preserved as ``__cause__``. Non-transient by
default: a buggy migration is deterministic, so retrying
without changing the migration code will not succeed.
22 changes: 10 additions & 12 deletions src/openarmature/checkpoint/migration.py
@@ -1,12 +1,12 @@
"""State migration types and registry.

Realizes pipeline-utilities §10.12 (proposal 0014). A
``StateMigration`` describes one edge in the migration graph;
A ``StateMigration`` describes one edge in the migration graph;
``MigrationRegistry`` holds the ordered set and resolves chains
via BFS. Ambiguity (duplicate ``(from, to)`` pairs OR multiple
distinct shortest paths between the same source/sink) is a
configuration-style error per §10.12.1 / §10.12.2.
configuration-style error.
"""
# Realizes pipeline-utilities §10.12 (proposal 0014).

from __future__ import annotations

@@ -29,9 +29,9 @@ class StateMigration:
chain (or for final deserialization into the current state class).

Migrations MUST be pure: deterministic, no I/O, no implicit
state. The framework does not police purity per spec §10.12.2
("the contract is documented, not policed"); violating it
risks non-deterministic resume.
state. The framework does not police purity (the contract is
documented, not policed); violating it risks non-deterministic
resume.
"""

from_version: str
@@ -46,15 +46,14 @@ class MigrationRegistry:

- Two migrations with the same ``from_version`` AND
``to_version`` raise ``CheckpointStateMigrationChainAmbiguous``
directly per spec §10.10 (proposal 0018) so the canonical
category surfaces at the registration boundary without any
wrapping by the builder.
directly so the canonical category surfaces at the registration
boundary without any wrapping by the builder.
- Two migrations with the same ``from_version`` and different
``to_version`` are permitted (branched migration graph;
chain resolution picks a path or raises ambiguity if multiple
shortest paths exist).

Resolution-time semantics (per §10.12.2):
Resolution-time semantics:

- BFS from ``record.schema_version`` to
``current.schema_version``. BFS naturally finds the shortest
@@ -133,8 +132,7 @@ def resolve_chain(

Raises ``CheckpointStateMigrationChainAmbiguous`` if
multiple distinct shortest paths exist between
``from_version`` and ``to_version`` (ambiguous chain per
spec §10.10 / §10.12.2; proposal 0018 / spec v0.16.0).
``from_version`` and ``to_version`` (an ambiguous chain).
Same canonical category as the duplicate-pair detection
in ``register``; one type for chain ambiguity regardless
of when it surfaces.
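
The chain-resolution rule in the migration.py docstrings (BFS for the shortest chain, with an error when several distinct shortest paths exist) can be illustrated with a standalone sketch. It is not the registry's code: `resolve_chain_sketch`, `ChainAmbiguous`, and `ChainMissing` are hypothetical stand-ins, and migrations are reduced to bare `(from_version, to_version)` pairs.

```python
from collections import deque


class ChainAmbiguous(Exception):
    """Stand-in for CheckpointStateMigrationChainAmbiguous."""


class ChainMissing(Exception):
    """Stand-in for CheckpointStateMigrationMissing."""


def resolve_chain_sketch(edges, from_version, to_version):
    """Return the unique shortest chain of (from, to) edges, or raise."""
    adjacency = {}
    for src, dst in edges:
        adjacency.setdefault(src, []).append(dst)

    dist = {from_version: 0}   # BFS depth of each reached version
    paths = {from_version: 1}  # number of distinct shortest paths to it
    parent = {}                # one predecessor, used to rebuild the chain
    queue = deque([from_version])
    while queue:
        node = queue.popleft()
        for nxt in adjacency.get(node, []):
            if nxt not in dist:
                dist[nxt] = dist[node] + 1
                paths[nxt] = paths[node]
                parent[nxt] = node
                queue.append(nxt)
            elif dist[nxt] == dist[node] + 1:
                # Another equally short route reaches nxt.
                paths[nxt] += paths[node]

    if to_version not in dist:
        raise ChainMissing(f"no chain from {from_version} to {to_version}")
    if paths[to_version] > 1:
        raise ChainAmbiguous(f"multiple shortest chains to {to_version}")

    chain = []
    node = to_version
    while node != from_version:
        chain.append((parent[node], node))
        node = parent[node]
    chain.reverse()
    return chain


linear = resolve_chain_sketch([("v1", "v2"), ("v2", "v3")], "v1", "v3")
diamond = [("v1", "v2"), ("v2", "v4"), ("v1", "v3"), ("v3", "v4")]
```

Calling `resolve_chain_sketch(diamond, "v1", "v4")` raises `ChainAmbiguous`, matching the diamond case in the docstring. A longer alternative route does not count as ambiguity because only equally short paths are tallied.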
29 changes: 15 additions & 14 deletions src/openarmature/checkpoint/protocol.py
@@ -23,12 +23,13 @@
produce records in the shipping version (atomic-restart contract).

``CheckpointRecord.schema_version`` carries the user-facing
state-schema identifier per spec §10.2 (proposal 0014 repurposes
the field from the original backend-internal record-shape role).
The framework reads ``type(state).schema_version`` at save time;
on load, version mismatches route through the migration registry
(per §10.12) rather than a strict equality check.
state-schema identifier. The framework reads
``type(state).schema_version`` at save time; on load, version
mismatches route through the migration registry rather than a strict
equality check.
"""
# Spec pipeline-utilities §10.2 (proposal 0014): schema_version is the
# state-schema identifier; §10.12 migration registry on mismatch.

from __future__ import annotations

@@ -98,23 +99,23 @@ class FanOutInstanceProgress:
correctness contract: an instance marked ``completed`` MUST have
its contribution recorded into the accumulator AND that
contribution MUST be reflected in ``result``. Reducer composition
rules (§10.11.1) depend on this exactly-once guarantee.
rules depend on this exactly-once guarantee.
- ``result``: for ``completed`` instances, the durable contribution
to the fan-out accumulator (a success value for the
``target_field`` bucket, or under ``collect`` error policy an
error entry for the ``errors_field`` bucket). Typed per the
parent state schema's ``target_field`` / ``errors_field``
(representation is implementation-defined per §10.11; Python
stores as ``Any`` since dynamic typing absorbs the variance).
(representation is implementation-defined; Python stores as
``Any`` since dynamic typing absorbs the variance).
Unused for ``in_flight`` and ``not_started``.
- ``result_is_error``: boolean discriminator for ``completed``
entries — ``True`` when the contribution is a ``collect``-mode
error entry that rolls forward into ``errors_field``, ``False``
when the contribution is a success value that rolls forward
into ``target_field``. MUST be ``False`` for ``in_flight`` and
``not_started`` (the value of ``result`` is ignored for those).
Per proposal 0027 (spec v0.21.0): implementations MUST consult
this field on resume rather than inferring routing from
Implementations MUST consult this field on resume rather than
inferring routing from
``result``'s shape — heuristic inspection would misclassify
user state values that happen to match the engine's
error-record shape.
@@ -148,7 +149,7 @@ class FanOutProgress:
the fan-out (empty for outermost-graph fan-outs). Disambiguates
fan-outs of the same name in different nested-subgraph contexts.
- ``instance_count``: the resolved instance count for this fan-out
(per pipeline-utilities §9 count or items_field mode).
(count or items_field mode).
- ``instances``: a tuple of per-instance entries indexed by
``fan_out_index`` (``instances[i]`` is the entry for
``fan_out_index=i``). Length equals ``instance_count``.
@@ -167,7 +168,7 @@ class CheckpointRecord:

Frozen: backends MUST treat the record as immutable. The engine
builds a fresh record per ``completed`` event rather than mutating
a shared one. The ``fan_out_progress`` field (per §10.11) carries
a shared one. The ``fan_out_progress`` field carries
per-fan-out-node entries when one or more fan-outs are in flight
at save time; an empty tuple means no fan-out progress to record.
"""
@@ -234,8 +235,8 @@ class Checkpointer(Protocol):
plain dict, JSON tree, or similar) that is independent of the
current state class. JSON-encoded backends naturally satisfy
this; backends that store live typed state instances or use
class-bound serialization (pickle) cannot. Per spec §10.12.1,
backends that cannot expose the intermediate MUST raise
class-bound serialization (pickle) cannot. Backends that cannot
expose the intermediate MUST raise
``CheckpointRecordInvalid`` on version mismatch even when
migrations are registered; the registry has no chance to bridge.
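
The `result_is_error` rule in the `FanOutInstanceProgress` docstring (route on the discriminator, never on the shape of `result`) can be sketched as follows. This is an illustration under assumptions: `InstanceEntry`, its `state` field name, and `roll_forward` are hypothetical and not taken from the PR.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class InstanceEntry:
    """Hypothetical stand-in for one per-instance progress entry."""

    state: str                    # "completed", "in_flight", or "not_started"
    result: Any = None            # ignored unless state == "completed"
    result_is_error: bool = False


def roll_forward(instances):
    """Split completed contributions into (target values, error entries)."""
    targets, errors = [], []
    for entry in instances:
        if entry.state != "completed":
            continue  # in_flight / not_started instances re-run on resume
        # Route on the explicit flag, not on what result looks like.
        if entry.result_is_error:
            errors.append(entry.result)
        else:
            targets.append(entry.result)
    return targets, errors


instances = (
    # A successful user value that merely resembles an error record.
    InstanceEntry("completed", {"error": "user data"}, result_is_error=False),
    InstanceEntry("completed", {"error": "boom"}, result_is_error=True),
    InstanceEntry("in_flight"),
)
targets, errors = roll_forward(instances)
```

The first entry lands in `targets` even though its value looks like an error record, which is the misclassification that shape-based inference would cause.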

15 changes: 7 additions & 8 deletions src/openarmature/graph/builder.py
@@ -306,11 +306,11 @@ def add_parallel_branches_node(
errors_field: str | None = None,
middleware: Iterable[Middleware] | None = None,
) -> Self:
"""Register a parallel-branches node per pipeline-utilities §11.
"""Register a parallel-branches node.

``branches`` is a mapping from non-empty branch name to a
:class:`BranchSpec`. Insertion order is preserved and is
the dispatch + merge order per §11.8.
the dispatch + merge order.

Validates at registration:

@@ -397,7 +397,7 @@ def with_state_migration(
to_version: str,
migrate: Callable[[Any], Any],
) -> Self:
"""Register one state migration per pipeline-utilities §10.12.
"""Register one state migration.

On resume, when the saved record's ``schema_version`` does not
match the current state class's ``schema_version``, the engine
@@ -406,15 +406,14 @@ def with_state_migration(
``parent_states``) before deserialization.

Migrations MUST be pure: deterministic, no I/O, no implicit
state. The framework does not police purity (per §10.12.2),
but violating it risks non-deterministic resume.
state. The framework does not police purity, but violating it
risks non-deterministic resume.

Raises ``CheckpointStateMigrationChainAmbiguous`` at
registration if the ``(from_version, to_version)`` pair is
already registered (per spec §10.10 / §10.12.1; proposal
0018 / spec v0.16.0). Also raises ``ValueError`` if
already registered. Also raises ``ValueError`` if
``to_version`` is the empty-string sentinel (the un-declared
marker per §10.2 is not a valid chain target).
marker is not a valid chain target).
"""
self._migration_registry.register(
StateMigration(