Skip to content

Commit e5ad308

Browse files
committed
feat(integrations): openai-agents streaming support
1 parent 4c4f260 commit e5ad308

File tree

6 files changed

+169
-5
lines changed

6 files changed

+169
-5
lines changed

sentry_sdk/integrations/mcp.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
Supports the low-level `mcp.server.lowlevel.Server` API.
88
"""
99

10+
from contextlib import contextmanager
1011
import inspect
1112
from functools import wraps
1213
from typing import TYPE_CHECKING
@@ -352,6 +353,30 @@ def _prepare_handler_data(
352353
)
353354

354355

356+
@contextmanager
357+
def ensure_span(*args, **kwargs):
358+
"""Ensure a span is created for the current context."""
359+
360+
current_span = sentry_sdk.get_current_span()
361+
transaction_exists = (
362+
current_span is not None and current_span.containing_transaction is not None
363+
)
364+
365+
if transaction_exists:
366+
with sentry_sdk.start_span(*args, **kwargs) as span:
367+
yield span
368+
else:
369+
with sentry_sdk.start_transaction(*args, **kwargs):
370+
with sentry_sdk.start_span(*args, **kwargs) as span:
371+
yield span
372+
# with get_start_span_function()(
373+
# op=OP.MCP_SERVER,
374+
# name=span_name,
375+
# origin=MCPIntegration.origin,
376+
# ) as span:
377+
# yield span
378+
379+
355380
async def _async_handler_wrapper(
356381
handler_type: str,
357382
func: "Callable[..., Any]",
@@ -382,7 +407,7 @@ async def _async_handler_wrapper(
382407
) = _prepare_handler_data(handler_type, original_args, original_kwargs)
383408

384409
# Start span and execute
385-
with get_start_span_function()(
410+
with ensure_span(
386411
op=OP.MCP_SERVER,
387412
name=span_name,
388413
origin=MCPIntegration.origin,
@@ -454,7 +479,7 @@ def _sync_handler_wrapper(
454479
) = _prepare_handler_data(handler_type, original_args)
455480

456481
# Start span and execute
457-
with get_start_span_function()(
482+
with ensure_span(
458483
op=OP.MCP_SERVER,
459484
name=span_name,
460485
origin=MCPIntegration.origin,

sentry_sdk/integrations/openai_agents/__init__.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
_create_get_model_wrapper,
55
_create_get_all_tools_wrapper,
66
_create_run_wrapper,
7+
_create_run_streamed_wrapper,
78
_patch_agent_run,
89
_patch_error_tracing,
910
)
@@ -25,12 +26,16 @@ def _patch_runner() -> None:
2526
# Create the root span for one full agent run (including eventual handoffs)
2627
# Note agents.run.DEFAULT_AGENT_RUNNER.run_sync is a wrapper around
2728
# agents.run.DEFAULT_AGENT_RUNNER.run. It does not need to be wrapped separately.
28-
# TODO-anton: Also patch streaming runner: agents.Runner.run_streamed
2929
agents.run.DEFAULT_AGENT_RUNNER.run = _create_run_wrapper(
3030
agents.run.DEFAULT_AGENT_RUNNER.run
3131
)
3232

33-
# Creating the actual spans for each agent run.
33+
# Patch streaming runner
34+
agents.run.DEFAULT_AGENT_RUNNER.run_streamed = _create_run_streamed_wrapper(
35+
agents.run.DEFAULT_AGENT_RUNNER.run_streamed
36+
)
37+
38+
# Creating the actual spans for each agent run (works for both streaming and non-streaming).
3439
_patch_agent_run()
3540

3641

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from .models import _create_get_model_wrapper # noqa: F401
22
from .tools import _create_get_all_tools_wrapper # noqa: F401
3-
from .runner import _create_run_wrapper # noqa: F401
3+
from .runner import _create_run_wrapper, _create_run_streamed_wrapper # noqa: F401
44
from .agent_run import _patch_agent_run # noqa: F401
55
from .error_tracing import _patch_error_tracing # noqa: F401

sentry_sdk/integrations/openai_agents/patches/agent_run.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def _patch_agent_run() -> None:
3131

3232
# Store original methods
3333
original_run_single_turn = agents.run.AgentRunner._run_single_turn
34+
original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed
3435
original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs
3536
original_execute_final_output = agents._run_impl.RunImpl.execute_final_output
3637

@@ -149,8 +150,48 @@ async def patched_execute_final_output(
149150

150151
return result
151152

153+
@wraps(
154+
original_run_single_turn_streamed.__func__
155+
if hasattr(original_run_single_turn_streamed, "__func__")
156+
else original_run_single_turn_streamed
157+
)
158+
async def patched_run_single_turn_streamed(
159+
cls: "agents.Runner", *args: "Any", **kwargs: "Any"
160+
) -> "Any":
161+
"""Patched _run_single_turn_streamed that creates agent invocation spans for streaming"""
162+
agent = kwargs.get("agent")
163+
context_wrapper = kwargs.get("context_wrapper")
164+
should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks")
165+
166+
span = getattr(context_wrapper, "_sentry_agent_span", None)
167+
# Start agent span when agent starts (but only once per agent)
168+
if should_run_agent_start_hooks and agent and context_wrapper:
169+
# End any existing span for a different agent
170+
if _has_active_agent_span(context_wrapper):
171+
current_agent = _get_current_agent(context_wrapper)
172+
if current_agent and current_agent != agent:
173+
end_invoke_agent_span(context_wrapper, current_agent)
174+
175+
span = _start_invoke_agent_span(context_wrapper, agent, kwargs)
176+
agent._sentry_agent_span = span
177+
178+
# Call original streaming method
179+
try:
180+
result = await original_run_single_turn_streamed(*args, **kwargs)
181+
except Exception as exc:
182+
if span is not None and span.timestamp is None:
183+
_record_exception_on_span(span, exc)
184+
end_invoke_agent_span(context_wrapper, agent)
185+
186+
reraise(*sys.exc_info())
187+
188+
return result
189+
152190
# Apply patches
153191
agents.run.AgentRunner._run_single_turn = classmethod(patched_run_single_turn)
192+
agents.run.AgentRunner._run_single_turn_streamed = classmethod(
193+
patched_run_single_turn_streamed
194+
)
154195
agents._run_impl.RunImpl.execute_handoffs = classmethod(patched_execute_handoffs)
155196
agents._run_impl.RunImpl.execute_final_output = classmethod(
156197
patched_execute_final_output

sentry_sdk/integrations/openai_agents/patches/runner.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import sys
12
from functools import wraps
23

34
import sentry_sdk
@@ -64,3 +65,74 @@ async def wrapper(*args: "Any", **kwargs: "Any") -> "Any":
6465
return run_result
6566

6667
return wrapper
68+
69+
70+
def _create_run_streamed_wrapper(
71+
original_func: "Callable[..., Any]",
72+
) -> "Callable[..., Any]":
73+
"""
74+
Wraps the agents.Runner.run_streamed method to create a root span for streaming agent workflow runs.
75+
76+
Unlike run(), run_streamed() returns immediately with a RunResultStreaming object
77+
while execution continues in a background task. The workflow span must stay open
78+
throughout the streaming operation and close when streaming completes or is abandoned.
79+
"""
80+
81+
@wraps(original_func)
82+
def wrapper(*args: "Any", **kwargs: "Any") -> "Any":
83+
# Isolate each workflow so that when agents are run in asyncio tasks they
84+
# don't touch each other's scopes
85+
isolation_scope = sentry_sdk.isolation_scope()
86+
isolation_scope.__enter__()
87+
88+
# Clone agent because agent invocation spans are attached per run.
89+
agent = args[0].clone()
90+
91+
# Start workflow span immediately (before run_streamed returns)
92+
workflow_span = agent_workflow_span(agent)
93+
workflow_span.__enter__()
94+
95+
# Store span and scope on agent for cleanup
96+
agent._sentry_workflow_span = workflow_span
97+
agent._sentry_isolation_scope = isolation_scope
98+
99+
args = (agent, *args[1:])
100+
101+
try:
102+
# Call original function to get RunResultStreaming
103+
run_result = original_func(*args, **kwargs)
104+
except Exception as exc:
105+
# If run_streamed itself fails (not the background task), clean up immediately
106+
workflow_span.__exit__(*sys.exc_info())
107+
isolation_scope.__exit__(None, None, None)
108+
_capture_exception(exc)
109+
raise exc from None
110+
111+
# Wrap the result to ensure cleanup when streaming completes
112+
original_aclose = getattr(run_result, "aclose", None)
113+
114+
async def wrapped_aclose() -> None:
115+
"""Close streaming result and clean up Sentry spans"""
116+
try:
117+
if original_aclose is not None:
118+
await original_aclose()
119+
finally:
120+
# End any remaining agent span
121+
if hasattr(run_result, "context_wrapper"):
122+
end_invoke_agent_span(run_result.context_wrapper, agent)
123+
124+
# End workflow span
125+
if hasattr(agent, "_sentry_workflow_span"):
126+
workflow_span.__exit__(None, None, None)
127+
delattr(agent, "_sentry_workflow_span")
128+
129+
# Exit isolation scope
130+
if hasattr(agent, "_sentry_isolation_scope"):
131+
isolation_scope.__exit__(None, None, None)
132+
delattr(agent, "_sentry_isolation_scope")
133+
134+
run_result.aclose = wrapped_aclose
135+
136+
return run_result
137+
138+
return wrapper

tests/integrations/openai_agents/test_openai_agents.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,3 +1998,24 @@ def test_openai_agents_message_truncation(sentry_init, capture_events):
19981998
assert len(parsed_messages) == 2
19991999
assert "small message 4" in str(parsed_messages[0])
20002000
assert "small message 5" in str(parsed_messages[1])
2001+
2002+
2003+
def test_streaming_patches_applied(sentry_init):
2004+
"""
2005+
Test that the streaming patches are applied correctly.
2006+
"""
2007+
sentry_init(
2008+
integrations=[OpenAIAgentsIntegration()],
2009+
traces_sample_rate=1.0,
2010+
)
2011+
2012+
# Verify that run_streamed is patched (will have __wrapped__ attribute if patched)
2013+
import agents
2014+
2015+
# Check that the method exists and has been modified
2016+
assert hasattr(agents.run.DEFAULT_AGENT_RUNNER, "run_streamed")
2017+
assert hasattr(agents.run.AgentRunner, "_run_single_turn_streamed")
2018+
2019+
# Verify the patches were applied by checking for our wrapper
2020+
run_streamed_func = agents.run.DEFAULT_AGENT_RUNNER.run_streamed
2021+
assert run_streamed_func is not None

0 commit comments

Comments
 (0)