Skip to content

Commit abf2112

Browse files
committed
minor update
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
1 parent f422aeb commit abf2112

4 files changed

Lines changed: 16 additions & 14 deletions

File tree

tests/test_metadata.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ def test_batch_meta_chunk_by_partition(self):
239239
assert chunks[3].partition_ids == ["partition_3", "partition_3"]
240240
assert chunks[3].global_indexes == [13, 17]
241241

242-
# validate _custom_backend_meta is chunked
242+
# validate custom_meta is chunked
243243
assert 10 in chunks[0].custom_meta
244244
assert 14 in chunks[0].custom_meta
245245
assert 18 in chunks[0].custom_meta
@@ -406,22 +406,22 @@ def test_batch_meta_union(self):
406406

407407
batch1 = BatchMeta(
408408
samples=[
409-
SampleMeta(partition_id="partition_0", global_index=0, fields=fields1),
410-
SampleMeta(partition_id="partition_0", global_index=1, fields=fields1),
409+
SampleMeta(partition_id="partition_0", global_index=8, fields=fields1),
410+
SampleMeta(partition_id="partition_0", global_index=9, fields=fields1),
411411
],
412412
_custom_backend_meta={
413-
i: {"field1": {"dtype": torch.float32}, "field2": {"dtype": torch.int64}} for i in [0, 1]
413+
i: {"field1": {"dtype": torch.float32}, "field2": {"dtype": torch.int64}} for i in [8, 9]
414414
},
415415
)
416416
batch1.extra_info["info1"] = "value1"
417417

418418
batch2 = BatchMeta(
419419
samples=[
420-
SampleMeta(partition_id="partition_0", global_index=0, fields=fields2),
421-
SampleMeta(partition_id="partition_0", global_index=1, fields=fields2),
420+
SampleMeta(partition_id="partition_0", global_index=8, fields=fields2),
421+
SampleMeta(partition_id="partition_0", global_index=9, fields=fields2),
422422
],
423423
_custom_backend_meta={
424-
i: {"field2": {"dtype": torch.int64}, "field3": {"dtype": torch.bool}} for i in [0, 1]
424+
i: {"field2": {"dtype": torch.int64}, "field3": {"dtype": torch.bool}} for i in [8, 9]
425425
},
426426
)
427427
batch2.extra_info["info2"] = "value2"

transfer_queue/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ async def async_set_custom_meta(
314314

315315
if response_msg.request_type != ZMQRequestType.SET_CUSTOM_META_RESPONSE:
316316
raise RuntimeError(
317-
f"[{self.client_id}]: Failed to set custom metadata from controller {self._controller.id}: "
317+
f"[{self.client_id}]: Failed to set custom metadata to controller {self._controller.id}: "
318318
f"{response_msg.body.get('message', 'Unknown error')}"
319319
)
320320

transfer_queue/controller.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -703,8 +703,7 @@ def set_custom_meta(self, custom_meta: dict[int, dict]) -> None:
703703
Existing entries will be overwritten.
704704
"""
705705

706-
for k in custom_meta.keys():
707-
self.custom_meta[k] = custom_meta[k]
706+
self.custom_meta.update(custom_meta)
708707

709708
# ==================== Statistics and Monitoring ====================
710709

@@ -1076,6 +1075,11 @@ def set_custom_meta(self, partition_custom_meta: dict[str, dict[int, dict]]) ->
10761075
partition = self._get_partition(partition_id)
10771076
if partition:
10781077
partition.set_custom_meta(custom_meta)
1078+
else:
1079+
logger.warning(
1080+
f"set_custom_meta: partition {partition_id}' not found; "
1081+
f"custom_metadata for this partition will be ignored"
1082+
)
10791083

10801084
def get_metadata(
10811085
self,

transfer_queue/metadata.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,8 +215,6 @@ def __post_init__(self):
215215
"""Initialize all computed properties during initialization"""
216216
self.samples = copy.deepcopy(self.samples)
217217
self.extra_info = copy.deepcopy(self.extra_info)
218-
self.custom_meta = copy.deepcopy(self.custom_meta)
219-
self._custom_backend_meta = copy.deepcopy(self._custom_backend_meta)
220218

221219
# Basic properties
222220
object.__setattr__(self, "_size", len(self.samples))
@@ -352,7 +350,7 @@ def update_custom_meta(self, new_meta: dict[int, dict[str, Any]]):
352350
return
353351

354352
non_exist_global_indexes = set(new_meta.keys()) - set(self.global_indexes)
355-
if bool(non_exist_global_indexes):
353+
if non_exist_global_indexes:
356354
raise ValueError(
357355
f"Trying to update custom_meta with non-exist global_indexes! {non_exist_global_indexes} "
358356
f"do not exist in this batch."
@@ -683,7 +681,7 @@ def union(self, other: "BatchMeta", validate: bool = True) -> Optional["BatchMet
683681
elif idx in self._custom_backend_meta:
684682
merged_custom_backend_meta[idx] = {**self._custom_backend_meta[idx]}
685683
elif idx in other._custom_backend_meta:
686-
merged_custom_backend_meta[idx] = other._custom_backend_meta[idx]
684+
merged_custom_backend_meta[idx] = {**other._custom_backend_meta[idx]}
687685

688686
return BatchMeta(
689687
samples=merged_samples,

0 commit comments

Comments
 (0)