Skip to content

Commit 239abb4

Browse files
committed
refactor: simplify StorageManager naming
Rename internal storage manager classes by removing TransferQueue prefix: - TransferQueueStorageManager → StorageManager - TransferQueueStorageManagerFactory → StorageManagerFactory These classes are internal components not exposed as public API, so the shorter names improve readability without causing conflicts.
1 parent b266d39 commit 239abb4

24 files changed

Lines changed: 146 additions & 193 deletions

README.md

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ TransferQueue offers **fine-grained, sub-sample-level** data management and **lo
5050

5151
### Control Plane: Panoramic Data Management
5252

53-
In the control plane, `TransferQueueController` tracks the **production status** and **consumption status** of each training sample as metadata. Once all required data fields are ready (i.e., written to the `TransferQueueStorageManager`), the data sample can be consumed by downstream tasks.
53+
In the control plane, `TransferQueueController` tracks the **production status** and **consumption status** of each training sample as metadata. Once all required data fields are ready (i.e., written to the `StorageManager`), the data sample can be consumed by downstream tasks.
5454

5555
We also track the consumption history for each computational task (e.g., `generate_sequences`, `compute_log_prob`, etc.). Therefore, even when different computational tasks require the same data field, they can consume the data independently without interfering with each other.
5656

@@ -66,7 +66,7 @@ To make the data retrieval process more customizable, we provide a `Sampler` cla
6666

6767
In the data plane, we utilize a pluggable design, enabling TransferQueue to integrate with different storage backends based on user requirements.
6868

69-
Specifically, we provide a `TransferQueueStorageManager` abstraction class that defines the core APIs as follows:
69+
Specifically, we provide a `StorageManager` abstraction class that defines the core APIs as follows:
7070

7171
- `async def put_data(self, data: TensorDict, metadata: BatchMeta) -> None`
7272
- `async def get_data(self, metadata: BatchMeta) -> TensorDict`
@@ -298,21 +298,19 @@ The data plane is organized as follows:
298298
│ │── simple_backend.py # Default distributed storage backend (SimpleStorageUnit) by TQ
299299
│ ├── managers/ # Managers are upper level interfaces that encapsulate the interaction logic with TQ system.
300300
│ │ ├── __init__.py
301-
│ │ ├──base.py # TransferQueueStorageManager, KVStorageManager
301+
│ │ ├──base.py # StorageManager, KVStorageManager, StorageManagerFactory
302302
│ │ ├──simple_backend_manager.py # AsyncSimpleStorageManager
303303
│ │ ├──yuanrong_manager.py # YuanrongStorageManager
304-
│ │ ├──mooncake_manager.py # MooncakeStorageManager
305-
│ │ └──factory.py # TransferQueueStorageManagerFactory
304+
│ │ └──mooncake_manager.py # MooncakeStorageManager
306305
│ └── clients/ # Clients are lower level interfaces that directly manipulate the target storage backend.
307306
│ │ ├── __init__.py
308-
│ │ ├── base.py # TransferQueueStorageKVClient
307+
│ │ ├── base.py # StorageKVClient, StorageClientFactory
309308
│ │ ├── yuanrong_client.py # YuanrongStorageClient
310309
│ │ ├── mooncake_client.py # MooncakeStorageClient
311-
│ │ ├── ray_storage_client.py # RayStorageClient
312-
│ │ └── factory.py # TransferQueueStorageClientFactory
310+
│ │ └── ray_storage_client.py # RayStorageClient
313311
```
314312

315-
To integrate TransferQueue with a custom storage backend, start by implementing a subclass that inherits from `TransferQueueStorageManager`. This subclass acts as an adapter between the TransferQueue system and the target storage backend. For KV-based storage backends, you can simply inherit from `KVStorageManager`, which can serve as the general manager for all KV-based backends.
313+
To integrate TransferQueue with a custom storage backend, start by implementing a subclass that inherits from `StorageManager`. This subclass acts as an adapter between the TransferQueue system and the target storage backend. For KV-based storage backends, you can simply inherit from `KVStorageManager`, which can serve as the general manager for all KV-based backends.
316314

317315
Distributed storage backends often come with their own native clients serving as the interface of the storage system. In such cases, a low-level adapter for this client can be written, following the examples provided in the `storage/clients` directory.
318316

scripts/put_benchmark.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
from transfer_queue import TransferQueueClient
3232
from transfer_queue.controller import TransferQueueController
33-
from transfer_queue.storage.simple_backend import SimpleStorageUnit
33+
from transfer_queue.storage.simple_storage import SimpleStorageUnit
3434
from transfer_queue.utils.common import get_placement_group
3535
from transfer_queue.utils.zmq_utils import process_zmq_server_info
3636

tests/test_async_simple_storage_manager.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async def mock_async_storage_manager():
6262

6363
# Mock the handshake process entirely to avoid ZMQ complexity
6464
with patch(
65-
"transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"
65+
"transfer_queue.storage.managers.base.StorageManager._connect_to_controller"
6666
) as mock_connect:
6767
# Mock the manager without actually connecting
6868
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
@@ -254,7 +254,7 @@ async def test_get_data_routes_from_hash():
254254
ports={"put_get_socket": 19011},
255255
),
256256
}
257-
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
257+
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
258258
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
259259
manager.storage_manager_id = "test_get"
260260
manager.storage_unit_infos = storage_unit_infos
@@ -307,7 +307,7 @@ async def test_clear_data_routes_from_hash():
307307
ports={"put_get_socket": 19021},
308308
),
309309
}
310-
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
310+
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
311311
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
312312
manager.storage_manager_id = "test_clear"
313313
manager.storage_unit_infos = storage_unit_infos
@@ -358,7 +358,7 @@ async def test_hash_routing_stable_across_batch_sizes():
358358
ports={"put_get_socket": 19031},
359359
),
360360
}
361-
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
361+
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
362362
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
363363
manager.storage_manager_id = "test_hash_batch"
364364
manager.storage_unit_infos = storage_unit_infos
@@ -419,7 +419,7 @@ async def test_hash_routing_stable_reversed_order():
419419
ports={"put_get_socket": 19041},
420420
),
421421
}
422-
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
422+
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
423423
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
424424
manager.storage_manager_id = "test_hash_order"
425425
manager.storage_unit_infos = storage_unit_infos

tests/test_metadata.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,7 @@ class TestStorageUnitDataStrict:
678678

679679
def test_put_data_length_mismatch_raises(self):
680680
"""put_data must raise when global_indexes and field values have different lengths."""
681-
from transfer_queue.storage.simple_backend import StorageUnitData
681+
from transfer_queue.storage.simple_storage import StorageUnitData
682682

683683
sud = StorageUnitData(storage_size=10)
684684
# 3 indexes but only 2 values — must raise, not silently drop

tests/test_ray_p2p.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from transfer_queue.client import TransferQueueClient
2525
from transfer_queue.metadata import BatchMeta
2626
from transfer_queue.storage.managers.base import KVStorageManager
27-
from transfer_queue.storage.managers.factory import TransferQueueStorageManagerFactory
27+
from transfer_queue.storage.managers.factory import StorageManagerFactory
2828
from transfer_queue.utils.zmq_utils import ZMQServerInfo
2929

3030
TEST_CONFIGS: list[tuple[tuple[int, int], torch.dtype]] = [
@@ -71,9 +71,9 @@ def create_mock_controller():
7171
def ensure_mock_storage_manager_registered():
7272
"""Ensure MockKVStorageManager is registered in current process."""
7373

74-
if "KV_MOCK" not in TransferQueueStorageManagerFactory._registry:
74+
if "KV_MOCK" not in StorageManagerFactory._registry:
7575

76-
@TransferQueueStorageManagerFactory.register("KV_MOCK")
76+
@StorageManagerFactory.register("KV_MOCK")
7777
class MockKVStorageManager(KVStorageManager):
7878
def _connect_to_controller(self):
7979
pass

tests/test_simple_storage_unit.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import torch
2222
import zmq
2323

24-
from transfer_queue.storage.simple_backend import SimpleStorageUnit
24+
from transfer_queue.storage.simple_storage import SimpleStorageUnit
2525
from transfer_queue.utils.zmq_utils import ZMQMessage, ZMQRequestType
2626

2727

@@ -420,7 +420,7 @@ def test_storage_unit_data_direct():
420420

421421
def test_storage_unit_data_capacity_uses_active_keys():
422422
"""Capacity check must use _active_keys, not scan field_data."""
423-
from transfer_queue.storage.simple_backend import StorageUnitData
423+
from transfer_queue.storage.simple_storage import StorageUnitData
424424

425425
storage = StorageUnitData(storage_size=3)
426426

transfer_queue/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
BatchMeta,
2929
)
3030
from transfer_queue.storage import (
31-
TransferQueueStorageManagerFactory,
31+
StorageManagerFactory,
3232
)
3333
from transfer_queue.utils.common import limit_pytorch_auto_parallel_threads
3434
from transfer_queue.utils.logging_utils import get_logger
@@ -92,7 +92,7 @@ def initialize_storage_manager(
9292
- zmq_info: ZMQ server information about the storage units
9393
9494
"""
95-
self.storage_manager = TransferQueueStorageManagerFactory.create(
95+
self.storage_manager = StorageManagerFactory.create(
9696
manager_type, controller_info=self._controller, config=config
9797
)
9898

transfer_queue/interface.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from transfer_queue.metadata import KVBatchMeta
3333
from transfer_queue.sampler import * # noqa: F401
3434
from transfer_queue.sampler import BaseSampler
35-
from transfer_queue.storage.simple_backend import SimpleStorageUnit
35+
from transfer_queue.storage.simple_storage import SimpleStorageUnit
3636
from transfer_queue.utils.common import get_placement_group
3737
from transfer_queue.utils.logging_utils import get_logger
3838
from transfer_queue.utils.yuanrong_utils import (

transfer_queue/storage/__init__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@
1717
AsyncSimpleStorageManager,
1818
MooncakeStorageManager,
1919
RayStorageManager,
20-
TransferQueueStorageManager,
21-
TransferQueueStorageManagerFactory,
20+
StorageManager,
21+
StorageManagerFactory,
2222
YuanrongStorageManager,
2323
)
24-
from .simple_backend import SimpleStorageUnit, StorageUnitData
24+
from .simple_storage import SimpleStorageUnit, StorageUnitData
2525

2626
__all__ = [
2727
"SimpleStorageUnit",
2828
"StorageUnitData",
29-
"TransferQueueStorageManager",
30-
"TransferQueueStorageManagerFactory",
29+
"StorageManager",
30+
"StorageManagerFactory",
3131
"AsyncSimpleStorageManager",
3232
"MooncakeStorageManager",
3333
"YuanrongStorageManager",

transfer_queue/storage/clients/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
# limitations under the License.
1515

1616
# This module is currently empty but reserved for future client implementations
17-
from .base import TransferQueueStorageKVClient
17+
from .base import StorageKVClient
1818
from .factory import StorageClientFactory
1919
from .mooncake_client import MooncakeStoreClient
2020
from .ray_storage_client import RayStorageClient
2121
from .yuanrong_client import YuanrongStorageClient
2222

2323
__all__ = [
24-
"TransferQueueStorageKVClient",
24+
"StorageKVClient",
2525
"StorageClientFactory",
2626
"RayStorageClient",
2727
"MooncakeStoreClient",

0 commit comments

Comments
 (0)