@@ -209,6 +209,11 @@ class vLLMEngineWrapper:
209209 Args:
210210 *args: The positional arguments for the engine.
211211 max_pending_requests: The maximum number of pending requests in the queue.
212+ If None, it will be auto-resolved to
213+ ``ceil(1.1 * max_num_seqs * pipeline_parallel_size)`` using values
214+ from vLLM's resolved engine config (so the default tracks vLLM's
215+ GPU-dependent ``max_num_seqs``). Pass a non-positive value (e.g.
216+ ``-1``) to disable the semaphore entirely.
212217 dynamic_lora_loading_path: The S3 path to the dynamic LoRA adapter.
213218 log_engine_metrics: Whether to export vLLM metrics to Ray's Prometheus endpoint.
214219 **kwargs: The keyword arguments for the engine.
@@ -217,7 +222,7 @@ class vLLMEngineWrapper:
217222 def __init__ (
218223 self ,
219224 idx_in_batch_column : str ,
220- max_pending_requests : int = - 1 ,
225+ max_pending_requests : Optional [ int ] = None ,
221226 dynamic_lora_loading_path : Optional [str ] = None ,
222227 log_engine_metrics : bool = True ,
223228 ** kwargs ,
@@ -294,8 +299,29 @@ def __init__(
294299
295300 # The performance gets really bad if there are too many requests in the pending queue.
296301 # We work around it with a semaphore to limit the number of concurrent requests in the engine.
302+ # When the caller did not specify a limit, derive it from the resolved
303+ # vLLM config rather than from raw engine_kwargs. vLLM's default
304+ # `max_num_seqs` is GPU-dependent (e.g. 256 on A10G/A100, 1024 on H100),
305+ # so reading from `scheduler_config` avoids silently capping the
306+ # semaphore below vLLM's actual capacity.
307+ scheduler_config = self ._vllm_config .scheduler_config
308+ parallel_config = self ._vllm_config .parallel_config
309+ engine_capacity = (
310+ scheduler_config .max_num_seqs * parallel_config .pipeline_parallel_size
311+ )
312+ if max_pending_requests is None :
313+ max_pending_requests = math .ceil (engine_capacity * 1.1 )
314+ elif 0 < max_pending_requests < engine_capacity :
315+ logger .warning (
316+ "max_pending_requests (%d) < max_num_seqs * pipeline_parallel_size "
317+ "(%d); may underutilize vLLM. Consider >=%d, or <=0 to disable." ,
318+ max_pending_requests ,
319+ engine_capacity ,
320+ math .ceil (engine_capacity * 1.1 ),
321+ )
297322 self .max_pending_requests = max_pending_requests
298323 if self .max_pending_requests > 0 :
324+ logger .info ("Max pending requests is set to %d" , self .max_pending_requests )
299325 self .semaphore = asyncio .Semaphore (self .max_pending_requests )
300326 else :
301327 self .semaphore = asyncio .NullContext ()
@@ -612,7 +638,10 @@ def __init__(
612638 engine_kwargs: The kwargs to pass to the vLLM engine.
613639 task_type: The task to use for the vLLM engine (e.g., "generate", "embed", etc).
614640 max_pending_requests: The maximum number of pending requests. If None,
615- it will be set to 1.1 * max_num_seqs * pipeline_parallel_size.
641+ it will be set to ``ceil(1.1 * max_num_seqs * pipeline_parallel_size)``,
642+ where ``max_num_seqs`` and ``pipeline_parallel_size`` are read from
643+ vLLM's resolved engine config (so the default tracks vLLM's
644+ GPU-dependent ``max_num_seqs``, not a hardcoded value).
616645 dynamic_lora_loading_path: The path to the dynamic LoRA adapter. It is expected
617646 to hold subfolders each for a different lora checkpoint.
618647 should_continue_on_error: If True, continue processing when inference fails for
@@ -629,14 +658,6 @@ def __init__(
629658 self .task_type = task_type
630659 self .engine_kwargs = self .normalize_engine_kwargs (engine_kwargs )
631660
632- # Set up the max pending requests.
633- pp_size = self .engine_kwargs .get ("pipeline_parallel_size" , 1 )
634- self .max_pending_requests = max_pending_requests or math .ceil (
635- self .engine_kwargs .get ("max_num_seqs" , 128 ) * pp_size * 1.1
636- )
637- if self .max_pending_requests > 0 :
638- logger .info ("Max pending requests is set to %d" , self .max_pending_requests )
639-
640661 exclude_safetensors = (
641662 self .engine_kwargs .get ("load_format" ) in STREAMING_LOAD_FORMATS
642663 )
@@ -662,11 +683,14 @@ def __init__(
662683 model_source = source ,
663684 idx_in_batch_column = self .IDX_IN_BATCH_COLUMN ,
664685 enable_log_requests = False ,
665- max_pending_requests = self . max_pending_requests ,
686+ max_pending_requests = max_pending_requests ,
666687 dynamic_lora_loading_path = dynamic_lora_loading_path ,
667688 log_engine_metrics = log_engine_metrics ,
668689 ** self .engine_kwargs ,
669690 )
691+ # The wrapper resolves a None into a concrete value using vLLM's
692+ # resolved engine config; surface that back on the UDF.
693+ self .max_pending_requests = self .llm .max_pending_requests
670694
671695 max_num_seqs = self .llm .get_scheduler_config ().max_num_seqs
672696 if batch_size * max_concurrent_batches < max_num_seqs :
0 commit comments