Skip to content

Commit ddb9a02

Browse files
refactor(data-plane): drop dead set_wire_format/_PACK_JAGGED + adapter cleanup
Three independent simplifications in the mooncake_cpu path. No behavior change for any backend. 1. Drop dead code in codec.py: - set_wire_format() and _PACK_JAGGED were defined but never called anywhere (codec was always packing jagged regardless of backend). Remove both, and the unreachable `if not _PACK_JAGGED: return ...` early-returns inside maybe_pack_jagged / pack_per_token_field. - The "padded fallback for mooncake_cpu" hook was inert — if a future Mooncake bug forces it back, re-add explicitly as a parameter rather than module-level state. 2. Delete _usb0_down() in transfer_queue.py: - Its own docstring says "DO NOT rely on this from Python … no-op on the workers where it matters." Dead code. The Slurm-startup layer is the right place; the next commit drops that block too, because MC_TCP_BIND_ADDRESS makes it unnecessary. 3. Drop the duplicated mooncake_cpu setup in _init_tq: - set_kv_promote_1d(True) and the MC_TCP_BIND_ADDRESS env-var write were already done in TQDataPlaneClient.__init__ (which runs in EVERY process, including the driver before _init_tq). Removing the dups makes __init__ the single source of truth. - Simplify the +x chmod block to a single os.chmod(_master, 0o755) under try/except OSError (drop the os.access pre-check; chmod is idempotent and TOCTOU-free this way). - Move `import ipaddress` to module top. Net: -121 lines across two Python files. All public symbols referenced by tests/data_plane/unit/test_architecture_invariants.py (pack_per_token_field, _to_wire's tensor-only guard) preserved. Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
1 parent 18097cf commit ddb9a02

2 files changed

Lines changed: 38 additions & 128 deletions

File tree

nemo_rl/data_plane/adapters/transfer_queue.py

Lines changed: 29 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
from __future__ import annotations
2424

25+
import ipaddress
2526
import os
2627
import socket
2728
import subprocess
@@ -67,19 +68,12 @@ def _tq(): # pragma: no cover - trivially exercised by smoke tests
6768
def _get_local_node_ip() -> str:
6869
"""Return THIS process's host IP, not the cluster head's.
6970
70-
Each Ray actor process must use its own node's IP for Mooncake's
71-
listener bind (multi-node correctness). If we used the head IP,
72-
actors on worker nodes would announce a listener address that
73-
only routes back to the head — peers fail with connection refused.
74-
75-
Skips link-local APIPA addresses (RFC 3927 IPv4 169.254/16,
76-
RFC 4291 IPv6 fe80::/10): on this cluster ``avahi-autoipd``
77-
assigns 169.254.x to ``usb0``, and ``gethostbyname`` can resolve
78-
to that non-routable address. The cluster wrapper's network-init
79-
block strips usb0 in most cases, but the check is a defense in
80-
depth (and free).
71+
Each Ray actor process must use its own node's IP so Mooncake's
72+
announce address (``MC_TCP_BIND_ADDRESS`` → ``desc.ip_or_host_name``
73+
in ``transfer_engine_impl.cpp``) is routable cross-node. Link-local
74+
(169.254/16, fe80::/10) is rejected — ``gethostbyname`` can resolve
75+
to APIPA on hosts where ``avahi-autoipd`` is active.
8176
"""
82-
import ipaddress
8377
try:
8478
ip = socket.gethostbyname(socket.gethostname())
8579
if ipaddress.ip_address(ip).is_link_local:
@@ -89,42 +83,6 @@ def _get_local_node_ip() -> str:
8983
return ""
9084

9185

92-
def _usb0_down() -> None:
93-
"""Best-effort attempt to take down usb0 / strip 169.254.x APIPA.
94-
95-
**DO NOT rely on this from Python.** Ray actors run unprivileged —
96-
the ``ip``/``ifconfig`` calls here silently return "Operation not
97-
permitted" without `CAP_NET_ADMIN`. Even when run as root, the fix
98-
is too late: Mooncake's RPC listener has already scanned
99-
``getifaddrs()`` and bound to the first active interface (usually
100-
``usb0`` 169.254.3.1, the link-local APIPA address) before the
101-
Python adapter is loaded. Background daemons (``avahi-autoipd``,
102-
NetworkManager) also re-assign the APIPA address within seconds.
103-
104-
The proven fix lives at the **Slurm container start-up** layer
105-
(e.g. a ``NETWORK_INIT_CMDS`` block in the cluster wrapper that
106-
kills ``avahi-autoipd``, sets ``nmcli device set usb0 managed no``,
107-
flushes the address, and runs a 5 s relaunch loop as a failsafe).
108-
See ``research/data_plane_mooncake_status.md`` and
109-
``data-plane-bench/DEBUG_TQ_BACKENDS.md`` (Issue 1).
110-
111-
This function is kept for reference only; it is a no-op on the
112-
workers where it matters.
113-
"""
114-
cmds = [
115-
"ifconfig usb0 0.0.0.0 2>/dev/null",
116-
"ifconfig usb0 down 2>/dev/null",
117-
"ip link set usb0 down 2>/dev/null",
118-
"ip addr flush dev usb0 2>/dev/null",
119-
]
120-
try:
121-
subprocess.run(
122-
["sh", "-c", "; ".join(cmds)], check=False, capture_output=True
123-
)
124-
except Exception:
125-
pass
126-
127-
12886
def _mooncake_transport_config() -> dict:
12987
protocol = os.environ.get("MC_MOONCAKE_PROTOCOL", "tcp")
13088
if protocol != "rdma":
@@ -266,46 +224,26 @@ def _init_tq(cfg: DataPlaneConfig) -> None:
266224
},
267225
}
268226
elif backend == "mooncake_cpu":
269-
# Enable KV-path 1D→2D promotion (see codec._KV_PROMOTE_1D);
270-
# mooncake_cpu goes through TQ's KVStorageManager which has the
271-
# 1D schema/data mismatch. Idempotent with the per-process
272-
# set_kv_promote_1d in TQDataPlaneClient.__init__; kept here
273-
# so this branch is self-contained.
274-
from nemo_rl.data_plane.codec import set_kv_promote_1d
275-
set_kv_promote_1d(True)
276-
277227
# The mooncake-transfer-engine wheel ships `mooncake_master` at
278228
# <site-packages>/mooncake/, NOT on $PATH. TQ's
279229
# subprocess.Popen(["mooncake_master", ...]) fails with
280230
# FileNotFoundError unless we put the package dir on PATH first.
281-
# The wheel is a base dep (TQ-tier), so the import should always
282-
# succeed — fail loud otherwise.
283231
import mooncake # type: ignore[import-not-found]
284232

285233
_moon_pkg = os.path.dirname(mooncake.__file__)
286234
_master = os.path.join(_moon_pkg, "mooncake_master")
287-
if os.path.exists(_master) and not os.access(_master, os.X_OK):
288-
# Wheels can strip the +x bit on extract; restore it.
289-
import stat as _stat
290-
try:
291-
os.chmod(
292-
_master,
293-
os.stat(_master).st_mode
294-
| _stat.S_IXUSR | _stat.S_IXGRP | _stat.S_IXOTH,
295-
)
296-
except OSError:
297-
pass
235+
try:
236+
os.chmod(_master, 0o755)
237+
except OSError:
238+
pass
298239
_existing_path = os.environ.get("PATH", "")
299240
if _moon_pkg not in _existing_path.split(os.pathsep):
300241
os.environ["PATH"] = _moon_pkg + os.pathsep + _existing_path
301-
_usb0_down()
242+
# Per-process MC_TCP_BIND_ADDRESS / KV-path promotion already
243+
# set by TQDataPlaneClient.__init__ (runs on every process,
244+
# including this driver). _init_tq only needs local_ip below
245+
# for the metadata/master server URLs (driver-bound).
302246
local_ip = _get_local_node_ip()
303-
if local_ip:
304-
# Force-assign (NOT setdefault): Ray actors inherit env vars
305-
# from the driver, so on multi-node runs every actor would
306-
# otherwise carry the driver's IP and announce listeners at
307-
# the wrong host. Each process must publish its OWN IP.
308-
os.environ["MC_TCP_BIND_ADDRESS"] = local_ip
309247
overlay = {
310248
**controller_overlay,
311249
"backend": {
@@ -418,16 +356,21 @@ def __init__(self, cfg: DataPlaneConfig, *, bootstrap: bool = True) -> None:
418356
cluster — ``cfg`` is then only consulted for client-side
419357
knobs (poll interval).
420358
"""
421-
# mooncake_cpu setup must run BEFORE _init_tq / _connect_existing,
422-
# because Mooncake's getifaddrs() listener bind happens inside
423-
# tq.init/connect — once it's bound to usb0 (169.254.3.1), no env
424-
# var change rescues it. Three per-process knobs needed in EVERY
425-
# process that builds a TQ client (driver, SyncRolloutActor, every
426-
# MegatronPolicyWorker rank):
427-
# 1. MC_TCP_BIND_ADDRESS — picked up by Mooncake engine.so for
428-
# client registration so peers receive a routable address.
429-
# 2. MC_STORE_MEMCPY=0 — bypasses Mooncake #1986 LOCAL_MEMCPY
430-
# cross-process pointer-deref segfault (see comment below).
359+
# mooncake_cpu setup must run BEFORE _init_tq / _connect_existing
360+
# — once tq.init/connect runs, Mooncake's engine.so reads the
361+
# env vars and they can't be changed. Three per-process knobs
362+
# needed in EVERY process that builds a TQ client (driver,
363+
# SyncRolloutActor, every MegatronPolicyWorker rank):
364+
# 1. MC_TCP_BIND_ADDRESS — Mooncake engine.so writes this into
365+
# desc.ip_or_host_name, the address peers receive from the
366+
# metadata service. Without it, getifaddrs()[0] picks usb0
367+
# (169.254.x APIPA) and peers fail to connect.
368+
# 2. MC_STORE_MEMCPY=0 — Mooncake LOCAL_MEMCPY fast-path
369+
# reinterpret_casts cross-process pointers, segfaulting
370+
# MemcpyWorkerPool. PR #1995 (merged 2026-04-30) fixes the
371+
# root cause but isn't in any published wheel yet
372+
# (mooncake-transfer-engine 0.3.10.post2 was bumped before
373+
# that merge). Drop this once the wheel includes the fix.
431374
# 3. KV-path 1D promotion — works around TQ's
432375
# extract_field_schema schema/data mismatch for 1D fields.
433376
if cfg.get("backend") == "mooncake_cpu":
@@ -438,12 +381,6 @@ def __init__(self, cfg: DataPlaneConfig, *, bootstrap: bool = True) -> None:
438381
# be a no-op and the actor would announce the driver's
439382
# IP — peers fail with "connection refused".
440383
os.environ["MC_TCP_BIND_ADDRESS"] = local_ip
441-
# Disable LOCAL_MEMCPY fast-path: with multiple Ray actors on
442-
# the same host (driver + 8 policy workers + rollout actor),
443-
# mooncake's isLocalTransfer() incorrectly compares IP-only
444-
# and reinterpret_casts another process's virtual address,
445-
# segfaulting MemcpyWorkerPool. See kvcache-ai/Mooncake#1986
446-
# (PR #1995 is the upstream fix; not yet in our wheel).
447384
os.environ.setdefault("MC_STORE_MEMCPY", "0")
448385
from nemo_rl.data_plane.codec import set_kv_promote_1d
449386
set_kv_promote_1d(True)

nemo_rl/data_plane/codec.py

Lines changed: 9 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -77,43 +77,24 @@ def to_nested_by_length(
7777
return torch.nested.as_nested_tensor(rows, layout=torch.jagged)
7878

7979

80-
# Wire-format kill-switch: backends that can't carry torch.nested tensors
81-
# (e.g. mooncake_cpu, whose C++ MemcpyWorkerPool segfaults on jagged
82-
# pointer arithmetic) flip this to False at adapter init, forcing the
83-
# writer back to padded. Default is jagged (the bandwidth win on simple).
84-
_PACK_JAGGED = True
85-
86-
# 1D field round-trip kill-switch: TQ's KVStorageManager path silently
87-
# unsqueezes 1D fields in metadata while row-iterating them in data
88-
# (transfer_queue/metadata.py:171 vs storage/managers/base.py:_generate_values).
89-
# Backends that go through that path (mooncake_cpu) need the writer to
90-
# unsqueeze 1D fields to (N, 1) so per-row tensors match the metadata
91-
# shape; the reader then squeezes the trailing 1 back. Independent of
92-
# wire-format encoding (jagged vs padded). Default off — only the
93-
# affected adapter flips it.
80+
# 1D field round-trip kill-switch for the KV-path. TQ's
81+
# KVStorageManager silently unsqueezes 1D fields in metadata while
82+
# row-iterating them in data (transfer_queue/metadata.py:171 vs
83+
# storage/managers/base.py:_generate_values). Backends that go through
84+
# that path (mooncake_cpu) need the writer to unsqueeze 1D fields to
85+
# (N, 1) so per-row tensors match the metadata shape; the reader then
86+
# squeezes the trailing 1 back. Default off — only the affected
87+
# adapter flips it.
9488
_KV_PROMOTE_1D = False
9589

9690

97-
def set_wire_format(jagged: bool) -> None:
98-
"""Adapter hook: set whether writers should pack to nested tensors.
99-
100-
Called once by the TQ adapter at init time based on
101-
``data_plane.backend``. Mooncake_cpu sets this to ``False`` so all
102-
writes stay rectangular (the bench validated mooncake against
103-
padded tensors only). Simple backend stays jagged for the
104-
bandwidth/memory win.
105-
"""
106-
global _PACK_JAGGED
107-
_PACK_JAGGED = bool(jagged)
108-
109-
11091
def set_kv_promote_1d(enabled: bool) -> None:
11192
"""Adapter hook: when True, writer unsqueezes 1D bulk fields to
11293
(N, 1) and reader squeezes the trailing 1 in :func:`materialize`.
11394
11495
Required by backends that go through TQ's KVStorageManager path
11596
(mooncake_cpu) — see ``_KV_PROMOTE_1D`` above for the schema/data
116-
mismatch. Independent of jagged-vs-padded wire encoding.
97+
mismatch.
11798
"""
11899
global _KV_PROMOTE_1D
119100
_KV_PROMOTE_1D = bool(enabled)
@@ -132,13 +113,7 @@ def maybe_pack_jagged(
132113
land in TQ as jagged with the same row lengths — read-time
133114
materialization then pads them all to the same target shape,
134115
avoiding shape-mismatch crashes between mixed wire formats.
135-
136-
No-op when :func:`set_wire_format` has been called with
137-
``jagged=False`` — used by the mooncake_cpu adapter to stay on the
138-
padded path that backend's C++ memcpy worker actually supports.
139116
"""
140-
if not _PACK_JAGGED:
141-
return val.detach().contiguous()
142117
n = lengths.shape[0]
143118
if n == 0:
144119
return val.detach().contiguous()
@@ -164,8 +139,6 @@ def pack_per_token_field(val: torch.Tensor, lengths: torch.Tensor) -> torch.Tens
164139
Falls back to rectangular when ``val`` cannot be jaggedized
165140
(wrong batch dim, < 2D, or seq dim shorter than ``max(lengths)``).
166141
"""
167-
if not _PACK_JAGGED:
168-
return val.detach().contiguous()
169142
n = lengths.shape[0]
170143
if n == 0:
171144
return val.detach().contiguous()

0 commit comments

Comments
 (0)