
Commit dc7d203

[feat] Enable openYuanrong RDMA support (#108)
## Description

For 910B nodes with an additional RoCE NIC (besides NPU-side RoCE), openYuanrong datasystem supports host RDMA (H2H) transport via UCX. Since TQ routes CPU tensors through KV client and NPU tensors through tensor client by tensor location, H2H RDMA and RH2D can be enabled simultaneously; they are **not mutually exclusive**.

Previously, enabling RDMA required manually adding `--enable_rdma true` to `worker_args` and setting `UCX_TLS=rc_x` in the environment. This PR introduces dedicated config options for one-click RDMA enablement.

## Changes

1. **`config.yaml`**: Added `enable_rdma` (default `false`) and `ucx_env_vars` (default `{}`). When `enable_rdma=true`, TQ auto-adds `--enable_rdma true` to dscli cmd and defaults `UCX_TLS=rc_x`. `ucx_env_vars` lets users specify UCX env vars (UCX_TLS, UCX_LOG_FILE, UCX_LOG_LEVEL, UCX_NET_DEVICES, UCX_TCP_CM_ROUTE) with highest priority over parent env.
2. **`yuanrong_bootstrap.py`**: Wired `enable_rdma` and `ucx_env_vars` through config → actor → `start_datasystem_worker`. Env priority: `ucx_env_vars` > parent env > default `UCX_TLS=rc_x`.
3. **`openyuanrong_datasystem.md`**: Added RDMA Options section, updated config examples, added manual RDMA startup instructions, and added RDMA FAQ (endpoint timeout, verification, container memlock).

## Related Issues

Closes #98

---------

Signed-off-by: Haichuan Hu <kaisennhu@gmail.com>
1 parent 0277d3f commit dc7d203
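
Note on the environment precedence stated in the description (`ucx_env_vars` > parent env > default `UCX_TLS=rc_x`): the rule can be illustrated in isolation. This is a minimal sketch, not the code in this commit. The function name `resolve_ucx_env` is hypothetical, and unlike `start_datasystem_worker` it always returns a dictionary instead of leaving the environment as `None` when nothing needs to be set.

```python
def resolve_ucx_env(parent_env, ucx_env_vars=None, enable_rdma=False):
    """Return the environment a dscli subprocess would see under the stated precedence.

    Hypothetical helper for illustration; it mirrors the ordering in the commit
    but is not part of TransferQueue.
    """
    # Start from a copy so the caller's environment is never mutated.
    env = dict(parent_env)
    # Explicit config entries win over whatever the parent process exported.
    for key, value in (ucx_env_vars or {}).items():
        env[key] = str(value)
    # The rc_x default applies only if neither source provided UCX_TLS.
    if enable_rdma and "UCX_TLS" not in env:
        env["UCX_TLS"] = "rc_x"
    return env


if __name__ == "__main__":
    print(resolve_ucx_env({"UCX_TLS": "ud"}, {"UCX_TLS": "rc"}, enable_rdma=True))
```

In the usage line, the config value `rc` takes effect because it overrides the parent's `ud`; with neither present and `enable_rdma=True`, the result would carry `rc_x`.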

4 files changed

Lines changed: 137 additions & 27 deletions


docs/storage_backends/openyuanrong_datasystem.md

Lines changed: 68 additions & 2 deletions
@@ -1,6 +1,6 @@
 # OpenYuanrong-Datasystem Integration for TransferQueue

-> Last updated: 05/18/2026
+> Last updated: 05/28/2026

 ## Overview

@@ -134,6 +134,8 @@ backend:
     worker_port: 31501 # Port for Yuanrong datasystem worker on each node
     metastore_port: 2379 # Port for metastore service on the head node
     enable_yr_npu_transport: true # Enable NPU transport for high-performance device-to-device transfer
+    enable_rdma: false # Enable host RDMA (H2H) transport via UCX
+    ucx_env_vars: {} # UCX env vars for dscli subprocess (e.g., {UCX_LOG_FILE: /tmp/ucx.log, UCX_LOG_LEVEL: ERROR})
     worker_args: "--shared_memory_size_mb 8192 --remote_h2d_device_ids 0 --enable_huge_tlb true"
 ```

@@ -143,13 +145,24 @@ backend:
 - `metastore_port`: Port for metastore service on the head node.
 - `worker_args`: Additional arguments passed to `dscli start` command:
   - `--shared_memory_size_mb`: Shared memory size in MB for datasystem worker.
-  - `--enable_huge_tlb`: Configure huge page memory to reduce TLB misses and improve memory access efficiency. Note: may cause system memory shortage, kernel OOM, or system instability. **Please allocate huge pages before starting datasystem** - refer to [Huge Page Guide](https://pages.openeuler.openatom.cn/openyuanrong-datasystem/docs/zh-cn/latest/appendix/hugepage_guide.html).
+  - `--enable_huge_tlb`: Configure huge page memory to reduce TLB misses and improve memory access efficiency. Note: may cause system memory shortage, kernel OOM, or system instability. **Please allocate huge pages before starting datasystem** - refer to [Huge Page Guide](https://pages.openeuler.openatom.cn/openyuanrong-datasystem/docs/zh-cn/latest/appendix/hugepage_guide.html). Before enabling, OS config required (root privilege): `sysctl -w vm.nr_hugepages=<count>` (each page is 2MB, e.g. 65536 for 128GB) and `ulimit -l unlimited` (allow pinning enough memory for RDMA/Ascend).

 **NPU Transfer Options:**
 - `enable_yr_npu_transport`: Enable NPU transport for high-performance device-to-device data transfer. Set to `true` when using NPU tensors.
 - `worker_args` (**mandatory** when `enable_yr_npu_transport: true`):
   - `--remote_h2d_device_ids`: Enable RH2D (Remote Host-to-Device) for efficient cross-node NPU data transfer. Specify NPU device IDs as comma-separated values (e.g., `0,1,2,3`). Yuanrong manages all specified devices - to put/get tensors on NPU `X`, device ID `X` must be included in this argument.

+**RDMA Options:**
+- `enable_rdma`: Whether to enable host RDMA (H2H) transport via UCX. Requires RDMA-capable NIC hardware and `rdma-core` driver on all nodes. When enabled, TQ automatically adds `--enable_rdma true` to the dscli startup command and defaults `UCX_TLS=rc_x` in the subprocess environment. RDMA H2H and RH2D (NPU cross-node) can be enabled simultaneously; they are **not** mutually exclusive.
+- `ucx_env_vars`: Dictionary of UCX environment variables passed to the dscli subprocess. These override parent process environment. Common variables:
+  - `UCX_TLS`: RDMA transport mode. Precedence: `ucx_env_vars` > parent env > fallback default `rc_x` (when `enable_rdma=true`). Alternatives: `rc` (compatible), `ud` (low-latency), `dc` (large-scale). See [UCX environment parameters](https://github.com/openucx/ucx/wiki/UCX-environment-parameters).
+  - `UCX_LOG_FILE`: Path to UCX log file (e.g., `/tmp/ucx.log`). Requires `UCX_LOG_LEVEL` to be set.
+  - `UCX_LOG_LEVEL`: Log verbosity: `FATAL`, `ERROR`, `WARN`, `INFO`, `DEBUG`, `TRACE`. Use `DEBUG`/`TRACE` for troubleshooting.
+  - `UCX_NET_DEVICES`: RDMA device name (e.g., `mlx5_0:1`). Required in multi-NIC setups.
+  - `UCX_TCP_CM_ROUTE`: TCP control-flow interface for UCX connection setup. Must match the RDMA NIC's network plane.
+
+> For RDMA best practices, troubleshooting, and K8s deployment, see [openYuanrong RDMA Best Practices](https://pages.openeuler.openatom.cn/openyuanrong-datasystem/docs/zh-cn/latest/best_practices/best_practices_for_rdma.html).
+
 > More configuration parameters for deploying the datasystem can refer to [dscli config](https://gitcode.com/openeuler/yuanrong-datasystem/blob/master/docs/source_zh_cn/deployment/dscli.md).

 ### Multi-Node Deployment
@@ -178,6 +191,8 @@ backend:
     worker_port: 31501
     metastore_port: 2379
     enable_yr_npu_transport: true
+    enable_rdma: false
+    ucx_env_vars: {}
     worker_args: "--shared_memory_size_mb 65536 --remote_h2d_device_ids 0 --enable_huge_tlb true"
 ```

@@ -290,6 +305,46 @@ dscli start -w --worker_address <WORKER_IP>:31501 \
   --shared_memory_size_mb 8192
 ```

+### Start with RDMA
+
+To enable RDMA for host-to-host (H2H) transfer, add `--enable_rdma true` to the dscli command and set UCX environment variables:
+
+```bash
+# Set UCX environment variables
+export UCX_TLS=rc_x
+# (Optional) Configure UCX logging for debugging
+export UCX_LOG_FILE=/tmp/ucx.log
+export UCX_LOG_LEVEL=ERROR
+
+# Head node
+dscli start -w --worker_address <HEAD_IP>:31501 \
+  --metastore_address <HEAD_IP>:2379 \
+  --start_metastore_service true \
+  --enable_rdma true \
+  --arena_per_tenant 1 \
+  --enable_worker_worker_batch_get true \
+  --shared_memory_size_mb 8192 \
+  --enable_huge_tlb true
+
+# Worker node
+dscli start -w --worker_address <WORKER_IP>:31501 \
+  --metastore_address <HEAD_IP>:2379 \
+  --enable_rdma true \
+  --arena_per_tenant 1 \
+  --enable_worker_worker_batch_get true \
+  --shared_memory_size_mb 8192 \
+  --enable_huge_tlb true
+```
+
+Parameters:
+- `--enable_rdma true`: Enable RDMA for H2H data transfer between workers.
+- `--arena_per_tenant 1`: Number of shared memory arenas per tenant. Set to 1 for fastest startup; higher values improve first-allocation performance but increase fd usage.
+- `--enable_worker_worker_batch_get true`: Enable batch get between workers for better cross-node transfer throughput.
+- `--shared_memory_size_mb 8192`: Per-node shared memory size in MB. All clients on the same node share this shared memory space.
+- `--enable_huge_tlb true`: Enable huge page memory to reduce TLB misses and accelerate startup/transfer. Before enabling, OS config required (root privilege): `sysctl -w vm.nr_hugepages=<count>` (each page is 2MB) and `ulimit -l unlimited`.
+
+> `UCX_TLS=rc_x` forces RDMA transport: if RDMA is unavailable, the system will error rather than fall back to TCP. For alternative transport modes, see [UCX environment parameters](https://github.com/openucx/ucx/wiki/UCX-environment-parameters).
+
 ### Stop Worker

 ```bash
@@ -388,6 +443,17 @@ Common errors and solutions:
 - `Device not found`: Check if device ID is included in `--remote_h2d_device_ids`
 - `CANN error`: Verify CANN installation path and environment variables

+### RDMA Issues
+
+When using `enable_rdma: true`, ensure:
+- RDMA NIC hardware and `rdma-core` driver are installed on all nodes. Verify with `ibv_devices`.
+- `UCX_TLS=rc_x` is compatible with your NIC. If not, set alternative mode via `ucx_env_vars` (e.g., `{UCX_TLS: rc}`).
+
+Common errors and solutions:
+- **UCX endpoint timeout**: In multi-NIC setups, UCX may select an isolated network interface for control flow. Set `UCX_NET_DEVICES` and `UCX_TCP_CM_ROUTE` in `ucx_env_vars` to specify the correct RDMA device and its TCP interface. See [openYuanrong RDMA Best Practices](https://pages.openeuler.openatom.cn/openyuanrong-datasystem/docs/zh-cn/latest/best_practices/best_practices_for_rdma.html) for detailed troubleshooting.
+- **RDMA verification**: Set `UCX_LOG_FILE` and `UCX_LOG_LEVEL` in `ucx_env_vars` (e.g., `{UCX_LOG_FILE: /tmp/ucx.log, UCX_LOG_LEVEL: INFO}`), then check logs for RC/RDMA entries to confirm RDMA is active.
+- **Container environments**: Set `memlock` to `unlimited` in the container, otherwise RDMA memory registration may fail.
+
 ### Out of Memory Error

 If Yuanrong throws an OOM error during operation:
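
Note on the `ucx_env_vars` guidance in this file (a log file needs a log level; multi-NIC hosts need `UCX_NET_DEVICES`): these constraints could be checked before launching workers. The following is a minimal sketch under stated assumptions. The helper `check_ucx_env_vars` and its `multi_nic` flag are hypothetical and not part of this commit, and the level set only contains the six values listed in the documentation above, so a level outside that list produces a warning rather than an error.

```python
# Log levels as listed in the documentation above; UCX itself may accept others.
DOCUMENTED_LOG_LEVELS = {"FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"}


def check_ucx_env_vars(ucx_env_vars: dict, multi_nic: bool = False) -> list[str]:
    """Return human-readable warnings; an empty list means nothing was flagged.

    Hypothetical pre-flight check, written only to illustrate the documented rules.
    """
    warnings = []
    level = ucx_env_vars.get("UCX_LOG_LEVEL")
    # The docs state that UCX_LOG_FILE requires UCX_LOG_LEVEL to be set.
    if "UCX_LOG_FILE" in ucx_env_vars and level is None:
        warnings.append("UCX_LOG_FILE is set but UCX_LOG_LEVEL is not")
    if level is not None and str(level).upper() not in DOCUMENTED_LOG_LEVELS:
        warnings.append(f"unrecognized UCX_LOG_LEVEL: {level}")
    # The docs state that UCX_NET_DEVICES is required in multi-NIC setups.
    if multi_nic and "UCX_NET_DEVICES" not in ucx_env_vars:
        warnings.append("multi-NIC host without UCX_NET_DEVICES")
    return warnings


if __name__ == "__main__":
    for message in check_ucx_env_vars({"UCX_LOG_FILE": "/tmp/ucx.log"}, multi_nic=True):
        print("warning:", message)
```

Returning warnings instead of raising keeps the check advisory, which seems appropriate given that the commit itself passes `ucx_env_vars` through without validation.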

transfer_queue/config.yaml

Lines changed: 14 additions & 0 deletions
@@ -61,9 +61,23 @@ backend:
     metastore_port: 2379
     # If enable npu transport
     enable_yr_npu_transport: false
+    # If enable host RDMA (H2H) transport via UCX. Requires RDMA NIC hardware and rdma-core driver.
+    # See https://pages.openeuler.openatom.cn/openyuanrong-datasystem/docs/zh-cn/latest/best_practices/best_practices_for_rdma.html
+    enable_rdma: false
+    # UCX env vars passed to dscli subprocess. Precedence: ucx_env_vars > parent env > TQ default (UCX_TLS=rc_x when enable_rdma=true).
+    # UCX_TLS: RDMA transport mode. "rc_x" (default when RDMA enabled and unset), "rc" (compatible), "ud" (low-latency), "dc" (large-scale).
+    # UCX_LOG_FILE: Path to UCX log file. Requires UCX_LOG_LEVEL to be set.
+    # UCX_LOG_LEVEL: FATAL, ERROR, WARN, INFO, DEBUG, TRACE. Use DEBUG/TRACE for troubleshooting.
+    # UCX_NET_DEVICES: RDMA device name (e.g., mlx5_0:1). Required in multi-NIC setups.
+    # UCX_TCP_CM_ROUTE: TCP control-flow interface for UCX connection setup.
+    # Example: ucx_env_vars: { UCX_TLS: rc_x, UCX_LOG_FILE: /tmp/ucx.log, UCX_LOG_LEVEL: ERROR }
+    ucx_env_vars: {}
     # Additional config for yuanrong worker.
     # Recommended options for NPU environments:
     # --remote_h2d_device_ids Enable RH2D for efficient cross-node data transfer. Specify NPU device IDs (comma-separated).
     # --enable_huge_tlb Enable huge page memory to improve performance. Required for >21GB shared memory on 910B.
+    # Before enabling, OS config required (root privilege):
+    # sysctl -w vm.nr_hugepages=<count> (each page is 2MB, e.g. 65536 for 128GB)
+    # ulimit -l unlimited (allow pinning enough memory for RDMA/Ascend)
     # Example: "--shared_memory_size_mb 16384 --remote_h2d_device_ids 0,1,2,3 --enable_huge_tlb true"
     worker_args: "--shared_memory_size_mb 8192"
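
Note on the `vm.nr_hugepages=<count>` comment above: with 2 MB pages, the count is the target size in MB divided by two, which is how 128 GB maps to 65536. The sketch below only works through that arithmetic. The function name `hugepages_for` is hypothetical, and sizing the pool to exactly match `--shared_memory_size_mb` is an assumption; the source does not say how much headroom to reserve.

```python
import math

HUGE_PAGE_MB = 2  # page size stated in the config comment above


def hugepages_for(size_mb: int) -> int:
    """Number of 2 MB huge pages needed to back size_mb of memory (rounded up)."""
    if size_mb < 0:
        raise ValueError("size_mb must be non-negative")
    return math.ceil(size_mb / HUGE_PAGE_MB)


if __name__ == "__main__":
    # 128 GB expressed in MB, matching the example in the config comment.
    print(hugepages_for(128 * 1024))  # 65536
    # The default worker_args value of 8192 MB.
    print(hugepages_for(8192))  # 4096
```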

transfer_queue/storage/bootstrap/yuanrong_bootstrap.py

Lines changed: 46 additions & 18 deletions
@@ -78,6 +78,8 @@ def start_datasystem_worker(
     metastore_address: str,
     is_head: bool,
     worker_args: str = "",
+    enable_rdma: bool = False,
+    ucx_env_vars: dict | None = None,
 ) -> None:
     """Start Yuanrong datasystem worker in metastore mode.

@@ -105,18 +107,33 @@ def start_datasystem_worker(
     if worker_args:
         cmd.extend(worker_args.split())

+    # Append --enable_rdma if enabled
+    if enable_rdma:
+        cmd.extend(["--enable_rdma", "true"])
+
     node_type = "head node" if is_head else "worker node"
     logger.info(f"Starting Yuanrong datasystem ({node_type}) at {worker_address}, worker_args={worker_args}")

     # Build environment with ASCEND_RT_VISIBLE_DEVICES if specified
     env = None
     device_ids = _parse_remote_h2d_device_ids(worker_args)
-    if device_ids:
+    if device_ids or enable_rdma or ucx_env_vars:
         env = os.environ.copy()
-        env["ASCEND_RT_VISIBLE_DEVICES"] = device_ids
-        logger.info(
-            f"Setting ASCEND_RT_VISIBLE_DEVICES={device_ids} for dscli subprocess ({node_type} at {worker_address})"
-        )
+        if device_ids:
+            env["ASCEND_RT_VISIBLE_DEVICES"] = device_ids
+            logger.info(
+                f"Setting ASCEND_RT_VISIBLE_DEVICES={device_ids} for dscli subprocess ({node_type} at {worker_address})"
+            )
+        # ucx_env_vars overrides parent env (highest priority)
+        if ucx_env_vars:
+            for key, value in ucx_env_vars.items():
+                env[key] = str(value)
+        # Default UCX_TLS=rc_x only when enable_rdma and UCX_TLS still absent
+        if enable_rdma and "UCX_TLS" not in env:
+            env["UCX_TLS"] = "rc_x"
+            logger.info(
+                f"Setting UCX_TLS=rc_x (default for RDMA) for dscli subprocess ({node_type} at {worker_address})"
+            )

     try:
         ds_result = subprocess.run(
@@ -179,7 +196,15 @@ class YuanrongWorkerActor:
     intersection of local IP addresses with the provided node IPs.
     """

-    def __init__(self, node_ips: list[str], worker_port: int, metastore_port: int, worker_args: str = ""):
+    def __init__(
+        self,
+        node_ips: list[str],
+        worker_port: int,
+        metastore_port: int,
+        worker_args: str = "",
+        enable_rdma: bool = False,
+        ucx_env_vars: dict | None = None,
+    ):
         """Initialize the Yuanrong worker actor.

         Args:
@@ -208,6 +233,8 @@ def __init__(self, node_ips: list[str], worker_port: int, metastore_port: int, w
         self.metastore_port = metastore_port
         self.worker_address = f"{self.my_ip}:{worker_port}"
         self.worker_args = worker_args
+        self.enable_rdma = enable_rdma
+        self.ucx_env_vars = ucx_env_vars

         # First node in the list is assumed to be the head node.
         # This assumption is based on how interface.py constructs node_ips from ray.nodes().
@@ -236,6 +263,8 @@ def start(self) -> str:
             metastore_address=self.metastore_address,
             is_head=self.is_head,
             worker_args=self.worker_args,
+            enable_rdma=self.enable_rdma,
+            ucx_env_vars=self.ucx_env_vars,
         )
         logger.info(f"Datasystem worker started successfully at {self.worker_address}")
         return self.worker_address
@@ -313,6 +342,8 @@ def initialize_yuanrong_storage(conf: DictConfig) -> dict[str, Any] | None:
     worker_port = conf.backend.Yuanrong.worker_port
     metastore_port = conf.backend.Yuanrong.metastore_port
     worker_args = conf.backend.Yuanrong.get("worker_args", "")
+    enable_rdma = conf.backend.Yuanrong.get("enable_rdma", False)
+    ucx_env_vars = dict(conf.backend.Yuanrong.get("ucx_env_vars", {}))

     logger.info(f"Found {len(ordered_nodes)} alive Ray nodes: {node_ips}")

@@ -352,14 +383,14 @@ def initialize_yuanrong_storage(conf: DictConfig) -> dict[str, Any] | None:
             actor = YuanrongWorkerActor.options(  # type: ignore[attr-defined]
                 placement_group=pg,
                 placement_group_bundle_index=rank,
-            ).remote(node_ips, worker_port, metastore_port, worker_args)
+            ).remote(node_ips, worker_port, metastore_port, worker_args, enable_rdma, ucx_env_vars)
             worker_actors.append(actor)

         logger.info(f"Created {len(worker_actors)} YuanrongWorkerActor instances")

         # Find which actor is running on the head node (driver IP)
         # The head node actor needs to start first to initialize metastore service
-        head_actor_index = None
+        head_actor_index = 0
         for idx, actor in enumerate(worker_actors):
             try:
                 node_ip = ray.get(actor.get_node_ip.remote())
@@ -369,10 +400,6 @@ def initialize_yuanrong_storage(conf: DictConfig) -> dict[str, Any] | None:
             except Exception:
                 pass

-        if head_actor_index is None:
-            logger.warning("Could not identify head node actor, using actor 0 as default")
-            head_actor_index = 0
-
         logger.info(f"Head node actor identified: actor {head_actor_index}")

         # Start head worker first to initialize metastore service
@@ -395,26 +422,27 @@ def initialize_yuanrong_storage(conf: DictConfig) -> dict[str, Any] | None:
             "worker_actors": worker_actors,
             "metastore_address": metastore_address,
             "placement_group": pg,
+            "head_actor_index": head_actor_index,
         }
     except Exception as e:
         # Cleanup on initialization failure: attempt graceful stop of started workers first
         logger.error(f"Failed to start Yuanrong workers: {e}, cleaning up...")

         # Try to gracefully stop workers that may have already started
+        # Stop worker nodes (non-head) in parallel first, then head node (metastore service)
         if worker_actors:
             stop_exceptions = []
-            # Stop worker nodes (all except head node 0) first
-            if len(worker_actors) > 1:
-                stop_refs = [actor.stop.remote() for actor in worker_actors[1:]]
-                for idx, stop_ref in enumerate(stop_refs, start=1):
+            other_indices = [i for i in range(len(worker_actors)) if i != head_actor_index]
+            if other_indices:
+                stop_refs = [worker_actors[idx].stop.remote() for idx in other_indices]
+                for idx, stop_ref in zip(other_indices, stop_refs, strict=False):
                     try:
                         ray.get(stop_ref, timeout=30)
                     except Exception as stop_e:
                         stop_exceptions.append(stop_e)
                         logger.warning(f"Failed to stop worker node actor {idx}: {stop_e}")
-            # Stop head node (actor 0)
             try:
-                ray.get(worker_actors[0].stop.remote(), timeout=30)
+                ray.get(worker_actors[head_actor_index].stop.remote(), timeout=30)
             except Exception as stop_e:
                 stop_exceptions.append(stop_e)
                 logger.warning(f"Failed to stop head node actor: {stop_e}")
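
Note on how `enable_rdma` reaches the command line in this file: the flag is appended after any user-supplied `worker_args`. The sketch below shows that ordering in a standalone form. The function name `build_dscli_cmd` is hypothetical, and the base command is an assumption reconstructed from the manual startup instructions in the documentation diff, since the start of the real command list is not visible in these hunks.

```python
def build_dscli_cmd(
    worker_address: str,
    metastore_address: str,
    is_head: bool,
    worker_args: str = "",
    enable_rdma: bool = False,
) -> list[str]:
    """Assemble a dscli start command as an argument list (illustrative only)."""
    # Base command assumed from the manual startup examples in the docs.
    cmd = [
        "dscli", "start", "-w",
        "--worker_address", worker_address,
        "--metastore_address", metastore_address,
    ]
    # Per the docs, only the head node starts the metastore service.
    if is_head:
        cmd.extend(["--start_metastore_service", "true"])
    # User-supplied arguments are split on whitespace, as in the commit.
    if worker_args:
        cmd.extend(worker_args.split())
    # The RDMA flag is appended last, mirroring the order in the commit.
    if enable_rdma:
        cmd.extend(["--enable_rdma", "true"])
    return cmd


if __name__ == "__main__":
    print(" ".join(build_dscli_cmd(
        "10.0.0.1:31501", "10.0.0.1:2379", True,
        "--shared_memory_size_mb 8192", enable_rdma=True,
    )))
```

One consequence of appending unconditionally: if `worker_args` already contains `--enable_rdma true`, the flag appears twice. The source does not say how dscli handles a repeated flag, so setting it in only one place is the safer configuration.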

transfer_queue/utils/yuanrong_utils.py

Lines changed: 9 additions & 7 deletions
@@ -153,17 +153,19 @@ def cleanup_yuanrong_resources(storage_value: Any) -> None:

     worker_actors = storage_value.get("worker_actors", [])
     placement_group = storage_value.get("placement_group")
+    head_actor_index = storage_value.get("head_actor_index", 0)

     try:
         if worker_actors:
             logger.info(f"Cleaning up Yuanrong backend (stopping {len(worker_actors)} workers)...")

-            # Stop worker nodes (all except head node 0) in parallel first
+            # Stop worker nodes (non-head) in parallel first, then head node (metastore service)
             stop_exceptions = []
-            if len(worker_actors) > 1:
-                logger.info(f"Stopping {len(worker_actors) - 1} worker nodes (excluding head) in parallel...")
-                stop_refs = [actor.stop.remote() for actor in worker_actors[1:]]
-                for idx, stop_ref in enumerate(stop_refs, start=1):
+            other_indices = [i for i in range(len(worker_actors)) if i != head_actor_index]
+            if other_indices:
+                logger.info(f"Stopping {len(other_indices)} worker nodes in parallel...")
+                stop_refs = [worker_actors[idx].stop.remote() for idx in other_indices]
+                for idx, stop_ref in zip(other_indices, stop_refs, strict=False):
                     try:
                         ray.get(stop_ref)
                     except Exception as e:
@@ -172,10 +174,10 @@ def cleanup_yuanrong_resources(storage_value: Any) -> None:
                 if len(stop_exceptions) < len(stop_refs):
                     logger.info("Completed stop requests for non-head worker nodes")

-            # Then stop head node (actor 0) which runs metastore service
+            # Then stop head node which runs metastore service
             logger.info("Stopping head node with metastore service...")
             try:
-                ray.get(worker_actors[0].stop.remote())
+                ray.get(worker_actors[head_actor_index].stop.remote())
                 logger.info("Head node stopped successfully")
             except Exception as e:
                 stop_exceptions.append(e)
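
Note on the shutdown ordering changed in both files: non-head workers stop first and the head node, which runs the metastore service, stops last, now keyed on `head_actor_index` instead of a fixed index 0. The sketch below models only that ordering. The function name `stop_order` is hypothetical, and the real code issues the non-head stops in parallel through Ray, which is not reproduced here.

```python
def stop_order(num_actors: int, head_actor_index: int = 0) -> list[int]:
    """Return actor indices in the order they should be stopped: head last."""
    if num_actors == 0:
        return []
    # Same selection as the commit: every index except the head.
    other_indices = [i for i in range(num_actors) if i != head_actor_index]
    return other_indices + [head_actor_index]


if __name__ == "__main__":
    # With the head actor at index 2, the old "actor 0 is head" rule would
    # have stopped the metastore host before the remaining workers.
    print(stop_order(4, head_actor_index=2))  # [0, 1, 3, 2]
```

Defaulting `head_actor_index` to 0 matches the `storage_value.get("head_actor_index", 0)` fallback in the diff, so state dictionaries created before this change keep the earlier behavior.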
