Skip to content

Commit 7dce446

Browse files
authored
fix ray mem leak (#4487)
* fix mem leak * fix clear * revert change
1 parent 73bf1c7 commit 7dce446

2 files changed

Lines changed: 49 additions & 7 deletions

File tree

lmdeploy/pytorch/engine/engine_instance.py

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,47 @@ def cancel(req_sender: RequestSender, session_id: int):
7272
f'Error: {resp.type}.'))
7373

7474

75+
class SharedStore:
    """Actor-hosted store that pins Ray object refs until they are consumed.

    Keeping the ``ObjectRef`` inside this detached actor prevents the ref
    from being garbage-collected before a remote consumer fetches it,
    while still allowing explicit release via ``get``/``clear``.
    """

    def __init__(self):
        # key (ref hex string) -> ray.ObjectRef
        self._data = {}

    def put(self, data):
        """Store *data* in the Ray object store and return a string key.

        The returned key is the ref's hex digest, safe to ship across
        process boundaries as plain text.
        """
        import ray
        ref = ray.put(data)
        key = ref.hex()
        self._data[key] = ref
        return key

    def get(self, key):
        """Pop the ref for *key* and return the materialized object.

        Popping (not reading) the entry releases this actor's pin on the
        ref, so the object can be freed once the caller drops it.
        Raises KeyError if *key* is unknown or already consumed.
        """
        import ray
        ref = self._data.pop(key)
        return ray.get(ref)

    def clear(self):
        """Eagerly free all pinned objects and drop their keys.

        Without ``self._data.clear()`` the dict would keep accumulating
        refs to already-freed objects (a leak, and ``get`` on such a key
        would fetch a freed object).
        """
        import ray
        all_data = list(self._data.values())
        if all_data:
            ray.internal.free(all_data, local_only=False)
        self._data.clear()
97+
98+
# Process-wide handle to the detached SharedStore actor (created lazily).
_SHARED_STORE = None


def _lazy_create_ray_store():
    """Get or create the detached ``SharedStore`` actor exactly once.

    Uses ``get_if_exists=True`` so that when several processes race past
    the ``ray.get_actor`` miss simultaneously, the second creator attaches
    to the existing actor instead of raising a name-collision ValueError.
    The actor is detached so it outlives the creating driver.
    """
    global _SHARED_STORE
    if _SHARED_STORE is None:
        import ray
        name = 'shared_store'
        try:
            _SHARED_STORE = ray.get_actor(name, namespace='lmdeploy')
        except ValueError:
            # Not found: create it; get_if_exists closes the check-then-act
            # race between concurrent creators.
            _SHARED_STORE = ray.remote(num_cpus=0,)(SharedStore).options(
                name=name,
                namespace='lmdeploy',
                lifetime='detached',
                get_if_exists=True,
            ).remote()
114+
115+
75116
class EngineInstance(EngineInstanceBase):
76117
"""Instance of TurboMind.
77118
@@ -86,6 +127,8 @@ def __init__(self, engine: Engine):
86127
self.max_input_len = self.engine.max_session_len
87128
self._enable_transfer_obj_ref = engine.engine_config.enable_transfer_obj_ref and \
88129
engine.engine_config.distributed_executor_backend == 'ray'
130+
if self._enable_transfer_obj_ref:
131+
_lazy_create_ray_store()
89132

90133
def __del__(self):
91134
"""Destructor."""
@@ -97,12 +140,9 @@ def _get_extra_outputs(self, resp: Response):
97140
routed_experts = resp.data.get('routed_experts', None) if resp.data else None
98141
if routed_experts is not None and resp.type in [ResponseType.FINISH, ResponseType.CANCEL]:
99142
if self._enable_transfer_obj_ref:
100-
import pybase64
101143
import ray
102-
103-
ref = ray.put(routed_experts)
104-
data = ray.cloudpickle.dumps(ref)
105-
outputs['routed_experts'] = pybase64.b64encode(data).decode('utf-8')
144+
key = ray.get(_SHARED_STORE.put.remote(routed_experts))
145+
outputs['routed_experts'] = key
106146
else:
107147
outputs['routed_experts'] = routed_experts
108148
return outputs

lmdeploy/pytorch/engine/executor/ray_executor.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,8 +506,10 @@ async def forward_async(self, inputs):
506506
logger.warning(f'Free input ref failed: {e}')
507507

508508
self._prev_inputs = ray.put(inputs)
509-
# make sure in order
510-
self._prev_out = self.dag.execute(self._prev_inputs)
509+
# non-compiled dag would add input object ref, and the ref can not be released in python
510+
self._prev_out = [
511+
worker.forward_async.remote(self._prev_inputs) for worker in self.workers
512+
]
511513

512514
async def get_output_async(self):
513515
"""Get output async."""

0 commit comments

Comments (0)