-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
834 lines (735 loc) · 37.1 KB
/
Copy pathmain.py
File metadata and controls
834 lines (735 loc) · 37.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
"""openarmature demo: production observability with dual OTel +
Langfuse observers, caller hooks for trace.input/output, and the
canonical TimingMiddleware.
**Use case:** A single-turn lunar-mission Q&A endpoint instrumented
the way you'd ship it: BOTH the OTel observer and the Langfuse
observer attached to the same graph, caller hooks deriving
domain-shaped trace.input/output from State (instead of dumping the
raw State object), the built-in TimingMiddleware recording per-node
duration, and multi-tenant caller-supplied metadata (tenantId /
requestId / featureFlag) propagating to both observers in one
``invoke()`` call.
**Demonstrates (mapped to shipped features):**
- Dual observers on one graph, with no double-export between them.
Both consume the same NodeEvent stream independently; nothing in
node code knows there are two.
- ``trace_input_from_state`` / ``trace_output_from_state`` caller
hooks on ``LangfuseObserver``. The hooks derive domain dicts
(``{"question": ...}`` / ``{"answer": ..., "model": ...}``)
instead of letting the observer dump the raw State. Production
teams use this to keep PII out of trace payloads while still
surfacing operational signal.
- Built-in ``TimingMiddleware`` from ``openarmature.graph``
wrapping the respond node. An ``on_complete`` callback receives
a ``TimingRecord(node_name, duration_ms, outcome,
exception_category)`` and prints a one-line timing summary; in
production the callback would queue to a metrics backend
(StatsD, Prometheus pushgateway, OTLP metrics exporter).
- ``invoke(metadata={...})`` carrying multi-tenant identifiers.
The OTel observer emits each entry as an
``openarmature.user.<key>`` attribute on every span; the Langfuse
observer merges them as top-level ``trace.metadata`` keys plus
per-observation metadata. One call site, two backends, no
per-observer wiring.
- ``InMemoryLangfuseClient`` captures the Trace + Observation tree
in-process so the demo can print it at the end without needing a
real Langfuse account. ``InMemorySpanExporter`` does the
symmetric job for OTel. Production code swaps in
``LangfuseSDKAdapter(Langfuse(...))`` and
``BatchSpanProcessor(OTLPSpanExporter(...))`` respectively; the
observer call surface doesn't change.
- **Queryable accumulator observer + per-invocation drain.** A
third observer (``LlmUsageAccumulator``) rolls up LLM token
totals per invocation, including a cache-hit ratio from
``usage.cached_tokens`` for backends with prefix caching (vLLM,
Anthropic, etc.). A terminal ``persist`` node calls
``await graph.drain_events_for(current_invocation_id(), timeout=2.0)``
to synchronize on the deliver loop, then reads the accumulator's
bucket and drops it. Without the drain, the bucket would be
missing the most recent LLM event's tokens (the deliver loop
hasn't reached them yet). This is the canonical shape for
per-invocation cost attribution at request scope, in place of
routing every token count through State (a workaround pattern
that pollutes the state schema with non-pipeline data). The
pattern is convention-only at the observer level:
``Observer`` itself stays a single-callable protocol; the
queryable accumulator just exposes its own read methods
(``get_bucket`` / ``drop``) that the persist node knows about.
- **Failure-category tracker observer.** A fourth observer
(``LlmFailureTracker``) subscribes to ``LlmFailedEvent``, the
typed failure-side counterpart to ``LlmCompletionEvent``, fired
once per LLM call that fails with a provider error category. The
tracker maintains a per-invocation ``{category: count}`` bucket;
the persist node reads + reports it alongside the usage rollup.
Together the two observers give operators success/failure
attribution at request scope without joining against external
trace storage. The provider emits exactly one of
(``LlmCompletionEvent``, ``LlmFailedEvent``) per LLM call, never
both, so attempt counts derive cleanly as
``usage.call_count + sum(failure.by_category.values())``.
Complementary to the observer-hooks example (three observers
side-by-side) and the langfuse-observability example (Langfuse
observer + LangfusePromptBackend prompt linkage). This example's
headline is the production-shape wiring + per-invocation cost
attribution, not the hook surface or the prompt linkage.
**Configuration** (env vars; OpenAI defaults shown):
- ``LLM_BASE_URL`` defaults to ``https://api.openai.com``. Host root only.
- ``LLM_MODEL`` defaults to ``gpt-4o-mini``.
- ``LLM_API_KEY`` required (empty for local servers that don't
authenticate).
Run with:
uv sync --group examples --all-extras
LLM_API_KEY=sk-... uv run python examples/production-observability/main.py
(``--all-extras`` pulls in ``opentelemetry-sdk`` for the OTel observer
and ``[langfuse]`` for the Langfuse observer's record types.)
"""
from __future__ import annotations
import asyncio
import os
import sys
import uuid
from dataclasses import dataclass
from typing import Any
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)
from openarmature.graph import (
END,
CompiledGraph,
GraphBuilder,
InvocationCompletedEvent,
LlmCompletionEvent,
LlmFailedEvent,
NodeException,
ObserverEvent,
State,
)
from openarmature.graph.middleware import TimingMiddleware, TimingRecord
from openarmature.llm import (
LlmProviderError,
OpenAIProvider,
RuntimeConfig,
SystemMessage,
UserMessage,
)
from openarmature.observability.correlation import current_invocation_id
from openarmature.observability.langfuse import (
InMemoryLangfuseClient,
LangfuseObservation,
LangfuseObserver,
LangfuseTrace,
)
from openarmature.observability.otel import OTelObserver
# ---------------------------------------------------------------------------
# Provider (lazy-init)
# ---------------------------------------------------------------------------
_provider_instance: OpenAIProvider | None = None
def _get_provider() -> OpenAIProvider:
global _provider_instance
if _provider_instance is None:
_provider_instance = OpenAIProvider(
base_url=os.environ.get("LLM_BASE_URL", "https://api.openai.com"),
model=os.environ.get("LLM_MODEL", "gpt-4o-mini"),
api_key=os.environ.get("LLM_API_KEY") or None,
)
return _provider_instance
# ---------------------------------------------------------------------------
# State
# ---------------------------------------------------------------------------
class BriefingState(State):
question: str
answer: str = ""
model_used: str = ""
# ---------------------------------------------------------------------------
# Queryable accumulator observer (per-invocation LLM token rollup)
# ---------------------------------------------------------------------------
# A third observer alongside the OTel + Langfuse pair. Its job is to
# accumulate per-invocation LLM token usage in memory so a terminal
# persist node can read the totals at request scope (rather than
# round-tripping every count through State). The Observer protocol is
# a single async callable; the accumulator adds its own read methods
# (``get_bucket`` / ``drop``) on the instance for the persist node to
# consume. Convention only; openarmature does not ship a base class
# for accumulators.
#
# The accumulator subscribes to every event but only records the
# typed ``LlmCompletionEvent`` variant (one event per successful LLM
# call), structured outcome fields read directly off the event without
# the namespace-string-match + payload-narrow dance older sentinel-
# based filters needed.
#
# Per-invocation isolation is by ``LlmCompletionEvent.invocation_id``,
# read directly off the event, no ContextVar lookup needed.
# Concurrent invocations on one observer each get their own bucket.
#
# Cache-stat tracking: the bucket also rolls up ``usage.cached_tokens``
# when the provider reports it. Backends with prefix caching (vLLM
# ``--enable-prefix-caching``, Anthropic prompt caching, OpenAI's
# ``prompt_token_usage`` cache report when enabled) populate the
# field; backends without cache visibility leave it ``None`` and the
# accumulator records zero. The persist node prints a cache-hit
# ratio so operators see whether prefix caching is paying off at
# request scope without having to cross-join Langfuse rows.
#
# ``LlmCompletionEvent`` is a success-only event. Failed LLM calls
# flow through the exception path and emit the parallel
# ``LlmFailedEvent`` variant (see ``LlmFailureTracker`` below), so
# ``bucket.call_count`` here reflects successful calls only. This is
# the right semantic for a usage accumulator (failed calls produce
# no tokens / cost).
@dataclass
class _UsageBucket:
prompt_tokens: int = 0
completion_tokens: int = 0
total_tokens: int = 0
cached_tokens: int = 0
call_count: int = 0
class LlmUsageAccumulator:
"""Per-invocation LLM token rollup."""
def __init__(self) -> None:
# Concurrent invocations on one observer each land in their
# own bucket. Production deployments with high concurrency
# would want an eviction policy on top to bound memory; the
# demo's persist node drops the bucket explicitly after read.
self._by_invocation: dict[str, _UsageBucket] = {}
async def __call__(self, event: ObserverEvent) -> None:
# Backstop cleanup: drop any leftover bucket on invocation
# completion. persist() normally drops first, but if its
# drain_events_for timed out, late-delivered LLM events
# could recreate the bucket via setdefault(). Dropping
# again here is idempotent.
if isinstance(event, InvocationCompletedEvent):
self._by_invocation.pop(event.invocation_id, None)
return
if not isinstance(event, LlmCompletionEvent):
return
# call_count tracks successful LLM calls (the typed event is
# emitted on success only). "Call happened" and "usage
# reported" are independent; a provider may legitimately omit
# usage on a successful call. Create the bucket and increment
# call_count unconditionally so the counter reflects all
# successful calls; gate only the token-counting math on usage
# being populated.
bucket = self._by_invocation.setdefault(event.invocation_id, _UsageBucket())
bucket.call_count += 1
# The typed event's usage field is nullable (it may be ``None``
# when the provider does not report usage). Python's provider
# always passes a Usage instance (with all-None fields when not
# reported), but the defensive guard keeps the accumulator
# robust against future providers that exercise the null
# option. Calls without reported usage contribute zero tokens
# (the only honest value we can record).
usage = event.usage
if usage is None:
return
if usage.prompt_tokens is not None:
bucket.prompt_tokens += usage.prompt_tokens
if usage.completion_tokens is not None:
bucket.completion_tokens += usage.completion_tokens
# Prefer the provider-reported total when present; otherwise
# derive from prompt + completion when at least one is known.
# A usage record with all three None (rare; provider didn't
# report counts at all) contributes zero, which is the only
# honest value we can record.
if usage.total_tokens is not None:
bucket.total_tokens += usage.total_tokens
elif usage.prompt_tokens is not None or usage.completion_tokens is not None:
bucket.total_tokens += (usage.prompt_tokens or 0) + (usage.completion_tokens or 0)
# Cache-stat fields are populated only when the provider
# reports them. Backends without prefix-cache visibility
# leave ``cached_tokens`` ``None`` and the bucket records
# zero; the persist node's cache-hit ratio degrades to 0%
# gracefully in that case.
if usage.cached_tokens is not None:
bucket.cached_tokens += usage.cached_tokens
# Consumers MUST synchronize on ``drain_events_for`` before
# calling ``get_bucket`` if completeness matters; without the
# drain the deliver loop may still hold pending events whose
# tokens have not been added yet. ``None`` is returned when
# nothing has been recorded yet (e.g., an invocation with no
# LLM calls).
def get_bucket(self, invocation_id: str) -> _UsageBucket | None:
"""Read the accumulated bucket for an invocation."""
return self._by_invocation.get(invocation_id)
# Bucket lifecycle is two-step. Fast path: a terminal node calls
# ``drop()`` immediately after reading via ``get_bucket()``;
# that's the normal case and runs while the invocation is still
# active. Backstop: the accumulator's ``__call__`` also drops
# any bucket still present when ``InvocationCompletedEvent``
# arrives. The backstop closes the leak where
# ``drain_events_for`` times out, the terminal node drops a
# stale bucket, then late-delivered LLM events ``setdefault()``
# a fresh bucket that nothing would otherwise clean up.
def drop(self, invocation_id: str) -> None:
"""Release the bucket for an invocation."""
self._by_invocation.pop(invocation_id, None)
# ---------------------------------------------------------------------------
# Failure tracker (per-invocation LLM error-category rollup)
# ---------------------------------------------------------------------------
# Parallel queryable observer for the failure path. Subscribes to
# ``LlmFailedEvent``, the typed counterpart to ``LlmCompletionEvent``,
# fired exactly once for every LLM call that fails with a provider
# error category. Every LLM call emits either one
# ``LlmCompletionEvent`` (success) or one ``LlmFailedEvent``
# (failure), never both, so the tracker can count attempt-level
# failures by category without joining against the success-side
# accumulator.
#
# This is the listener the success-side accumulator delegates to:
# ``LlmUsageAccumulator.bucket.call_count`` counts successful calls
# only. Operators wanting attempt-level failure rates (e.g. ``how
# often did this tenant's calls land on a rate-limited provider this
# hour?``) read the failure tracker's bucket alongside the usage
# accumulator's bucket and compute the ratio at request scope.
#
# Bucket shape is a per-category counter: ``{"provider_rate_limit": 2,
# "provider_unavailable": 1, ...}``. ``error_category`` is one of the
# nine canonical category strings, so the dict keys form a stable,
# greppable vocabulary across providers. ``error_type`` (vendor-
# specific code) and ``error_message`` are NOT recorded here; the
# tracker's job is rate / category attribution at request scope, not
# vendor-error forensics; that lives in the OTel + Langfuse spans
# where the full exception detail is captured.
@dataclass
class _FailureBucket:
# ``dict[category, count]`` keyed by the canonical
# ``error_category`` strings. Total attempts can be derived as
# ``sum(by_category.values())``; the tracker doesn't carry a
# separate counter for it.
by_category: dict[str, int]
@classmethod
def empty(cls) -> _FailureBucket:
return cls(by_category={})
class LlmFailureTracker:
"""Per-invocation LLM failure-category rollup."""
def __init__(self) -> None:
self._by_invocation: dict[str, _FailureBucket] = {}
async def __call__(self, event: ObserverEvent) -> None:
# Backstop cleanup mirrors LlmUsageAccumulator's pattern so
# late-delivered failure events after a partial drain
# cannot leak a bucket.
if isinstance(event, InvocationCompletedEvent):
self._by_invocation.pop(event.invocation_id, None)
return
if not isinstance(event, LlmFailedEvent):
return
bucket = self._by_invocation.setdefault(event.invocation_id, _FailureBucket.empty())
bucket.by_category[event.error_category] = bucket.by_category.get(event.error_category, 0) + 1
def get_bucket(self, invocation_id: str) -> _FailureBucket | None:
"""Read the accumulated failure bucket for an invocation."""
return self._by_invocation.get(invocation_id)
def drop(self, invocation_id: str) -> None:
"""Release the failure bucket for an invocation."""
self._by_invocation.pop(invocation_id, None)
# Module-level singletons make the persist node closure-free and
# match how ``_provider_instance`` is handled. In an application
# server, these would live on a request-scoped or app-scoped
# container instead.
_accumulator: LlmUsageAccumulator | None = None
_failure_tracker: LlmFailureTracker | None = None
_compiled_graph: CompiledGraph[BriefingState] | None = None
# ---------------------------------------------------------------------------
# Caller hooks for Langfuse trace.input / trace.output
# ---------------------------------------------------------------------------
# The LangfuseObserver lets callers derive domain-shaped trace.input
# and trace.output from State rather than letting the framework dump
# the raw State object. The hooks fire once per invocation:
# trace_input_from_state on InvocationStartedEvent,
# trace_output_from_state on InvocationCompletedEvent. Production
# teams use this to keep PII out of trace payloads while still
# surfacing the operational signal a Langfuse UI viewer needs.
def _trace_input(state: BriefingState) -> dict[str, Any]:
return {"question": state.question}
def _trace_output(state: BriefingState) -> dict[str, Any]:
return {"answer": state.answer, "model": state.model_used}
# ---------------------------------------------------------------------------
# TimingMiddleware on_complete callback
# ---------------------------------------------------------------------------
# The canonical TimingMiddleware (openarmature.graph.middleware) wraps
# a node's execution and dispatches a TimingRecord to this callback
# when the chain returns or raises. Keep callbacks fast: a slow
# callback adds to the apparent node duration since it fires inline
# before the chain's result returns to the engine.
#
# Production deployments queue the record into a metrics exporter
# (Prometheus pushgateway, StatsD, OTLP metrics) rather than print.
async def _emit_timing(record: TimingRecord) -> None:
extra = ""
if record.exception_category is not None:
extra = f" [category={record.exception_category}]"
print(f"[timing] {record.node_name}: {record.duration_ms:.1f}ms ({record.outcome}){extra}")
# ---------------------------------------------------------------------------
# Node
# ---------------------------------------------------------------------------
async def respond(state: BriefingState) -> dict[str, Any]:
"""Single LLM call that answers the briefing question."""
response = await _get_provider().complete(
[
SystemMessage(
content=(
"You are a lunar-mission expert assistant. Answer "
"questions about Apollo and Artemis missions concisely "
"and factually. Keep responses to two or three sentences."
),
),
UserMessage(content=state.question),
],
config=RuntimeConfig(temperature=0.0, max_tokens=300),
)
return {
"answer": response.message.content or "",
# ``response.response_model`` is the version-specific
# identifier the provider echoes back on the response body
# (e.g., ``gpt-4o-mini-2024-07-18``); falling back to
# "unknown" guards against providers that omit the field.
"model_used": response.response_model or "unknown",
}
# Terminal node. State is intentionally unused: this node's job is
# to synchronize on the observer deliver loop and report a derived
# rollup, not to read or modify pipeline state.
#
# ``drain_events_for`` blocks until every event dispatched up to this
# point has reached every attached observer. Without it the
# accumulator's bucket may still be missing the most-recent LLM
# event's tokens, since the deliver loop hasn't processed them yet
# when the node body runs. Snapshot semantic: the drain awaits only
# events dispatched BEFORE the call (this node's own ``started``
# event included), not events that fire after the call begins
# (notably this node's own ``completed`` event, which only fires
# after the body returns; that's how the call avoids deadlocking
# on itself).
#
# Default timeout is 5.0s; the demo tightens to 2.0s so a stuck
# observer surfaces fast. Production teams pick the threshold against
# their observer SLO. Returning a timeout summary instead of raising
# lets the caller record an SLO breach and proceed with whatever
# data is available, rather than failing the whole invocation.
async def persist(_state: BriefingState) -> dict[str, Any]:
"""Drain the deliver loop, read the LLM-usage + failure rollups, drop the buckets."""
# Use explicit RuntimeError rather than ``assert`` so the failure
# mode stays informative under ``python -O`` (which strips asserts
# and would otherwise turn these into silent ``None`` dereferences
# at the next attribute access).
if _compiled_graph is None or _accumulator is None or _failure_tracker is None:
raise RuntimeError(
"persist node requires _compiled_graph, _accumulator, and _failure_tracker "
"to be set before invoke(); see build_graph() for the initialization pattern"
)
invocation_id = current_invocation_id()
if invocation_id is None:
raise RuntimeError("persist node called outside an active invocation")
summary = await _compiled_graph.drain_events_for(invocation_id, timeout=2.0)
if summary.timeout_reached:
# Production: emit an SLO-breach metric. Demo: surface the
# gap inline so a reader sees what an incomplete drain looks
# like.
print(f"[persist] drain incomplete: {summary.undelivered_count} events still pending after 2.0s")
# Read both buckets up front so the drop calls run in pairs and
# neither bucket leaks if a later print raises.
usage_bucket = _accumulator.get_bucket(invocation_id)
failure_bucket = _failure_tracker.get_bucket(invocation_id)
_accumulator.drop(invocation_id)
_failure_tracker.drop(invocation_id)
# In production, this is where you'd write the canonical
# invocation artifact to durable storage: a JSON record with the
# answer + per-invocation token cost + cache-hit ratio + failure
# category counts + caller metadata + trace IDs for cross-system
# join. The demo prints the rollups so the pattern is legible.
if usage_bucket is None:
print("[persist] no LLM usage recorded for this invocation")
else:
# Cache-hit ratio is ``cached_tokens / prompt_tokens`` when the
# provider reports cache stats. Backends without cache
# visibility report ``cached_tokens=0``; the ratio degrades
# to 0% gracefully without special-casing.
if usage_bucket.prompt_tokens > 0:
cache_hit_pct = (usage_bucket.cached_tokens / usage_bucket.prompt_tokens) * 100
else:
cache_hit_pct = 0.0
print(
f"[persist] LLM usage: prompt={usage_bucket.prompt_tokens} "
f"(cached={usage_bucket.cached_tokens}, {cache_hit_pct:.1f}% hit), "
f"completion={usage_bucket.completion_tokens}, "
f"total={usage_bucket.total_tokens} "
f"across {usage_bucket.call_count} call(s)"
)
if failure_bucket is None or not failure_bucket.by_category:
print("[persist] LLM failures: none")
else:
# Sort by count descending so the noisiest category leads.
ordered = sorted(failure_bucket.by_category.items(), key=lambda kv: (-kv[1], kv[0]))
summary_str = ", ".join(f"{cat}={n}" for cat, n in ordered)
print(f"[persist] LLM failures: {summary_str}")
return {}
# ---------------------------------------------------------------------------
# Graph
# ---------------------------------------------------------------------------
def build_graph() -> CompiledGraph[BriefingState]:
"""Two-node graph: respond -> persist -> END.
TimingMiddleware wraps the respond node so wall-clock duration
is captured per call. The persist node runs synchronously after
respond returns; it drains the deliver loop for the current
invocation, reads the LLM-usage accumulator's bucket, drops the
bucket, and prints a cost summary. No other middleware
(RetryMiddleware lives in the fan-out-with-retry / parallel-
branches examples; this one's scope is observability).
The ``LlmUsageAccumulator`` and ``LlmFailureTracker`` are
constructed, attached to the compiled graph, and registered to
the module-level singletons so the persist node (which reads
from globals to stay closure-free) can reach them without help
from the caller. The factory is self-contained: ``graph =
build_graph(); await graph.invoke(...)`` works on its own. OTel
+ Langfuse observers are NOT attached here; those are
observability-stack choices made at the call site, and
``main()`` attaches them after build_graph() returns.
"""
global _accumulator, _failure_tracker, _compiled_graph
timing = TimingMiddleware(node_name="respond", on_complete=_emit_timing)
_accumulator = LlmUsageAccumulator()
_failure_tracker = LlmFailureTracker()
_compiled_graph = (
GraphBuilder(BriefingState)
.add_node("respond", respond, middleware=[timing])
.add_node("persist", persist)
.add_edge("respond", "persist")
.add_edge("persist", END)
.set_entry("respond")
.compile()
)
_compiled_graph.attach_observer(_accumulator)
_compiled_graph.attach_observer(_failure_tracker)
return _compiled_graph
# ---------------------------------------------------------------------------
# Observer wiring (dual)
# ---------------------------------------------------------------------------
# Both observers consume the same NodeEvent stream independently.
# The Langfuse observer uses an in-memory client so the demo can
# print the captured Trace tree at the end without a real Langfuse
# account. The OTel observer uses an in-memory span exporter for
# symmetric reasons; production code attaches a BatchSpanProcessor
# with a real OTLP exporter pointed at HyperDX / Honeycomb / Tempo /
# any OTLP-compatible backend.
#
# Caller hooks attach to LangfuseObserver via constructor kwargs.
# ``disable_provider_payload=False`` opts in to capturing the input
# messages + output content on Generation observations so the demo
# output is meaningful; the default-True is the privacy-preserving
# setting.
def _build_otel_observer(exporter: InMemorySpanExporter) -> OTelObserver:
# ``disable_provider_payload=False`` opts in to capturing input messages
# + output content on the LLM-call span (same flag the Langfuse
# observer below flips for the same reason). The example's whole
# point is showing both backends seeing the same logical events;
# leaving them asymmetric (Langfuse captures the conversation, OTel
# doesn't) would undercut that. Default-True is the privacy
# posture for general use; flip it deliberately when the operator
# has audited the payload PII risk.
return OTelObserver(
span_processor=SimpleSpanProcessor(exporter),
resource=Resource.create({"service.name": "openarmature-production-observability"}),
disable_provider_payload=False,
)
def _build_langfuse_observer(client: InMemoryLangfuseClient) -> LangfuseObserver:
return LangfuseObserver(
client=client,
disable_provider_payload=False,
trace_input_from_state=_trace_input,
trace_output_from_state=_trace_output,
)
# ---------------------------------------------------------------------------
# Captured-output rendering
# ---------------------------------------------------------------------------
# Production traces land in Honeycomb / HyperDX / Tempo / Phoenix /
# Langfuse Cloud where the backend UI does the rendering work. For
# the in-process demo we walk the captured records and pretty-print
# enough of the shape that a reader can see what each backend would
# have ingested.
# Invocation-level attributes. Surface these only on the root
# ``openarmature.invocation`` span line; inner spans don't carry them
# (they're invocation-level constants, not cross-cutting per-node
# attributes).
_INVOCATION_SPAN_KEYS = (
"openarmature.graph.entry_node",
"openarmature.graph.spec_version",
"openarmature.implementation.name",
"openarmature.implementation.version",
)
# Per-node + cross-cutting attributes. The ``gen_ai.*`` family
# follows the OpenTelemetry GenAI semantic conventions. Surface
# these on inner-node spans only; they propagate to the invocation
# span too but showing them there is redundant once they appear on
# every node line below.
#
# ``openarmature.llm.cache_read.input_tokens`` is the OA-namespace
# cache-stat attribute. Lands on the LLM span only and only when the
# provider reports cache hits. Backends with prefix caching (vLLM
# ``--enable-prefix-caching``, Anthropic prompt caching, OpenAI's
# ``prompt_token_usage`` cache report when enabled) populate it;
# OpenAI's default ``gpt-4o-mini`` configuration leaves it absent.
# The formatter omits the entry when absent rather than showing
# ``None`` so a reader instantly sees whether prefix caching is
# paying off in the observed run.
_INNER_SPAN_KEYS = (
"openarmature.node.name",
"openarmature.user.tenantId",
"openarmature.user.requestId",
"openarmature.user.featureFlag",
"gen_ai.system",
"gen_ai.usage.input_tokens",
"gen_ai.usage.output_tokens",
"openarmature.llm.cache_read.input_tokens",
)
def _format_otel_spans(spans: list[ReadableSpan]) -> str:
"""One line per span: name, duration, key attributes.
The ``openarmature.invocation`` root span closes on observer
``shutdown()`` and surfaces only its invocation-level
attributes (entry_node, spec_version, implementation name +
version). Inner-node spans surface the cross-cutting caller
metadata + ``gen_ai.*`` attributes; printing them on the
invocation line too would just repeat data shown three more
times below.
"""
if not spans:
return " (no spans captured)"
lines: list[str] = []
# Sort by start time so the timeline reads naturally.
spans_sorted = sorted(spans, key=lambda s: s.start_time or 0)
for span in spans_sorted:
attrs = span.attributes or {}
keys = _INVOCATION_SPAN_KEYS if span.name == "openarmature.invocation" else _INNER_SPAN_KEYS
relevant = {k: v for k in keys if (v := attrs.get(k)) is not None}
duration_ms = 0.0
if span.start_time is not None and span.end_time is not None:
duration_ms = (span.end_time - span.start_time) / 1_000_000.0
attr_str = ", ".join(f"{k}={v!r}" for k, v in relevant.items())
lines.append(f" [{span.name}] {duration_ms:.1f}ms {attr_str}")
return "\n".join(lines)
def _format_langfuse_trace(trace: LangfuseTrace) -> str:
"""Pretty-print the captured Trace + Observation tree.
Mirrors what the Langfuse production UI renders for the same
invocation: trace.input / trace.output (sourced via the caller
hooks), top-level metadata (caller-supplied + framework-reserved
keys), and the Observation tree underneath.
"""
lines: list[str] = []
lines.append(f"Trace id={trace.id}")
lines.append(f" name={trace.name!r}")
lines.append(f" input={trace.input!r}")
lines.append(f" output={trace.output!r}")
lines.append(f" metadata={trace.metadata!r}")
for obs in trace.children_of(None):
_format_observation(lines, trace, obs, indent=" ")
return "\n".join(lines)
def _format_observation(
lines: list[str], trace: LangfuseTrace, obs: LangfuseObservation, indent: str
) -> None:
lines.append(f"{indent}[{obs.type}] {obs.name!r}")
if obs.input is not None:
lines.append(f"{indent} input={obs.input!r}")
if obs.output is not None:
lines.append(f"{indent} output={obs.output!r}")
if obs.model is not None:
lines.append(f"{indent} model={obs.model!r}")
if obs.usage is not None:
lines.append(f"{indent} usage={obs.usage!r}")
for child in trace.children_of(obs.id):
_format_observation(lines, trace, child, indent=indent + " ")
# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------
async def main() -> None:
# Pass a question on the command line to override the default,
# e.g. ``... main.py "When did Apollo 17 splash down?"``.
question = " ".join(sys.argv[1:]) or "What was the primary objective of Apollo 11?"
# In-memory captures so the demo can print what BOTH backends
# would have seen. Swap with production exporters / clients
# without touching node or graph code.
span_exporter = InMemorySpanExporter()
langfuse_client = InMemoryLangfuseClient()
# build_graph() owns the accumulator construction + attachment
# plus the module-level singleton wiring. main() attaches the
# observability-stack observers on top.
graph = build_graph()
# Keep the OTel observer reachable so we can ``shutdown()`` it
# after drain; the root ``openarmature.invocation`` span only
# closes on shutdown, and the in-memory exporter only surfaces
# closed spans through ``get_finished_spans()``. Production
# deployments do the same dance at process exit.
otel_observer = _build_otel_observer(span_exporter)
graph.attach_observer(otel_observer)
graph.attach_observer(_build_langfuse_observer(langfuse_client))
# Caller-supplied multi-tenant metadata. Both observers pick
# the entries up: OTel attaches them as ``openarmature.user.*``
# span attributes; Langfuse merges them as top-level
# ``trace.metadata`` keys plus per-observation metadata.
metadata = {
"tenantId": "demo-acme",
"requestId": str(uuid.uuid4()),
"featureFlag": "v2-canary",
}
print("=== openarmature production-observability demo ===")
print(f"question: {question}")
print(f"tenant id: {metadata['tenantId']}")
print(f"request id: {metadata['requestId']}")
print(f"feature flag: {metadata['featureFlag']}")
print()
# Caller-side error boundary at the invoke() seam. ``exc.__cause__``
# carries the underlying error via Python's standard exception
# chain; an ``LlmProviderError`` surfaces its canonical
# ``.category`` string (``provider_rate_limit``,
# ``provider_invalid_request``, etc.) so the failure mode is
# immediately greppable. Both observers still capture what they
# saw (the captures get printed below) so a reader sees how each
# backend records a failed invocation. Production code would
# either retry transient categories via ``RetryMiddleware`` on
# the node, fallback inside the node body, or surface the
# category to the caller as the example does here.
final: BriefingState | None = None
try:
final = await graph.invoke(
BriefingState(question=question),
metadata=metadata,
)
except NodeException as exc:
cause = exc.__cause__
if isinstance(cause, LlmProviderError):
category = cause.category
else:
category = type(cause).__name__ if cause is not None else "<unknown>"
print()
print(f"*** node {exc.node_name!r} failed ({category}): {cause} ***")
print()
finally:
# drain() is required for short-lived processes: invoke()
# returns when the graph reaches END regardless of whether
# the observer queue has finished draining. shutdown() on
# the OTel observer closes the root ``openarmature.invocation``
# span so it lands in the exporter alongside the per-node
# spans; the Langfuse observer has no analog because it
# writes Trace + Observation entities synchronously through
# the client.
await graph.drain()
otel_observer.shutdown()
await _get_provider().aclose()
if final is not None:
print(f"answer: {final.answer}")
print(f"model: {final.model_used}")
print()
# OTel side: pretty-print the captured spans timeline so a
# reader can see what an OTLP backend would have ingested.
print("--- captured OTel spans ---")
print(_format_otel_spans(list(span_exporter.get_finished_spans())))
print()
# Langfuse side: pretty-print the Trace + Observation tree.
# Exactly one Trace per invocation (the observer opens it on
# the first node event; trace.id equals the invocation_id so
# cross-system lookups land directly).
print("--- captured Langfuse trace ---")
assert len(langfuse_client.traces) == 1, f"expected exactly one trace, got {len(langfuse_client.traces)}"
trace = next(iter(langfuse_client.traces.values()))
print(_format_langfuse_trace(trace))
if __name__ == "__main__":
asyncio.run(main())