Skip to content

Commit f202626

Browse files
authored
[Feature]console metrics log for pd disaggregation (#7843)
1 parent 6045f04 commit f202626

5 files changed

Lines changed: 125 additions & 14 deletions

File tree

fastdeploy/engine/common_engine.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ def __init__(self, cfg: FDConfig, start_queue=True, use_async_llm=False):
207207
self.scheduler_metrics_logger = SchedulerMetricsLogger(
208208
enabled=True,
209209
dp_rank=self.cfg.parallel_config.local_data_parallel_id,
210+
splitwise_role=self.cfg.scheduler_config.splitwise_role,
210211
)
211212
self.resource_manager.scheduler_metrics_logger = self.scheduler_metrics_logger
212213
self.token_processor.set_scheduler_metrics_logger(self.scheduler_metrics_logger)

fastdeploy/engine/sched/resource_manager_v1.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1795,15 +1795,26 @@ def _log_console_scheduler_metrics(self, batch_request: BatchRequest) -> None:
17951795
prefill_reqs = [r for r in batch_request if isinstance(r, Request) and r.task_type == RequestType.PREFILL]
17961796
has_decode = any(getattr(r, "task_type", None) == RequestType.DECODE for r in batch_request)
17971797

1798-
self.scheduler_metrics_logger.log_prefill_batch(
1799-
prefill_reqs=prefill_reqs,
1800-
running_cnt=running_cnt,
1801-
queue_cnt=queue_cnt,
1802-
tokens_used=tokens_used,
1803-
token_usage=token_usage,
1804-
free_blocks=free_blocks,
1805-
evictable_blocks=evictable_blocks,
1806-
)
1798+
if self.config.scheduler_config.splitwise_role == "decode":
1799+
self.scheduler_metrics_logger.log_decode_bootstrap_batch(
1800+
prefill_reqs=prefill_reqs,
1801+
running_cnt=running_cnt,
1802+
queue_cnt=queue_cnt,
1803+
tokens_used=tokens_used,
1804+
token_usage=token_usage,
1805+
free_blocks=free_blocks,
1806+
evictable_blocks=evictable_blocks,
1807+
)
1808+
else:
1809+
self.scheduler_metrics_logger.log_prefill_batch(
1810+
prefill_reqs=prefill_reqs,
1811+
running_cnt=running_cnt,
1812+
queue_cnt=queue_cnt,
1813+
tokens_used=tokens_used,
1814+
token_usage=token_usage,
1815+
free_blocks=free_blocks,
1816+
evictable_blocks=evictable_blocks,
1817+
)
18071818
if has_decode:
18081819
has_prefill = len(prefill_reqs) > 0
18091820
graph_opt_cfg = self.config.graph_opt_config

fastdeploy/engine/sched/scheduler_metrics_logger.py

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ class SchedulerMetricsLogger:
2929

3030
DEFAULT_DECODE_LOG_INTERVAL = 5
3131

32-
def __init__(self, enabled: bool = True, dp_rank: int = 0) -> None:
32+
def __init__(self, enabled: bool = True, dp_rank: int = 0, splitwise_role: str = "mixed") -> None:
3333
self.enabled = enabled
3434
self.dp_rank = dp_rank
35+
self.splitwise_role = splitwise_role
3536
decode_log_interval = envs.FD_CONSOLE_DECODE_LOG_INTERVAL
3637
if decode_log_interval <= 0:
3738
decode_log_interval = self.DEFAULT_DECODE_LOG_INTERVAL
@@ -65,8 +66,9 @@ def on_decode_tokens(self, num_tokens: int) -> None:
6566
with self._lock:
6667
self._decode_tokens_since_last += num_tokens
6768

68-
def log_prefill_batch(
69+
def _log_prefill_like_batch(
6970
self,
71+
batch_name: str,
7072
prefill_reqs: Iterable,
7173
running_cnt: int,
7274
queue_cnt: int,
@@ -91,8 +93,9 @@ def log_prefill_batch(
9193
cached_tokens += getattr(req, "num_cached_tokens", 0) or 0
9294

9395
msg = (
94-
"Prefill batch, "
96+
f"{batch_name}, "
9597
f"dp_rank: {self.dp_rank}, "
98+
f"splitwise_role: {self.splitwise_role}, "
9699
f"#new-seq: {len(prefill_reqs)}, "
97100
f"#new-token: {new_tokens}, "
98101
f"#cached-token: {cached_tokens}, "
@@ -104,6 +107,48 @@ def log_prefill_batch(
104107
)
105108
self._logger.info(msg)
106109

110+
def log_prefill_batch(
111+
self,
112+
prefill_reqs: Iterable,
113+
running_cnt: int,
114+
queue_cnt: int,
115+
tokens_used: int,
116+
token_usage: float,
117+
free_blocks: int = 0,
118+
evictable_blocks: int = 0,
119+
) -> None:
120+
self._log_prefill_like_batch(
121+
batch_name="Prefill batch",
122+
prefill_reqs=prefill_reqs,
123+
running_cnt=running_cnt,
124+
queue_cnt=queue_cnt,
125+
tokens_used=tokens_used,
126+
token_usage=token_usage,
127+
free_blocks=free_blocks,
128+
evictable_blocks=evictable_blocks,
129+
)
130+
131+
def log_decode_bootstrap_batch(
132+
self,
133+
prefill_reqs: Iterable,
134+
running_cnt: int,
135+
queue_cnt: int,
136+
tokens_used: int,
137+
token_usage: float,
138+
free_blocks: int = 0,
139+
evictable_blocks: int = 0,
140+
) -> None:
141+
self._log_prefill_like_batch(
142+
batch_name="Decode bootstrap batch from prefill",
143+
prefill_reqs=prefill_reqs,
144+
running_cnt=running_cnt,
145+
queue_cnt=queue_cnt,
146+
tokens_used=tokens_used,
147+
token_usage=token_usage,
148+
free_blocks=free_blocks,
149+
evictable_blocks=evictable_blocks,
150+
)
151+
107152
def log_decode_batch(
108153
self,
109154
running_cnt: int,
@@ -132,6 +177,7 @@ def log_decode_batch(
132177
msg = (
133178
"Decode batch, "
134179
f"dp_rank: {self.dp_rank}, "
180+
f"splitwise_role: {self.splitwise_role}, "
135181
f"#running-req: {running_cnt}, "
136182
f"#token: {tokens_used}, "
137183
f"token usage: {token_usage:.2f}, "

tests/engine/test_scheduler_metrics_logger.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def test_on_decode_tokens_accumulates():
3232

3333

3434
def test_log_prefill_batch_logs_expected_message():
35-
logger = SchedulerMetricsLogger(enabled=True, dp_rank=2)
35+
logger = SchedulerMetricsLogger(enabled=True, dp_rank=2, splitwise_role="prefill")
3636
logger._logger = mock.Mock()
3737

3838
reqs = [
@@ -46,6 +46,7 @@ def test_log_prefill_batch_logs_expected_message():
4646
message = logger._logger.info.call_args[0][0]
4747
assert "Prefill batch" in message
4848
assert "dp_rank: 2" in message
49+
assert "splitwise_role: prefill" in message
4950
assert "#new-seq: 2" in message
5051
assert "#new-token: 4" in message
5152
assert "#cached-token: 3" in message
@@ -54,8 +55,31 @@ def test_log_prefill_batch_logs_expected_message():
5455
assert "#queue-req: 6" in message
5556

5657

58+
def test_log_decode_bootstrap_batch_logs_expected_message():
59+
logger = SchedulerMetricsLogger(enabled=True, dp_rank=0, splitwise_role="decode")
60+
logger._logger = mock.Mock()
61+
62+
reqs = [types.SimpleNamespace(prefill_start_index=4, prefill_end_index=5, num_cached_tokens=4)]
63+
64+
logger.log_decode_bootstrap_batch(
65+
prefill_reqs=reqs,
66+
running_cnt=1,
67+
queue_cnt=0,
68+
tokens_used=5,
69+
token_usage=0.25,
70+
)
71+
72+
logger._logger.info.assert_called_once()
73+
message = logger._logger.info.call_args[0][0]
74+
assert "Decode bootstrap batch" in message
75+
assert "splitwise_role: decode" in message
76+
assert "#new-seq: 1" in message
77+
assert "#new-token: 1" in message
78+
assert "#cached-token: 4" in message
79+
80+
5781
def test_log_decode_batch_computes_throughput(monkeypatch):
58-
logger = SchedulerMetricsLogger(enabled=True, dp_rank=1)
82+
logger = SchedulerMetricsLogger(enabled=True, dp_rank=1, splitwise_role="decode")
5983
logger._logger = mock.Mock()
6084
logger._decode_batch_count = logger._decode_log_interval - 1
6185
logger._decode_tokens_since_last = 10
@@ -69,6 +93,7 @@ def test_log_decode_batch_computes_throughput(monkeypatch):
6993
message = logger._logger.info.call_args[0][0]
7094
assert "Decode batch" in message
7195
assert "dp_rank: 1" in message
96+
assert "splitwise_role: decode" in message
7297
assert "gen throughput (token/s): 5.00" in message
7398
assert "#queue-req: 7" in message
7499
assert logger._decode_tokens_since_last == 0
@@ -99,3 +124,8 @@ def test_decode_log_interval_non_positive_falls_back_to_default(monkeypatch):
99124
monkeypatch.setenv("FD_CONSOLE_DECODE_LOG_INTERVAL", "0")
100125
logger = SchedulerMetricsLogger(enabled=True, dp_rank=0)
101126
assert logger._decode_log_interval == SchedulerMetricsLogger.DEFAULT_DECODE_LOG_INTERVAL
127+
128+
129+
def test_default_splitwise_role_is_mixed():
130+
logger = SchedulerMetricsLogger(enabled=True, dp_rank=0)
131+
assert logger.splitwise_role == "mixed"

tests/v1/test_resource_manager_v1.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
if not hasattr(paddle, "enable_compat"):
2828
paddle.enable_compat = lambda scope=None: None
2929

30+
from fastdeploy import envs
3031
from fastdeploy.config import CacheConfig, FDConfig, ParallelConfig, SchedulerConfig
3132
from fastdeploy.engine.args_utils import EngineArgs
3233
from fastdeploy.engine.request import (
@@ -37,6 +38,7 @@
3738
RequestMetrics,
3839
RequestOutput,
3940
RequestStatus,
41+
RequestType,
4042
)
4143
from fastdeploy.engine.sched.resource_manager_v1 import (
4244
ResourceManagerV1,
@@ -570,6 +572,27 @@ def test_preallocate_resource_in_p_and_d(self):
570572
self.assertEqual(request_d.num_computed_tokens, request_d.need_prefill_tokens)
571573
self.assertEqual(request_d.disaggregate_info["block_tables"], [4, 5])
572574

575+
def test_decode_role_prefill_task_logs_decode_bootstrap_batch(self):
576+
manager = _build_manager(splitwise_role="decode", enable_prefix_caching=False)
577+
_register_manager_cleanup(self, manager)
578+
manager.cache_manager = MagicMock()
579+
manager.cache_manager.num_gpu_blocks = 8
580+
manager.cache_manager.gpu_free_block_list = [0, 1, 2, 3]
581+
manager.scheduler_metrics_logger = MagicMock()
582+
583+
request = _make_request(prompt_token_ids=[1, 2, 3, 4])
584+
request.task_type = RequestType.PREFILL
585+
request.prefill_start_index = 4
586+
request.prefill_end_index = 5
587+
batch_request = BatchRequest()
588+
batch_request.add_request(request)
589+
590+
with patch.object(envs, "FD_CONSOLE_SCHEDULER_METRICS", True):
591+
manager._log_console_scheduler_metrics(batch_request)
592+
593+
manager.scheduler_metrics_logger.log_decode_bootstrap_batch.assert_called_once()
594+
manager.scheduler_metrics_logger.log_prefill_batch.assert_not_called()
595+
573596
def test_prefilled_request_flow_and_resource_check(self):
574597
manager = _build_manager(splitwise_role="decode", speculative_method="mtp")
575598
_register_manager_cleanup(self, manager)

0 commit comments

Comments
 (0)