Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/openarmature/graph/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
_set_namespace_prefix,
current_active_observer_span,
current_attempt_index,
validate_invocation_id,
)
from openarmature.observability.metadata import (
_reset_invocation_metadata,
Expand Down Expand Up @@ -772,6 +773,7 @@ async def invoke(
observers: Iterable[Observer | SubscribedObserver] | None = None,
*,
correlation_id: str | None = None,
invocation_id: str | None = None,
resume_invocation: str | None = None,
metadata: Mapping[str, Any] | None = None,
) -> StateT:
Expand All @@ -797,6 +799,12 @@ async def invoke(
- ``correlation_id`` is the per-invocation cross-backend join
key. Caller-supplied or auto-generated UUIDv4 when absent.
Preserved unchanged across ``resume_invocation``.
- ``invocation_id`` (proposal 0039) is the per-attempt id.
Caller-supplied or auto-generated UUIDv4 when absent; a
caller value MAY be any non-empty URL-safe string. Applies
to the fresh-invocation path only — a ``resume_invocation``
mints a fresh id regardless (each attempt is its own
invocation).
- ``resume_invocation`` names a prior ``invocation_id`` to
resume from. Requires a registered Checkpointer; raises
``CheckpointNotFound`` when the backend has no record for
Expand Down Expand Up @@ -848,7 +856,13 @@ async def invoke(
# step 3) and pre-populate the skip-set + completed_positions.
starting_state: StateT = initial_state
resolved_correlation_id = correlation_id or str(uuid.uuid4())
invocation_id = str(uuid.uuid4())
# Caller-supplied invocation_id (proposal 0039) applies to the
# fresh-invocation path only; a resume mints a fresh id
# regardless (each attempt is its own invocation, §5.1).
if invocation_id is not None and resume_invocation is None:
invocation_id = validate_invocation_id(invocation_id)
else:
invocation_id = str(uuid.uuid4())
resume_skip_set: frozenset[tuple[str, ...]] = frozenset()
completed_positions: list[NodePosition] = []
pending_resume_states: dict[int, Any] = {}
Expand Down
33 changes: 33 additions & 0 deletions src/openarmature/observability/correlation.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from __future__ import annotations

import re
from collections.abc import Callable
from contextvars import ContextVar, Token
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -119,6 +120,37 @@ def _reset_invocation_id(token: Token[str | None]) -> None:
_invocation_id_var.reset(token)


# Caller-supplied invocation_id validation (proposal 0039). Per §5.1 a
# caller MAY supply its own id at invoke(); it MAY be any non-empty
# URL-safe string (it need not be a UUID — the Langfuse trace.id
# derivation in §8.4.1 handles non-UUID values). URL-safe here is the
# RFC 3986 unreserved set.
#
# Anchored with \A...\Z rather than ^...$: `$` also matches just before
# a trailing newline, which would let "some-id\n" through as URL-safe.
_INVOCATION_ID_RE = re.compile(r"\A[A-Za-z0-9._~-]+\Z")


def validate_invocation_id(value: object) -> str:
    """Validate a caller-supplied ``invocation_id`` and return it.

    Per observability §5.1 a caller-supplied id MAY be any non-empty
    URL-safe string. Rejects empty / non-string / non-URL-safe values
    at the ``invoke()`` boundary so the violation surfaces
    synchronously to the caller rather than as a downstream trace-id
    derivation failure. Typed ``object`` (like
    :func:`validate_invocation_metadata`) so the boundary check guards
    against untyped callers.

    The whole string must consist of RFC 3986 unreserved characters
    (``A-Z a-z 0-9 - . _ ~``); in particular a trailing newline is
    rejected, not ignored.

    Returns:
        ``value`` unchanged, narrowed to ``str``.

    Raises:
        ValueError: if ``value`` is not a ``str``, is empty, or contains
            any character outside the unreserved set.
    """
    if not isinstance(value, str):
        raise ValueError(f"invocation_id must be a string; got {type(value).__name__}")
    if not value:
        raise ValueError("invocation_id must be a non-empty string")
    # fullmatch: every character, including the last, must be in the set.
    if _INVOCATION_ID_RE.fullmatch(value) is None:
        raise ValueError(
            f"invocation_id {value!r} is not URL-safe; allowed characters are "
            f"A-Z a-z 0-9 and -._~ (RFC 3986 unreserved set)"
        )
    return value


# ---------------------------------------------------------------------------
# Active observer set — for capability backends emitting from outside the
# engine's per-step path (llm-provider span hook in Phase 6, future
Expand Down Expand Up @@ -398,6 +430,7 @@ def _reset_active_observer_span(token: Token[object | None]) -> None:
"current_fan_out_index",
"current_invocation_id",
"current_namespace_prefix",
"validate_invocation_id",
# Engine-internal lifecycle helpers — exported so the engine in
# ``openarmature.graph.compiled`` can drive set/reset without
# pyright's strict ``reportUnusedFunction`` flagging them as
Expand Down
5 changes: 4 additions & 1 deletion src/openarmature/observability/langfuse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@
ObservationType,
)
from .observer import LangfuseObserver
from .trace_id import langfuse_trace_id

# LangfuseSDKAdapter requires the [langfuse] optional dependency.
# Surface it when available, but don't force the import on consumers
# who only use the InMemoryLangfuseClient — the adapter module's own
# guard raises an informative ImportError if anyone tries to use it
# without the extras installed.
# without the extras installed. `langfuse_trace_id` is pure
# (uuid+hashlib only), so it's exported unconditionally above.
try:
from .adapter import LangfuseSDKAdapter as LangfuseSDKAdapter

Comment thread
chris-colinsky marked this conversation as resolved.
Expand All @@ -65,6 +67,7 @@
"LangfuseUsage",
"ObservationLevel",
"ObservationType",
"langfuse_trace_id",
]
if _adapter_available:
__all__.append("LangfuseSDKAdapter")
39 changes: 9 additions & 30 deletions src/openarmature/observability/langfuse/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
from __future__ import annotations

import json
import uuid as _uuid
from contextlib import ExitStack
from typing import TYPE_CHECKING, Any, cast

from .client import LangfuseGenerationHandle, LangfuseSpanHandle, LangfuseUsage, ObservationLevel
from .trace_id import _is_uuid, _to_otel_trace_id

if TYPE_CHECKING:
from langfuse import Langfuse
Expand All @@ -53,31 +53,6 @@
) from exc


def _to_otel_trace_id(trace_id: str) -> str:
"""Convert OA's UUID4-formatted invocation_id to OTel's 32-char
hex trace_id form (no dashes).

Langfuse v4 is OTel-based: trace IDs are 128-bit integers
serialized as 32 lowercase hex characters. OA's invocation_id is
a standard UUID4 (8-4-4-4-12 dashed hex); same 128 bits, different
representation. Passing the dashed form to Langfuse v4 fails with
``int(..., 16)`` parsing in the SDK's internals.

Non-UUID inputs pass through unchanged so adapter consumers can
pass an already-OTel-formatted trace_id if they have one.

Trade-off: the spec §8.4.1 "trace.id MUST equal invocation_id
verbatim" contract is met content-wise (same 128 bits) but not
representation-wise. Users querying Langfuse for an OA
invocation_id need to strip dashes before searching. Documented
in the adapter's class docstring.
"""
try:
return _uuid.UUID(trace_id).hex
except (ValueError, AttributeError):
return trace_id


def _stringify_metadata(metadata: dict[str, Any] | None) -> dict[str, str]:
"""Coerce metadata values to strings for v4's propagate_attributes,
which only accepts ``Dict[str, str]``. Non-string scalars stringify
Expand Down Expand Up @@ -231,10 +206,14 @@ def trace(
# it via propagate_attributes on every observation under this
# trace_id so the trace's display name + metadata stay
# consistent under v4's last-wins semantics.
self._trace_info[id] = {
"name": name,
"metadata": dict(metadata) if metadata is not None else {},
}
md: dict[str, Any] = dict(metadata) if metadata is not None else {}
# Non-UUID invocation_id: the derived trace.id is a hash, not
# reversible to the caller's id, so surface the raw id under
# trace.metadata.invocation_id for lookup (§8.4.1). The key is
# reserved (proposal 0041), so no caller metadata collides.
if not _is_uuid(id):
md.setdefault("invocation_id", id)
self._trace_info[id] = {"name": name, "metadata": md}

def update_trace(
self,
Expand Down
45 changes: 45 additions & 0 deletions src/openarmature/observability/langfuse/trace_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Trace-id derivation helpers, SDK-independent.

Pure functions for mapping an OA ``invocation_id`` to the 32-char hex
Langfuse ``trace.id``. Imports only stdlib (``uuid``, ``hashlib``), so
this module is importable without the ``[langfuse]`` extras —
operators and tooling can compute trace ids without pulling in the
SDK.
"""

from __future__ import annotations

import hashlib
import uuid as _uuid


def _is_uuid(value: str) -> bool:
    """Report whether ``uuid.UUID`` accepts ``value`` as a UUID string."""
    try:
        _uuid.UUID(value)
        return True
    except (ValueError, AttributeError):
        return False


# Observability §8.4.1: a Langfuse v4 trace id is a 128-bit value written
# as 32 lowercase hex characters.
#   * UUID invocation_id     -> its hex form (the dashes are dropped).
#   * non-UUID invocation_id -> the first 16 bytes of
#     SHA-256(invocation_id), hex-encoded. This is the same derivation as
#     Langfuse's create_trace_id(seed), so a consumer can reproduce it.
# For the non-UUID case the raw id is additionally written to
# trace.metadata.invocation_id (see the adapter's `trace`) for lookup.
def _to_otel_trace_id(trace_id: str) -> str:
    """Map an OA invocation_id to its 32-char hex Langfuse trace id."""
    if not _is_uuid(trace_id):
        digest = hashlib.sha256(trace_id.encode("utf-8")).digest()
        return digest[:16].hex()
    return _uuid.UUID(trace_id).hex


def langfuse_trace_id(invocation_id: str) -> str:
    """Return the Langfuse ``trace.id`` for an OA ``invocation_id``.

    Public entry point for turning a logged ``invocation_id`` (either a
    dashed UUID or a caller-supplied non-UUID string) into the 32-char
    hex ``trace.id`` that Langfuse stores, e.g. when building a direct
    trace URL.
    """
    return _to_otel_trace_id(invocation_id)
42 changes: 42 additions & 0 deletions src/openarmature/observability/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
- Keys MUST NOT start with ``openarmature.`` or ``gen_ai.`` (reserved
for spec-normative attribute namespaces; collisions would silently
overwrite OA-emitted state at the observer layer).
- Keys MUST NOT exactly match a reserved OA-emitted top-level metadata
key name (the §8.4 Langfuse set plus ``invocation_id``; proposal
0041) for the same collision reason.
- Values MUST be OTel-attribute-compatible scalars: ``str``, ``int``,
``float``, ``bool``, or a homogeneous list/tuple of those types.
``None``, nested objects, and mixed-type arrays are rejected.
Expand Down Expand Up @@ -63,6 +66,39 @@
# boundary so observers never see a colliding key.
_RESERVED_PREFIXES: tuple[str, ...] = ("openarmature.", "gen_ai.")

# Reserved exact key NAMES per §3.4 (proposal 0041): the top-level
# metadata keys an OA-emitted §8 backend mapping writes alongside
# caller keys (the §8.4 Langfuse set, plus invocation_id). A caller
# key matching one exactly would silently overwrite an OA field in a
# backend's flat top-level metadata, so it is rejected at the boundary
# the same way as the prefix reservation. Backend-set-independent:
# rejected regardless of which observers are attached.
_RESERVED_KEY_NAMES: frozenset[str] = frozenset(
{
"correlation_id",
"entry_node",
"spec_version",
"detached_child_trace_ids",
"namespace",
"step",
"attempt_index",
"fan_out_index",
"subgraph_name",
"fan_out_item_count",
"fan_out_concurrency",
"fan_out_error_policy",
"fan_out_parent_node_name",
"prompt_group_name",
"request_extras",
"finish_reason",
"system",
"response_model",
"response_id",
"prompt",
"invocation_id",
Comment thread
chris-colinsky marked this conversation as resolved.
}
)


def current_invocation_metadata() -> MappingProxyType[str, AttributeValue]:
"""Return the caller-supplied invocation metadata in scope, or the
Expand Down Expand Up @@ -144,6 +180,12 @@ def _validate_metadata_key(key: Any) -> None:
f"invocation metadata key {key!r} uses reserved namespace prefix {reserved!r}; "
f"reserved prefixes are for spec-normative attributes (openarmature.*, gen_ai.*)"
)
if key in _RESERVED_KEY_NAMES:
raise ValueError(
f"invocation metadata key {key!r} is reserved: it exactly matches a top-level "
f"metadata key OA emits to observability backends, and would overwrite it. "
f"Rename the key."
)


def _validate_metadata_value(key: str, value: Any) -> None:
Expand Down
39 changes: 33 additions & 6 deletions tests/unit/test_observability_langfuse_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ def test_adapter_caches_trace_info() -> None:
assert "trace-1" in adapter._trace_info # noqa: SLF001
cached = adapter._trace_info["trace-1"] # noqa: SLF001
assert cached["name"] == "my-trace"
assert cached["metadata"] == {"correlation_id": "c-1"}
# "trace-1" is a non-UUID id, so the raw id is also surfaced under
# metadata.invocation_id (proposal 0039 / §8.4.1).
assert cached["metadata"] == {"correlation_id": "c-1", "invocation_id": "trace-1"}


def test_adapter_converts_uuid_trace_id_to_otel_hex() -> None:
Expand All @@ -96,10 +98,34 @@ def test_adapter_converts_uuid_trace_id_to_otel_hex() -> None:
assert _to_otel_trace_id("b24eda93-d06d-4eaa-9891-ca5e56f35722") == "b24eda93d06d4eaa9891ca5e56f35722"
# Idempotent on already-hex input.
assert _to_otel_trace_id("b24eda93d06d4eaa9891ca5e56f35722") == "b24eda93d06d4eaa9891ca5e56f35722"
# Non-UUID inputs pass through unchanged (consumers passing an
# already-OTel-formatted trace_id from elsewhere don't get
# mangled).
assert _to_otel_trace_id("custom-trace-id") == "custom-trace-id"


def test_to_otel_trace_id_non_uuid_derivation() -> None:
    """Non-UUID ids hash to 32 hex chars; UUID ids only lose their dashes."""
    import hashlib

    from openarmature.observability.langfuse import langfuse_trace_id
    from openarmature.observability.langfuse.adapter import _to_otel_trace_id

    raw_id = "run_abc123"
    # Leading 16 bytes of SHA-256 over the id, hex-encoded (== Langfuse's
    # create_trace_id(seed)). The private derivation and the public
    # helper must both produce it.
    digest_hex = hashlib.sha256(raw_id.encode("utf-8")).digest()[:16].hex()
    assert _to_otel_trace_id(raw_id) == digest_hex
    assert langfuse_trace_id(raw_id) == digest_hex
    # Spec fixture 036 pins this vector.
    assert digest_hex == "29b50a6c08dabfeaeb1696301f4fabe1"

    # A UUID id still takes the dash-stripping path via the public helper.
    dashed = "b24eda93-d06d-4eaa-9891-ca5e56f35722"
    assert langfuse_trace_id(dashed) == "b24eda93d06d4eaa9891ca5e56f35722"


def test_adapter_trace_surfaces_raw_id_for_non_uuid() -> None:
    """trace() records the raw id in metadata only when it is not a UUID."""
    sdk_adapter = LangfuseSDKAdapter(_dummy_client())

    # Non-UUID id: the raw value must appear under metadata.invocation_id
    # (§8.4.1).
    raw_id = "run_abc123"
    sdk_adapter.trace(id=raw_id, name="t", metadata=None)
    non_uuid_md = sdk_adapter._trace_info[raw_id]["metadata"]  # noqa: SLF001
    assert non_uuid_md["invocation_id"] == raw_id

    # UUID id: nothing is injected (its trace.id is reversible).
    uuid_id = "b24eda93-d06d-4eaa-9891-ca5e56f35722"
    sdk_adapter.trace(id=uuid_id, name="t", metadata=None)
    uuid_md = sdk_adapter._trace_info[uuid_id]["metadata"]  # noqa: SLF001
    assert "invocation_id" not in uuid_md


def test_adapter_update_trace_merges_into_cache() -> None:
Expand All @@ -111,7 +137,8 @@ def test_adapter_update_trace_merges_into_cache() -> None:

cached = adapter._trace_info["trace-1"] # noqa: SLF001
assert cached["name"] == "renamed"
assert cached["metadata"] == {"key1": "v1", "key2": "v2"}
# "trace-1" is non-UUID, so trace() also surfaced metadata.invocation_id.
assert cached["metadata"] == {"key1": "v1", "key2": "v2", "invocation_id": "trace-1"}


def test_adapter_force_flush_delegates_to_client() -> None:
Expand Down
Loading