From e78106ce7e1cae4f133a8ddfcec34aa0889600c6 Mon Sep 17 00:00:00 2001 From: "q.yao" Date: Thu, 2 Apr 2026 18:18:36 +0800 Subject: [PATCH 1/3] fix mem leak --- lmdeploy/pytorch/engine/engine_instance.py | 50 +++++++++++++++++-- .../pytorch/engine/executor/ray_executor.py | 6 ++- .../pytorch/engine/mp_engine/base_worker.py | 11 ++-- .../pytorch/engine/mp_engine/ray_engine.py | 39 ++++++++++++--- 4 files changed, 88 insertions(+), 18 deletions(-) diff --git a/lmdeploy/pytorch/engine/engine_instance.py b/lmdeploy/pytorch/engine/engine_instance.py index c2bbc03420..76d13685f6 100644 --- a/lmdeploy/pytorch/engine/engine_instance.py +++ b/lmdeploy/pytorch/engine/engine_instance.py @@ -72,6 +72,47 @@ def cancel(req_sender: RequestSender, session_id: int): f'Error: {resp.type}.')) +class SharedStore: + def __init__(self): + self._data = {} + + def put(self, data): + import ray + ref = ray.put(data) + key = ref.hex() + self._data[key] = ref + return key + + def get(self, key): + import ray + ref = self._data.pop(key) + return ray.get(ref) + + def clear(self): + import ray + all_data = list(self._data.values()) + if len(all_data) > 0: + ray.internal.free([all_data], local_only=False) + + +_SHARED_STORE = None + + +def _lazy_create_ray_store(): + global _SHARED_STORE + if _SHARED_STORE is None: + import ray + name = 'shared_store' + try: + _SHARED_STORE = ray.get_actor(name, namespace='lmdeploy') + except ValueError: + _SHARED_STORE = ray.remote(num_cpus=0,)(SharedStore).options( + name=name, + namespace='lmdeploy', + lifetime='detached', + ).remote() + + class EngineInstance(EngineInstanceBase): """Instance of TurboMind. @@ -86,6 +127,8 @@ def __init__(self, engine: Engine): self.max_input_len = self.engine.max_session_len self._enable_transfer_obj_ref = engine.engine_config.enable_transfer_obj_ref and \ engine.engine_config.distributed_executor_backend == 'ray' + if self._enable_transfer_obj_ref: + _lazy_create_ray_store() def __del__(self): """Destructor.""" @@ -97,12 +140,9 @@ def _get_extra_outputs(self, resp: Response): routed_experts = resp.data.get('routed_experts', None) if resp.data else None if routed_experts is not None and resp.type in [ResponseType.FINISH, ResponseType.CANCEL]: if self._enable_transfer_obj_ref: - import pybase64 import ray - - ref = ray.put(routed_experts) - data = ray.cloudpickle.dumps(ref) - outputs['routed_experts'] = pybase64.b64encode(data).decode('utf-8') + key = ray.get(_SHARED_STORE.put.remote(routed_experts)) + outputs['routed_experts'] = key else: outputs['routed_experts'] = routed_experts return outputs diff --git a/lmdeploy/pytorch/engine/executor/ray_executor.py b/lmdeploy/pytorch/engine/executor/ray_executor.py index 7bf9770c54..ee807ac9af 100644 --- a/lmdeploy/pytorch/engine/executor/ray_executor.py +++ b/lmdeploy/pytorch/engine/executor/ray_executor.py @@ -494,8 +494,10 @@ async def forward_async(self, inputs): logger.warning(f'Free input ref failed: {e}') self._prev_inputs = ray.put(inputs) - # make sure in order - self._prev_out = self.dag.execute(self._prev_inputs) + # non-compiled dag would add input object ref, and the ref can not be released in python + self._prev_out = [ + worker.forward_async.remote(self._prev_inputs) for worker in self.workers + ] async def get_output_async(self): """Get output async.""" diff --git a/lmdeploy/pytorch/engine/mp_engine/base_worker.py b/lmdeploy/pytorch/engine/mp_engine/base_worker.py index 0e0fa0fa82..a6fbac67ab 100644 --- a/lmdeploy/pytorch/engine/mp_engine/base_worker.py +++ 
b/lmdeploy/pytorch/engine/mp_engine/base_worker.py @@ -151,7 +151,12 @@ def add(self, stream_id, result): def pop(self, stream_id, result): if not isinstance(result, EngineOutput): return result - output = self._output.pop(stream_id) - result.token_ids = output.token_ids or [] - result.logprobs = output.logprobs or None + output = self._output.pop(stream_id, None) + if output is not None: + result.token_ids = output.token_ids or [] + result.logprobs = output.logprobs or None return result + + def clear(self, stream_id): + """Clear buffered output of the given stream.""" + self._output.pop(stream_id, None) diff --git a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py index 3c14d3fd0c..0e34d565f0 100644 --- a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py +++ b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py @@ -36,10 +36,19 @@ def __init__(self, self._stream_task = dict() self._engine_output_gather = EngineOutputGather() + def _cleanup_stream_task(self, stream_id: int, cancel: bool = False): + """Cleanup cached stream state and optionally cancel running task.""" + self._stream_aiter.pop(stream_id, None) + task = self._stream_task.pop(stream_id, None) + if cancel and task is not None and not task.done(): + task.cancel() + self._engine_output_gather.clear(stream_id) + async def _stream_task_wrapper(self, stream_id: int, init_event: asyncio.Event, func: str, *args, **kwargs): """Create a stream task.""" method = getattr(self, func) event = self._stream_aiter[stream_id][0] + result = None try: generator = method(*args, **kwargs) init_event.set() @@ -48,8 +57,10 @@ async def _stream_task_wrapper(self, stream_id: int, init_event: asyncio.Event, self._stream_aiter[stream_id][1] = (result, False) event.set() finally: - self._stream_aiter[stream_id][1] = (result, True) - event.set() + stream_state = self._stream_aiter.get(stream_id) + if stream_state is not None: + stream_state[1] = (result, True) + event.set() init_event.set() async def create_stream_task(self, func, *args, **kwargs): @@ -78,10 +89,17 @@ async def get_stream_task_result(self, stream_id: int): result = self._engine_output_gather.pop(stream_id, result) if stopped: - self._stream_aiter.pop(stream_id, None) - self._stream_task.pop(stream_id, None) + self._cleanup_stream_task(stream_id) return result, stopped + async def cancel_stream_task(self, stream_id: int): + """Cancel and cleanup a stream task. + + This is used by caller-side generator finalization to avoid leaked references when stream consumption is + interrupted. + """ + self._cleanup_stream_task(stream_id, cancel=True) + def _update_runtime_envs(runtime_env: dict): """Update runtime envs.""" @@ -152,10 +170,15 @@ async def _collective_rpc_streaming_async(self, func, *args, **kwargs): # ray generator would try cache every result, which is too verbose. stream_id = await self._collective_rpc_async('create_stream_task', func, *args, **kwargs) - stopped = False - while not stopped: - result, stopped = await self._collective_rpc_async('get_stream_task_result', stream_id) - yield result + try: + stopped = False + while not stopped: + result, stopped = await self._collective_rpc_async('get_stream_task_result', stream_id) + yield result + finally: + # Ensure worker-side stream state is released even when caller + # stops iterating early (cancel/break/exception). 
+ await self._collective_rpc_async('cancel_stream_task', stream_id) def close(self) -> None: """Close mp engine.""" From a746211072a001a8e18302a3f9b569f1d3430529 Mon Sep 17 00:00:00 2001 From: "q.yao" Date: Thu, 2 Apr 2026 18:28:24 +0800 Subject: [PATCH 2/3] fix clear --- lmdeploy/pytorch/engine/engine_instance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lmdeploy/pytorch/engine/engine_instance.py b/lmdeploy/pytorch/engine/engine_instance.py index 76d13685f6..217e1d4609 100644 --- a/lmdeploy/pytorch/engine/engine_instance.py +++ b/lmdeploy/pytorch/engine/engine_instance.py @@ -92,7 +92,7 @@ def clear(self): import ray all_data = list(self._data.values()) if len(all_data) > 0: - ray.internal.free([all_data], local_only=False) + ray.internal.free(all_data, local_only=False) _SHARED_STORE = None From 5f862064795c8a957fd511149960e579716f3c0d Mon Sep 17 00:00:00 2001 From: "q.yao" Date: Fri, 10 Apr 2026 12:24:44 +0800 Subject: [PATCH 3/3] revert change --- .../pytorch/engine/mp_engine/base_worker.py | 11 ++---- .../pytorch/engine/mp_engine/ray_engine.py | 39 ++++--------------- 2 files changed, 11 insertions(+), 39 deletions(-) diff --git a/lmdeploy/pytorch/engine/mp_engine/base_worker.py b/lmdeploy/pytorch/engine/mp_engine/base_worker.py index 56662bb3e4..bc2076863a 100644 --- a/lmdeploy/pytorch/engine/mp_engine/base_worker.py +++ b/lmdeploy/pytorch/engine/mp_engine/base_worker.py @@ -151,12 +151,7 @@ def add(self, stream_id, result): def pop(self, stream_id, result): if not isinstance(result, EngineOutput): return result - output = self._output.pop(stream_id, None) - if output is not None: - result.token_ids = output.token_ids or [] - result.logprobs = output.logprobs or None + output = self._output.pop(stream_id) + result.token_ids = output.token_ids or [] + result.logprobs = output.logprobs or None return result - - def clear(self, stream_id): - """Clear buffered output of the given stream.""" - self._output.pop(stream_id, None) diff --git a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py index 0e34d565f0..3c14d3fd0c 100644 --- a/lmdeploy/pytorch/engine/mp_engine/ray_engine.py +++ b/lmdeploy/pytorch/engine/mp_engine/ray_engine.py @@ -36,19 +36,10 @@ def __init__(self, self._stream_task = dict() self._engine_output_gather = EngineOutputGather() - def _cleanup_stream_task(self, stream_id: int, cancel: bool = False): - """Cleanup cached stream state and optionally cancel running task.""" - self._stream_aiter.pop(stream_id, None) - task = self._stream_task.pop(stream_id, None) - if cancel and task is not None and not task.done(): - task.cancel() - self._engine_output_gather.clear(stream_id) - async def _stream_task_wrapper(self, stream_id: int, init_event: asyncio.Event, func: str, *args, **kwargs): """Create a stream task.""" method = getattr(self, func) event = self._stream_aiter[stream_id][0] - result = None try: generator = method(*args, **kwargs) init_event.set() @@ -57,10 +48,8 @@ async def _stream_task_wrapper(self, stream_id: int, init_event: asyncio.Event, self._stream_aiter[stream_id][1] = (result, False) event.set() finally: - stream_state = self._stream_aiter.get(stream_id) - if stream_state is not None: - stream_state[1] = (result, True) - event.set() + self._stream_aiter[stream_id][1] = (result, True) + event.set() init_event.set() async def create_stream_task(self, func, *args, **kwargs): @@ -89,17 +78,10 @@ async def get_stream_task_result(self, stream_id: int): result = 
self._engine_output_gather.pop(stream_id, result) if stopped: - self._cleanup_stream_task(stream_id) + self._stream_aiter.pop(stream_id, None) + self._stream_task.pop(stream_id, None) return result, stopped - async def cancel_stream_task(self, stream_id: int): - """Cancel and cleanup a stream task. - - This is used by caller-side generator finalization to avoid leaked references when stream consumption is - interrupted. - """ - self._cleanup_stream_task(stream_id, cancel=True) - def _update_runtime_envs(runtime_env: dict): """Update runtime envs.""" @@ -170,15 +152,10 @@ async def _collective_rpc_streaming_async(self, func, *args, **kwargs): # ray generator would try cache every result, which is too verbose. stream_id = await self._collective_rpc_async('create_stream_task', func, *args, **kwargs) - try: - stopped = False - while not stopped: - result, stopped = await self._collective_rpc_async('get_stream_task_result', stream_id) - yield result - finally: - # Ensure worker-side stream state is released even when caller - # stops iterating early (cancel/break/exception). - await self._collective_rpc_async('cancel_stream_task', stream_id) + stopped = False + while not stopped: + result, stopped = await self._collective_rpc_async('get_stream_task_result', stream_id) + yield result def close(self) -> None: """Close mp engine."""
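
Note (illustrative sketch, not part of the patches): PATCH 1/3 stops serializing Ray ObjectRefs into the response (the old pybase64 + ray.cloudpickle path) and instead parks the ref inside a detached SharedStore actor, handing the caller only the ref's hex key. The snippet below replays that handoff outside lmdeploy, assuming a local ray.init(); the actor name and namespace mirror _lazy_create_ray_store, while the payload dict, variable names, and the final print are made up for the example.

import ray


class SharedStore:
    """Owns ObjectRefs inside a detached actor and hands out string keys."""

    def __init__(self):
        self._data = {}

    def put(self, data):
        ref = ray.put(data)    # the actor becomes the owner of this object
        key = ref.hex()
        self._data[key] = ref  # keeping the ref keeps the object alive
        return key

    def get(self, key):
        # pop so the object can be reclaimed once the consumer has read it
        return ray.get(self._data.pop(key))


ray.init()

# Producer side (mirrors _lazy_create_ray_store): reuse the named actor if it
# already exists, otherwise create it detached so it outlives this driver.
try:
    store = ray.get_actor('shared_store', namespace='lmdeploy')
except ValueError:
    store = ray.remote(num_cpus=0)(SharedStore).options(
        name='shared_store',
        namespace='lmdeploy',
        lifetime='detached',
    ).remote()

# Hand off a payload: only the hex key travels in the response, not a pickled ref.
key = ray.get(store.put.remote({'routed_experts': [0, 3, 7]}))

# Consumer side, possibly in another process: look up the actor by name and
# redeem the key exactly once.
consumer_store = ray.get_actor('shared_store', namespace='lmdeploy')
print(ray.get(consumer_store.get.remote(key)))  # {'routed_experts': [0, 3, 7]}

Because the detached actor owns the ObjectRef, the value survives across processes until get() pops it, which is what lets engine_instance.py return a plain string key without pinning the object for the lifetime of the caller.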
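
A second sketch, for the ray_executor.py hunk in PATCH 1/3: forward_async now fans the same ray.put() ref out to every worker with plain .remote() calls instead of self.dag.execute(), because the non-compiled DAG kept its own handle on the input ref, so it could never be released from Python. The toy below only shows the fan-out shape; the Worker actor, its forward_async body, and the loop are stand-ins, and the explicit free of the previous input ref that the real executor performs (the "Free input ref failed" context above the hunk) is omitted here, the previous ref is simply rebound.

import ray

ray.init()


@ray.remote
class Worker:
    """Stand-in for the real executor worker; just echoes the step id back."""

    def forward_async(self, inputs):
        return inputs['step']


workers = [Worker.remote() for _ in range(2)]

prev_inputs = None
for step in range(3):
    # Rebind the previous input ref; with no DAG holding an extra handle,
    # the old object store entry becomes reclaimable.
    prev_inputs = ray.put({'step': step})
    # One ray.put() ref broadcast to every worker, as in the patched forward_async.
    prev_out = [w.forward_async.remote(prev_inputs) for w in workers]
    print(ray.get(prev_out))  # [0, 0], then [1, 1], then [2, 2]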