Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
)
from fastdeploy.inter_communicator.fmq import FMQ
from fastdeploy.metrics.metrics import main_process_metrics
from fastdeploy.metrics.prometheus_multiprocess_setup import setup_dp_prometheus_dir, get_original_prom_dir
from fastdeploy.model_executor.guided_decoding import schema_checker
from fastdeploy.plugins.token_processor import load_token_processor_plugins
from fastdeploy.spec_decode import SpecMethod
Expand Down Expand Up @@ -2720,6 +2721,7 @@ def launch_components(self):
self.launched_expert_service_signal.value[0] = 1
self.dp_processed = []
self.dp_engine_worker_queue_server = []
base_prom_dir = get_original_prom_dir()
for i in range(
1,
self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
Expand Down Expand Up @@ -2758,10 +2760,15 @@ def launch_components(self):
f"Engine is initialized successfully with {self.cfg.parallel_config.tensor_parallel_size}"
+ f" data parallel id {i}"
)
if envs.FD_ENABLE_INTERNAL_ADAPTER:
setup_dp_prometheus_dir(i, base_prom_dir)
self.dp_processed[-1].start()
while self.launched_expert_service_signal.value[i] == 0:
time.sleep(1)

if envs.FD_ENABLE_INTERNAL_ADAPTER:
setup_dp_prometheus_dir(0, base_prom_dir)

def check_worker_initialize_status(self):
"""
Check the initialization status of workers by stdout logging
Expand Down
7 changes: 7 additions & 0 deletions fastdeploy/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
log_request_error,
)
from fastdeploy.metrics.metrics import main_process_metrics
from fastdeploy.metrics.prometheus_multiprocess_setup import setup_dp_prometheus_dir, get_original_prom_dir
from fastdeploy.platforms import current_platform
from fastdeploy.utils import EngineError, console_logger, envs, llm_logger

Expand Down Expand Up @@ -860,6 +861,7 @@ def launch_components(self):
self.launched_expert_service_signal.value[0] = 1
self.dp_processed = []
self.dp_engine_worker_queue_server = []
base_prom_dir = get_original_prom_dir()
for i in range(
1,
self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
Expand Down Expand Up @@ -897,8 +899,13 @@ def launch_components(self):
f"Engine is initialized successfully with {self.cfg.parallel_config.tensor_parallel_size}"
+ f" data parallel id {i}"
)
if envs.FD_ENABLE_INTERNAL_ADAPTER:
setup_dp_prometheus_dir(i, base_prom_dir)
self.dp_processed[-1].start()

if envs.FD_ENABLE_INTERNAL_ADAPTER:
setup_dp_prometheus_dir(0, base_prom_dir)

for i in range(
1,
self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
Expand Down
9 changes: 2 additions & 7 deletions fastdeploy/entrypoints/openai/multi_api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import sys
import time

from fastdeploy.metrics.prometheus_multiprocess_setup import setup_dp_prometheus_dir
from fastdeploy.platforms import current_platform
from fastdeploy.utils import find_free_ports, get_logger, is_port_available

Expand Down Expand Up @@ -108,13 +109,7 @@ def start_servers(
env["FD_ENABLE_MULTI_API_SERVER"] = "1"
env["FD_LOG_DIR"] = env.get("FD_LOG_DIR", "log") + f"/log_{i}"
if "PROMETHEUS_MULTIPROC_DIR" in env:
prom_dir = env.get("PROMETHEUS_MULTIPROC_DIR")
prom_dir_i = os.path.join(os.path.dirname(prom_dir), os.path.basename(prom_dir) + f"_dp{i}")
# Create the directory if it doesn't exist
if not os.path.exists(prom_dir_i):
os.makedirs(prom_dir_i, exist_ok=True)
env["PROMETHEUS_MULTIPROC_DIR"] = prom_dir_i
logger.info(f"Set PROMETHEUS_MULTIPROC_DIR for DP {i}: {prom_dir_i}")
setup_dp_prometheus_dir(i, env["PROMETHEUS_MULTIPROC_DIR"], env)

cmd = [
sys.executable,
Expand Down
59 changes: 44 additions & 15 deletions fastdeploy/metrics/prometheus_multiprocess_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,56 @@

from fastdeploy.utils import llm_logger

# Base PROMETHEUS_MULTIPROC_DIR remembered at module level. It stays None
# until setup_multiprocess_prometheus() records a directory in this process.
_original_prom_dir = None


def get_original_prom_dir():
    """Return the recorded base Prometheus multiprocess directory.

    The value is the PROMETHEUS_MULTIPROC_DIR captured before any per-DP
    ``dp{N}`` suffix was appended, or ``None`` if nothing has been recorded.
    """
    recorded_base = _original_prom_dir
    return recorded_base


def setup_multiprocess_prometheus():
    """Prepare the Prometheus multiprocess directory and remember it.

    When PROMETHEUS_MULTIPROC_DIR is not set, a fresh
    ``/tmp/prom_main_<uuid>`` directory is created (after removing any
    leftover directory of the same name) and exported through
    ``os.environ``. When the variable is already set, the user-provided
    directory is used as-is and created if it does not exist.

    In both cases the chosen directory is stored in the module-level
    ``_original_prom_dir`` so that it can later be retrieved as the base
    directory, before any per-DP suffix is appended.

    Returns:
        The path of the Prometheus multiprocess directory in use.
    """
    global _original_prom_dir

    if "PROMETHEUS_MULTIPROC_DIR" not in os.environ:
        prom_dir = f"/tmp/prom_main_{uuid.uuid4()}"
        # Start from a clean directory so stale metric files are not picked up.
        if os.path.exists(prom_dir):
            shutil.rmtree(prom_dir, ignore_errors=True)
        os.makedirs(prom_dir, exist_ok=True)
        os.environ["PROMETHEUS_MULTIPROC_DIR"] = prom_dir
        _original_prom_dir = prom_dir
        llm_logger.info(f"PROMETHEUS_MULTIPROC_DIR is set to be {prom_dir}")
        return prom_dir

    # The user supplied a directory: keep its contents, just make sure it exists.
    user_dir = os.environ["PROMETHEUS_MULTIPROC_DIR"]
    _original_prom_dir = user_dir
    os.makedirs(user_dir, exist_ok=True)
    llm_logger.info(f"PROMETHEUS_MULTIPROC_DIR is set to {user_dir}")
    return user_dir


def setup_dp_prometheus_dir(dp_id, base_dir, env_dict=None):
    """Set up an isolated PROMETHEUS_MULTIPROC_DIR subdirectory for a DP rank.

    Creates ``<base_dir>/dp<dp_id>`` and points PROMETHEUS_MULTIPROC_DIR at
    it, either in ``env_dict`` or in ``os.environ``.

    For DP0: existing ``.db`` files located directly in ``base_dir`` are
    moved into ``dp0/`` before the variable is updated. (Original author's
    note: mmap writes remain valid after a rename on the same filesystem.)
    For DP1+: only the ``dp{i}/`` subdirectory is created and the variable
    updated. (Original author's note: after fork the PID change makes
    prometheus_client create new .db files in that subdirectory.)

    If ``base_dir`` is ``None`` or empty, for example because no base
    directory was recorded in this process, a warning is logged and nothing
    is changed.

    Args:
        dp_id: Data parallel rank id.
        base_dir: Original PROMETHEUS_MULTIPROC_DIR (before any dp suffix).
        env_dict: If provided, write to this dict instead of os.environ.
    """
    if not base_dir:
        # os.path.join(None, ...) raises TypeError, and "" would silently
        # resolve relative to the current working directory.
        llm_logger.warning(
            f"Skip PROMETHEUS_MULTIPROC_DIR setup for DP {dp_id}: base directory is not set"
        )
        return

    prom_dir_dp = os.path.join(base_dir, f"dp{dp_id}")
    # Creating the subdirectory also creates base_dir when it is missing.
    os.makedirs(prom_dir_dp, exist_ok=True)

    if dp_id == 0:
        for fname in os.listdir(base_dir):
            src = os.path.join(base_dir, fname)
            # Only plain .db files are moved; the dp{N} subdirectories stay put.
            if os.path.isfile(src) and fname.endswith(".db"):
                os.rename(src, os.path.join(prom_dir_dp, fname))
                llm_logger.info(f"Moved {src} -> {prom_dir_dp}")

    target = env_dict if env_dict is not None else os.environ
    target["PROMETHEUS_MULTIPROC_DIR"] = prom_dir_dp
    llm_logger.info(f"Set PROMETHEUS_MULTIPROC_DIR for DP {dp_id}: {prom_dir_dp}")
Loading