Skip to content

Commit 6d9739f

Browse files
[BugFix] fix speculative gauge metrics in multi api server (#7082)
1 parent 6727df8 commit 6d9739f

5 files changed

Lines changed: 147 additions & 48 deletions

File tree

fastdeploy/__init__.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,19 +69,10 @@ def _configure_logger(name=None):
6969
# Configure root logger
7070
_configure_logger()
7171

72-
import uuid
73-
7472
# suppress warning log from paddlepaddle
7573
os.environ["GLOG_minloglevel"] = "2"
7674
# suppress log from aistudio
7775
os.environ["AISTUDIO_LOG"] = "critical"
78-
# set prometheus dir
79-
if os.getenv("PROMETHEUS_MULTIPROC_DIR", "") == "":
80-
prom_dir = f"/tmp/fd_prom_{str(uuid.uuid4())}"
81-
os.environ["PROMETHEUS_MULTIPROC_DIR"] = prom_dir
82-
if os.path.exists(prom_dir):
83-
os.rmdir(prom_dir)
84-
os.mkdir(prom_dir)
8576

8677
import typing
8778

fastdeploy/entrypoints/openai/multi_api_server.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,15 @@ def start_servers(
107107
env = os.environ.copy()
108108
env["FD_ENABLE_MULTI_API_SERVER"] = "1"
109109
env["FD_LOG_DIR"] = env.get("FD_LOG_DIR", "log") + f"/log_{i}"
110+
if "PROMETHEUS_MULTIPROC_DIR" in env:
111+
prom_dir = env.get("PROMETHEUS_MULTIPROC_DIR")
112+
prom_dir_i = os.path.join(os.path.dirname(prom_dir), os.path.basename(prom_dir) + f"_dp{i}")
113+
# Create the directory if it doesn't exist
114+
if not os.path.exists(prom_dir_i):
115+
os.makedirs(prom_dir_i, exist_ok=True)
116+
env["PROMETHEUS_MULTIPROC_DIR"] = prom_dir_i
117+
logger.info(f"Set PROMETHEUS_MULTIPROC_DIR for DP {i}: {prom_dir_i}")
118+
110119
cmd = [
111120
sys.executable,
112121
"-m",

fastdeploy/metrics/metrics.py

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def collect(self):
6666
Metric: Prometheus Metric objects that are not excluded.
6767
"""
6868
for metric in self.base_registry.collect():
69-
if not any(name.startswith(metric.name) for name in self.exclude_names):
69+
if not any(metric.name.startswith(name) for name in self.exclude_names):
7070
yield metric
7171

7272

@@ -84,11 +84,15 @@ def get_filtered_metrics() -> str:
8484
multiprocess.MultiProcessCollector(base_registry)
8585

8686
filtered_registry = CollectorRegistry()
87-
# 注册一个新的colletor,过滤gauge指标
88-
filtered_registry.register(SimpleCollector(base_registry, EXCLUDE_LABELS))
87+
# 动态获取需要排除的 gauge 指标列表
88+
exclude_labels = main_process_metrics.get_excluded_metrics()
89+
# 注册一个新的collector,过滤gauge指标
90+
filtered_registry.register(SimpleCollector(base_registry, exclude_labels))
8991

9092
# 将gauge指标重新注册到filtered_registry中,从内存中读取
9193
main_process_metrics.re_register_gauge(filtered_registry)
94+
# 将speculative中的gauge指标也重新注册
95+
main_process_metrics.re_register_speculative_gauge(filtered_registry)
9296

9397
return generate_latest(filtered_registry).decode("utf-8")
9498

@@ -196,7 +200,7 @@ class MetricsManager:
196200
"type": Gauge,
197201
"name": "fastdeploy:num_requests_running",
198202
"description": "Number of requests currently running",
199-
"kwargs": {"multiprocess_mode": "sum"},
203+
"kwargs": {},
200204
},
201205
"num_requests_waiting": {
202206
"type": Gauge,
@@ -626,19 +630,22 @@ def __init__(self):
626630
# 在模块加载,指标注册先设置Prometheus环境变量
627631
setup_multiprocess_prometheus()
628632

629-
# 动态创建所有指标
633+
# 动态创建所有非 gauge 型指标
630634
for metric_name, config in self.METRICS.items():
631635
setattr(
632636
self,
633637
metric_name,
634638
config["type"](config["name"], config["description"], **config["kwargs"]),
635639
)
636-
# 动态创建所有指标
640+
# 动态创建所有 gauge 型指标,统一配置 multiprocess_mode 为 livesum
637641
for metric_name, config in self.GAUGE_METRICS.items():
642+
kwargs = config["kwargs"].copy()
643+
if "multiprocess_mode" not in kwargs:
644+
kwargs["multiprocess_mode"] = "livesum"
638645
setattr(
639646
self,
640647
metric_name,
641-
config["type"](config["name"], config["description"], **config["kwargs"]),
648+
config["type"](config["name"], config["description"], **kwargs),
642649
)
643650
# 动态创建server metrics
644651
for metric_name, config in self.SERVER_METRICS.items():
@@ -696,17 +703,22 @@ def _init_speculative_metrics(self, speculative_method, num_speculative_tokens):
696703
Gauge(
697704
f"{config['name']}_{i}",
698705
f"{config['description']} (head {i})",
706+
multiprocess_mode="livesum",
699707
)
700708
)
701709
setattr(self, metric_name, gauges)
702710
else:
711+
# For Gauge metrics, automatically add multiprocess_mode="livesum"
712+
kwargs = config["kwargs"].copy()
713+
if config["type"] == Gauge and "multiprocess_mode" not in kwargs:
714+
kwargs["multiprocess_mode"] = "livesum"
703715
setattr(
704716
self,
705717
metric_name,
706718
config["type"](
707719
config["name"],
708720
config["description"],
709-
**config["kwargs"],
721+
**kwargs,
710722
),
711723
)
712724

@@ -767,6 +779,19 @@ def register_speculative_metrics(self, registry: CollectorRegistry):
767779
else:
768780
registry.register(getattr(self, metric_name))
769781

782+
def re_register_speculative_gauge(self, registry: "CollectorRegistry"):
    """Re-register the gauge entries of ``SPECULATIVE_METRICS`` on ``registry``.

    Nothing is registered when this instance has no
    ``spec_decode_draft_acceptance_rate`` attribute, which is how the
    method detects that speculative metrics were never initialized here.

    Args:
        registry: Registry that receives the gauges. Entries whose
            configured type is neither ``Gauge`` nor a list of gauges
            are skipped.
    """
    if not hasattr(self, "spec_decode_draft_acceptance_rate"):
        return
    for metric_name, config in self.SPECULATIVE_METRICS.items():
        # Per-head gauges are stored on the instance as a list. They are
        # recognised by declared type as well as by the historical name so
        # that every list-of-gauges entry excluded by get_excluded_metrics
        # is also re-registered here.
        is_gauge_list = (
            metric_name == "spec_decode_draft_single_head_acceptance_rate"
            or config["type"] == list[Gauge]
        )
        if is_gauge_list:
            for gauge in getattr(self, metric_name):
                registry.register(gauge)
        elif config["type"] == Gauge:
            registry.register(getattr(self, metric_name))
794+
770795
def re_register_gauge(self, registry: CollectorRegistry):
771796
"""Re-register gauge to the specified registry"""
772797
for metric_name in self.GAUGE_METRICS:
@@ -790,16 +815,19 @@ def register_all(self, registry: CollectorRegistry):
790815
if hasattr(main_process_metrics, "spec_decode_draft_acceptance_rate"):
791816
self.register_speculative_metrics(registry)
792817

793-
@classmethod
794-
def get_excluded_metrics(cls) -> Set[str]:
818+
def get_excluded_metrics(self) -> Set[str]:
    """Return the names of the gauge metrics that must be excluded.

    The result always contains the name of every ``GAUGE_METRICS`` entry.
    Names of ``SPECULATIVE_METRICS`` entries are added when their
    configured type is ``Gauge`` or ``list[Gauge]``.

    Returns:
        A new set of metric names; mutating it does not affect the manager.
    """
    excluded = {config["name"] for config in self.GAUGE_METRICS.values()}
    # Fall back to an empty mapping so an instance without speculative
    # metrics contributes nothing from this step.
    speculative = getattr(self, "SPECULATIVE_METRICS", {})
    excluded.update(
        config["name"] for config in speculative.values() if config["type"] in (Gauge, list[Gauge])
    )
    return excluded
797827

798828

799829
main_process_metrics = MetricsManager()
800830

801831
# 由于zmq指标记录比较耗时,默认不开启,通过DEBUG参数开启
802832
if envs.FD_DEBUG:
803833
main_process_metrics.init_zmq_metrics()
804-
805-
EXCLUDE_LABELS = MetricsManager.get_excluded_metrics()

tests/entrypoints/openai/test_multi_api_server.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,58 @@ def test_main_function(self, mock_check_param, mock_sleep, mock_start_servers, m
180180
mock_proc1.wait.assert_called_once()
181181
mock_proc2.wait.assert_called_once()
182182

183+
@patch("fastdeploy.entrypoints.openai.multi_api_server.subprocess.Popen")
@patch("fastdeploy.entrypoints.openai.multi_api_server.is_port_available")
def test_prometheus_multiprocess_dir_per_dp(self, mock_is_port_available, mock_popen):
    """Each DP server is launched with its own PROMETHEUS_MULTIPROC_DIR.

    start_servers only derives per-DP directories when
    PROMETHEUS_MULTIPROC_DIR is already present in the environment, so the
    variable is set explicitly here instead of relying on whatever the
    surrounding process happens to export. The base path lives inside a
    temporary directory so the directories created by start_servers are
    removed when the test finishes.
    """
    # Imported locally so the test does not depend on module-level imports.
    import os
    import tempfile

    mock_is_port_available.return_value = True

    envs_captured = []

    def capture_popen(*args, **kwargs):
        # Snapshot the environment handed to each child process.
        envs_captured.append(dict(kwargs.get("env", {})))
        mock_proc = MagicMock()
        mock_proc.pid = 1000 + len(envs_captured)
        return mock_proc

    mock_popen.side_effect = capture_popen

    with tempfile.TemporaryDirectory() as tmp_dir:
        base_dir = os.path.join(tmp_dir, "fd_prom")
        with patch.dict(os.environ, {"PROMETHEUS_MULTIPROC_DIR": base_dir}):
            processes = start_servers(
                server_count=2,
                device_count=2,
                server_args=self.test_server_args,
                ports="8000,8001",
                metrics_ports="8800,8801",
                controller_ports="-1",
            )

        # One Popen call, and therefore one captured environment, per server.
        self.assertEqual(mock_popen.call_count, 2)
        self.assertEqual(len(envs_captured), 2)
        self.assertEqual(len(processes), 2)

        prom_dirs = [env.get("PROMETHEUS_MULTIPROC_DIR") for env in envs_captured]
        for i, prom_dir in enumerate(prom_dirs):
            self.assertIsNotNone(prom_dir, f"Server {i} should have PROMETHEUS_MULTIPROC_DIR set")
            # The per-DP directory is the base path with a "_dp<index>" suffix.
            self.assertEqual(
                prom_dir,
                f"{base_dir}_dp{i}",
                f"PROMETHEUS_MULTIPROC_DIR for server {i} should contain _dp{i}",
            )
            # Checked inside the TemporaryDirectory block, before cleanup.
            self.assertTrue(os.path.isdir(prom_dir), f"{prom_dir} should have been created")

        self.assertEqual(
            len(prom_dirs), len(set(prom_dirs)), "Each DP server should have a unique PROMETHEUS_MULTIPROC_DIR"
        )
234+
183235

184236
if __name__ == "__main__":
185237
unittest.main()

tests/metrics/test_metrics.py

Lines changed: 45 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,46 +14,65 @@
1414
# limitations under the License.
1515
"""
1616

17+
import os
1718
import unittest
1819
from unittest.mock import patch
1920

2021
from prometheus_client import Gauge
2122

22-
from fastdeploy.metrics.metrics import get_filtered_metrics
23+
from fastdeploy.metrics.metrics import get_filtered_metrics, main_process_metrics
24+
from fastdeploy.spec_decode import SpecMethod
2325

2426

2527
class TestGetFilteredMetrics(unittest.TestCase):
26-
def test_filtered_and_custom_metrics(self):
27-
"""
28-
Test get_filtered_metrics function:
29-
1. Exclude specific metrics from base_registry
30-
2. Keep other metrics in base_registry
31-
3. Ensure metrics registered by extra_register_func are effective
32-
"""
33-
34-
# Simulated metrics in base_registry (Gauge instances)
35-
g_keep = Gauge("metric_to_keep", "Kept metric")
36-
g_keep.set(1.23)
37-
38-
g_exclude = Gauge("metric_to_exclude", "Excluded metric")
39-
g_exclude.set(99)
40-
41-
# Fake MultiProcessCollector: register our simulated metrics
28+
def _collect_metrics_with_mocked_multiprocess(self, metric_name, multiprocess_value):
    """Return the output of ``get_filtered_metrics`` with the multiprocess collector faked.

    The stand-in collector registers a single gauge named ``metric_name``
    with a ``pid`` label and sets its ``pid="10001"`` sample to
    ``multiprocess_value`` on the registry it is given.
    PROMETHEUS_MULTIPROC_DIR is set to a placeholder path for the
    duration of the call.
    """

    def fake_multiprocess_collector(registry):
        labelled_gauge = Gauge(metric_name, f"fake metric for {metric_name}", ["pid"], registry=registry)
        labelled_gauge.labels(pid="10001").set(multiprocess_value)

    env_patch = patch.dict(
        os.environ,
        {"PROMETHEUS_MULTIPROC_DIR": "/tmp/fake-prometheus-multiproc-dir"},
        clear=False,
    )
    collector_patch = patch(
        "fastdeploy.metrics.metrics.multiprocess.MultiProcessCollector",
        side_effect=fake_multiprocess_collector,
    )
    # Entered in the same order as before: environment first, then the collector.
    with env_patch, collector_patch:
        return get_filtered_metrics()
5041

51-
print("==== result ====\n", result)
42+
def _assert_unique_metric_value(self, metrics_text, metric_name, expected_value):
43+
metric_lines = [line for line in metrics_text.splitlines() if line.startswith(f"{metric_name} ")]
44+
self.assertEqual(metric_lines, [f"{metric_name} {expected_value}"])
45+
self.assertNotIn("pid=", metrics_text)
5246

53-
# 2. Kept metric should appear
54-
self.assertIn("metric_to_keep", result)
47+
def test_regular_gauge_returns_single_value_without_pid(self):
    """The batch_size gauge is reported once, with the in-process value and no pid label.

    The in-process gauge is set to 8.0 while the faked multiprocess
    collector supplies 1008.0 for the same name; only 8.0 may appear.
    """
    gauge = main_process_metrics.batch_size
    gauge.set(8.0)
    name = gauge._name

    exposition = self._collect_metrics_with_mocked_multiprocess(name, multiprocess_value=1008.0)

    self._assert_unique_metric_value(exposition, name, 8.0)
54+
55+
def test_speculative_gauge_returns_single_value_without_pid(self):
    """The speculative acceptance-rate gauge is reported once, without a pid label.

    Speculative metrics are initialized on demand (MTP, 2 tokens) when the
    shared manager does not have them yet. The in-process gauge is set to
    0.75 while the faked multiprocess collector supplies 1000.75.
    """
    manager = main_process_metrics
    speculative_ready = hasattr(manager, "spec_decode_draft_acceptance_rate")
    if not speculative_ready:
        manager._init_speculative_metrics(SpecMethod.MTP, 2)

    gauge = manager.spec_decode_draft_acceptance_rate
    gauge.set(0.75)
    name = gauge._name

    exposition = self._collect_metrics_with_mocked_multiprocess(name, multiprocess_value=1000.75)

    self._assert_unique_metric_value(exposition, name, 0.75)
65+
66+
def test_speculative_single_head_gauge_returns_single_value_without_pid(self):
    """The first per-head acceptance-rate gauge is reported once, without a pid label.

    Speculative metrics are initialized on demand (MTP, 2 tokens) when the
    shared manager does not have them yet. The in-process gauge is set to
    0.6 while the faked multiprocess collector supplies 1000.6.
    """
    manager = main_process_metrics
    speculative_ready = hasattr(manager, "spec_decode_draft_acceptance_rate")
    if not speculative_ready:
        manager._init_speculative_metrics(SpecMethod.MTP, 2)

    per_head_gauges = manager.spec_decode_draft_single_head_acceptance_rate
    first_head = per_head_gauges[0]
    first_head.set(0.6)
    name = first_head._name

    exposition = self._collect_metrics_with_mocked_multiprocess(name, multiprocess_value=1000.6)

    self._assert_unique_metric_value(exposition, name, 0.6)
5776

5877

5978
if __name__ == "__main__":

0 commit comments

Comments
 (0)