Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
296de27
serve: install uvloop as default asyncio event loop policy
aswinvisva Jun 24, 2026
e496d7a
inputs/media_io: route media-load work through a dedicated ThreadPool…
aswinvisva Jun 24, 2026
2304bc1
serve/openai_server: route HF input-processor calls through a dedicat…
aswinvisva Jun 24, 2026
d400639
inputs/media_io: BytesIO + uint8 numpy fast path for video decode
aswinvisva Jun 24, 2026
8473609
qwen3vl: skip HF frame sampling when frames are already sampled by th…
aswinvisva Jun 24, 2026
33bcd91
qwen2/3vl: memoize HF processor _merge_kwargs by call signature
aswinvisva Jun 24, 2026
d4961d8
requirements: add uvloop
aswinvisva Jun 24, 2026
c6a8acf
address review feedback
aswinvisva Jun 24, 2026
82e8079
serve: install uvloop after all imports to avoid try/except wrapper
aswinvisva Jun 24, 2026
17127de
address review feedback
aswinvisva Jun 26, 2026
f7b6941
qwen3vl: strip io_loaded_all_frames before HF VideoMetadata
aswinvisva Jun 26, 2026
b0cd053
swap signature-keyed merge_kwargs memoize for HF schema cache
aswinvisva Jun 26, 2026
1b31d05
media_io: lazy_loader for cv2 + unit tests for new control flow
aswinvisva Jun 26, 2026
1611aa8
address review feedback: media_io rename, comment, staticmethod; drop…
aswinvisva Jun 29, 2026
2f4a233
media_io: suppress bandit B108 on /dev/shm tempdir literal
aswinvisva Jun 29, 2026
de7c65f
qwen3vl: null fps when caller sets only num_frames so HF respects the…
aswinvisva Jun 29, 2026
5419c57
qwen3vl: sample video frames when caller gives no num_frames/fps
aswinvisva Jun 29, 2026
be7286b
qwen3vl: update silent-caller do_sample_frames unit test
aswinvisva Jun 30, 2026
be3b69a
qwen3vl: register new serve CLI options in API stability reference
aswinvisva Jul 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ cuda-python>=13
diffusers>=0.37.1
ftfy
lark
lazy_loader~=0.5
mpi4py
numpy>=2.0.0,<2.4 # numba 0.63.1 requires numpy<2.4
onnx>=1.21.0
Expand Down Expand Up @@ -91,3 +92,4 @@ smg-grpc-proto>=0.4.2
cache-dit>=1.3.5
librosa
msgpack
uvloop>=0.19.0
33 changes: 32 additions & 1 deletion tensorrt_llm/_torch/models/modeling_multimodal_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import functools
import math
import os
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast
from typing import (Any, Callable, Dict, List, Optional, Tuple, TypedDict,
Union, cast)

import torch
import torch.nn.functional as F
Expand Down Expand Up @@ -63,6 +64,34 @@ def has_raw_multimodal_payload(param: MultimodalParams) -> bool:
})


def _get_cached_merged_typed_dict(schema, cache):
"""Return a stable copy of ProcessorMixin's ephemeral merged TypedDict.

`ProcessorMixin._merge_kwargs` creates a fresh
`TypedDict("merged_typed_dict", ...)` for image/video kwargs on every
processor call. `huggingface_hub` caches strict dataclass validators by
schema-object identity, so a fresh type defeats that cache. Reuse an
equivalent TypedDict class keyed by (totality, annotation items) so the
upstream validator hits cache and skips the recursive type validation.
"""
if getattr(schema, "__name__", None) != "merged_typed_dict":
return schema
try:
cache_key = (getattr(schema, "__total__",
True), tuple(schema.__annotations__.items()))
cached_schema = cache.get(cache_key)
except TypeError:
return schema
if cached_schema is None:
cached_schema = TypedDict(
"merged_typed_dict",
dict(schema.__annotations__),
total=getattr(schema, "__total__", True),
)
cache[cache_key] = cached_schema
return cached_schema


@functools.lru_cache(maxsize=None)
def _install_processor_output_validation_filter():
"""Install a process-wide filter over transformers' ``validate_typed_dict``.
Expand Down Expand Up @@ -107,13 +136,15 @@ def _install_processor_output_validation_filter():
"No transformers module exposes validate_typed_dict; "
"cannot patch processor output validation.")
base_orig = binders[0].validate_typed_dict
merged_schema_cache: dict = {}

def _filtered_validate(schema, data):
if isinstance(data, dict):
data = {
k: v
for k, v in data.items() if k not in _PROCESSOR_OUTPUT_KEYS
}
schema = _get_cached_merged_typed_dict(schema, merged_schema_cache)
return base_orig(schema, data)

for b in binders:
Expand Down
105 changes: 92 additions & 13 deletions tensorrt_llm/_torch/models/modeling_qwen3vl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.

import copy
import math
import re
from functools import lru_cache
from typing import Any, Dict, List, Optional, Tuple, Union
Expand Down Expand Up @@ -161,6 +162,61 @@ def _expand_prompt_token_ids_for_mm_handoff(
)


def _decide_do_sample_frames(
video_datas: Optional[List[Any]],
mm_processor_kwargs: Dict[str, Any],
) -> bool:
"""Pick a single `do_sample_frames` flag for the HF processor call.

HF's video processor takes a scalar `do_sample_frames` that applies to
every video in the request. Decide it as follows:

1. If `mm_processor_kwargs.do_sample_frames` is explicitly set
(True or False), honor it.
2. If the caller supplies no frame target (`num_frames` / `fps`),
match HF's class default, which samples frames (returns True).
3. Otherwise, for each video compute the target frame count from the
kwargs (`num_frames` directly, or `floor(duration * fps)` if
`fps` is given) and compare to `len(vd.frames)`. If any video
needs a different count, the batch is sampled (returns True).

Per-video targets that match the IO-decoded count don't need HF
sampling; the all-or-nothing reduction over the batch means a single
video needing resampling pulls the rest along through a no-op
identity `np.linspace`.
"""
if "do_sample_frames" in mm_processor_kwargs:
return bool(mm_processor_kwargs["do_sample_frames"])

if not video_datas:
return False

user_num_frames = mm_processor_kwargs.get("num_frames")
user_fps = mm_processor_kwargs.get("fps")
has_num_frames = user_num_frames is not None and user_num_frames != -1
has_fps = user_fps is not None and user_fps != -1

# No explicit frame target from the caller: defer to HF's class-default
# sampling (the stock processor sets `do_sample_frames=True` when neither
# `num_frames` nor `fps` is given). Returning False here would hand the
# IO-decoded frames straight to HF unchanged and diverge from stock HF
# whenever the IO loader decoded a different number of frames than HF's
# default sampler would select.
if not has_num_frames and not has_fps:
return True

for vd in video_datas:
n_decoded = len(vd.frames)
if has_num_frames:
n_target = user_num_frames
else: # has_fps
duration = (vd.metadata or {}).get("duration") or 0
n_target = math.floor(duration * user_fps)
if n_target != n_decoded:
return True
return False


class Qwen3VLInputProcessorBase(Qwen2VLInputProcessorBase):
"""Qwen3-VL input processor.

Expand Down Expand Up @@ -253,19 +309,42 @@ def _preprocess(
if videos and isinstance(videos[0][0], torch.Tensor):
do_rescale = False

# Forward video metadata only when the caller opts into per-request kwargs;
# the default path pre-samples frames in the IO loader, so unconditional
# metadata triggers IndexError in HF's _decode_and_sample_videos.
video_metadata = (
[vd.metadata for vd in video_datas] if video_datas and mm_processor_kwargs else None
)

# num_frames and fps are mutually exclusive in the HF processor's sample_frames.
# If the caller set num_frames without fps, null fps explicitly so the class-level
# default fps=2 does not interfere.
proc_kwargs = dict(mm_processor_kwargs)
if "num_frames" in proc_kwargs and "fps" not in proc_kwargs:
proc_kwargs["fps"] = None
do_sample_frames = _decide_do_sample_frames(video_datas, mm_processor_kwargs)

# Pass `do_sample_frames` plus, when sampling is needed, the
# caller's `num_frames` / `fps` target. Everything else the caller
# supplied (resize, normalize knobs, etc.) flows through unchanged.
proc_kwargs: Dict[str, Any] = {"do_sample_frames": do_sample_frames}
for k, v in mm_processor_kwargs.items():
if k in ("num_frames", "fps", "do_sample_frames"):
continue
proc_kwargs[k] = v
if do_sample_frames:
if "num_frames" in mm_processor_kwargs:
proc_kwargs["num_frames"] = mm_processor_kwargs["num_frames"]
if "fps" in mm_processor_kwargs:
proc_kwargs["fps"] = mm_processor_kwargs["fps"]
elif "num_frames" in mm_processor_kwargs:
# HF's `sample_frames` honors `num_frames` only when `fps` is
# not also set; the class-default `fps=2` would otherwise cap
# the returned count below the caller's requested
# `num_frames` for short clips. Null `fps` so `num_frames` is
# respected verbatim.
proc_kwargs["fps"] = None

# Forward per-video metadata with `total_num_frames` rewritten to the
# actual decoded frame count. HF's `sample_frames` computes indices
# via `np.linspace(0, total_num_frames - 1, num_frames)` and indexes
# the frame tensor with them; the rewrite keeps those indices in
# range and the no-sampling path consistent for downstream qwen3vl
# code that consults the metadata.
video_metadata: Optional[List[Dict[str, Any]]] = None
if video_datas:
video_metadata = []
for vd in video_datas:
m = dict(vd.metadata or {})
m["total_num_frames"] = len(vd.frames)
video_metadata.append(m)

return self.processor(
text=[text],
Expand Down
42 changes: 33 additions & 9 deletions tensorrt_llm/commands/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import click
import torch
import uvloop
import yaml
from strenum import StrEnum
from torch.cuda import device_count
Expand Down Expand Up @@ -350,7 +351,9 @@ def launch_server(
disagg_cluster_config: Optional[DisaggClusterConfig] = None,
multimodal_server_config: Optional[MultimodalServerConfig] = None,
served_model_name: Optional[str] = None,
allow_request_chat_template: bool = False):
allow_request_chat_template: bool = False,
input_processor_workers: int = 8,
media_load_workers: int = 8):

backend = llm_args["backend"]
model = served_model_name or llm_args["model"]
Expand Down Expand Up @@ -394,14 +397,16 @@ def launch_server(
disagg_cluster_config=disagg_cluster_config,
multimodal_server_config=multimodal_server_config,
chat_template=chat_template,
allow_request_chat_template=allow_request_chat_template)
allow_request_chat_template=allow_request_chat_template,
input_processor_workers=input_processor_workers,
media_load_workers=media_load_workers)
_apply_fastapi_middlewares(server.app, middleware)

# Optionally disable GC (default: not disabled)
if os.getenv("TRTLLM_SERVER_DISABLE_GC", "0") == "1":
gc.disable()

asyncio.run(server(host, port, sockets=[s]))
uvloop.run(server(host, port, sockets=[s]))


def launch_grpc_server(host: str,
Expand Down Expand Up @@ -527,7 +532,7 @@ def signal_handler():

logger.info("Shutdown complete")

asyncio.run(serve_grpc_async())
uvloop.run(serve_grpc_async())


def launch_mm_encoder_server(
Expand All @@ -548,7 +553,7 @@ def launch_mm_encoder_server(
metadata_server_cfg=metadata_server_cfg,
tool_parser=None,
allow_request_chat_template=allow_request_chat_template)
asyncio.run(server(host, port))
uvloop.run(server(host, port))


def launch_visual_gen_server(
Expand Down Expand Up @@ -601,7 +606,7 @@ def launch_visual_gen_server(
metadata_server_cfg=metadata_server_cfg,
tool_parser=None)
_apply_fastapi_middlewares(server.app, middleware)
asyncio.run(server(host, port, sockets=[s]))
uvloop.run(server(host, port, sockets=[s]))


class ChoiceWithAlias(click.Choice):
Expand Down Expand Up @@ -769,6 +774,22 @@ def convert(self, value: Any, param: Optional["click.Parameter"],
help="Number of workers to postprocess raw responses "
"to comply with OpenAI protocol.",
status="prototype")
@stability_option("--input-processor-workers",
"input_processor_workers",
type=click.IntRange(min=1),
default=8,
help="Size of the dedicated thread pool that runs the HF "
"input processor (multimodal preprocess) on the chat "
"and completion endpoints.",
status="prototype")
@stability_option("--media-load-workers",
"media_load_workers",
type=click.IntRange(min=1),
default=8,
help="Size of the dedicated thread pool that decodes media "
"payloads (image / video / audio) for multimodal "
"requests.",
status="prototype")
@stability_option("--trust_remote_code",
is_flag=True,
default=False,
Expand Down Expand Up @@ -945,7 +966,8 @@ def serve(
moe_expert_parallel_size: Optional[int],
moe_cluster_parallel_size: Optional[int], gpus_per_node: Optional[int],
free_gpu_memory_fraction: float, kv_cache_dtype: str,
num_postprocess_workers: int, trust_remote_code: bool,
num_postprocess_workers: int, input_processor_workers: int,
media_load_workers: int, trust_remote_code: bool,
revision: Optional[str], extra_llm_api_options: Optional[str],
reasoning_parser: Optional[str], tool_parser: Optional[str],
metadata_server_config_file: Optional[str], server_role: Optional[str],
Expand Down Expand Up @@ -1146,7 +1168,9 @@ def _serve_llm():
disagg_cluster_config,
multimodal_server_config,
served_model_name=served_model_name,
allow_request_chat_template=allow_request_chat_template)
allow_request_chat_template=allow_request_chat_template,
input_processor_workers=input_processor_workers,
media_load_workers=media_load_workers)

def _serve_visual_gen():
parsed_visual_gen_args = (VisualGenArgs.from_yaml(visual_gen_args)
Expand Down Expand Up @@ -1421,7 +1445,7 @@ def disaggregated(
if os.getenv("TRTLLM_DISAGG_SERVER_DISABLE_GC", "1") == "1":
gc.disable()

asyncio.run(server(disagg_cfg.hostname, disagg_cfg.port, sockets=[s]))
uvloop.run(server(disagg_cfg.hostname, disagg_cfg.port, sockets=[s]))


def set_cuda_device():
Expand Down
Loading
Loading