Skip to content

Commit eb80430

Browse files
Add public force_flush() to OTel and Langfuse observers (#83)
* Add public force_flush() to OTel and Langfuse observers Downstream users reach into observer._provider directly to drain the BatchSpanProcessor's outbound buffer in fast-teardown harnesses (serverless functions, CLI one-shots, FastAPI TestClient teardown). Expose force_flush(timeout_ms) as a stable surface instead. OTelObserver.force_flush wraps self._provider.force_flush. Distinct from CompiledGraph.drain, which covers the engine's observer-event queue; force_flush covers the SpanProcessor export buffer. LangfuseObserver.force_flush delegates to LangfuseClient.force_flush, a new Protocol method. InMemoryLangfuseClient implements it as a no-op (no buffer); LangfuseSDKAdapter delegates to the SDK's flush(). * Tighten force_flush contract docs; add adapter test Narrow the LangfuseClient.force_flush Protocol contract to call out that deadline honor is best-effort: SDK adapters wrapping clients without timeout-propagation surfaces (the v4 Langfuse SDK) may ignore timeout_ms and rely on the SDK's internal defaults. Tighten the LangfuseSDKAdapter.force_flush docstring to make the synchronous-but-not-deadline-honoring behavior explicit (the v4 SDK flush IS synchronous — it blocks on tracer_provider.force_flush plus score/media queue joins — but takes no timeout parameter and discards the OTel return value). Rewrite a stale comment in the adapter integration test that wrongly claimed flush() was async — it isn't. Add test_adapter_force_flush_delegates_to_client that uses Mock(wraps=...) to verify the adapter invokes the wrapped SDK's flush() and returns True.
1 parent 2b4bc1b commit eb80430

7 files changed

Lines changed: 153 additions & 7 deletions

File tree

src/openarmature/observability/langfuse/adapter.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,27 @@ def generation(
328328
)
329329
return _SpanHandle(obs)
330330

331+
def force_flush(self, timeout_ms: int = 30_000) -> bool:
332+
"""Best-effort flush of the underlying Langfuse client.
333+
334+
``timeout_ms`` is accepted for Protocol compatibility but
335+
**ignored**: the v4 Langfuse SDK's ``flush()`` method takes
336+
no timeout parameter and discards the underlying
337+
``TracerProvider.force_flush()`` return value. The call is
338+
nonetheless synchronous — internally ``flush()`` waits on
339+
OTel's ``force_flush`` (default 30 s) and then ``.join()`` on
340+
the SDK's score and media ingestion queues — so by the time
341+
we return the OTel batch processor and ingestion queues have
342+
either drained or hit the SDK's internal default deadlines.
343+
344+
Returns ``True`` once the SDK call completes without raising;
345+
a tight-deadline caller should pair this with its own
346+
wall-clock guard rather than relying on the return value.
347+
"""
348+
del timeout_ms
349+
self._client.flush()
350+
return True
351+
331352
def _start_observation(
332353
self,
333354
*,

src/openarmature/observability/langfuse/client.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,28 @@ def generation(
217217
prompt: Any = None,
218218
) -> LangfuseGenerationHandle: ...
219219

220+
def force_flush(self, timeout_ms: int = 30_000) -> bool:
221+
"""Flush any pending outbound buffer in the underlying sink.
222+
223+
Returns ``True`` when the flush completes within the deadline,
224+
``False`` otherwise. The semantics mirror OTel's
225+
``TracerProvider.force_flush``: cover the export-buffer half
226+
of fast-teardown races. The bundled
227+
:class:`InMemoryLangfuseClient` has no buffer and returns
228+
``True`` immediately; SDK adapters delegate to the underlying
229+
client's flush.
230+
231+
**Deadline honor is best-effort.** Adapters wrapping SDKs
232+
that don't expose a timeout-propagation surface (the v4
233+
Langfuse SDK is one such case — its ``flush()`` blocks on the
234+
SDK's own internal defaults) may ignore ``timeout_ms`` and
235+
return ``True`` once the underlying call returns. Callers
236+
with a hard deadline should layer their own wall-clock guard
237+
around this method rather than relying solely on the return
238+
value.
239+
"""
240+
...
241+
220242

221243
# Concrete in-memory implementation ---------------------------------------
222244
# Used by tests and the conformance harness. Stores everything the
@@ -407,6 +429,13 @@ def generation(
407429
trace.observations.append(observation)
408430
return _InMemoryGenerationHandle(observation=observation)
409431

432+
def force_flush(self, timeout_ms: int = 30_000) -> bool:
433+
# In-memory recorder has no outbound buffer; every observation
434+
# is captured synchronously on its create call. The ``timeout_ms``
435+
# parameter is accepted for Protocol compatibility but unused.
436+
del timeout_ms
437+
return True
438+
410439
def _get_trace(self, trace_id: str) -> LangfuseTrace:
411440
trace = self.traces.get(trace_id)
412441
if trace is None:

src/openarmature/observability/langfuse/observer.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -741,6 +741,24 @@ def close_invocation(self, invocation_id: str) -> None:
741741
):
742742
self._close_subgraph_observation(inv_state, prefix)
743743

744+
def force_flush(self, timeout_ms: int = 30_000) -> bool:
745+
"""Flush pending observations through the underlying client.
746+
747+
Returns ``True`` when the client's flush completes within the
748+
deadline, ``False`` otherwise. Mirrors the OTel observer's
749+
``force_flush`` surface — distinct from
750+
:meth:`~openarmature.graph.compiled.CompiledGraph.drain` (which
751+
covers the engine's observer-event queue): this method covers
752+
the outbound buffer of the Langfuse client (the SDK's OTel
753+
BatchSpanProcessor when wrapped via :class:`LangfuseSDKAdapter`).
754+
755+
Useful in fast-teardown harnesses (CLI one-shots, serverless
756+
functions, ASGI lifespan shutdown) where the SDK's
757+
BatchSpanProcessor export thread would otherwise be cut off
758+
before its buffer drains.
759+
"""
760+
return self.client.force_flush(timeout_ms=timeout_ms)
761+
744762
def shutdown(self) -> None:
745763
"""Drain every in-flight invocation. Use for long-lived
746764
observers shared across requests; CLI / one-shot processes

src/openarmature/observability/otel/observer.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1445,6 +1445,32 @@ def _close_invocation_span(self, invocation_id: str) -> None:
14451445
self._run_enrichers(open_span.span, None)
14461446
open_span.span.end()
14471447

1448+
def force_flush(self, timeout_ms: int = 30_000) -> bool:
1449+
"""Flush any pending spans through every registered span processor.
1450+
1451+
Returns ``True`` when all processors finish flushing within the
1452+
deadline, ``False`` otherwise. Wraps the underlying OTel
1453+
:class:`TracerProvider`'s ``force_flush`` so callers don't have
1454+
to reach into the private ``_provider`` attribute.
1455+
1456+
**When to call.** Distinct from :meth:`drain` on
1457+
:class:`~openarmature.graph.compiled.CompiledGraph` (which covers
1458+
the engine's observer-event queue): this method covers the
1459+
outbound span-export buffer of each registered
1460+
:class:`SpanProcessor`. Under fast or unusual teardown
1461+
orderings (FastAPI ``TestClient`` teardown, CLI one-shots,
1462+
serverless functions) the :class:`BatchSpanProcessor`'s export
1463+
thread can be cut off before its buffer drains; calling
1464+
``force_flush()`` from a ``finally`` block right before process
1465+
exit is the canonical hardening.
1466+
1467+
The default 30 s ``timeout_ms`` matches the OTel SDK's own
1468+
default. Pass a smaller value when running under a hard
1469+
deadline (a serverless function's max execution time, an
1470+
ASGI lifespan timeout).
1471+
"""
1472+
return self._provider.force_flush(timeout_millis=timeout_ms)
1473+
14481474
def shutdown(self) -> None:
14491475
"""Close any still-open spans across all in-flight invocations
14501476
and shut down the underlying provider. Each per-invocation
@@ -1459,8 +1485,7 @@ def shutdown(self) -> None:
14591485
thread finishes), the flush may not complete in time and spans
14601486
can appear dropped. Workarounds:
14611487
1462-
- Call ``observer._provider.force_flush(timeout_millis=…)``
1463-
explicitly before this method.
1488+
- Call :meth:`force_flush` explicitly before this method.
14641489
- Use :class:`SimpleSpanProcessor` instead of
14651490
:class:`BatchSpanProcessor` in tests; it exports synchronously
14661491
and is unaffected by teardown timing.

tests/unit/test_observability_langfuse.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,25 @@ def test_in_memory_recorder_observation_id_is_unique_per_recorder() -> None:
108108
assert a.id != b.id
109109

110110

111+
def test_observer_force_flush_delegates_to_client() -> None:
112+
# LangfuseObserver.force_flush() calls into the client; the
113+
# InMemoryLangfuseClient's force_flush is a no-op that returns
114+
# True, so this just verifies the delegation wires correctly.
115+
client = InMemoryLangfuseClient()
116+
observer = LangfuseObserver(client=client)
117+
assert observer.force_flush() is True
118+
assert observer.force_flush(timeout_ms=1000) is True
119+
120+
121+
def test_in_memory_recorder_force_flush_is_no_op() -> None:
122+
# In-memory recorder has no outbound buffer; force_flush returns
123+
# True immediately. The timeout_ms parameter is accepted for
124+
# Protocol compatibility but unused.
125+
client = InMemoryLangfuseClient()
126+
assert client.force_flush() is True
127+
assert client.force_flush(timeout_ms=5000) is True
128+
129+
111130
def test_in_memory_recorder_children_of_walks_parent_links() -> None:
112131
client = InMemoryLangfuseClient()
113132
client.trace(id="t1")

tests/unit/test_observability_langfuse_adapter.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,26 @@ def test_adapter_update_trace_merges_into_cache() -> None:
114114
assert cached["metadata"] == {"key1": "v1", "key2": "v2"}
115115

116116

117+
def test_adapter_force_flush_delegates_to_client() -> None:
118+
# force_flush() must invoke the wrapped SDK's flush() so callers
119+
# in fast-teardown harnesses get the SDK's internal drain
120+
# (OTel TracerProvider.force_flush + score/media queue joins).
121+
# Mock(wraps=...) lets us assert delegation without simulating
122+
# the SDK's full surface.
123+
from unittest.mock import Mock
124+
125+
real_client = _dummy_client()
126+
wrapped = Mock(wraps=real_client)
127+
adapter = LangfuseSDKAdapter(wrapped)
128+
129+
assert adapter.force_flush() is True
130+
wrapped.flush.assert_called_once_with()
131+
132+
# timeout_ms is accepted but unused per the documented contract.
133+
assert adapter.force_flush(timeout_ms=1_000) is True
134+
assert wrapped.flush.call_count == 2
135+
136+
117137
# ---------------------------------------------------------------------------
118138
# Integration test against real Langfuse Cloud (opt-in)
119139
# ---------------------------------------------------------------------------
@@ -173,11 +193,12 @@ async def test_adapter_against_real_langfuse_cloud() -> None:
173193
await graph.invoke(_S())
174194
await graph.drain()
175195
observer.shutdown()
176-
# ``client.shutdown()`` is the synchronous drain — flush() returns
177-
# immediately while the OTel BatchSpanProcessor exports in
178-
# background, and the test process exits before that finishes.
179-
# shutdown() blocks until all spans are exported (or the
180-
# exporter's shutdown timeout elapses).
196+
# Use ``client.shutdown()`` rather than ``client.flush()`` here:
197+
# both block on the SDK's internal drain (OTel's force_flush plus
198+
# the score/media queue joins), but shutdown() also tears down
199+
# the resource manager so the test process exits cleanly. flush()
200+
# is the appropriate call from a long-lived process that wants
201+
# to drain without releasing SDK resources.
181202
client.shutdown()
182203
# Manual check: open the trace in the dashboard and confirm
183204
# "step_a" + "step_b" appear as Span observations under one Trace.

tests/unit/test_observability_otel.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1382,3 +1382,16 @@ async def ask_llm(_s: _S) -> dict[str, str]:
13821382
assert attrs.get("openarmature.prompt.label") == "production"
13831383
assert attrs.get("openarmature.prompt.template_hash") == "sha256:tpl"
13841384
assert attrs.get("openarmature.prompt.rendered_hash") == "sha256:rendered"
1385+
1386+
1387+
def test_force_flush_delegates_to_provider() -> None:
1388+
# Public force_flush wraps TracerProvider.force_flush so downstream
1389+
# users don't reach into observer._provider to drain the
1390+
# BatchSpanProcessor buffer in fast-teardown harnesses.
1391+
exporter = InMemorySpanExporter()
1392+
observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter))
1393+
try:
1394+
assert observer.force_flush() is True
1395+
assert observer.force_flush(timeout_ms=1000) is True
1396+
finally:
1397+
observer.shutdown()

0 commit comments

Comments
 (0)