Skip to content

Commit 7a3f6b7

Browse files
authored
feat: #2135 add public flush_traces API (#2844)
1 parent 6a89f1b commit 7a3f6b7

File tree

5 files changed

+186
-21
lines changed

5 files changed

+186
-21
lines changed

src/agents/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@
203203
add_trace_processor,
204204
agent_span,
205205
custom_span,
206+
flush_traces,
206207
function_span,
207208
gen_span_id,
208209
gen_trace_id,
@@ -451,6 +452,7 @@ def enable_verbose_stdout_logging():
451452
"add_trace_processor",
452453
"agent_span",
453454
"custom_span",
455+
"flush_traces",
454456
"function_span",
455457
"generation_span",
456458
"get_current_span",

src/agents/tracing/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
"add_trace_processor",
4343
"agent_span",
4444
"custom_span",
45+
"flush_traces",
4546
"function_span",
4647
"generation_span",
4748
"get_current_span",
@@ -108,3 +109,14 @@ def set_tracing_export_api_key(api_key: str) -> None:
108109
Set the OpenAI API key for the backend exporter.
109110
"""
110111
default_exporter().set_api_key(api_key)
112+
113+
114+
def flush_traces() -> None:
115+
"""Force immediate export of buffered traces and spans.
116+
117+
The default ``BatchTraceProcessor`` already exports traces periodically in the
118+
background. Call this when a worker, background job, or request handler needs
119+
traces to be visible immediately after a unit of work finishes instead of
120+
waiting for the next scheduled flush.
121+
"""
122+
get_trace_provider().force_flush()

src/agents/tracing/processors.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,7 @@ def __init__(
491491
# We lazily start the background worker thread the first time a span/trace is queued.
492492
self._worker_thread: threading.Thread | None = None
493493
self._thread_start_lock = threading.Lock()
494+
self._export_lock = threading.Lock()
494495

495496
def _ensure_thread_started(self) -> None:
496497
# Fast path without holding the lock
@@ -571,25 +572,26 @@ def _export_batches(self, force: bool = False):
571572
"""Drains the queue and exports in batches. If force=True, export everything.
572573
Otherwise, export up to `max_batch_size` repeatedly until the queue is completely empty.
573574
"""
574-
while True:
575-
items_to_export: list[Span[Any] | Trace] = []
575+
with self._export_lock:
576+
while True:
577+
items_to_export: list[Span[Any] | Trace] = []
578+
579+
# Gather a batch of spans up to max_batch_size
580+
while not self._queue.empty() and (
581+
force or len(items_to_export) < self._max_batch_size
582+
):
583+
try:
584+
items_to_export.append(self._queue.get_nowait())
585+
except queue.Empty:
586+
# Another thread might have emptied the queue between checks
587+
break
576588

577-
# Gather a batch of spans up to max_batch_size
578-
while not self._queue.empty() and (
579-
force or len(items_to_export) < self._max_batch_size
580-
):
581-
try:
582-
items_to_export.append(self._queue.get_nowait())
583-
except queue.Empty:
584-
# Another thread might have emptied the queue between checks
589+
# If we collected nothing, we're done
590+
if not items_to_export:
585591
break
586592

587-
# If we collected nothing, we're done
588-
if not items_to_export:
589-
break
590-
591-
# Export the batch
592-
self._exporter.export(items_to_export)
593+
# Export the batch
594+
self._exporter.export(items_to_export)
593595

594596

595597
# Lazily initialized defaults to avoid creating network clients or threading

src/agents/tracing/provider.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,21 @@ def create_span(
188188
) -> Span[TSpanData]:
189189
"""Create a new span."""
190190

191-
@abstractmethod
191+
def force_flush(self) -> None:
192+
"""Force all registered processors to flush buffered traces/spans immediately.
193+
194+
The default implementation is a no-op so existing custom ``TraceProvider``
195+
implementations continue to work without adding this method.
196+
"""
197+
return None
198+
192199
def shutdown(self) -> None:
193-
"""Clean up any resources used by the provider."""
200+
"""Clean up any resources used by the provider.
201+
202+
The default implementation is a no-op so existing custom ``TraceProvider``
203+
implementations continue to work without adding this method.
204+
"""
205+
return None
194206

195207

196208
class DefaultTraceProvider(TraceProvider):
@@ -365,7 +377,19 @@ def create_span(
365377
trace_metadata=trace_metadata,
366378
)
367379

380+
def force_flush(self) -> None:
381+
"""Force all processors to flush their buffers immediately."""
382+
self._refresh_disabled_flag()
383+
if self._disabled:
384+
return
385+
386+
try:
387+
self._multi_processor.force_flush()
388+
except Exception as e:
389+
logger.error(f"Error flushing trace provider: {e}")
390+
368391
def shutdown(self) -> None:
392+
self._refresh_disabled_flag()
369393
if self._disabled:
370394
return
371395

tests/test_trace_processor.py

Lines changed: 128 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
import os
2+
import threading
23
import time
34
from typing import Any, cast
45
from unittest.mock import MagicMock, patch
56

67
import httpx
78
import pytest
89

9-
from agents.tracing.processor_interface import TracingProcessor
10+
from agents.tracing import flush_traces, get_trace_provider
11+
from agents.tracing.processor_interface import TracingExporter, TracingProcessor
1012
from agents.tracing.processors import BackendSpanExporter, BatchTraceProcessor
13+
from agents.tracing.provider import DefaultTraceProvider, TraceProvider
1114
from agents.tracing.span_data import AgentSpanData
12-
from agents.tracing.spans import SpanImpl
13-
from agents.tracing.traces import TraceImpl
15+
from agents.tracing.spans import Span, SpanImpl
16+
from agents.tracing.traces import Trace, TraceImpl
1417

1518

1619
def get_span(processor: TracingProcessor) -> SpanImpl[AgentSpanData]:
@@ -123,6 +126,34 @@ def test_batch_trace_processor_force_flush(mocked_exporter):
123126
processor.shutdown()
124127

125128

129+
def test_batch_trace_processor_force_flush_waits_for_in_flight_background_export():
130+
export_started = threading.Event()
131+
export_continue = threading.Event()
132+
133+
class BlockingExporter(TracingExporter):
134+
def export(self, items: list[Trace | Span[Any]]) -> None:
135+
export_started.set()
136+
assert export_continue.wait(timeout=2.0)
137+
138+
processor = BatchTraceProcessor(exporter=BlockingExporter(), schedule_delay=0.01)
139+
processor.on_trace_start(get_trace(processor))
140+
141+
assert export_started.wait(timeout=2.0)
142+
143+
flush_thread = threading.Thread(target=processor.force_flush)
144+
flush_thread.start()
145+
146+
time.sleep(0.1)
147+
assert flush_thread.is_alive(), "force_flush() should wait for an in-flight export"
148+
149+
export_continue.set()
150+
flush_thread.join(timeout=2.0)
151+
152+
assert not flush_thread.is_alive()
153+
154+
processor.shutdown()
155+
156+
126157
def test_batch_trace_processor_shutdown_flushes(mocked_exporter):
127158
processor = BatchTraceProcessor(exporter=mocked_exporter, schedule_delay=5.0)
128159
processor.on_trace_start(get_trace(processor))
@@ -171,6 +202,100 @@ def test_batch_trace_processor_scheduled_export(mocked_exporter):
171202
assert total_exported == 1, "Item should be exported after scheduled delay"
172203

173204

205+
def test_flush_traces_delegates_to_default_trace_provider():
206+
provider = DefaultTraceProvider()
207+
mock_processor = MagicMock()
208+
provider.register_processor(mock_processor)
209+
210+
with patch("agents.tracing.setup.GLOBAL_TRACE_PROVIDER", provider):
211+
flush_traces()
212+
213+
mock_processor.force_flush.assert_called_once()
214+
215+
216+
def test_flush_traces_is_importable_from_top_level_agents_package():
217+
from agents import flush_traces as top_level_flush_traces
218+
219+
assert top_level_flush_traces is flush_traces
220+
221+
222+
def test_default_trace_provider_force_flush_respects_disabled_flag():
223+
provider = DefaultTraceProvider()
224+
mock_processor = MagicMock()
225+
provider.register_processor(mock_processor)
226+
227+
provider.set_disabled(True)
228+
provider.force_flush()
229+
230+
mock_processor.force_flush.assert_not_called()
231+
232+
233+
def test_trace_provider_force_flush_and_shutdown_default_to_noops():
234+
class MinimalProvider(TraceProvider):
235+
def register_processor(self, processor: TracingProcessor) -> None:
236+
pass
237+
238+
def set_processors(self, processors: list[TracingProcessor]) -> None:
239+
pass
240+
241+
def get_current_trace(self):
242+
return None
243+
244+
def get_current_span(self):
245+
return None
246+
247+
def set_disabled(self, disabled: bool) -> None:
248+
pass
249+
250+
def time_iso(self) -> str:
251+
return ""
252+
253+
def gen_trace_id(self) -> str:
254+
return "trace_123"
255+
256+
def gen_span_id(self) -> str:
257+
return "span_123"
258+
259+
def gen_group_id(self) -> str:
260+
return "group_123"
261+
262+
def create_trace(
263+
self,
264+
name,
265+
trace_id=None,
266+
group_id=None,
267+
metadata=None,
268+
disabled=False,
269+
tracing=None,
270+
):
271+
raise NotImplementedError
272+
273+
def create_span(self, span_data, span_id=None, parent=None, disabled=False):
274+
raise NotImplementedError
275+
276+
provider = MinimalProvider()
277+
provider.force_flush()
278+
provider.shutdown()
279+
280+
281+
def test_get_trace_provider_force_flush_flushes_default_processor(mocked_exporter):
282+
provider = DefaultTraceProvider()
283+
processor = BatchTraceProcessor(exporter=mocked_exporter, schedule_delay=60.0)
284+
provider.register_processor(processor)
285+
286+
with patch("agents.tracing.setup.GLOBAL_TRACE_PROVIDER", provider):
287+
processor.on_trace_start(get_trace(processor))
288+
processor.on_span_end(get_span(processor))
289+
290+
get_trace_provider().force_flush()
291+
292+
total_exported = sum(
293+
len(call_args[0][0]) for call_args in mocked_exporter.export.call_args_list
294+
)
295+
assert total_exported == 2
296+
processor.shutdown()
297+
298+
174299
@pytest.fixture
175300
def patched_time_sleep():
176301
"""

0 commit comments

Comments
 (0)