4 changes: 3 additions & 1 deletion CHANGELOG.md
Expand Up @@ -8,7 +8,9 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The

### Changed

- **OTel observer drives the `openarmature.llm.complete` span lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the span in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so the span duration reflects the actual adapter-boundary measurement rather than dispatcher queue delay. Failure-path spans continue to fire from the sentinel `NodeEvent` pair (the typed event is success-only per the proposal). The §5.5 attribute set is unchanged. Dual-emit window: the provider still emits both the sentinel pair AND the typed event during v0.13.0; the sentinel pair drops in v0.15.0.
- **OTel and Langfuse observers drive the `openarmature.llm.complete` span / Generation observation lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the OTel span and the Langfuse Generation in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so duration reflects the adapter-boundary measurement rather than dispatcher queue delay. Failure paths continue to fire from the sentinel `NodeEvent` (the typed event is success-only per the proposal). The §5.5 attribute set and §8.4 Generation metadata are unchanged.
- **`OpenAIProvider.complete()` no longer emits the sentinel `NodeEvent` pair on the success path** (v0.13.0 cleanup). The bundled OTel and Langfuse observers now consume the typed `LlmCompletionEvent` directly; the sentinel pair was kept on the success path through earlier releases for compatibility with pre-typed-event observers. External custom observers that filtered LLM calls by `event.namespace == LLM_NAMESPACE` MUST migrate to `isinstance(event, LlmCompletionEvent)` to continue seeing successful LLM calls. The sentinel `completed` event still fires on the failure path until the spec extends `LlmCompletionEvent` with error semantics; the sentinel `started` event is no longer emitted on either path.
- **`LangfuseClient` Protocol gains optional `start_time` / `end_time` timestamps**: `start_time` on `generation(...)` and `end_time` on the Generation/Span handles' `end(...)`. The Langfuse observer passes back-dated timestamps on the typed-event success path so the Langfuse UI shows the actual adapter-boundary duration. The SDK adapter forwards `end_time` to the v4 Langfuse SDK's `obs.end(end_time=...)` as integer nanoseconds; because v4's public `start_observation` does not accept `start_time`, a back-dated Generation is opened through the SDK's internal OTel tracer instead. The `InMemoryLangfuseClient` stores both fields verbatim on `LangfuseObservation` for test assertions.
- **`OpenAIProvider(populate_caller_metadata=...)` default flipped from `False` to `True`.** The python implementation now populates `LlmCompletionEvent.caller_invocation_metadata` by default so the bundled OTel and Langfuse observers can emit the §5.6 `openarmature.user.<key>` span-attribute family without a separate opt-in. Pass `populate_caller_metadata=False` to suppress the snapshot when no downstream consumer needs it. The spec-defined opt-in mechanism is unchanged; only the python default flips.
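
The observer migration described in the entries above can be sketched roughly as follows. This is a minimal illustration, not the library's code: the `NodeEvent` and `LlmCompletionEvent` classes here are simplified stand-ins, and the `LLM_NAMESPACE` value, the `phase` field, and `CountingObserver` are hypothetical names chosen for the example. Only the `isinstance(event, LlmCompletionEvent)` check and the `event.namespace == LLM_NAMESPACE` filter come from the changelog text.

```python
from dataclasses import dataclass

# Hypothetical value; the real constant lives in the library.
LLM_NAMESPACE = "openarmature.llm"


@dataclass
class NodeEvent:
    """Simplified stand-in for the sentinel event (fields are assumptions)."""
    namespace: str
    phase: str  # "started" or "completed"


@dataclass
class LlmCompletionEvent:
    """Simplified stand-in for the typed, success-only event."""
    call_id: str
    latency_ms: float


class CountingObserver:
    """Counts successful and failed LLM calls after the migration."""

    def __init__(self) -> None:
        self.successes = 0
        self.failures = 0

    def __call__(self, event: object) -> None:
        if isinstance(event, LlmCompletionEvent):
            # Success path: only the typed event arrives now.
            self.successes += 1
        elif (
            isinstance(event, NodeEvent)
            and event.namespace == LLM_NAMESPACE
            and event.phase == "completed"
        ):
            # Failure path: still reported via the sentinel "completed" event.
            self.failures += 1


observer = CountingObserver()
observer(LlmCompletionEvent(call_id="a", latency_ms=12.5))
observer(NodeEvent(namespace=LLM_NAMESPACE, phase="completed"))
```

An observer that keeps only the namespace filter would, under these assumptions, see the failure but miss the success, which is the behavior change the entry warns about.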

### Added
Expand Down
66 changes: 17 additions & 49 deletions src/openarmature/llm/providers/openai.py
Expand Up @@ -441,21 +441,6 @@ async def complete(
serialized_messages = _serialize_messages_for_payload(messages)
request_params = _request_params_from_config(config)
request_extras = _request_extras_from_config(config)
if dispatch is not None:
dispatch(
_make_llm_event(
"started",
call_id=call_id,
model=self.model,
genai_system=self._genai_system,
input_messages=serialized_messages,
request_params=request_params,
request_extras=request_extras,
active_prompt=active_prompt,
active_prompt_group=active_prompt_group,
)
)

# Wall-clock latency measured at the adapter boundary per
# proposal 0049's LlmCompletionEvent.latency_ms contract. The
# boundary spans from "just before _do_complete is called" to
Expand All @@ -473,13 +458,15 @@ async def complete(
response = await self._do_complete(body, schema_dict, schema_class)
except Exception as exc:
if dispatch is not None:
# Failure path: only the sentinel NodeEvent pair fires.
# Per proposal 0049 §3 (alternative 3): LlmCompletionEvent
# is completion-only; failures flow through the
# llm-provider §7 exception path. The error continues
# to surface through the existing observer chain via
# the sentinel NodeEvent's error_type / error_category
# fields on LlmEventPayload.
# Failure path: the sentinel NodeEvent carries the
# error fields per llm-provider §7. LlmCompletionEvent
# is success-only per proposal 0049 §3 alternative 3,
# so failures continue to surface through a sentinel
# ``completed`` event until the spec extends the typed
# event with error semantics. Only ``completed`` fires
# — no started counterpart, since both bundled
# observers' handlers ignore sentinel-started after
# the v0.13.0 migration.
dispatch(
_make_llm_event(
"completed",
Expand All @@ -498,33 +485,14 @@ async def complete(
latency_ms = (time.perf_counter() - adapter_start) * 1000.0

if dispatch is not None:
# Sentinel NodeEvent pair stays during the dual-emit window
# per proposal 0049 §5.5.7 SHOULD-emit-both transition. The
# window stays open through v0.13.0 with the sentinel
# emission removed in v0.15.0 (CHANGELOG callout pinned to
# the v0.13.0 release notes).
dispatch(
_make_llm_event(
"completed",
call_id=call_id,
model=self.model,
genai_system=self._genai_system,
finish_reason=response.finish_reason,
usage=response.usage,
input_messages=serialized_messages,
output_content=response.message.content or None,
request_params=request_params,
request_extras=request_extras,
response_id=response.response_id,
response_model=response.response_model,
active_prompt=active_prompt,
active_prompt_group=active_prompt_group,
)
)
# The new typed LlmCompletionEvent — observers filtering via
# isinstance(event, LlmCompletionEvent) receive this; legacy
# observers filtering on the sentinel namespace see the
# NodeEvent pair above. Failure path doesn't reach here.
# Success path: emit only the typed LlmCompletionEvent.
# The sentinel NodeEvent pair previously emitted on success
# for compatibility with pre-typed-event observers was
# dropped in v0.13.0; bundled observers (OTel + Langfuse)
# consume the typed event directly, and external custom
# observers should migrate to type discrimination via
# ``isinstance(event, LlmCompletionEvent)`` if they need
# LLM call notifications.
dispatch(
self._build_llm_completion_event(
response,
Expand Down
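
The emission pattern that `complete()` follows after this change can be sketched in isolation. The sketch below assumes simplified stand-in event classes (`SentinelCompleted`, `TypedCompletion`) and a hypothetical `do_complete` callable; it also assumes the exception is re-raised after the failure event is dispatched, which the truncated hunk does not show.

```python
import asyncio
import time
from dataclasses import dataclass


@dataclass
class SentinelCompleted:
    """Stand-in for the sentinel ``completed`` NodeEvent (failure path)."""
    error_type: str


@dataclass
class TypedCompletion:
    """Stand-in for LlmCompletionEvent (success path only)."""
    latency_ms: float


async def complete(do_complete, dispatch=None):
    # Latency is measured at the adapter boundary, around the call itself.
    adapter_start = time.perf_counter()
    try:
        response = await do_complete()
    except Exception as exc:
        if dispatch is not None:
            # Failure: a single sentinel "completed" event, no "started".
            dispatch(SentinelCompleted(error_type=type(exc).__name__))
        raise  # assumption: the error still propagates to the caller
    latency_ms = (time.perf_counter() - adapter_start) * 1000.0
    if dispatch is not None:
        # Success: only the typed event is emitted.
        dispatch(TypedCompletion(latency_ms=latency_ms))
    return response


async def _ok():
    return "hi"


async def _boom():
    raise RuntimeError("provider failed")


events = []
result = asyncio.run(complete(_ok, events.append))
try:
    asyncio.run(complete(_boom, events.append))
except RuntimeError:
    pass
```

Each call therefore produces exactly one event under this sketch, so an observer no longer needs to pair a `started` event with a `completed` one.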
113 changes: 101 additions & 12 deletions src/openarmature/observability/langfuse/adapter.py
Expand Up @@ -35,6 +35,7 @@

import json
from contextlib import ExitStack
from datetime import datetime
from typing import TYPE_CHECKING, Any, cast

from .client import LangfuseGenerationHandle, LangfuseSpanHandle, LangfuseUsage, ObservationLevel
Expand Down Expand Up @@ -129,13 +130,20 @@ def update(self, **fields: Any) -> None:
kwargs[key] = value
self._obs.update(**kwargs)

def end(self, **fields: Any) -> None:
def end(self, *, end_time: datetime | None = None, **fields: Any) -> None:
# Apply any field updates first (so they're set BEFORE the
# observation closes), then call end(). v4's end() takes only
# an optional ``end_time``; field mutation happens via update().
# The SDK's end_time is typed Optional[int] nanoseconds —
# convert from the Protocol's datetime surface before passing
# through. Without the conversion the OTel span_processor's
# formatter raises TypeError when it tries ``end_time / 1e9``.
if fields:
self.update(**fields)
self._obs.end()
if end_time is not None:
self._obs.end(end_time=int(end_time.timestamp() * 1_000_000_000))
else:
self._obs.end()


class LangfuseSDKAdapter:
Expand Down Expand Up @@ -337,6 +345,7 @@ def generation(
output: Any = None,
usage: LangfuseUsage | None = None,
prompt: Any = None,
start_time: datetime | None = None,
) -> LangfuseGenerationHandle:
extra_kwargs: dict[str, Any] = {
"model": model,
Expand All @@ -356,18 +365,98 @@ def generation(
if usage.total is not None:
usage_details["total"] = usage.total
extra_kwargs["usage_details"] = usage_details
obs = self._start_observation(
as_type="generation",
trace_id=trace_id,
name=name,
metadata=metadata,
parent_observation_id=parent_observation_id,
level=level,
status_message=status_message,
**{k: v for k, v in extra_kwargs.items() if v is not None},
)
if start_time is not None:
# v4's public ``start_observation`` does NOT accept a
# ``start_time`` kwarg — only the internal OTel tracer
# does. Mirror the SDK's own ``create_event`` precedent
# (langfuse/_client/client.py:1518-1551): open the OTel
# span directly via the private ``_otel_tracer`` with the
# back-dated timestamp, then wrap it in LangfuseGeneration.
# This is the only path to a back-dated Generation in
# v4.7; the live-account integration test catches a future
# SDK break.
obs = self._start_back_dated_generation(
trace_id=trace_id,
name=name,
metadata=metadata,
parent_observation_id=parent_observation_id,
level=level,
status_message=status_message,
start_time=start_time,
**{k: v for k, v in extra_kwargs.items() if v is not None},
)
else:
obs = self._start_observation(
as_type="generation",
trace_id=trace_id,
name=name,
metadata=metadata,
parent_observation_id=parent_observation_id,
level=level,
status_message=status_message,
**{k: v for k, v in extra_kwargs.items() if v is not None},
)
return _SpanHandle(obs)

def _start_back_dated_generation(
self,
*,
trace_id: str,
name: str | None,
metadata: dict[str, Any] | None,
parent_observation_id: str | None,
level: ObservationLevel,
status_message: str | None,
start_time: datetime,
**extra: Any,
) -> Any:
"""Open a LangfuseGeneration at a back-dated timestamp by going
through the private OTel tracer rather than the public
``start_observation`` API (which doesn't accept ``start_time``
in v4.7). Mirrors the SDK's ``create_event`` precedent."""
from langfuse._client.span import LangfuseGeneration
from opentelemetry import trace as otel_trace_api

trace_entry = self._trace_info.get(trace_id)
trace_context: TraceContext = {"trace_id": _to_otel_trace_id(trace_id)}
if parent_observation_id is not None:
trace_context["parent_span_id"] = parent_observation_id

# OTel's ``start_span(start_time=...)`` takes int nanoseconds
# since epoch. The SDK uses ``time_ns()`` for its instant
# events; for back-dating, convert from the supplied datetime.
start_time_ns = int(start_time.timestamp() * 1_000_000_000)

remote_parent_span = self._client._create_remote_parent_span( # pyright: ignore[reportPrivateUsage] # noqa: SLF001
trace_id=trace_context["trace_id"],
parent_span_id=trace_context.get("parent_span_id"),
)

with ExitStack() as stack:
if trace_entry is not None:
stack.enter_context(
propagate_attributes(
trace_name=trace_entry["name"],
metadata=_stringify_metadata(trace_entry["metadata"]),
)
)
stack.enter_context(otel_trace_api.use_span(remote_parent_span))
otel_span = self._client._otel_tracer.start_span( # pyright: ignore[reportPrivateUsage] # noqa: SLF001
name=name or "observation",
start_time=start_time_ns,
)
generation_kwargs: dict[str, Any] = {
"otel_span": otel_span,
"langfuse_client": self._client,
"metadata": metadata,
}
if level != "DEFAULT":
generation_kwargs["level"] = level
if status_message is not None:
generation_kwargs["status_message"] = status_message
generation_kwargs.update(extra)
return LangfuseGeneration(**generation_kwargs)

def force_flush(self, timeout_ms: int = 30_000) -> bool:
"""Best-effort flush of the underlying Langfuse client.

Expand Down
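
The back-dating arithmetic the adapter relies on can be illustrated on its own. The helper names below (`back_dated_start`, `to_epoch_ns`) are hypothetical and not part of the adapter. The conversion here uses integer arithmetic on a `timedelta` rather than the diff's `int(dt.timestamp() * 1_000_000_000)`; that is a swapped-in variant, chosen because a float near 1.7e18 has a spacing of 256 ns, so the float route can round by a few hundred nanoseconds. The sketch assumes timezone-aware datetimes; a naive datetime would raise `TypeError` in the subtraction.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def back_dated_start(end_time: datetime, latency_ms: float) -> datetime:
    """Derive the start moment from the end moment and measured latency."""
    return end_time - timedelta(milliseconds=latency_ms)


def to_epoch_ns(moment: datetime) -> int:
    """Convert an aware datetime to integer nanoseconds since the epoch.

    Integer arithmetic keeps the result exact to the microsecond,
    which is the finest resolution a datetime carries.
    """
    delta = moment - EPOCH
    whole_seconds = delta.days * 86_400 + delta.seconds
    return whole_seconds * 1_000_000_000 + delta.microseconds * 1_000


end = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
start = back_dated_start(end, latency_ms=250.0)
duration_ns = to_epoch_ns(end) - to_epoch_ns(start)
```

With a 250 ms latency, `duration_ns` comes out as exactly 250,000,000, which is the span duration a trace UI would display for the back-dated observation.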
23 changes: 19 additions & 4 deletions src/openarmature/observability/langfuse/client.py
Expand Up @@ -30,6 +30,7 @@
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Literal, Protocol, runtime_checkable

ObservationType = Literal["span", "generation", "event"]
Expand Down Expand Up @@ -65,6 +66,13 @@ class LangfuseObservation:
level: ObservationLevel = "DEFAULT"
status_message: str | None = None
ended: bool = False
# Optional caller-supplied timestamps. Populated when a typed-event
# consumer (e.g., the LangfuseObserver on the typed LLM completion
# path) back-dates the observation using a wall-clock measurement
# rather than letting the SDK record open/close moments. ``None``
# means "use the SDK's default"; both fields are independent.
start_time: datetime | None = None
end_time: datetime | None = None

# Generation-specific (None / empty on Span and Event observations)
model: str | None = None
Expand Down Expand Up @@ -125,7 +133,7 @@ def id(self) -> str: ...

def update(self, **fields: Any) -> None: ...

def end(self, **fields: Any) -> None: ...
def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: ...


@runtime_checkable
Expand All @@ -138,7 +146,7 @@ def id(self) -> str: ...

def update(self, **fields: Any) -> None: ...

def end(self, **fields: Any) -> None: ...
def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: ...


@runtime_checkable
Expand Down Expand Up @@ -224,6 +232,7 @@ def generation(
output: Any = None,
usage: LangfuseUsage | None = None,
prompt: Any = None,
start_time: datetime | None = None,
) -> LangfuseGenerationHandle: ...

def force_flush(self, timeout_ms: int = 30_000) -> bool:
Expand Down Expand Up @@ -268,7 +277,9 @@ def id(self) -> str:
def update(self, **fields: Any) -> None:
_apply_fields(self.observation, fields)

def end(self, **fields: Any) -> None:
def end(self, *, end_time: datetime | None = None, **fields: Any) -> None:
if end_time is not None:
self.observation.end_time = end_time
_apply_fields(self.observation, fields)
self.observation.ended = True

Expand All @@ -286,7 +297,9 @@ def id(self) -> str:
def update(self, **fields: Any) -> None:
_apply_fields(self.observation, fields)

def end(self, **fields: Any) -> None:
def end(self, *, end_time: datetime | None = None, **fields: Any) -> None:
if end_time is not None:
self.observation.end_time = end_time
_apply_fields(self.observation, fields)
self.observation.ended = True

Expand Down Expand Up @@ -428,6 +441,7 @@ def generation(
output: Any = None,
usage: LangfuseUsage | None = None,
prompt: Any = None,
start_time: datetime | None = None,
) -> LangfuseGenerationHandle:
trace = self._get_trace(trace_id)
observation = LangfuseObservation(
Expand All @@ -444,6 +458,7 @@ def generation(
output=output,
usage=usage,
prompt_entity_link=prompt,
start_time=start_time,
)
trace.observations.append(observation)
return _InMemoryGenerationHandle(observation=observation)
Expand Down
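
How a test might assert on the stored timestamps can be sketched with stand-ins. `Observation` and `Handle` below are simplified, hypothetical versions of `LangfuseObservation` and the in-memory handle; they mirror only the behavior visible in this diff (`end_time` stored when supplied, remaining fields applied, `ended` set to `True`).

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


@dataclass
class Observation:
    """Simplified stand-in for LangfuseObservation."""
    name: str
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    ended: bool = False
    output: Any = None


class Handle:
    """Simplified stand-in for the in-memory generation handle."""

    def __init__(self, observation: Observation) -> None:
        self.observation = observation

    def end(self, *, end_time: Optional[datetime] = None, **fields: Any) -> None:
        # Store the caller-supplied end moment verbatim, if any.
        if end_time is not None:
            self.observation.end_time = end_time
        # Apply remaining field updates before marking the observation ended.
        for key, value in fields.items():
            setattr(self.observation, key, value)
        self.observation.ended = True


end = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
start = end - timedelta(milliseconds=180)

obs = Observation(name="llm.complete", start_time=start)
Handle(obs).end(end_time=end, output="hello")
```

A test can then compare `obs.end_time - obs.start_time` against the latency the observer was given, without involving the real SDK.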