Skip to content

Commit 99510de

Browse files
authored
fix: resolve async_openai lint and adaptive concurrency issues (#1082)
* add adaptive/static/baseline for throughput check * add adaptive/static/baseline for throughput check * Fix formatting and add molmo throughput compare script * fix(vllm): pass chat_template directly for lint stability * remove compare scripts * Fix async_openai lint and adaptive concurrency path * fix: address P0 concurrency control and lint issues * fix: harden P1 concurrency control and scheduling
1 parent 59ca7e3 commit 99510de

4 files changed

Lines changed: 525 additions & 336 deletions

File tree

lmms_eval/models/chat/async_openai.py

Lines changed: 139 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,32 @@
22
import os
33
import shutil
44
import tempfile
5+
import time
56
import uuid
67
from multiprocessing import cpu_count
78
from typing import List, Optional, Tuple
89

910
from accelerate import Accelerator, DistributedType
11+
from dotenv import load_dotenv
12+
from loguru import logger as eval_logger
13+
from openai import AsyncOpenAI
1014
from tqdm import tqdm
1115

1216
from lmms_eval.api.instance import Instance
1317
from lmms_eval.api.model import lmms
1418
from lmms_eval.api.registry import register_model
1519
from lmms_eval.imports import optional_import
20+
from lmms_eval.models.model_utils.concurrency_control import (
21+
AdaptiveConcurrencyConfig,
22+
decide_next_concurrency,
23+
is_rate_limit_error,
24+
parse_bool,
25+
)
26+
from lmms_eval.protocol import ChatMessages
1627

1728
VideoReader, _ = optional_import("decord", "VideoReader")
1829
cpu, _ = optional_import("decord", "cpu")
1930

20-
from dotenv import load_dotenv
21-
from loguru import logger as eval_logger
22-
from openai import AsyncOpenAI
23-
24-
from lmms_eval.mcp import MCPClient
25-
from lmms_eval.protocol import ChatMessages
26-
2731
load_dotenv(verbose=True)
2832

2933

@@ -37,6 +41,7 @@ def __init__(
3741
base_url: str = None,
3842
api_key: str = None,
3943
timeout: int = 600,
44+
retry_backoff_s: Optional[float] = None,
4045
max_retries: int = 5,
4146
max_size_in_mb: int = 20,
4247
mcp_server_path: str = None,
@@ -48,11 +53,19 @@ def __init__(
4853
max_pixels: Optional[int] = 151200,
4954
min_pixels: Optional[int] = 28 * 28,
5055
is_qwen3_vl: bool = False,
56+
adaptive_concurrency: bool = False,
57+
adaptive_min_concurrency: int = 1,
58+
adaptive_max_concurrency: int = 128,
59+
adaptive_target_latency_s: float = 15.0,
60+
adaptive_increase_step: float = 0.1,
61+
adaptive_decrease_factor: float = 0.7,
62+
adaptive_failure_threshold: float = 0.05,
5163
**kwargs,
5264
) -> None:
5365
super().__init__()
5466
self.model_version = model_version
5567
self.timeout = timeout
68+
self.retry_backoff_s = max(0.0, float(1.0 if retry_backoff_s is None else retry_backoff_s))
5669
self.max_retries = max_retries
5770
self.max_size_in_mb = max_size_in_mb # some models have a limit on the size of the image
5871
if num_cpus is None:
@@ -69,7 +82,18 @@ def __init__(
6982
self.min_pixels = min_pixels
7083
self.max_frames = max_frames
7184
self.is_qwen3_vl = is_qwen3_vl
85+
self.adaptive_concurrency = parse_bool(adaptive_concurrency)
86+
self.adaptive_config = AdaptiveConcurrencyConfig.from_raw(
87+
min_concurrency=adaptive_min_concurrency,
88+
max_concurrency=adaptive_max_concurrency,
89+
target_latency_s=adaptive_target_latency_s,
90+
increase_step=adaptive_increase_step,
91+
decrease_factor=adaptive_decrease_factor,
92+
failure_threshold=adaptive_failure_threshold,
93+
)
7294
if mcp_server_path is not None:
95+
from lmms_eval.mcp import MCPClient
96+
7397
self.mcp_client = MCPClient(mcp_server_path)
7498
os.makedirs(self.work_dir, exist_ok=True)
7599
else:
@@ -238,19 +262,117 @@ def generate_until(self, requests) -> List[str]:
238262
results, requests = self.get_response_from_cache(requests)
239263

240264
async def run():
241-
res = []
265+
res: List[Tuple[str, int]] = []
242266
pbar = tqdm(total=len(requests), disable=(self.rank != 0), desc="Model Responding")
243-
sem = asyncio.Semaphore(self.num_cpus)
267+
current_concurrency = (
268+
min(
269+
max(1, self.num_cpus),
270+
self.adaptive_config.max_concurrency,
271+
)
272+
if self.adaptive_concurrency
273+
else max(1, self.num_cpus)
274+
)
275+
cursor = 0
244276

245277
async def _process(req, idx):
246-
async with sem:
247-
return await self.maybe_forward_with_tool(req, idx)
248-
249-
tasks = [asyncio.create_task(_process(req, idx)) for idx, req in enumerate(requests)]
250-
for task in asyncio.as_completed(tasks):
251-
content, idx = await task
252-
res.append((content, idx))
253-
pbar.update(1)
278+
started_at = time.time()
279+
rate_limited = False
280+
last_error_msg = "unknown error"
281+
for attempt in range(self.max_retries):
282+
try:
283+
content, original_idx = await self.maybe_forward_with_tool(req, idx)
284+
elapsed = time.time() - started_at
285+
return content, original_idx, True, rate_limited, elapsed
286+
except Exception as exc:
287+
error_msg = str(exc)
288+
last_error_msg = error_msg
289+
rate_limited = rate_limited or is_rate_limit_error(error_msg)
290+
eval_logger.info(f"Attempt {attempt + 1}/{self.max_retries} failed for request {idx} with error: {error_msg}")
291+
if attempt == self.max_retries - 1:
292+
eval_logger.error(f"All {self.max_retries} attempts failed. Last error: {error_msg}")
293+
else:
294+
await asyncio.sleep(self.retry_backoff_s)
295+
296+
elapsed = time.time() - started_at
297+
error_preview = last_error_msg.replace("\n", " ")[:200]
298+
failure_content = f"[LMMS_EVAL_REQUEST_FAILED after {self.max_retries} retries] {error_preview}"
299+
return failure_content, idx, False, rate_limited, elapsed
300+
301+
failed_requests = 0
302+
rate_limited_requests = 0
303+
request_latencies: List[float] = []
304+
completed_since_adapt = 0
305+
in_flight: dict[asyncio.Task, int] = {}
306+
307+
def maybe_update_concurrency(force: bool = False) -> None:
308+
nonlocal current_concurrency
309+
nonlocal failed_requests
310+
nonlocal rate_limited_requests
311+
nonlocal request_latencies
312+
nonlocal completed_since_adapt
313+
314+
if not self.adaptive_concurrency:
315+
return
316+
317+
sample_threshold = max(4, current_concurrency)
318+
if not force and completed_since_adapt < sample_threshold:
319+
return
320+
if completed_since_adapt <= 0:
321+
return
322+
323+
decision = decide_next_concurrency(
324+
current_concurrency=current_concurrency,
325+
total_requests=completed_since_adapt,
326+
failed_requests=failed_requests,
327+
rate_limited_requests=rate_limited_requests,
328+
latencies=request_latencies,
329+
config=self.adaptive_config,
330+
)
331+
if decision.next_concurrency != decision.current_concurrency:
332+
eval_logger.info(
333+
"Adaptive concurrency update: "
334+
f"{decision.current_concurrency} -> "
335+
f"{decision.next_concurrency} "
336+
f"(fail_rate={decision.failure_rate:.3f}, "
337+
f"rate_limit_rate={decision.rate_limit_rate:.3f}, "
338+
f"p95_latency={decision.p95_latency_s:.3f}s)"
339+
)
340+
current_concurrency = decision.next_concurrency
341+
failed_requests = 0
342+
rate_limited_requests = 0
343+
request_latencies = []
344+
completed_since_adapt = 0
345+
346+
while cursor < len(requests) or in_flight:
347+
while cursor < len(requests) and len(in_flight) < max(1, current_concurrency):
348+
task = asyncio.create_task(_process(requests[cursor], cursor))
349+
in_flight[task] = cursor
350+
cursor += 1
351+
352+
if not in_flight:
353+
break
354+
355+
done, _ = await asyncio.wait(in_flight, return_when=asyncio.FIRST_COMPLETED)
356+
for task in done:
357+
in_flight.pop(task, None)
358+
(
359+
content,
360+
request_idx,
361+
success,
362+
rate_limited,
363+
elapsed,
364+
) = task.result()
365+
res.append((content, request_idx))
366+
if not success:
367+
failed_requests += 1
368+
if rate_limited:
369+
rate_limited_requests += 1
370+
request_latencies.append(elapsed)
371+
completed_since_adapt += 1
372+
pbar.update(1)
373+
maybe_update_concurrency(force=False)
374+
375+
maybe_update_concurrency(force=True)
254376

255377
pbar.close()
256378
return res

0 commit comments

Comments
 (0)