From 7e46278c27c279d29533c3fe3876c9f0a1fa64d4 Mon Sep 17 00:00:00 2001 From: zxy Date: Thu, 7 May 2026 17:41:50 +0800 Subject: [PATCH 1/4] fix: avoid starving server loop during VL request prep --- lmdeploy/serve/processors/multimodal.py | 39 ++++++++++++-- lmdeploy/vl/engine.py | 67 +++++++++++++++---------- 2 files changed, 75 insertions(+), 31 deletions(-) diff --git a/lmdeploy/serve/processors/multimodal.py b/lmdeploy/serve/processors/multimodal.py index bd9ff183df..6801ea273c 100644 --- a/lmdeploy/serve/processors/multimodal.py +++ b/lmdeploy/serve/processors/multimodal.py @@ -1,5 +1,6 @@ # Copyright (c) OpenMMLab. All rights reserved. import asyncio +from functools import partial from typing import Any, Literal import PIL @@ -37,6 +38,7 @@ def __init__(self, self.chat_template = chat_template self.vl_encoder = vl_encoder self.backend = backend + self.prompt_lock = asyncio.Lock() @staticmethod def merge_message_content(msg: dict) -> dict: @@ -343,6 +345,30 @@ async def _get_text_prompt_input(self, chat_template_kwargs: dict | None = None, **kwargs): """Process text-only prompt and return prompt string and input_ids.""" + loop = asyncio.get_event_loop() + async with self.prompt_lock: + return await loop.run_in_executor( + None, + partial(self._get_text_prompt_input_sync, + prompt=prompt, + do_preprocess=do_preprocess, + sequence_start=sequence_start, + adapter_name=adapter_name, + tools=tools, + reasoning_effort=reasoning_effort, + chat_template_kwargs=chat_template_kwargs, + **kwargs)) + + def _get_text_prompt_input_sync(self, + prompt: str | list[dict], + do_preprocess: bool, + sequence_start: bool, + adapter_name: str, + tools: list[object] | None = None, + reasoning_effort: Literal['low', 'medium', 'high'] | None = None, + chat_template_kwargs: dict | None = None, + **kwargs): + """Render and tokenize a text prompt.""" # Change multimodal data to openai text messages if isinstance(prompt, list): prompt = [self.merge_message_content(msg) for msg in prompt] @@ -392,10 +418,15 @@ async def _get_multimodal_prompt_input(self, chat_template_kwargs=chat_template_kwargs) elif self.backend == 'pytorch': if self.vl_encoder._uses_new_preprocess: - input_prompt = self.vl_encoder.model.get_input_prompt(messages=messages, - chat_template=chat_template, - sequence_start=sequence_start, - chat_template_kwargs=chat_template_kwargs) + loop = asyncio.get_event_loop() + async with self.prompt_lock: + input_prompt = await loop.run_in_executor( + None, + partial(self.vl_encoder.model.get_input_prompt, + messages=messages, + chat_template=chat_template, + sequence_start=sequence_start, + chat_template_kwargs=chat_template_kwargs)) results = await self.vl_encoder.preprocess(messages, input_prompt, mm_processor_kwargs) else: results = await self.vl_encoder.preprocess(messages, mm_processor_kwargs) diff --git a/lmdeploy/vl/engine.py b/lmdeploy/vl/engine.py index 6f92f3d81a..b89adfaa3e 100644 --- a/lmdeploy/vl/engine.py +++ b/lmdeploy/vl/engine.py @@ -3,6 +3,7 @@ import asyncio import inspect from concurrent.futures import ThreadPoolExecutor +from functools import partial from typing import Any import torch @@ -44,6 +45,7 @@ def __init__( self.vision_config = vision_config self.max_batch_size = vision_config.max_batch_size self.executor = ThreadPoolExecutor(max_workers=1) + self.executor_lock = asyncio.Lock() self._uses_new_preprocess = self._is_new_preprocess_api(self.model) torch.cuda.empty_cache() @@ -61,14 +63,14 @@ async def preprocess(self, input_prompt: str | list[int] | None = None, mm_processor_kwargs: dict[str, Any] | None = None) -> list[dict]: """Preprocess multimodal data in the messages.""" - if self._uses_new_preprocess: - future = asyncio.get_event_loop().run_in_executor( - self.executor, self.model.preprocess, messages, input_prompt, mm_processor_kwargs) - else: - future = asyncio.get_event_loop().run_in_executor( - self.executor, self.model.preprocess, messages) - future.add_done_callback(_raise_exception_on_finish) - outputs = await future + async with self.executor_lock: + if self._uses_new_preprocess: + future = asyncio.get_event_loop().run_in_executor( + self.executor, self.model.preprocess, messages, input_prompt, mm_processor_kwargs) + else: + future = asyncio.get_event_loop().run_in_executor(self.executor, self.model.preprocess, messages) + future.add_done_callback(_raise_exception_on_finish) + outputs = await future return outputs async def async_infer(self, messages: list[dict]) -> list[dict]: @@ -78,10 +80,11 @@ async def async_infer(self, messages: list[dict]) -> list[dict]: messages (list[dict]): a list of message, which is the output of `preprocess()` """ - future = asyncio.get_event_loop().run_in_executor(self.executor, self.model.forward, messages, - self.max_batch_size) - future.add_done_callback(_raise_exception_on_finish) - outputs = await future + async with self.executor_lock: + future = asyncio.get_event_loop().run_in_executor(self.executor, self.model.forward, messages, + self.max_batch_size) + future.add_done_callback(_raise_exception_on_finish) + outputs = await future return outputs async def wrap_for_pytorch( @@ -112,15 +115,20 @@ async def wrap_for_pytorch( } """ has_input_ids = self.model.has_input_ids(messages) - if not has_input_ids: - result = self.model.to_pytorch(messages, - chat_template, - tokenizer, - sequence_start, - tools=tools, - chat_template_kwargs=chat_template_kwargs) - else: - result = self.model.to_pytorch_with_input_ids(messages) + loop = asyncio.get_event_loop() + async with self.executor_lock: + if not has_input_ids: + result = await loop.run_in_executor( + self.executor, + partial(self.model.to_pytorch, + messages, + chat_template, + tokenizer, + sequence_start, + tools=tools, + chat_template_kwargs=chat_template_kwargs)) + else: + result = await loop.run_in_executor(self.executor, self.model.to_pytorch_with_input_ids, messages) # clear data for i, message in enumerate(messages): if isinstance(message['content'], list): @@ -153,12 +161,17 @@ async def wrap_for_turbomind( ... } """ - result = self.model.to_turbomind(messages, - chat_template, - tokenizer, - sequence_start, - tools=tools, - chat_template_kwargs=chat_template_kwargs) + loop = asyncio.get_event_loop() + async with self.executor_lock: + result = await loop.run_in_executor( + self.executor, + partial(self.model.to_turbomind, + messages, + chat_template, + tokenizer, + sequence_start, + tools=tools, + chat_template_kwargs=chat_template_kwargs)) # clear data for i, message in enumerate(messages): if isinstance(message['content'], list): From c74f8a260670166056442265c50af98ac3334985 Mon Sep 17 00:00:00 2001 From: zxy Date: Thu, 7 May 2026 18:09:33 +0800 Subject: [PATCH 2/4] fix: preserve executor gates after cancellation --- lmdeploy/serve/processors/multimodal.py | 8 +- lmdeploy/utils.py | 27 ++++++ lmdeploy/vl/engine.py | 14 +-- .../test_executor_cancellation.py | 87 +++++++++++++++++++ 4 files changed, 127 insertions(+), 9 deletions(-) create mode 100644 tests/test_lmdeploy/test_executor_cancellation.py diff --git a/lmdeploy/serve/processors/multimodal.py b/lmdeploy/serve/processors/multimodal.py index 6801ea273c..4644bc24ea 100644 --- a/lmdeploy/serve/processors/multimodal.py +++ b/lmdeploy/serve/processors/multimodal.py @@ -7,7 +7,7 @@ from lmdeploy.model import MODELS, BaseChatTemplate from lmdeploy.tokenizer import Tokenizer -from lmdeploy.utils import get_logger +from lmdeploy.utils import await_executor_future, get_logger from lmdeploy.vl.constants import Modality from lmdeploy.vl.media.connection import load_from_url from lmdeploy.vl.media.image import ImageMediaIO @@ -347,7 +347,7 @@ async def _get_text_prompt_input(self, """Process text-only prompt and return prompt string and input_ids.""" loop = asyncio.get_event_loop() async with self.prompt_lock: - return await loop.run_in_executor( + future = loop.run_in_executor( None, partial(self._get_text_prompt_input_sync, prompt=prompt, @@ -358,6 +358,7 @@ async def _get_text_prompt_input(self, reasoning_effort=reasoning_effort, chat_template_kwargs=chat_template_kwargs, **kwargs)) + return await await_executor_future(future) def _get_text_prompt_input_sync(self, prompt: str | list[dict], @@ -420,13 +421,14 @@ async def _get_multimodal_prompt_input(self, if self.vl_encoder._uses_new_preprocess: loop = asyncio.get_event_loop() async with self.prompt_lock: - input_prompt = await loop.run_in_executor( + future = loop.run_in_executor( None, partial(self.vl_encoder.model.get_input_prompt, messages=messages, chat_template=chat_template, sequence_start=sequence_start, chat_template_kwargs=chat_template_kwargs)) + input_prompt = await await_executor_future(future) results = await self.vl_encoder.preprocess(messages, input_prompt, mm_processor_kwargs) else: results = await self.vl_encoder.preprocess(messages, mm_processor_kwargs) diff --git a/lmdeploy/utils.py b/lmdeploy/utils.py index 7f9c891e14..f4fee2b3a8 100644 --- a/lmdeploy/utils.py +++ b/lmdeploy/utils.py @@ -15,6 +15,33 @@ logger_initialized = {} +async def await_executor_future(future: asyncio.Future): + """Await executor work without releasing a lock before cancellation + ends.""" + cancelled = False + while not future.done(): + try: + result = await asyncio.shield(future) + except asyncio.CancelledError: + cancelled = True + except Exception: + if cancelled: + raise asyncio.CancelledError + raise + else: + if cancelled: + raise asyncio.CancelledError + return result + + if cancelled: + try: + future.exception() + except BaseException: + pass + raise asyncio.CancelledError + return future.result() + + class _ASNI_COLOR: BRIGHT_RED = '\033[91m' RED = '\033[31m' diff --git a/lmdeploy/vl/engine.py b/lmdeploy/vl/engine.py index b89adfaa3e..476cef91f3 100644 --- a/lmdeploy/vl/engine.py +++ b/lmdeploy/vl/engine.py @@ -9,7 +9,7 @@ import torch from lmdeploy.messages import PytorchEngineConfig, TurbomindEngineConfig, VisionConfig -from lmdeploy.utils import get_logger +from lmdeploy.utils import await_executor_future, get_logger from lmdeploy.vl.model.builder import load_vl_model logger = get_logger('lmdeploy') @@ -70,7 +70,7 @@ async def preprocess(self, else: future = asyncio.get_event_loop().run_in_executor(self.executor, self.model.preprocess, messages) future.add_done_callback(_raise_exception_on_finish) - outputs = await future + outputs = await await_executor_future(future) return outputs async def async_infer(self, messages: list[dict]) -> list[dict]: @@ -84,7 +84,7 @@ async def async_infer(self, messages: list[dict]) -> list[dict]: future = asyncio.get_event_loop().run_in_executor(self.executor, self.model.forward, messages, self.max_batch_size) future.add_done_callback(_raise_exception_on_finish) - outputs = await future + outputs = await await_executor_future(future) return outputs async def wrap_for_pytorch( @@ -118,7 +118,7 @@ async def wrap_for_pytorch( loop = asyncio.get_event_loop() async with self.executor_lock: if not has_input_ids: - result = await loop.run_in_executor( + future = loop.run_in_executor( self.executor, partial(self.model.to_pytorch, messages, @@ -128,7 +128,8 @@ async def wrap_for_pytorch( tools=tools, chat_template_kwargs=chat_template_kwargs)) else: - result = await loop.run_in_executor(self.executor, self.model.to_pytorch_with_input_ids, messages) + future = loop.run_in_executor(self.executor, self.model.to_pytorch_with_input_ids, messages) + result = await await_executor_future(future) # clear data for i, message in enumerate(messages): if isinstance(message['content'], list): @@ -163,7 +164,7 @@ async def wrap_for_turbomind( """ loop = asyncio.get_event_loop() async with self.executor_lock: - result = await loop.run_in_executor( + future = loop.run_in_executor( self.executor, partial(self.model.to_turbomind, messages, @@ -172,6 +173,7 @@ async def wrap_for_turbomind( sequence_start, tools=tools, chat_template_kwargs=chat_template_kwargs)) + result = await await_executor_future(future) # clear data for i, message in enumerate(messages): if isinstance(message['content'], list): diff --git a/tests/test_lmdeploy/test_executor_cancellation.py b/tests/test_lmdeploy/test_executor_cancellation.py new file mode 100644 index 0000000000..3ca0d3f8d4 --- /dev/null +++ b/tests/test_lmdeploy/test_executor_cancellation.py @@ -0,0 +1,87 @@ +import asyncio + +import pytest + +from lmdeploy.serve.processors import MultimodalProcessor +from lmdeploy.vl.engine import ImageEncoder + + +def test_prompt_lock_waits_for_executor_job_after_cancellation(monkeypatch): + """Test cancelled prompt prep keeps the lock until executor work ends.""" + + async def run_case(): + loop = asyncio.get_event_loop() + pending = loop.create_future() + + class FakeChatTemplate: + + def messages2prompt(self, *args, **kwargs): + return 'hello' + + class FakeTokenizer: + + def encode(self, *args, **kwargs): + return [1, 2, 3] + + def fake_run_in_executor(*args, **kwargs): + return pending + + monkeypatch.setattr(loop, 'run_in_executor', fake_run_in_executor) + processor = MultimodalProcessor(tokenizer=FakeTokenizer(), chat_template=FakeChatTemplate()) + + task = asyncio.create_task( + processor._get_text_prompt_input('hello', + do_preprocess=True, + sequence_start=True, + adapter_name=None)) + await asyncio.sleep(0) + assert processor.prompt_lock.locked() + + task.cancel() + await asyncio.sleep(0) + assert processor.prompt_lock.locked() + + pending.set_result({'prompt': 'hello', 'input_ids': [1, 2, 3]}) + with pytest.raises(asyncio.CancelledError): + await task + assert not processor.prompt_lock.locked() + + asyncio.run(run_case()) + + +def test_image_encoder_lock_waits_for_executor_job_after_cancellation(monkeypatch): + """Test cancelled VL preprocess keeps the lock until executor work ends.""" + + async def run_case(): + loop = asyncio.get_event_loop() + pending = loop.create_future() + + class FakeModel: + + def preprocess(self, messages): + return messages + + def fake_run_in_executor(*args, **kwargs): + return pending + + monkeypatch.setattr(loop, 'run_in_executor', fake_run_in_executor) + encoder = ImageEncoder.__new__(ImageEncoder) + encoder.model = FakeModel() + encoder.executor = None + encoder.executor_lock = asyncio.Lock() + encoder._uses_new_preprocess = False + + task = asyncio.create_task(encoder.preprocess([{'content': 'hello'}])) + await asyncio.sleep(0) + assert encoder.executor_lock.locked() + + task.cancel() + await asyncio.sleep(0) + assert encoder.executor_lock.locked() + + pending.set_result([{'content': 'hello'}]) + with pytest.raises(asyncio.CancelledError): + await task + assert not encoder.executor_lock.locked() + + asyncio.run(run_case()) From 70301bd0a1713ed778d13163a5e98e46b75b4a97 Mon Sep 17 00:00:00 2001 From: zxy Date: Thu, 7 May 2026 18:23:56 +0800 Subject: [PATCH 3/4] docs: clarify executor gating locks --- lmdeploy/serve/processors/multimodal.py | 1 + lmdeploy/vl/engine.py | 1 + 2 files changed, 2 insertions(+) diff --git a/lmdeploy/serve/processors/multimodal.py b/lmdeploy/serve/processors/multimodal.py index 4644bc24ea..fe00b69eee 100644 --- a/lmdeploy/serve/processors/multimodal.py +++ b/lmdeploy/serve/processors/multimodal.py @@ -38,6 +38,7 @@ def __init__(self, self.chat_template = chat_template self.vl_encoder = vl_encoder self.backend = backend + # Gate CPU-heavy prompt prep so waiters yield to the server loop. self.prompt_lock = asyncio.Lock() @staticmethod diff --git a/lmdeploy/vl/engine.py b/lmdeploy/vl/engine.py index 476cef91f3..20b4c0e857 100644 --- a/lmdeploy/vl/engine.py +++ b/lmdeploy/vl/engine.py @@ -45,6 +45,7 @@ def __init__( self.vision_config = vision_config self.max_batch_size = vision_config.max_batch_size self.executor = ThreadPoolExecutor(max_workers=1) + # Gate VL executor submissions so waiters yield instead of queueing. self.executor_lock = asyncio.Lock() self._uses_new_preprocess = self._is_new_preprocess_api(self.model) torch.cuda.empty_cache() From ceb3456a721f9b1490a1d2aab975bc7d9fa31445 Mon Sep 17 00:00:00 2001 From: zxy Date: Sat, 9 May 2026 15:42:55 +0800 Subject: [PATCH 4/4] refactor: simplify executor cancellation helper --- lmdeploy/utils.py | 27 ++++++--------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/lmdeploy/utils.py b/lmdeploy/utils.py index f4fee2b3a8..1eaf003882 100644 --- a/lmdeploy/utils.py +++ b/lmdeploy/utils.py @@ -16,30 +16,15 @@ async def await_executor_future(future: asyncio.Future): - """Await executor work without releasing a lock before cancellation - ends.""" - cancelled = False - while not future.done(): - try: - result = await asyncio.shield(future) - except asyncio.CancelledError: - cancelled = True - except Exception: - if cancelled: - raise asyncio.CancelledError - raise - else: - if cancelled: - raise asyncio.CancelledError - return result - - if cancelled: + """Await executor work without releasing a lock before it finishes.""" + try: + return await asyncio.shield(future) + except asyncio.CancelledError: try: - future.exception() + await future except BaseException: pass - raise asyncio.CancelledError - return future.result() + raise class _ASNI_COLOR: