Skip to content

Commit c0c0e19

Browse files
authored
[feat] Support metastore mode for Yuanrong backend init (Ascend#74)
## Description Simplify Yuanrong backend initialization to exclusively support metastore mode, removing the external etcd dependency. This refactor uses Ray's native cluster discovery and placement groups to manage distributed Yuanrong datasystem workers. ## Changes - transfer_queue/config.yaml: Remove etcd and metastore mode configuration - Removed: etcd_address, host, port, metastore_mode, metastore_address - Kept: auto_init, worker_port, metastore_port - Added: worker_args for additional dscli start arguments - Host IPs are now auto-detected from ray.nodes() via NodeManagerAddress - transfer_queue: Add new file transfer_queue/utils/yuanrong_utils.py - YuanrongWorkerActor (Ray actor class): - Determines its node via IP intersection with provided node_ips - Starts metastore service on head node (rank 0) - Provides start() and stop() methods for lifecycle management - initialize_yuanrong_backend(): Complete initialization logic - Gets Ray cluster information via ray.nodes() - Creates placement group with STRICT_SPREAD strategy (0.1 CPU per bundle) - Creates YuanrongWorkerActor instances on each bundle - Starts head worker first, then parallel starts remaining workers - Returns dict with worker_actors, metastore_address, placement_group - Handles exceptions with proper cleanup - cleanup_yuanrong_resources(): Complete cleanup logic - Stops all workers concurrently, collecting exceptions - Kills actors and removes placement group - start_datasystem_worker() / stop_datasystem_worker(): dscli wrapper functions - get_local_ip_addresses(): IP discovery for node self-determination - transfer_queue/interface.py: Simplified Yuanrong backend integration - Replace ~100 lines of inline initialization with single function call to initialize_yuanrong_backend(conf) - Simplify close() to single call to cleanup_yuanrong_resources(value) - Remove unused imports: shutil, get_local_ip_addresses, etcd-related functions - tests/: Update test configurations to use worker_port instead of host/port ## Related issues Fixes Ascend#50 Signed-off-by: Haichuan Hu <kaisennhu@gmail.com>
1 parent 3793490 commit c0c0e19

15 files changed

Lines changed: 723 additions & 375 deletions

.github/workflows/run-tests.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,8 @@ jobs:
4242
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_e2e_lifecycle_consistency.py
4343
pkill -f "[m]ooncake_master" || true
4444
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_kv_interface_e2e.py
45-
pkill -f "[m]ooncake_master" || true
45+
pkill -f "[m]ooncake_master" || true
46+
- name: Run Yuanrong Backend Specific E2E Tests
47+
run: |
48+
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_e2e_lifecycle_consistency.py
49+
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_kv_interface_e2e.py

docs/storage_backends/openyuanrong_datasystem.md

Lines changed: 60 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
# OpenYuanrong-Datasystem Integration for TransferQueue
33

4-
> Last updated: 01/26/2026
4+
> Last updated: 04/17/2026
55
66
## 🎉 Overview
77

@@ -58,27 +58,7 @@ pip install openyuanrong-datasystem
5858
dscli -h
5959
```
6060

61-
#### 3. Install etcd
62-
63-
OpenYuanrong-datasystem relies on etcd for cluster coordination.
64-
Download and install etcd from the official releases: [ETCD GitHub Releases](https://github.com/etcd-io/etcd/releases)
65-
66-
```bash
67-
# Example for Linux ARM64 (adjust for your architecture)
68-
# Unpack and install etcd
69-
ETCD_VERSION = "v3.6.5" # Replace with the desired version
70-
tar -xvf etcd-${ETCD_VERSION}-linux-arm64.tar.gz
71-
cd etcd-${ETCD_VERSION}-linux-arm64
72-
73-
# Copy the executable file to the system path
74-
sudo cp etcd etcdctl /usr/local/bin/
75-
76-
# Verify installation
77-
etcd --version
78-
etcdctl version
79-
```
80-
81-
#### 4. (Optional) Install CANN and torch-npu
61+
#### 3. (Optional) Install CANN and torch-npu
8262

8363
If you have NPU devices and want to accelerate the transmission of NPU tensor,
8464
you can install **Ascend-cann-toolkit** and **torch-npu**.
@@ -106,19 +86,36 @@ pip install torch-npu==2.8.0
10686
Next, we will provide deployment and code examples for single-node scenarios.
10787
For multi-node scenarios, please refer to [Appendix B](#B-deploy-multi-node-datasystem-for-multi-node-training-and-inference-scenarios).
10888

109-
Unlike using TransferQueue with its default backend, integrating OpenYuanrong-Datasystem requires **pre-launching** the datasystem services before running your Python application.
110-
11189
#### Deployment
112-
```bash
113-
# Deploy etcd (for example, port 2379)
114-
etcd --listen-client-urls http://0.0.0.0:2379 \
115-
--advertise-client-urls http://localhost:2379 &
11690

117-
# Deploy datasystem
118-
dscli start -w --worker_address "127.0.0.1:31501" --etcd_address "127.0.0.1:2379"
91+
TransferQueue will automatically initialize the Yuanrong backend when `auto_init: True` is set. TransferQueue will:
92+
- Create placement groups to ensure workers are spread across Ray nodes
93+
- Launch YuanrongWorkerActor on each node to start datasystem workers
94+
- Set up metastore service on the head node
95+
96+
Configuration example:
97+
```yaml
98+
backend:
99+
storage_backend: Yuanrong
100+
Yuanrong:
101+
auto_init: True
102+
worker_port: 31501
103+
metastore_port: 2379
104+
enable_yr_npu_transport: true
105+
worker_args: "--shared_memory_size_mb 8192 --remote_h2d_device_ids 0 --enable_huge_tlb true"
119106
```
120107
121-
Once the datasystem is up, you can run your TransferQueue + Datasystem application.
108+
Configuration options:
109+
- `auto_init`: Whether to automatically initialize Yuanrong backend. Currently only `True` is supported.
110+
- `worker_port`: Port for Yuanrong datasystem worker on each node.
111+
- `metastore_port`: Port for metastore service on the head node.
112+
- `enable_yr_npu_transport`: Enable NPU transport for high-performance device-to-device data transfer.
113+
- `worker_args`: Additional arguments passed to `dscli start` command:
114+
- `--shared_memory_size_mb`: Shared memory size in MB for datasystem worker.
115+
- `--remote_h2d_device_ids`: Enable RH2D (Remote Host-to-Device) for efficient cross-node data transfer. Specify NPU device IDs as comma-separated values (e.g., `0,1,2,3`).
116+
- `--enable_huge_tlb`: Enable huge page memory, required for >21GB shared memory on Ascend 910B.
117+
118+
Once the configuration is set, you can run your TransferQueue + Datasystem application directly.
122119

123120
#### Demo
124121
You can associate `TransferQueueClient` with `YuanrongStorageManager` through the configuration dictionary when initializing the TransferQueue.
@@ -137,7 +134,7 @@ from transfer_queue import (
137134
config_str = """
138135
manager_type: YuanrongStorageManager
139136
client_name: YuanrongStorageClient
140-
port: 31501
137+
worker_port: 31501
141138
"""
142139
dict_conf = OmegaConf.create(config_str, flags={"allow_objects": True})
143140
```
@@ -198,13 +195,7 @@ print("output: ", output)
198195

199196

200197
#### Shut down datasystem:
201-
```bash
202-
# shutdown datasystem on the node
203-
dscli stop --worker_address "127.0.0.1:31501"
204-
205-
# shutdown etcd
206-
pkill -f etcd || true
207-
```
198+
TransferQueue automatically handles cleanup when calling `tq.close()`, which stops all Yuanrong datasystem workers gracefully.
208199

209200
### Datasystem Logs
210201

@@ -251,57 +242,46 @@ If you need to uninstall, execute:
251242
```
252243

253244
### B: Deploy multi-node datasystem for multi-node training and inference scenarios
254-
We can use etcd to connect to a datasystem backend across multiple nodes.
255-
Let's take two nodes (for instance, 10.170.27.24 and 10.170.27.33) as an example.
256245

257-
#### Start etcd on head node
258-
259-
```bash
260-
# For example, using the port 2379 of head node
261-
etcd \
262-
--name etcd-single \
263-
--data-dir /tmp/etcd-data \
264-
--listen-client-urls http://10.170.27.24:2379 \
265-
--advertise-client-urls http://10.170.27.24:2379 \
266-
--listen-peer-urls http://10.170.27.24:2380 \
267-
--initial-advertise-peer-urls http://10.170.27.24:2380 \
268-
--initial-cluster etcd-single=http://10.170.27.24:2380 &
269-
```
246+
TransferQueue automatically initializes Yuanrong datasystem workers across all Ray cluster nodes. Just set `auto_init: True` in the configuration and TransferQueue will handle the multi-node deployment.
270247

248+
Let's take two nodes (for instance, 192.168.0.1 and 192.168.0.2) as an example.
271249

272-
#### Deploy multi-nodes datasystem
273-
On each node, you need to connect to the etcd service on the head node using your local node's IP address.
250+
#### Deploy ray
274251
```bash
275-
#on head node
276-
dscli start -w --worker_address "10.170.27.24:31501" --etcd_address "10.170.27.24:2379"
252+
# on head node
253+
ray start --head --resources='{"node:192.168.0.1": 1}'
254+
255+
# on worker node (assume ray port of head_node is 6379)
256+
ray start --address="192.168.0.1:6379" --resources='{"node:192.168.0.2": 1}'
277257
```
278258

279-
```bash
280-
#on work node
281-
dscli start -w --worker_address "10.170.27.33:31501" --etcd_address "10.170.27.24:2379"
259+
#### Configuration
260+
261+
TransferQueue will detect all Ray nodes and deploy datasystem workers automatically:
262+
```yaml
263+
backend:
264+
storage_backend: Yuanrong
265+
Yuanrong:
266+
auto_init: True
267+
worker_port: 31501
268+
metastore_port: 2379
269+
enable_yr_npu_transport: true
270+
worker_args: "--shared_memory_size_mb 65536 --remote_h2d_device_ids 0 --enable_huge_tlb true"
282271
```
283-
Now you can use datasystem on head-node and work-node.
284272

285273
> For more detailed deployment instructions, please refer to [yuanrong documents](https://gitcode.com/openeuler/yuanrong-datasystem/blob/master/README.md#%E9%83%A8%E7%BD%B2-openyuanrong-datasystem).
286274
> The configuration parameters for deploying the data system can refer [dscli config](https://gitcode.com/openeuler/yuanrong-datasystem/blob/master/docs/source_zh_cn/deployment/dscli.md#%E9%85%8D%E7%BD%AE%E9%A1%B9%E8%AF%B4%E6%98%8E).
287275

288276
There is a demo with multi-node scenarios as fellow.
289277

290-
#### Deploy ray
291-
```bash
292-
# on head node
293-
ray start --head --resources='{"node:10.170.27.24": 1}'
294-
295-
# on worker node (assume ray port of head_node is 6379)
296-
ray start --address="10.170.27.24:6379" --resources='{"node:10.170.27.33": 1}'
297-
```
298-
299278
#### Run demo
300-
In the demo below, we use ray actors to implement distributed deployment of processes.
279+
In the demo below, we use ray actors to implement distributed deployment of processes.
301280
The actor writer writes data to the head node, and the actor reader reads data from the worker nodes.
302281
```python
303282
from omegaconf import OmegaConf
304283
from tensordict import TensorDict
284+
import transfer_queue as tq
305285
from transfer_queue import (
306286
TransferQueueClient,
307287
TransferQueueController,
@@ -312,10 +292,10 @@ import ray
312292
313293
########################################################################
314294
# Please set up Ray cluster before running this script
315-
# e.g. ray start --head --resources='{"node:127.0.0.1": 1}'
295+
# e.g. ray start --head --resources='{"node:192.168.0.1": 1}'
316296
########################################################################
317-
HEAD_NODE_IP = "10.170.27.24" # Replace with your head node IP
318-
WORKER_NODE_IP = "10.170.27.33" # Replace with your worker node IP
297+
HEAD_NODE_IP = "192.168.0.1" # Replace with your head node IP
298+
WORKER_NODE_IP = "192.168.0.2" # Replace with your worker node IP
319299
320300
321301
def initialize_controller():
@@ -357,10 +337,12 @@ class TransferQueueClientActor:
357337
358338
359339
def main():
340+
tq.init()
341+
360342
config_str = """
361343
manager_type: YuanrongStorageManager
362344
client_name: YuanrongStorageClient
363-
port: 31501
345+
worker_port: 31501
364346
"""
365347
dict_conf = OmegaConf.create(config_str, flags={"allow_objects": True})
366348
# It is important to pay attention to the controller's lifecycle.
@@ -387,6 +369,8 @@ def main():
387369
)
388370
output = ray.get(output)
389371
372+
tq.close()
373+
390374
if __name__ == "__main__":
391375
main()
392376

scripts/performance_test/README_PERFTEST.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,11 @@ backend:
6464
backend:
6565
storage_backend: Yuanrong
6666
Yuanrong:
67-
port: 31501
67+
auto_init: True
68+
worker_port: 31501
69+
metastore_port: 2379
6870
enable_yr_npu_transport: true
71+
worker_args: "--shared_memory_size_mb 65536 --remote_h2d_device_ids 0 --enable_huge_tlb true"
6972
```
7073
7174
For Yuanrong backend, writer runs on the head node and reader runs on the worker node. `--worker_node_ip` is required.

scripts/performance_test/perftest_config.yaml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,17 @@ backend:
4848

4949
# For Yuanrong:
5050
Yuanrong:
51-
# Port of local yuanrong datasystem worker
52-
port: 31501
51+
# Whether to let TQ automatically init yuanrong
52+
auto_init: True
53+
# Datasystem worker port
54+
worker_port: 31501
55+
# Metastore service port
56+
metastore_port: 2379
5357
# If enable npu transport
5458
enable_yr_npu_transport: true
59+
# Additional config for yuanrong worker.
60+
# Recommended options for NPU environments:
61+
# --remote_h2d_device_ids Enable RH2D for efficient cross-node data transfer. Specify NPU device IDs (comma-separated).
62+
# --enable_huge_tlb Enable huge page memory to improve performance. Required for >21GB shared memory on 910B.
63+
# Example: "--shared_memory_size_mb 16384 --remote_h2d_device_ids 0,1,2,3 --enable_huge_tlb true"
64+
worker_args: "--shared_memory_size_mb 65536 --remote_h2d_device_ids 0 --enable_huge_tlb true"

tests/e2e/test_e2e_lifecycle_consistency.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@
8080
"backend": {
8181
"storage_backend": "Yuanrong",
8282
"Yuanrong": {
83-
"host": "127.0.0.1",
84-
"port": 31501,
83+
"worker_port": 31501,
84+
"metastore_port": 2379,
8585
},
8686
},
8787
},
@@ -102,11 +102,12 @@ def backend_name():
102102
"""Get the backend name from environment variable.
103103
104104
Environment variables:
105-
TQ_TEST_BACKEND: Backend name (SimpleStorage or MooncakeStore)
105+
TQ_TEST_BACKEND: Backend name (SimpleStorage, MooncakeStore, or Yuanrong)
106106
107107
To run tests for a specific backend:
108108
TQ_TEST_BACKEND=SimpleStorage pytest tests/e2e/test_e2e_lifecycle_consistency.py
109109
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_e2e_lifecycle_consistency.py
110+
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_e2e_lifecycle_consistency.py
110111
"""
111112
return os.environ.get("TQ_TEST_BACKEND", "SimpleStorage")
112113

tests/e2e/test_kv_interface_e2e.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,18 @@ def tq_api(request):
9494
},
9595
},
9696
},
97+
"Yuanrong": {
98+
"controller": {
99+
"polling_mode": True,
100+
},
101+
"backend": {
102+
"storage_backend": "Yuanrong",
103+
"Yuanrong": {
104+
"worker_port": 31501,
105+
"metastore_port": 2379,
106+
},
107+
},
108+
},
97109
}
98110

99111

@@ -112,11 +124,12 @@ def backend_name():
112124
"""Get the backend name from environment variable.
113125
114126
Environment variables:
115-
TQ_TEST_BACKEND: Backend name (SimpleStorage or MooncakeStore)
127+
TQ_TEST_BACKEND: Backend name (SimpleStorage, MooncakeStore, or Yuanrong)
116128
117129
To run tests for a specific backend:
118130
TQ_TEST_BACKEND=SimpleStorage pytest tests/e2e/test_kv_interface_e2e.py
119131
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_kv_interface_e2e.py
132+
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_kv_interface_e2e.py
120133
"""
121134
return os.environ.get("TQ_TEST_BACKEND", "SimpleStorage")
122135

tests/test_kv_storage_manager.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ def test_data():
5757
cfg = {
5858
"controller_info": MagicMock(),
5959
"client_name": "YuanrongStorageClient",
60-
"host": "127.0.0.1",
61-
"port": 31501,
60+
"worker_port": 31501,
6261
"device_id": 0,
6362
}
6463
global_indexes = [8, 9, 10]

tests/test_storage_client_factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
class Test(unittest.TestCase):
2727
def setUp(self):
28-
self.cfg = {"host": "127.0.0.1", "port": 31501, "device_id": 0}
28+
self.cfg = {"worker_port": 31501, "device_id": 0}
2929

3030
@pytest.mark.skipif(find_spec("datasystem") is None, reason="datasystem is not available")
3131
def test_create_client(self):

tests/test_yuanrong_client_zero_copy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def mock_kv_client(self, mocker):
4646

4747
@pytest.fixture
4848
def storage_client(self, mock_kv_client):
49-
return GeneralKVClientAdapter({"host": "127.0.0.1", "port": 31501})
49+
return GeneralKVClientAdapter({"worker_port": 31501})
5050

5151
def test_mset_mget_p2p(self, storage_client, mocker):
5252
# Mock serialization/deserialization

tests/test_yuanrong_storage_client_e2e.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ def mock_find_reachable_host(port, timeout=1.0):
125125

126126
@pytest.fixture
127127
def config():
128-
return {"host": "127.0.0.1", "port": 12345, "enable_yr_npu_optimization": True}
128+
return {"worker_port": 12345, "enable_yr_npu_optimization": True}
129129

130130

131131
def assert_tensors_equal(a: torch.Tensor, b: torch.Tensor):

0 commit comments

Comments
 (0)