Skip to content

Add public force_flush() to OTel and Langfuse observers#83

Merged
chris-colinsky merged 2 commits into
mainfrom
feature/observer-force-flush
May 27, 2026
Merged

Add public force_flush() to OTel and Langfuse observers#83
chris-colinsky merged 2 commits into
mainfrom
feature/observer-force-flush

Conversation

@chris-colinsky

@chris-colinsky chris-colinsky commented May 27, 2026

Copy link
Copy Markdown
Member

Summary

Downstream consumers reach into observer._provider directly to drain the BatchSpanProcessor's outbound buffer in fast-teardown harnesses (serverless functions, CLI one-shots, FastAPI TestClient teardown). This adds a public, stable force_flush(timeout_ms) method on both observers so the private-attribute reach goes away.

  • OTelObserver.force_flush(timeout_ms=30_000) -> bool wraps self._provider.force_flush(timeout_millis=timeout_ms).
  • LangfuseObserver.force_flush(timeout_ms=30_000) -> bool delegates to LangfuseClient.force_flush(timeout_ms), a new method on the Protocol.
  • InMemoryLangfuseClient.force_flush is a no-op returning True (no outbound buffer; observations are captured synchronously on each call).
  • LangfuseSDKAdapter.force_flush calls the v4 SDK's flush() (which internally drains the OTel BatchSpanProcessor).

Distinct from CompiledGraph.drain(), which covers the engine's observer-event queue. force_flush() covers the outbound buffer of each registered SpanProcessor (or the Langfuse client). Both halves should run in a finally block before process exit for tight-deadline teardown.

Test plan

  • pytest tests/unit/test_observability_otel.py tests/unit/test_observability_langfuse.py — 34 passed
  • Full unit suite — 870 passed, 113 skipped
  • ruff check / ruff format --check clean
  • pyright clean across changed files
  • Downstream harness drops its observer._provider.force_flush(...) reach in favor of the public method

Downstream users reach into observer._provider directly to drain
the BatchSpanProcessor's outbound buffer in fast-teardown harnesses
(serverless functions, CLI one-shots, FastAPI TestClient teardown).
Expose force_flush(timeout_ms) as a stable surface instead.

OTelObserver.force_flush wraps self._provider.force_flush. Distinct
from CompiledGraph.drain, which covers the engine's observer-event
queue; force_flush covers the SpanProcessor export buffer.

LangfuseObserver.force_flush delegates to LangfuseClient.force_flush,
a new Protocol method. InMemoryLangfuseClient implements it as a
no-op (no buffer); LangfuseSDKAdapter delegates to the SDK's flush().
Copilot AI review requested due to automatic review settings May 27, 2026 22:03
Comment thread src/openarmature/observability/langfuse/client.py Fixed

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds public force_flush(timeout_ms) APIs for OTel and Langfuse observers so downstream users can drain exporter/client buffers without reaching into private internals.

Changes:

  • Adds OTelObserver.force_flush() wrapping the underlying OTel provider.
  • Adds Langfuse force_flush() support across the observer, client Protocol, in-memory client, and SDK adapter.
  • Adds unit tests for OTel and in-memory Langfuse flush paths.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
src/openarmature/observability/otel/observer.py Adds public OTel flush wrapper and updates shutdown guidance.
src/openarmature/observability/langfuse/observer.py Adds observer-level Langfuse flush delegation.
src/openarmature/observability/langfuse/client.py Extends the Langfuse client Protocol and in-memory implementation.
src/openarmature/observability/langfuse/adapter.py Adds SDK adapter flush implementation.
tests/unit/test_observability_otel.py Adds OTel force-flush unit coverage.
tests/unit/test_observability_langfuse.py Adds in-memory Langfuse force-flush unit coverage.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/openarmature/observability/langfuse/adapter.py Outdated
Comment thread src/openarmature/observability/langfuse/client.py
Comment thread src/openarmature/observability/langfuse/adapter.py
Narrow the LangfuseClient.force_flush Protocol contract to call out
that deadline honor is best-effort: SDK adapters wrapping clients
without timeout-propagation surfaces (the v4 Langfuse SDK) may
ignore timeout_ms and rely on the SDK's internal defaults.

Tighten the LangfuseSDKAdapter.force_flush docstring to make the
synchronous-but-not-deadline-honoring behavior explicit (the v4 SDK
flush IS synchronous — it blocks on tracer_provider.force_flush plus
score/media queue joins — but takes no timeout parameter and
discards the OTel return value).

Rewrite a stale comment in the adapter integration test that wrongly
claimed flush() was async — it isn't.

Add test_adapter_force_flush_delegates_to_client that uses
Mock(wraps=...) to verify the adapter invokes the wrapped SDK's
flush() and returns True.
Comment thread src/openarmature/observability/langfuse/client.py
@chris-colinsky chris-colinsky merged commit eb80430 into main May 27, 2026
6 checks passed
@chris-colinsky chris-colinsky deleted the feature/observer-force-flush branch May 27, 2026 23:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants