Commit ab3ffd5

[fix] Fix UT of YuanrongStorageClient (#12)
### Fix

- Added docstrings to `mset_zcopy` and `mget_zcopy`.
- Fixed the unit test `tests/test_yuanrong_storage_client.py` so that it passes CI.

==================================================================================

### Summary

Zero-copy is enabled when connecting to the YuanrongStorageClient backend, to improve transfer speed.

### Change

1. Modified `transfer_queue/storage/clients/yuanrong_client.py` to call the zero-copy interface and to perform serialization and packing.
2. Added unit tests for mget and mset: `tests/test_yuanrong_storage_client.py`.

### Testing

- Test on CPU: `pytest tests/test_yuanrong_storage_client.py`

### Result

When transmitting 512 pieces of data of 32 MB each (16 GB in total):

- End-to-end **Get** took **10s**, a bandwidth of **1.6 GB/s**. The time spent in the **YuanrongStorageClient** interface call was **2.27s**, a bandwidth of **7.05 GB/s**.
- End-to-end **Put** took **3.42s**, a bandwidth of **4.68 GB/s**. The time spent in the **YuanrongStorageClient** interface call was **3.32s**, a bandwidth of **4.83 GB/s**.

### Related Links

- Pending fix for the gitcode pull request: [[feat] Introduce Zero-Copy to use YuanrongStorageClient for transmitting CPU Tensors](https://gitcode.com/Ascend/TransferQueue/pull/10)
- Previous issue: [[Feat]: Try zero-copy serialize objects that can be converted to memoryview](TransferQueue/TransferQueue#147)
- Yuanrong Datasystem PR: [https://atomgit.com/openeuler/yuanrong-datasystem/pull/141](https://atomgit.com/openeuler/yuanrong-datasystem/pull/141)

---------

Signed-off-by: Evelynn-V <liwenlin0223l@gmail.com>
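
The bandwidth figures in the Result section are the total data volume divided by the elapsed time. A small sketch that recomputes them, assuming 1 GB = 1024 MB (the commit message does not say which convention was used) and using the timings quoted in the message:

```python
# Total volume: 512 items of 32 MB each.
# Assumption: 1 GB = 1024 MB, which gives exactly 16 GB.
TOTAL_GB = 512 * 32 / 1024

# (operation, elapsed seconds, bandwidth reported in the commit message)
measurements = [
    ("Get, end-to-end", 10.0, 1.6),
    ("Get, client call only", 2.27, 7.05),
    ("Put, end-to-end", 3.42, 4.68),
    ("Put, client call only", 3.32, 4.83),
]


def bandwidth_gb_per_s(total_gb, seconds):
    """Average bandwidth in GB/s for a transfer of total_gb taking seconds."""
    return total_gb / seconds


for name, seconds, reported in measurements:
    computed = bandwidth_gb_per_s(TOTAL_GB, seconds)
    print(f"{name}: computed {computed:.2f} GB/s, reported {reported} GB/s")
```

Under that assumption, the recomputed values agree with the reported ones to within 0.02 GB/s.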
1 parent b221e2a commit ab3ffd5

3 files changed

Lines changed: 19 additions & 5 deletions

.github/workflows/python-package.yml

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ jobs:
     - name: Install dependencies
       run: |
         python -m pip install --upgrade pip
-        python -m pip install flake8 pytest build pytest_asyncio
+        python -m pip install flake8 pytest build pytest_asyncio pytest-mock openyuanrong-datasystem
         python -m build --wheel
         pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu
         pip install dist/*.whl

tests/test_yuanrong_storage_manager.py

Lines changed: 4 additions & 4 deletions
@@ -33,7 +33,7 @@ class MockBuffer:
     def __init__(self, size):
         self.data = bytearray(size)

-    def mutable_data(self):
+    def MutableData(self):
         return self.data


@@ -69,15 +69,15 @@ def mock_deserialization(items):
         except UnicodeDecodeError:
             return data

-    mocker.patch("transfer_queue.storage.clients.yuanrong_client.serialization", side_effect=mock_serialization)
-    mocker.patch("transfer_queue.storage.clients.yuanrong_client.deserialization", side_effect=mock_deserialization)
+    mocker.patch("transfer_queue.storage.clients.yuanrong_client._encoder.encode", side_effect=mock_serialization)
+    mocker.patch("transfer_queue.storage.clients.yuanrong_client._decoder.decode", side_effect=mock_deserialization)

     stored_raw_buffers = []

     def side_effect_mcreate(keys, sizes):
         buffers = [MockBuffer(size) for size in sizes]
         for b in buffers:
-            stored_raw_buffers.append(b.mutable_data())
+            stored_raw_buffers.append(b.MutableData())
         return 0, buffers

     storage_client._cpu_ds_client.mcreate.side_effect = side_effect_mcreate
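
The mocking pattern in this hunk can be tried in isolation. The following is a minimal sketch using only `unittest.mock` from the standard library (the test itself uses the `mocker` fixture from pytest-mock); `client` here is a hypothetical stand-in for `storage_client._cpu_ds_client`, and the keys and sizes are made up for illustration:

```python
from unittest.mock import MagicMock


class MockBuffer:
    """Stand-in for a datasystem buffer, mirroring the class in the test file."""

    def __init__(self, size):
        self.data = bytearray(size)

    def MutableData(self):
        # Returns the underlying bytearray itself, not a copy.
        return self.data


stored_raw_buffers = []


def side_effect_mcreate(keys, sizes):
    # Allocate one mock buffer per requested size and remember its storage.
    buffers = [MockBuffer(size) for size in sizes]
    for b in buffers:
        stored_raw_buffers.append(b.MutableData())
    return 0, buffers


# Hypothetical stand-in for storage_client._cpu_ds_client.
client = MagicMock()
client.mcreate.side_effect = side_effect_mcreate

status, buffers = client.mcreate(["k1", "k2"], [4, 8])

# Writing through the buffer is visible in the captured list,
# because both refer to the same bytearray.
buffers[0].MutableData()[:] = b"abcd"
```

Because `MutableData()` hands back the same `bytearray` each time, a test can later inspect `stored_raw_buffers` to see what the code under test wrote.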

transfer_queue/storage/clients/yuanrong_client.py

Lines changed: 14 additions & 0 deletions
@@ -185,6 +185,12 @@ def _create_empty_npu_tensorlist(self, shapes, dtypes):
         return tensors

     def mset_zcopy(self, keys: list[str], objs: list[Any]):
+        """Store multiple objects in zero-copy mode using parallel serialization and buffer packing.
+
+        Args:
+            keys (list[str]): List of string keys under which the objects will be stored.
+            objs (list[Any]): List of Python objects to store (e.g., tensors, strings).
+        """
         items_list = [[memoryview(b) for b in _encoder.encode(obj)] for obj in objs]
         packed_sizes = [calc_packed_size(items) for items in items_list]
         status, buffers = self._cpu_ds_client.mcreate(keys, packed_sizes)
@@ -194,6 +200,14 @@ def mset_zcopy(self, keys: list[str], objs: list[Any]):
         self._cpu_ds_client.mset_buffer(buffers)

     def mget_zcopy(self, keys: list[str]) -> list[Any]:
+        """Retrieve multiple objects in zero-copy mode by directly deserializing from shared memory buffers.
+
+        Args:
+            keys (list[str]): List of string keys to retrieve from storage.
+
+        Returns:
+            list[Any]: List of deserialized objects corresponding to the input keys.
+        """
         status, buffers = self._cpu_ds_client.get_buffers(keys, timeout_ms=500)
         return [_decoder.decode(unpack_from(buffer)) if buffer is not None else None for buffer in buffers]
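
The diff does not show how `calc_packed_size` and `unpack_from` lay out data inside a buffer. As an illustration of the general idea only, here is a sketch of one possible packing scheme: an item count, then one length per item, then the payloads back to back. The `sketch_*` functions and the header layout are hypothetical and are not the project's actual helpers or wire format.

```python
import struct

# Hypothetical layout: [count: uint32][len_0 .. len_n-1: uint64 each][payloads]
COUNT = struct.Struct("<I")
LENGTH = struct.Struct("<Q")


def sketch_calc_packed_size(items):
    """Bytes needed to hold the header plus every payload."""
    return COUNT.size + LENGTH.size * len(items) + sum(m.nbytes for m in items)


def sketch_pack_into(target, items):
    """Write the header and payloads into a preallocated writable buffer."""
    view = memoryview(target)
    COUNT.pack_into(view, 0, len(items))
    offset = COUNT.size
    for m in items:
        LENGTH.pack_into(view, offset, m.nbytes)
        offset += LENGTH.size
    for m in items:
        # One copy, straight into the destination buffer.
        view[offset:offset + m.nbytes] = m.cast("B")
        offset += m.nbytes


def sketch_unpack_from(source):
    """Return memoryview slices over the source buffer, without copying payloads."""
    view = memoryview(source)
    (count,) = COUNT.unpack_from(view, 0)
    offset = COUNT.size
    lengths = []
    for _ in range(count):
        (n,) = LENGTH.unpack_from(view, offset)
        lengths.append(n)
        offset += LENGTH.size
    items = []
    for n in lengths:
        items.append(view[offset:offset + n])
        offset += n
    return items


items = [memoryview(b"hello"), memoryview(b"zero-copy")]
buf = bytearray(sketch_calc_packed_size(items))
sketch_pack_into(buf, items)
unpacked = sketch_unpack_from(buf)
```

In this sketch the read side returns slices of the original buffer rather than new `bytes` objects, which is the property the `mget_zcopy` docstring describes as deserializing directly from shared memory buffers.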
