Commit 9069d36

feat(observability): openarmature.checkpoint.migrate OTel span
Lands the §6 cross-ref piece of proposal 0014 that the previous commit deferred. Versioned resumes whose migration chain runs now emit a zero-duration `openarmature.checkpoint.migrate` span on the OTel observer, parented under the invocation root.

- New _MigrationSummary frozen dataclass in graph/compiled.py carries the chain metadata (from_version, to_version, chain_length) out of _migrate_record back to invoke().
- _migrate_record return type becomes tuple[CheckpointRecord, _MigrationSummary]; invoke() captures the summary across the migration → context-creation gap.
- After context + delivery-worker setup but before the main execution loop, invoke() dispatches a synthetic NodeEvent with phase='checkpoint_migrated' carrying the summary on pre_state. Mirrors the checkpoint_saved pattern (state-on-pre, post=None) so the OTel observer's existing routing shape picks it up cleanly.
- New phase 'checkpoint_migrated' added to NodeEvent's Literal union and to KNOWN_PHASES (rejects typos in observer.phases subscriptions). NodeEvent.pre_state typed as Any so the permissive _MigrationSummary payload fits; the OTel observer narrows defensively via isinstance.
- OTelObserver._emit_checkpoint_migrate_span emits the span with the three normative attributes. Parented under the invocation root span (opened lazily if not yet present — versioned resumes are the first event on a new invocation_id, before any node-span emission). Mirrors _emit_checkpoint_save_span.
- Two OTel unit tests: one positive (versioned resume emits the span with the right attrs) and one negative (version-match fast path emits no span, per spec §10.12.3).

CHANGELOG entry promoted from 'deferred to follow-on' to the landed shape. Phase gated off default subscriptions so legacy observers don't surface a new event without opt-in.
1 parent 316bc52 commit 9069d36

6 files changed

Lines changed: 258 additions & 24 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

- **State migration for checkpointed graphs (proposal 0014, introduced in spec v0.15.0; refined by proposal 0018 in spec v0.16.0).** Saved checkpoints whose `schema_version` doesn't match the current state class now route through a registered migration chain instead of failing on resume. Surface: `State.schema_version: ClassVar[str] = ""` (declare a non-empty value to opt in), `GraphBuilder.with_state_migration(from_version, to_version, migrate)` and `with_state_migrations(*migrations)` for registration, `StateMigration` and `MigrationRegistry` types exported from `openarmature.checkpoint`. Chain resolution is BFS over the registered edges; the shortest path wins. Three new error categories: `CheckpointStateMigrationChainAmbiguous` (proposal 0018: duplicate `(from, to)` pair at registration time, or multiple distinct shortest paths between the saved and current versions at resume time), `CheckpointStateMigrationMissing` (no chain bridges the versions), and `CheckpointStateMigrationFailed` (a migration function raised). All non-transient. Post-migration deserialization failures still route to `CheckpointRecordInvalid` per §10.12.4. The same chain applies to each entry in `parent_states` in lockstep with the outer state per §10.12.2. Routing precedence per §10.10 (v0.16.0): chain-ambiguous → missing → failed → record-invalid.
- **`Checkpointer.supports_state_migration` Protocol attribute.** Marks whether a backend can expose the structural intermediate form (a plain dict, JSON tree) the migration registry consumes. `SQLiteCheckpointer(serialization="json")` opts in; `SQLiteCheckpointer(serialization="pickle")` and `InMemoryCheckpointer` opt out. On version mismatch against a non-migration-eligible backend the engine raises `CheckpointRecordInvalid` per spec §10.12.1.
+- **`openarmature.checkpoint.migrate` OTel span (proposal 0014 §6 cross-ref).** Versioned resumes whose migration chain runs emit a zero-duration `openarmature.checkpoint.migrate` span on the OTel observer, parented under the invocation root span. Attributes: `openarmature.checkpoint.migrate.from_version`, `openarmature.checkpoint.migrate.to_version` (the final target), `openarmature.checkpoint.migrate.chain_length`. The §10.12.3 fast path (versions match, registry not consulted) emits no span. Engine-side: a synthetic `checkpoint_migrated` observer phase carries a `_MigrationSummary` payload from `_migrate_record` through to the OTel observer; the new phase is gated off default subscriptions (observers opt in explicitly via `phases={..., "checkpoint_migrated"}`).
- **Prompt-management capability (proposal 0017, introduced in spec v0.15.0).** New `openarmature.prompts` subpackage. `PromptManager` composes one or more `PromptBackend`s, exposes `fetch` / `render` / `get`, applies the §8 fallback semantics (`prompt_store_unavailable` continues to the next backend; `prompt_not_found` stops the chain), and renders templates with Jinja2's `StrictUndefined` per §7. `Prompt` / `PromptResult` / `PromptGroup` are Pydantic models matching spec §3 / §4 / §9. Three error categories (`PromptNotFound`, `PromptRenderError`, `PromptStoreUnavailable`) with `PROMPT_TRANSIENT_CATEGORIES` exported for retry-middleware classifiers. `FilesystemPromptBackend` is the minimum local-filesystem reference backend (layout: `<root>/<label>/<name>.j2`; `version` derived from the first 16 hex chars of `template_hash`). New runtime dependency: `jinja2>=3.1`.
- **`openarmature.prompts.context` — observability propagation per spec §11.** `with_active_prompt(result)` and `with_active_prompt_group(group)` context managers + `current_prompt_result()` / `current_prompt_group()` inspectors. When the OTel observer is active and an LLM call fires inside `with_active_prompt`, the `openarmature.llm.complete` span carries the normative `openarmature.prompt.*` attributes (`name`, `version`, `label`, `template_hash`, `rendered_hash`, `group_name`). Nesting is innermost-wins.
- **Image content blocks for user messages (proposal 0015, introduced in spec v0.13.0).** `UserMessage.content` now accepts `str | list[ContentBlock]`. The block surface introduces `TextBlock`, `ImageBlock`, `ImageSourceURL`, `ImageSourceInline`, and the `ContentBlock` / `ImageSource` discriminated unions over the block / source `type` field. `ImageBlock` carries a `media_type` (required for inline sources; ignored for URL sources; typed as `str | None` so callers MAY pass any `image/*` type the bound model supports) and an optional `detail` hint (`"auto"` / `"low"` / `"high"`; `None` default omits the field from the wire so providers apply their own default). System, assistant, and tool messages stay text-string-only; image inputs are user-only in v1.
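
The chain-resolution rule stated in the state-migration entry (BFS over the registered edges, shortest path wins, several distinct shortest paths count as ambiguous) can be illustrated with a small stand-alone sketch. This is not openarmature's implementation: `resolve_chain`, `MissingChain`, and `AmbiguousChain` are hypothetical names, and edges are reduced to plain `(from_version, to_version)` tuples.

```python
from collections import deque


class MissingChain(Exception):
    """Hypothetical stand-in for CheckpointStateMigrationMissing."""


class AmbiguousChain(Exception):
    """Hypothetical stand-in for CheckpointStateMigrationChainAmbiguous."""


def resolve_chain(edges, saved, current):
    """Return the unique shortest list of (from, to) hops from saved to current."""
    graph = {}
    for src, dst in edges:
        if dst in graph.setdefault(src, []):
            raise AmbiguousChain(f"duplicate edge {src!r} -> {dst!r}")
        graph[src].append(dst)

    # BFS that tracks, per version, its distance and how many shortest paths reach it.
    dist = {saved: 0}
    paths = {saved: 1}
    parent = {}
    queue = deque([saved])
    while queue:
        node = queue.popleft()
        for nxt in graph.get(node, []):
            if nxt not in dist:
                dist[nxt] = dist[node] + 1
                paths[nxt] = paths[node]
                parent[nxt] = node
                queue.append(nxt)
            elif dist[nxt] == dist[node] + 1:
                paths[nxt] += paths[node]  # a second, equally short route

    if current not in dist:
        raise MissingChain(f"no chain from {saved!r} to {current!r}")
    if paths[current] > 1:
        raise AmbiguousChain(f"{paths[current]} shortest chains reach {current!r}")

    # Walk the parent links back from the target to rebuild the chain.
    chain = []
    node = current
    while node != saved:
        chain.append((parent[node], node))
        node = parent[node]
    return list(reversed(chain))
```

With edges `1→2`, `2→3`, and `1→3`, the direct hop wins because it is shorter; with `1→2→4` and `1→3→4` both registered, the sketch raises `AmbiguousChain` rather than picking one.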

src/openarmature/graph/compiled.py

Lines changed: 69 additions & 20 deletions
@@ -246,6 +246,22 @@ def _no_op_finalize(_edge_error: RuntimeGraphError | None) -> None:
     silently per proposal 0012 + fixture 013."""
 
 
+@dataclass(frozen=True)
+class _MigrationSummary:
+    """Per-resume migration-chain metadata threaded out of
+    ``_migrate_record`` so the engine can dispatch an
+    ``openarmature.checkpoint.migrate`` observer event after the
+    invocation context is built (per spec §6 cross-ref in proposal
+    0014). Carried on the synthetic ``NodeEvent.pre_state``
+    payload for ``phase="checkpoint_migrated"``; the OTel observer
+    reads it to emit the span.
+    """
+
+    from_version: str
+    to_version: str
+    chain_length: int
+
+
 def _apply_migration_step(
     migration: StateMigration,
     value: Any,
@@ -369,15 +385,20 @@ async def _migrate_record(
         checkpointer: Checkpointer,
         invocation_id: str,
         current_schema_version: str,
-    ) -> CheckpointRecord:
+    ) -> tuple[CheckpointRecord, _MigrationSummary]:
         """Resolve a migration chain for ``record`` and apply it.
 
-        Returns the record with ``state`` + ``parent_states`` mapped
-        through the chain. Caller is responsible for the
-        post-migration deserialization step (§10.12.4): if the
-        migrated state cannot deserialize against the current state
-        class, the resulting failure surfaces as
-        ``CheckpointRecordInvalid``.
+        Returns ``(migrated_record, summary)``. ``migrated_record``
+        has ``state`` + ``parent_states`` mapped through the chain.
+        ``summary`` carries the chain's metadata so the caller can
+        dispatch a ``checkpoint_migrated`` observer event after the
+        invocation context exists (per the spec §6 cross-ref in
+        proposal 0014).
+
+        Caller is responsible for the post-migration deserialization
+        step (§10.12.4): if the migrated state cannot deserialize
+        against the current state class, the resulting failure
+        surfaces as ``CheckpointRecordInvalid``.
 
         Spec §10.12.2 says "parent states MUST be treated as carrying
         the same ``schema_version`` as the outer record." We apply
@@ -433,22 +454,21 @@ async def _migrate_record(
             for i, parent in enumerate(migrated_parents):
                 migrated_parents[i] = _apply_migration_step(migration, parent, f"parent_states[{i}]")
 
-        # TODO(observability): emit an ``openarmature.checkpoint.migrate``
-        # span per spec §6 cross-ref. Deferred to a follow-on because
-        # ``_migrate_record`` runs before the invocation's
-        # ``_InvocationContext`` is created (the engine needs the
-        # migrated state shape to build the context), so the existing
-        # ``_dispatch``-based observer pathway is not yet available
-        # here. A natural fix is to dispatch a synthetic
-        # ``checkpoint_migrated`` event as the first event of the
-        # invocation after context creation. The chain metadata
-        # captured for that event: ``from_version``, ``to_version``
-        # (final target), ``chain_length = len(chain)``.
-        return dataclass_replace(
+        # Per spec §6 cross-ref, the caller dispatches a synthetic
+        # ``checkpoint_migrated`` observer event using the summary
+        # below as soon as the invocation context exists. We can't
+        # dispatch from here because the context isn't built yet.
+        summary = _MigrationSummary(
+            from_version=record.schema_version,
+            to_version=current_schema_version,
+            chain_length=len(chain),
+        )
+        migrated = dataclass_replace(
             record,
             state=migrated_state,
             parent_states=tuple(migrated_parents),
         )
+        return migrated, summary
 
     async def drain(self) -> None:
         """Await delivery of every observer event produced by prior
@@ -544,6 +564,13 @@ async def invoke(
         resume_skip_set: frozenset[tuple[str, ...]] = frozenset()
         completed_positions: list[NodePosition] = []
         pending_resume_states: dict[int, Any] = {}
+        # Populated by ``_migrate_record`` during a version-mismatched
+        # resume; left ``None`` for the no-resume + versions-match
+        # paths. Dispatched as a synthetic ``checkpoint_migrated``
+        # observer event after the invocation context is built so
+        # the OTel observer can emit an
+        # ``openarmature.checkpoint.migrate`` span per spec §6.
+        migration_summary: _MigrationSummary | None = None
         if resume_invocation is not None:
             checkpointer = self._checkpointer_slot[0]
             if checkpointer is None:
@@ -570,7 +597,7 @@ async def invoke(
             # Order matters — do NOT swap eligibility and registry-lookup.
             current_schema_version = self.state_cls.schema_version
             if record.schema_version != current_schema_version:
-                record = await self._migrate_record(
+                record, migration_summary = await self._migrate_record(
                     record,
                     checkpointer,
                     resume_invocation,
@@ -653,6 +680,28 @@ async def invoke(
         # processed), remove it from the active set so long-running
         # services don't leak Task references between drain() calls.
         worker.add_done_callback(self._active_workers.discard)
+        # Per spec §6 cross-ref in proposal 0014: dispatch the
+        # ``checkpoint_migrated`` event as soon as the delivery
+        # worker is alive but before any node runs, so the OTel
+        # observer can emit an
+        # ``openarmature.checkpoint.migrate`` span ahead of the
+        # invocation's node spans. The synthetic event carries the
+        # ``_MigrationSummary`` on ``pre_state`` mirroring the
+        # ``checkpoint_saved`` convention (state-on-pre, post=None).
+        if migration_summary is not None:
+            _dispatch(
+                context,
+                NodeEvent(
+                    node_name="openarmature.checkpoint.migrate",
+                    namespace=("openarmature.checkpoint.migrate",),
+                    step=-1,
+                    phase="checkpoint_migrated",
+                    pre_state=migration_summary,
+                    post_state=None,
+                    error=None,
+                    parent_states=(),
+                ),
+            )
         try:
             return await self._invoke(starting_state, context)
         finally:
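
The compiled.py change follows a "compute early, dispatch late" shape: the migration step returns its metadata alongside the record, and the caller holds that metadata until an event sink exists. Below is a minimal sketch of the shape under simplifying assumptions. `Record`, `MigrationSummary`, `migrate_record`, and `invoke` are hypothetical stand-ins, the migration chain is a list of plain functions, and a list plays the role of the invocation context and delivery worker.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Record:
    schema_version: str
    state: dict


@dataclass(frozen=True)
class MigrationSummary:
    from_version: str
    to_version: str
    chain_length: int


def migrate_record(record, chain, current_version):
    """Apply each migration in order and return (migrated_record, summary)."""
    state = record.state
    for step in chain:
        state = step(state)
    summary = MigrationSummary(record.schema_version, current_version, len(chain))
    return replace(record, state=state), summary


def invoke(record, chain, current_version):
    """Resume from a record; emit the migration event only once a sink exists."""
    summary = None  # stays None on the versions-match fast path
    if record.schema_version != current_version:
        record, summary = migrate_record(record, chain, current_version)

    events = []  # stand-in for the invocation context + delivery worker
    if summary is not None:
        events.append(("checkpoint_migrated", summary))
    events.append(("started", record.state))
    return record, events
```

Returning a tuple keeps the migration function free of any dependency on the observer machinery; the cost is that every caller has to unpack two values.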

src/openarmature/graph/events.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"""
1515

1616
from dataclasses import dataclass
17-
from typing import Literal
17+
from typing import Any, Literal
1818

1919
from .errors import RuntimeGraphError
2020
from .state import State
@@ -127,8 +127,19 @@ class NodeEvent:
127127
node_name: str
128128
namespace: tuple[str, ...]
129129
step: int
130-
phase: Literal["started", "completed", "checkpoint_saved"]
131-
pre_state: State
130+
phase: Literal[
131+
"started",
132+
"completed",
133+
"checkpoint_saved",
134+
# Synthetic phase per spec §6 cross-ref in proposal 0014:
135+
# fires once at the start of a versioned resume to carry
136+
# the migration chain's metadata. ``pre_state`` on this
137+
# phase carries a ``_MigrationSummary`` (not a ``State``);
138+
# the field type stays permissive on this dataclass and
139+
# the OTel observer narrows defensively via ``isinstance``.
140+
"checkpoint_migrated",
141+
]
142+
pre_state: Any
132143
post_state: State | None
133144
error: RuntimeGraphError | None
134145
parent_states: tuple[State, ...]
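
Widening `pre_state` to `Any` shifts payload checking from the type checker to each consumer. The sketch below shows one way a consumer might narrow per phase. `Event`, `Summary`, and `migrate_attrs` are hypothetical names used for illustration only and are not part of the diff.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Summary:
    from_version: str
    to_version: str
    chain_length: int


@dataclass(frozen=True)
class Event:
    phase: str
    pre_state: Any  # permissive on purpose; consumers narrow per phase


def migrate_attrs(event: Event) -> dict[str, Any] | None:
    """Return span attributes, or None when the event is not a valid migration event."""
    if event.phase != "checkpoint_migrated":
        return None
    payload = event.pre_state
    if not isinstance(payload, Summary):
        # Some other dispatcher used this phase with a different payload: skip it.
        return None
    return {
        "from_version": payload.from_version,
        "to_version": payload.to_version,
        "chain_length": payload.chain_length,
    }
```

The trade-off is that observers which previously assumed `pre_state` is always a `State` now need a similar guard if they subscribe to the new phase.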

src/openarmature/graph/observer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ async def __call__(self, event: NodeEvent, /) -> None: ...
9797
# All phase values the engine produces (per spec graph-engine §6 +
9898
# pipeline-utilities §10.8). Used by the registration-time validator
9999
# to reject typos like ``phases={"complete"}``.
100-
KNOWN_PHASES: frozenset[str] = frozenset({"started", "completed", "checkpoint_saved"})
100+
KNOWN_PHASES: frozenset[str] = frozenset({"started", "completed", "checkpoint_saved", "checkpoint_migrated"})
101101

102102

103103
@dataclass(frozen=True)
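
The validator that consumes `KNOWN_PHASES` is not part of this diff, so the following is only a sketch of how registration-time validation and the opt-in gating described in the commit message might fit together. `DEFAULT_PHASES` and `validate_phases` are hypothetical names, and deriving the defaults by subtracting the new phase is an assumption.

```python
KNOWN_PHASES = frozenset(
    {"started", "completed", "checkpoint_saved", "checkpoint_migrated"}
)

# Assumption: default subscriptions exclude the new phase, so observers
# registered without an explicit ``phases`` argument see no new events.
DEFAULT_PHASES = KNOWN_PHASES - {"checkpoint_migrated"}


def validate_phases(phases=None):
    """Return the subscription set, rejecting unknown phase names."""
    if phases is None:
        return DEFAULT_PHASES
    requested = frozenset(phases)
    unknown = requested - KNOWN_PHASES
    if unknown:
        # Catches typos such as "complete" at registration time.
        raise ValueError(f"unknown phases: {sorted(unknown)}")
    return requested
```

Under this sketch an observer receives migration events only after asking for them, for example with `validate_phases({"started", "checkpoint_migrated"})`.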

src/openarmature/observability/otel/observer.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,9 @@ async def __call__(self, event: NodeEvent) -> None:
258258
if event.phase == "checkpoint_saved":
259259
self._emit_checkpoint_save_span(event)
260260
return
261+
if event.phase == "checkpoint_migrated":
262+
self._emit_checkpoint_migrate_span(event)
263+
return
261264
if event.phase == "started":
262265
# Idempotent — short-circuits inside ``_open_started_span``
263266
# if ``prepare_sync`` already opened the span synchronously
@@ -429,6 +432,61 @@ def _handle_completed(self, event: NodeEvent) -> None:
     # Special-event paths
     # ------------------------------------------------------------------
 
+    def _emit_checkpoint_migrate_span(self, event: NodeEvent) -> None:
+        """Spec pipeline-utilities §6 cross-ref (proposal 0014): emit a
+        zero-duration ``openarmature.checkpoint.migrate`` span when
+        a versioned resume's migration chain runs. The synthetic
+        event carries ``_MigrationSummary`` on ``pre_state``; this
+        handler reads ``from_version`` / ``to_version`` /
+        ``chain_length`` from the summary onto the span.
+
+        Emitted under the invocation's root span (no parent-node
+        context — the migration runs before any node fires), so
+        trace UIs surface it as the first child of the invocation.
+        """
+        from openarmature.graph.compiled import _MigrationSummary
+        from openarmature.observability.correlation import (
+            current_correlation_id,
+            current_invocation_id,
+        )
+
+        invocation_id = current_invocation_id()
+        if invocation_id is None:
+            return
+        summary = event.pre_state
+        if not isinstance(summary, _MigrationSummary):
+            # Defensive — the engine only sets pre_state to a
+            # _MigrationSummary on this phase. Skip if something
+            # else dispatched a checkpoint_migrated event.
+            return
+        # Open (or reuse) the invocation's root span and parent the
+        # migrate span under it. The migration runs before any node
+        # fires, so the root is the natural parent — no node span
+        # exists yet at this point in the invocation.
+        if invocation_id not in self._invocation_span:
+            cid = current_correlation_id() or ""
+            self._open_invocation_span(invocation_id, cid, event)
+        root_open = self._invocation_span.get(invocation_id)
+        parent_ctx: Any = None
+        if root_open is not None:
+            parent_ctx = set_span_in_context(root_open.span)
+        attrs: dict[str, Any] = {
+            "openarmature.checkpoint.migrate.from_version": summary.from_version,
+            "openarmature.checkpoint.migrate.to_version": summary.to_version,
+            "openarmature.checkpoint.migrate.chain_length": summary.chain_length,
+        }
+        cid = current_correlation_id()
+        if cid is not None:
+            attrs["openarmature.correlation_id"] = cid
+        span = self._tracer.start_span(
+            name="openarmature.checkpoint.migrate",
+            context=parent_ctx,
+            kind=SpanKind.INTERNAL,
+            attributes=attrs,
+        )
+        span.set_status(Status(StatusCode.OK))
+        span.end()
+
     def _emit_checkpoint_save_span(self, event: NodeEvent) -> None:
         """Spec pipeline-utilities §10.8 + observability §4.5: emit a
         zero-duration ``openarmature.checkpoint.save`` span attached
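
The handler above has two moving parts: the invocation root is opened lazily on first use, and the migrate span is started and ended immediately so it has effectively zero duration. The sketch below reproduces only that control flow with a stdlib stand-in for the tracer, so it runs without OpenTelemetry installed. `FakeSpan`, `SketchObserver`, and `emit_migrate_span` are hypothetical names and do not reflect the real `OTelObserver` internals.

```python
from __future__ import annotations

import itertools
from dataclasses import dataclass, field


@dataclass
class FakeSpan:
    name: str
    span_id: int
    parent_id: int | None
    attributes: dict = field(default_factory=dict)
    ended: bool = False


class SketchObserver:
    """Stand-in observer that records spans in a list instead of calling a tracer."""

    def __init__(self):
        self._ids = itertools.count(1)
        self._invocation_span = {}  # invocation_id -> root FakeSpan
        self.spans = []

    def _start(self, name, parent=None, attributes=None):
        parent_id = parent.span_id if parent is not None else None
        span = FakeSpan(name, next(self._ids), parent_id, dict(attributes or {}))
        self.spans.append(span)
        return span

    def emit_migrate_span(self, invocation_id, from_version, to_version, chain_length):
        # Open the root on first use; later events for the same id reuse it.
        root = self._invocation_span.get(invocation_id)
        if root is None:
            root = self._start("invocation")
            self._invocation_span[invocation_id] = root
        span = self._start(
            "openarmature.checkpoint.migrate",
            parent=root,
            attributes={
                "from_version": from_version,
                "to_version": to_version,
                "chain_length": chain_length,
            },
        )
        span.ended = True  # started and ended in one step: zero duration
        return span
```

Because the migration event is the first event on a fresh invocation id, the lazy-open branch is the common case for this span rather than an edge case.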
