diff --git a/transfer_queue/config.yaml b/transfer_queue/config.yaml index 433c026d..d5a3bac6 100644 --- a/transfer_queue/config.yaml +++ b/transfer_queue/config.yaml @@ -48,7 +48,10 @@ backend: # For Yuanrong: Yuanrong: - # Port of local yuanrong datasystem worker + # Whether to let TQ automatically start etcd and datasystem services + auto_init: True + # etcd service address (used to start etcd when auto_init=true) + etcd_address: "127.0.0.1:2379" + # datasystem worker host and port (used to start dscli when auto_init=true) + host: "127.0.0.1" port: 31501 - # If enable npu transport - enable_yr_npu_transport: false diff --git a/transfer_queue/interface.py b/transfer_queue/interface.py index 7e1e23a5..347bd764 100644 --- a/transfer_queue/interface.py +++ b/transfer_queue/interface.py @@ -16,7 +16,10 @@ import logging import math import os +import shutil +import socket import subprocess +import tempfile import time from importlib import resources from typing import Any, Optional @@ -103,11 +106,11 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig: check = subprocess.run(["pgrep", "-f", "mooncake_master"], stdout=subprocess.PIPE, text=True) if check.returncode == 0: pids = check.stdout.strip().replace("\n", ", ") - logging.info(f"Find existing mooncake_master (PID: {pids}), try to kill first...") + logger.info(f"Find existing mooncake_master (PID: {pids}), try to kill first...") result = os.system('pkill -f "[m]ooncake_master"') if result == 0: - logging.info("Successfully killed existing mooncake_master processes.") + logger.info("Successfully killed existing mooncake_master processes.") else: raise RuntimeError(f"Failed to kill existing mooncake_master processes (exit code: {result}).") @@ -185,6 +188,129 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig: f"Output:\n{error_msg}" ) _TRANSFER_QUEUE_STORAGE["MooncakeStore"] = process + if conf.backend.storage_backend == "Yuanrong": + if conf.backend.Yuanrong.auto_init: + etcd_process = None + etcd_data_dir = None + worker_address = None + if not shutil.which("etcd"): + raise RuntimeError( + "etcd executable not found in PATH. Please install etcd and make sure it's in the PATH." + ) + if not shutil.which("dscli"): + raise RuntimeError( + "dscli executable not found in PATH. Please run `pip install openyuanrong-datasystem`." + ) + try: + # ========== Start etcd ========== + etcd_address = "127.0.0.1:2379" + try: + etcd_address = conf.backend.Yuanrong.etcd_address + except Exception: + pass + + # Assume host:port format + parts = etcd_address.split(":") + if len(parts) != 2: + raise ValueError(f"Invalid etcd_address format: {etcd_address}. Expected host:port") + host = parts[0] + port = int(parts[1]) + + # Create temporary data directory + etcd_data_dir = tempfile.mkdtemp(prefix="tq_etcd_") + logger.info(f"Starting etcd with data directory: {etcd_data_dir}") + + cmd = [ + "etcd", + f"--data-dir={etcd_data_dir}", + f"--listen-client-urls=http://{host}:{port}", + f"--advertise-client-urls=http://{host}:{port}", + ] + + etcd_process = subprocess.Popen( + cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + text=True, + bufsize=1, + universal_newlines=True, + start_new_session=True, + ) + time.sleep(3) # Wait for etcd to start + + if etcd_process.poll() is None: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(2) + result = sock.connect_ex((host, port)) + sock.close() + if result != 0: + raise RuntimeError(f"etcd process started but not listening on {host}:{port}") + else: + raise RuntimeError(f"etcd exited immediately with return code {etcd_process.returncode}") + + logger.info(f"etcd started, PID: {etcd_process.pid}") + time.sleep(2) + + # ========== Start datasystem worker ========== + # Assume host:port format + worker_host = conf.backend.Yuanrong.host + worker_port = conf.backend.Yuanrong.port + worker_address = worker_host + ":" + str(worker_port) + + cmd = [ + "dscli", + "start", + "-w", + "--worker_address", + worker_address, + "--etcd_address", + etcd_address, + ] + + try: + ds_result = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + timeout=90, + ) + except subprocess.TimeoutExpired as err: + raise RuntimeError(f"dscli start timed out: {err}") from err + # Wait for dscli to start and exit (it starts worker and exits) + if ds_result.returncode == 0 and "[ OK ]" in ds_result.stdout: + logger.info(f"dscli started Yuanrong datasystem worker at {worker_address} successfully.") + + else: + raise RuntimeError( + f"Failed to start datasystem worker at {worker_address}. " + f"Return code: {ds_result.returncode}, Output: {ds_result.stdout}" + ) + + # Store processes and data directory + _TRANSFER_QUEUE_STORAGE["Yuanrong"] = { + "etcd": etcd_process, + "etcd_data_dir": etcd_data_dir, + "worker_address": worker_address, + "etcd_address": etcd_address, + } + logger.info("Yuanrong backend (etcd + datasystem) started successfully.") + + except Exception as e: + # Clean up on failure + if etcd_process is not None and etcd_process.poll() is None: + etcd_process.terminate() + try: + etcd_process.wait(timeout=5) + except subprocess.TimeoutExpired: + etcd_process.kill() + etcd_process.wait() + if etcd_data_dir is not None: + try: + shutil.rmtree(etcd_data_dir, ignore_errors=True) + except Exception: + pass + raise RuntimeError(f"Failed to start Yuanrong backend: {e}") from e return conf @@ -346,6 +472,52 @@ def close(): logger.info("Successfully removed all existing keys in mooncake_master.") except Exception: pass + elif key == "Yuanrong": + # Stop etcd process and clean up data directory, stop datasystem worker via dscli + if isinstance(value, dict): + etcd_process = value.get("etcd") + etcd_data_dir = value.get("etcd_data_dir") + worker_address = value.get("worker_address") + + # Stop etcd if running + if etcd_process is not None and etcd_process.poll() is None: + etcd_process.terminate() + try: + etcd_process.wait(timeout=5) + except subprocess.TimeoutExpired: + etcd_process.kill() + etcd_process.wait() + + # Clean up etcd data directory + if etcd_data_dir is not None and os.path.exists(etcd_data_dir): + try: + shutil.rmtree(etcd_data_dir, ignore_errors=True) + logger.info(f"Cleaned up etcd data directory: {etcd_data_dir}") + except Exception as e: + logger.warning(f"Failed to clean up etcd data directory {etcd_data_dir}: {e}") + + # Stop datasystem worker via dscli command + if worker_address: + try: + result = subprocess.run( + ["dscli", "stop", "--worker_address", worker_address], + timeout=90, + capture_output=True, + ) + if result.returncode == 0: + logger.info(f"Stopped datasystem worker at {worker_address} via dscli stop") + else: + error_msg = (result.stderr or result.stdout or b"").decode() + logger.warning( + f"Failed to stop datasystem worker at {worker_address}. " + f"Return code: {result.returncode}, Error: {error_msg}" + ) + except subprocess.TimeoutExpired as err: + logger.warning(f"dscli stop timed out for {worker_address}: {err}") + except Exception as e: + logger.warning(f"Failed to stop datasystem worker via dscli: {e}") + else: + logger.warning(f"Unexpected Yuanrong storage value: {value}") else: logger.warning(f"close for _TRANSFER_QUEUE_STORAGE with key {key} is not supported for now.") diff --git a/transfer_queue/storage/clients/yuanrong_client.py b/transfer_queue/storage/clients/yuanrong_client.py index 77a981eb..3b0de3ab 100644 --- a/transfer_queue/storage/clients/yuanrong_client.py +++ b/transfer_queue/storage/clients/yuanrong_client.py @@ -297,7 +297,7 @@ class GeneralKVClientAdapter(StorageStrategy): The serialization method uses '_decoder' and '_encoder' from 'transfer_queue.utils.serial_utils'. """ - PUT_KEYS_LIMIT: int = 2_000 + PUT_KEYS_LIMIT: int = 10_000 GET_CLEAR_KEYS_LIMIT: int = 10_000 # Header: number of entries (uint32, little-endian)