Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/openarmature/observability/langfuse/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,16 @@ def generation(
)
return _SpanHandle(obs)

def force_flush(self, timeout_ms: int = 30_000) -> bool:
    """Flush the wrapped SDK client, honouring ``timeout_ms``.

    The v4 SDK's ``flush()`` returns ``None`` synchronously, waits
    internally for the OTel BatchSpanProcessor to drain, and exposes
    no timeout parameter. To still enforce a deadline, the call runs
    on a helper thread that this method waits on for at most
    ``timeout_ms`` milliseconds (negative values are treated as 0).

    Returns ``True`` when ``flush()`` finished within the deadline and
    ``False`` when the deadline elapsed first; in that case the flush
    keeps running on the daemon helper thread and does not block
    interpreter exit. An exception raised by ``flush()`` before the
    deadline is re-raised to the caller.
    """
    # Imported locally so this method stays self-contained.
    import threading

    errors: list[BaseException] = []

    def _run_flush() -> None:
        try:
            self._client.flush()
        except Exception as exc:
            # Captured here and re-raised on the calling thread below.
            errors.append(exc)

    worker = threading.Thread(
        target=_run_flush,
        name="langfuse-force-flush",
        daemon=True,
    )
    worker.start()
    worker.join(max(timeout_ms, 0) / 1000)

    if worker.is_alive():
        # Deadline elapsed before the SDK finished draining.
        return False
    if errors:
        raise errors[0]
    return True
Comment thread
chris-colinsky marked this conversation as resolved.
Outdated

def _start_observation(
self,
*,
Expand Down
20 changes: 20 additions & 0 deletions src/openarmature/observability/langfuse/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,19 @@ def generation(
prompt: Any = None,
) -> LangfuseGenerationHandle: ...

def force_flush(self, timeout_ms: int = 30_000) -> bool:
    """Push any buffered outbound data through the underlying sink.

    The contract follows OTel's ``TracerProvider.force_flush`` and
    addresses the export-buffer half of fast-teardown races:
    implementations return ``True`` if the flush finishes before the
    ``timeout_ms`` deadline and ``False`` if it does not.

    The bundled :class:`InMemoryLangfuseClient` holds no buffer and
    reports ``True`` straight away, whereas SDK adapters hand the
    call to the wrapped client's own flush.
    """
    ...
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
Comment thread
chris-colinsky marked this conversation as resolved.


# Concrete in-memory implementation ---------------------------------------
# Used by tests and the conformance harness. Stores everything the
Expand Down Expand Up @@ -407,6 +420,13 @@ def generation(
trace.observations.append(observation)
return _InMemoryGenerationHandle(observation=observation)

def force_flush(self, timeout_ms: int = 30_000) -> bool:
    """Report an immediate, successful flush.

    The in-memory recorder keeps no outbound buffer; each observation
    is captured synchronously by its create call, so nothing is ever
    pending. ``timeout_ms`` is accepted only for Protocol
    compatibility and is discarded.
    """
    _ = timeout_ms  # intentionally unused
    return True

def _get_trace(self, trace_id: str) -> LangfuseTrace:
trace = self.traces.get(trace_id)
if trace is None:
Expand Down
18 changes: 18 additions & 0 deletions src/openarmature/observability/langfuse/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,24 @@ def close_invocation(self, invocation_id: str) -> None:
):
self._close_subgraph_observation(inv_state, prefix)

def force_flush(self, timeout_ms: int = 30_000) -> bool:
    """Drain the Langfuse client's outbound buffer.

    Delegates to ``self.client.force_flush`` and returns its result:
    ``True`` when the client's flush completes within the deadline,
    ``False`` otherwise. The surface mirrors the OTel observer's
    ``force_flush``.

    This is not the same as
    :meth:`~openarmature.graph.compiled.CompiledGraph.drain`, which
    covers the engine's observer-event queue. This method covers the
    client's own outbound buffer (the SDK's OTel BatchSpanProcessor
    when wrapped via :class:`LangfuseSDKAdapter`).

    Call it in fast-teardown harnesses such as CLI one-shots,
    serverless functions, or ASGI lifespan shutdown, where the SDK's
    BatchSpanProcessor export thread would otherwise be cut off
    before its buffer drains.
    """
    client = self.client
    flushed = client.force_flush(timeout_ms=timeout_ms)
    return flushed

def shutdown(self) -> None:
"""Drain every in-flight invocation. Use for long-lived
observers shared across requests; CLI / one-shot processes
Expand Down
29 changes: 27 additions & 2 deletions src/openarmature/observability/otel/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1445,6 +1445,32 @@ def _close_invocation_span(self, invocation_id: str) -> None:
self._run_enrichers(open_span.span, None)
open_span.span.end()

def force_flush(self, timeout_ms: int = 30_000) -> bool:
    """Push pending spans out through every registered span processor.

    A thin public wrapper around the underlying OTel
    :class:`TracerProvider`'s ``force_flush``, so callers need not
    touch the private ``_provider`` attribute. Returns ``True`` when
    all processors finish flushing within the deadline and ``False``
    otherwise.

    **When to call.** :meth:`drain` on
    :class:`~openarmature.graph.compiled.CompiledGraph` covers the
    engine's observer-event queue; this method instead covers the
    outbound span-export buffer of each registered
    :class:`SpanProcessor`. Under fast or unusual teardown orderings
    (FastAPI ``TestClient`` teardown, CLI one-shots, serverless
    functions) the :class:`BatchSpanProcessor`'s export thread can be
    cut off before its buffer drains. The canonical hardening is to
    call ``force_flush()`` from a ``finally`` block right before
    process exit.

    ``timeout_ms`` defaults to 30 s, matching the OTel SDK's own
    default. Pass a smaller value when running under a hard deadline
    (a serverless function's max execution time, an ASGI lifespan
    timeout).
    """
    provider = self._provider
    flushed = provider.force_flush(timeout_millis=timeout_ms)
    return flushed

def shutdown(self) -> None:
"""Close any still-open spans across all in-flight invocations
and shut down the underlying provider. Each per-invocation
Expand All @@ -1459,8 +1485,7 @@ def shutdown(self) -> None:
thread finishes), the flush may not complete in time and spans
can appear dropped. Workarounds:

- Call ``observer._provider.force_flush(timeout_millis=…)``
explicitly before this method.
- Call :meth:`force_flush` explicitly before this method.
- Use :class:`SimpleSpanProcessor` instead of
:class:`BatchSpanProcessor` in tests; it exports synchronously
and is unaffected by teardown timing.
Expand Down
19 changes: 19 additions & 0 deletions tests/unit/test_observability_langfuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,25 @@ def test_in_memory_recorder_observation_id_is_unique_per_recorder() -> None:
assert a.id != b.id


def test_observer_force_flush_delegates_to_client() -> None:
    """Check that ``LangfuseObserver.force_flush`` is wired to its client.

    ``InMemoryLangfuseClient.force_flush`` is a no-op returning
    ``True``, so a ``True`` result from the observer, with and without
    an explicit timeout, shows the delegation path works end to end.
    """
    observer = LangfuseObserver(client=InMemoryLangfuseClient())
    for kwargs in ({}, {"timeout_ms": 1000}):
        assert observer.force_flush(**kwargs) is True


def test_in_memory_recorder_force_flush_is_no_op() -> None:
    """Check that the in-memory recorder's ``force_flush`` returns ``True``.

    The recorder has no outbound buffer, so the call succeeds
    immediately. ``timeout_ms`` is accepted for Protocol compatibility
    but has no effect on the result.
    """
    recorder = InMemoryLangfuseClient()
    for kwargs in ({}, {"timeout_ms": 5000}):
        assert recorder.force_flush(**kwargs) is True


def test_in_memory_recorder_children_of_walks_parent_links() -> None:
client = InMemoryLangfuseClient()
client.trace(id="t1")
Expand Down
13 changes: 13 additions & 0 deletions tests/unit/test_observability_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -1382,3 +1382,16 @@ async def ask_llm(_s: _S) -> dict[str, str]:
assert attrs.get("openarmature.prompt.label") == "production"
assert attrs.get("openarmature.prompt.template_hash") == "sha256:tpl"
assert attrs.get("openarmature.prompt.rendered_hash") == "sha256:rendered"


def test_force_flush_delegates_to_provider() -> None:
    """Check the public ``OTelObserver.force_flush`` wrapper.

    The wrapper fronts ``TracerProvider.force_flush`` so downstream
    users can drain the BatchSpanProcessor buffer in fast-teardown
    harnesses without reaching into ``observer._provider``.
    """
    processor = SimpleSpanProcessor(InMemorySpanExporter())
    observer = OTelObserver(span_processor=processor)
    try:
        for kwargs in ({}, {"timeout_ms": 1000}):
            assert observer.force_flush(**kwargs) is True
    finally:
        # Always release the provider, even when an assertion fails.
        observer.shutdown()