Skip to content

Commit 583171a

Browse files
committed
[feat] Support auto init YR backend based on metastore
Signed-off-by: Haichuan Hu <kaisennhu@gmail.com>
1 parent 0542a8c commit 583171a

12 files changed

Lines changed: 583 additions & 296 deletions

.github/workflows/run-tests.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,8 @@ jobs:
4242
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_e2e_lifecycle_consistency.py
4343
pkill -f "[m]ooncake_master" || true
4444
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_kv_interface_e2e.py
45-
pkill -f "[m]ooncake_master" || true
45+
pkill -f "[m]ooncake_master" || true
46+
- name: Run Yuanrong Backend Specific E2E Tests
47+
run: |
48+
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_e2e_lifecycle_consistency.py
49+
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_kv_interface_e2e.py

tests/e2e/test_e2e_lifecycle_consistency.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@
8080
"backend": {
8181
"storage_backend": "Yuanrong",
8282
"Yuanrong": {
83-
"host": "127.0.0.1",
84-
"port": 31501,
83+
"worker_port": 31501,
84+
"metastore_port": 2379,
8585
},
8686
},
8787
},
@@ -102,11 +102,12 @@ def backend_name():
102102
"""Get the backend name from environment variable.
103103
104104
Environment variables:
105-
TQ_TEST_BACKEND: Backend name (SimpleStorage or MooncakeStore)
105+
TQ_TEST_BACKEND: Backend name (SimpleStorage, MooncakeStore, or Yuanrong)
106106
107107
To run tests for a specific backend:
108108
TQ_TEST_BACKEND=SimpleStorage pytest tests/e2e/test_e2e_lifecycle_consistency.py
109109
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_e2e_lifecycle_consistency.py
110+
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_e2e_lifecycle_consistency.py
110111
"""
111112
return os.environ.get("TQ_TEST_BACKEND", "SimpleStorage")
112113

tests/e2e/test_kv_interface_e2e.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,18 @@ def tq_api(request):
9494
},
9595
},
9696
},
97+
"Yuanrong": {
98+
"controller": {
99+
"polling_mode": True,
100+
},
101+
"backend": {
102+
"storage_backend": "Yuanrong",
103+
"Yuanrong": {
104+
"worker_port": 31501,
105+
"metastore_port": 2379,
106+
},
107+
},
108+
},
97109
}
98110

99111

@@ -112,11 +124,12 @@ def backend_name():
112124
"""Get the backend name from environment variable.
113125
114126
Environment variables:
115-
TQ_TEST_BACKEND: Backend name (SimpleStorage or MooncakeStore)
127+
TQ_TEST_BACKEND: Backend name (SimpleStorage, MooncakeStore, or Yuanrong)
116128
117129
To run tests for a specific backend:
118130
TQ_TEST_BACKEND=SimpleStorage pytest tests/e2e/test_kv_interface_e2e.py
119131
TQ_TEST_BACKEND=MooncakeStore pytest tests/e2e/test_kv_interface_e2e.py
132+
TQ_TEST_BACKEND=Yuanrong pytest tests/e2e/test_kv_interface_e2e.py
120133
"""
121134
return os.environ.get("TQ_TEST_BACKEND", "SimpleStorage")
122135

tests/test_kv_storage_manager.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ def test_data():
5757
cfg = {
5858
"controller_info": MagicMock(),
5959
"client_name": "YuanrongStorageClient",
60-
"host": "127.0.0.1",
61-
"port": 31501,
60+
"worker_port": 31501,
6261
"device_id": 0,
6362
}
6463
global_indexes = [8, 9, 10]

tests/test_storage_client_factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
class Test(unittest.TestCase):
2727
def setUp(self):
28-
self.cfg = {"host": "127.0.0.1", "port": 31501, "device_id": 0}
28+
self.cfg = {"worker_port": 31501, "device_id": 0}
2929

3030
@pytest.mark.skipif(find_spec("datasystem") is None, reason="datasystem is not available")
3131
def test_create_client(self):

tests/test_yuanrong_client_zero_copy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def mock_kv_client(self, mocker):
4646

4747
@pytest.fixture
4848
def storage_client(self, mock_kv_client):
49-
return GeneralKVClientAdapter({"host": "127.0.0.1", "port": 31501})
49+
return GeneralKVClientAdapter({"worker_port": 31501})
5050

5151
def test_mset_mget_p2p(self, storage_client, mocker):
5252
# Mock serialization/deserialization

tests/test_yuanrong_storage_client_e2e.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ def mock_find_reachable_host(port, timeout=1.0):
125125

126126
@pytest.fixture
127127
def config():
128-
return {"host": "127.0.0.1", "port": 12345, "enable_yr_npu_optimization": True}
128+
return {"worker_port": 12345, "enable_yr_npu_optimization": True}
129129

130130

131131
def assert_tensors_equal(a: torch.Tensor, b: torch.Tensor):

transfer_queue/config.yaml

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,11 @@ backend:
4848

4949
# For Yuanrong:
5050
Yuanrong:
51-
# Whether to let TQ automatically start etcd and datasystem services
51+
# Whether to let TQ automatically init yuanrong
5252
auto_init: True
53-
# etcd service address (used to start etcd when auto_init=true)
54-
etcd_address: "127.0.0.1:2379"
55-
# datasystem worker host and port (used to start dscli when auto_init=true)
56-
host: "127.0.0.1"
57-
port: 31501
53+
# Datasystem worker port
54+
worker_port: 31501
55+
# Metastore service port
56+
metastore_port: 2379
57+
# Additional config for yuanrong worker (e.g., "--shared_memory_size_mb 16384 --enable_huge_tlb true")
58+
worker_args: ""

transfer_queue/interface.py

Lines changed: 8 additions & 171 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616
import logging
1717
import math
1818
import os
19-
import shutil
20-
import socket
2119
import subprocess
22-
import tempfile
2320
import time
2421
from importlib import resources
2522
from typing import Any, Optional
@@ -38,6 +35,10 @@
3835
from transfer_queue.sampler import BaseSampler
3936
from transfer_queue.storage.simple_backend import SimpleStorageUnit
4037
from transfer_queue.utils.common import get_placement_group
38+
from transfer_queue.utils.yuanrong_utils import (
39+
cleanup_yuanrong_resources,
40+
initialize_yuanrong_backend,
41+
)
4142
from transfer_queue.utils.zmq_utils import process_zmq_server_info
4243

4344
logger = logging.getLogger(__name__)
@@ -187,129 +188,8 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
187188
f"Output:\n{error_msg}"
188189
)
189190
_TRANSFER_QUEUE_STORAGE["MooncakeStore"] = process
190-
if conf.backend.storage_backend == "Yuanrong":
191-
if conf.backend.Yuanrong.auto_init:
192-
etcd_process = None
193-
etcd_data_dir = None
194-
worker_address = None
195-
if not shutil.which("etcd"):
196-
raise RuntimeError(
197-
"etcd executable not found in PATH. Please install etcd and make sure it's in the PATH."
198-
)
199-
if not shutil.which("dscli"):
200-
raise RuntimeError(
201-
"dscli executable not found in PATH. Please run `pip install openyuanrong-datasystem`."
202-
)
203-
try:
204-
# ========== Start etcd ==========
205-
etcd_address = "127.0.0.1:2379"
206-
try:
207-
etcd_address = conf.backend.Yuanrong.etcd_address
208-
except Exception:
209-
pass
210-
211-
# Assume host:port format
212-
parts = etcd_address.split(":")
213-
if len(parts) != 2:
214-
raise ValueError(f"Invalid etcd_address format: {etcd_address}. Expected host:port")
215-
host = parts[0]
216-
port = int(parts[1])
217-
218-
# Create temporary data directory
219-
etcd_data_dir = tempfile.mkdtemp(prefix="tq_etcd_")
220-
logger.info(f"Starting etcd with data directory: {etcd_data_dir}")
221-
222-
cmd = [
223-
"etcd",
224-
f"--data-dir={etcd_data_dir}",
225-
f"--listen-client-urls=http://{host}:{port}",
226-
f"--advertise-client-urls=http://{host}:{port}",
227-
]
228-
229-
etcd_process = subprocess.Popen(
230-
cmd,
231-
stdout=subprocess.DEVNULL,
232-
stderr=subprocess.DEVNULL,
233-
text=True,
234-
bufsize=1,
235-
universal_newlines=True,
236-
start_new_session=True,
237-
)
238-
time.sleep(3) # Wait for etcd to start
239-
240-
if etcd_process.poll() is None:
241-
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
242-
sock.settimeout(2)
243-
result = sock.connect_ex((host, port))
244-
sock.close()
245-
if result != 0:
246-
raise RuntimeError(f"etcd process started but not listening on {host}:{port}")
247-
else:
248-
raise RuntimeError(f"etcd exited immediately with return code {etcd_process.returncode}")
249-
250-
logger.info(f"etcd started, PID: {etcd_process.pid}")
251-
time.sleep(2)
252-
253-
# ========== Start datasystem worker ==========
254-
# Assume host:port format
255-
worker_host = conf.backend.Yuanrong.host
256-
worker_port = conf.backend.Yuanrong.port
257-
worker_address = worker_host + ":" + str(worker_port)
258-
259-
cmd = [
260-
"dscli",
261-
"start",
262-
"-w",
263-
"--worker_address",
264-
worker_address,
265-
"--etcd_address",
266-
etcd_address,
267-
]
268-
269-
try:
270-
ds_result = subprocess.run(
271-
cmd,
272-
stdout=subprocess.PIPE,
273-
stderr=subprocess.STDOUT,
274-
text=True,
275-
timeout=90,
276-
)
277-
except subprocess.TimeoutExpired as err:
278-
raise RuntimeError(f"dscli start timed out: {err}") from err
279-
# Wait for dscli to start and exit (it starts worker and exits)
280-
if ds_result.returncode == 0 and "[ OK ]" in ds_result.stdout:
281-
logger.info(f"dscli started Yuanrong datasystem worker at {worker_address} successfully.")
282-
283-
else:
284-
raise RuntimeError(
285-
f"Failed to start datasystem worker at {worker_address}. "
286-
f"Return code: {ds_result.returncode}, Output: {ds_result.stdout}"
287-
)
288-
289-
# Store processes and data directory
290-
_TRANSFER_QUEUE_STORAGE["Yuanrong"] = {
291-
"etcd": etcd_process,
292-
"etcd_data_dir": etcd_data_dir,
293-
"worker_address": worker_address,
294-
"etcd_address": etcd_address,
295-
}
296-
logger.info("Yuanrong backend (etcd + datasystem) started successfully.")
297-
298-
except Exception as e:
299-
# Clean up on failure
300-
if etcd_process is not None and etcd_process.poll() is None:
301-
etcd_process.terminate()
302-
try:
303-
etcd_process.wait(timeout=5)
304-
except subprocess.TimeoutExpired:
305-
etcd_process.kill()
306-
etcd_process.wait()
307-
if etcd_data_dir is not None:
308-
try:
309-
shutil.rmtree(etcd_data_dir, ignore_errors=True)
310-
except Exception:
311-
pass
312-
raise RuntimeError(f"Failed to start Yuanrong backend: {e}") from e
191+
if conf.backend.storage_backend == "Yuanrong" and conf.backend.Yuanrong.auto_init:
192+
_TRANSFER_QUEUE_STORAGE["Yuanrong"] = initialize_yuanrong_backend(conf)
313193
return conf
314194

315195

@@ -335,6 +215,7 @@ def _init_from_existing() -> bool:
335215
conf = ray.get(_TRANSFER_QUEUE_CONTROLLER.get_config.remote())
336216
if conf is not None:
337217
_maybe_create_transferqueue_client(conf)
218+
338219
logger.info("TransferQueueClient initialized.")
339220
return True
340221

@@ -475,51 +356,7 @@ def close():
475356
except Exception:
476357
pass
477358
elif key == "Yuanrong":
478-
# Stop etcd process and clean up data directory, stop datasystem worker via dscli
479-
if isinstance(value, dict):
480-
etcd_process = value.get("etcd")
481-
etcd_data_dir = value.get("etcd_data_dir")
482-
worker_address = value.get("worker_address")
483-
484-
# Stop etcd if running
485-
if etcd_process is not None and etcd_process.poll() is None:
486-
etcd_process.terminate()
487-
try:
488-
etcd_process.wait(timeout=5)
489-
except subprocess.TimeoutExpired:
490-
etcd_process.kill()
491-
etcd_process.wait()
492-
493-
# Clean up etcd data directory
494-
if etcd_data_dir is not None and os.path.exists(etcd_data_dir):
495-
try:
496-
shutil.rmtree(etcd_data_dir, ignore_errors=True)
497-
logger.info(f"Cleaned up etcd data directory: {etcd_data_dir}")
498-
except Exception as e:
499-
logger.warning(f"Failed to clean up etcd data directory {etcd_data_dir}: {e}")
500-
501-
# Stop datasystem worker via dscli command
502-
if worker_address:
503-
try:
504-
result = subprocess.run(
505-
["dscli", "stop", "--worker_address", worker_address],
506-
timeout=90,
507-
capture_output=True,
508-
)
509-
if result.returncode == 0:
510-
logger.info(f"Stopped datasystem worker at {worker_address} via dscli stop")
511-
else:
512-
error_msg = (result.stderr or result.stdout or b"").decode()
513-
logger.warning(
514-
f"Failed to stop datasystem worker at {worker_address}. "
515-
f"Return code: {result.returncode}, Error: {error_msg}"
516-
)
517-
except subprocess.TimeoutExpired as err:
518-
logger.warning(f"dscli stop timed out for {worker_address}: {err}")
519-
except Exception as e:
520-
logger.warning(f"Failed to stop datasystem worker via dscli: {e}")
521-
else:
522-
logger.warning(f"Unexpected Yuanrong storage value: {value}")
359+
cleanup_yuanrong_resources(value)
523360
else:
524361
logger.warning(f"close for _TRANSFER_QUEUE_STORAGE with key {key} is not supported for now.")
525362

0 commit comments

Comments
 (0)