Skip to content

Commit 244b638

Browse files
Set env MC_STORE_MEMCPY to 0 by default when protocol is tcp (#104)
Signed-off-by: rongfu.leng <lenronfu@gmail.com>
Signed-off-by: Yubo Wang <yubowang2019@gmail.com>
Co-authored-by: Yubo Wang <yubowang2019@gmail.com>
1 parent 8a3c3b4 commit 244b638

5 files changed

Lines changed: 62 additions & 4 deletions

File tree

tests/test_mooncake_force_delete.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,56 @@ def test_enable_hard_pin_default_off(self):
6868
assert restored.enable_hard_pin is False
6969

7070

71+
class TestMooncakeEnvDefaults:
72+
def test_tcp_memcpy_default_is_applied_by_export_env(self):
73+
config = MooncakeConfig(protocol="tcp")
74+
75+
with patch.dict(os.environ, {}, clear=True):
76+
config.export_env()
77+
78+
assert os.environ["MC_STORE_MEMCPY"] == "0"
79+
80+
def test_tcp_memcpy_default_preserves_user_override(self):
81+
config = MooncakeConfig(protocol="tcp")
82+
83+
with patch.dict(os.environ, {"MC_STORE_MEMCPY": "1"}, clear=True):
84+
config.apply_env_defaults()
85+
86+
assert os.environ["MC_STORE_MEMCPY"] == "1"
87+
88+
def test_tcp_memcpy_default_not_applied_for_rdma(self):
89+
config = MooncakeConfig(protocol="rdma")
90+
91+
with patch.dict(os.environ, {}, clear=True):
92+
config.apply_env_defaults()
93+
94+
assert "MC_STORE_MEMCPY" not in os.environ
95+
96+
def test_direct_store_setup_applies_tcp_memcpy_before_mooncake_client_setup(self):
97+
config = MooncakeConfig(protocol="tcp", async_put_pool_size=0)
98+
mock_raw_store = MagicMock()
99+
mock_raw_store.setup.return_value = 0
100+
101+
class ConcreteStore(MooncakeHiddenStateStore):
102+
pass
103+
104+
def make_raw_store():
105+
assert os.environ["MC_STORE_MEMCPY"] == "0"
106+
return mock_raw_store
107+
108+
store = ConcreteStore(config)
109+
with (
110+
patch.dict(os.environ, {}, clear=True),
111+
patch("torchspec.transfer.mooncake.store.MooncakeDistributedStore", make_raw_store),
112+
patch.object(ConcreteStore, "_verify_force_delete"),
113+
patch.object(ConcreteStore, "_build_replicate_config"),
114+
patch("torch.cuda.is_available", return_value=False),
115+
):
116+
store.setup()
117+
118+
mock_raw_store.setup.assert_called_once()
119+
120+
71121
# ---------------------------------------------------------------------------
72122
# Tests 2-3: _verify_force_delete
73123
# ---------------------------------------------------------------------------

tests/test_placement_group.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
1-
from argparse import Namespace
21
import importlib.util
32
import sys
43
import types
4+
from argparse import Namespace
55
from pathlib import Path
66
from unittest.mock import MagicMock, patch
77

88
import pytest
99

10-
1110
repo_root = Path(__file__).resolve().parents[1]
1211
torchspec_pkg = sys.modules.get("torchspec")
1312
if torchspec_pkg is None and importlib.util.find_spec("torch") is None:
@@ -67,8 +66,8 @@ def __init__(self, **kwargs):
6766
sys.modules["torchspec.ray.train_group"] = train_group_stub
6867

6968
from torchspec.ray.placement_group import ( # noqa: E402
70-
_NodeConstraint,
7169
_build_custom_bundles,
70+
_NodeConstraint,
7271
_sort_probed_bundle_infos,
7372
create_placement_groups,
7473
)

torchspec/config/mooncake_config.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ def export_env(self) -> None:
176176
os.environ["MOONCAKE_PROTOCOL"] = self.protocol
177177
os.environ["MOONCAKE_DEVICE_NAME"] = self.device_name
178178
os.environ["MOONCAKE_ENABLE_GPU_DIRECT"] = "1" if self.enable_gpu_direct else "0"
179+
self.apply_env_defaults()
179180
if self.async_put_pool_size is not None:
180181
os.environ["MOONCAKE_ASYNC_PUT_POOL_SIZE"] = str(self.async_put_pool_size)
181182
os.environ["MOONCAKE_STORE_FULL_WAIT_SECONDS"] = str(self.store_full_wait_seconds)
@@ -190,6 +191,14 @@ def export_env(self) -> None:
190191
os.environ["MOONCAKE_GET_RETRY_MAX_WAIT_SECONDS"] = str(self.get_retry_max_wait_seconds)
191192
os.environ["MOONCAKE_ENABLE_HARD_PIN"] = "1" if self.enable_hard_pin else "0"
192193

194+
def apply_env_defaults(self) -> None:
195+
"""Apply Mooncake process defaults that are needed before client setup."""
196+
# Fix: https://github.com/kvcache-ai/Mooncake/issues/1986
197+
if self.protocol.lower() == "tcp" and "MC_STORE_MEMCPY" not in os.environ:
198+
# Mooncake's TCP-only memcpy fast path can segfault in same-host
199+
# multi-process get paths. Preserve an explicit user override.
200+
os.environ["MC_STORE_MEMCPY"] = "0"
201+
193202
@classmethod
194203
def from_env(cls) -> "MooncakeConfig":
195204
"""Create config from environment variables."""

torchspec/ray/placement_group.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
from torchspec.ray.train_group import RayTrainGroup
3535
from torchspec.utils.logging import logger
3636

37-
3837
# Ray exposes a tiny "node:<ip>" resource on each node. Requiring a fractional
3938
# amount pins a bundle to that node without consuming a full logical resource.
4039
_NODE_RESOURCE_EPSILON = 0.001

torchspec/transfer/mooncake/store.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ def setup(self, device: torch.device | int | None = None) -> None:
7373
"Set mooncake.device_name to a specific RDMA device (e.g. 'mlx5_0')."
7474
)
7575

76+
self.config.apply_env_defaults()
7677
self._store = MooncakeDistributedStore()
7778
logger.info(
7879
"Connecting to Mooncake master at %s, metadata server at %s",

0 commit comments

Comments
 (0)