Skip to content

Commit 2f24c68

Browse files
authored
Add automatic Yuanrong startup to interface.py (#60)
## Description

- Add Yuanrong startup to `init()`
- Add Yuanrong shutdown to `close()`

---------

Signed-off-by: dpj135 <958208521@qq.com>
1 parent 4e9ae22 commit 2f24c68

3 files changed

Lines changed: 181 additions & 6 deletions

File tree

transfer_queue/config.yaml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ backend:
4848

4949
# For Yuanrong:
5050
Yuanrong:
51-
# Port of local yuanrong datasystem worker
51+
# Whether to let TQ automatically start etcd and datasystem services
52+
auto_init: True
53+
# etcd service address (used to start etcd when auto_init=true)
54+
etcd_address: "127.0.0.1:2379"
55+
# datasystem worker host and port (used to start dscli when auto_init=true)
56+
host: "127.0.0.1"
5257
port: 31501
53-
# If enable npu transport
54-
enable_yr_npu_transport: false

transfer_queue/interface.py

Lines changed: 174 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
import logging
1717
import math
1818
import os
19+
import shutil
20+
import socket
1921
import subprocess
22+
import tempfile
2023
import time
2124
from importlib import resources
2225
from typing import Any, Optional
@@ -102,11 +105,11 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
102105
check = subprocess.run(["pgrep", "-f", "mooncake_master"], stdout=subprocess.PIPE, text=True)
103106
if check.returncode == 0:
104107
pids = check.stdout.strip().replace("\n", ", ")
105-
logging.info(f"Find existing mooncake_master (PID: {pids}), try to kill first...")
108+
logger.info(f"Find existing mooncake_master (PID: {pids}), try to kill first...")
106109

107110
result = os.system('pkill -f "[m]ooncake_master"')
108111
if result == 0:
109-
logging.info("Successfully killed existing mooncake_master processes.")
112+
logger.info("Successfully killed existing mooncake_master processes.")
110113
else:
111114
raise RuntimeError(f"Failed to kill existing mooncake_master processes (exit code: {result}).")
112115

@@ -184,6 +187,129 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
184187
f"Output:\n{error_msg}"
185188
)
186189
_TRANSFER_QUEUE_STORAGE["MooncakeStore"] = process
190+
if conf.backend.storage_backend == "Yuanrong":
191+
if conf.backend.Yuanrong.auto_init:
192+
etcd_process = None
193+
etcd_data_dir = None
194+
worker_address = None
195+
if not shutil.which("etcd"):
196+
raise RuntimeError(
197+
"etcd executable not found in PATH. Please install etcd and make sure it's in the PATH."
198+
)
199+
if not shutil.which("dscli"):
200+
raise RuntimeError(
201+
"dscli executable not found in PATH. Please run `pip install openyuanrong-datasystem`."
202+
)
203+
try:
204+
# ========== Start etcd ==========
205+
etcd_address = "127.0.0.1:2379"
206+
try:
207+
etcd_address = conf.backend.Yuanrong.etcd_address
208+
except Exception:
209+
pass
210+
211+
# Assume host:port format
212+
parts = etcd_address.split(":")
213+
if len(parts) != 2:
214+
raise ValueError(f"Invalid etcd_address format: {etcd_address}. Expected host:port")
215+
host = parts[0]
216+
port = int(parts[1])
217+
218+
# Create temporary data directory
219+
etcd_data_dir = tempfile.mkdtemp(prefix="tq_etcd_")
220+
logger.info(f"Starting etcd with data directory: {etcd_data_dir}")
221+
222+
cmd = [
223+
"etcd",
224+
f"--data-dir={etcd_data_dir}",
225+
f"--listen-client-urls=http://{host}:{port}",
226+
f"--advertise-client-urls=http://{host}:{port}",
227+
]
228+
229+
etcd_process = subprocess.Popen(
230+
cmd,
231+
stdout=subprocess.DEVNULL,
232+
stderr=subprocess.DEVNULL,
233+
text=True,
234+
bufsize=1,
235+
universal_newlines=True,
236+
start_new_session=True,
237+
)
238+
time.sleep(3) # Wait for etcd to start
239+
240+
if etcd_process.poll() is None:
241+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
242+
sock.settimeout(2)
243+
result = sock.connect_ex((host, port))
244+
sock.close()
245+
if result != 0:
246+
raise RuntimeError(f"etcd process started but not listening on {host}:{port}")
247+
else:
248+
raise RuntimeError(f"etcd exited immediately with return code {etcd_process.returncode}")
249+
250+
logger.info(f"etcd started, PID: {etcd_process.pid}")
251+
time.sleep(2)
252+
253+
# ========== Start datasystem worker ==========
254+
# Assume host:port format
255+
worker_host = conf.backend.Yuanrong.host
256+
worker_port = conf.backend.Yuanrong.port
257+
worker_address = worker_host + ":" + str(worker_port)
258+
259+
cmd = [
260+
"dscli",
261+
"start",
262+
"-w",
263+
"--worker_address",
264+
worker_address,
265+
"--etcd_address",
266+
etcd_address,
267+
]
268+
269+
try:
270+
ds_result = subprocess.run(
271+
cmd,
272+
stdout=subprocess.PIPE,
273+
stderr=subprocess.STDOUT,
274+
text=True,
275+
timeout=90,
276+
)
277+
except subprocess.TimeoutExpired as err:
278+
raise RuntimeError(f"dscli start timed out: {err}") from err
279+
# Wait for dscli to start and exit (it starts worker and exits)
280+
if ds_result.returncode == 0 and "[ OK ]" in ds_result.stdout:
281+
logger.info(f"dscli started Yuanrong datasystem worker at {worker_address} successfully.")
282+
283+
else:
284+
raise RuntimeError(
285+
f"Failed to start datasystem worker at {worker_address}. "
286+
f"Return code: {ds_result.returncode}, Output: {ds_result.stdout}"
287+
)
288+
289+
# Store processes and data directory
290+
_TRANSFER_QUEUE_STORAGE["Yuanrong"] = {
291+
"etcd": etcd_process,
292+
"etcd_data_dir": etcd_data_dir,
293+
"worker_address": worker_address,
294+
"etcd_address": etcd_address,
295+
}
296+
logger.info("Yuanrong backend (etcd + datasystem) started successfully.")
297+
298+
except Exception as e:
299+
# Clean up on failure
300+
if etcd_process is not None and etcd_process.poll() is None:
301+
etcd_process.terminate()
302+
try:
303+
etcd_process.wait(timeout=5)
304+
except subprocess.TimeoutExpired:
305+
etcd_process.kill()
306+
etcd_process.wait()
307+
if etcd_data_dir is not None:
308+
try:
309+
shutil.rmtree(etcd_data_dir, ignore_errors=True)
310+
except Exception:
311+
pass
312+
raise RuntimeError(f"Failed to start Yuanrong backend: {e}") from e
187313
return conf
188314

189315

@@ -345,6 +471,52 @@ def close():
345471
logger.info("Successfully removed all existing keys in mooncake_master.")
346472
except Exception:
347473
pass
474+
elif key == "Yuanrong":
475+
# Stop etcd process and clean up data directory, stop datasystem worker via dscli
476+
if isinstance(value, dict):
477+
etcd_process = value.get("etcd")
478+
etcd_data_dir = value.get("etcd_data_dir")
479+
worker_address = value.get("worker_address")
480+
481+
# Stop etcd if running
482+
if etcd_process is not None and etcd_process.poll() is None:
483+
etcd_process.terminate()
484+
try:
485+
etcd_process.wait(timeout=5)
486+
except subprocess.TimeoutExpired:
487+
etcd_process.kill()
488+
etcd_process.wait()
489+
490+
# Clean up etcd data directory
491+
if etcd_data_dir is not None and os.path.exists(etcd_data_dir):
492+
try:
493+
shutil.rmtree(etcd_data_dir, ignore_errors=True)
494+
logger.info(f"Cleaned up etcd data directory: {etcd_data_dir}")
495+
except Exception as e:
496+
logger.warning(f"Failed to clean up etcd data directory {etcd_data_dir}: {e}")
497+
498+
# Stop datasystem worker via dscli command
499+
if worker_address:
500+
try:
501+
result = subprocess.run(
502+
["dscli", "stop", "--worker_address", worker_address],
503+
timeout=90,
504+
capture_output=True,
505+
)
506+
if result.returncode == 0:
507+
logger.info(f"Stopped datasystem worker at {worker_address} via dscli stop")
508+
else:
509+
error_msg = (result.stderr or result.stdout or b"").decode()
510+
logger.warning(
511+
f"Failed to stop datasystem worker at {worker_address}. "
512+
f"Return code: {result.returncode}, Error: {error_msg}"
513+
)
514+
except subprocess.TimeoutExpired as err:
515+
logger.warning(f"dscli stop timed out for {worker_address}: {err}")
516+
except Exception as e:
517+
logger.warning(f"Failed to stop datasystem worker via dscli: {e}")
518+
else:
519+
logger.warning(f"Unexpected Yuanrong storage value: {value}")
348520
else:
349521
logger.warning(f"close for _TRANSFER_QUEUE_STORAGE with key {key} is not supported for now.")
350522

transfer_queue/storage/clients/yuanrong_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ class GeneralKVClientAdapter(StorageStrategy):
297297
The serialization method uses '_decoder' and '_encoder' from 'transfer_queue.utils.serial_utils'.
298298
"""
299299

300-
PUT_KEYS_LIMIT: int = 2_000
300+
PUT_KEYS_LIMIT: int = 10_000
301301
GET_CLEAR_KEYS_LIMIT: int = 10_000
302302

303303
# Header: number of entries (uint32, little-endian)

0 commit comments

Comments
 (0)