diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 72ca1f7c820..39379c62c64 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -65,6 +65,10 @@ ) from fastdeploy.inter_communicator.fmq import FMQ from fastdeploy.metrics.metrics import main_process_metrics +from fastdeploy.metrics.prometheus_multiprocess_setup import ( + get_original_prom_dir, + setup_dp_prometheus_dir, +) from fastdeploy.model_executor.guided_decoding import schema_checker from fastdeploy.plugins.token_processor import load_token_processor_plugins from fastdeploy.spec_decode import SpecMethod @@ -2720,6 +2724,7 @@ def launch_components(self): self.launched_expert_service_signal.value[0] = 1 self.dp_processed = [] self.dp_engine_worker_queue_server = [] + base_prom_dir = get_original_prom_dir() for i in range( 1, self.cfg.parallel_config.data_parallel_size // self.cfg.nnode, @@ -2758,10 +2763,15 @@ def launch_components(self): f"Engine is initialized successfully with {self.cfg.parallel_config.tensor_parallel_size}" + f" data parallel id {i}" ) + if envs.FD_ENABLE_INTERNAL_ADAPTER: + setup_dp_prometheus_dir(i, base_prom_dir) self.dp_processed[-1].start() while self.launched_expert_service_signal.value[i] == 0: time.sleep(1) + if envs.FD_ENABLE_INTERNAL_ADAPTER: + setup_dp_prometheus_dir(0, base_prom_dir) + def check_worker_initialize_status(self): """ Check the initlialize status of workers by stdout logging diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 8614d29999b..ad71506d0f2 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -50,6 +50,10 @@ log_request_error, ) from fastdeploy.metrics.metrics import main_process_metrics +from fastdeploy.metrics.prometheus_multiprocess_setup import ( + get_original_prom_dir, + setup_dp_prometheus_dir, +) from fastdeploy.platforms import current_platform from fastdeploy.utils import EngineError, console_logger, envs, llm_logger @@ -860,6 +864,7 @@ def launch_components(self): self.launched_expert_service_signal.value[0] = 1 self.dp_processed = [] self.dp_engine_worker_queue_server = [] + base_prom_dir = get_original_prom_dir() for i in range( 1, self.cfg.parallel_config.data_parallel_size // self.cfg.nnode, @@ -897,8 +902,13 @@ def launch_components(self): f"Engine is initialized successfully with {self.cfg.parallel_config.tensor_parallel_size}" + f" data parallel id {i}" ) + if envs.FD_ENABLE_INTERNAL_ADAPTER: + setup_dp_prometheus_dir(i, base_prom_dir) self.dp_processed[-1].start() + if envs.FD_ENABLE_INTERNAL_ADAPTER: + setup_dp_prometheus_dir(0, base_prom_dir) + for i in range( 1, self.cfg.parallel_config.data_parallel_size // self.cfg.nnode, diff --git a/fastdeploy/entrypoints/openai/multi_api_server.py b/fastdeploy/entrypoints/openai/multi_api_server.py index 545fe2b1fb1..dbef909bf2c 100644 --- a/fastdeploy/entrypoints/openai/multi_api_server.py +++ b/fastdeploy/entrypoints/openai/multi_api_server.py @@ -20,6 +20,7 @@ import sys import time +from fastdeploy.metrics.prometheus_multiprocess_setup import setup_dp_prometheus_dir from fastdeploy.platforms import current_platform from fastdeploy.utils import find_free_ports, get_logger, is_port_available @@ -108,13 +109,7 @@ def start_servers( env["FD_ENABLE_MULTI_API_SERVER"] = "1" env["FD_LOG_DIR"] = env.get("FD_LOG_DIR", "log") + f"/log_{i}" if "PROMETHEUS_MULTIPROC_DIR" in env: - prom_dir = env.get("PROMETHEUS_MULTIPROC_DIR") - prom_dir_i = os.path.join(os.path.dirname(prom_dir), os.path.basename(prom_dir) + f"_dp{i}") - # Create the directory if it doesn't exist - if not os.path.exists(prom_dir_i): - os.makedirs(prom_dir_i, exist_ok=True) - env["PROMETHEUS_MULTIPROC_DIR"] = prom_dir_i - logger.info(f"Set PROMETHEUS_MULTIPROC_DIR for DP {i}: {prom_dir_i}") + setup_dp_prometheus_dir(i, env["PROMETHEUS_MULTIPROC_DIR"], env) cmd = [ sys.executable, diff --git a/fastdeploy/metrics/prometheus_multiprocess_setup.py b/fastdeploy/metrics/prometheus_multiprocess_setup.py index 509fb7d158c..42b7723bf3e 100644 --- a/fastdeploy/metrics/prometheus_multiprocess_setup.py +++ b/fastdeploy/metrics/prometheus_multiprocess_setup.py @@ -20,27 +20,56 @@ from fastdeploy.utils import llm_logger +_original_prom_dir = None + + +def get_original_prom_dir(): + """Return the PROMETHEUS_MULTIPROC_DIR before any dp suffix was appended.""" + return _original_prom_dir + def setup_multiprocess_prometheus(): - """ - Cleans and recreates the Prometheus multiprocess directory. - """ + """Cleans and recreates the Prometheus multiprocess directory.""" + global _original_prom_dir if "PROMETHEUS_MULTIPROC_DIR" not in os.environ: - base_dir = "/tmp/prom_main" - instance_id = str(uuid.uuid4()) - prom_dir = f"{base_dir}_{instance_id}" + prom_dir = f"/tmp/prom_main_{uuid.uuid4()}" if os.path.exists(prom_dir): shutil.rmtree(prom_dir, ignore_errors=True) os.makedirs(prom_dir, exist_ok=True) - llm_logger.info(f"PROMETHEUS_MULTIPROC_DIR is set to be {prom_dir}") os.environ["PROMETHEUS_MULTIPROC_DIR"] = prom_dir + _original_prom_dir = prom_dir + llm_logger.info(f"PROMETHEUS_MULTIPROC_DIR is set to be {prom_dir}") return prom_dir - else: - prom_dir = os.environ["PROMETHEUS_MULTIPROC_DIR"] - llm_logger.warning( - f"Found PROMETHEUS_MULTIPROC_DIR:{prom_dir} was set by user. " - "you will find inaccurate metrics. Unset the variable " - "will properly handle cleanup." - ) - return os.environ["PROMETHEUS_MULTIPROC_DIR"] + + user_dir = os.environ["PROMETHEUS_MULTIPROC_DIR"] + _original_prom_dir = user_dir + os.makedirs(user_dir, exist_ok=True) + llm_logger.info(f"PROMETHEUS_MULTIPROC_DIR is set to {user_dir}") + return user_dir + + +def setup_dp_prometheus_dir(dp_id, base_dir, env_dict=None): + """Set up an isolated PROMETHEUS_MULTIPROC_DIR subdirectory for a DP rank. + + For DP0: moves existing .db files from base_dir into dp0/ and updates env. + mmap writes remain valid after rename on the same filesystem. + For DP1+: creates dp{i}/ subdirectory and updates env. Fork triggers PID + change → prometheus_client reset → new .db files in the subdirectory. + + Args: + dp_id: Data parallel rank id. + base_dir: Original PROMETHEUS_MULTIPROC_DIR (before any dp suffix). + env_dict: If provided, write to this dict instead of os.environ. + """ + prom_dir_dp = os.path.join(base_dir, f"dp{dp_id}") + os.makedirs(prom_dir_dp, exist_ok=True) + if dp_id == 0 and os.path.isdir(base_dir): + for fname in os.listdir(base_dir): + src = os.path.join(base_dir, fname) + if os.path.isfile(src) and fname.endswith(".db"): + os.rename(src, os.path.join(prom_dir_dp, fname)) + llm_logger.info(f"Moved {src} -> {prom_dir_dp}") + target = env_dict if env_dict is not None else os.environ + target["PROMETHEUS_MULTIPROC_DIR"] = prom_dir_dp + llm_logger.info(f"Set PROMETHEUS_MULTIPROC_DIR for DP {dp_id}: {prom_dir_dp}") diff --git a/tests/entrypoints/openai/test_multi_api_server.py b/tests/entrypoints/openai/test_multi_api_server.py index a333ee6de0c..c039dcfbc65 100644 --- a/tests/entrypoints/openai/test_multi_api_server.py +++ b/tests/entrypoints/openai/test_multi_api_server.py @@ -231,7 +231,7 @@ def capture_popen(*args, **kwargs): for i, prom_dir in enumerate(prom_dirs): # The directory should contain the server index (0 or 1) # to uniquely identify each server's metrics directory - self.assertIn(f"_dp{i}", prom_dir, f"PROMETHEUS_MULTIPROC_DIR for server {i} should contain _dp{i}") + self.assertIn(f"/dp{i}", prom_dir, f"PROMETHEUS_MULTIPROC_DIR for server {i} should contain /dp{i}") if __name__ == "__main__": diff --git a/tests/metrics/test_prometheus_multiprocess_setup.py b/tests/metrics/test_prometheus_multiprocess_setup.py index 5ccc960d522..36508503150 100644 --- a/tests/metrics/test_prometheus_multiprocess_setup.py +++ b/tests/metrics/test_prometheus_multiprocess_setup.py @@ -51,15 +51,11 @@ def test_when_env_var_already_set(self): test_dir = "/tmp/existing_dir" os.environ["PROMETHEUS_MULTIPROC_DIR"] = test_dir - with patch("fastdeploy.utils.llm_logger.warning") as mock_logger: + with patch("fastdeploy.utils.llm_logger.info") as mock_logger: result = setup_multiprocess_prometheus() assert result == test_dir - mock_logger.assert_called_once_with( - "Found PROMETHEUS_MULTIPROC_DIR:/tmp/existing_dir was set by user. " - "you will find inaccurate metrics. Unset the variable " - "will properly handle cleanup." - ) + mock_logger.assert_called_once_with(f"PROMETHEUS_MULTIPROC_DIR is set to {test_dir}") def test_cleanup_failure_handling(self): """测试清理目录失败时的处理"""