Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions lmdeploy/pytorch/engine/engine_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,47 @@ def cancel(req_sender: RequestSender, session_id: int):
f'Error: {resp.type}.'))


class SharedStore:
    """Key/value store over the Ray object store.

    Intended to run as a detached named Ray actor so that object refs can be
    handed between processes as plain string keys instead of pickled refs.
    """

    def __init__(self):
        # key (hex of the ObjectRef) -> ray.ObjectRef
        self._data = {}

    def put(self, data):
        """Store ``data`` in the Ray object store and return a string key."""
        import ray
        ref = ray.put(data)
        key = ref.hex()
        self._data[key] = ref
        return key

    def get(self, key):
        """Fetch and remove the object stored under ``key``.

        Raises:
            KeyError: if ``key`` is unknown (or already consumed — each key
                is one-shot since the ref is popped here).
        """
        import ray
        ref = self._data.pop(key)
        return ray.get(ref)

    def clear(self):
        """Free every stored object and forget all keys.

        Fix: previously the refs were freed but the dict entries were kept,
        so stale keys still pointed at freed objects and a later ``get``
        on an old key would try to fetch freed data. Drop the entries too.
        """
        import ray
        refs = list(self._data.values())
        self._data.clear()
        if refs:
            # local_only=False frees the objects cluster-wide, not just on
            # the node hosting this actor.
            ray.internal.free(refs, local_only=False)
Comment on lines +94 to +95


_SHARED_STORE = None


def _lazy_create_ray_store():
    """Lazily get-or-create the detached ``SharedStore`` Ray actor.

    Caches the actor handle in the module global ``_SHARED_STORE`` so the
    lookup/creation happens at most once per process. The actor is detached
    and lives in the 'lmdeploy' namespace, so every process in the cluster
    resolves the same instance by name.
    """
    global _SHARED_STORE
    if _SHARED_STORE is not None:
        return
    import ray
    # get_if_exists=True makes get-or-create atomic on the Ray side. The
    # previous try get_actor / except ValueError: create pattern had a race:
    # two processes could both miss the lookup and both attempt creation,
    # making the loser raise on the duplicate name.
    _SHARED_STORE = ray.remote(num_cpus=0)(SharedStore).options(
        name='shared_store',
        namespace='lmdeploy',
        lifetime='detached',
        get_if_exists=True,
    ).remote()
Comment on lines +109 to +113


class EngineInstance(EngineInstanceBase):
"""Instance of TurboMind.

Expand All @@ -86,6 +127,8 @@ def __init__(self, engine: Engine):
self.max_input_len = self.engine.max_session_len
self._enable_transfer_obj_ref = engine.engine_config.enable_transfer_obj_ref and \
engine.engine_config.distributed_executor_backend == 'ray'
if self._enable_transfer_obj_ref:
_lazy_create_ray_store()

def __del__(self):
"""Destructor."""
Expand All @@ -97,12 +140,9 @@ def _get_extra_outputs(self, resp: Response):
routed_experts = resp.data.get('routed_experts', None) if resp.data else None
if routed_experts is not None and resp.type in [ResponseType.FINISH, ResponseType.CANCEL]:
if self._enable_transfer_obj_ref:
import pybase64
import ray

ref = ray.put(routed_experts)
data = ray.cloudpickle.dumps(ref)
outputs['routed_experts'] = pybase64.b64encode(data).decode('utf-8')
key = ray.get(_SHARED_STORE.put.remote(routed_experts))
outputs['routed_experts'] = key
Comment on lines 141 to +145
Comment on lines 143 to +145
else:
outputs['routed_experts'] = routed_experts
return outputs
Expand Down
6 changes: 4 additions & 2 deletions lmdeploy/pytorch/engine/executor/ray_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,10 @@ async def forward_async(self, inputs):
logger.warning(f'Free input ref failed: {e}')

self._prev_inputs = ray.put(inputs)
# make sure in order
self._prev_out = self.dag.execute(self._prev_inputs)
# non-compiled dag would add input object ref, and the ref can not be released in python
self._prev_out = [
worker.forward_async.remote(self._prev_inputs) for worker in self.workers
]
Comment on lines 508 to +512

async def get_output_async(self):
"""Get output async."""
Expand Down
Loading