Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@
add_trace_processor,
agent_span,
custom_span,
flush_traces,
function_span,
gen_span_id,
gen_trace_id,
Expand Down Expand Up @@ -451,6 +452,7 @@ def enable_verbose_stdout_logging():
"add_trace_processor",
"agent_span",
"custom_span",
"flush_traces",
"function_span",
"generation_span",
"get_current_span",
Expand Down
11 changes: 11 additions & 0 deletions src/agents/tracing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
__all__ = [
"add_trace_processor",
"agent_span",
"flush_traces",
"custom_span",
"function_span",
"generation_span",
Expand Down Expand Up @@ -108,3 +109,13 @@ def set_tracing_export_api_key(api_key: str) -> None:
Set the OpenAI API key for the backend exporter.
"""
default_exporter().set_api_key(api_key)


def flush_traces() -> None:
"""Force an immediate flush of all buffered traces and spans.

Call this at the end of each task in long-running worker processes
(Celery, FastAPI background tasks, RQ, etc.) to ensure traces are
exported to the backend rather than remaining buffered indefinitely.
"""
get_trace_provider().force_flush()
Comment thread
seratch marked this conversation as resolved.
Outdated
8 changes: 8 additions & 0 deletions src/agents/tracing/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ def create_span(
) -> Span[TSpanData]:
"""Create a new span."""

@abstractmethod
def force_flush(self) -> None:
"""Force all registered processors to flush their buffers immediately."""
Comment thread
seratch marked this conversation as resolved.
Outdated

@abstractmethod
def shutdown(self) -> None:
"""Clean up any resources used by the provider."""
Expand Down Expand Up @@ -365,6 +369,10 @@ def create_span(
trace_metadata=trace_metadata,
)

def force_flush(self) -> None:
"""Force all processors to flush their buffers immediately."""
self._multi_processor.force_flush()
Comment thread
seratch marked this conversation as resolved.

def shutdown(self) -> None:
if self._disabled:
return
Expand Down
21 changes: 21 additions & 0 deletions tests/test_trace_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -835,3 +835,24 @@ def test_truncate_string_for_json_limit_handles_escape_heavy_input():
assert truncated.endswith(exporter._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX)
assert exporter._value_json_size_bytes(truncated) <= max_bytes
exporter.close()


def test_flush_traces_calls_provider_force_flush():
"""Test that flush_traces() delegates to the global trace provider's force_flush()."""
from unittest.mock import MagicMock, patch

mock_provider = MagicMock()

with patch("agents.tracing.get_trace_provider", return_value=mock_provider):
from agents.tracing import flush_traces

flush_traces()

mock_provider.force_flush.assert_called_once()


def test_flush_traces_importable_from_agents():
"""Test that flush_traces is importable from the top-level agents package."""
from agents import flush_traces

assert callable(flush_traces)