| 1 | + |
| 2 | +# OpenYuanrong-Datasystem Integration for TransferQueue |
| 3 | + |
| 4 | +> Last updated: 01/26/2026 |
| 5 | + |
| 6 | +## 🎉 Overview |
| 7 | + |
| 8 | +We provide an optional storage backend, [**openYuanrong-datasystem**](https://gitcode.com/openeuler/yuanrong-datasystem/blob/master/README.md), for TransferQueue to **deliver better performance in NPU environments**. |
| 9 | + |
| 10 | +OpenYuanrong-datasystem is a **distributed caching system** that utilizes the HBM/DRAM/SSD resources of the computing cluster to build a **near-memory computation multi-level cache**, improving data access performance in model training and inference scenarios. |
| 11 | + |
| 12 | +In TransferQueue, **openYuanrong-datasystem provides high-performance key-value operations for host-to-host transfer via TCP/RDMA, device-to-device transfer via Ascend NPU HCCS, and remote Host-to-Device / Device-to-Host.** |
| 13 | +It manages the mapping between user-defined keys and metadata, and automatically resolves the data source location and builds transport channels. |
| 14 | + |
| 15 | + |
| 16 | +We have implemented two key components to integrate TransferQueue with **openYuanrong-datasystem**: |
| 17 | + |
| 18 | +- **`YuanrongStorageClient`**: An adapter layer that encapsulates the functionality of openYuanrong-datasystem and enables efficient read and write operations within TransferQueue. |
| 19 | +- **`YuanrongStorageManager`**: The primary storage entry point that manages connections between TransferQueue clients and the underlying data system. |
| 20 | + |
| 21 | +`YuanrongStorageClient` supports `put` and `get` operations on NPU-side tensors and on any serializable CPU-side data. |
| 22 | +Its performance advantage is most pronounced for **tensors on the NPU side**. |
| 23 | +To enable it, fill in the relevant fields in the TransferQueue configuration. |
| 24 | + |
| 25 | +## 🚀 Quick Start |
| 26 | + |
| 27 | +### Prerequisites |
| 28 | +- **Python Version**: $\geq 3.10$ and $\leq 3.11$ |
| 29 | +- **Architecture**: AArch64 or x86_64 |
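
The two prerequisites above can be checked before installing anything. The snippet below is a minimal sketch using only the Python standard library. The helper name `check_prerequisites` is hypothetical, and treating `arm64` and `amd64` as aliases of AArch64 and x86_64 is an assumption about what `platform.machine()` reports on some systems.

```python
import platform
import sys

# Assumption: "arm64" and "amd64" are accepted as aliases of AArch64 / x86_64.
SUPPORTED_MACHINES = {"aarch64", "arm64", "x86_64", "amd64"}


def check_prerequisites(version_info=None, machine=None):
    """Return a list of human-readable problems; an empty list means OK."""
    version_info = sys.version_info if version_info is None else version_info
    machine = platform.machine() if machine is None else machine
    problems = []

    # Only the major and minor components matter for the 3.10-3.11 range.
    major_minor = tuple(version_info[:2])
    if not ((3, 10) <= major_minor <= (3, 11)):
        problems.append(
            f"Python {major_minor[0]}.{major_minor[1]} is outside the 3.10-3.11 range"
        )

    if machine.lower() not in SUPPORTED_MACHINES:
        problems.append(f"architecture {machine!r} is not AArch64 or x86_64")

    return problems


if __name__ == "__main__":
    for problem in check_prerequisites():
        print("WARNING:", problem)
```

Running the script prints nothing when both prerequisites are met.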
| 30 | + |
| 31 | +### Installation Steps |
| 32 | + |
| 33 | +Follow these steps to build and install: |
| 34 | + |
| 35 | +#### 1. Install Core Dependencies |
| 36 | + |
| 37 | +Install PyTorch and TransferQueue: |
| 38 | +```bash |
| 39 | +# Install Torch (matching the version specified for your hardware) |
| 40 | +pip install torch==2.8.0 |
| 41 | + |
| 42 | +# Install TransferQueue from pypi |
| 43 | +pip install TransferQueue |
| 44 | +# or install from source code |
| 45 | +git clone https://github.com/Ascend/TransferQueue/ |
| 46 | +cd TransferQueue |
| 47 | +pip install -r requirements.txt |
| 48 | +python -m build --wheel |
| 49 | +pip install dist/*.whl |
| 50 | +``` |
| 51 | + |
| 52 | +#### 2. Install Datasystem |
| 53 | +```bash |
| 54 | +# Install the OpenYuanrong Datasystem package |
| 55 | +pip install openyuanrong-datasystem |
| 56 | + |
| 57 | +# Verify installation by checking for the dscli command-line tool |
| 58 | +dscli -h |
| 59 | +``` |
| 60 | + |
| 61 | +#### 3. Install etcd |
| 62 | + |
| 63 | +OpenYuanrong-datasystem relies on etcd for cluster coordination. |
| 64 | +Download and install etcd from the official releases: [ETCD GitHub Releases](https://github.com/etcd-io/etcd/releases) |
| 65 | + |
| 66 | +```bash |
| 67 | +# Example for Linux ARM64 (adjust for your architecture) |
| 68 | +# Unpack and install etcd |
| 69 | +ETCD_VERSION="v3.6.5" # Replace with the desired version (no spaces around "=" in shell assignments) |
| 70 | +tar -xvf etcd-${ETCD_VERSION}-linux-arm64.tar.gz |
| 71 | +cd etcd-${ETCD_VERSION}-linux-arm64 |
| 72 | + |
| 73 | +# Copy the executable file to the system path |
| 74 | +sudo cp etcd etcdctl /usr/local/bin/ |
| 75 | + |
| 76 | +# Verify installation |
| 77 | +etcd --version |
| 78 | +etcdctl version |
| 79 | +``` |
| 80 | + |
| 81 | +#### 4. (Optional) Install CANN and torch-npu |
| 82 | + |
| 83 | +If you have NPU devices and want to accelerate the transfer of NPU tensors, |
| 84 | +you can install **Ascend-cann-toolkit** and **torch-npu**. |
| 85 | + |
| 86 | +First, check whether CANN is already installed: |
| 87 | + |
| 88 | +```bash |
| 89 | +# For root users |
| 90 | +ls -l /usr/local/Ascend/ascend-toolkit/latest |
| 91 | + |
| 92 | +# For non-root users |
| 93 | +ls -l ${HOME}/Ascend/ascend-toolkit/latest |
| 94 | +``` |
| 95 | + |
| 96 | +If CANN is not installed and you need it, see [Appendix A](#a-install-cann-for-npu-acceleration). |
| 97 | + |
| 98 | +Ensure that CANN is installed, then install torch-npu: |
| 99 | +```bash |
| 100 | +# The versions of torch and torch-npu must be the same. |
| 101 | +pip install torch-npu==2.8.0 |
| 102 | +``` |
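
To confirm that the installed `torch` and `torch-npu` versions line up, a check along the following lines may help. It is a sketch rather than an official tool. It assumes that "the same version" means the same `major.minor.patch` release, ignoring local suffixes such as `+cpu` and post-release tags such as `.post1`. The helper names are hypothetical.

```python
from importlib import metadata


def release_prefix(version, parts=3):
    """Return the leading numeric components of a version string.

    "2.8.0+cpu" -> (2, 8, 0); "2.8.0.post2" -> (2, 8, 0)
    """
    public = version.split("+", 1)[0]  # drop a local suffix such as "+cpu"
    numbers = []
    for piece in public.split("."):
        if not piece.isdigit():
            break
        numbers.append(int(piece))
        if len(numbers) == parts:
            break
    return tuple(numbers)


def versions_match(torch_version, torch_npu_version):
    """True when both version strings share the same numeric release prefix."""
    return release_prefix(torch_version) == release_prefix(torch_npu_version)


def installed_versions_match():
    """Compare the installed packages; return None if either one is missing."""
    try:
        return versions_match(metadata.version("torch"), metadata.version("torch-npu"))
    except metadata.PackageNotFoundError:
        return None
```

Calling `installed_versions_match()` after step 4 returns `True` when the releases agree, `False` when they differ, and `None` when one of the packages is not installed.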
| 103 | + |
| 104 | +### Use Case |
| 105 | + |
| 106 | +The following deployment and code examples cover the single-node scenario. |
| 107 | +For multi-node scenarios, please refer to [Appendix B](#b-deploy-multi-node-datasystem-for-multi-node-training-and-inference-scenarios). |
| 108 | + |
| 109 | +Unlike using TransferQueue with its default backend, integrating OpenYuanrong-Datasystem requires **pre-launching** the datasystem services before running your Python application. |
| 110 | + |
| 111 | +#### Deployment |
| 112 | +```bash |
| 113 | +# Deploy etcd (for example, port 2379) |
| 114 | +etcd --listen-client-urls http://0.0.0.0:2379 \ |
| 115 | + --advertise-client-urls http://localhost:2379 & |
| 116 | + |
| 117 | +# Deploy datasystem |
| 118 | +dscli start -w --worker_address "127.0.0.1:31501" --etcd_address "127.0.0.1:2379" |
| 119 | +``` |
| 120 | + |
| 121 | +Once the datasystem is up, you can run your TransferQueue + Datasystem application. |
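
One simple way to tell whether the services are accepting connections is to probe their TCP ports before starting the application. The helper below is a sketch built on the standard library only. `wait_for_port` is a hypothetical name, and a successful TCP connection only shows that something is listening on the port, not that the datasystem worker is fully initialized.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to host:port succeeds.

    Retries every `interval` seconds and returns False after `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError if the port refuses or times out.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

With the addresses used in the deployment commands, `wait_for_port("127.0.0.1", 2379)` probes etcd and `wait_for_port("127.0.0.1", 31501)` probes the datasystem worker.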
| 122 | + |
| 123 | +#### Demo |
| 124 | +You can associate `TransferQueueClient` with `YuanrongStorageManager` through the configuration dictionary when initializing the TransferQueue. |
| 125 | +Then, `YuanrongStorageManager` automatically creates `YuanrongStorageClient` and connects to the datasystem backend. |
| 126 | +```python |
| 127 | +import torch |
| 128 | +from omegaconf import OmegaConf |
| 129 | +from tensordict import TensorDict |
| 130 | +from transfer_queue import ( |
| 131 | +    TransferQueueClient, |
| 132 | +    TransferQueueController, |
| 133 | +    process_zmq_server_info, |
| 134 | +) |
| 135 | +# manager_type and client_name select the Yuanrong backend; host and port point to the datasystem worker started above. |
| 136 | +config_str = """ |
| 137 | +    manager_type: YuanrongStorageManager |
| 138 | +    client_name: YuanrongStorageClient |
| 139 | +    host: 127.0.0.1 |
| 140 | +    port: 31501 |
| 141 | +""" |
| 142 | +dict_conf = OmegaConf.create(config_str, flags={"allow_objects": True}) |
| 143 | +``` |
| 144 | + |
| 145 | +We have provided a template method to connect to Yuanrong within TransferQueue, as follows: |
| 146 | +```python |
| 147 | +class Trainer: |
| 148 | +    def __init__(self, config: dict): |
| 149 | +        self.config = config |
| 150 | +        self._initialize_transferqueue() |
| 151 | + |
| 152 | +    def _initialize_transferqueue(self): |
| 153 | +        # 1. Initialize TransferQueueController (single controller only) |
| 154 | +        self.tq_controller = TransferQueueController.remote() |
| 155 | + |
| 156 | +        # 2. Prepare necessary information of the controller |
| 157 | +        self.tq_controller_info = process_zmq_server_info(self.tq_controller) |
| 158 | + |
| 159 | +        # Note: create a new DictConfig with allow_objects=True to keep the ZMQServerInfo |
| 160 | +        # instance intact; otherwise it would be flattened to a dict. |
| 161 | +        tq_config = OmegaConf.create({}, flags={"allow_objects": True}) |
| 162 | +        tq_config.controller_info = self.tq_controller_info |
| 163 | +        self.config = OmegaConf.merge(tq_config, self.config) |
| 164 | + |
| 165 | +        # 3. Create TransferQueueClient |
| 166 | +        self.tq_client = TransferQueueClient( |
| 167 | +            client_id="Trainer", |
| 168 | +            controller_info=self.tq_controller_info, |
| 169 | +        ) |
| 170 | + |
| 171 | +        # 4. Connect to DataSystem |
| 172 | +        self.tq_client.initialize_storage_manager(manager_type=self.config["manager_type"], config=self.config) |
| 173 | + |
| 174 | +        return self.tq_client |
| 175 | +``` |
| 176 | +You can then call the TransferQueue user interface: |
| 177 | + |
| 178 | +```python |
| 179 | +# should import tensordict and torch |
| 180 | +data = TensorDict({"text": torch.Tensor([[1, 2], [3, 4]]), "prompt": ["5", "6"]}, batch_size=[2]) |
| 181 | + |
| 182 | +trainer = Trainer(dict_conf) |
| 183 | +trainer.tq_client.put(data=data, partition_id="train_0") |
| 184 | + |
| 185 | +# get_meta before get_data |
| 186 | +meta = trainer.tq_client.get_meta( |
| 187 | +    data_fields=list(data.keys()), |
| 188 | +    batch_size=data.size(0), |
| 189 | +    partition_id="train_0", |
| 190 | +    task_name="generate_sequences", |
| 191 | +) |
| 192 | + |
| 193 | +output = trainer.tq_client.get_data(meta) |
| 194 | +print("output: ", output) |
| 195 | +``` |
| 196 | + |
| 197 | +> The `Trainer` class in the code above can also be used as a **Ray actor**. |
| 198 | + |
| 199 | + |
| 200 | +#### Shut down datasystem |
| 201 | +```bash |
| 202 | +# shutdown datasystem on the node |
| 203 | +dscli stop --worker_address "127.0.0.1:31501" |
| 204 | + |
| 205 | +# shutdown etcd |
| 206 | +pkill -f etcd || true |
| 207 | +``` |
| 208 | + |
| 209 | +### Datasystem Logs |
| 210 | + |
| 211 | +If you want to inspect data transmission logs from openYuanrong-Datasystem, set the following environment variable: |
| 212 | + |
| 213 | +```bash |
| 214 | +export DATASYSTEM_CLIENT_LOG_DIR="datasystem_logs" # Custom Path |
| 215 | +``` |
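
The same variable can also be set from Python when exporting it in the shell is inconvenient. The sketch below assumes that the datasystem client reads `DATASYSTEM_CLIENT_LOG_DIR` when it is initialized, so the call would need to happen before the storage manager is created. The helper name `configure_datasystem_logs` is hypothetical, and creating the directory up front is a precaution rather than a documented requirement.

```python
import os
from pathlib import Path


def configure_datasystem_logs(log_dir="datasystem_logs"):
    """Create log_dir if needed and export it for the current process."""
    path = Path(log_dir).expanduser().resolve()
    path.mkdir(parents=True, exist_ok=True)
    # Child processes started afterwards inherit this variable as well.
    os.environ["DATASYSTEM_CLIENT_LOG_DIR"] = str(path)
    return path
```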
| 216 | + |
| 217 | +## 📕 Appendix |
| 218 | +### A: Install CANN for NPU Acceleration |
| 219 | + |
| 220 | +> CANN (Compute Architecture for Neural Networks) is a heterogeneous computing architecture launched by Huawei for AI scenarios. |
| 221 | + |
| 222 | + |
| 223 | + |
| 224 | +Download the appropriate toolkit package from: |
| 225 | +[Ascend CANN Downloads](https://www.hiascend.com/developer/download/community/result?cann=8.3.RC1&product=1&model=30). |
| 226 | + |
| 227 | +Please select the appropriate version for your OS and architecture (e.g., Linux + AArch64). |
| 228 | + |
| 229 | +Then install the toolkit: |
| 230 | +```bash |
| 231 | +# For example, download the aarch64 package, set the execution permission, and install it. |
| 232 | +chmod +x Ascend-cann-toolkit_8.3.RC1_linux-aarch64.run |
| 233 | +./Ascend-cann-toolkit_8.3.RC1_linux-aarch64.run --install |
| 234 | + |
| 235 | +# Install the Python dependencies required by CANN |
| 236 | +pip install scipy psutil tornado decorator ml-dtypes absl-py |
| 237 | +``` |
| 238 | + |
| 239 | +After installation, confirm the toolkit path exists: |
| 240 | +```bash |
| 241 | +# Root user |
| 242 | +ls /usr/local/Ascend/ascend-toolkit/latest |
| 243 | + |
| 244 | +# Non-root user |
| 245 | +ls ${HOME}/Ascend/ascend-toolkit/latest |
| 246 | +``` |
| 247 | + |
| 248 | +If you need to uninstall, execute: |
| 249 | +```bash |
| 250 | +./Ascend-cann-toolkit_8.3.RC1_linux-aarch64.run --uninstall |
| 251 | +``` |
| 252 | + |
| 253 | +### B: Deploy multi-node datasystem for multi-node training and inference scenarios |
| 254 | +Datasystem workers on multiple nodes can be joined into a single backend through a shared etcd service. |
| 255 | +Let's take two nodes (for instance, 10.170.27.24 and 10.170.27.33) as an example. |
| 256 | + |
| 257 | +#### Start etcd on head node |
| 258 | + |
| 259 | +```bash |
| 260 | +# For example, use port 2379 on the head node |
| 261 | +etcd \ |
| 262 | + --name etcd-single \ |
| 263 | + --data-dir /tmp/etcd-data \ |
| 264 | + --listen-client-urls http://10.170.27.24:2379 \ |
| 265 | + --advertise-client-urls http://10.170.27.24:2379 \ |
| 266 | + --listen-peer-urls http://10.170.27.24:2380 \ |
| 267 | + --initial-advertise-peer-urls http://10.170.27.24:2380 \ |
| 268 | + --initial-cluster etcd-single=http://10.170.27.24:2380 & |
| 269 | +``` |
| 270 | + |
| 271 | + |
| 272 | +#### Deploy multi-node datasystem |
| 273 | +On each node, start a worker bound to that node's own IP address and point it at the etcd service on the head node. |
| 274 | +```bash |
| 275 | +# on head node |
| 276 | +dscli start -w --worker_address "10.170.27.24:31501" --etcd_address "10.170.27.24:2379" |
| 277 | +``` |
| 278 | + |
| 279 | +```bash |
| 280 | +# on worker node |
| 281 | +dscli start -w --worker_address "10.170.27.33:31501" --etcd_address "10.170.27.24:2379" |
| 282 | +``` |
| 283 | +Now you can use the datasystem on both the head node and the worker node. |
| 284 | + |
| 285 | +> For more detailed deployment instructions, please refer to the [openYuanrong documentation](https://gitcode.com/openeuler/yuanrong-datasystem/blob/master/README.md#%E9%83%A8%E7%BD%B2-openyuanrong-datasystem). |
| 286 | +> For the configuration parameters used to deploy the datasystem, see [dscli config](https://gitcode.com/openeuler/yuanrong-datasystem/blob/master/docs/source_zh_cn/deployment/dscli.md#%E9%85%8D%E7%BD%AE%E9%A1%B9%E8%AF%B4%E6%98%8E). |
| 287 | + |
| 288 | +The following demo covers the multi-node scenario. |
| 289 | + |
| 290 | +#### Deploy ray |
| 291 | +```bash |
| 292 | +# on head node |
| 293 | +ray start --head --resources='{"node:10.170.27.24": 1}' |
| 294 | + |
| 295 | +# on worker node (assume ray port of head_node is 6379) |
| 296 | +ray start --address="10.170.27.24:6379" --resources='{"node:10.170.27.33": 1}' |
| 297 | +``` |
| 298 | + |
| 299 | +#### Run demo |
| 300 | +In the demo below, we use Ray actors to distribute the processes across nodes. |
| 301 | +The writer actor writes data on the head node, and the reader actor reads the data on the worker node. |
| 302 | +```python |
| 303 | +from omegaconf import OmegaConf |
| 304 | +from tensordict import TensorDict |
| 305 | +from transfer_queue import ( |
| 306 | +    TransferQueueClient, |
| 307 | +    TransferQueueController, |
| 308 | +    process_zmq_server_info, |
| 309 | +) |
| 310 | +import torch |
| 311 | +import ray |
| 312 | + |
| 313 | +######################################################################## |
| 314 | +# Please set up Ray cluster before running this script |
| 315 | +# e.g. ray start --head --resources='{"node:127.0.0.1": 1}' |
| 316 | +######################################################################## |
| 317 | +HEAD_NODE_IP = "10.170.27.24" # Replace with your head node IP |
| 318 | +WORKER_NODE_IP = "10.170.27.33" # Replace with your worker node IP |
| 319 | + |
| 320 | + |
| 321 | +def initialize_controller(): |
| 322 | +    tq_controller = TransferQueueController.remote() |
| 323 | +    tq_controller_info = process_zmq_server_info(tq_controller) |
| 324 | +    return tq_controller, tq_controller_info |
| 325 | + |
| 326 | +@ray.remote |
| 327 | +class TransferQueueClientActor: |
| 328 | +    def __init__(self, config: dict, client_id: str): |
| 329 | +        self.config = config |
| 330 | +        self.client_id = client_id |
| 331 | +        self._initialize_client() |
| 332 | + |
| 333 | +    def _initialize_client(self): |
| 334 | +        # Create TransferQueueClient |
| 335 | +        self.tq_client = TransferQueueClient( |
| 336 | +            client_id=self.client_id, |
| 337 | +            controller_info=self.config.controller_info, |
| 338 | +        ) |
| 339 | +        # Connect to DataSystem |
| 340 | +        self.tq_client.initialize_storage_manager(manager_type=self.config["manager_type"], config=self.config) |
| 341 | +        return self.tq_client |
| 342 | + |
| 343 | +    def put(self, data: TensorDict, partition_id: str): |
| 344 | +        self.tq_client.put(data=data, partition_id=partition_id) |
| 345 | + |
| 346 | +    def get(self, data_fields, batch_size, partition_id, task_name=None, sampling_config=None): |
| 347 | +        # get metadata from tq_controller |
| 348 | +        meta = self.tq_client.get_meta( |
| 349 | +            data_fields=data_fields, |
| 350 | +            batch_size=batch_size, |
| 351 | +            partition_id=partition_id, |
| 352 | +            task_name=task_name, |
| 353 | +            sampling_config=sampling_config, |
| 354 | +        ) |
| 355 | +        # use meta to fetch data |
| 356 | +        return self.tq_client.get_data(meta) |
| 357 | + |
| 358 | + |
| 359 | +def main(): |
| 360 | +    config_str = """ |
| 361 | +        manager_type: YuanrongStorageManager |
| 362 | +        client_name: YuanrongStorageClient |
| 363 | +        host: 10.170.27.24 |
| 364 | +        port: 31501 |
| 365 | +    """ |
| 366 | +    dict_conf = OmegaConf.create(config_str, flags={"allow_objects": True}) |
| 367 | +    # Keep a reference to the controller so that it stays alive while the clients are in use. |
| 368 | +    controller, dict_conf.controller_info = initialize_controller() |
| 369 | + |
| 370 | +    conf_writer = dict_conf.copy() |
| 371 | +    conf_writer.host = HEAD_NODE_IP |
| 372 | +    conf_reader = dict_conf.copy() |
| 373 | +    conf_reader.host = WORKER_NODE_IP |
| 374 | +    data = TensorDict({"prompt": torch.ones(3, 512), "big_tensor": torch.randn(3, 1024, 1024)}, batch_size=[3]) |
| 375 | +    # You can also assign NPU or GPU devices through 'resources'. |
| 376 | +    # resources={f"node:{HEAD_NODE_IP}": 0.001} forces the actor to run on the head node. |
| 377 | +    writer = TransferQueueClientActor.options( |
| 378 | +        resources={f"node:{HEAD_NODE_IP}": 0.001}, |
| 379 | +    ).remote(conf_writer, "train") |
| 380 | +    reader = TransferQueueClientActor.options( |
| 381 | +        resources={f"node:{WORKER_NODE_IP}": 0.001} |
| 382 | +    ).remote(conf_reader, "rollout") |
| 383 | + |
| 384 | +    ray.get(writer.put.remote(data=data, partition_id="train_0")) |
| 385 | + |
| 386 | +    output = reader.get.remote( |
| 387 | +        data_fields=list(data.keys()), |
| 388 | +        batch_size=data.size(0), |
| 389 | +        partition_id="train_0", |
| 390 | +        task_name="generate_sequences", |
| 391 | +    ) |
| 392 | +    output = ray.get(output) |
| 393 | + |
| 394 | +if __name__ == "__main__": |
| 395 | +    main() |
| 396 | + |
| 397 | +``` |