Skip to content

Commit f9e9468

Browse files
authored
Merge pull request #3 from ServerlessLLM/fy/minor_fix
Improved Results Handling and System Robustness
2 parents ff40629 + db9c251 commit f9e9468

5 files changed

Lines changed: 210 additions & 7 deletions

File tree

tests/test_cli.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
import unittest
23
from unittest.mock import MagicMock, patch
34

@@ -122,6 +123,92 @@ def test_cli_invalid_pattern(self):
122123
self.assertNotEqual(result.exit_code, 0)
123124
self.assertIn("Invalid value for '--pattern'", result.output)
124125

126+
@patch("tracestorm.cli.run_load_test")
127+
@patch("tracestorm.cli.os.makedirs")
128+
@patch("tracestorm.cli.datetime")
129+
def test_cli_with_output_dir(
130+
self, mock_datetime, mock_makedirs, mock_run_load_test
131+
):
132+
"""Test CLI with output directory option."""
133+
mock_analyzer = MagicMock()
134+
mock_run_load_test.return_value = ([], mock_analyzer)
135+
mock_datetime.datetime.now.return_value.strftime.return_value = (
136+
"20240101_120000"
137+
)
138+
139+
# Test with explicit output dir
140+
result = self.runner.invoke(
141+
main,
142+
[
143+
"--model",
144+
"gpt-3.5-turbo",
145+
"--output-dir",
146+
"custom_output_dir",
147+
],
148+
)
149+
150+
self.assertEqual(result.exit_code, 0)
151+
mock_makedirs.assert_called_with("custom_output_dir", exist_ok=True)
152+
mock_analyzer.export_json.assert_called_once()
153+
154+
# Reset mocks
155+
mock_makedirs.reset_mock()
156+
mock_analyzer.reset_mock()
157+
158+
# Test with default output dir
159+
result = self.runner.invoke(
160+
main,
161+
[
162+
"--model",
163+
"gpt-3.5-turbo",
164+
],
165+
)
166+
167+
self.assertEqual(result.exit_code, 0)
168+
mock_makedirs.assert_called_with(
169+
os.path.join("tracestorm_results", "20240101_120000"), exist_ok=True
170+
)
171+
mock_analyzer.export_json.assert_called_once()
172+
173+
@patch("tracestorm.cli.run_load_test")
174+
@patch("tracestorm.cli.os.makedirs")
175+
def test_cli_with_plot_option(self, mock_makedirs, mock_run_load_test):
176+
"""Test CLI with plot option."""
177+
mock_analyzer = MagicMock()
178+
mock_run_load_test.return_value = ([], mock_analyzer)
179+
180+
# Test with plot enabled
181+
result = self.runner.invoke(
182+
main,
183+
[
184+
"--model",
185+
"gpt-3.5-turbo",
186+
"--plot",
187+
"--output-dir",
188+
"test_dir",
189+
],
190+
)
191+
192+
self.assertEqual(result.exit_code, 0)
193+
mock_analyzer.plot_cdf.assert_called_once()
194+
195+
# Reset mock
196+
mock_analyzer.reset_mock()
197+
198+
# Test with plot disabled (default)
199+
result = self.runner.invoke(
200+
main,
201+
[
202+
"--model",
203+
"gpt-3.5-turbo",
204+
"--output-dir",
205+
"test_dir",
206+
],
207+
)
208+
209+
self.assertEqual(result.exit_code, 0)
210+
mock_analyzer.plot_cdf.assert_not_called()
211+
125212

126213
if __name__ == "__main__":
127214
unittest.main()

tests/test_replayer.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,64 @@ async def mock_request_raising(*args, **kwargs):
126126
"The 'error' field should match our simulated exception message.",
127127
)
128128

129+
async def _dummy_coro(self):
130+
# Simple coroutine that does nothing
131+
return
132+
133+
@patch("tracestorm.trace_player.logger")
134+
async def async_test_sender_worker(self, mock_logger):
135+
"""Test the sender_worker method with various queue states."""
136+
player = TracePlayer(
137+
self.name,
138+
self.trace,
139+
self.requests,
140+
self.base_url,
141+
self.api_key,
142+
self.ipc_queue,
143+
)
144+
145+
# Set up the dispatch queue and add one item
146+
player.dispatch_queue = asyncio.Queue()
147+
await player.dispatch_queue.put((1000, {"prompt": "Test prompt"}))
148+
149+
# Mock the request method
150+
player.request = AsyncMock(
151+
return_value={
152+
"result": "mock_result",
153+
"token_count": 10,
154+
"time_records": [time.time()],
155+
"error": None,
156+
}
157+
)
158+
159+
# Setup - run the sender_worker for a bit then set shutdown flag
160+
task = asyncio.create_task(player.sender_worker())
161+
await asyncio.sleep(0.1) # Let it process the queued item
162+
163+
# Queue is now empty, sender_worker should call sleep
164+
await asyncio.sleep(0.2)
165+
166+
# Set shutdown flag to stop the worker gracefully
167+
player.shutdown_flag.set()
168+
await asyncio.sleep(0.2)
169+
170+
# The worker should have exited once the shutdown flag was set; cancel it as a fallback if it has not
171+
if not task.done():
172+
task.cancel()
173+
try:
174+
await task
175+
except asyncio.CancelledError:
176+
pass
177+
178+
# Check that request was called with our test data
179+
player.request.assert_called_once()
180+
args, _ = player.request.call_args
181+
self.assertEqual(args[0], {"prompt": "Test prompt"})
182+
183+
def test_sender_worker(self):
184+
"""Test wrapper for async_test_sender_worker."""
185+
asyncio.run(self.async_test_sender_worker())
186+
129187

130188
if __name__ == "__main__":
131189
unittest.main()

tracestorm/cli.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import datetime
12
import os
23
from typing import Optional, Tuple, Union
34

@@ -115,6 +116,23 @@ def create_trace_generator(
115116
@click.option(
116117
"--datasets-config", default=None, help="Config file for datasets"
117118
)
119+
@click.option(
120+
"--plot",
121+
is_flag=True,
122+
default=False,
123+
help="Generate performance plots",
124+
)
125+
@click.option(
126+
"--output-dir",
127+
default=None,
128+
help="Directory to save results (defaults to tracestorm_results/{timestamp})",
129+
)
130+
@click.option(
131+
"--include-raw-results",
132+
is_flag=True,
133+
default=False,
134+
help="Include raw results in the output",
135+
)
118136
def main(
119137
model,
120138
rps,
@@ -125,9 +143,20 @@ def main(
125143
base_url,
126144
api_key,
127145
datasets_config,
146+
plot,
147+
output_dir,
148+
include_raw_results,
128149
):
129150
"""Run trace-based load testing for OpenAI API endpoints."""
130151
try:
152+
# Set up output directory
153+
if output_dir is None:
154+
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
155+
output_dir = os.path.join("tracestorm_results", timestamp)
156+
157+
os.makedirs(output_dir, exist_ok=True)
158+
logger.info(f"Results will be saved to: {output_dir}")
159+
131160
trace_generator, warning_msg = create_trace_generator(
132161
pattern, rps, duration, seed
133162
)
@@ -152,7 +181,20 @@ def main(
152181
)
153182

154183
print(result_analyzer)
155-
result_analyzer.plot_cdf()
184+
185+
# Always save the results file; raw results are included only when --include-raw-results is set
186+
results_file = os.path.join(output_dir, "results.json")
187+
result_analyzer.export_json(
188+
results_file, include_raw=include_raw_results
189+
)
190+
logger.info(f"Raw results saved to: {results_file}")
191+
192+
# Only generate plots if requested
193+
if plot:
194+
ttft_file = os.path.join(output_dir, "ttft_cdf.png")
195+
tpot_file = os.path.join(output_dir, "tpot_cdf.png")
196+
result_analyzer.plot_cdf(ttft_file=ttft_file, tpot_file=tpot_file)
197+
logger.info("Performance plots generated")
156198

157199
except ValueError as e:
158200
logger.error(f"Configuration error: {str(e)}")

tracestorm/core.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import multiprocessing
2+
from queue import Empty
23
from typing import List, Optional, Tuple
34

45
from tracestorm.logger import init_logger
@@ -83,6 +84,18 @@ def run_load_test(
8384
f"Received result from {name} for timestamp {timestamp}: {resp['token_count']} tokens"
8485
)
8586
aggregated_results.append((name, timestamp, resp))
87+
except Empty:
88+
# Timeout occurred, but maybe not all processes are finished.
89+
logger.warning(
90+
"No results received from IPC queue in the last 30 seconds. Waiting..."
91+
)
92+
# Stop collecting if no process is still alive; otherwise keep waiting
93+
if not any(p.is_alive() for p in processes):
94+
logger.warning(
95+
"All processes seem to have finished. Stopping result collection."
96+
)
97+
break
98+
continue
8699
except Exception as e:
87100
logger.error(f"Error collecting results: {str(e)}", exc_info=True)
88101
break

tracestorm/trace_player.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,12 @@ async def sender_worker(self) -> None:
115115
"""
116116
while not (self.shutdown_flag.is_set() and self.dispatch_queue.empty()):
117117
try:
118-
# Attempt to get a queued item; if none are available quickly, re-check shutdown_flag.
119-
item = await asyncio.wait_for(
120-
self.dispatch_queue.get(), timeout=0.1
121-
)
122-
except asyncio.TimeoutError:
123-
continue # No tasks currently, keep looping.
118+
item = self.dispatch_queue.get_nowait()
119+
except asyncio.QueueEmpty:
120+
if self.shutdown_flag.is_set():
121+
break
122+
await asyncio.sleep(0.1)
123+
continue
124124

125125
timestamp, request_data = item
126126
logger.info(
@@ -152,6 +152,9 @@ async def schedule_requests(self) -> None:
152152
await asyncio.sleep(delay)
153153

154154
request_data = self.requests[i]
155+
# Wait if queue is full
156+
while self.dispatch_queue.full():
157+
await asyncio.sleep(0.1)
155158
# We put both the scheduled timestamp and the request data into the queue.
156159
await self.dispatch_queue.put((scheduled_time, request_data))
157160

0 commit comments

Comments
 (0)