diff --git a/lmdeploy/pytorch/engine/engine_instance.py b/lmdeploy/pytorch/engine/engine_instance.py
index c2bbc03420..217e1d4609 100644
--- a/lmdeploy/pytorch/engine/engine_instance.py
+++ b/lmdeploy/pytorch/engine/engine_instance.py
@@ -72,6 +72,47 @@ def cancel(req_sender: RequestSender, session_id: int):
                                f'Error: {resp.type}.'))
 
 
+class SharedStore:
+    def __init__(self):
+        self._data = {}
+
+    def put(self, data):
+        import ray
+        ref = ray.put(data)
+        key = ref.hex()
+        self._data[key] = ref
+        return key
+
+    def get(self, key):
+        import ray
+        ref = self._data.pop(key)
+        return ray.get(ref)
+
+    def clear(self):
+        import ray
+        all_data = list(self._data.values())
+        if len(all_data) > 0:
+            ray.internal.free(all_data, local_only=False)
+
+
+_SHARED_STORE = None
+
+
+def _lazy_create_ray_store():
+    global _SHARED_STORE
+    if _SHARED_STORE is None:
+        import ray
+        name = 'shared_store'
+        try:
+            _SHARED_STORE = ray.get_actor(name, namespace='lmdeploy')
+        except ValueError:
+            _SHARED_STORE = ray.remote(num_cpus=0,)(SharedStore).options(
+                name=name,
+                namespace='lmdeploy',
+                lifetime='detached',
+            ).remote()
+
+
 class EngineInstance(EngineInstanceBase):
     """Instance of TurboMind.
 
@@ -86,6 +127,8 @@ def __init__(self, engine: Engine):
         self.max_input_len = self.engine.max_session_len
         self._enable_transfer_obj_ref = engine.engine_config.enable_transfer_obj_ref and \
             engine.engine_config.distributed_executor_backend == 'ray'
+        if self._enable_transfer_obj_ref:
+            _lazy_create_ray_store()
 
     def __del__(self):
         """Destructor."""
@@ -97,12 +140,9 @@ def _get_extra_outputs(self, resp: Response):
         routed_experts = resp.data.get('routed_experts', None) if resp.data else None
         if routed_experts is not None and resp.type in [ResponseType.FINISH, ResponseType.CANCEL]:
             if self._enable_transfer_obj_ref:
-                import pybase64
                 import ray
-
-                ref = ray.put(routed_experts)
-                data = ray.cloudpickle.dumps(ref)
-                outputs['routed_experts'] = pybase64.b64encode(data).decode('utf-8')
+                key = ray.get(_SHARED_STORE.put.remote(routed_experts))
+                outputs['routed_experts'] = key
             else:
                 outputs['routed_experts'] = routed_experts
         return outputs
diff --git a/lmdeploy/pytorch/engine/executor/ray_executor.py b/lmdeploy/pytorch/engine/executor/ray_executor.py
index c20adbfd8d..70c7603878 100644
--- a/lmdeploy/pytorch/engine/executor/ray_executor.py
+++ b/lmdeploy/pytorch/engine/executor/ray_executor.py
@@ -506,8 +506,10 @@ async def forward_async(self, inputs):
                 logger.warning(f'Free input ref failed: {e}')
 
         self._prev_inputs = ray.put(inputs)
-        # make sure in order
-        self._prev_out = self.dag.execute(self._prev_inputs)
+        # non-compiled dag would add input object ref, and the ref can not be released in python
+        self._prev_out = [
+            worker.forward_async.remote(self._prev_inputs) for worker in self.workers
+        ]
 
     async def get_output_async(self):
         """Get output async."""