diff --git a/src/openarmature/observability/langfuse/adapter.py b/src/openarmature/observability/langfuse/adapter.py index f136b8f..6fb2758 100644 --- a/src/openarmature/observability/langfuse/adapter.py +++ b/src/openarmature/observability/langfuse/adapter.py @@ -328,6 +328,27 @@ def generation( ) return _SpanHandle(obs) + def force_flush(self, timeout_ms: int = 30_000) -> bool: + """Best-effort flush of the underlying Langfuse client. + + ``timeout_ms`` is accepted for Protocol compatibility but + **ignored**: the v4 Langfuse SDK's ``flush()`` method takes + no timeout parameter and discards the underlying + ``TracerProvider.force_flush()`` return value. The call is + nonetheless synchronous — internally ``flush()`` waits on + OTel's ``force_flush`` (default 30 s) and then ``.join()`` on + the SDK's score and media ingestion queues — so by the time + we return the OTel batch processor and ingestion queues have + either drained or hit the SDK's internal default deadlines. + + Returns ``True`` once the SDK call completes without raising; + a tight-deadline caller should pair this with its own + wall-clock guard rather than relying on the return value. + """ + del timeout_ms + self._client.flush() + return True + def _start_observation( self, *, diff --git a/src/openarmature/observability/langfuse/client.py b/src/openarmature/observability/langfuse/client.py index 0685387..9a7ba53 100644 --- a/src/openarmature/observability/langfuse/client.py +++ b/src/openarmature/observability/langfuse/client.py @@ -217,6 +217,28 @@ def generation( prompt: Any = None, ) -> LangfuseGenerationHandle: ... + def force_flush(self, timeout_ms: int = 30_000) -> bool: + """Flush any pending outbound buffer in the underlying sink. + + Returns ``True`` when the flush completes within the deadline, + ``False`` otherwise. The semantics mirror OTel's + ``TracerProvider.force_flush``: cover the export-buffer half + of fast-teardown races. The bundled + :class:`InMemoryLangfuseClient` has no buffer and returns + ``True`` immediately; SDK adapters delegate to the underlying + client's flush. + + **Deadline honor is best-effort.** Adapters wrapping SDKs + that don't expose a timeout-propagation surface (the v4 + Langfuse SDK is one such case — its ``flush()`` blocks on the + SDK's own internal defaults) may ignore ``timeout_ms`` and + return ``True`` once the underlying call returns. Callers + with a hard deadline should layer their own wall-clock guard + around this method rather than relying solely on the return + value. + """ + ... + # Concrete in-memory implementation --------------------------------------- # Used by tests and the conformance harness. Stores everything the @@ -407,6 +429,13 @@ def generation( trace.observations.append(observation) return _InMemoryGenerationHandle(observation=observation) + def force_flush(self, timeout_ms: int = 30_000) -> bool: + # In-memory recorder has no outbound buffer; every observation + # is captured synchronously on its create call. The ``timeout_ms`` + # parameter is accepted for Protocol compatibility but unused. + del timeout_ms + return True + def _get_trace(self, trace_id: str) -> LangfuseTrace: trace = self.traces.get(trace_id) if trace is None: diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 3654148..1759f32 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -741,6 +741,24 @@ def close_invocation(self, invocation_id: str) -> None: ): self._close_subgraph_observation(inv_state, prefix) + def force_flush(self, timeout_ms: int = 30_000) -> bool: + """Flush pending observations through the underlying client. + + Returns ``True`` when the client's flush completes within the + deadline, ``False`` otherwise. Mirrors the OTel observer's + ``force_flush`` surface — distinct from + :meth:`~openarmature.graph.compiled.CompiledGraph.drain` (which + covers the engine's observer-event queue): this method covers + the outbound buffer of the Langfuse client (the SDK's OTel + BatchSpanProcessor when wrapped via :class:`LangfuseSDKAdapter`). + + Useful in fast-teardown harnesses (CLI one-shots, serverless + functions, ASGI lifespan shutdown) where the SDK's + BatchSpanProcessor export thread would otherwise be cut off + before its buffer drains. + """ + return self.client.force_flush(timeout_ms=timeout_ms) + def shutdown(self) -> None: """Drain every in-flight invocation. Use for long-lived observers shared across requests; CLI / one-shot processes diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 54f926b..6fa7d46 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -1445,6 +1445,32 @@ def _close_invocation_span(self, invocation_id: str) -> None: self._run_enrichers(open_span.span, None) open_span.span.end() + def force_flush(self, timeout_ms: int = 30_000) -> bool: + """Flush any pending spans through every registered span processor. + + Returns ``True`` when all processors finish flushing within the + deadline, ``False`` otherwise. Wraps the underlying OTel + :class:`TracerProvider`'s ``force_flush`` so callers don't have + to reach into the private ``_provider`` attribute. + + **When to call.** Distinct from :meth:`drain` on + :class:`~openarmature.graph.compiled.CompiledGraph` (which covers + the engine's observer-event queue): this method covers the + outbound span-export buffer of each registered + :class:`SpanProcessor`. Under fast or unusual teardown + orderings (FastAPI ``TestClient`` teardown, CLI one-shots, + serverless functions) the :class:`BatchSpanProcessor`'s export + thread can be cut off before its buffer drains; calling + ``force_flush()`` from a ``finally`` block right before process + exit is the canonical hardening. + + The default 30 s ``timeout_ms`` matches the OTel SDK's own + default. Pass a smaller value when running under a hard + deadline (a serverless function's max execution time, an + ASGI lifespan timeout). + """ + return self._provider.force_flush(timeout_millis=timeout_ms) + def shutdown(self) -> None: """Close any still-open spans across all in-flight invocations and shut down the underlying provider. Each per-invocation @@ -1459,8 +1485,7 @@ def shutdown(self) -> None: thread finishes), the flush may not complete in time and spans can appear dropped. Workarounds: - - Call ``observer._provider.force_flush(timeout_millis=…)`` - explicitly before this method. + - Call :meth:`force_flush` explicitly before this method. - Use :class:`SimpleSpanProcessor` instead of :class:`BatchSpanProcessor` in tests; it exports synchronously and is unaffected by teardown timing. diff --git a/tests/unit/test_observability_langfuse.py b/tests/unit/test_observability_langfuse.py index 7553001..7fe0ad9 100644 --- a/tests/unit/test_observability_langfuse.py +++ b/tests/unit/test_observability_langfuse.py @@ -108,6 +108,25 @@ def test_in_memory_recorder_observation_id_is_unique_per_recorder() -> None: assert a.id != b.id +def test_observer_force_flush_delegates_to_client() -> None: + # LangfuseObserver.force_flush() calls into the client; the + # InMemoryLangfuseClient's force_flush is a no-op that returns + # True, so this just verifies the delegation wires correctly. + client = InMemoryLangfuseClient() + observer = LangfuseObserver(client=client) + assert observer.force_flush() is True + assert observer.force_flush(timeout_ms=1000) is True + + +def test_in_memory_recorder_force_flush_is_no_op() -> None: + # In-memory recorder has no outbound buffer; force_flush returns + # True immediately. The timeout_ms parameter is accepted for + # Protocol compatibility but unused. + client = InMemoryLangfuseClient() + assert client.force_flush() is True + assert client.force_flush(timeout_ms=5000) is True + + def test_in_memory_recorder_children_of_walks_parent_links() -> None: client = InMemoryLangfuseClient() client.trace(id="t1") diff --git a/tests/unit/test_observability_langfuse_adapter.py b/tests/unit/test_observability_langfuse_adapter.py index 08f081e..bc38af9 100644 --- a/tests/unit/test_observability_langfuse_adapter.py +++ b/tests/unit/test_observability_langfuse_adapter.py @@ -114,6 +114,26 @@ def test_adapter_update_trace_merges_into_cache() -> None: assert cached["metadata"] == {"key1": "v1", "key2": "v2"} +def test_adapter_force_flush_delegates_to_client() -> None: + # force_flush() must invoke the wrapped SDK's flush() so callers + # in fast-teardown harnesses get the SDK's internal drain + # (OTel TracerProvider.force_flush + score/media queue joins). + # Mock(wraps=...) lets us assert delegation without simulating + # the SDK's full surface. + from unittest.mock import Mock + + real_client = _dummy_client() + wrapped = Mock(wraps=real_client) + adapter = LangfuseSDKAdapter(wrapped) + + assert adapter.force_flush() is True + wrapped.flush.assert_called_once_with() + + # timeout_ms is accepted but unused per the documented contract. + assert adapter.force_flush(timeout_ms=1_000) is True + assert wrapped.flush.call_count == 2 + + # --------------------------------------------------------------------------- # Integration test against real Langfuse Cloud (opt-in) # --------------------------------------------------------------------------- @@ -173,11 +193,12 @@ async def test_adapter_against_real_langfuse_cloud() -> None: await graph.invoke(_S()) await graph.drain() observer.shutdown() - # ``client.shutdown()`` is the synchronous drain — flush() returns - # immediately while the OTel BatchSpanProcessor exports in - # background, and the test process exits before that finishes. - # shutdown() blocks until all spans are exported (or the - # exporter's shutdown timeout elapses). + # Use ``client.shutdown()`` rather than ``client.flush()`` here: + # both block on the SDK's internal drain (OTel's force_flush plus + # the score/media queue joins), but shutdown() also tears down + # the resource manager so the test process exits cleanly. flush() + # is the appropriate call from a long-lived process that wants + # to drain without releasing SDK resources. client.shutdown() # Manual check: open the trace in the dashboard and confirm # "step_a" + "step_b" appear as Span observations under one Trace. diff --git a/tests/unit/test_observability_otel.py b/tests/unit/test_observability_otel.py index 106725d..8f18cfd 100644 --- a/tests/unit/test_observability_otel.py +++ b/tests/unit/test_observability_otel.py @@ -1382,3 +1382,16 @@ async def ask_llm(_s: _S) -> dict[str, str]: assert attrs.get("openarmature.prompt.label") == "production" assert attrs.get("openarmature.prompt.template_hash") == "sha256:tpl" assert attrs.get("openarmature.prompt.rendered_hash") == "sha256:rendered" + + +def test_force_flush_delegates_to_provider() -> None: + # Public force_flush wraps TracerProvider.force_flush so downstream + # users don't reach into observer._provider to drain the + # BatchSpanProcessor buffer in fast-teardown harnesses. + exporter = InMemorySpanExporter() + observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter)) + try: + assert observer.force_flush() is True + assert observer.force_flush(timeout_ms=1000) is True + finally: + observer.shutdown()