Skip to content

Commit 6ad4d07

Browse files
mpb159753ascend-robot
authored andcommitted
[BREAKING][refactor] Convert BatchMeta to columnar layout; enable zero-copy serialization by default
Co-authored-by: 看我72遍<m.pb@msn.com> # message auto-generated for no-merge-commit merge: !28 merge refactor/columnar-batchmeta-zero-copy into main [BREAKING][refactor] Convert BatchMeta to columnar layout; enable zero-copy serialization by default Created-by: mpb159753 Commit-by: 看我72遍 Merged-by: ascend-robot Description: # Columnar BatchMeta + Zero-Copy Default ## 1. Context & Motivation Closes: **[refactor] Convert BatchMeta from row-oriented to column-oriented layout** The current `BatchMeta` uses a **row-oriented** design (`BatchMeta` → `List[SampleMeta]` → `Dict[str, FieldMeta]`), which introduces three scaling issues in high-throughput scenarios: 1. **O(B×F) Complexity**: Critical paths (`build_storage_meta_groups`, `add_fields`, `_filter_storage_data`) involve nested loops over every sample × every field, incurred multiple times per PUT. 2. **Small Object Explosion**: A batch of 1024 samples with 10 fields creates **10,000+ Python objects**, causing GC pressure and unpredictable tail latency. 3. **Redundant Transmission**: Schema info (dtype, shape) is duplicated per sample; row-oriented serialization produces fragmented ZMQ frames, preventing zero-copy optimization. This PR refactors `BatchMeta` to a **column-oriented** (structure-of-arrays) design, reducing metadata complexity from **O(B×F)** to **O(B) + O(F)**, and enables zero-copy serialization by default with automatic pickle fallback. ## 2. Key Changes ### 2.1 Columnar BatchMeta (`metadata.py`) | Aspect | Before (Row-oriented) | After (Column-oriented) | |:---|:---|:---| | Structure | `BatchMeta.samples: List[SampleMeta]` | Flat arrays: `global_indexes`, `partition_ids`, `production_status` | | Field metadata | Per-sample `FieldMeta` objects (**B×F** instances) | Shared `field_schema` dict (**F** entries) | | Status check | Loop over samples **O(B)** | `np.all()` on ndarray **O(1)** | | Classes | `BatchMeta`, `SampleMeta`, `FieldMeta` | `BatchMeta` only | - **Removed**: `SampleMeta` and `FieldMeta` classes entirely - **Added**: `field_schema` dict with three field types: Regular Tensor, Nested Tensor (`is_nested`), Non-Tensor (`is_non_tensor`) - **Vectorized**: `production_status` as `np.ndarray(int8)` — enables O(1) readiness checks via `np.all()` ### 2.2 Zero-Copy Serialization Default (`serial_utils.py`) - Zero-copy serialization is now the **default behavior** (previously gated by env var) - Automatic **fallback to pickle** on serialization failure, with one-time warning - Removed `ZERO_COPY_SERIALIZATION` environment variable switch ### 2.3 Storage & Transport Adaptation - **`simple_backend.py`** / **`simple_backend_manager.py`** / **`controller.py`**: Adapted to columnar API; `clear()` uses `del` instead of `None` assignment to reduce memory fragmentation - **`zmq_utils.py`**: ZMQ transport uses new serialization utilities; frame count reduced from **O(B)** to **F+1** (one metadata header + one per field) ### 2.4 Test Suite - **`test_metadata.py`**: Fully rewritten for columnar API (net -799 lines) - All other test files adapted to new `BatchMeta` constructor ## 3. Benchmark Results Tests conducted in Docker (single-node Ray) across 7 payload sizes. Three configurations compared: - **main-no-zerocopy**: Baseline (row-oriented, pickle serialization) - **main-zero-copy**: Row-oriented + custom zero-copy serialization (previous PR) - **columnar-batchmeta-zero-copy**: This PR (columnar + zero-copy default) ### Throughput Comparison (Gbps) | Config | Operation | main (No ZC) | main (ZC) | **This PR** | vs main (ZC) | |:---|:---|:---|:---|:---|:---| | **debug** (0.05 MB) | PUT | 0.004 | 0.005 | **0.005** | +17% | | | GET | 0.005 | 0.006 | **0.008** | +33% | | **tiny** (0.6–1.5 MB) | PUT | 0.055 | 0.058 | **0.119** | +106% | | | GET | 0.057 | 0.086 | **0.220** | +157% | | **small** (50–150 MB) | PUT | 0.89 | 1.56 | **4.71** | +202% | | | GET | 1.14 | 2.53 | **5.87** | +132% | | **medium** (0.5–1.5 GB) | PUT | 2.91 | 6.82 | **18.26** | +168% | | | GET | 3.31 | 6.95 | **8.83** | +27% | | **large** (3–6 GB) | PUT | 4.32 | 12.34 | **26.11** | +112% | | | GET | 4.57 | 8.41 | **9.60** | +14% | | **xlarge** (6–13 GB) | PUT | 4.37 | 11.86 | **25.74** | +117% | | | GET | 4.67 | 8.47 | **10.20** | +20% | | **huge** (10–25 GB) | PUT | 4.31 | 11.08 | **23.89** | +116% | | | GET | 4.49 | 5.50 | **9.70** | +76% | ### Speedup vs Baseline (main-no-zerocopy) | Config | PUT Speedup | GET Speedup | |:---|:---:|:---:| | debug | 1.2× | 1.5× | | tiny | **2.2×** | **3.8×** | | small | **5.3×** | **5.2×** | | medium | **6.3×** | **2.7×** | | large | **6.0×** | **2.1×** | | xlarge | **5.9×** | **2.2×** | | huge | **5.5×** | **2.2×** | ### Visualization ![image.png](https://raw.gitcode.com/user-images/assets/8886051/b8012aac-3de0-4a7d-b606-3bb17805ef0e/image.png 'image.png') ![image.png](https://raw.gitcode.com/user-images/assets/8886051/44ea047d-d304-4714-a5f7-a03847dfda6f/image.png 'image.png') ![image.png](https://raw.gitcode.com/user-images/assets/8886051/5721ee7d-dbb3-4499-80c4-f9cfa45266a0/image.png 'image.png') ![image.png](https://raw.gitcode.com/user-images/assets/8886051/fbe5798c-666c-47bd-b9db-a5fc990849ae/image.png 'image.png') ### Resource Usage Columnar layout reduces CPU time by eliminating per-sample object creation and pickle overhead: | Config | main (No ZC) CPU-sec | main (ZC) CPU-sec | **This PR** CPU-sec | Reduction vs main (ZC) | |:---|:---|:---|:---|:---| | **large** | 850 | 572 | **574** | ~0% (2× throughput) | | **xlarge** | 1570 | 1166 | **1009** | **-13%** (2× throughput) | | **huge** | 2569 | 2387 | **1936** | **-19%** (2× throughput) | > Note: CPU time is comparable or lower despite processing **2× more data** per unit time. ## 4. API Breaking Changes | Item | Before | After | |:---|:---|:---| | `BatchMeta.samples` | `List[SampleMeta]` | **Removed** | | `SampleMeta` class | Available | **Removed** | | `FieldMeta` class | Available | **Removed** | | `sample.fields['x'].dtype` | Per-sample access | `batch.field_schema['x']['dtype']` | | Constructor | `BatchMeta(samples=[...])` | `BatchMeta(global_indexes=..., partition_ids=..., field_schema=..., production_status=...)` | ## 5. Files Changed ``` 16 files changed, 1369 insertions(+), 2168 deletions(-) ``` | Category | Files | Summary | |:---|:---|:---| | Core | `metadata.py` | Columnar BatchMeta rewrite | | Serialization | `serial_utils.py`, `zmq_utils.py` | Zero-copy default + ZMQ adaptation | | Storage | `simple_backend.py`, `simple_backend_manager.py`, `base.py` | Columnar API adaptation | | Controller | `controller.py` | Columnar API adaptation | | Tests | `test_metadata.py` + 7 test files | Full rewrite + adaptation | | Scripts | `put_benchmark.py` | Minor adjustments | ## 6. Conclusion The columnar `BatchMeta` refactoring combined with default zero-copy serialization delivers: - **PUT throughput**: Up to **6.3×** improvement over baseline, **+100–200%** over previous zero-copy PR - **GET throughput**: Up to **5.2×** improvement over baseline, **+14–157%** over previous zero-copy PR - **CPU efficiency**: Comparable or lower CPU time despite 2× higher throughput - **Code reduction**: Net **-799 lines** of metadata-related code See merge request: Ascend/TransferQueue!28
1 parent 87d7e13 commit 6ad4d07

21 files changed

Lines changed: 2166 additions & 2612 deletions

scripts/put_benchmark.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,11 @@
3333
parent_dir = Path(__file__).resolve().parent.parent.parent
3434
sys.path.append(str(parent_dir))
3535

36-
from transfer_queue import ( # noqa: E402
37-
AsyncTransferQueueClient,
38-
SimpleStorageUnit,
39-
TransferQueueController,
40-
process_zmq_server_info,
41-
)
36+
from transfer_queue import TransferQueueClient # noqa: E402
37+
from transfer_queue.controller import TransferQueueController # noqa: E402
38+
from transfer_queue.storage.simple_backend import SimpleStorageUnit # noqa: E402
4239
from transfer_queue.utils.common import get_placement_group # noqa: E402
40+
from transfer_queue.utils.zmq_utils import process_zmq_server_info # noqa: E402
4341

4442
logging.basicConfig(level=logging.INFO)
4543
logger = logging.getLogger(__name__)
@@ -309,7 +307,7 @@ def initialize_system(self, config_dict):
309307
self.tq_config = OmegaConf.merge(tq_internal_conf, self.tq_config)
310308

311309
# Client Init
312-
self.data_system_client = AsyncTransferQueueClient(
310+
self.data_system_client = TransferQueueClient(
313311
client_id="Trainer", controller_info=self.data_system_controller_info
314312
)
315313
self.data_system_client.initialize_storage_manager(

tests/e2e/test_e2e_lifecycle_consistency.py

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -420,15 +420,22 @@ def test_cross_shard_complex_update(e2e_client):
420420
"Region 30-39 tensor_f32 should match original Put B"
421421
)
422422

423-
# 9. Verify new fields exist in update region
424-
extended_fields = base_fields + ["new_extra_tensor", "new_extra_non_tensor"]
425-
update_region_meta = poll_for_meta(
426-
client, partition_id, extended_fields, 20, "update_region_task", mode="force_fetch"
423+
# 9. Verify new fields exist in update region (indices 10-29 only have new fields).
424+
# Build extended_meta from full_meta (which has valid _custom_backend_meta)
425+
# by selecting the subset of samples whose global_indexes match meta_update.
426+
# Using meta_update directly would fail because it was derived from alloc_meta
427+
# before put(), so its _custom_backend_meta may be incomplete.
428+
update_gis = set(meta_update.global_indexes)
429+
update_positions_in_full = [
430+
i for i, global_index in enumerate(full_meta.global_indexes) if global_index in update_gis
431+
]
432+
update_meta_with_backend = full_meta.select_samples(update_positions_in_full)
433+
extended_meta = update_meta_with_backend.with_data_fields(
434+
base_fields + ["new_extra_tensor", "new_extra_non_tensor"]
427435
)
428-
if update_region_meta is not None and update_region_meta.size > 0:
429-
update_region_data = client.get_data(update_region_meta)
430-
assert "new_extra_tensor" in update_region_data.keys(), "new_extra_tensor should exist"
431-
assert "new_extra_non_tensor" in update_region_data.keys(), "new_extra_non_tensor should exist"
436+
update_region_data = client.get_data(extended_meta)
437+
assert "new_extra_tensor" in update_region_data.keys(), "new_extra_tensor should exist"
438+
assert "new_extra_non_tensor" in update_region_data.keys(), "new_extra_non_tensor should exist"
432439
finally:
433440
client.clear_partition(partition_id)
434441

@@ -641,5 +648,59 @@ def test_clear_partition(e2e_client):
641648
pass
642649

643650

651+
# Scenario Six: Dynamic Tensor Shape → Nested Tensor Transition
652+
def test_dynamic_tensor_shape_nested_transition(e2e_client):
653+
"""
654+
Test transition from regular tensor to nested tensor.
655+
First put tensors of identical shape, then put tensors of a different shape.
656+
Verify that the field schema marks is_nested=True, and getting all samples returns a nested tensor.
657+
"""
658+
client = e2e_client
659+
partition_id = "test_nested_transition_partition"
660+
task_name = "test_task"
661+
662+
try:
663+
# 1. Put same-shape tensor (shape: (2, 4)) — initial insert
664+
data1 = TensorDict({"dynamic_feature": torch.ones(2, 4)}, batch_size=2)
665+
meta1_put = client.put(data=data1, partition_id=partition_id)
666+
assert meta1_put.size == 2
667+
668+
# Poll and verify first batch is regular tensor
669+
meta1 = poll_for_meta(client, partition_id, ["dynamic_feature"], 2, task_name, mode="force_fetch")
670+
assert not meta1.field_schema["dynamic_feature"]["is_nested"]
671+
retrieved_1 = client.get_data(meta1)
672+
assert not retrieved_1["dynamic_feature"].is_nested
673+
assert retrieved_1["dynamic_feature"].shape == (2, 4)
674+
675+
# 2. Allocate 2 more slots via insert mode, put different-shape tensor (shape: (2, 6))
676+
alloc_meta2 = client.get_meta(
677+
partition_id=partition_id,
678+
data_fields=["dynamic_feature"],
679+
batch_size=2,
680+
mode="insert",
681+
task_name="allocator",
682+
)
683+
assert alloc_meta2.size == 2
684+
data2 = TensorDict({"dynamic_feature": torch.ones(2, 6)}, batch_size=2)
685+
client.put(data=data2, metadata=alloc_meta2)
686+
687+
# Poll and verify metadata now indicates nested tensor
688+
meta2 = poll_for_meta(client, partition_id, ["dynamic_feature"], 2, task_name, mode="force_fetch")
689+
690+
# After second put with different shape, is_nested should be True
691+
assert meta2.field_schema["dynamic_feature"]["is_nested"] is True
692+
693+
# 3. Retrieve all 4 samples together
694+
meta_all = poll_for_meta(client, partition_id, ["dynamic_feature"], 4, task_name, mode="force_fetch")
695+
assert meta_all.field_schema["dynamic_feature"]["is_nested"] is True
696+
697+
retrieved_all = client.get_data(meta_all)
698+
# The merged result should be a nested tensor since the shapes vary
699+
assert retrieved_all["dynamic_feature"].is_nested is True
700+
assert len(retrieved_all["dynamic_feature"]) == 4
701+
finally:
702+
client.clear_partition(partition_id)
703+
704+
644705
if __name__ == "__main__":
645706
sys.exit(pytest.main(["-v", __file__]))

0 commit comments

Comments
 (0)