Skip to content

Commit 239438c

Browse files
committed
refactor: simplify StorageManager naming
Signed-off-by: ji-huazhong <hzji210@gmail.com>
1 parent b266d39 commit 239438c

26 files changed

Lines changed: 161 additions & 210 deletions

README.md

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ TransferQueue offers **fine-grained, sub-sample-level** data management and **lo
5050

5151
### Control Plane: Panoramic Data Management
5252

53-
In the control plane, `TransferQueueController` tracks the **production status** and **consumption status** of each training sample as metadata. Once all required data fields are ready (i.e., written to the `TransferQueueStorageManager`), the data sample can be consumed by downstream tasks.
53+
In the control plane, `TransferQueueController` tracks the **production status** and **consumption status** of each training sample as metadata. Once all required data fields are ready (i.e., written to the `StorageManager`), the data sample can be consumed by downstream tasks.
5454

5555
We also track the consumption history for each computational task (e.g., `generate_sequences`, `compute_log_prob`, etc.). Therefore, even when different computational tasks require the same data field, they can consume the data independently without interfering with each other.
5656

@@ -66,7 +66,7 @@ To make the data retrieval process more customizable, we provide a `Sampler` cla
6666

6767
In the data plane, we utilize a pluggable design, enabling TransferQueue to integrate with different storage backends based on user requirements.
6868

69-
Specifically, we provide a `TransferQueueStorageManager` abstraction class that defines the core APIs as follows:
69+
Specifically, we provide a `StorageManager` abstraction class that defines the core APIs as follows:
7070

7171
- `async def put_data(self, data: TensorDict, metadata: BatchMeta) -> None`
7272
- `async def get_data(self, metadata: BatchMeta) -> TensorDict`
@@ -298,21 +298,19 @@ The data plane is organized as follows:
298298
│ │── simple_backend.py # Default distributed storage backend (SimpleStorageUnit) by TQ
299299
│ ├── managers/ # Managers are upper level interfaces that encapsulate the interaction logic with TQ system.
300300
│ │ ├── __init__.py
301-
│ │ ├──base.py # TransferQueueStorageManager, KVStorageManager
301+
│ │ ├──base.py # StorageManager, KVStorageManager, StorageManagerFactory
302302
│ │ ├──simple_backend_manager.py # AsyncSimpleStorageManager
303303
│ │ ├──yuanrong_manager.py # YuanrongStorageManager
304-
│ │ ├──mooncake_manager.py # MooncakeStorageManager
305-
│ │ └──factory.py # TransferQueueStorageManagerFactory
304+
│ │ └──mooncake_manager.py # MooncakeStorageManager
306305
│ └── clients/ # Clients are lower level interfaces that directly manipulate the target storage backend.
307306
│ │ ├── __init__.py
308-
│ │ ├── base.py # TransferQueueStorageKVClient
307+
│ │ ├── base.py # StorageKVClient, StorageKVClientFactory
309308
│ │ ├── yuanrong_client.py # YuanrongStorageClient
310309
│ │ ├── mooncake_client.py # MooncakeStorageClient
311-
│ │ ├── ray_storage_client.py # RayStorageClient
312-
│ │ └── factory.py # TransferQueueStorageClientFactory
310+
│ │ └── ray_storage_client.py # RayStorageClient
313311
```
314312

315-
To integrate TransferQueue with a custom storage backend, start by implementing a subclass that inherits from `TransferQueueStorageManager`. This subclass acts as an adapter between the TransferQueue system and the target storage backend. For KV-based storage backends, you can simply inherit from `KVStorageManager`, which can serve as the general manager for all KV-based backends.
313+
To integrate TransferQueue with a custom storage backend, start by implementing a subclass that inherits from `StorageManager`. This subclass acts as an adapter between the TransferQueue system and the target storage backend. For KV-based storage backends, you can simply inherit from `KVStorageManager`, which can serve as the general manager for all KV-based backends.
316314

317315
Distributed storage backends often come with their own native clients serving as the interface of the storage system. In such cases, a low-level adapter for this client can be written, following the examples provided in the `storage/clients` directory.
318316

scripts/put_benchmark.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
from transfer_queue import TransferQueueClient
3232
from transfer_queue.controller import TransferQueueController
33-
from transfer_queue.storage.simple_backend import SimpleStorageUnit
33+
from transfer_queue.storage.simple_storage import SimpleStorageUnit
3434
from transfer_queue.utils.common import get_placement_group
3535
from transfer_queue.utils.zmq_utils import process_zmq_server_info
3636

tests/test_async_simple_storage_manager.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async def mock_async_storage_manager():
6262

6363
# Mock the handshake process entirely to avoid ZMQ complexity
6464
with patch(
65-
"transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"
65+
"transfer_queue.storage.managers.base.StorageManager._connect_to_controller"
6666
) as mock_connect:
6767
# Mock the manager without actually connecting
6868
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
@@ -254,7 +254,7 @@ async def test_get_data_routes_from_hash():
254254
ports={"put_get_socket": 19011},
255255
),
256256
}
257-
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
257+
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
258258
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
259259
manager.storage_manager_id = "test_get"
260260
manager.storage_unit_infos = storage_unit_infos
@@ -307,7 +307,7 @@ async def test_clear_data_routes_from_hash():
307307
ports={"put_get_socket": 19021},
308308
),
309309
}
310-
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
310+
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
311311
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
312312
manager.storage_manager_id = "test_clear"
313313
manager.storage_unit_infos = storage_unit_infos
@@ -358,7 +358,7 @@ async def test_hash_routing_stable_across_batch_sizes():
358358
ports={"put_get_socket": 19031},
359359
),
360360
}
361-
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
361+
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
362362
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
363363
manager.storage_manager_id = "test_hash_batch"
364364
manager.storage_unit_infos = storage_unit_infos
@@ -419,7 +419,7 @@ async def test_hash_routing_stable_reversed_order():
419419
ports={"put_get_socket": 19041},
420420
),
421421
}
422-
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
422+
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
423423
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
424424
manager.storage_manager_id = "test_hash_order"
425425
manager.storage_unit_infos = storage_unit_infos

tests/test_kv_storage_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def test_generate_values(test_data):
106106
assert torch.equal(values[i], expected_values[i])
107107

108108

109-
@patch("transfer_queue.storage.managers.base.StorageClientFactory.create")
109+
@patch("transfer_queue.storage.managers.base.StorageKVClientFactory.create")
110110
@patch.object(KVStorageManager, "_connect_to_controller", lambda self: None)
111111
def test_merge_tensors_to_tensordict(mock_create, test_data):
112112
"""Test whether _merge_kv_to_tensordict can correctly reconstruct the TensorDict."""
@@ -268,7 +268,7 @@ def test_data_for_put_data():
268268
}
269269

270270

271-
STORAGE_CLIENT_FACTORY_PATH = "transfer_queue.storage.managers.base.StorageClientFactory"
271+
STORAGE_CLIENT_FACTORY_PATH = "transfer_queue.storage.managers.base.StorageKVClientFactory"
272272

273273

274274
@patch.object(KVStorageManager, "_connect_to_controller", lambda self: None)

tests/test_metadata.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,7 @@ class TestStorageUnitDataStrict:
678678

679679
def test_put_data_length_mismatch_raises(self):
680680
"""put_data must raise when global_indexes and field values have different lengths."""
681-
from transfer_queue.storage.simple_backend import StorageUnitData
681+
from transfer_queue.storage.simple_storage import StorageUnitData
682682

683683
sud = StorageUnitData(storage_size=10)
684684
# 3 indexes but only 2 values — must raise, not silently drop

tests/test_ray_p2p.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from transfer_queue.client import TransferQueueClient
2525
from transfer_queue.metadata import BatchMeta
2626
from transfer_queue.storage.managers.base import KVStorageManager
27-
from transfer_queue.storage.managers.factory import TransferQueueStorageManagerFactory
27+
from transfer_queue.storage.managers.factory import StorageManagerFactory
2828
from transfer_queue.utils.zmq_utils import ZMQServerInfo
2929

3030
TEST_CONFIGS: list[tuple[tuple[int, int], torch.dtype]] = [
@@ -71,9 +71,9 @@ def create_mock_controller():
7171
def ensure_mock_storage_manager_registered():
7272
"""Ensure MockKVStorageManager is registered in current process."""
7373

74-
if "KV_MOCK" not in TransferQueueStorageManagerFactory._registry:
74+
if "KV_MOCK" not in StorageManagerFactory._registry:
7575

76-
@TransferQueueStorageManagerFactory.register("KV_MOCK")
76+
@StorageManagerFactory.register("KV_MOCK")
7777
class MockKVStorageManager(KVStorageManager):
7878
def _connect_to_controller(self):
7979
pass

tests/test_simple_storage_unit.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import torch
2222
import zmq
2323

24-
from transfer_queue.storage.simple_backend import SimpleStorageUnit
24+
from transfer_queue.storage.simple_storage import SimpleStorageUnit
2525
from transfer_queue.utils.zmq_utils import ZMQMessage, ZMQRequestType
2626

2727

@@ -420,7 +420,7 @@ def test_storage_unit_data_direct():
420420

421421
def test_storage_unit_data_capacity_uses_active_keys():
422422
"""Capacity check must use _active_keys, not scan field_data."""
423-
from transfer_queue.storage.simple_backend import StorageUnitData
423+
from transfer_queue.storage.simple_storage import StorageUnitData
424424

425425
storage = StorageUnitData(storage_size=3)
426426

tests/test_storage_client_factory.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import pytest
2020
import torch
2121

22-
from transfer_queue.storage.clients.factory import StorageClientFactory
22+
from transfer_queue.storage.clients.factory import StorageKVClientFactory
2323
from transfer_queue.storage.clients.yuanrong_client import YuanrongStorageClient
2424

2525

@@ -29,12 +29,12 @@ def setUp(self):
2929

3030
@pytest.mark.skipif(find_spec("datasystem") is None, reason="datasystem is not available")
3131
def test_create_client(self):
32-
self.assertIn("YuanrongStorageClient", StorageClientFactory._registry)
33-
self.assertIs(StorageClientFactory._registry["YuanrongStorageClient"], YuanrongStorageClient)
34-
StorageClientFactory.create("YuanrongStorageClient", self.cfg)
32+
self.assertIn("YuanrongStorageClient", StorageKVClientFactory._registry)
33+
self.assertIs(StorageKVClientFactory._registry["YuanrongStorageClient"], YuanrongStorageClient)
34+
StorageKVClientFactory.create("YuanrongStorageClient", self.cfg)
3535

3636
with self.assertRaises(ValueError) as cm:
37-
StorageClientFactory.create("abc", self.cfg)
37+
StorageKVClientFactory.create("abc", self.cfg)
3838
self.assertIn("Unknown StorageClient", str(cm.exception))
3939

4040
@pytest.mark.skipif(
@@ -47,7 +47,7 @@ def test_client_create_empty_tensorlist(self):
4747
for t in tensors:
4848
shapes.append(t.shape)
4949
dtypes.append(t.dtype)
50-
client = StorageClientFactory.create("YuanrongStorageClient", self.cfg)
50+
client = StorageKVClientFactory.create("YuanrongStorageClient", self.cfg)
5151

5252
empty_tensors = client._create_empty_npu_tensorlist(shapes, dtypes)
5353
self.assertEqual(len(tensors), len(empty_tensors))

transfer_queue/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
BatchMeta,
2929
)
3030
from transfer_queue.storage import (
31-
TransferQueueStorageManagerFactory,
31+
StorageManagerFactory,
3232
)
3333
from transfer_queue.utils.common import limit_pytorch_auto_parallel_threads
3434
from transfer_queue.utils.logging_utils import get_logger
@@ -92,7 +92,7 @@ def initialize_storage_manager(
9292
- zmq_info: ZMQ server information about the storage units
9393
9494
"""
95-
self.storage_manager = TransferQueueStorageManagerFactory.create(
95+
self.storage_manager = StorageManagerFactory.create(
9696
manager_type, controller_info=self._controller, config=config
9797
)
9898

transfer_queue/interface.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from transfer_queue.metadata import KVBatchMeta
3333
from transfer_queue.sampler import * # noqa: F401
3434
from transfer_queue.sampler import BaseSampler
35-
from transfer_queue.storage.simple_backend import SimpleStorageUnit
35+
from transfer_queue.storage.simple_storage import SimpleStorageUnit
3636
from transfer_queue.utils.common import get_placement_group
3737
from transfer_queue.utils.logging_utils import get_logger
3838
from transfer_queue.utils.yuanrong_utils import (

0 commit comments

Comments
 (0)