Skip to content

Commit 29ff6d1

Browse files
committed
refactor: simplify StorageManager naming
Rename internal storage manager classes by removing TransferQueue prefix: - TransferQueueStorageManager → StorageManager - TransferQueueStorageManagerFactory → StorageManagerFactory These classes are internal components not exposed as public API, so the shorter names improve readability without causing conflicts.
1 parent b266d39 commit 29ff6d1

19 files changed

Lines changed: 140 additions & 183 deletions

README.md

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ TransferQueue offers **fine-grained, sub-sample-level** data management and **lo
5050

5151
### Control Plane: Panoramic Data Management
5252

53-
In the control plane, `TransferQueueController` tracks the **production status** and **consumption status** of each training sample as metadata. Once all required data fields are ready (i.e., written to the `TransferQueueStorageManager`), the data sample can be consumed by downstream tasks.
53+
In the control plane, `TransferQueueController` tracks the **production status** and **consumption status** of each training sample as metadata. Once all required data fields are ready (i.e., written to the `StorageManager`), the data sample can be consumed by downstream tasks.
5454

5555
We also track the consumption history for each computational task (e.g., `generate_sequences`, `compute_log_prob`, etc.). Therefore, even when different computational tasks require the same data field, they can consume the data independently without interfering with each other.
5656

@@ -66,7 +66,7 @@ To make the data retrieval process more customizable, we provide a `Sampler` cla
6666

6767
In the data plane, we utilize a pluggable design, enabling TransferQueue to integrate with different storage backends based on user requirements.
6868

69-
Specifically, we provide a `TransferQueueStorageManager` abstraction class that defines the core APIs as follows:
69+
Specifically, we provide a `StorageManager` abstraction class that defines the core APIs as follows:
7070

7171
- `async def put_data(self, data: TensorDict, metadata: BatchMeta) -> None`
7272
- `async def get_data(self, metadata: BatchMeta) -> TensorDict`
@@ -298,21 +298,19 @@ The data plane is organized as follows:
298298
│ │── simple_backend.py # Default distributed storage backend (SimpleStorageUnit) by TQ
299299
│ ├── managers/ # Managers are upper level interfaces that encapsulate the interaction logic with TQ system.
300300
│ │ ├── __init__.py
301-
│ │ ├──base.py # TransferQueueStorageManager, KVStorageManager
301+
│ │ ├──base.py # StorageManager, KVStorageManager, StorageManagerFactory
302302
│ │ ├──simple_backend_manager.py # AsyncSimpleStorageManager
303303
│ │ ├──yuanrong_manager.py # YuanrongStorageManager
304-
│ │ ├──mooncake_manager.py # MooncakeStorageManager
305-
│ │ └──factory.py # TransferQueueStorageManagerFactory
304+
│ │ └──mooncake_manager.py # MooncakeStorageManager
306305
│ └── clients/ # Clients are lower level interfaces that directly manipulate the target storage backend.
307306
│ │ ├── __init__.py
308-
│ │ ├── base.py # TransferQueueStorageKVClient
307+
│ │ ├── base.py # StorageKVClient, StorageClientFactory
309308
│ │ ├── yuanrong_client.py # YuanrongStorageClient
310309
│ │ ├── mooncake_client.py # MooncakeStorageClient
311-
│ │ ├── ray_storage_client.py # RayStorageClient
312-
│ │ └── factory.py # TransferQueueStorageClientFactory
310+
│ │ └── ray_storage_client.py # RayStorageClient
313311
```
314312

315-
To integrate TransferQueue with a custom storage backend, start by implementing a subclass that inherits from `TransferQueueStorageManager`. This subclass acts as an adapter between the TransferQueue system and the target storage backend. For KV-based storage backends, you can simply inherit from `KVStorageManager`, which can serve as the general manager for all KV-based backends.
313+
To integrate TransferQueue with a custom storage backend, start by implementing a subclass that inherits from `StorageManager`. This subclass acts as an adapter between the TransferQueue system and the target storage backend. For KV-based storage backends, you can simply inherit from `KVStorageManager`, which can serve as the general manager for all KV-based backends.
316314

317315
Distributed storage backends often come with their own native clients serving as the interface of the storage system. In such cases, a low-level adapter for this client can be written, following the examples provided in the `storage/clients` directory.
318316

tests/test_async_simple_storage_manager.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async def mock_async_storage_manager():
6262

6363
# Mock the handshake process entirely to avoid ZMQ complexity
6464
with patch(
65-
"transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"
65+
"transfer_queue.storage.managers.base.StorageManager._connect_to_controller"
6666
) as mock_connect:
6767
# Mock the manager without actually connecting
6868
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
@@ -254,7 +254,7 @@ async def test_get_data_routes_from_hash():
254254
ports={"put_get_socket": 19011},
255255
),
256256
}
257-
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
257+
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
258258
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
259259
manager.storage_manager_id = "test_get"
260260
manager.storage_unit_infos = storage_unit_infos
@@ -307,7 +307,7 @@ async def test_clear_data_routes_from_hash():
307307
ports={"put_get_socket": 19021},
308308
),
309309
}
310-
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
310+
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
311311
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
312312
manager.storage_manager_id = "test_clear"
313313
manager.storage_unit_infos = storage_unit_infos
@@ -358,7 +358,7 @@ async def test_hash_routing_stable_across_batch_sizes():
358358
ports={"put_get_socket": 19031},
359359
),
360360
}
361-
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
361+
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
362362
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
363363
manager.storage_manager_id = "test_hash_batch"
364364
manager.storage_unit_infos = storage_unit_infos
@@ -419,7 +419,7 @@ async def test_hash_routing_stable_reversed_order():
419419
ports={"put_get_socket": 19041},
420420
),
421421
}
422-
with patch("transfer_queue.storage.managers.base.TransferQueueStorageManager._connect_to_controller"):
422+
with patch("transfer_queue.storage.managers.base.StorageManager._connect_to_controller"):
423423
manager = AsyncSimpleStorageManager.__new__(AsyncSimpleStorageManager)
424424
manager.storage_manager_id = "test_hash_order"
425425
manager.storage_unit_infos = storage_unit_infos

tests/test_ray_p2p.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from transfer_queue.client import TransferQueueClient
2525
from transfer_queue.metadata import BatchMeta
2626
from transfer_queue.storage.managers.base import KVStorageManager
27-
from transfer_queue.storage.managers.factory import TransferQueueStorageManagerFactory
27+
from transfer_queue.storage.managers.factory import StorageManagerFactory
2828
from transfer_queue.utils.zmq_utils import ZMQServerInfo
2929

3030
TEST_CONFIGS: list[tuple[tuple[int, int], torch.dtype]] = [
@@ -71,9 +71,9 @@ def create_mock_controller():
7171
def ensure_mock_storage_manager_registered():
7272
"""Ensure MockKVStorageManager is registered in current process."""
7373

74-
if "KV_MOCK" not in TransferQueueStorageManagerFactory._registry:
74+
if "KV_MOCK" not in StorageManagerFactory._registry:
7575

76-
@TransferQueueStorageManagerFactory.register("KV_MOCK")
76+
@StorageManagerFactory.register("KV_MOCK")
7777
class MockKVStorageManager(KVStorageManager):
7878
def _connect_to_controller(self):
7979
pass

transfer_queue/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
BatchMeta,
2929
)
3030
from transfer_queue.storage import (
31-
TransferQueueStorageManagerFactory,
31+
StorageManagerFactory,
3232
)
3333
from transfer_queue.utils.common import limit_pytorch_auto_parallel_threads
3434
from transfer_queue.utils.logging_utils import get_logger
@@ -92,7 +92,7 @@ def initialize_storage_manager(
9292
- zmq_info: ZMQ server information about the storage units
9393
9494
"""
95-
self.storage_manager = TransferQueueStorageManagerFactory.create(
95+
self.storage_manager = StorageManagerFactory.create(
9696
manager_type, controller_info=self._controller, config=config
9797
)
9898

transfer_queue/storage/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@
1717
AsyncSimpleStorageManager,
1818
MooncakeStorageManager,
1919
RayStorageManager,
20-
TransferQueueStorageManager,
21-
TransferQueueStorageManagerFactory,
20+
StorageManager,
21+
StorageManagerFactory,
2222
YuanrongStorageManager,
2323
)
2424
from .simple_backend import SimpleStorageUnit, StorageUnitData
2525

2626
__all__ = [
2727
"SimpleStorageUnit",
2828
"StorageUnitData",
29-
"TransferQueueStorageManager",
30-
"TransferQueueStorageManagerFactory",
29+
"StorageManager",
30+
"StorageManagerFactory",
3131
"AsyncSimpleStorageManager",
3232
"MooncakeStorageManager",
3333
"YuanrongStorageManager",

transfer_queue/storage/clients/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
# limitations under the License.
1515

1616
# This module is currently empty but reserved for future client implementations
17-
from .base import TransferQueueStorageKVClient
17+
from .base import StorageKVClient
1818
from .factory import StorageClientFactory
1919
from .mooncake_client import MooncakeStoreClient
2020
from .ray_storage_client import RayStorageClient
2121
from .yuanrong_client import YuanrongStorageClient
2222

2323
__all__ = [
24-
"TransferQueueStorageKVClient",
24+
"StorageKVClient",
2525
"StorageClientFactory",
2626
"RayStorageClient",
2727
"MooncakeStoreClient",

transfer_queue/storage/clients/base.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from typing import Any, Optional
1818

1919

20-
class TransferQueueStorageKVClient(ABC):
20+
class StorageKVClient(ABC):
2121
"""
2222
Abstract base class for storage client.
2323
Subclasses must implement the core methods: put, get, and clear.
@@ -68,3 +68,44 @@ def get(self, keys: list[str], shapes=None, dtypes=None, custom_backend_meta=Non
6868
def clear(self, keys: list[str], custom_backend_meta=None) -> None:
6969
"""Clear key-value pairs in the storage backend."""
7070
raise NotImplementedError("Subclasses must implement clear")
71+
72+
73+
class StorageClientFactory:
74+
"""
75+
Factory class for creating storage client instances.
76+
Uses a decorator-based registration mechanism to map client names to classes.
77+
"""
78+
79+
# Class variable: maps client names to their corresponding classes
80+
_registry: dict[str, type[StorageKVClient]] = {}
81+
82+
@classmethod
83+
def register(cls, client_type: str):
84+
"""
85+
Decorator to register a concrete client class with the factory.
86+
Args:
87+
client_type (str): The name used to identify the client
88+
Returns:
89+
Callable: The decorator function that returns the original class
90+
"""
91+
92+
def decorator(client_class: type[StorageKVClient]) -> type[StorageKVClient]:
93+
cls._registry[client_type] = client_class
94+
return client_class
95+
96+
return decorator
97+
98+
@classmethod
99+
def create(cls, client_type: str, config: dict) -> StorageKVClient:
100+
"""
101+
Create and return an instance of the storage client by name.
102+
Args:
103+
client_type (str): The registered name of the client
104+
Returns:
105+
StorageClientFactory: An instance of the requested client
106+
Raises:
107+
ValueError: If no client is registered with the given name
108+
"""
109+
if client_type not in cls._registry:
110+
raise ValueError(f"Unknown StorageClient: {client_type}")
111+
return cls._registry[client_type](config)

transfer_queue/storage/clients/factory.py

Lines changed: 0 additions & 57 deletions
This file was deleted.

transfer_queue/storage/clients/mooncake_client.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@
2020
import torch
2121
from torch import Tensor
2222

23-
from transfer_queue.storage.clients.base import TransferQueueStorageKVClient
24-
from transfer_queue.storage.clients.factory import StorageClientFactory
23+
from transfer_queue.storage.clients.base import StorageKVClient, StorageClientFactory
2524
from transfer_queue.utils.logging_utils import get_logger
2625
from transfer_queue.utils.tensor_utils import allocate_empty_tensors, get_nbytes, merge_contiguous_memory
2726

@@ -39,7 +38,7 @@
3938

4039

4140
@StorageClientFactory.register("MooncakeStoreClient")
42-
class MooncakeStoreClient(TransferQueueStorageKVClient):
41+
class MooncakeStoreClient(StorageKVClient):
4342
"""
4443
Storage client for MooncakeStore.
4544
"""

transfer_queue/storage/clients/ray_storage_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import ray
2020
import torch
2121

22-
from transfer_queue.storage.clients.base import TransferQueueStorageKVClient
22+
from transfer_queue.storage.clients.base import StorageKVClient
2323
from transfer_queue.storage.clients.factory import StorageClientFactory
2424

2525

@@ -47,7 +47,7 @@ def clear_obj_ref(self, keys: list[str]):
4747

4848

4949
@StorageClientFactory.register("RayStorageClient")
50-
class RayStorageClient(TransferQueueStorageKVClient):
50+
class RayStorageClient(StorageKVClient):
5151
"""
5252
Storage client for Ray RDT.
5353
"""

0 commit comments

Comments
 (0)