Skip to content

Commit 9e9dd70

Browse files
jsondai and copybara-github
authored and committed
chore: GenAI Client(evals) - Add client-side rate limiter to GenAI Eval SDK
PiperOrigin-RevId: 897323204
1 parent 28e5f95 commit 9e9dd70

5 files changed

Lines changed: 146 additions & 4 deletions

File tree

tests/unit/vertexai/genai/test_evals.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7044,3 +7044,53 @@ def test_create_evaluation_set_with_agent_data(
70447044
candidate_response = candidate_responses[0]
70457045
assert candidate_response["candidate"] == "test-candidate"
70467046
assert candidate_response["agent_data"] == agent_data
7047+
7048+
7049+
class TestRateLimiter:
    """Unit tests covering _evals_utils.RateLimiter."""

    def test_rate_limiter_init(self):
        """A rate of 10 QPS must translate to a 0.1s inter-event interval."""
        limiter = _evals_utils.RateLimiter(rate=10.0)
        assert limiter.seconds_per_event == pytest.approx(0.1)

    def test_rate_limiter_invalid_rate(self):
        """Zero and negative rates are rejected with ValueError."""
        for bad_rate in (0, -1):
            with pytest.raises(ValueError, match="Rate must be a positive number"):
                _evals_utils.RateLimiter(rate=bad_rate)

    @mock.patch("time.sleep", return_value=None)
    @mock.patch("time.monotonic")
    def test_rate_limiter_sleep_and_advance(self, mock_monotonic, mock_sleep):
        """Back-to-back calls inside the interval trigger exactly one sleep."""
        # Timeline with rate=10 (0.1s spacing):
        #   t=0.00  __init__ seeds the next-allowed slot at 0.0
        #   t=0.00  call 1 is admitted immediately; next slot becomes 0.1
        #   t=0.01  call 2 must wait 0.1 - 0.01 = 0.09s
        mock_monotonic.side_effect = [0.0, 0.0, 0.01]
        limiter = _evals_utils.RateLimiter(rate=10.0)
        limiter.sleep_and_advance()  # Admitted without delay.
        limiter.sleep_and_advance()  # Throttled.
        assert mock_sleep.call_count == 1
        (observed_delay,), _ = mock_sleep.call_args
        assert 0.08 < observed_delay <= 0.1

    def test_rate_limiter_no_sleep_when_enough_time_passed(self):
        """At a very high rate, consecutive calls incur no noticeable delay."""
        import time as real_time

        limiter = _evals_utils.RateLimiter(rate=1000.0)  # Very high rate.
        started = real_time.time()
        for _ in range(5):
            limiter.sleep_and_advance()
        # 5 calls at 1000 QPS need ~0.005s of pacing; 1s is a generous bound.
        assert real_time.time() - started < 1.0

vertexai/_genai/_evals_common.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1538,6 +1538,7 @@ def _execute_evaluation( # type: ignore[no-untyped-def]
15381538
dataset_schema: Optional[Literal["GEMINI", "FLATTEN", "OPENAI"]] = None,
15391539
dest: Optional[str] = None,
15401540
location: Optional[str] = None,
1541+
evaluation_service_qps: Optional[float] = None,
15411542
**kwargs,
15421543
) -> types.EvaluationResult:
15431544
"""Evaluates a dataset using the provided metrics.
@@ -1550,6 +1551,9 @@ def _execute_evaluation( # type: ignore[no-untyped-def]
15501551
dest: The destination to save the evaluation results.
15511552
location: The location to use for the evaluation. If not specified, the
15521553
location configured in the client will be used.
1554+
evaluation_service_qps: The rate limit (queries per second) for calls
1555+
to the evaluation service. Defaults to 10. Increase this value if
1556+
your project has a higher EvaluateInstances API quota.
15531557
**kwargs: Extra arguments to pass to evaluation, such as `agent_info`.
15541558
15551559
Returns:
@@ -1625,7 +1629,8 @@ def _execute_evaluation( # type: ignore[no-untyped-def]
16251629
logger.info("Running Metric Computation...")
16261630
t1 = time.perf_counter()
16271631
evaluation_result = _evals_metric_handlers.compute_metrics_and_aggregate(
1628-
evaluation_run_config
1632+
evaluation_run_config,
1633+
evaluation_service_qps=evaluation_service_qps,
16291634
)
16301635
t2 = time.perf_counter()
16311636
logger.info("Evaluation took: %f seconds", t2 - t1)

vertexai/_genai/_evals_metric_handlers.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
from . import _evals_common
3333
from . import _evals_constant
34+
from . import _evals_utils
3435
from . import evals
3536
from . import types
3637

@@ -1498,10 +1499,29 @@ class EvaluationRunConfig(_common.BaseModel):
14981499
"""The number of response candidates for the evaluation run."""
14991500

15001501

1502+
def _rate_limited_get_metric_result(
    rate_limiter: _evals_utils.RateLimiter,
    handler: MetricHandler[Any],
    eval_case: types.EvalCase,
    response_index: int,
) -> types.EvalCaseMetricResult:
    """Wraps a handler's get_metric_result with rate limiting.

    Intended as the unit of work submitted to the thread-pool executor, so
    that every evaluation-service call first waits for a slot from the
    shared limiter.

    Args:
      rate_limiter: Limiter shared across all worker threads; blocks until
        the next call may be issued.
      handler: The metric handler that performs the actual metric
        computation.
      eval_case: The evaluation case to score.
      response_index: Index of the response candidate being scored.

    Returns:
      The metric result produced by the handler.
    """
    # Block here (not inside the handler) so throttling is uniform for all
    # handler implementations.
    rate_limiter.sleep_and_advance()
    return handler.get_metric_result(eval_case, response_index)
1511+
1512+
15011513
def compute_metrics_and_aggregate(
15021514
evaluation_run_config: EvaluationRunConfig,
1515+
evaluation_service_qps: Optional[float] = None,
15031516
) -> types.EvaluationResult:
1504-
"""Computes metrics and aggregates them for a given evaluation run config."""
1517+
"""Computes metrics and aggregates them for a given evaluation run config.
1518+
1519+
Args:
1520+
evaluation_run_config: The configuration for the evaluation run.
1521+
evaluation_service_qps: Optional QPS limit for the evaluation service.
1522+
Defaults to _DEFAULT_EVAL_SERVICE_QPS (10). Users with higher
1523+
quotas can increase this value.
1524+
"""
15051525
metric_handlers = []
15061526
all_futures = []
15071527
results_by_case_response_metric: collections.defaultdict[
@@ -1511,6 +1531,12 @@ def compute_metrics_and_aggregate(
15111531
execution_errors = []
15121532
case_indices_with_errors = set()
15131533

1534+
if evaluation_service_qps is not None and evaluation_service_qps <= 0:
1535+
raise ValueError("evaluation_service_qps must be a positive number.")
1536+
qps = evaluation_service_qps or _evals_utils._DEFAULT_EVAL_SERVICE_QPS
1537+
rate_limiter = _evals_utils.RateLimiter(rate=qps)
1538+
logger.info("Rate limiting evaluation service requests to %.1f QPS.", qps)
1539+
15141540
for eval_metric in evaluation_run_config.metrics:
15151541
metric_handlers.append(
15161542
get_handler_for_metric(evaluation_run_config.evals_module, eval_metric)
@@ -1553,7 +1579,9 @@ def compute_metrics_and_aggregate(
15531579
for response_index in range(actual_num_candidates_for_case):
15541580
try:
15551581
future = executor.submit(
1556-
metric_handler_instance.get_metric_result,
1582+
_rate_limited_get_metric_result,
1583+
rate_limiter,
1584+
metric_handler_instance,
15571585
eval_case,
15581586
response_index,
15591587
)

vertexai/_genai/_evals_utils.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import json
2020
import logging
2121
import os
22+
import threading
2223
import time
2324
from typing import Any, Optional, Union
2425

@@ -38,12 +39,59 @@
3839

3940
GCS_PREFIX = "gs://"
4041
BQ_PREFIX = "bq://"
42+
_DEFAULT_EVAL_SERVICE_QPS = 10


class RateLimiter:
    """Helper class for rate-limiting requests to Vertex AI to improve QoS.

    Paces calls so that consecutive events are admitted at least
    ``1 / rate`` seconds apart. This is a fixed-interval pacing limiter
    (it never allows bursts), not a classic token bucket. Designed for
    cases where the batch size is always 1 for traffic shaping and rate
    limiting. Safe to share across threads.

    Attributes:
      seconds_per_event: The time interval (in seconds) between events to
        maintain the desired rate.
    """

    def __init__(self, rate: float) -> None:
        """Initializes the rate limiter.

        Args:
          rate: The number of queries allowed per second.

        Raises:
          ValueError: If the rate is not positive.
        """
        if not rate or rate <= 0:
            raise ValueError("Rate must be a positive number")
        self.seconds_per_event = 1.0 / rate
        # Earliest monotonic timestamp at which the next event may proceed.
        # time.monotonic() is immune to wall-clock adjustments.
        self._next_allowed = time.monotonic()
        # Guards _next_allowed so concurrent callers reserve distinct slots.
        self._lock = threading.Lock()

    def sleep_and_advance(self) -> None:
        """Blocks the current thread until the next event can be admitted.

        The lock is held only long enough to reserve a time slot. The
        actual sleep happens outside the lock so that multiple threads
        can be sleeping concurrently with staggered wake-up times.
        """
        with self._lock:
            now = time.monotonic()
            # If the reserved slot is already in the past, admit
            # immediately; otherwise queue behind the previous reservation.
            wait_until = max(now, self._next_allowed)
            delay = wait_until - now
            self._next_allowed = wait_until + self.seconds_per_event

        if delay > 0:
            time.sleep(delay)
4189

4290

4391
class EvalDatasetLoader:
4492
"""A loader for datasets from various sources, using a shared client."""
4593

46-
def __init__(self, api_client: BaseApiClient):
94+
    def __init__(self, api_client: BaseApiClient) -> None:
        """Initializes the loader with a shared API client.

        Args:
          api_client: Client shared by the GCS and BigQuery helpers so all
            dataset reads go through one client instance.
        """
        self.api_client = api_client
        # Helpers for loading datasets from gs:// and bq:// sources.
        self.gcs_utils = _gcs_utils.GcsUtils(self.api_client)
        self.bigquery_utils = _bigquery_utils.BigQueryUtils(self.api_client)

vertexai/_genai/types/common.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15453,6 +15453,12 @@ class EvaluateMethodConfig(_common.BaseModel):
1545315453
dest: Optional[str] = Field(
1545415454
default=None, description="""The destination path for the evaluation results."""
1545515455
)
15456+
evaluation_service_qps: Optional[float] = Field(
15457+
default=None,
15458+
description="""The rate limit (queries per second) for calls to the
15459+
evaluation service. Defaults to 10. Increase this value if your
15460+
project has a higher EvaluateInstances API quota.""",
15461+
)
1545615462

1545715463

1545815464
class EvaluateMethodConfigDict(TypedDict, total=False):
@@ -15469,6 +15475,11 @@ class EvaluateMethodConfigDict(TypedDict, total=False):
1546915475
dest: Optional[str]
1547015476
"""The destination path for the evaluation results."""
1547115477

15478+
evaluation_service_qps: Optional[float]
15479+
"""The rate limit (queries per second) for calls to the
15480+
evaluation service. Defaults to 10. Increase this value if your
15481+
project has a higher EvaluateInstances API quota."""
15482+
1547215483

1547315484
EvaluateMethodConfigOrDict = Union[EvaluateMethodConfig, EvaluateMethodConfigDict]
1547415485

0 commit comments

Comments
 (0)