From 309d10b9ab36d8fb7c2056d4f80a6d026b896e2d Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 27 May 2026 15:02:47 -0700 Subject: [PATCH 1/2] Add public force_flush() to OTel and Langfuse observers Downstream users reach into observer._provider directly to drain the BatchSpanProcessor's outbound buffer in fast-teardown harnesses (serverless functions, CLI one-shots, FastAPI TestClient teardown). Expose force_flush(timeout_ms) as a stable surface instead. OTelObserver.force_flush wraps self._provider.force_flush. Distinct from CompiledGraph.drain, which covers the engine's observer-event queue; force_flush covers the SpanProcessor export buffer. LangfuseObserver.force_flush delegates to LangfuseClient.force_flush, a new Protocol method. InMemoryLangfuseClient implements it as a no-op (no buffer); LangfuseSDKAdapter delegates to the SDK's flush(). --- .../observability/langfuse/adapter.py | 10 +++++++ .../observability/langfuse/client.py | 20 +++++++++++++ .../observability/langfuse/observer.py | 18 ++++++++++++ .../observability/otel/observer.py | 29 +++++++++++++++++-- tests/unit/test_observability_langfuse.py | 19 ++++++++++++ tests/unit/test_observability_otel.py | 13 +++++++++ 6 files changed, 107 insertions(+), 2 deletions(-) diff --git a/src/openarmature/observability/langfuse/adapter.py b/src/openarmature/observability/langfuse/adapter.py index f136b8f..23ee4f5 100644 --- a/src/openarmature/observability/langfuse/adapter.py +++ b/src/openarmature/observability/langfuse/adapter.py @@ -328,6 +328,16 @@ def generation( ) return _SpanHandle(obs) + def force_flush(self, timeout_ms: int = 30_000) -> bool: + # The v4 SDK's flush() returns ``None`` synchronously but + # internally waits for the OTel BatchSpanProcessor to drain. + # No timeout parameter is exposed on the SDK call, so we + # ignore ``timeout_ms`` here and rely on the SDK's own + # default. Returns True if flush() completes without raising. 
+ del timeout_ms + self._client.flush() + return True + def _start_observation( self, *, diff --git a/src/openarmature/observability/langfuse/client.py b/src/openarmature/observability/langfuse/client.py index 0685387..0dd4394 100644 --- a/src/openarmature/observability/langfuse/client.py +++ b/src/openarmature/observability/langfuse/client.py @@ -217,6 +217,19 @@ def generation( prompt: Any = None, ) -> LangfuseGenerationHandle: ... + def force_flush(self, timeout_ms: int = 30_000) -> bool: + """Flush any pending outbound buffer in the underlying sink. + + Returns ``True`` when the flush completes within the deadline, + ``False`` otherwise. The semantics mirror OTel's + ``TracerProvider.force_flush``: cover the export-buffer half + of fast-teardown races. The bundled + :class:`InMemoryLangfuseClient` has no buffer and returns + ``True`` immediately; SDK adapters delegate to the underlying + client's flush. + """ + ... + # Concrete in-memory implementation --------------------------------------- # Used by tests and the conformance harness. Stores everything the @@ -407,6 +420,13 @@ def generation( trace.observations.append(observation) return _InMemoryGenerationHandle(observation=observation) + def force_flush(self, timeout_ms: int = 30_000) -> bool: + # In-memory recorder has no outbound buffer; every observation + # is captured synchronously on its create call. The ``timeout_ms`` + # parameter is accepted for Protocol compatibility but unused. 
+ del timeout_ms + return True + def _get_trace(self, trace_id: str) -> LangfuseTrace: trace = self.traces.get(trace_id) if trace is None: diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 3654148..1759f32 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -741,6 +741,24 @@ def close_invocation(self, invocation_id: str) -> None: ): self._close_subgraph_observation(inv_state, prefix) + def force_flush(self, timeout_ms: int = 30_000) -> bool: + """Flush pending observations through the underlying client. + + Returns the client's ``force_flush`` result; the deadline is + best-effort (adapters may ignore ``timeout_ms``). Mirrors the + OTel observer's ``force_flush`` surface — distinct from + :meth:`~openarmature.graph.compiled.CompiledGraph.drain` (which + covers the engine's observer-event queue): this method covers + the outbound buffer of the Langfuse client (the SDK's OTel + BatchSpanProcessor when wrapped via :class:`LangfuseSDKAdapter`). + + Useful in fast-teardown harnesses (CLI one-shots, serverless + functions, ASGI lifespan shutdown) where the SDK's + BatchSpanProcessor export thread would otherwise be cut off + before its buffer drains. + """ + return self.client.force_flush(timeout_ms=timeout_ms) + def shutdown(self) -> None: """Drain every in-flight invocation.
Use for long-lived observers shared across requests; CLI / one-shot processes diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 54f926b..6fa7d46 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -1445,6 +1445,32 @@ def _close_invocation_span(self, invocation_id: str) -> None: self._run_enrichers(open_span.span, None) open_span.span.end() + def force_flush(self, timeout_ms: int = 30_000) -> bool: + """Flush any pending spans through every registered span processor. + + Returns ``True`` when all processors finish flushing within the + deadline, ``False`` otherwise. Wraps the underlying OTel + :class:`TracerProvider`'s ``force_flush`` so callers don't have + to reach into the private ``_provider`` attribute. + + **When to call.** Distinct from :meth:`drain` on + :class:`~openarmature.graph.compiled.CompiledGraph` (which covers + the engine's observer-event queue): this method covers the + outbound span-export buffer of each registered + :class:`SpanProcessor`. Under fast or unusual teardown + orderings (FastAPI ``TestClient`` teardown, CLI one-shots, + serverless functions) the :class:`BatchSpanProcessor`'s export + thread can be cut off before its buffer drains; calling + ``force_flush()`` from a ``finally`` block right before process + exit is the canonical hardening. + + The default 30 s ``timeout_ms`` matches the OTel SDK's own + default. Pass a smaller value when running under a hard + deadline (a serverless function's max execution time, an + ASGI lifespan timeout). + """ + return self._provider.force_flush(timeout_millis=timeout_ms) + def shutdown(self) -> None: """Close any still-open spans across all in-flight invocations and shut down the underlying provider. Each per-invocation @@ -1459,8 +1485,7 @@ def shutdown(self) -> None: thread finishes), the flush may not complete in time and spans can appear dropped. 
Workarounds: - - Call ``observer._provider.force_flush(timeout_millis=…)`` - explicitly before this method. + - Call :meth:`force_flush` explicitly before this method. - Use :class:`SimpleSpanProcessor` instead of :class:`BatchSpanProcessor` in tests; it exports synchronously and is unaffected by teardown timing. diff --git a/tests/unit/test_observability_langfuse.py b/tests/unit/test_observability_langfuse.py index 7553001..7fe0ad9 100644 --- a/tests/unit/test_observability_langfuse.py +++ b/tests/unit/test_observability_langfuse.py @@ -108,6 +108,25 @@ def test_in_memory_recorder_observation_id_is_unique_per_recorder() -> None: assert a.id != b.id +def test_observer_force_flush_delegates_to_client() -> None: + # LangfuseObserver.force_flush() calls into the client; the + # InMemoryLangfuseClient's force_flush is a no-op that returns + # True, so this just verifies the delegation wires correctly. + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client) + assert observer.force_flush() is True + assert observer.force_flush(timeout_ms=1000) is True + + +def test_in_memory_recorder_force_flush_is_no_op() -> None: + # In-memory recorder has no outbound buffer; force_flush returns + # True immediately. The timeout_ms parameter is accepted for + # Protocol compatibility but unused. 
+ client = InMemoryLangfuseClient() + assert client.force_flush() is True + assert client.force_flush(timeout_ms=5000) is True + + def test_in_memory_recorder_children_of_walks_parent_links() -> None: client = InMemoryLangfuseClient() client.trace(id="t1") diff --git a/tests/unit/test_observability_otel.py b/tests/unit/test_observability_otel.py index 106725d..8f18cfd 100644 --- a/tests/unit/test_observability_otel.py +++ b/tests/unit/test_observability_otel.py @@ -1382,3 +1382,16 @@ async def ask_llm(_s: _S) -> dict[str, str]: assert attrs.get("openarmature.prompt.label") == "production" assert attrs.get("openarmature.prompt.template_hash") == "sha256:tpl" assert attrs.get("openarmature.prompt.rendered_hash") == "sha256:rendered" + + +def test_force_flush_delegates_to_provider() -> None: + # Public force_flush wraps TracerProvider.force_flush so downstream + # users don't reach into observer._provider to drain the + # BatchSpanProcessor buffer in fast-teardown harnesses. + exporter = InMemorySpanExporter() + observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + try: + assert observer.force_flush() is True + assert observer.force_flush(timeout_ms=1000) is True + finally: + observer.shutdown() From 4811f348349a6c50af3f6ea3aeaf6fc0ed96b35a Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 27 May 2026 16:07:17 -0700 Subject: [PATCH 2/2] Tighten force_flush contract docs; add adapter test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Narrow the LangfuseClient.force_flush Protocol contract to call out that deadline honor is best-effort: SDK adapters wrapping clients without timeout-propagation surfaces (the v4 Langfuse SDK) may ignore timeout_ms and rely on the SDK's internal defaults. 
Tighten the LangfuseSDKAdapter.force_flush docstring to make the synchronous-but-not-deadline-honoring behavior explicit (the v4 SDK flush IS synchronous — it blocks on tracer_provider.force_flush plus score/media queue joins — but takes no timeout parameter and discards the OTel return value). Rewrite a stale comment in the adapter integration test that wrongly claimed flush() was async — it isn't. Add test_adapter_force_flush_delegates_to_client that uses Mock(wraps=...) to verify the adapter invokes the wrapped SDK's flush() and returns True. --- .../observability/langfuse/adapter.py | 21 ++++++++++--- .../observability/langfuse/client.py | 9 ++++++ .../test_observability_langfuse_adapter.py | 31 ++++++++++++++++--- 3 files changed, 51 insertions(+), 10 deletions(-) diff --git a/src/openarmature/observability/langfuse/adapter.py b/src/openarmature/observability/langfuse/adapter.py index 23ee4f5..6fb2758 100644 --- a/src/openarmature/observability/langfuse/adapter.py +++ b/src/openarmature/observability/langfuse/adapter.py @@ -329,11 +329,22 @@ def generation( return _SpanHandle(obs) def force_flush(self, timeout_ms: int = 30_000) -> bool: - # The v4 SDK's flush() returns ``None`` synchronously but - # internally waits for the OTel BatchSpanProcessor to drain. - # No timeout parameter is exposed on the SDK call, so we - # ignore ``timeout_ms`` here and rely on the SDK's own - # default. Returns True if flush() completes without raising. + """Best-effort flush of the underlying Langfuse client. + + ``timeout_ms`` is accepted for Protocol compatibility but + **ignored**: the v4 Langfuse SDK's ``flush()`` method takes + no timeout parameter and discards the underlying + ``TracerProvider.force_flush()`` return value. 
The call is + nonetheless synchronous — internally ``flush()`` waits on + OTel's ``force_flush`` (default 30 s) and then ``.join()`` on + the SDK's score and media ingestion queues — so by the time + we return the OTel batch processor and ingestion queues have + either drained or hit the SDK's internal default deadlines. + + Returns ``True`` once the SDK call completes without raising; + a tight-deadline caller should pair this with its own + wall-clock guard rather than relying on the return value. + """ del timeout_ms self._client.flush() return True diff --git a/src/openarmature/observability/langfuse/client.py b/src/openarmature/observability/langfuse/client.py index 0dd4394..9a7ba53 100644 --- a/src/openarmature/observability/langfuse/client.py +++ b/src/openarmature/observability/langfuse/client.py @@ -227,6 +227,15 @@ def force_flush(self, timeout_ms: int = 30_000) -> bool: :class:`InMemoryLangfuseClient` has no buffer and returns ``True`` immediately; SDK adapters delegate to the underlying client's flush. + + **Deadline honor is best-effort.** Adapters wrapping SDKs + that don't expose a timeout-propagation surface (the v4 + Langfuse SDK is one such case — its ``flush()`` blocks on the + SDK's own internal defaults) may ignore ``timeout_ms`` and + return ``True`` once the underlying call returns. Callers + with a hard deadline should layer their own wall-clock guard + around this method rather than relying solely on the return + value. """ ... 
diff --git a/tests/unit/test_observability_langfuse_adapter.py b/tests/unit/test_observability_langfuse_adapter.py index 08f081e..bc38af9 100644 --- a/tests/unit/test_observability_langfuse_adapter.py +++ b/tests/unit/test_observability_langfuse_adapter.py @@ -114,6 +114,26 @@ def test_adapter_update_trace_merges_into_cache() -> None: assert cached["metadata"] == {"key1": "v1", "key2": "v2"} +def test_adapter_force_flush_delegates_to_client() -> None: + # force_flush() must invoke the wrapped SDK's flush() so callers + # in fast-teardown harnesses get the SDK's internal drain + # (OTel TracerProvider.force_flush + score/media queue joins). + # Mock(wraps=...) lets us assert delegation without simulating + # the SDK's full surface. + from unittest.mock import Mock + + real_client = _dummy_client() + wrapped = Mock(wraps=real_client) + adapter = LangfuseSDKAdapter(wrapped) + + assert adapter.force_flush() is True + wrapped.flush.assert_called_once_with() + + # timeout_ms is accepted but unused per the documented contract. + assert adapter.force_flush(timeout_ms=1_000) is True + assert wrapped.flush.call_count == 2 + + # --------------------------------------------------------------------------- # Integration test against real Langfuse Cloud (opt-in) # --------------------------------------------------------------------------- @@ -173,11 +193,12 @@ async def test_adapter_against_real_langfuse_cloud() -> None: await graph.invoke(_S()) await graph.drain() observer.shutdown() - # ``client.shutdown()`` is the synchronous drain — flush() returns - # immediately while the OTel BatchSpanProcessor exports in - # background, and the test process exits before that finishes. - # shutdown() blocks until all spans are exported (or the - # exporter's shutdown timeout elapses). 
+ # Use ``client.shutdown()`` rather than ``client.flush()`` here: + # both block on the SDK's internal drain (OTel's force_flush plus + # the score/media queue joins), but shutdown() also tears down + # the resource manager so the test process exits cleanly. flush() + # is the appropriate call from a long-lived process that wants + # to drain without releasing SDK resources. client.shutdown() # Manual check: open the trace in the dashboard and confirm # "step_a" + "step_b" appear as Span observations under one Trace.