59 changes: 58 additions & 1 deletion docs/storage_backends/openyuanrong_datasystem.md
@@ -1,6 +1,6 @@
# OpenYuanrong-Datasystem Integration for TransferQueue

-> Last updated: 05/18/2026
+> Last updated: 05/28/2026

## Overview

@@ -134,6 +134,8 @@ backend:
worker_port: 31501 # Port for Yuanrong datasystem worker on each node
metastore_port: 2379 # Port for metastore service on the head node
enable_yr_npu_transport: true # Enable NPU transport for high-performance device-to-device transfer
enable_rdma: false # Enable host RDMA (H2H) transport via UCX
ucx_env_vars: {} # UCX env vars for dscli subprocess (e.g., {UCX_LOG_FILE: /tmp/ucx.log, UCX_LOG_LEVEL: ERROR})
worker_args: "--shared_memory_size_mb 8192 --remote_h2d_device_ids 0 --enable_huge_tlb true"
```

@@ -150,6 +152,17 @@ backend:
- `worker_args` (**mandatory** when `enable_yr_npu_transport: true`):
- `--remote_h2d_device_ids`: Enable RH2D (Remote Host-to-Device) for efficient cross-node NPU data transfer. Specify NPU device IDs as comma-separated values (e.g., `0,1,2,3`). Yuanrong manages all specified devices - to put/get tensors on NPU `X`, device ID `X` must be included in this argument.

**RDMA Options:**
- `enable_rdma`: Whether to enable host RDMA (H2H) transport via UCX. Requires RDMA-capable NIC hardware and the `rdma-core` driver on all nodes. When enabled, TQ automatically adds `--enable_rdma true` to the dscli startup command and sets `UCX_TLS=rc_x` in the subprocess environment unless `UCX_TLS` is already defined. RDMA H2H and RH2D (NPU cross-node) can be enabled simultaneously; they are **not** mutually exclusive.
- `ucx_env_vars`: Dictionary of UCX environment variables passed to the dscli subprocess. These override the parent process environment. Common variables:
- `UCX_TLS`: RDMA transport mode. Defaults to `rc_x` when `enable_rdma=true` and `UCX_TLS` is set neither here nor in the parent process environment. Alternatives: `rc` (compatible), `ud` (low-latency), `dc` (large-scale). See [UCX environment parameters](https://github.com/openucx/ucx/wiki/UCX-environment-parameters).
- `UCX_LOG_FILE`: Path to UCX log file (e.g., `/tmp/ucx.log`). Requires `UCX_LOG_LEVEL` to be set.
- `UCX_LOG_LEVEL`: Log verbosity — `FATAL`, `ERROR`, `WARN`, `INFO`, `DEBUG`, `TRACE`. Use `DEBUG`/`TRACE` for troubleshooting.
- `UCX_NET_DEVICES`: RDMA device name (e.g., `mlx5_0:1`). Required in multi-NIC setups.
- `UCX_TCP_CM_ROUTE`: TCP control-flow interface for UCX connection setup. Must match the RDMA NIC's network plane.

> For RDMA best practices, troubleshooting, and K8s deployment, see [openYuanrong RDMA Best Practices](https://pages.openeuler.openatom.cn/openyuanrong-datasystem/docs/zh-cn/latest/best_practices/best_practices_for_rdma.html).

> For more configuration parameters for deploying the datasystem, refer to [dscli config](https://gitcode.com/openeuler/yuanrong-datasystem/blob/master/docs/source_zh_cn/deployment/dscli.md).
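
The precedence described under **RDMA Options** (parent environment first, then `ucx_env_vars`, then the `UCX_TLS` default) can be sketched as below. This is a minimal illustration, not the bootstrap code itself: `build_dscli_env` is a hypothetical helper name, and it takes the parent environment as an explicit argument so the rules are easy to see.

```python
from typing import Mapping, Optional


def build_dscli_env(
    parent_env: Mapping[str, str],
    ucx_env_vars: Optional[Mapping[str, object]] = None,
    enable_rdma: bool = False,
) -> dict:
    """Hypothetical helper mirroring the documented precedence rules."""
    # 1. Start from a copy of the parent process environment.
    env = dict(parent_env)
    # 2. Entries from ucx_env_vars win over inherited values.
    for key, value in (ucx_env_vars or {}).items():
        env[key] = str(value)
    # 3. Apply the rc_x default only if RDMA is on and UCX_TLS is still unset.
    if enable_rdma:
        env.setdefault("UCX_TLS", "rc_x")
    return env
```

Under these assumptions, a `UCX_TLS` value exported in the shell that launches TQ is inherited by the subprocess, and an entry in `ucx_env_vars` replaces it.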

### Multi-Node Deployment
@@ -178,6 +191,8 @@ backend:
worker_port: 31501
metastore_port: 2379
enable_yr_npu_transport: true
enable_rdma: false
ucx_env_vars: {}
worker_args: "--shared_memory_size_mb 65536 --remote_h2d_device_ids 0 --enable_huge_tlb true"
```
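
Before a multi-node launch, it can help to sanity-check the `ucx_env_vars` mapping against the rules stated in this document (`UCX_LOG_FILE` requires `UCX_LOG_LEVEL`, and the level must be a known name). The sketch below is an assumption-laden illustration: `check_ucx_env_vars` is a hypothetical helper that TQ does not provide, and the level set only mirrors the values listed in this document, so UCX itself may accept others.

```python
# Log levels as listed in this document; UCX may accept additional values.
DOCUMENTED_UCX_LOG_LEVELS = {"FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"}


def check_ucx_env_vars(ucx_env_vars: dict) -> list:
    """Return a list of human-readable problems; an empty list means no issue found."""
    problems = []
    level = ucx_env_vars.get("UCX_LOG_LEVEL")
    # The document states that UCX_LOG_FILE requires UCX_LOG_LEVEL to be set.
    if "UCX_LOG_FILE" in ucx_env_vars and level is None:
        problems.append("UCX_LOG_FILE is set but UCX_LOG_LEVEL is missing")
    # Flag levels that are not among the documented names.
    if level is not None and str(level).upper() not in DOCUMENTED_UCX_LOG_LEVELS:
        problems.append(f"UCX_LOG_LEVEL={level!r} is not a documented level")
    return problems
```

For example, `check_ucx_env_vars({"UCX_LOG_FILE": "/tmp/ucx.log"})` reports the missing log level, while an empty mapping passes.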

@@ -290,6 +305,37 @@ dscli start -w --worker_address <WORKER_IP>:31501 \
--shared_memory_size_mb 8192
```

### Start with RDMA

To enable RDMA for host-to-host (H2H) transfer, add `--enable_rdma true` to the dscli command and set UCX environment variables:

```bash
# Set UCX environment variables
export UCX_TLS=rc_x
# (Optional) Configure UCX logging for debugging
export UCX_LOG_FILE=/tmp/ucx.log
export UCX_LOG_LEVEL=ERROR

# Head node
dscli start -w --worker_address <HEAD_IP>:31501 \
--metastore_address <HEAD_IP>:2379 \
--start_metastore_service true \
--enable_rdma true \
--arena_per_tenant 1 \
--enable_worker_worker_batch_get true \
--shared_memory_size_mb 8192

# Worker node
dscli start -w --worker_address <WORKER_IP>:31501 \
--metastore_address <HEAD_IP>:2379 \
--enable_rdma true \
--arena_per_tenant 1 \
--enable_worker_worker_batch_get true \
--shared_memory_size_mb 8192
```

> `UCX_TLS=rc_x` forces RDMA transport — if RDMA is unavailable, the system will error rather than fall back to TCP. For alternative transport modes, see [UCX environment parameters](https://github.com/openucx/ucx/wiki/UCX-environment-parameters).
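
The head-node and worker-node commands above differ only in `--start_metastore_service true`. As a rough sketch, the argument list could be assembled in Python as follows; `build_dscli_start_cmd` is a hypothetical helper and the IP addresses in the usage note are placeholders, so treat this as an illustration of the flag layout rather than the actual bootstrap implementation.

```python
def build_dscli_start_cmd(
    worker_address: str,
    metastore_address: str,
    is_head: bool,
    enable_rdma: bool = False,
    worker_args: str = "",
) -> list:
    """Assemble a `dscli start` argument list using only flags shown in this document."""
    cmd = [
        "dscli", "start", "-w",
        "--worker_address", worker_address,
        "--metastore_address", metastore_address,
    ]
    if is_head:
        # Only the head node starts the metastore service.
        cmd += ["--start_metastore_service", "true"]
    if worker_args:
        # Extra flags such as "--shared_memory_size_mb 8192" are passed through.
        cmd += worker_args.split()
    if enable_rdma:
        cmd += ["--enable_rdma", "true"]
    return cmd
```

Calling `build_dscli_start_cmd("10.0.0.1:31501", "10.0.0.1:2379", is_head=True, enable_rdma=True)` yields a list suitable for `subprocess.run`, with the UCX variables supplied separately through the `env` argument.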

### Stop Worker

```bash
@@ -388,6 +434,17 @@ Common errors and solutions:
- `Device not found`: Check if device ID is included in `--remote_h2d_device_ids`
- `CANN error`: Verify CANN installation path and environment variables

### RDMA Issues

When using `enable_rdma: true`, ensure:
- RDMA NIC hardware and `rdma-core` driver are installed on all nodes. Verify with `ibv_devices`.
- `UCX_TLS=rc_x` is compatible with your NIC. If not, set an alternative mode via `ucx_env_vars` (e.g., `{UCX_TLS: rc}`).

Common errors and solutions:
- **UCX endpoint timeout**: In multi-NIC setups, UCX may select an isolated network interface for control flow. Set `UCX_NET_DEVICES` and `UCX_TCP_CM_ROUTE` in `ucx_env_vars` to specify the correct RDMA device and its TCP interface. See [openYuanrong RDMA Best Practices](https://pages.openeuler.openatom.cn/openyuanrong-datasystem/docs/zh-cn/latest/best_practices/best_practices_for_rdma.html) for detailed troubleshooting.
- **RDMA verification**: Set `UCX_LOG_FILE` and `UCX_LOG_LEVEL` in `ucx_env_vars` (e.g., `{UCX_LOG_FILE: /tmp/ucx.log, UCX_LOG_LEVEL: INFO}`), then check logs for RC/RDMA entries to confirm RDMA is active.
- **Container environments**: Set `memlock` to `unlimited` in the container; otherwise, RDMA memory registration may fail.
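
Two of the checks above (the `ibv_devices` tool and the `memlock` limit) can be probed from Python before starting the worker. The following is a minimal sketch for Linux hosts using only the standard library; `memlock_is_unlimited` and `rdma_preflight` are hypothetical helper names. Finding `ibv_devices` on `PATH` only shows that the tool is installed, so running it is still needed to confirm that an RDMA device is present.

```python
import resource
import shutil


def memlock_is_unlimited(limits=None) -> bool:
    """Return True if the soft RLIMIT_MEMLOCK limit is unlimited."""
    # Accept an explicit (soft, hard) pair so the check can be exercised without
    # changing process limits; otherwise query the current process.
    soft, _hard = limits if limits is not None else resource.getrlimit(resource.RLIMIT_MEMLOCK)
    return soft == resource.RLIM_INFINITY


def rdma_preflight() -> dict:
    """Collect coarse RDMA prerequisites for the current host or container."""
    return {
        "ibv_devices_on_path": shutil.which("ibv_devices") is not None,
        "memlock_unlimited": memlock_is_unlimited(),
    }
```

If `memlock_unlimited` is reported as `False` inside a container, the `memlock` ulimit is a likely cause of RDMA memory registration failures.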

### Out of Memory Error

If Yuanrong throws an OOM error during operation:
11 changes: 11 additions & 0 deletions transfer_queue/config.yaml
@@ -61,6 +61,17 @@ backend:
metastore_port: 2379
# Whether to enable NPU transport
enable_yr_npu_transport: false
# Whether to enable host RDMA (H2H) transport via UCX. Requires RDMA NIC hardware and the rdma-core driver.
# See https://pages.openeuler.openatom.cn/openyuanrong-datasystem/docs/zh-cn/latest/best_practices/best_practices_for_rdma.html
enable_rdma: false
# UCX env vars passed to dscli subprocess. Overrides parent env. TQ defaults UCX_TLS=rc_x when enable_rdma=true.
# UCX_TLS: RDMA transport mode. "rc_x" (default), "rc" (compatible), "ud" (low-latency), "dc" (large-scale).
# UCX_LOG_FILE: Path to UCX log file. Requires UCX_LOG_LEVEL to be set.
# UCX_LOG_LEVEL: FATAL, ERROR, WARN, INFO, DEBUG, TRACE. Use DEBUG/TRACE for troubleshooting.
# UCX_NET_DEVICES: RDMA device name (e.g., mlx5_0:1). Required in multi-NIC setups.
# UCX_TCP_CM_ROUTE: TCP control-flow interface for UCX connection setup.
# Example: ucx_env_vars: { UCX_TLS: rc_x, UCX_LOG_FILE: /tmp/ucx.log, UCX_LOG_LEVEL: ERROR }
ucx_env_vars: {}
# Additional config for yuanrong worker.
# Recommended options for NPU environments:
# --remote_h2d_device_ids Enable RH2D for efficient cross-node data transfer. Specify NPU device IDs (comma-separated).
45 changes: 38 additions & 7 deletions transfer_queue/storage/bootstrap/yuanrong_bootstrap.py
@@ -78,6 +78,8 @@ def start_datasystem_worker(
metastore_address: str,
is_head: bool,
worker_args: str = "",
enable_rdma: bool = False,
ucx_env_vars: dict | None = None,
) -> None:
"""Start Yuanrong datasystem worker in metastore mode.

@@ -105,18 +107,33 @@ def start_datasystem_worker(
     if worker_args:
         cmd.extend(worker_args.split())

+    # Append --enable_rdma if enabled
+    if enable_rdma:
+        cmd.extend(["--enable_rdma", "true"])
+
     node_type = "head node" if is_head else "worker node"
     logger.info(f"Starting Yuanrong datasystem ({node_type}) at {worker_address}, worker_args={worker_args}")

     # Build environment with ASCEND_RT_VISIBLE_DEVICES if specified
     env = None
     device_ids = _parse_remote_h2d_device_ids(worker_args)
-    if device_ids:
+    if device_ids or enable_rdma or ucx_env_vars:
         env = os.environ.copy()
-        env["ASCEND_RT_VISIBLE_DEVICES"] = device_ids
-        logger.info(
-            f"Setting ASCEND_RT_VISIBLE_DEVICES={device_ids} for dscli subprocess ({node_type} at {worker_address})"
-        )
+        if device_ids:
+            env["ASCEND_RT_VISIBLE_DEVICES"] = device_ids
+            logger.info(
+                f"Setting ASCEND_RT_VISIBLE_DEVICES={device_ids} for dscli subprocess ({node_type} at {worker_address})"
+            )
+        # ucx_env_vars overrides parent env (highest priority)
+        if ucx_env_vars:
+            for key, value in ucx_env_vars.items():
+                env[key] = str(value)
+        # Default UCX_TLS=rc_x only when enable_rdma and UCX_TLS still absent
+        if enable_rdma and "UCX_TLS" not in env:
+            env["UCX_TLS"] = "rc_x"
+            logger.info(
+                f"Setting UCX_TLS=rc_x (default for RDMA) for dscli subprocess ({node_type} at {worker_address})"
+            )

     try:
         ds_result = subprocess.run(
@@ -179,7 +196,15 @@ class YuanrongWorkerActor:
intersection of local IP addresses with the provided node IPs.
"""

-    def __init__(self, node_ips: list[str], worker_port: int, metastore_port: int, worker_args: str = ""):
+    def __init__(
+        self,
+        node_ips: list[str],
+        worker_port: int,
+        metastore_port: int,
+        worker_args: str = "",
+        enable_rdma: bool = False,
+        ucx_env_vars: dict | None = None,
+    ):
"""Initialize the Yuanrong worker actor.

Args:
@@ -208,6 +233,8 @@ def __init__(self, node_ips: list[str], worker_port: int, metastore_port: int, w
self.metastore_port = metastore_port
self.worker_address = f"{self.my_ip}:{worker_port}"
self.worker_args = worker_args
self.enable_rdma = enable_rdma
self.ucx_env_vars = ucx_env_vars

# First node in the list is assumed to be the head node.
# This assumption is based on how interface.py constructs node_ips from ray.nodes().
@@ -236,6 +263,8 @@ def start(self) -> str:
metastore_address=self.metastore_address,
is_head=self.is_head,
worker_args=self.worker_args,
enable_rdma=self.enable_rdma,
ucx_env_vars=self.ucx_env_vars,
)
logger.info(f"Datasystem worker started successfully at {self.worker_address}")
return self.worker_address
@@ -313,6 +342,8 @@ def initialize_yuanrong_storage(conf: DictConfig) -> dict[str, Any] | None:
worker_port = conf.backend.Yuanrong.worker_port
metastore_port = conf.backend.Yuanrong.metastore_port
worker_args = conf.backend.Yuanrong.get("worker_args", "")
enable_rdma = conf.backend.Yuanrong.get("enable_rdma", False)
# `or {}` also covers an explicit null value for ucx_env_vars in the config
ucx_env_vars = dict(conf.backend.Yuanrong.get("ucx_env_vars") or {})

logger.info(f"Found {len(ordered_nodes)} alive Ray nodes: {node_ips}")

@@ -352,7 +383,7 @@ def initialize_yuanrong_storage(conf: DictConfig) -> dict[str, Any] | None:
actor = YuanrongWorkerActor.options( # type: ignore[attr-defined]
placement_group=pg,
placement_group_bundle_index=rank,
-).remote(node_ips, worker_port, metastore_port, worker_args)
+).remote(node_ips, worker_port, metastore_port, worker_args, enable_rdma, ucx_env_vars)
worker_actors.append(actor)

logger.info(f"Created {len(worker_actors)} YuanrongWorkerActor instances")