Skip to content

Commit d457687

Browse files
committed
Merge branch 'upstream-main' into fix_dependency
2 parents f1a510d + ab3ffd5 commit d457687

2 files changed

Lines changed: 18 additions & 4 deletions

File tree

tests/test_yuanrong_storage_manager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class MockBuffer:
3333
def __init__(self, size):
3434
self.data = bytearray(size)
3535

36-
def mutable_data(self):
36+
def MutableData(self):
3737
return self.data
3838

3939

@@ -69,15 +69,15 @@ def mock_deserialization(items):
6969
except UnicodeDecodeError:
7070
return data
7171

72-
mocker.patch("transfer_queue.storage.clients.yuanrong_client.serialization", side_effect=mock_serialization)
73-
mocker.patch("transfer_queue.storage.clients.yuanrong_client.deserialization", side_effect=mock_deserialization)
72+
mocker.patch("transfer_queue.storage.clients.yuanrong_client._encoder.encode", side_effect=mock_serialization)
73+
mocker.patch("transfer_queue.storage.clients.yuanrong_client._decoder.decode", side_effect=mock_deserialization)
7474

7575
stored_raw_buffers = []
7676

7777
def side_effect_mcreate(keys, sizes):
7878
buffers = [MockBuffer(size) for size in sizes]
7979
for b in buffers:
80-
stored_raw_buffers.append(b.mutable_data())
80+
stored_raw_buffers.append(b.MutableData())
8181
return 0, buffers
8282

8383
storage_client._cpu_ds_client.mcreate.side_effect = side_effect_mcreate

transfer_queue/storage/clients/yuanrong_client.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,12 @@ def _create_empty_npu_tensorlist(self, shapes, dtypes):
185185
return tensors
186186

187187
def mset_zcopy(self, keys: list[str], objs: list[Any]):
188+
"""Store multiple objects in zero-copy mode using parallel serialization and buffer packing.
189+
190+
Args:
191+
keys (list[str]): List of string keys under which the objects will be stored.
192+
objs (list[Any]): List of Python objects to store (e.g., tensors, strings).
193+
"""
188194
items_list = [[memoryview(b) for b in _encoder.encode(obj)] for obj in objs]
189195
packed_sizes = [calc_packed_size(items) for items in items_list]
190196
status, buffers = self._cpu_ds_client.mcreate(keys, packed_sizes)
@@ -194,6 +200,14 @@ def mset_zcopy(self, keys: list[str], objs: list[Any]):
194200
self._cpu_ds_client.mset_buffer(buffers)
195201

196202
def mget_zcopy(self, keys: list[str]) -> list[Any]:
203+
"""Retrieve multiple objects in zero-copy mode by directly deserializing from shared memory buffers.
204+
205+
Args:
206+
keys (list[str]): List of string keys to retrieve from storage.
207+
208+
Returns:
209+
list[Any]: List of deserialized objects corresponding to the input keys.
210+
"""
197211
status, buffers = self._cpu_ds_client.get_buffers(keys, timeout_ms=500)
198212
return [_decoder.decode(unpack_from(buffer)) if buffer is not None else None for buffer in buffers]
199213

0 commit comments

Comments
 (0)