Commit a21643f

LLLLxmmm, ji-huazhong, and 0oshowero0 committed
Support managing multiple data partitions for Train/Val/Test in controller
Co-authored-by: ji-huazhong <hzji210@gmail.com>
Co-authored-by: 0oshowero0 <o0shower0o@outlook.com>
1 parent 5c466c5 commit a21643f

6 files changed

Lines changed: 282 additions & 200 deletions

docs/data/transfer_queue.md

Lines changed: 169 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,82 +1,96 @@
11
# TransferQueue Data System
22

3-
Last updated: 09/28/2025.
3+
Last updated: 11/17/2025.
44

55
This doc introduces [TransferQueue](https://github.com/TransferQueue/TransferQueue), an asynchronous streaming data management system for efficient post-training.
66

77

88
<h2 id="overview"> Overview</h2>
99

10-
TransferQueue is a high-performance data storage and transfer system with panoramic data visibility and streaming scheduling capabilities, optimized for efficient dataflow in post-training workflows.
10+
TransferQueue is a high-performance data storage and transfer module with panoramic data visibility and streaming scheduling capabilities, optimized for efficient dataflow in post-training workflows.
1111

1212
<p align="center">
13-
<img src="https://cdn.nlark.com/yuque/0/2025/png/23208217/1758696193102-a5654375-65a1-4e06-9c63-142b59df90b8.png" width="70%">
13+
<img src="https://github.com/TransferQueue/community_doc/blob/main/docs/tq_arch.png?raw=true" width="70%">
1414
</p>
1515

16-
17-
TransferQueue offers **fine-grained, sample-level** data management capabilities, serving as a data gateway that decouples explicit data dependencies across computational tasks. This enables a divide-and-conquer approach, significantly simplifying the design of the algorithm controller.
18-
16+
TransferQueue offers **fine-grained, sample-level** data management and **load-balancing** (in progress) capabilities, serving as a data gateway that decouples explicit data dependencies across computational tasks. This enables a divide-and-conquer approach that significantly simplifies the design of the algorithm controller.
1917

2018
<p align="center">
21-
<img src="https://cdn.nlark.com/yuque/0/2025/png/23208217/1758696791245-fa7baf96-46af-4c19-8606-28ffadc4556c.png" width="70%">
19+
<img src="https://github.com/TransferQueue/community_doc/blob/main/docs/main_func.png?raw=true" width="70%">
2220
</p>
2321

22+
<h2 id="updates"> Updates</h2>
2423

25-
24+
- **Nov 10, 2025**: We disentangle the data retrieval logic from TransferQueueController [PR#101](https://github.com/TransferQueue/TransferQueue/pull/101). Now you can implement your own `Sampler` to control how to consume the data.
25+
- **Nov 5, 2025**: We provide a `KVStorageManager` that simplifies the integration with KV-based storage backends [PR#96](https://github.com/TransferQueue/TransferQueue/pull/96). The first available KV-based backend is [Yuanrong](https://gitee.com/openeuler/yuanrong-datasystem).
26+
- **Nov 4, 2025**: Data partition capability is available in [PR#98](https://github.com/TransferQueue/TransferQueue/pull/98). Now you can define logical data partitions to manage your train/val/test datasets.
27+
- **Oct 25, 2025**: We make storage backends pluggable in [PR#66](https://github.com/TransferQueue/TransferQueue/pull/66). You can try to integrate your own storage backend with TransferQueue now!
28+
- **Oct 21, 2025**: Official integration into verl is ready in [verl/pulls/3649](https://github.com/volcengine/verl/pull/3649). Subsequent PRs will optimize the single-controller architecture by fully decoupling data and control flows.
29+
- **July 22, 2025**: We present a series of Chinese blogs on <a href="https://zhuanlan.zhihu.com/p/1930244241625449814">Zhihu 1</a>, <a href="https://zhuanlan.zhihu.com/p/1933259599953232589">2</a>.
30+
- **July 21, 2025**: We started an RFC in the verl community: [verl/RFC#2662](https://github.com/volcengine/verl/discussions/2662).
31+
- **July 2, 2025**: We published the paper [AsyncFlow](https://arxiv.org/abs/2507.01663).
2632

2733
<h2 id="components"> Components</h2>
2834

35+
### Control Plane: Panoramic Data Management
2936

37+
In the control plane, `TransferQueueController` tracks the **production status** and **consumption status** of each training sample as metadata. When all the required data fields are ready (i.e., written to the `TransferQueueStorageManager`), we know that this data sample can be consumed by downstream tasks.
3038

31-
### Control Plane: Panoramic Data Management
32-
33-
In the control plane, `TransferQueueController` tracks the **production status** and **consumption status** of each training sample as metadata. When all the required data fields are ready (i.e., written to the `TransferQueueStorage`), we know that this data sample can be consumed by downstream tasks.
34-
35-
For consumption status, we record the consumption records for each computational task (e.g., `generate_sequences`, `compute_log_prob`, etc.). Therefore, even different computation tasks require the same data field, they can consume the data independently without interfering with each other.
36-
39+
For consumption status, we record the consumption records for each computational task (e.g., `generate_sequences`, `compute_log_prob`, etc.). Therefore, even when different computation tasks require the same data field, they can consume the data independently without interfering with each other.
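The bookkeeping described above can be pictured with a small, self-contained sketch. This is not TransferQueue's implementation: `ToyStatusTracker` and its method names are hypothetical, and plain Python sets stand in for whatever metadata structures the controller actually uses. It only illustrates how per-field production status and per-task consumption status combine to decide which samples a task may read.

```python
from collections import defaultdict


class ToyStatusTracker:
    """Toy per-sample production/consumption bookkeeping (not TransferQueue code)."""

    def __init__(self) -> None:
        # sample index -> set of fields already written to storage
        self.produced: dict[int, set[str]] = defaultdict(set)
        # task name -> set of sample indexes that task has already consumed
        self.consumed: dict[str, set[int]] = defaultdict(set)

    def mark_produced(self, index: int, field: str) -> None:
        self.produced[index].add(field)

    def mark_consumed(self, task: str, indexes: list[int]) -> None:
        self.consumed[task].update(indexes)

    def ready_indexes(self, task: str, fields: list[str]) -> list[int]:
        """Samples whose required fields all exist and that `task` has not consumed."""
        required = set(fields)
        already_seen = self.consumed[task]
        return sorted(
            idx
            for idx, have in self.produced.items()
            if required <= have and idx not in already_seen
        )


tracker = ToyStatusTracker()
for idx in range(3):
    tracker.mark_produced(idx, "input_ids")
tracker.mark_produced(0, "responses")

# Two tasks read the same field without affecting each other's view.
tracker.mark_consumed("generate_sequences", [0, 1])
print(tracker.ready_indexes("generate_sequences", ["input_ids"]))
print(tracker.ready_indexes("compute_log_prob", ["input_ids"]))
print(tracker.ready_indexes("compute_log_prob", ["input_ids", "responses"]))
```

Because consumption is recorded per task, marking samples as consumed for `generate_sequences` leaves them fully visible to `compute_log_prob`.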
3740

3841
<p align="center">
39-
<img src="https://cdn.nlark.com/yuque/0/2025/png/23208217/1758696820173-456c1784-42ba-40c8-a292-2ff1401f49c5.png" width="70%">
42+
<img src="https://github.com/TransferQueue/community_doc/blob/main/docs/control_plane.png?raw=true" width="70%">
4043
</p>
4144

45+
To make the data retrieval process more customizable, we provide a `Sampler` class that allows users to define their own data retrieval and consumption logic. Refer to the [Customize](#customize) section for details.
4246

43-
> In the future, we plan to support **load-balancing** and **dynamic batching** capabilities in the control plane. Besides, we will support data management for disaggregated frameworks where each rank manages the data retrieval by itself, rather than coordinated by a single controller.
47+
> In the future, we plan to support **load-balancing** and **dynamic batching** capabilities in the control plane. Additionally, we will support data management for disaggregated frameworks where each rank manages the data retrieval by itself, rather than being coordinated by a single controller.
4448
4549
### Data Plane: Distributed Data Storage
4650

47-
In the data plane, `TransferQueueStorageSimpleUnit` serves as a naive storage unit based on CPU memory, responsible for the actual storage and retrieval of data. Each storage unit can be deployed on a separate node, allowing for distributed data management.
51+
In the data plane, we provide a pluggable design that enables TransferQueue to integrate with different storage backends according to user requirements.
52+
53+
Specifically, we provide a `TransferQueueStorageManager` abstract class that defines the core APIs as follows:
4854

49-
`TransferQueueStorageSimpleUnit` employs a 2D data structure as follows:
55+
- `async def put_data(self, data: TensorDict, metadata: BatchMeta) -> None`
56+
- `async def get_data(self, metadata: BatchMeta) -> TensorDict`
57+
- `async def clear_data(self, metadata: BatchMeta) -> None`
58+
59+
This class encapsulates the core interaction logic within the TransferQueue system. You only need to write a simple subclass to integrate your own storage backend. Refer to the [Customize](#customize) section for details.
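As a rough illustration of what such a subclass might look like, the sketch below mirrors only the three method names listed above. Everything else is an assumption made for the example: `ToyBatchMeta` is a hypothetical stand-in for `BatchMeta`, plain dicts of lists stand in for `TensorDict`, and `ToyStorageManager` is a local abstract class rather than the real `TransferQueueStorageManager`, whose constructor and helper methods are not shown in this document.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class ToyBatchMeta:
    """Hypothetical stand-in for BatchMeta: which samples and fields to touch."""

    global_indexes: list[int]
    fields: list[str]


class ToyStorageManager(ABC):
    """Local abstract class that copies the three method names listed above."""

    @abstractmethod
    async def put_data(self, data: dict[str, list], metadata: ToyBatchMeta) -> None: ...

    @abstractmethod
    async def get_data(self, metadata: ToyBatchMeta) -> dict[str, list]: ...

    @abstractmethod
    async def clear_data(self, metadata: ToyBatchMeta) -> None: ...


class DictBackedManager(ToyStorageManager):
    """Adapter over a key-value backend, here just an in-process dict."""

    def __init__(self) -> None:
        self._kv: dict[tuple[int, str], object] = {}

    async def put_data(self, data: dict[str, list], metadata: ToyBatchMeta) -> None:
        for field in metadata.fields:
            for idx, value in zip(metadata.global_indexes, data[field]):
                self._kv[(idx, field)] = value

    async def get_data(self, metadata: ToyBatchMeta) -> dict[str, list]:
        return {
            field: [self._kv[(idx, field)] for idx in metadata.global_indexes]
            for field in metadata.fields
        }

    async def clear_data(self, metadata: ToyBatchMeta) -> None:
        for field in metadata.fields:
            for idx in metadata.global_indexes:
                self._kv.pop((idx, field), None)


async def demo() -> dict[str, list]:
    manager = DictBackedManager()
    meta = ToyBatchMeta(global_indexes=[4, 7], fields=["input_ids"])
    await manager.put_data({"input_ids": [[1, 2], [3]]}, meta)
    fetched = await manager.get_data(meta)
    await manager.clear_data(meta)
    return fetched


print(asyncio.run(demo()))
```

A real backend would replace the dict with calls to its own client, while the three-method surface stays the same.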
60+
61+
Currently, we support the following storage backends:
62+
63+
- `SimpleStorageUnit`: A basic CPU-memory storage backend that imposes minimal data format constraints and is easy to use.
64+
- [MoonCakeStore](https://github.com/kvcache-ai/Mooncake): A high-performance, KV-based hierarchical storage that supports RDMA transport between GPU and DRAM.
65+
- [Yuanrong](https://gitee.com/openeuler/yuanrong-datasystem): An Ascend native data system that provides hierarchical storage interfaces including HBM/DRAM/SSD.
66+
- [Ray Direct Transport](https://docs.ray.io/en/master/ray-core/direct-transport.html): Ray's new feature that allows Ray to store and pass objects directly between Ray actors.
67+
68+
Among them, `SimpleStorageUnit` serves as our default storage backend, coordinated by the `AsyncSimpleStorageManager` class. Each storage unit can be deployed on a separate node, allowing for distributed data management.
69+
70+
`SimpleStorageUnit` employs a 2D data structure as follows:
5071

5172
- Each row corresponds to a training sample, assigned a unique index within the corresponding global batch.
5273
- Each column represents the input/output data fields for computational tasks.
5374

5475
This data structure design is motivated by the computational characteristics of the post-training process, where each training sample is generated in a relayed manner across task pipelines. It provides an accurate addressing capability, which allows fine-grained, concurrent data read/write operations in a streaming manner.
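A minimal sketch of this row/column addressing is shown below. It is an illustration only: `ToyStorageUnit` is a hypothetical class that uses Python lists instead of tensors and omits networking and concurrency, so it should not be read as how `SimpleStorageUnit` is implemented.

```python
class ToyStorageUnit:
    """Toy 2D table: rows are sample indexes, columns are data fields."""

    def __init__(self, num_rows: int) -> None:
        self.num_rows = num_rows
        # column name -> list of cells, one cell per row (None = not produced yet)
        self.columns: dict[str, list] = {}

    def put(self, field: str, rows: list[int], values: list) -> None:
        """Write one column for a subset of rows, leaving other cells untouched."""
        column = self.columns.setdefault(field, [None] * self.num_rows)
        for row, value in zip(rows, values):
            column[row] = value

    def get(self, fields: list[str], rows: list[int]) -> dict[str, list]:
        """Read an arbitrary (rows x fields) slice of the table."""
        return {field: [self.columns[field][row] for row in rows] for field in fields}


unit = ToyStorageUnit(num_rows=4)
unit.put("prompt", rows=[0, 1, 2, 3], values=["p0", "p1", "p2", "p3"])
# A downstream task fills in only the rows it has finished so far.
unit.put("response", rows=[1, 3], values=["r1", "r3"])

batch = unit.get(["prompt", "response"], rows=[1, 3])
print(batch)
```

Since every cell is addressed by a (row, column) pair, a task can write its output field for the samples it has finished while other rows are still in flight.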
5576

5677
<p align="center">
57-
<img src="https://cdn.nlark.com/yuque/0/2025/png/23208217/1758696805154-3817011f-84e6-40d0-a80c-58b7e3e5f6a7.png" width="70%">
78+
<img src="https://github.com/TransferQueue/community_doc/blob/main/docs/data_plane.png?raw=true" width="70%">
5879
</p>
5980

60-
61-
> In the future, we plan to implement a **general storage abstraction layer** to support various storage backends. Through this abstraction, we hope to integrate high-performance storage solutions such as [MoonCakeStore](https://github.com/kvcache-ai/Mooncake) to support device-to-device data transfer through RDMA, further enhancing data transfer efficiency for large-scale data.
62-
63-
6481
### User Interface: Asynchronous & Synchronous Client
6582

66-
6783
The interaction workflow of the TransferQueue system is as follows:
6884

6985
1. A process sends a read request to the `TransferQueueController`.
7086
2. `TransferQueueController` scans the production and consumption metadata for each sample (row) and dynamically assembles the metadata for a micro-batch according to the load-balancing policy. This mechanism enables sample-level data scheduling.
7187
3. The process retrieves the actual data from distributed storage units using the metadata provided by the controller.
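The three steps above can be sketched end to end with toy, in-process stand-ins. All names here (`put`, `controller_get_meta`, `fetch`) are hypothetical, sharding samples across units by `index % number_of_units` is an assumption made for the example, and the "policy" is simply first-come order rather than a real load-balancing policy.

```python
# Toy state: two storage units (sharded by index) and the controller's metadata.
storage_units: list[dict[int, dict]] = [{}, {}]
produced: set[int] = set()            # indexes whose fields are all written
consumed: dict[str, set[int]] = {}    # task name -> indexes already handed out


def put(index: int, sample: dict) -> None:
    """Producer side: store the payload and report it as ready."""
    storage_units[index % len(storage_units)][index] = sample
    produced.add(index)


def controller_get_meta(task: str, batch_size: int) -> list[tuple[int, int]]:
    """Steps 1-2: answer a read request with metadata only (index, unit id)."""
    seen = consumed.setdefault(task, set())
    indexes = sorted(produced - seen)[:batch_size]
    seen.update(indexes)
    return [(idx, idx % len(storage_units)) for idx in indexes]


def fetch(meta: list[tuple[int, int]]) -> list[dict]:
    """Step 3: pull payloads straight from the units named in the metadata."""
    return [storage_units[unit][idx] for idx, unit in meta]


for i in range(5):
    put(i, {"input_ids": [i]})

meta = controller_get_meta("generate_sequences", batch_size=2)
print(meta)         # metadata only, no payload passes through the controller
print(fetch(meta))  # actual data, read directly from the storage units
```

The point of the split is that the controller only ever handles small metadata records, while the bulk data moves directly between the requesting process and the storage units.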
7288

73-
To simplify the usage of TransferQueue, we have encapsulated this process into `AsyncTransferQueueClient` and `TransferQueueClient`. These clients provide both asynchronous and synchronous interfaces for data transfer, allowing users to easily integrate TransferQueue to their framework.
74-
75-
76-
> In the future, we will provide a `StreamingDataLoader` interface for disaggregated frameworks as discussed in [RFC#2662](https://github.com/volcengine/verl/discussions/2662). Leveraging this abstraction, each rank can automatically get its own data like `DataLoader` in PyTorch. The TransferQueue system will handle the underlying data scheduling and transfer logic caused by different parallelism strategies, significantly simplifying the design of disaggregated frameworks.
89+
To simplify the usage of TransferQueue, we have encapsulated this process into `AsyncTransferQueueClient` and `TransferQueueClient`. These clients provide both asynchronous and synchronous interfaces for data transfer, allowing users to easily integrate TransferQueue into their framework.
7790

91+
> In the future, we will provide a `StreamingDataLoader` interface for disaggregated frameworks as discussed in [issue#85](https://github.com/TransferQueue/TransferQueue/issues/85) and [verl/RFC#2662](https://github.com/volcengine/verl/discussions/2662). Leveraging this abstraction, each rank can automatically get its own data like `DataLoader` in PyTorch. The TransferQueue system will handle the underlying data scheduling and transfer logic caused by different parallelism strategies, significantly simplifying the design of disaggregated frameworks.
7892
79-
<h2 id="show-cases"> Show Cases</h2>
93+
<h2 id="show-cases">🔥 Showcases</h2>
8094

8195
### General Usage
8296

@@ -89,29 +103,150 @@ Core interfaces:
89103
- (async_)put(data:TensorDict, metadata:BatchMeta, global_step)
90104
- (async_)clear(global_step: int)
91105

92-
93106
We will soon release a detailed tutorial and API documentation.
94107

95108

96109
### verl Example
97110

111+
The primary motivation for integrating TransferQueue into verl now is to **alleviate the data transfer bottleneck of the single controller `RayPPOTrainer`**. Currently, all `DataProto` objects must be routed through `RayPPOTrainer`, which makes it a single-point bottleneck for the whole post-training system.
98112

99-
The primary motivation for integrating TransferQueue to verl now is to **alleviate the data transfer bottleneck of the single controller `RayPPOTrainer`**. Currently, all `DataProto` objects must be routed through `RayPPOTrainer`, resulting in a single point bottleneck of the whole post-training system.
113+
![verl_dataflow_DataProto](https://github.com/TransferQueue/community_doc/blob/main/docs/verl_workflow.jpeg?raw=true)
100114

101-
![verl_dataflow_DataProto](https://cdn.nlark.com/yuque/0/2025/jpeg/23208217/1758704289414-bcc54228-716b-4d4a-ad3b-f9ace6d10fcf.jpeg)
102115

103116
Leveraging TransferQueue, we separate experience data transfer from metadata dispatch by:
104117

105118
- Replacing `DataProto` with `BatchMeta` (metadata) and `TensorDict` (actual data) structures
106119
- Preserving verl's original Dispatch/Collect logic via BatchMeta (maintaining single-controller debuggability)
107120
- Accelerating data transfer by TransferQueue's distributed storage units
108121

109-
![verl_dataflow_TransferQueue](https://cdn.nlark.com/yuque/0/2025/jpeg/23208217/1758704301666-0807dc06-766c-4a2d-9cde-889a6bb56b34.jpeg)
122+
![verl_dataflow_TransferQueue](https://github.com/TransferQueue/community_doc/blob/main/docs/verl_workflow_with_tq.jpeg?raw=true)
123+
124+
125+
You may refer to the [recipe](https://github.com/TransferQueue/TransferQueue/tree/dev/recipe/simple_use_case), where we mimic the verl usage in both async & sync scenarios. Official integration into verl is also available now at [verl/pulls/3649](https://github.com/volcengine/verl/pull/3649) (with subsequent PRs to further optimize the integration).
110126

111127

112-
You may refer to the [recipe](https://github.com/TransferQueue/TransferQueue/tree/dev/recipe/simple_use_case), where we mimic the verl usage in both async & sync scenarios.
128+
### Use Python package
```bash
pip install TransferQueue==0.1.1.dev2
```
113132

133+
### Build wheel package from source code
134+
135+
Follow these steps to build and install:
1. Clone the source code from the GitHub repository
   ```bash
   git clone https://github.com/TransferQueue/TransferQueue/
   cd TransferQueue
   ```

2. Install dependencies
   ```bash
   pip install -r requirements.txt
   ```

3. Build and install
   ```bash
   python -m build --wheel
   pip install dist/*.whl
   ```
152+
153+
<h2 id="performance">📊 Performance</h2>
154+
155+
<p align="center">
156+
<img src="https://github.com/TransferQueue/community_doc/blob/main/docs/performance_0.1.1.dev2.png?raw=true" width="100%">
157+
</p>
114158

159+
> Note: The above benchmark for TransferQueue is based on our naive `SimpleStorageUnit` backend. By introducing high-performance storage backends and optimizing serialization/deserialization, we expect to achieve even better performance. We warmly welcome contributions from the community!
160+
161+
For detailed performance benchmarks, please refer to [this blog](https://www.yuque.com/haomingzi-lfse7/hlx5g0/tml8ke0zkgn6roey?singleDoc#).
162+
163+
<h2 id="customize"> 🛠️ Customize TransferQueue</h2>
164+
165+
### Define your own data retrieval logic
166+
We provide a `BaseSampler` abstract class, which defines the following interface:
167+
```python3
@abstractmethod
def sample(
    self,
    ready_indexes: list[int],
    batch_size: int,
    *args: Any,
    **kwargs: Any,
) -> tuple[list[int], list[int]]:
    """Sample a batch of indices from the ready indices.

    Args:
        ready_indexes: List of global indices for which all required fields of the
            corresponding samples have been produced, and the samples are not labeled as
            consumed in the corresponding task.
        batch_size: Number of samples to select
        *args: Additional positional arguments for specific sampler implementations
        **kwargs: Additional keyword arguments for specific sampler implementations

    Returns:
        List of sampled global indices of length batch_size
        List of global indices of length batch_size that should be labeled as consumed
        (will never be retrieved in the future)

    Raises:
        ValueError: If batch_size is invalid or ready_indexes is insufficient
    """
    raise NotImplementedError("Subclasses must implement sample")
```
197+
198+
In this design, we separate data retrieval and data consumption through the two return values, which enables us to easily control sample replacement. We have implemented two reference designs: `SequentialSampler` and `GRPOGroupNSampler`.
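To make the role of the two return values concrete, here is a hedged sketch of two toy samplers. They do not subclass the real `BaseSampler`: `ToyBaseSampler` is a hypothetical local class that copies the `sample` signature shown above, and neither class is the actual `SequentialSampler` or `GRPOGroupNSampler`. The second sampler returns an empty consumed list to express sampling with replacement, which is an assumption about how replacement could be expressed, since the docstring above describes a consumed list of length `batch_size`.

```python
import random
from abc import ABC, abstractmethod
from typing import Any


class ToyBaseSampler(ABC):
    """Hypothetical stand-in that copies the `sample` signature shown above."""

    @abstractmethod
    def sample(
        self, ready_indexes: list[int], batch_size: int, *args: Any, **kwargs: Any
    ) -> tuple[list[int], list[int]]:
        raise NotImplementedError("Subclasses must implement sample")


def _check(ready_indexes: list[int], batch_size: int) -> None:
    if batch_size <= 0 or len(ready_indexes) < batch_size:
        raise ValueError("batch_size is invalid or ready_indexes is insufficient")


class ToySequentialSampler(ToyBaseSampler):
    """Take the first batch_size ready samples and mark all of them consumed."""

    def sample(self, ready_indexes, batch_size, *args, **kwargs):
        _check(ready_indexes, batch_size)
        sampled = ready_indexes[:batch_size]
        return sampled, list(sampled)


class ToyReplaySampler(ToyBaseSampler):
    """Draw at random but consume nothing, so samples stay eligible later."""

    def __init__(self, seed: int = 0) -> None:
        self._rng = random.Random(seed)

    def sample(self, ready_indexes, batch_size, *args, **kwargs):
        _check(ready_indexes, batch_size)
        return self._rng.sample(ready_indexes, batch_size), []


ready = [3, 5, 8, 9]
print(ToySequentialSampler().sample(ready, 2))
sampled, consumed = ToyReplaySampler().sample(ready, 2)
print(len(sampled), consumed)
```

Keeping "what to return" separate from "what to retire" is what lets a sampler choose between one-shot consumption and reuse without changing the controller.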
199+
200+
The `Sampler` class or instance should be passed to the `TransferQueueController` during initialization. During each `get_meta` call, you can provide dynamic sampling parameters to the `Sampler`.
201+
```python3
from transfer_queue import TransferQueueController, TransferQueueClient, GRPOGroupNSampler, process_zmq_server_info

# Option 1: Pass the sampler class to the TransferQueueController
controller = TransferQueueController.remote(GRPOGroupNSampler)

# Option 2: Pass the sampler instance to the TransferQueueController (if you need custom configuration)
# (`YourOwnSampler` and `config` are placeholders for your own sampler class and its settings)
your_own_sampler = YourOwnSampler(config)
controller = TransferQueueController.remote(your_own_sampler)

# Use the sampler
# (`client` is an already-initialized TransferQueueClient; its setup is omitted here)
batch_meta = client.get_meta(
    data_fields=["input_ids", "attention_mask"],
    batch_size=8,
    partition_id="train_0",
    task_name="generate_sequences",
    sampling_config={"n_samples_per_prompt": 4},  # Put the required sampling parameters here
)
```
221+
222+
### How to integrate a new storage backend
223+
224+
The data plane is organized as follows:
```text
transfer_queue/
├── storage/
│   ├── __init__.py
│   ├── simple_backend.py             # SimpleStorageUnit, StorageUnitData, StorageMetaGroup
│   ├── managers/                     # Managers are upper-level interfaces that encapsulate the interaction logic with the TQ system.
│   │   ├── __init__.py
│   │   ├── base.py                   # TransferQueueStorageManager, KVStorageManager
│   │   ├── simple_backend_manager.py # AsyncSimpleStorageManager
│   │   ├── yuanrong_manager.py       # YuanrongStorageManager
│   │   ├── mooncake_manager.py       # MooncakeStorageManager
│   │   └── factory.py                # TransferQueueStorageManagerFactory
│   └── clients/                      # Clients are lower-level interfaces that directly manipulate the target storage backend.
│       ├── __init__.py
│       ├── base.py                   # TransferQueueStorageKVClient
│       ├── yuanrong_client.py        # YRStorageClient
│       ├── mooncake_client.py        # MooncakeStoreClient
│       └── factory.py                # TransferQueueStorageClientFactory
```
244+
245+
To integrate TransferQueue with a custom storage backend, start by implementing a subclass that inherits from `TransferQueueStorageManager`. This subclass acts as an adapter between the TransferQueue system and the target storage backend. For KV-based storage backends, you can simply inherit from `KVStorageManager`, which can serve as the general manager for all KV-based backends.
246+
247+
Distributed storage backends often come with their own native clients serving as the interface of the storage system. In such cases, a low-level adapter for this client can be written, following the examples provided in the `storage/clients` directory.
248+
249+
Factory classes are provided for both `StorageManager` and `StorageClient` to facilitate easy integration. Adding necessary descriptions of required parameters in the factory class helps enhance the overall user experience.
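One common way to build such a factory is a name-to-class registry. The sketch below shows that general pattern under stated assumptions: `register_manager`, `create_manager`, and `MyKVStoreManager` are hypothetical names, and this is not the code of `TransferQueueStorageManagerFactory` or `TransferQueueStorageClientFactory`, whose actual interfaces are not shown in this document.

```python
from typing import Callable

# Backend name -> manager class. Populated by the decorator below.
_MANAGER_REGISTRY: dict[str, type] = {}


def register_manager(name: str) -> Callable[[type], type]:
    """Class decorator that records a manager class under a backend name."""

    def decorator(cls: type) -> type:
        _MANAGER_REGISTRY[name] = cls
        return cls

    return decorator


def create_manager(name: str, config: dict):
    """Instantiate the manager registered for `name`, or fail with a clear message."""
    if name not in _MANAGER_REGISTRY:
        known = ", ".join(sorted(_MANAGER_REGISTRY)) or "<none>"
        raise ValueError(f"Unknown storage backend {name!r}; registered: {known}")
    return _MANAGER_REGISTRY[name](config)


@register_manager("my_kv_store")
class MyKVStoreManager:
    """Hypothetical manager for an imaginary KV service.

    Required config keys:
        endpoint: address of the KV service, e.g. "127.0.0.1:9000".
    """

    def __init__(self, config: dict) -> None:
        if "endpoint" not in config:
            raise ValueError("MyKVStoreManager requires config['endpoint']")
        self.endpoint = config["endpoint"]


manager = create_manager("my_kv_store", {"endpoint": "127.0.0.1:9000"})
print(type(manager).__name__, manager.endpoint)
```

Documenting the required keys next to the registration, and validating them in the constructor, gives users an actionable error instead of a failure deep inside the backend client.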
115250

116251

117252

recipe/transfer_queue/agent_loop.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,7 @@ def _performance_metrics(self, metrics: list[list[dict[str, str]]], output: Data

         return timing

-    def create_transferqueue_client(self, controller_infos, storage_infos, role):
+    def create_transferqueue_client(self, controller_info, config):
         ray.get(
-            [
-                worker.create_transferqueue_client.remote(controller_infos, storage_infos, role)
-                for worker in self.agent_loop_workers
-            ]
+            [worker.create_transferqueue_client.remote(controller_info, config) for worker in self.agent_loop_workers]
         )
