Commit aaa3cc3

Migrate Langfuse to typed LLM event (#143)
* Migrate Langfuse to typed LLM event

Drive the openarmature.llm.complete Generation observation lifecycle from LlmCompletionEvent on the success path, mirroring PR 3b's OTel shape. Open and close the Generation in one shot at typed-event arrival with start_time back-dated by latency_ms so the Langfuse UI shows the adapter-boundary measurement rather than dispatcher queue delay. Sentinel pair stays for the failure path until the spec extends LlmCompletionEvent with error semantics (coord thread filed).

Widen the LangfuseClient Protocol with optional start_time on generation() and end_time on Span/Generation handle end(). The SDK adapter forwards both to the v4 SDK; the InMemory client stores them on LangfuseObservation for test assertions.

Drop the sentinel NodeEvent pair emission on the success path from OpenAIProvider.complete(). The bundled OTel and Langfuse observers consume the typed event directly; external custom observers consuming LLM events MUST migrate to isinstance discrimination. The sentinel completed event still fires on the failure path; sentinel started is no longer emitted.

* Route Langfuse timestamps through OTel tracer

The v4.7 Langfuse SDK exposes timestamp control only on its internal OTel tracer, not on the public start_observation() API that the adapter was calling. Two quirks the original Protocol widening got wrong, both surfaced by live-account verification:

- start_observation() rejects a start_time kwarg with TypeError. When start_time is supplied, the adapter now mirrors the SDK's own create_event precedent: open the OTel span directly via _otel_tracer.start_span(name=, start_time=int_ns) within a trace context and wrap the result in LangfuseGeneration. The existing no-start_time path still uses the public API.
- LangfuseSpan.end(end_time) is typed Optional[int] (nanoseconds), not datetime. The adapter now converts the Protocol's datetime surface to nanoseconds before forwarding. Without the conversion the OTel span_processor's formatter crashes with TypeError on end_time / 1e9 deep in the SDK.

Strengthen the unit tests: spy on both _otel_tracer.start_span and start_observation so the back-dated path asserts the OTel route is taken and the public-API path asserts the OTel tracer is NOT touched. The previous monkeypatch test accepted **kwargs and would have passed even with the broken implementation.

Widen the integration test's REST poll budget to 180s and use server-side name+type filters; add a diagnostic that lists observation names actually projected under the trace_id when the target Generation doesn't appear, so a future name-mismatch SDK change surfaces explicitly.

* Address PR 143 review

Tighten three stale comments that still referred to a sentinel "pair" on the failure path. The provider now emits only a single sentinel completed event on failure (no started counterpart); the comments in langfuse/observer.py (dispatch site + handler docstring) and openai.py (failure-emission site) didn't catch up with the v0.13.0 emission change in the same PR.

* Align CHANGELOG + integration docstring with SDK routing

Both descriptions were written before live-account verification revealed that v4.7 Langfuse SDK's start_observation rejects start_time with TypeError. The CHANGELOG entry claimed the adapter forwards via start_observation(start_time=...); the integration test docstring said unit tests validate that surface. Rewrote both to describe the actual routing: the back-dated path bypasses start_observation and goes through the private _otel_tracer.start_span, wrapping the OTel span in LangfuseGeneration directly. The guarded failure mode shifts accordingly: not "SDK silently drops start_time" but "future SDK renames _otel_tracer or moves LangfuseGeneration", breaking the private-API path silently.
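
The back-dating described in the commit message can be sketched as plain datetime arithmetic. This is a minimal illustration, not the observer's actual code: `back_dated_window` is a hypothetical helper name, and it assumes the event's arrival time is used as the end of the observation and `latency_ms` is subtracted to get the start.

```python
from datetime import datetime, timedelta, timezone


def back_dated_window(end_time: datetime, latency_ms: float) -> tuple[datetime, datetime]:
    """Return (start_time, end_time), with the start back-dated by latency_ms.

    Hypothetical helper: the real observer may derive its timestamps differently.
    """
    if latency_ms < 0:
        raise ValueError("latency_ms must be non-negative")
    return end_time - timedelta(milliseconds=latency_ms), end_time


# Treat the moment the typed event arrives as the end of the LLM call.
arrival = datetime(2024, 1, 1, 0, 0, 1, tzinfo=timezone.utc)
start, end = back_dated_window(arrival, latency_ms=250.0)
print(start.isoformat(), end.isoformat())
```

The resulting duration equals the adapter-boundary latency regardless of how long the event waited in the dispatcher queue, which is the property the commit message is after.
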
1 parent 371545a commit aaa3cc3

10 files changed

Lines changed: 926 additions & 228 deletions

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
@@ -8,7 +8,9 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The
 
 ### Changed
 
-- **OTel observer drives the `openarmature.llm.complete` span lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the span in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so the span duration reflects the actual adapter-boundary measurement rather than dispatcher queue delay. Failure-path spans continue to fire from the sentinel `NodeEvent` pair (the typed event is success-only per the proposal). The §5.5 attribute set is unchanged. Dual-emit window: the provider still emits both the sentinel pair AND the typed event during v0.13.0; the sentinel pair drops in v0.15.0.
+- **OTel and Langfuse observers drive the `openarmature.llm.complete` span / Generation observation lifecycle from the typed `LlmCompletionEvent`** (proposal 0049 + 0057, observability §5.5.7). Successful LLM-provider calls now open + close the OTel span and the Langfuse Generation in one shot at typed-event arrival, with `start_time` back-dated by `LlmCompletionEvent.latency_ms` so duration reflects the adapter-boundary measurement rather than dispatcher queue delay. Failure paths continue to fire from the sentinel `NodeEvent` (the typed event is success-only per the proposal). The §5.5 attribute set and §8.4 Generation metadata are unchanged.
+- **`OpenAIProvider.complete()` no longer emits the sentinel `NodeEvent` pair on the success path** (v0.13.0 cleanup). The bundled OTel and Langfuse observers now consume the typed `LlmCompletionEvent` directly; the sentinel pair was kept on the success path through earlier releases for compatibility with pre-typed-event observers. External custom observers that filtered LLM calls by `event.namespace == LLM_NAMESPACE` MUST migrate to `isinstance(event, LlmCompletionEvent)` to continue seeing successful LLM calls. The sentinel `completed` event still fires on the failure path until the spec extends `LlmCompletionEvent` with error semantics; the sentinel `started` event is no longer emitted on either path.
+- **`LangfuseClient` Protocol gains optional `start_time` / `end_time` timestamps** on `generation(...)` and the Generation/Span handles' `end(...)`. The Langfuse observer passes back-dated timestamps on the typed-event success path so the Langfuse UI shows the actual adapter-boundary duration. The SDK adapter handles v4 Langfuse SDK quirks transparently: `Langfuse.start_observation()` does NOT accept `start_time`, so back-dated generations are routed through the private `_otel_tracer.start_span(name=..., start_time=int_ns)` API (mirroring the SDK's own `create_event` precedent) and the resulting OTel span is wrapped in `LangfuseGeneration` directly; the non-back-dated path still uses `start_observation`. `LangfuseSpan.end()` is typed `Optional[int]` (nanoseconds), so the adapter converts the Protocol's `datetime` surface to int nanoseconds before forwarding. The `InMemoryLangfuseClient` stores both fields verbatim on `LangfuseObservation` for test assertions.
 - **`OpenAIProvider(populate_caller_metadata=...)` default flipped from `False` to `True`.** The python implementation now populates `LlmCompletionEvent.caller_invocation_metadata` by default so the bundled OTel and Langfuse observers can emit the §5.6 `openarmature.user.<key>` span-attribute family without a separate opt-in. Pass `populate_caller_metadata=False` to suppress the snapshot when no downstream consumer needs it. The spec-defined opt-in mechanism is unchanged; only the python default flips.
 
 ### Added
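
The migration the CHANGELOG asks of external observers might look like the sketch below. The two event classes and the `LLM_NAMESPACE` value are simplified stand-ins defined here for illustration only (the real types live in the library and carry more fields), and `LlmCallObserver` is a hypothetical name.

```python
from dataclasses import dataclass, field
from typing import Optional

# Stand-in for the library constant; the real value is not shown in this commit.
LLM_NAMESPACE = "llm"


@dataclass
class NodeEvent:
    """Simplified stand-in for the sentinel event (assumed field names)."""
    namespace: str
    phase: str
    error_type: Optional[str] = None


@dataclass
class LlmCompletionEvent:
    """Simplified stand-in for the typed, success-only event."""
    model: str
    latency_ms: float


@dataclass
class LlmCallObserver:
    successes: list = field(default_factory=list)
    failures: list = field(default_factory=list)

    def __call__(self, event: object) -> None:
        # Success path: discriminate on type, not on the sentinel namespace.
        if isinstance(event, LlmCompletionEvent):
            self.successes.append(event)
        # Failure path: only a sentinel "completed" event still arrives.
        elif (
            isinstance(event, NodeEvent)
            and event.namespace == LLM_NAMESPACE
            and event.phase == "completed"
        ):
            self.failures.append(event)


observer = LlmCallObserver()
observer(LlmCompletionEvent(model="example-model", latency_ms=12.5))
observer(NodeEvent(namespace=LLM_NAMESPACE, phase="completed", error_type="TimeoutError"))
observer(NodeEvent(namespace="graph", phase="completed"))
print(len(observer.successes), len(observer.failures))
```

An observer that keeps only the old namespace filter would still see failures but would silently stop seeing successful calls, which is why the entry uses MUST.
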

src/openarmature/llm/providers/openai.py

Lines changed: 17 additions & 49 deletions
@@ -441,21 +441,6 @@ async def complete(
         serialized_messages = _serialize_messages_for_payload(messages)
         request_params = _request_params_from_config(config)
         request_extras = _request_extras_from_config(config)
-        if dispatch is not None:
-            dispatch(
-                _make_llm_event(
-                    "started",
-                    call_id=call_id,
-                    model=self.model,
-                    genai_system=self._genai_system,
-                    input_messages=serialized_messages,
-                    request_params=request_params,
-                    request_extras=request_extras,
-                    active_prompt=active_prompt,
-                    active_prompt_group=active_prompt_group,
-                )
-            )
-
         # Wall-clock latency measured at the adapter boundary per
         # proposal 0049's LlmCompletionEvent.latency_ms contract. The
         # boundary spans from "just before _do_complete is called" to
@@ -473,13 +458,15 @@ async def complete(
             response = await self._do_complete(body, schema_dict, schema_class)
         except Exception as exc:
             if dispatch is not None:
-                # Failure path: only the sentinel NodeEvent pair fires.
-                # Per proposal 0049 §3 (alternative 3): LlmCompletionEvent
-                # is completion-only; failures flow through the
-                # llm-provider §7 exception path. The error continues
-                # to surface through the existing observer chain via
-                # the sentinel NodeEvent's error_type / error_category
-                # fields on LlmEventPayload.
+                # Failure path: the sentinel NodeEvent carries the
+                # error fields per llm-provider §7. LlmCompletionEvent
+                # is success-only per proposal 0049 §3 alternative 3,
+                # so failures continue to surface through a sentinel
+                # ``completed`` event until the spec extends the typed
+                # event with error semantics. Only ``completed`` fires
+                # — no started counterpart, since both bundled
+                # observers' handlers ignore sentinel-started after
+                # the v0.13.0 migration.
                 dispatch(
                     _make_llm_event(
                         "completed",
@@ -498,33 +485,14 @@ async def complete(
         latency_ms = (time.perf_counter() - adapter_start) * 1000.0

         if dispatch is not None:
-            # Sentinel NodeEvent pair stays during the dual-emit window
-            # per proposal 0049 §5.5.7 SHOULD-emit-both transition. The
-            # window stays open through v0.13.0 with the sentinel
-            # emission removed in v0.15.0 (CHANGELOG callout pinned to
-            # the v0.13.0 release notes).
-            dispatch(
-                _make_llm_event(
-                    "completed",
-                    call_id=call_id,
-                    model=self.model,
-                    genai_system=self._genai_system,
-                    finish_reason=response.finish_reason,
-                    usage=response.usage,
-                    input_messages=serialized_messages,
-                    output_content=response.message.content or None,
-                    request_params=request_params,
-                    request_extras=request_extras,
-                    response_id=response.response_id,
-                    response_model=response.response_model,
-                    active_prompt=active_prompt,
-                    active_prompt_group=active_prompt_group,
-                )
-            )
-            # The new typed LlmCompletionEvent — observers filtering via
-            # isinstance(event, LlmCompletionEvent) receive this; legacy
-            # observers filtering on the sentinel namespace see the
-            # NodeEvent pair above. Failure path doesn't reach here.
+            # Success path: emit only the typed LlmCompletionEvent.
+            # The sentinel NodeEvent pair previously emitted on success
+            # for compatibility with pre-typed-event observers was
+            # dropped in v0.13.0; bundled observers (OTel + Langfuse)
+            # consume the typed event directly, and external custom
+            # observers should migrate to type discrimination via
+            # ``isinstance(event, LlmCompletionEvent)`` if they need
+            # LLM call notifications.
             dispatch(
                 self._build_llm_completion_event(
                     response,
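
The emission shape that `complete()` ends up with after this change can be summarised in a small synchronous sketch. It is a simplification under stated assumptions: events are plain dicts rather than the library's types, `complete_with_events` is a hypothetical name, and re-raising after the failure dispatch is assumed because the hunk does not show what follows it.

```python
import time
from typing import Any, Callable, Optional


def complete_with_events(
    do_complete: Callable[[], Any],
    dispatch: Optional[Callable[[dict], None]] = None,
) -> Any:
    # Latency is measured at the adapter boundary, around the call itself.
    adapter_start = time.perf_counter()
    try:
        response = do_complete()
    except Exception as exc:
        if dispatch is not None:
            # Failure: a single sentinel "completed" event, no "started".
            dispatch({"kind": "sentinel", "phase": "completed",
                      "error_type": type(exc).__name__})
        raise  # assumed: the error still propagates to the caller
    latency_ms = (time.perf_counter() - adapter_start) * 1000.0
    if dispatch is not None:
        # Success: only the typed event, carrying the measured latency.
        dispatch({"kind": "typed", "latency_ms": latency_ms, "response": response})
    return response


events: list = []
result = complete_with_events(lambda: "ok", events.append)
print(result, [e["kind"] for e in events])
```

Each call therefore produces exactly one event on either path, where the pre-change success path produced three (sentinel started, sentinel completed, typed).
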

src/openarmature/observability/langfuse/adapter.py

Lines changed: 101 additions & 12 deletions
@@ -35,6 +35,7 @@

 import json
 from contextlib import ExitStack
+from datetime import datetime
 from typing import TYPE_CHECKING, Any, cast

 from .client import LangfuseGenerationHandle, LangfuseSpanHandle, LangfuseUsage, ObservationLevel
@@ -129,13 +130,20 @@ def update(self, **fields: Any) -> None:
             kwargs[key] = value
         self._obs.update(**kwargs)

-    def end(self, **fields: Any) -> None:
+    def end(self, *, end_time: datetime | None = None, **fields: Any) -> None:
         # Apply any field updates first (so they're set BEFORE the
         # observation closes), then call end(). v4's end() takes only
         # an optional ``end_time``; field mutation happens via update().
+        # The SDK's end_time is typed Optional[int] nanoseconds —
+        # convert from the Protocol's datetime surface before passing
+        # through. Without the conversion the OTel span_processor's
+        # formatter raises TypeError when it tries ``end_time / 1e9``.
         if fields:
             self.update(**fields)
-        self._obs.end()
+        if end_time is not None:
+            self._obs.end(end_time=int(end_time.timestamp() * 1_000_000_000))
+        else:
+            self._obs.end()


 class LangfuseSDKAdapter:
@@ -337,6 +345,7 @@ def generation(
         output: Any = None,
         usage: LangfuseUsage | None = None,
         prompt: Any = None,
+        start_time: datetime | None = None,
     ) -> LangfuseGenerationHandle:
         extra_kwargs: dict[str, Any] = {
             "model": model,
@@ -356,18 +365,98 @@ def generation(
             if usage.total is not None:
                 usage_details["total"] = usage.total
             extra_kwargs["usage_details"] = usage_details
-        obs = self._start_observation(
-            as_type="generation",
-            trace_id=trace_id,
-            name=name,
-            metadata=metadata,
-            parent_observation_id=parent_observation_id,
-            level=level,
-            status_message=status_message,
-            **{k: v for k, v in extra_kwargs.items() if v is not None},
-        )
+        if start_time is not None:
+            # v4's public ``start_observation`` does NOT accept a
+            # ``start_time`` kwarg — only the internal OTel tracer
+            # does. Mirror the SDK's own ``create_event`` precedent
+            # (langfuse/_client/client.py:1518-1551): open the OTel
+            # span directly via the private ``_otel_tracer`` with the
+            # back-dated timestamp, then wrap it in LangfuseGeneration.
+            # This is the only path to a back-dated Generation in
+            # v4.7; the live-account integration test catches a future
+            # SDK break.
+            obs = self._start_back_dated_generation(
+                trace_id=trace_id,
+                name=name,
+                metadata=metadata,
+                parent_observation_id=parent_observation_id,
+                level=level,
+                status_message=status_message,
+                start_time=start_time,
+                **{k: v for k, v in extra_kwargs.items() if v is not None},
+            )
+        else:
+            obs = self._start_observation(
+                as_type="generation",
+                trace_id=trace_id,
+                name=name,
+                metadata=metadata,
+                parent_observation_id=parent_observation_id,
+                level=level,
+                status_message=status_message,
+                **{k: v for k, v in extra_kwargs.items() if v is not None},
+            )
         return _SpanHandle(obs)

+    def _start_back_dated_generation(
+        self,
+        *,
+        trace_id: str,
+        name: str | None,
+        metadata: dict[str, Any] | None,
+        parent_observation_id: str | None,
+        level: ObservationLevel,
+        status_message: str | None,
+        start_time: datetime,
+        **extra: Any,
+    ) -> Any:
+        """Open a LangfuseGeneration at a back-dated timestamp by going
+        through the private OTel tracer rather than the public
+        ``start_observation`` API (which doesn't accept ``start_time``
+        in v4.7). Mirrors the SDK's ``create_event`` precedent."""
+        from langfuse._client.span import LangfuseGeneration
+        from opentelemetry import trace as otel_trace_api
+
+        trace_entry = self._trace_info.get(trace_id)
+        trace_context: TraceContext = {"trace_id": _to_otel_trace_id(trace_id)}
+        if parent_observation_id is not None:
+            trace_context["parent_span_id"] = parent_observation_id
+
+        # OTel's ``start_span(start_time=...)`` takes int nanoseconds
+        # since epoch. The SDK uses ``time_ns()`` for its instant
+        # events; for back-dating, convert from the supplied datetime.
+        start_time_ns = int(start_time.timestamp() * 1_000_000_000)
+
+        remote_parent_span = self._client._create_remote_parent_span(  # pyright: ignore[reportPrivateUsage]  # noqa: SLF001
+            trace_id=trace_context["trace_id"],
+            parent_span_id=trace_context.get("parent_span_id"),
+        )
+
+        with ExitStack() as stack:
+            if trace_entry is not None:
+                stack.enter_context(
+                    propagate_attributes(
+                        trace_name=trace_entry["name"],
+                        metadata=_stringify_metadata(trace_entry["metadata"]),
+                    )
+                )
+            stack.enter_context(otel_trace_api.use_span(remote_parent_span))
+            otel_span = self._client._otel_tracer.start_span(  # pyright: ignore[reportPrivateUsage]  # noqa: SLF001
+                name=name or "observation",
+                start_time=start_time_ns,
+            )
+            generation_kwargs: dict[str, Any] = {
+                "otel_span": otel_span,
+                "langfuse_client": self._client,
+                "metadata": metadata,
+            }
+            if level != "DEFAULT":
+                generation_kwargs["level"] = level
+            if status_message is not None:
+                generation_kwargs["status_message"] = status_message
+            generation_kwargs.update(extra)
+            return LangfuseGeneration(**generation_kwargs)
+
     def force_flush(self, timeout_ms: int = 30_000) -> bool:
         """Best-effort flush of the underlying Langfuse client.
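
The datetime-to-nanoseconds conversion that appears twice in this file can be isolated as below. `to_epoch_ns` is a hypothetical helper name, and the timezone-awareness check is an addition of this sketch, not something the adapter does; the conversion expression itself is the one used in the diff.

```python
from datetime import datetime, timezone


def to_epoch_ns(moment: datetime) -> int:
    """Convert a datetime to integer nanoseconds since the Unix epoch."""
    # Added guard (not in the adapter): naive datetimes are interpreted in
    # local time by timestamp(), which is easy to get wrong silently.
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Same expression as the adapter: float seconds scaled to int nanoseconds.
    return int(moment.timestamp() * 1_000_000_000)


print(to_epoch_ns(datetime(2024, 1, 1, tzinfo=timezone.utc)))
```

Because `timestamp()` returns a float, the result carries sub-microsecond rounding rather than exact nanosecond precision. That is finer than the microsecond resolution of `datetime` itself, so it should not matter for span durations measured in milliseconds.
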

src/openarmature/observability/langfuse/client.py

Lines changed: 19 additions & 4 deletions
@@ -30,6 +30,7 @@
 from __future__ import annotations

 from dataclasses import dataclass, field
+from datetime import datetime
 from typing import Any, Literal, Protocol, runtime_checkable

 ObservationType = Literal["span", "generation", "event"]
@@ -65,6 +66,13 @@ class LangfuseObservation:
     level: ObservationLevel = "DEFAULT"
     status_message: str | None = None
     ended: bool = False
+    # Optional caller-supplied timestamps. Populated when a typed-event
+    # consumer (e.g., the LangfuseObserver on the typed LLM completion
+    # path) back-dates the observation using a wall-clock measurement
+    # rather than letting the SDK record open/close moments. ``None``
+    # means "use the SDK's default"; both fields are independent.
+    start_time: datetime | None = None
+    end_time: datetime | None = None

     # Generation-specific (None / empty on Span and Event observations)
     model: str | None = None
@@ -125,7 +133,7 @@ def id(self) -> str: ...

     def update(self, **fields: Any) -> None: ...

-    def end(self, **fields: Any) -> None: ...
+    def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: ...


 @runtime_checkable
@@ -138,7 +146,7 @@ def id(self) -> str: ...

     def update(self, **fields: Any) -> None: ...

-    def end(self, **fields: Any) -> None: ...
+    def end(self, *, end_time: datetime | None = None, **fields: Any) -> None: ...


 @runtime_checkable
@@ -224,6 +232,7 @@ def generation(
         output: Any = None,
         usage: LangfuseUsage | None = None,
         prompt: Any = None,
+        start_time: datetime | None = None,
     ) -> LangfuseGenerationHandle: ...

     def force_flush(self, timeout_ms: int = 30_000) -> bool:
@@ -268,7 +277,9 @@ def id(self) -> str:
     def update(self, **fields: Any) -> None:
         _apply_fields(self.observation, fields)

-    def end(self, **fields: Any) -> None:
+    def end(self, *, end_time: datetime | None = None, **fields: Any) -> None:
+        if end_time is not None:
+            self.observation.end_time = end_time
         _apply_fields(self.observation, fields)
         self.observation.ended = True

@@ -286,7 +297,9 @@ def id(self) -> str:
     def update(self, **fields: Any) -> None:
         _apply_fields(self.observation, fields)

-    def end(self, **fields: Any) -> None:
+    def end(self, *, end_time: datetime | None = None, **fields: Any) -> None:
+        if end_time is not None:
+            self.observation.end_time = end_time
         _apply_fields(self.observation, fields)
         self.observation.ended = True

@@ -428,6 +441,7 @@ def generation(
         output: Any = None,
         usage: LangfuseUsage | None = None,
         prompt: Any = None,
+        start_time: datetime | None = None,
     ) -> LangfuseGenerationHandle:
         trace = self._get_trace(trace_id)
         observation = LangfuseObservation(
@@ -444,6 +458,7 @@ def generation(
             output=output,
             usage=usage,
             prompt_entity_link=prompt,
+            start_time=start_time,
         )
         trace.observations.append(observation)
         return _InMemoryGenerationHandle(observation=observation)
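
How a test might lean on the in-memory client storing timestamps verbatim is sketched below. `Observation` and `InMemoryHandle` are cut-down stand-ins written for this example, not the classes from `client.py`; they keep only the fields needed to show the assertion pattern.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class Observation:
    """Cut-down stand-in for LangfuseObservation (timestamp fields only)."""
    name: str
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    ended: bool = False


class InMemoryHandle:
    """Cut-down stand-in for the in-memory Generation handle."""

    def __init__(self, observation: Observation) -> None:
        self.observation = observation

    def end(self, *, end_time: Optional[datetime] = None) -> None:
        # Store the caller's timestamp as-is; None means "not supplied".
        if end_time is not None:
            self.observation.end_time = end_time
        self.observation.ended = True


closed_at = datetime(2024, 1, 1, 0, 0, 1, tzinfo=timezone.utc)
opened_at = closed_at - timedelta(milliseconds=250)

handle = InMemoryHandle(Observation(name="openarmature.llm.complete", start_time=opened_at))
handle.end(end_time=closed_at)

recorded = handle.observation
print(recorded.end_time - recorded.start_time)
```

Keeping `datetime` on the Protocol surface and converting to nanoseconds only inside the SDK adapter means tests like this can compare timestamps directly, without knowing the SDK's integer representation.
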
