Skip to content

Commit 13e5962

Browse files
committed
[BugFix] Seperate prometheus multiproc dir for single-server multi-dp services
1 parent cbb0811 commit 13e5962

5 files changed

Lines changed: 65 additions & 25 deletions

File tree

fastdeploy/engine/common_engine.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
)
6666
from fastdeploy.inter_communicator.fmq import FMQ
6767
from fastdeploy.metrics.metrics import main_process_metrics
68+
from fastdeploy.metrics.prometheus_multiprocess_setup import setup_dp_prometheus_dir, get_original_prom_dir
6869
from fastdeploy.model_executor.guided_decoding import schema_checker
6970
from fastdeploy.plugins.token_processor import load_token_processor_plugins
7071
from fastdeploy.spec_decode import SpecMethod
@@ -2720,6 +2721,7 @@ def launch_components(self):
27202721
self.launched_expert_service_signal.value[0] = 1
27212722
self.dp_processed = []
27222723
self.dp_engine_worker_queue_server = []
2724+
base_prom_dir = get_original_prom_dir()
27232725
for i in range(
27242726
1,
27252727
self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
@@ -2758,10 +2760,13 @@ def launch_components(self):
27582760
f"Engine is initialized successfully with {self.cfg.parallel_config.tensor_parallel_size}"
27592761
+ f" data parallel id {i}"
27602762
)
2763+
setup_dp_prometheus_dir(i, base_prom_dir)
27612764
self.dp_processed[-1].start()
27622765
while self.launched_expert_service_signal.value[i] == 0:
27632766
time.sleep(1)
27642767

2768+
setup_dp_prometheus_dir(0, base_prom_dir)
2769+
27652770
def check_worker_initialize_status(self):
27662771
"""
27672772
Check the initlialize status of workers by stdout logging

fastdeploy/engine/engine.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
log_request_error,
5151
)
5252
from fastdeploy.metrics.metrics import main_process_metrics
53+
from fastdeploy.metrics.prometheus_multiprocess_setup import setup_dp_prometheus_dir, get_original_prom_dir
5354
from fastdeploy.platforms import current_platform
5455
from fastdeploy.utils import EngineError, console_logger, envs, llm_logger
5556

@@ -860,6 +861,7 @@ def launch_components(self):
860861
self.launched_expert_service_signal.value[0] = 1
861862
self.dp_processed = []
862863
self.dp_engine_worker_queue_server = []
864+
base_prom_dir = get_original_prom_dir()
863865
for i in range(
864866
1,
865867
self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,
@@ -897,8 +899,11 @@ def launch_components(self):
897899
f"Engine is initialized successfully with {self.cfg.parallel_config.tensor_parallel_size}"
898900
+ f" data parallel id {i}"
899901
)
902+
setup_dp_prometheus_dir(i, base_prom_dir)
900903
self.dp_processed[-1].start()
901904

905+
setup_dp_prometheus_dir(0, base_prom_dir)
906+
902907
for i in range(
903908
1,
904909
self.cfg.parallel_config.data_parallel_size // self.cfg.nnode,

fastdeploy/entrypoints/openai/multi_api_server.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import sys
2121
import time
2222

23+
from fastdeploy.metrics.prometheus_multiprocess_setup import setup_dp_prometheus_dir
2324
from fastdeploy.platforms import current_platform
2425
from fastdeploy.utils import find_free_ports, get_logger, is_port_available
2526

@@ -108,13 +109,7 @@ def start_servers(
108109
env["FD_ENABLE_MULTI_API_SERVER"] = "1"
109110
env["FD_LOG_DIR"] = env.get("FD_LOG_DIR", "log") + f"/log_{i}"
110111
if "PROMETHEUS_MULTIPROC_DIR" in env:
111-
prom_dir = env.get("PROMETHEUS_MULTIPROC_DIR")
112-
prom_dir_i = os.path.join(os.path.dirname(prom_dir), os.path.basename(prom_dir) + f"_dp{i}")
113-
# Create the directory if it doesn't exist
114-
if not os.path.exists(prom_dir_i):
115-
os.makedirs(prom_dir_i, exist_ok=True)
116-
env["PROMETHEUS_MULTIPROC_DIR"] = prom_dir_i
117-
logger.info(f"Set PROMETHEUS_MULTIPROC_DIR for DP {i}: {prom_dir_i}")
112+
setup_dp_prometheus_dir(i, env["PROMETHEUS_MULTIPROC_DIR"], env)
118113

119114
cmd = [
120115
sys.executable,

fastdeploy/metrics/metrics.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ def get_filtered_metrics() -> str:
8484

8585
# 判断是否多进程
8686
if "PROMETHEUS_MULTIPROC_DIR" in os.environ:
87+
llm_logger.info(f"Collect metrics from {os.environ['PROMETHEUS_MULTIPROC_DIR']}")
88+
8789
# multiprocess 会将当前共享目录中的所有指标收集到base_registry中
8890
multiprocess.MultiProcessCollector(base_registry)
8991

@@ -98,7 +100,11 @@ def get_filtered_metrics() -> str:
98100
# 将speculative中的gauge指标也重新注册
99101
main_process_metrics.re_register_speculative_gauge(filtered_registry)
100102

101-
return generate_latest(filtered_registry).decode("utf-8")
103+
ret = generate_latest(filtered_registry).decode("utf-8")
104+
105+
llm_logger.info(f"Filtered metrics:\n\n\n{ret}\n\n\n")
106+
107+
return ret
102108

103109
else:
104110
# 非多进程直接注册所有指标,从内存中读取
Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""
2-
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
2+
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
33
#
4-
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# Licensed under the Apache License, Version 2.0 (the "License"
55
# you may not use this file except in compliance with the License.
66
# You may obtain a copy of the License at
77
#
@@ -20,27 +20,56 @@
2020

2121
from fastdeploy.utils import llm_logger
2222

23+
_original_prom_dir = None
24+
25+
26+
def get_original_prom_dir():
27+
"""Return the PROMETHEUS_MULTIPROC_DIR before any dp suffix was appended."""
28+
return _original_prom_dir
29+
2330

2431
def setup_multiprocess_prometheus():
25-
"""
26-
Cleans and recreates the Prometheus multiprocess directory.
27-
"""
32+
"""Cleans and recreates the Prometheus multiprocess directory."""
33+
global _original_prom_dir
2834

2935
if "PROMETHEUS_MULTIPROC_DIR" not in os.environ:
30-
base_dir = "/tmp/prom_main"
31-
instance_id = str(uuid.uuid4())
32-
prom_dir = f"{base_dir}_{instance_id}"
36+
prom_dir = f"/tmp/prom_main_{uuid.uuid4()}"
3337
if os.path.exists(prom_dir):
3438
shutil.rmtree(prom_dir, ignore_errors=True)
3539
os.makedirs(prom_dir, exist_ok=True)
36-
llm_logger.info(f"PROMETHEUS_MULTIPROC_DIR is set to be {prom_dir}")
3740
os.environ["PROMETHEUS_MULTIPROC_DIR"] = prom_dir
41+
_original_prom_dir = prom_dir
42+
llm_logger.info(f"PROMETHEUS_MULTIPROC_DIR is set to be {prom_dir}")
3843
return prom_dir
39-
else:
40-
prom_dir = os.environ["PROMETHEUS_MULTIPROC_DIR"]
41-
llm_logger.warning(
42-
f"Found PROMETHEUS_MULTIPROC_DIR:{prom_dir} was set by user. "
43-
"you will find inaccurate metrics. Unset the variable "
44-
"will properly handle cleanup."
45-
)
46-
return os.environ["PROMETHEUS_MULTIPROC_DIR"]
44+
45+
user_dir = os.environ["PROMETHEUS_MULTIPROC_DIR"]
46+
_original_prom_dir = user_dir
47+
os.makedirs(user_dir, exist_ok=True)
48+
llm_logger.info(f"PROMETHEUS_MULTIPROC_DIR is set to {user_dir}")
49+
return user_dir
50+
51+
52+
def setup_dp_prometheus_dir(dp_id, base_dir, env_dict=None):
53+
"""Set up an isolated PROMETHEUS_MULTIPROC_DIR subdirectory for a DP rank.
54+
55+
For DP0: moves existing .db files from base_dir into dp0/ and updates env.
56+
mmap writes remain valid after rename on the same filesystem.
57+
For DP1+: creates dp{i}/ subdirectory and updates env. Fork triggers PID
58+
change → prometheus_client reset → new .db files in the subdirectory.
59+
60+
Args:
61+
dp_id: Data parallel rank id.
62+
base_dir: Original PROMETHEUS_MULTIPROC_DIR (before any dp suffix).
63+
env_dict: If provided, write to this dict instead of os.environ.
64+
"""
65+
prom_dir_dp = os.path.join(base_dir, f"dp{dp_id}")
66+
os.makedirs(prom_dir_dp, exist_ok=True)
67+
if dp_id == 0 and os.path.isdir(base_dir):
68+
for fname in os.listdir(base_dir):
69+
src = os.path.join(base_dir, fname)
70+
if os.path.isfile(src) and fname.endswith(".db"):
71+
os.rename(src, os.path.join(prom_dir_dp, fname))
72+
llm_logger.info(f"Moved {src} -> {prom_dir_dp}")
73+
target = env_dict if env_dict is not None else os.environ
74+
target["PROMETHEUS_MULTIPROC_DIR"] = prom_dir_dp
75+
llm_logger.info(f"Set PROMETHEUS_MULTIPROC_DIR for DP {dp_id}: {prom_dir_dp}")

0 commit comments

Comments
 (0)