-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathobserver.py
More file actions
2094 lines (1962 loc) · 103 KB
/
Copy pathobserver.py
File metadata and controls
2094 lines (1962 loc) · 103 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Spec mapping (observability §8):
# - Consumes the §6 observer event stream as a sibling to the OTel
# observer (§8.9 composition).
# - Maps invocation → Trace, node/subgraph/fan-out → Span observation,
# LLM provider → Generation observation (§8.3 table).
# - Sets the Trace `id` equal to the OA `invocation_id` so cross-system
# lookup by invocation_id finds the Langfuse Trace verbatim (§8.4.1).
# - Routes correlation_id to both `trace.metadata.correlation_id` and
# every `observation.metadata.correlation_id` per §8.5.
# - Sources Trace name from the entry-node name (§8.6 fallback). The
# caller-supplied invocation-label path lands in proposal 0034 (PR 4
# of the v0.10.0 batch).
# - Generation rendering follows §8.7: input/output/request_extras
# appear only when `disable_provider_payload=False`; the truncation
# marker is preserved verbatim as a raw string when the §5.5.5
# truncation makes the JSON unparseable.
# - Prompt linkage follows §8.4.4: reads
# `Prompt.observability_entities["langfuse_prompt"]` to establish a
# native Prompt-entity link when present; metadata-only otherwise.
"""LangfuseObserver: maps OA events to Langfuse Traces + Observations."""
from __future__ import annotations
import json
import uuid
from collections.abc import Callable, Mapping
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from typing import Any, cast
from openarmature.graph.events import (
EmbeddingEvent,
EmbeddingFailedEvent,
FailureIsolatedEvent,
InvocationCompletedEvent,
InvocationStartedEvent,
LlmCompletionEvent,
LlmFailedEvent,
LlmRetryAttemptEvent,
MetadataAugmentationEvent,
NodeEvent,
ToolCallEvent,
ToolCallFailedEvent,
)
from openarmature.graph.observer import ObserverEvent
from openarmature.observability.lineage import is_strict_prefix
from .client import (
LangfuseClient,
LangfuseGenerationHandle,
LangfuseSpanHandle,
LangfuseUsage,
ObservationLevel,
)
# §5.5.5 / §8.7 truncation: when the serialized payload exceeds the
# configured cap, the marker below is appended and the unparseable
# JSON serves as the "this was truncated" signal in Langfuse's input
# / output / metadata.request_extras fields.
# ``{m}`` is the template's only format placeholder; per the marker text
# it carries the payload's total byte count.
_TRUNCATION_MARKER_TEMPLATE = "…[truncated, {m} bytes total]"
# §5.5.5 minimum-cap rule mirrors the OTel observer's bound. 256 bytes
# is the smallest value that fits the worst-case marker (~36 bytes)
# plus a diagnostically useful preview.
# ``LangfuseObserver.__post_init__`` rejects any ``payload_byte_cap``
# below this value with a ``ValueError``.
_PAYLOAD_MIN_BYTES = 256
def _read_spec_version() -> str:
    """Return the ``__spec_version__`` string exported by ``openarmature``.

    The package import is deferred until the function is called. It
    mirrors the OTel observer's lookup so the spec_version metadata on
    the Langfuse side stays in lockstep with it.
    """
    from openarmature import __spec_version__ as spec_version

    return spec_version
# Proposal 0052: implementation attribution attributes. Sourced from
# the package identity constants via the same lazy-import discipline
# as ``_read_spec_version``.
def _read_implementation_name() -> str:
    """Return the ``__implementation_name__`` string exported by ``openarmature``.

    The package import is deferred until the function is called, the
    same way ``_read_spec_version`` reads the spec version.
    """
    from openarmature import __implementation_name__ as implementation_name

    return implementation_name
def _read_implementation_version() -> str:
    """Return the ``__version__`` string exported by ``openarmature``.

    The package import is deferred until the function is called, the
    same way ``_read_spec_version`` reads the spec version.
    """
    from openarmature import __version__ as implementation_version

    return implementation_version
# Key type for in-flight Span observation handles: the standard
# span-stack key (namespace, attempt_index, fan_out_index, branch_name).
# ``branch_name`` discriminates concurrent same-named inner nodes that
# run in sibling branches of a parallel-branches node
# (pipeline-utilities §11); without it the two inner ``ask`` nodes of
# two branches with the same namespace + fan_out_index would collide on
# the same key.
# Mirrors the OTel observer's ``_StackKey`` shape; the values stored
# under these keys hold a Langfuse handle instead of an OTel Span.
_StackKey = tuple[tuple[str, ...], int, int | None, str | None]
# Lineage-aware dispatch keys (proposal 0045): the fan-out / pb NODE namespace
# prefix plus the fan-out instance index / branch name chain slices along the
# path to it. Built by ``_dispatch_key``.
_DispatchKey = tuple[tuple[str, ...], tuple[int | None, ...], tuple[str | None, ...]]
# _BranchDispatchKey carries the explicit branch name as a fourth element (a
# callable branch sets it without extending the chain) instead of a trailing
# chain entry. Built by ``_branch_dispatch_key``.
_BranchDispatchKey = tuple[tuple[str, ...], tuple[int | None, ...], tuple[str | None, ...], str]
@dataclass
class _OpenObservation:
"""An in-flight Langfuse observation pinned in the observer's state.
Carries the observation's own ``fan_out_index_chain`` and
``branch_name_chain`` so the augmentation walk can apply the
lineage-aware boundary rule (mirror of the OTel observer's
``_OpenSpan``)."""
handle: LangfuseSpanHandle | LangfuseGenerationHandle
fan_out_index_chain: tuple[int | None, ...] = ()
branch_name_chain: tuple[str | None, ...] = ()
def _observation_chain_on_path(
open_obs: _OpenObservation,
aug_fi_chain: tuple[int | None, ...],
aug_bn_chain: tuple[str | None, ...],
) -> bool:
"""Mirror of the OTel observer's ``_span_chain_on_path`` for
Langfuse observations. Returns True iff the observation's chain
is a prefix-match of the augmenter's chain."""
obs_fi = open_obs.fan_out_index_chain
obs_bn = open_obs.branch_name_chain
if len(obs_fi) > len(aug_fi_chain):
return False
if len(obs_bn) > len(aug_bn_chain):
return False
for i in range(len(obs_fi)):
if obs_fi[i] != aug_fi_chain[i]:
return False
for i in range(len(obs_bn)):
if obs_bn[i] != aug_bn_chain[i]:
return False
return True
def _dispatch_key(
prefix: tuple[str, ...],
fan_out_index_chain: tuple[int | None, ...],
branch_name_chain: tuple[str | None, ...],
) -> _DispatchKey:
"""Lineage-aware identity key for a fan-out instance / per-branch dispatch at
namespace ``prefix``. Encodes the fan-out/pb NODE namespace plus the full
chain of fan-out instance indices / branch names along the path to it (sliced
to ``len(prefix)``). Two dispatches at the same namespace but in different
enclosing fan-out instances / branches therefore get distinct keys -- the
enclosing chain entries differ -- which is what lets a fan-out / pb nested
inside an outer fan-out instance avoid colliding across outer instances. For
a top-level or serial-nested dispatch (no enclosing fan-out/branch) the
enclosing chain entries are all None, so the key is a stable function of the
namespace plus the dispatch's own axis."""
n = len(prefix)
return (prefix, tuple(fan_out_index_chain[:n]), tuple(branch_name_chain[:n]))
def _branch_dispatch_key(
prefix: tuple[str, ...],
fan_out_index_chain: tuple[int | None, ...],
branch_name_chain: tuple[str | None, ...],
branch_name: str,
) -> _BranchDispatchKey:
"""Lineage-aware identity key for a per-branch dispatch at namespace
``prefix``. The branch IDENTITY comes from ``branch_name`` explicitly (not
``branch_name_chain[-1]``): a callable branch carries its name on the event
but never extends ``branch_name_chain`` (no subgraph descent). The key still
carries the ENCLOSING fan-out instance / branch chain (positions above this
pb node) so a pb nested inside an outer fan-out instance doesn't collide
across outer instances."""
n = len(prefix)
return (prefix, tuple(fan_out_index_chain[:n]), tuple(branch_name_chain[: n - 1]), branch_name)
def _empty_str_frozenset() -> frozenset[str]:
"""Typed empty frozenset factory for ``detached_subgraphs`` /
``detached_fan_outs`` defaults."""
return frozenset()
def _apply_caller_metadata(metadata: dict[str, Any], caller_metadata: Mapping[str, Any]) -> None:
"""Merge caller-supplied invocation metadata into a Trace's or
Observation's metadata bag at top level.
Top-level placement lets the Langfuse UI filter on
``metadata.<key>`` directly, so caller-supplied entries become
siblings to ``correlation_id`` / ``entry_node`` rather than
nested under a ``user`` sub-object.
Reserved-key collision with the OA-emitted keys
(``correlation_id``, ``entry_node``, ``spec_version``,
``namespace``, etc.) is not currently checked here: the rejection
may happen at either boundary, and the ``invoke()`` API-boundary
validation already rejects ``openarmature.*`` / ``gen_ai.*``
prefixed keys. Per-Langfuse-backend collision rejection is queued
as a follow-up.
"""
# Spec observability §8.4.1 / §8.4.2 (proposal 0034): top-level
# placement of caller-supplied metadata on the Trace / Observation.
for key, value in caller_metadata.items():
metadata[key] = value
def _promoted_user_id(metadata: Mapping[str, Any]) -> str | None:
# Proposal 0064 §8.4.1: a recognized ``userId`` caller-metadata key
# promotes to Langfuse's first-class trace.userId (recognized, not
# reserved; automatic, not opt-in). Read from the already-merged trace
# metadata, so the promotion is additive -- the key also remains at
# trace.metadata.userId. Absent key -> None (trace.userId unset).
value = metadata.get("userId")
return str(value) if value is not None else None
def _subgraph_identity_at(event: NodeEvent, depth: int) -> str:
"""Return the compiled-subgraph identity for the wrapper at the
given 1-based namespace depth, or the empty string when no
identity is tracked at that depth.
The empty-string fallback is the "no identity tracked" case, for
implementations / direct ``SubgraphNode(...)`` callers that don't
wire an identity through.
Conformance fixtures 031/032/033 lock identity as the required
value; the empty-string path keeps direct callers conformant but
failing those fixtures.
"""
# Spec observability §5.3 (coord thread
# clarify-subgraph-name-semantics): empty-string fallback is
# conformant for callers that don't track a subgraph identity.
idx = depth - 1
if 0 <= idx < len(event.subgraph_identities):
identity = event.subgraph_identities[idx]
if identity is not None:
return identity
return ""
@dataclass
class _InvState:
    """Per-invocation state, isolated by invocation_id.

    A single LangfuseObserver is safe to share across concurrent
    invocations; each invocation's in-flight observations live under
    its own _InvState so they never collide.
    """

    # Trace id of this invocation's Langfuse Trace.
    trace_id: str
    # In-flight node observations keyed by the span-stack key
    # (``_StackKey``). ``_open_started_observation`` inserts entries and
    # ``_handle_completed`` pops them.
    open_observations: dict[_StackKey, _OpenObservation] = field(
        default_factory=dict[_StackKey, _OpenObservation]
    )
    # Synthetic subgraph dispatch Span observations, keyed by namespace
    # prefix. Per spec §8.3 each subgraph wrapper produces a Span
    # observation in its parent's Trace; descendant node observations
    # parent under it. For a detached subgraph, this dictionary holds
    # the dispatch Span observation that lives in the DETACHED Trace
    # (so descendants in that subtree parent under it via the detached
    # Trace's observation tree); the main Trace carries a separate
    # link observation surfacing metadata.detached_child_trace_ids
    # that's opened and closed in one shot, not tracked here.
    subgraph_observations: dict[tuple[str, ...], _OpenObservation] = field(
        default_factory=dict[tuple[str, ...], _OpenObservation]
    )
    # Per-instance fan-out dispatch Span observations, keyed by the
    # lineage-aware ``_DispatchKey`` (namespace prefix, fan-out index
    # chain slice, branch-name chain slice). Each parents under the
    # fan-out node's own Span observation; inner-node observations
    # parent under this dispatch instead of the shared fan-out node
    # span. Closed when the fan-out node's completed event fires.
    # ``_handle_completed`` notes that detached per-instance dispatches
    # live in this same map.
    fan_out_instance_observations: dict[_DispatchKey, _OpenObservation] = field(
        default_factory=dict[_DispatchKey, _OpenObservation]
    )
    # Maps a namespace prefix to the detached Langfuse trace_id when
    # that subtree is configured detached (per the observer's
    # ``detached_subgraphs`` / ``detached_fan_outs`` knobs). The
    # presence of a prefix here switches descendant observations onto
    # the detached Trace. ``_handle_completed`` pops entries both by the
    # completing node's namespace and by
    # ``namespace + (str(fan_out_index),)`` for fan-out instances.
    detached_traces: dict[tuple[str, ...], str] = field(default_factory=dict[tuple[str, ...], str])
    # Lineage-aware ``_DispatchKey`` of every detached fan-out instance.
    # Kept apart from detached subgraph prefixes because these are
    # closed when the fan-out node's completed event fires, not when
    # the namespace cursor leaves the subtree.
    fan_out_instance_root_prefixes: set[_DispatchKey] = field(default_factory=set[_DispatchKey])
    # ``parent_node_name`` cache for per-instance attribution
    # (spec proposal 0013 v0.10.0 — inner events from inside a
    # non-detached fan-out instance don't carry fan_out_config
    # themselves; the cache bridges the lookup so the synthetic
    # per-instance dispatch observation can attach
    # metadata.fan_out_parent_node_name). Keyed by the fan-out node's
    # namespace.
    fan_out_parent_node_name: dict[tuple[str, ...], str] = field(default_factory=dict[tuple[str, ...], str])
    # Per proposal 0045: structural identification of parallel-
    # branches NODE namespaces. Populated on a pb NODE's started
    # event (whichever events carry ``parallel_branches_config``);
    # consulted by the augmentation walk to skip the pb NODE itself
    # as a shared parent (§3.4's structural classification).
    parallel_branches_parent_node_name: dict[tuple[str, ...], str] = field(
        default_factory=dict[tuple[str, ...], str]
    )
    # Per proposal 0044: per-branch dispatch-span observations synthesized from
    # the first inner event of each branch, keyed by the lineage-aware
    # ``_BranchDispatchKey`` (pb NODE namespace prefix, enclosing chain slices,
    # explicit branch name). Inner branch-node observations parent under this
    # dispatch instead of the shared pb NODE span; closed when the pb NODE's
    # completed event fires.
    parallel_branches_branch_spans: dict[_BranchDispatchKey, _OpenObservation] = field(
        default_factory=dict[_BranchDispatchKey, _OpenObservation]
    )
    # Declared branch-name set per pb NODE namespace (from the NODE's
    # parallel_branches_config), so an inner branch event matches only the node
    # that actually declares its branch.
    parallel_branches_branch_names: dict[tuple[str, ...], frozenset[str]] = field(
        default_factory=dict[tuple[str, ...], frozenset[str]]
    )
    # Side-cache: accumulator for `metadata.detached_child_trace_ids`
    # on dispatch observations that spawn detached children. Keyed by
    # the dispatch observation's prefix (the fan-out node's namespace,
    # or the detached-subgraph parent's prefix). Each new detached
    # child's trace id is appended and the list is then snapshotted,
    # which preserves §8.5's string-array shape across multiple
    # instances without re-reading metadata from the client (the
    # Protocol doesn't expose a read accessor).
    detached_child_trace_ids: dict[tuple[str, ...], list[str]] = field(
        default_factory=dict[tuple[str, ...], list[str]]
    )
@dataclass
class LangfuseObserver:
"""Observer-driven Langfuse mapping.
Construct with a :class:`LangfuseClient` — the bundled
:class:`InMemoryLangfuseClient` for tests, or a real
``langfuse.Langfuse()`` instance for production. The observer
handles the event stream and emits Trace + Observation entities
through the client.
Constructor knobs:
- ``client``: the Langfuse sink (Protocol-typed).
- ``disable_llm_spans``: when ``True`` the observer skips
Generation observations on LLM provider events.
- ``disable_provider_payload``: default ``True`` for a symmetric
privacy posture with the OTel observer. Gates
``generation.input`` / ``output`` / ``metadata.request_extras``
emission. The name carries the broadened provider-payload scope;
LLM completion is OA's only provider-call payload today.
- ``payload_byte_cap``: per-attribute byte cap on the source
payload string before parse-back. Mirrors the OTel observer's
``payload_max_bytes`` semantic — emission preserves the raw
truncated string when the truncation marker is present. Default
64 KiB; same minimum (256 bytes) applies.
- ``detached_subgraphs``: set of subgraph wrapper node names that
run in their own Langfuse Trace. Each such subgraph gets a fresh
trace_id; the main Trace's dispatch observation surfaces the link
via ``metadata.detached_child_trace_ids``.
- ``detached_fan_outs``: set of fan-out node names whose instances
each get their own Langfuse Trace. Same link mechanism on the
fan-out node observation: each per-instance detached trace_id
lands in the array.
- ``disable_state_payload``: default ``True`` (Trace input/output
sourcing). When ``True`` the observer does NOT serialize
``initial_state`` / final state directly onto ``trace.input`` /
``trace.output``; the minimal stub applies unless
``trace_input_from_state`` / ``trace_output_from_state``
overrides. When ``False`` the raw state object is serialized to
the Trace fields, subject to ``payload_byte_cap`` truncation.
Independent of ``disable_provider_payload`` — the two payloads
carry distinct threat models (LLM-call transcript vs.
application state).
- ``trace_input_from_state``: optional caller hook returning the
value to use as ``trace.input``. Called once per invocation at
the ``InvocationStartedEvent``. Returning ``None`` falls
through to the next lever (raw state when
``disable_state_payload=False``, minimal stub otherwise).
- ``trace_output_from_state``: same shape for ``trace.output``,
called once per invocation at the ``InvocationCompletedEvent``.
- ``implementation_name``: string surfaced as
``trace.metadata.implementation_name`` on every Trace. Defaults
to the package's ``__implementation_name__``
(``"openarmature-python"``). Configurable for test
parameterization.
- ``implementation_version``: string surfaced as
``trace.metadata.implementation_version`` on every Trace.
Defaults to ``openarmature.__version__``. Always emitted —
not gated by ``disable_state_payload``,
``disable_provider_payload``, or any other privacy knob.
The observer reads the spec version from the package at
construction time. Safe to share across concurrent invocations
and across resumes of the same correlation_id; per-invocation
state isolation keys all internal maps by invocation_id.
"""
# Spec observability §8 (Langfuse backend mapping). Knob spec
# basis: §8.9 privacy posture; §8.4.1 Trace input/output sourcing
# (proposal 0043); §8.5 detached traces; §5.1 always-emit
# attribution invariant.
client: LangfuseClient
disable_llm_spans: bool = False
disable_provider_payload: bool = True
payload_byte_cap: int = 65536
detached_subgraphs: frozenset[str] = field(default_factory=_empty_str_frozenset)
detached_fan_outs: frozenset[str] = field(default_factory=_empty_str_frozenset)
spec_version: str = field(default_factory=_read_spec_version)
# Proposal 0052 §8.4.1: implementation attribution rows on every
# Trace. Configurable for test parameterization; defaults to the
# package identity. Always-emit invariant inherited from §5.1 —
# ``disable_state_payload`` and the other privacy knobs do not
# gate these rows because they describe runtime identity, not
# runtime data.
implementation_name: str = field(default_factory=_read_implementation_name)
implementation_version: str = field(default_factory=_read_implementation_version)
# Proposal 0043 §8.4.1 *Trace input/output sourcing*.
disable_state_payload: bool = True
trace_input_from_state: Callable[[Any], Any] | None = None
trace_output_from_state: Callable[[Any], Any] | None = None
# Internal state populated during invocation.
_inv_states: dict[str, _InvState] = field(init=False, repr=False, default_factory=dict[str, _InvState])
def __post_init__(self) -> None:
    """Validate ``payload_byte_cap`` once the dataclass fields are set.

    Raises ``ValueError`` when the cap is below ``_PAYLOAD_MIN_BYTES``.
    """
    # §5.5.5 minimum-cap validation mirrors the OTel observer's bound.
    # A bad cap fails here, at construction time, rather than surfacing
    # later as a Langfuse-ingest error.
    cap_too_small = self.payload_byte_cap < _PAYLOAD_MIN_BYTES
    if not cap_too_small:
        return
    message = (
        f"payload_byte_cap={self.payload_byte_cap} below the spec §5.5.5 "
        f"minimum of {_PAYLOAD_MIN_BYTES} bytes"
    )
    raise ValueError(message)
async def __call__(
    self,
    event: ObserverEvent,
) -> None:
    """Route a single observer event to the handler for its type.

    Branches are tested in a fixed order and at most one runs per event.
    Events that match none of the typed branches are dispatched on
    their ``phase`` attribute; only ``"started"`` and ``"completed"``
    are acted on.
    """
    if isinstance(event, InvocationStartedEvent):
        self._handle_invocation_started(event)
    elif isinstance(event, InvocationCompletedEvent):
        self._handle_invocation_completed(event)
    elif isinstance(event, LlmRetryAttemptEvent):
        # Proposal 0050 per-attempt LLM events are OTel-span-only: the
        # Langfuse mapping renders one Generation per call from the
        # terminal LlmCompletionEvent / LlmFailedEvent, so nothing is
        # emitted for the per-attempt event.
        pass
    elif isinstance(event, LlmCompletionEvent):
        # Proposal 0049 typed LlmCompletionEvent (success path): drives
        # the §5.5 Generation observation lifecycle for successful
        # provider calls, unless LLM spans are disabled.
        if not self.disable_llm_spans:
            self._handle_typed_llm_completion(event)
    elif isinstance(event, LlmFailedEvent):
        # Proposal 0058 typed LlmFailedEvent (failure path): same
        # Generation lifecycle with ERROR level + error_category as
        # statusMessage, unless LLM spans are disabled.
        if not self.disable_llm_spans:
            self._handle_typed_llm_failed(event)
    elif isinstance(event, ToolCallEvent | ToolCallFailedEvent):
        # Proposal 0063 tool-execution observability: render the
        # dedicated Langfuse Tool observation (asType "tool") under the
        # calling node's Span observation.
        self._handle_tool_call(event)
    elif isinstance(event, FailureIsolatedEvent):
        # Proposal 0050 §6.3 framework-emitted failure-isolation event.
        self._handle_failure_isolated(event)
    elif isinstance(event, MetadataAugmentationEvent):
        self._handle_metadata_augmentation(event)
    elif isinstance(event, EmbeddingEvent | EmbeddingFailedEvent):
        # Proposal 0059 embedding events: the bundled Langfuse Embedding
        # observation is a follow-up. Until it lands these are dropped
        # here so they never reach the phase dispatch below (which
        # would AttributeError on the absent ``phase``).
        pass
    elif event.phase == "started":
        self._open_started_observation(event)
    elif event.phase == "completed":
        self._handle_completed(event)
    # checkpoint_saved and checkpoint_migrated are OTel-mapping-
    # specific synthetic phases per §5.5 / §10.8; the Langfuse
    # mapping doesn't surface checkpoint events as observations
    # in v0.23.0 (§8.10's deferral envelope).
# ------------------------------------------------------------------
# Span observation lifecycle (node / subgraph / fan-out)
# ------------------------------------------------------------------
def _open_started_observation(self, event: NodeEvent) -> None:
    """Open a Span observation for a node's ``started`` event.

    Does nothing when no invocation id is current, or when an
    observation is already open under the event's stack key. Otherwise
    it opens the Trace on first sight of the invocation, refreshes the
    fan-out / parallel-branches caches from the event's ``*_config``
    fields, synchronizes the synthetic dispatch observations, and then
    creates the span through ``self.client.span`` and records it in
    ``inv_state.open_observations``.
    """
    from openarmature.observability.correlation import (
        current_correlation_id,
        current_invocation_id,
    )
    invocation_id = current_invocation_id()
    if invocation_id is None:
        return
    correlation_id = current_correlation_id()
    # Lazy Trace open on the first event for this invocation_id.
    # The Trace ID equals the invocation_id verbatim per §8.4.1 so
    # cross-system lookup is a direct hit.
    # NOTE(review): the lookup right after this relies on ``_open_trace``
    # registering the invocation in ``self._inv_states`` -- confirm.
    if invocation_id not in self._inv_states:
        self._open_trace(invocation_id, correlation_id, event)
    inv_state = self._inv_states[invocation_id]
    # Cache the fan-out node's parent_node_name from its own
    # started event so synthetic per-instance dispatch observations
    # can attach metadata.fan_out_parent_node_name (the inner
    # events from inside the fan-out don't carry fan_out_config
    # themselves; this cache bridges). fan_out_config is set only on
    # the NODE's own events, so it alone identifies them -- NOT
    # ``fan_out_index is None``, which would miss a fan-out node nested
    # inside an outer fan-out instance (its own event carries the OUTER
    # instance index), leaving the inner dispatch unsynthesized.
    if event.fan_out_config is not None:
        inv_state.fan_out_parent_node_name[event.namespace] = event.fan_out_config.parent_node_name
    # Per proposal 0045: mirror cache for parallel-branches NODE
    # identification (used by the augmentation shared-parent
    # check). No additional ``branch_name is None`` filter — the
    # ``*_config`` field is itself only populated on a NODE's own
    # events.
    if event.parallel_branches_config is not None:
        inv_state.parallel_branches_parent_node_name[event.namespace] = (
            event.parallel_branches_config.parent_node_name
        )
        inv_state.parallel_branches_branch_names[event.namespace] = frozenset(
            event.parallel_branches_config.branch_names
        )
    key = self._key_for(event)
    if key in inv_state.open_observations:
        # Idempotent: a second started for the same stack key
        # (namespace, attempt_index, fan_out_index, branch_name) is a
        # no-op (matches the OTel observer's behavior under
        # retry-replay). The caches above have already been refreshed
        # by this point.
        return
    # Synthesize any subgraph dispatch / fan-out per-instance
    # dispatch observations the leaf needs as ancestors. Also
    # closes dispatch observations whose subtree we've left.
    # This runs before the parent lookup below.
    self._sync_subgraph_observations(inv_state, correlation_id, event)
    parent_observation_id = self._resolve_parent_observation_id(inv_state, event)
    metadata = self._observation_metadata(event, correlation_id)
    # The target Trace is resolved from the namespace and fan-out index
    # rather than taken from ``inv_state.trace_id`` directly.
    target_trace_id = self._trace_id_for(inv_state, event.namespace, event.fan_out_index)
    handle = self.client.span(
        trace_id=target_trace_id,
        name=event.node_name,
        metadata=metadata,
        parent_observation_id=parent_observation_id,
    )
    # Keep the event's lineage chains with the handle; the augmentation
    # walk compares them against the augmenter's chains.
    inv_state.open_observations[key] = _OpenObservation(
        handle=handle,
        fan_out_index_chain=event.fan_out_index_chain,
        branch_name_chain=event.branch_name_chain,
    )
def _handle_completed(self, event: NodeEvent) -> None:
    """Close the Span observation for a node's ``completed`` event.

    Does nothing when no invocation id is current or no state exists
    for it. Before the node's own observation is closed, the synthetic
    dispatch observations tied to a completing fan-out or
    parallel-branches node are closed and the related caches are
    cleared. The node's observation is then popped and ended, with
    ``level="ERROR"`` and the error category as status message when the
    event carries an error that has a non-None ``category``.
    """
    from openarmature.observability.correlation import current_invocation_id
    invocation_id = current_invocation_id()
    if invocation_id is None:
        return
    inv_state = self._inv_states.get(invocation_id)
    if inv_state is None:
        return
    # If this is the fan-out node's own completion (event.fan_out_index
    # is None) AND the fan-out is configured detached, close any
    # detached per-instance Trace dispatch observations the fan-out
    # spawned. Done BEFORE the regular pop so the close ordering is
    # children-before-parents.
    # NOTE(review): membership is tested on ``event.namespace[0]`` (the
    # first namespace component), not on the completing node's own
    # name, and ``fan_out_config`` is not consulted here -- confirm this
    # is intended for nodes nested below a detached fan-out.
    if event.fan_out_index is None and event.namespace and event.namespace[0] in self.detached_fan_outs:
        ns = event.namespace
        # Iterate a snapshot: the set is mutated inside the loop.
        for key in list(inv_state.fan_out_instance_root_prefixes):
            anchor_ns = key[0]
            # Match dispatch keys whose NODE namespace is ``ns`` or
            # lies below it.
            if len(anchor_ns) >= len(ns) and anchor_ns[: len(ns)] == ns:
                # Detached per-instance dispatches live in
                # fan_out_instance_observations (same map as
                # non-detached); close via the matching helper.
                self._close_fan_out_instance_dispatch_observation(inv_state, key)
                inv_state.fan_out_instance_root_prefixes.discard(key)
                # detached_traces uses the top-level routing key shape; derive
                # it from the lineage key's own instance index (last entry).
                fi_chain = key[1]
                inv_state.detached_traces.pop(anchor_ns + (str(fi_chain[-1]),), None)
    # Per spec proposal 0013 (v0.10.0): when the fan-out node's
    # own completion fires, close all per-instance dispatch
    # observations synthesized for it. Children-before-parents.
    # NOTE(review): ``_open_started_observation`` documents that a
    # fan-out node nested inside an outer fan-out instance carries the
    # OUTER instance index on its own events, which is why it keys on
    # ``fan_out_config`` alone. If completed events behave the same,
    # the ``fan_out_index is None`` guard here skips this cleanup for
    # nested fan-out nodes -- confirm.
    if event.fan_out_index is None and event.fan_out_config is not None:
        ns = event.namespace
        # Snapshot of the keys; presumably the close helper removes
        # entries from the map -- verify against the helper.
        for key in list(inv_state.fan_out_instance_observations.keys()):
            anchor_ns = key[0]
            # The dispatch key is now (anchor_ns, fi_chain, bn_chain); match
            # on the NODE namespace (anchor_ns) being in this completing
            # node's subtree.
            if len(anchor_ns) >= len(ns) and anchor_ns[: len(ns)] == ns:
                self._close_fan_out_instance_dispatch_observation(inv_state, key)
        inv_state.fan_out_parent_node_name.pop(event.namespace, None)
        # Clear the detached-child-trace-ids accumulator for this
        # fan-out node — cyclic execution that re-enters the same
        # fan-out starts the next iteration with a fresh list
        # rather than appending to the previous iteration's
        # accumulator and overwriting the prior link metadata.
        inv_state.detached_child_trace_ids.pop(event.namespace, None)
    # Per proposals 0044/0045: on a pb NODE's own completion, close the
    # per-branch dispatch observations synthesized for it (children-before-
    # parents) and clear the pb caches. Same shape as the fan-out cleanup,
    # except that this guard keys on the ``*_config`` field alone.
    if event.parallel_branches_config is not None:
        ns = event.namespace
        for key in list(inv_state.parallel_branches_branch_spans.keys()):
            anchor_ns = key[0]
            if len(anchor_ns) >= len(ns) and anchor_ns[: len(ns)] == ns:
                self._close_parallel_branches_branch_dispatch_observation(inv_state, key)
        inv_state.parallel_branches_parent_node_name.pop(event.namespace, None)
        inv_state.parallel_branches_branch_names.pop(event.namespace, None)
    key = self._key_for(event)
    observation = inv_state.open_observations.pop(key, None)
    if observation is None:
        # No open observation under this key: nothing to end. The
        # detached_traces cleanup at the end of the method is skipped
        # on this path as well.
        return
    # Error-category mapping per §8.4.2: error.category → level=ERROR
    # + statusMessage=<category>.
    if event.error is not None and getattr(event.error, "category", None) is not None:
        observation.handle.end(level="ERROR", status_message=event.error.category)
    else:
        observation.handle.end()
    # If this was a detached subgraph root prefix, drop the
    # detached_traces entry so a subsequent re-entry mints fresh.
    inv_state.detached_traces.pop(event.namespace, None)
# ------------------------------------------------------------------
# Metadata augmentation (proposal 0040 §3.4 + §6)
# ------------------------------------------------------------------
def _handle_metadata_augmentation(self, event: MetadataAugmentationEvent) -> None:
    # Spec proposal 0040 §3.4 MUST: push the augmentation's ``entries``
    # onto every open observation that lies on the augmenter's own
    # call-stack path, using the Langfuse handle's
    # ``update(metadata=...)``. Sibling instances / branches are left
    # untouched (same scoping rule as the OTel mapping — see
    # ``OTelObserver._handle_metadata_augmentation``).
    #
    # The Trace itself is only written (``client.update_trace``) when
    # the augmenter is in outermost-serial context, i.e. every entry of
    # both lineage chains is None. Inside a fan-out instance or a
    # parallel-branches branch the Trace is shared with siblings and is
    # therefore out of scope.
    #
    # ``event.entries`` is treated as the delta to apply; it is copied
    # once and that same dict is handed to every update call.
    from openarmature.observability.correlation import current_invocation_id

    invocation_id = current_invocation_id()
    if invocation_id is None:
        return
    if not event.entries:
        return

    inv_state = self._inv_states.get(invocation_id)
    target_ns = event.namespace
    fi_chain = event.fan_out_index_chain
    bn_chain = event.branch_name_chain
    delta = dict(event.entries)

    # Any non-None element on either chain means a dispatch boundary
    # sits on the augmenter's path, so the shared Trace must not be
    # written.
    crosses_dispatch = any(fi is not None for fi in fi_chain) or any(bn is not None for bn in bn_chain)
    if not crosses_dispatch:
        self.client.update_trace(id=invocation_id, metadata=delta)

    # The Trace write above happens even without per-invocation state;
    # the observation walks below need that state.
    if inv_state is None:
        return

    def _update_if_on_path(candidate):
        # Only observations whose lineage chain is on the augmenter's
        # path receive the delta.
        if _observation_chain_on_path(candidate, fi_chain, bn_chain):
            candidate.handle.update(metadata=delta)

    # Per proposal 0045 — pass 1: subgraph wrapper observations whose
    # prefix is a strict prefix of the augmenter's namespace.
    for wrapper_prefix, wrapper_obs in inv_state.subgraph_observations.items():
        if is_strict_prefix(wrapper_prefix, target_ns):
            _update_if_on_path(wrapper_obs)

    # Pass 2: fan-out instance dispatch observations. The dispatch NODE
    # namespace (first key component) must be an ancestor-or-equal of
    # the augmenter; the chain check then excludes a sibling outer
    # instance's dispatch.
    for dispatch_key, dispatch_obs in inv_state.fan_out_instance_observations.items():
        dispatch_ns = dispatch_key[0]
        if is_strict_prefix(dispatch_ns, target_ns) or dispatch_ns == target_ns:
            _update_if_on_path(dispatch_obs)

    # Pass 3: open NODE observations at the augmenter's namespace or
    # strictly above it.
    for stack_key, node_obs in inv_state.open_observations.items():
        node_ns, _attempt, _fan_out, _branch = stack_key
        on_lineage = node_ns == target_ns or is_strict_prefix(node_ns, target_ns)
        if not on_lineage:
            continue
        # A fan-out / pb NODE is a shared parent and MUST NOT carry an
        # instance's / branch's augmentation (proposal 0045 §3.4). It is
        # recognised by its namespace being present in one of the
        # parent_node_name caches, and is skipped both when it sits
        # above the augmenter and when it shares the augmenter's
        # namespace.
        is_shared_parent = (
            node_ns in inv_state.fan_out_parent_node_name
            or node_ns in inv_state.parallel_branches_parent_node_name
        )
        if is_shared_parent:
            continue
        _update_if_on_path(node_obs)
# ------------------------------------------------------------------
# Failure-isolation event (proposal 0050 §6.3)
# ------------------------------------------------------------------
def _handle_failure_isolated(self, event: FailureIsolatedEvent) -> None:
    # Emit a zero-duration marker observation named
    # ``openarmature.failure_isolated`` for a FailureIsolationMiddleware
    # catch. The marker is parented under the wrapped node's observation
    # when that observation is still open; otherwise it gets no parent
    # id and lands at trace level (the node observation is typically
    # already closed-with-error by delivery time). The wrapped node's
    # name is carried on ``metadata.failure_isolation_node`` either way.
    from openarmature.observability.correlation import (
        current_correlation_id,
        current_invocation_id,
    )

    invocation_id = current_invocation_id()
    inv_state = None if invocation_id is None else self._inv_states.get(invocation_id)
    if inv_state is None:
        # No active invocation, or no trace state for it: nothing to
        # attach the marker to.
        return

    # Same key shape the node observations are stored under.
    node_key: _StackKey = (event.namespace, event.attempt_index, event.fan_out_index, event.branch_name)
    wrapped_node = inv_state.open_observations.get(node_key)
    parent_observation_id = None if wrapped_node is None else wrapped_node.handle.id

    marker_metadata: dict[str, Any] = {
        "failure_isolation_event_name": event.event_name,
        "error_message": event.caught_exception.message,
    }
    if event.namespace:
        # Last namespace component is the wrapped node's own name.
        marker_metadata["failure_isolation_node"] = event.namespace[-1]
    category = event.caught_exception.category
    if category is not None:
        marker_metadata["error_category"] = category
    correlation_id = current_correlation_id()
    if correlation_id is not None:
        marker_metadata["correlation_id"] = correlation_id

    # Open and immediately close: the marker has no duration.
    self.client.span(
        trace_id=inv_state.trace_id,
        name="openarmature.failure_isolated",
        metadata=marker_metadata,
        parent_observation_id=parent_observation_id,
    ).end()
# ------------------------------------------------------------------
# Invocation-boundary events (proposal 0043 §8.4.1 sourcing)
# ------------------------------------------------------------------
def _handle_invocation_started(self, event: InvocationStartedEvent) -> None:
# Spec proposal 0043 §8.4.1 *Trace input/output sourcing*.
# Lazy-open the Trace if this is the first signal for the
# invocation_id (no node event has fired yet), then resolve
# ``trace.input`` via the three-lever decision tree:
# 1. Hook supplied AND returns non-None → hook value.
# 2. ``disable_state_payload`` is False → raw initial_state
# serialized (subject to payload_byte_cap truncation).
# 3. Otherwise → minimal stub:
# {entry_node, correlation_id}.
# The stub carries no application payload — both fields are
# already in ``trace.metadata``; surfacing them on
# ``trace.input`` makes the Langfuse Traces list view
# scannable without revealing state shape.
if event.invocation_id not in self._inv_states:
self._open_trace_lazy(event.invocation_id, event.correlation_id, event.entry_node)
input_value = self._resolve_trace_input(event)
self.client.update_trace(id=event.invocation_id, input=input_value)
def _handle_invocation_completed(self, event: InvocationCompletedEvent) -> None:
# Spec proposal 0043 §8.4.1. Resolve ``trace.output`` via the
# same three-lever decision tree as input, with the minimal
# stub carrying {final_node, status}.
if event.invocation_id not in self._inv_states:
# Defensive: a fast-failure invocation may complete before
# any node event fired (e.g., resume-path validation
# rejected). Lazy-open the Trace so the stub still lands.
entry_node = event.final_node # best-effort fallback
self._open_trace_lazy(event.invocation_id, event.correlation_id, entry_node)
output_value = self._resolve_trace_output(event)
self.client.update_trace(id=event.invocation_id, output=output_value)
def _resolve_trace_input(self, event: InvocationStartedEvent) -> Any:
# Lever 1: caller hook.
if self.trace_input_from_state is not None:
try:
hook_value = self.trace_input_from_state(event.initial_state)
except Exception:
# Hook raise: skip emission (defensive — caller code
# should not break observability). Fall through to the
# next lever rather than crash the observer.
hook_value = None
if hook_value is not None:
return self._maybe_truncate_for_extras(hook_value)
# Lever 2: raw state when knob is OFF.
if not self.disable_state_payload:
serialized = self._state_to_jsonable(event.initial_state)
return self._maybe_truncate_for_extras(serialized)
# Lever 3: minimal stub.
stub: dict[str, Any] = {"entry_node": event.entry_node}
if event.correlation_id is not None:
stub["correlation_id"] = event.correlation_id
return stub
def _resolve_trace_output(self, event: InvocationCompletedEvent) -> Any:
# Lever 1: caller hook.
if self.trace_output_from_state is not None:
try:
hook_value = self.trace_output_from_state(event.final_state)
except Exception:
hook_value = None
if hook_value is not None:
return self._maybe_truncate_for_extras(hook_value)
# Lever 2: raw state when knob is OFF.
if not self.disable_state_payload:
serialized = self._state_to_jsonable(event.final_state)
return self._maybe_truncate_for_extras(serialized)
# Lever 3: minimal stub.
return {"final_node": event.final_node, "status": event.status}
@staticmethod
def _state_to_jsonable(state: Any) -> Any:
# Best-effort conversion of a State instance to a JSON-able
# shape. Pydantic models expose ``model_dump`` directly; other
# objects fall through to a str representation. The serialized
# form is what ends up on the Langfuse Trace's
# ``input`` / ``output`` field.
#
# ``mode="json"`` (rather than the default Python mode) coerces
# non-JSON-native types — ``datetime``, ``UUID``, ``Decimal``,
# etc. — into JSON-compatible strings BEFORE the dict reaches
# the downstream ``json.dumps`` truncation path. Without it the
# truncation path raises ``TypeError`` and the observer's
# ``__call__`` raise is swallowed by the engine's warnings-only
# observer-isolation contract, leaving ``trace.input`` /
# ``trace.output`` silently blank on states containing those
# types.
dumper = getattr(state, "model_dump", None)
if callable(dumper):
try:
return dumper(mode="json")
except Exception:
return str(state)
return str(state)
def _client_trace(self, *, id: str, name: str | None, metadata: dict[str, Any]) -> None:
    # Proposal 0064 §8.4.1: single choke point for opening a Trace, so
    # the sessionId / userId promotions are applied the same way from
    # the main, lazy, and detached trace-open sites.
    #   - user_id: promoted from ``metadata`` via ``_promoted_user_id``
    #     (the recognized ``userId`` caller key, already merged into
    #     ``metadata`` by _apply_caller_metadata).
    #   - session_id: always None today. It is meant to be sourced from
    #     openarmature.session_id (sessions capability, observability
    #     §5.6 / proposal 0020); python has no session_id source until
    #     0020 lands, and this is the single hook 0020 wires it into.
    open_trace = self.client.trace
    open_trace(
        id=id,
        name=name,
        metadata=metadata,
        session_id=None,
        user_id=_promoted_user_id(metadata),
    )
def _open_trace_lazy(
    self,
    invocation_id: str,
    correlation_id: str | None,
    entry_node: str,
) -> None:
    # Trace-open path for the proposal 0043 invocation-boundary events,
    # which have no NodeEvent to read ``entry_node`` or caller metadata
    # from (``_open_trace`` takes both from a NodeEvent). The entry node
    # is passed in explicitly and caller metadata is read through
    # ``current_invocation_metadata`` instead, then merged with
    # ``_apply_caller_metadata``.
    from openarmature.observability.metadata import current_invocation_metadata

    trace_metadata: dict[str, Any] = {
        "entry_node": entry_node,
        "spec_version": self.spec_version,
        # Proposal 0052 §8.4.1: implementation attribution rows.
        "implementation_name": self.implementation_name,
        "implementation_version": self.implementation_version,
        # correlation_id is only recorded when one was supplied.
        **({} if correlation_id is None else {"correlation_id": correlation_id}),
    }
    _apply_caller_metadata(trace_metadata, current_invocation_metadata())

    # The entry node doubles as the trace name on this path.
    self._client_trace(id=invocation_id, name=entry_node, metadata=trace_metadata)
    # Register per-invocation state so later events find the Trace open.
    self._inv_states[invocation_id] = _InvState(trace_id=invocation_id)
def _open_trace(self, invocation_id: str, correlation_id: str | None, event: NodeEvent) -> None:
    # Open the Trace from the first NodeEvent seen for an invocation.
    #
    # ``entry_node`` and the trace name MUST identify the outer-graph
    # entry, not whichever node fired first. Subgraph wrappers do not
    # emit their own events, so when the outer entry is a SubgraphNode
    # the first event comes from inside the subgraph (namespace =
    # (wrapper, inner), node_name = inner). The first namespace
    # component is the outermost prefix, which is the outer entry by
    # construction; ``node_name`` is used only when the namespace is
    # empty.
    namespace = event.namespace
    outer_entry = namespace[0] if namespace else event.node_name

    trace_metadata: dict[str, Any] = {
        "entry_node": outer_entry,
        "spec_version": self.spec_version,
        # Proposal 0052 §8.4.1: implementation attribution rows.
        "implementation_name": self.implementation_name,
        "implementation_version": self.implementation_version,
        # correlation_id is only recorded when one was supplied.
        **({} if correlation_id is None else {"correlation_id": correlation_id}),
    }
    _apply_caller_metadata(trace_metadata, event.caller_invocation_metadata)

    # §8.6 trace name: a caller-supplied invocation label would take
    # precedence, but that path lands with proposal 0034 (PR 4); for now
    # only the spec-recommended fallback — the entry-node name — is
    # wired.
    self._client_trace(id=invocation_id, name=outer_entry, metadata=trace_metadata)
    # Register per-invocation state so later events find the Trace open.
    self._inv_states[invocation_id] = _InvState(trace_id=invocation_id)
def _key_for(self, event: NodeEvent) -> _StackKey:
return (event.namespace, event.attempt_index, event.fan_out_index, event.branch_name)
def _resolve_parent_observation_id(self, inv_state: _InvState, event: NodeEvent) -> str | None:
# Parent precedence (innermost wins):
# 1. Per-instance fan-out dispatch observation at
# namespace[:1] + (str(fan_out_index),) — both detached
# (where the dispatch observation lives in the detached
# Trace) and non-detached (where it lives in the main
# Trace) cases route here when event is inside a fan-out
# instance.
# 2. Subgraph dispatch observation at any matching ancestor
# prefix, walked longest-first.
# 3. Leaf node observation at any matching ancestor prefix,
# walked longest-first.
# 4. None — the Trace itself becomes the implicit parent.
# Per proposals 0044 / 0013 / 0045: an inner node parents under the
# INNERMOST dispatch on its lineage -- a per-branch dispatch
# (parallel_branches_branch_spans) or a per-instance fan-out dispatch
# (fan_out_instance_observations), both keyed by the lineage-aware
# _dispatch_key. Walk prefixes longest-first so the innermost wins; the
# lineage key carries the enclosing fan-out instance / branch chain, so
# this resolves arbitrary nesting (fan-out in fan-out, parallel-branches
# in fan-out, ...) to the RIGHT outer instance. Mirrors OTel
# _resolve_parent_context.
for prefix_len in range(len(event.namespace), 0, -1):
prefix = event.namespace[:prefix_len]
fi_axis = (
event.fan_out_index_chain[prefix_len - 1]
if prefix_len - 1 < len(event.fan_out_index_chain)
else None
)
if event.branch_name is not None:
branch_dispatch = inv_state.parallel_branches_branch_spans.get(
_branch_dispatch_key(
prefix, event.fan_out_index_chain, event.branch_name_chain, event.branch_name
)
)
if branch_dispatch is not None:
return branch_dispatch.handle.id
if fi_axis is not None:
dispatch = inv_state.fan_out_instance_observations.get(
_dispatch_key(prefix, event.fan_out_index_chain, event.branch_name_chain)
)
if dispatch is not None: