Skip to content

Commit bac8c2b

Browse files
author
ascend-robot
committed
Fix the bug related to the compatibility with openyuanrong_datasystem interface.
Co-authored-by: liwenlin<liwenlin8@huawei.com> Co-authored-by: liwenlin<liwenlin8@huawei.com> # message auto-generated for no-merge-commit merge: !18 merge fix_ds_zcopy into main [Fix] Fix the bug related to the compatibility with openyuanrong_datasystem interface. Created-by: Lexie-7 Commit-by: liwenlin Merged-by: ascend-robot Description: # Fixing details - Modified the interface parameters of `mcreate` and `get_buffers` in the `YuanrongStorageClient` connection. The return value of the interface corresponding to the openyuanrong_datasystem does not need to include the `status`. Therefore, when calling the corresponding function in TransferQueue, the return value also needs to be adjusted. - The waiting time for the `get_buffers` operation has been modified.If the data does not exist, do not wait. If the data exists, then proceed with the normal retrieval. The meaning of the `timeout_ms` parameter in `get_buffers` is the waiting time for subscription, that is, how long to wait if the data **does not exist**. If the data already exists, it will be retrieved normally and is not affected by this parameter. However, in TransferQueue, the data retrieved by `get_buffers` should definitely exist, it has nothing to do with the `timeout_ms` parameter. Even if the data is not immediately retrieved, as long as it is known to exist, it will definitely be retrieved no matter how long it takes. See merge request: Ascend/TransferQueue!18
1 parent c2e514a commit bac8c2b

2 files changed

Lines changed: 4 additions & 4 deletions

File tree

tests/test_yuanrong_storage_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,10 @@ def side_effect_mcreate(keys, sizes):
7878
buffers = [MockBuffer(size) for size in sizes]
7979
for b in buffers:
8080
stored_raw_buffers.append(b.MutableData())
81-
return 0, buffers
81+
return buffers
8282

8383
storage_client._cpu_ds_client.mcreate.side_effect = side_effect_mcreate
84-
storage_client._cpu_ds_client.get_buffers.return_value = (0, stored_raw_buffers)
84+
storage_client._cpu_ds_client.get_buffers.return_value = stored_raw_buffers
8585

8686
storage_client.mset_zcopy(
8787
["tensor_key", "string_key"], [torch.tensor([1.0, 2.0, 3.0], dtype=torch.float32), "hello yuanrong"]

transfer_queue/storage/clients/yuanrong_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ def mset_zcopy(self, keys: list[str], objs: list[Any]):
194194
assert self._cpu_ds_client is not None, "CPU DS client is not available"
195195
items_list = [[memoryview(b) for b in _encoder.encode(obj)] for obj in objs]
196196
packed_sizes = [calc_packed_size(items) for items in items_list]
197-
status, buffers = self._cpu_ds_client.mcreate(keys, packed_sizes)
197+
buffers = self._cpu_ds_client.mcreate(keys, packed_sizes)
198198
tasks = [(target.MutableData(), item) for target, item in zip(buffers, items_list, strict=False)]
199199
with ThreadPoolExecutor(max_workers=DS_MAX_WORKERS) as executor:
200200
list(executor.map(lambda p: pack_into(*p), tasks))
@@ -210,7 +210,7 @@ def mget_zcopy(self, keys: list[str]) -> list[Any]:
210210
list[Any]: List of deserialized objects corresponding to the input keys.
211211
"""
212212
assert self._cpu_ds_client is not None, "CPU DS client is not available"
213-
status, buffers = self._cpu_ds_client.get_buffers(keys, timeout_ms=500)
213+
buffers = self._cpu_ds_client.get_buffers(keys)
214214
return [_decoder.decode(unpack_from(buffer)) if buffer is not None else None for buffer in buffers]
215215

216216
def _batch_put(self, keys: list[str], values: list[Any]):

0 commit comments

Comments
 (0)