Add public force_flush() to OTel and Langfuse observers#83
Merged
Conversation
Downstream users reach into observer._provider directly to drain the BatchSpanProcessor's outbound buffer in fast-teardown harnesses (serverless functions, CLI one-shots, FastAPI TestClient teardown). Expose force_flush(timeout_ms) as a stable surface instead. OTelObserver.force_flush wraps self._provider.force_flush. Distinct from CompiledGraph.drain, which covers the engine's observer-event queue; force_flush covers the SpanProcessor export buffer. LangfuseObserver.force_flush delegates to LangfuseClient.force_flush, a new Protocol method. InMemoryLangfuseClient implements it as a no-op (no buffer); LangfuseSDKAdapter delegates to the SDK's flush().
There was a problem hiding this comment.
Pull request overview
Adds public force_flush(timeout_ms) APIs for OTel and Langfuse observers so downstream users can drain exporter/client buffers without reaching into private internals.
Changes:
- Adds
OTelObserver.force_flush()wrapping the underlying OTel provider. - Adds Langfuse
force_flush()support across the observer, client Protocol, in-memory client, and SDK adapter. - Adds unit tests for OTel and in-memory Langfuse flush paths.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
src/openarmature/observability/otel/observer.py |
Adds public OTel flush wrapper and updates shutdown guidance. |
src/openarmature/observability/langfuse/observer.py |
Adds observer-level Langfuse flush delegation. |
src/openarmature/observability/langfuse/client.py |
Extends the Langfuse client Protocol and in-memory implementation. |
src/openarmature/observability/langfuse/adapter.py |
Adds SDK adapter flush implementation. |
tests/unit/test_observability_otel.py |
Adds OTel force-flush unit coverage. |
tests/unit/test_observability_langfuse.py |
Adds in-memory Langfuse force-flush unit coverage. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Narrow the LangfuseClient.force_flush Protocol contract to call out that deadline honor is best-effort: SDK adapters wrapping clients without timeout-propagation surfaces (the v4 Langfuse SDK) may ignore timeout_ms and rely on the SDK's internal defaults. Tighten the LangfuseSDKAdapter.force_flush docstring to make the synchronous-but-not-deadline-honoring behavior explicit (the v4 SDK flush IS synchronous — it blocks on tracer_provider.force_flush plus score/media queue joins — but takes no timeout parameter and discards the OTel return value). Rewrite a stale comment in the adapter integration test that wrongly claimed flush() was async — it isn't. Add test_adapter_force_flush_delegates_to_client that uses Mock(wraps=...) to verify the adapter invokes the wrapped SDK's flush() and returns True.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Downstream consumers reach into
observer._providerdirectly to drain theBatchSpanProcessor's outbound buffer in fast-teardown harnesses (serverless functions, CLI one-shots, FastAPITestClientteardown). This adds a public, stableforce_flush(timeout_ms)method on both observers so the private-attribute reach goes away.OTelObserver.force_flush(timeout_ms=30_000) -> boolwrapsself._provider.force_flush(timeout_millis=timeout_ms).LangfuseObserver.force_flush(timeout_ms=30_000) -> booldelegates toLangfuseClient.force_flush(timeout_ms), a new method on the Protocol.InMemoryLangfuseClient.force_flushis a no-op returningTrue(no outbound buffer; observations are captured synchronously on each call).LangfuseSDKAdapter.force_flushcalls the v4 SDK'sflush()(which internally drains the OTelBatchSpanProcessor).Distinct from
CompiledGraph.drain(), which covers the engine's observer-event queue.force_flush()covers the outbound buffer of each registeredSpanProcessor(or the Langfuse client). Both halves should run in afinallyblock before process exit for tight-deadline teardown.Test plan
pytest tests/unit/test_observability_otel.py tests/unit/test_observability_langfuse.py— 34 passedruff check/ruff format --checkcleanpyrightclean across changed filesobserver._provider.force_flush(...)reach in favor of the public method