"""Test that usage tracking works correctly when streaming fails.
This addresses Issue #1973: Usage tracking lost when streaming fails mid-request.
"""
import pytest
from agents import Agent, Runner
from .fake_model import FakeModel


@pytest.mark.asyncio
async def test_usage_tracking_requests_on_streaming_error():
    """Test that at least the request count is tracked when streaming fails.

    This addresses Issue #1973: when the model raises an error during streaming,
    we should track that a request was made, even if token counts are unavailable.
    """
    model = FakeModel()
    # Simulate a streaming failure (e.g., context window exceeded, connection drop)
    model.set_next_output(RuntimeError("Context window exceeded"))

    agent = Agent(
        name="test_agent",
        model=model,
    )

    # Run the agent and expect it to fail. Note that run_streamed() returns the
    # result object immediately; the RuntimeError surfaces while iterating
    # stream_events(), so `result` stays bound after the with-block.
    with pytest.raises(RuntimeError):
        result = Runner.run_streamed(agent, input="Test input that consumes tokens")
        async for _ in result.stream_events():
            pass

    # FIXED: request count should be tracked even when streaming fails
    assert result.context_wrapper.usage.requests == 1, "Request count should be tracked on error"
    # Token counts are unavailable when streaming fails before ResponseCompletedEvent
    assert result.context_wrapper.usage.input_tokens == 0
    assert result.context_wrapper.usage.output_tokens == 0
    assert result.context_wrapper.usage.total_tokens == 0
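

# Not part of the original suite: a caller-side sketch of the pattern the fix
# enables, i.e. reading usage for logging/billing even when the stream raises.
# It assumes FakeModel surfaces the exception from stream_events() exactly as
# in the test above; the test name is hypothetical.
@pytest.mark.asyncio
async def test_usage_readable_after_caught_streaming_error():
    model = FakeModel()
    model.set_next_output(RuntimeError("Connection dropped"))
    agent = Agent(name="test_agent", model=model)

    result = Runner.run_streamed(agent, input="Test input")
    try:
        async for _ in result.stream_events():
            pass
    except RuntimeError:
        # Swallow the failure as an application would, then read usage.
        pass

    assert result.context_wrapper.usage.requests == 1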


@pytest.mark.asyncio
async def test_usage_tracking_preserved_on_success():
    """Test that normal usage tracking still works correctly after the fix.

    This ensures our fix doesn't break the normal case where streaming succeeds.
    """
    from openai.types.responses.response_usage import InputTokensDetails, OutputTokensDetails
    from agents.usage import Usage
    from .test_responses import get_text_message

    model = FakeModel()
    # Set custom usage to verify it's tracked correctly
    model.set_hardcoded_usage(
        Usage(
            requests=1,
            input_tokens=100,
            output_tokens=50,
            total_tokens=150,
            input_tokens_details=InputTokensDetails(cached_tokens=10),
            output_tokens_details=OutputTokensDetails(reasoning_tokens=5),
        )
    )
    # Simulate successful streaming
    model.set_next_output([get_text_message("Success")])

    agent = Agent(
        name="test_agent",
        model=model,
    )

    result = Runner.run_streamed(agent, input="Test input")
    async for _ in result.stream_events():
        pass

    # Usage should be tracked correctly in the success case
    assert result.context_wrapper.usage.requests == 1
    assert result.context_wrapper.usage.input_tokens == 100
    assert result.context_wrapper.usage.output_tokens == 50
    assert result.context_wrapper.usage.total_tokens == 150
    # Note: FakeModel doesn't fully support token_details, so we only test the main counts
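

# A hedged companion sketch, not in the original file: the non-streamed
# Runner.run path is expected to report the same totals. This assumes
# FakeModel.set_hardcoded_usage also applies to non-streamed responses and
# that Usage's detail fields default sensibly; the test name is hypothetical.
@pytest.mark.asyncio
async def test_usage_tracking_non_streamed_success_sketch():
    from agents.usage import Usage
    from .test_responses import get_text_message

    model = FakeModel()
    model.set_hardcoded_usage(
        Usage(requests=1, input_tokens=100, output_tokens=50, total_tokens=150)
    )
    model.set_next_output([get_text_message("Success")])
    agent = Agent(name="test_agent", model=model)

    result = await Runner.run(agent, input="Test input")

    assert result.context_wrapper.usage.requests == 1
    assert result.context_wrapper.usage.total_tokens == 150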


@pytest.mark.asyncio
async def test_usage_tracking_multi_turn_with_error():
    """Test usage tracking across multiple turns when an error occurs.

    This ensures that usage from successful turns is preserved even when a later turn fails.
    """
    import json

    from openai.types.responses.response_usage import InputTokensDetails, OutputTokensDetails
    from agents.usage import Usage
    from .test_responses import get_function_tool, get_function_tool_call

    model = FakeModel()
    # First turn: successful with usage
    model.set_hardcoded_usage(
        Usage(
            requests=1,
            input_tokens=100,
            output_tokens=50,
            total_tokens=150,
            input_tokens_details=InputTokensDetails(cached_tokens=0),
            output_tokens_details=OutputTokensDetails(reasoning_tokens=0),
        )
    )

    agent = Agent(
        name="test_agent",
        model=model,
        tools=[get_function_tool("test_tool", "tool_result")],
    )

    model.add_multiple_turn_outputs(
        [
            # First turn: successful tool call
            [get_function_tool_call("test_tool", json.dumps({"arg": "value"}))],
            # Second turn: error
            RuntimeError("API error on second turn"),
        ]
    )

    with pytest.raises(RuntimeError):
        result = Runner.run_streamed(agent, input="Test input")
        async for _ in result.stream_events():
            pass

    # Usage should include the first turn's usage + the second turn's request count
    assert result.context_wrapper.usage.requests == 2, "Should track both turns"
    assert result.context_wrapper.usage.input_tokens == 100, "Should preserve first turn's tokens"
    assert result.context_wrapper.usage.output_tokens == 50, "Should preserve first turn's tokens"
    assert result.context_wrapper.usage.total_tokens == 150, "Should preserve first turn's tokens"
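

# A minimal sketch of the aggregation mechanic the multi-turn test relies on:
# the runner accumulates per-turn usage into one Usage object. This assumes
# agents.usage.Usage exposes an add() method that sums the counts (true in
# recent SDK versions); the test name is hypothetical.
def test_usage_add_accumulates_counts_sketch():
    from agents.usage import Usage

    total = Usage()
    total.add(Usage(requests=1, input_tokens=100, output_tokens=50, total_tokens=150))
    total.add(Usage(requests=1))  # a failed turn contributes a request but no tokens

    assert total.requests == 2
    assert total.input_tokens == 100
    assert total.output_tokens == 50
    assert total.total_tokens == 150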