Skip to content

Commit 89332c6

Browse files
committed
[feat] Support auto init YR backend based on metastore
Signed-off-by: Haichuan Hu <kaisennhu@gmail.com>
1 parent e04cc05 commit 89332c6

3 files changed

Lines changed: 388 additions & 159 deletions

File tree

transfer_queue/config.yaml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,14 @@ backend:
5050
Yuanrong:
5151
# Whether to let TQ automatically start etcd and datasystem services
5252
auto_init: True
53+
# ========== Etcd mode config ==========
5354
# etcd service address (used to start etcd when auto_init=True)
5455
etcd_address: "127.0.0.1:2379"
55-
# datasystem worker host and port (used to start dscli when auto_init=true)
56-
host: "127.0.0.1"
56+
# ========== Metastore mode config ==========
57+
# Whether to use metastore mode instead of etcd
58+
metastore_mode: False
59+
# For auto_init=True, leave empty to auto-generate "<head node IP>:2379" on the head node; a non-empty value is used as-is
60+
# For auto_init=False, specify the external metastore address here
61+
metastore_address: ""
62+
# Datasystem worker port (host is auto-detected)
5763
port: 31501

transfer_queue/interface.py

Lines changed: 146 additions & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717
import math
1818
import os
1919
import shutil
20-
import socket
2120
import subprocess
22-
import tempfile
2321
import time
2422
from importlib import resources
2523
from typing import Any, Optional
@@ -36,8 +34,15 @@
3634
from transfer_queue.metadata import KVBatchMeta
3735
from transfer_queue.sampler import * # noqa: F401
3836
from transfer_queue.sampler import BaseSampler
37+
from transfer_queue.storage.clients.yuanrong_client import get_local_ip_addresses
3938
from transfer_queue.storage.simple_backend import SimpleStorageUnit
4039
from transfer_queue.utils.common import get_placement_group
40+
from transfer_queue.utils.yuanrong_utils import (
41+
start_datasystem_worker,
42+
start_etcd,
43+
stop_datasystem_worker,
44+
stop_etcd_process,
45+
)
4146
from transfer_queue.utils.zmq_utils import process_zmq_server_info
4247

4348
logger = logging.getLogger(__name__)
@@ -189,126 +194,92 @@ def _maybe_create_transferqueue_storage(conf: DictConfig) -> DictConfig:
189194
_TRANSFER_QUEUE_STORAGE["MooncakeStore"] = process
190195
if conf.backend.storage_backend == "Yuanrong":
191196
if conf.backend.Yuanrong.auto_init:
192-
etcd_process = None
193-
etcd_data_dir = None
194-
worker_address = None
195-
if not shutil.which("etcd"):
196-
raise RuntimeError(
197-
"etcd executable not found in PATH. Please install etcd and make sure it's in the PATH."
198-
)
199-
if not shutil.which("dscli"):
200-
raise RuntimeError(
201-
"dscli executable not found in PATH. Please run `pip install openyuanrong-datasystem`."
202-
)
203197
try:
204-
# ========== Start etcd ==========
205-
etcd_address = "127.0.0.1:2379"
206-
try:
207-
etcd_address = conf.backend.Yuanrong.etcd_address
208-
except Exception:
209-
pass
210-
211-
# Assume host:port format
212-
parts = etcd_address.split(":")
213-
if len(parts) != 2:
214-
raise ValueError(f"Invalid etcd_address format: {etcd_address}. Expected host:port")
215-
host = parts[0]
216-
port = int(parts[1])
217-
218-
# Create temporary data directory
219-
etcd_data_dir = tempfile.mkdtemp(prefix="tq_etcd_")
220-
logger.info(f"Starting etcd with data directory: {etcd_data_dir}")
221-
222-
cmd = [
223-
"etcd",
224-
f"--data-dir={etcd_data_dir}",
225-
f"--listen-client-urls=http://{host}:{port}",
226-
f"--advertise-client-urls=http://{host}:{port}",
227-
]
228-
229-
etcd_process = subprocess.Popen(
230-
cmd,
231-
stdout=subprocess.DEVNULL,
232-
stderr=subprocess.DEVNULL,
233-
text=True,
234-
bufsize=1,
235-
universal_newlines=True,
236-
start_new_session=True,
237-
)
238-
time.sleep(3) # Wait for etcd to start
239-
240-
if etcd_process.poll() is None:
241-
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
242-
sock.settimeout(2)
243-
result = sock.connect_ex((host, port))
244-
sock.close()
245-
if result != 0:
246-
raise RuntimeError(f"etcd process started but not listening on {host}:{port}")
198+
if conf.backend.Yuanrong.metastore_mode:
199+
# ========== Metastore Mode ==========
200+
if not shutil.which("dscli"):
201+
raise RuntimeError("dscli executable not found. Please install openyuanrong-datasystem.")
202+
203+
# Auto-detect addresses using local IP
204+
local_ips = get_local_ip_addresses()
205+
# Skip loopback addresses for metastore mode (multi-node scenario)
206+
network_ips = [ip for ip in local_ips if not ip.startswith("127.")]
207+
if not network_ips:
208+
raise RuntimeError(
209+
"No non-loopback IP addresses found for metastore mode. "
210+
"Each node must have a network IP address reachable by other nodes. "
211+
"Please check your network configuration."
212+
)
213+
worker_host = network_ips[0]
214+
worker_port = conf.backend.Yuanrong.port
215+
worker_address = f"{worker_host}:{worker_port}"
216+
217+
# Use user-specified metastore_address if provided, otherwise auto-generate
218+
if conf.backend.Yuanrong.metastore_address:
219+
metastore_address = conf.backend.Yuanrong.metastore_address
220+
logger.info(f"Using user-specified metastore_address: {metastore_address}")
221+
else:
222+
metastore_address = f"{worker_host}:2379"
223+
logger.info(f"Auto-generated metastore_address: {metastore_address}")
224+
225+
# Start datasystem node as head (first node to call init)
226+
start_datasystem_worker(worker_address, metastore_address=metastore_address, is_head=True)
227+
228+
# Update config with metastore_address for worker nodes to discover
229+
conf.backend.Yuanrong.metastore_address = metastore_address
230+
ray.get(_TRANSFER_QUEUE_CONTROLLER.store_config.remote(conf))
231+
232+
# Store metastore mode information
233+
_TRANSFER_QUEUE_STORAGE["Yuanrong"] = {
234+
"worker_address": worker_address,
235+
"metastore_address": metastore_address,
236+
"is_head_node": True,
237+
"metastore_mode": True,
238+
}
239+
logger.info("Yuanrong backend (metastore mode, head node) started successfully.")
247240
else:
248-
raise RuntimeError(f"etcd exited immediately with return code {etcd_process.returncode}")
249-
250-
logger.info(f"etcd started, PID: {etcd_process.pid}")
251-
time.sleep(2)
252-
253-
# ========== Start datasystem worker ==========
254-
# Assume host:port format
255-
worker_host = conf.backend.Yuanrong.host
256-
worker_port = conf.backend.Yuanrong.port
257-
worker_address = worker_host + ":" + str(worker_port)
258-
259-
cmd = [
260-
"dscli",
261-
"start",
262-
"-w",
263-
"--worker_address",
264-
worker_address,
265-
"--etcd_address",
266-
etcd_address,
267-
]
268-
269-
try:
270-
ds_result = subprocess.run(
271-
cmd,
272-
stdout=subprocess.PIPE,
273-
stderr=subprocess.STDOUT,
274-
text=True,
275-
timeout=90,
276-
)
277-
except subprocess.TimeoutExpired as err:
278-
raise RuntimeError(f"dscli start timed out: {err}") from err
279-
# Wait for dscli to start and exit (it starts worker and exits)
280-
if ds_result.returncode == 0 and "[ OK ]" in ds_result.stdout:
281-
logger.info(f"dscli started Yuanrong datasystem worker at {worker_address} successfully.")
282-
283-
else:
284-
raise RuntimeError(
285-
f"Failed to start datasystem worker at {worker_address}. "
286-
f"Return code: {ds_result.returncode}, Output: {ds_result.stdout}"
287-
)
288-
289-
# Store processes and data directory
290-
_TRANSFER_QUEUE_STORAGE["Yuanrong"] = {
291-
"etcd": etcd_process,
292-
"etcd_data_dir": etcd_data_dir,
293-
"worker_address": worker_address,
294-
"etcd_address": etcd_address,
295-
}
296-
logger.info("Yuanrong backend (etcd + datasystem) started successfully.")
297-
298-
except Exception as e:
299-
# Clean up on failure
300-
if etcd_process is not None and etcd_process.poll() is None:
301-
etcd_process.terminate()
302-
try:
303-
etcd_process.wait(timeout=5)
304-
except subprocess.TimeoutExpired:
305-
etcd_process.kill()
306-
etcd_process.wait()
307-
if etcd_data_dir is not None:
241+
# ========== Etcd Mode (Backward Compatible) ==========
242+
etcd_address = "127.0.0.1:2379"
308243
try:
309-
shutil.rmtree(etcd_data_dir, ignore_errors=True)
244+
etcd_address = conf.backend.Yuanrong.etcd_address
310245
except Exception:
311246
pass
247+
248+
# Start etcd
249+
etcd_process, etcd_data_dir = start_etcd(etcd_address)
250+
251+
# Auto-detect worker host address
252+
local_ips = get_local_ip_addresses()
253+
# Skip loopback addresses (use network IP for multi-node support)
254+
network_ips = [ip for ip in local_ips if not ip.startswith("127.")]
255+
worker_host = network_ips[0] if network_ips else local_ips[0]
256+
worker_port = conf.backend.Yuanrong.port
257+
worker_address = f"{worker_host}:{worker_port}"
258+
259+
start_datasystem_worker(worker_address, etcd_address=etcd_address)
260+
261+
# Store processes and data directory
262+
_TRANSFER_QUEUE_STORAGE["Yuanrong"] = {
263+
"etcd": etcd_process,
264+
"etcd_data_dir": etcd_data_dir,
265+
"worker_address": worker_address,
266+
"etcd_address": etcd_address,
267+
"metastore_mode": False,
268+
}
269+
logger.info("Yuanrong backend (etcd mode) started successfully.")
270+
except Exception as e:
271+
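# Clean up on failure (etcd mode only). NOTE(review): this branch only runs when _TRANSFER_QUEUE_STORAGE["Yuanrong"] is already set, which happens after both start_etcd() and start_datasystem_worker() have returned; an etcd process started before a failing worker start is not stopped here.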
272+
if "Yuanrong" in _TRANSFER_QUEUE_STORAGE:
273+
value = _TRANSFER_QUEUE_STORAGE["Yuanrong"]
274+
if isinstance(value, dict) and "etcd" in value:
275+
etcd_cleanup_process: Optional[subprocess.Popen] = value.get("etcd")
276+
etcd_cleanup_data_dir: Optional[str] = value.get("etcd_data_dir")
277+
stop_etcd_process(etcd_cleanup_process)
278+
if etcd_cleanup_data_dir is not None:
279+
try:
280+
shutil.rmtree(etcd_cleanup_data_dir, ignore_errors=True)
281+
except Exception:
282+
pass
312283
raise RuntimeError(f"Failed to start Yuanrong backend: {e}") from e
313284
return conf
314285

@@ -335,6 +306,40 @@ def _init_from_existing() -> bool:
335306
conf = ray.get(_TRANSFER_QUEUE_CONTROLLER.get_config.remote())
336307
if conf is not None:
337308
_maybe_create_transferqueue_client(conf)
309+
310+
# Handle metastore mode for worker nodes
311+
if conf.backend.storage_backend == "Yuanrong" and conf.backend.Yuanrong.metastore_mode:
312+
if conf.backend.Yuanrong.auto_init:
313+
# This is a worker node - get metastore address from config
314+
metastore_address = conf.backend.Yuanrong.metastore_address
315+
if not metastore_address:
316+
raise RuntimeError(
317+
"metastore_address not found in config. The head node should have set this value."
318+
)
319+
# Auto-detect worker address and start worker node (is_head=False)
320+
local_ips = get_local_ip_addresses()
321+
# Skip loopback addresses for metastore mode (multi-node scenario)
322+
network_ips = [ip for ip in local_ips if not ip.startswith("127.")]
323+
if not network_ips:
324+
raise RuntimeError(
325+
"No non-loopback IP addresses found for metastore mode. "
326+
"Each node must have a network IP address reachable by other nodes. "
327+
"Please check your network configuration."
328+
)
329+
worker_host = network_ips[0]
330+
worker_port = conf.backend.Yuanrong.port
331+
worker_address = f"{worker_host}:{worker_port}"
332+
start_datasystem_worker(worker_address, metastore_address=metastore_address, is_head=False)
333+
334+
# Store metastore mode information
335+
_TRANSFER_QUEUE_STORAGE["Yuanrong"] = {
336+
"worker_address": worker_address,
337+
"metastore_address": metastore_address,
338+
"is_head_node": False,
339+
"metastore_mode": True,
340+
}
341+
logger.info("Yuanrong backend (metastore mode, worker node) started successfully.")
342+
338343
logger.info("TransferQueueClient initialized.")
339344
return True
340345

@@ -475,49 +480,33 @@ def close():
475480
except Exception:
476481
pass
477482
elif key == "Yuanrong":
478-
# Stop etcd process and clean up data directory, stop datasystem worker via dscli
483+
# Stop datasystem worker and optionally clean up etcd (for etcd mode only)
479484
if isinstance(value, dict):
480-
etcd_process = value.get("etcd")
481-
etcd_data_dir = value.get("etcd_data_dir")
485+
metastore_mode = value.get("metastore_mode", False)
482486
worker_address = value.get("worker_address")
483487

484-
# Stop etcd if running
485-
if etcd_process is not None and etcd_process.poll() is None:
486-
etcd_process.terminate()
487-
try:
488-
etcd_process.wait(timeout=5)
489-
except subprocess.TimeoutExpired:
490-
etcd_process.kill()
491-
etcd_process.wait()
492-
493-
# Clean up etcd data directory
494-
if etcd_data_dir is not None and os.path.exists(etcd_data_dir):
495-
try:
496-
shutil.rmtree(etcd_data_dir, ignore_errors=True)
497-
logger.info(f"Cleaned up etcd data directory: {etcd_data_dir}")
498-
except Exception as e:
499-
logger.warning(f"Failed to clean up etcd data directory {etcd_data_dir}: {e}")
500-
501-
# Stop datasystem worker via dscli command
502-
if worker_address:
503-
try:
504-
result = subprocess.run(
505-
["dscli", "stop", "--worker_address", worker_address],
506-
timeout=90,
507-
capture_output=True,
508-
)
509-
if result.returncode == 0:
510-
logger.info(f"Stopped datasystem worker at {worker_address} via dscli stop")
511-
else:
512-
error_msg = (result.stderr or result.stdout or b"").decode()
513-
logger.warning(
514-
f"Failed to stop datasystem worker at {worker_address}. "
515-
f"Return code: {result.returncode}, Error: {error_msg}"
516-
)
517-
except subprocess.TimeoutExpired as err:
518-
logger.warning(f"dscli stop timed out for {worker_address}: {err}")
519-
except Exception as e:
520-
logger.warning(f"Failed to stop datasystem worker via dscli: {e}")
488+
if metastore_mode:
489+
# ========== Metastore Mode ==========
490+
# Only stop the datasystem worker (no etcd to clean up)
491+
logger.info("Cleaning up Yuanrong backend (metastore mode)...")
492+
else:
493+
# ========== Etcd Mode ==========
494+
# Stop etcd process and clean up data directory
495+
etcd_process = value.get("etcd")
496+
etcd_data_dir = value.get("etcd_data_dir")
497+
498+
stop_etcd_process(etcd_process)
499+
500+
# Clean up etcd data directory
501+
if etcd_data_dir is not None and os.path.exists(etcd_data_dir):
502+
try:
503+
shutil.rmtree(etcd_data_dir, ignore_errors=True)
504+
logger.info(f"Cleaned up etcd data directory: {etcd_data_dir}")
505+
except Exception as e:
506+
logger.warning(f"Failed to clean up etcd data directory {etcd_data_dir}: {e}")
507+
508+
# Stop datasystem worker (common to both modes)
509+
stop_datasystem_worker(worker_address)
521510
else:
522511
logger.warning(f"Unexpected Yuanrong storage value: {value}")
523512
else:

0 commit comments

Comments
 (0)