|
13 | 13 |
|
14 | 14 | from abc import ABC, abstractmethod |
15 | 15 | from collections.abc import Generator |
| 16 | +from pathlib import Path |
16 | 17 | from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal |
17 | 18 |
|
18 | 19 | import numpy as np |
|
37 | 38 | SchedulingStrategy, |
38 | 39 | SynchronousStrategy, |
39 | 40 | ThroughputStrategy, |
| 41 | + TraceReplayStrategy, |
40 | 42 | ) |
41 | 43 | from guidellm.schemas import PydanticClassRegistryMixin |
| 44 | +from guidellm.utils.trace_io import load_relative_timestamps |
42 | 45 |
|
43 | 46 | if TYPE_CHECKING: |
44 | 47 | from guidellm.benchmark.schemas import Benchmark |
|
48 | 51 | "ConcurrentProfile", |
49 | 52 | "Profile", |
50 | 53 | "ProfileType", |
| 54 | + "ReplayProfile", |
51 | 55 | "SweepProfile", |
52 | 56 | "SynchronousProfile", |
53 | 57 | "ThroughputProfile", |
54 | 58 | ] |
55 | 59 |
|
# Closed set of profile identifiers; each value is the discriminator string used
# when a serialized profile is mapped back onto its concrete Profile subclass.
ProfileType = Annotated[
    Literal[
        "synchronous",
        "concurrent",
        "throughput",
        "async",
        "sweep",
        "replay",
    ],
    "Profile type identifiers for polymorphic deserialization",
]
60 | 64 |
|
@@ -328,6 +332,110 @@ def next_strategy( |
328 | 332 | return SynchronousStrategy() |
329 | 333 |
|
330 | 334 |
|
@Profile.register("replay")
class ReplayProfile(Profile):
    """
    Replay a trace file by scheduling requests at their recorded offsets.

    Request ``i`` is scheduled at
    ``start_time + time_scale * relative_timestamps[i]``.

    For this profile, the ``rate`` argument is interpreted as ``time_scale`` (the
    scale factor applied to relative timestamps), not as requests per second.

    When ``data_samples`` is a positive integer, the replayed timestamps are
    truncated to that many entries so they match the sampled dataset size.
    """

    type_: Literal["replay"] = "replay"  # type: ignore[assignment]
    relative_timestamps: list[float] = Field(
        description="Request start times relative to first event (first = 0)",
    )
    time_scale: float = Field(
        default=1.0,
        gt=0,
        description="Scale factor applied to relative timestamps",
    )

    @classmethod
    def resolve_args(
        cls,
        rate_type: str,
        rate: list[float] | None,
        random_seed: int,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """
        Build the constructor arguments for a replay profile from a trace file.

        :param rate_type: Unused by this profile.
        :param rate: Optional list whose first element is used as ``time_scale``;
            1.0 is used when it is ``None`` or empty.
        :param random_seed: Unused by this profile.
        :param kwargs: Reads ``data`` (a one-element sequence holding the trace
            path), ``data_args`` (the first entry may carry ``timestamp_column``),
            ``data_samples`` (positive int truncates the timestamps) and
            ``constraints`` (copied, then given ``max_requests`` when no request
            limit key is already present).
        :return: Dict with ``relative_timestamps``, ``time_scale`` and
            ``constraints`` keys.
        :raises ValueError: If ``data`` is missing, does not contain exactly one
            non-empty source, the source is not a path, the path does not exist,
            or no timestamps remain after truncation.
        """
        _ = (rate_type, random_seed)  # unused

        data = kwargs.get("data")
        if not data:
            raise ValueError("Replay profile requires data (path to trace file)")
        if len(data) != 1:
            raise ValueError(
                f"ReplayProfile requires exactly one data source, received {len(data)}"
            )
        source = data[0]
        if not source:
            raise ValueError("Replay profile requires data (path to trace file)")

        path = Path(source) if isinstance(source, str) else source
        # A non-path source (dict, in-memory dataset, ...) has no ``exists``;
        # report it as invalid input rather than failing with AttributeError.
        if not callable(getattr(path, "exists", None)):
            raise ValueError(
                "Replay profile requires a path to a trace file, "
                f"received {type(source).__name__}"
            )
        if not path.exists():
            raise ValueError(f"Replay trace file not found: {path}")

        # For replay profile, rate is interpreted as time_scale (not requests per
        # second)
        time_scale = rate[0] if rate else 1.0

        # Honor a custom timestamp column when configured via --data-args so the
        # replay profile and trace_synthetic deserializer use the same field.
        data_args = kwargs.get("data_args") or []
        first_args = data_args[0] if data_args else {}
        timestamp_column = "timestamp"
        if isinstance(first_args, dict):
            raw_timestamp_column = first_args.get("timestamp_column")
            # Blank or non-string values fall back to the default column name.
            if isinstance(raw_timestamp_column, str) and raw_timestamp_column.strip():
                timestamp_column = raw_timestamp_column

        relative_timestamps = load_relative_timestamps(
            path, timestamp_column=timestamp_column
        )
        data_samples = kwargs.get("data_samples", -1)
        if isinstance(data_samples, int) and data_samples > 0:
            relative_timestamps = relative_timestamps[:data_samples]

        if not relative_timestamps:
            raise ValueError(
                "No timestamps remain after applying data_samples. "
                "The trace is empty or all events were filtered out."
            )

        # Copy so the caller's constraints mapping is never mutated, then cap the
        # run at the trace length unless a request limit was already supplied.
        constraints = dict(kwargs.get("constraints") or {})
        if not any(
            key in constraints
            for key in ("max_number", "max_num", "max_requests", "max_req")
        ):
            constraints["max_requests"] = len(relative_timestamps)

        return {
            "relative_timestamps": relative_timestamps,
            "time_scale": time_scale,
            "constraints": constraints,
        }

    @property
    def strategy_types(self) -> list[str]:
        """
        :return: The single strategy type identifier used by this profile.
        """
        return ["trace"]

    def next_strategy(
        self,
        prev_strategy: SchedulingStrategy | None,
        prev_benchmark: Benchmark | None,
    ) -> TraceReplayStrategy | None:
        """
        Return the trace replay strategy on the first call, then ``None``.

        :param prev_strategy: The previously returned strategy, or ``None`` when
            no strategy has been produced yet.
        :param prev_benchmark: Unused by this profile.
        :return: A ``TraceReplayStrategy`` built from this profile's timestamps
            and time scale when ``prev_strategy`` is ``None``, otherwise ``None``.
        """
        _ = prev_benchmark
        # Replay has a single strategy; return it once, then None
        if prev_strategy is not None:
            return None
        return TraceReplayStrategy(
            relative_timestamps=self.relative_timestamps,
            time_scale=self.time_scale,
        )
| 437 | + |
| 438 | + |
331 | 439 | @Profile.register("concurrent") |
332 | 440 | class ConcurrentProfile(Profile): |
333 | 441 | """ |
|
0 commit comments