Skip to content

Commit eb12fd3

Browse files
liyonghua0910claude
andcommitted
[Metric] Support model_id as metric labels by redefining metric update interface
Introduce MetricsManagerInterface with unified set_value/inc_value/dec_value/obs_value methods. When FD_DEFAULT_METRIC_LABEL_VALUES is set to a valid non-empty JSON dict, metric labels (e.g. model_id) are automatically applied. Otherwise, operations fall back to the raw prometheus_client calls. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent d54e207 commit eb12fd3

17 files changed

Lines changed: 305 additions & 107 deletions

File tree

custom_ops/xpu_ops/test/pytest.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,4 @@ addopts =
4040
--ignore=test_set_data_ipc.py
4141
--ignore=test_read_data_ipc.py
4242
--ignore=test_set_get_data_ipc.py
43-
--ignore=test_draft_model_preprocess.py
43+
--ignore=test_draft_model_preprocess.py

fastdeploy/cache_manager/cache_metrics.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ def _update_history_hit_metrics(self):
5757
self.gpu_hit_token_ratio = self.total_gpu_matched_token_num / self.total_token_num
5858
self.storage_hit_token_ratio = self.total_storage_matched_token_num / self.total_token_num
5959

60-
main_process_metrics.hit_req_rate.set(self.hit_req_ratio)
61-
main_process_metrics.hit_token_rate.set(self.hit_token_ratio)
62-
main_process_metrics.cpu_hit_token_rate.set(self.cpu_hit_token_ratio)
63-
main_process_metrics.gpu_hit_token_rate.set(self.gpu_hit_token_ratio)
60+
main_process_metrics.set_value("hit_req_rate", self.hit_req_ratio)
61+
main_process_metrics.set_value("hit_token_rate", self.hit_token_ratio)
62+
main_process_metrics.set_value("cpu_hit_token_rate", self.cpu_hit_token_ratio)
63+
main_process_metrics.set_value("gpu_hit_token_rate", self.gpu_hit_token_ratio)
6464

6565
logger.info(
6666
f"Metrics for all requests: req_count {self.req_count} hit_req_count {self.hit_req_count}"

fastdeploy/cache_manager/prefix_cache_manager.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,11 @@ def __init__(
131131
f"{self.cache_config.bytes_per_token_per_layer / self.config.parallel_config.tensor_parallel_size}"
132132
)
133133

134-
main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks)
135-
main_process_metrics.max_cpu_block_num.set(self.num_cpu_blocks)
136-
main_process_metrics.available_gpu_block_num.set(self.num_gpu_blocks)
137-
main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks)
138-
main_process_metrics.available_gpu_resource.set(1.0)
134+
main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks)
135+
main_process_metrics.set_value("max_cpu_block_num", self.num_cpu_blocks)
136+
main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks)
137+
main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks)
138+
main_process_metrics.set_value("available_gpu_resource", 1.0)
139139

140140
def _get_kv_cache_shape(self, max_block_num):
141141
from fastdeploy.model_executor.layers.attention import get_attention_backend
@@ -462,11 +462,11 @@ def update_cache_config(self, cache_config):
462462
heapq.heapify(self.gpu_free_block_list)
463463
self.node_id_pool = list(range(self.num_gpu_blocks + self.num_cpu_blocks))
464464

465-
main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks)
466-
main_process_metrics.max_cpu_block_num.set(self.num_cpu_blocks)
467-
main_process_metrics.available_gpu_block_num.set(self.num_gpu_blocks)
468-
main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks)
469-
main_process_metrics.available_gpu_resource.set(1.0)
465+
main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks)
466+
main_process_metrics.set_value("max_cpu_block_num", self.num_cpu_blocks)
467+
main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks)
468+
main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks)
469+
main_process_metrics.set_value("available_gpu_resource", 1.0)
470470

471471
def can_allocate_gpu_blocks(self, num_blocks: int, try_free_gpu_blocks: bool = True):
472472
"""
@@ -494,8 +494,8 @@ def allocate_gpu_blocks(self, num_blocks, req_id=None):
494494
logger.info(
495495
f"req_id:{req_id} allocate_gpu_blocks: {allocated_block_ids}, len(self.gpu_free_block_list) {len(self.gpu_free_block_list)}"
496496
)
497-
main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
498-
main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
497+
main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
498+
main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)
499499
return allocated_block_ids
500500

501501
def recycle_gpu_blocks(self, gpu_block_ids, req_id=None):
@@ -529,8 +529,8 @@ def recycle_gpu_blocks(self, gpu_block_ids, req_id=None):
529529
else:
530530
heapq.heappush(self.gpu_free_block_list, gpu_block_ids)
531531
logger.debug(f"req_id:{req_id} recycle blocks end")
532-
main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
533-
main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
532+
main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
533+
main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)
534534

535535
def allocate_cpu_blocks(self, num_blocks):
536536
"""
@@ -2296,9 +2296,9 @@ def reset(self, wait_for_tasks_done=False):
22962296

22972297
# reset metrics
22982298
self.metrics.reset_metrics()
2299-
main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
2300-
main_process_metrics.available_gpu_block_num.set(len(self.gpu_free_block_list))
2301-
main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
2299+
main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
2300+
main_process_metrics.set_value("available_gpu_block_num", len(self.gpu_free_block_list))
2301+
main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)
23022302

23032303
def clear_prefix_cache(self):
23042304
"""

fastdeploy/cache_manager/transfer_factory/kvcache_transfer/benchmark.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@
3232
from typing import List
3333

3434
import paddle
35-
import rdma_comm
3635
import zmq
3736

37+
import rdma_comm
38+
3839
if paddle.is_compiled_with_xpu():
3940
from custom_setup_ops import get_peer_mem_addr
4041

fastdeploy/cache_manager/transfer_factory/kvcache_transfer/test.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@
1212
import time
1313

1414
import paddle
15-
import rdma_comm
1615
import zmq
1716

17+
import rdma_comm
18+
1819
if paddle.is_compiled_with_xpu():
1920
from custom_setup_ops import get_peer_mem_addr
2021

fastdeploy/engine/common_engine.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -881,8 +881,8 @@ def _schedule_request_to_worker(self):
881881
else:
882882
continue
883883

884-
main_process_metrics.num_requests_waiting.dec(len(tasks))
885-
main_process_metrics.num_requests_running.inc(len(tasks))
884+
main_process_metrics.dec_value("num_requests_waiting", len(tasks))
885+
main_process_metrics.inc_value("num_requests_running", len(tasks))
886886
except Exception as e:
887887
err_msg = f"Error happened while insert task to engine: {e}, {traceback.format_exc()!s}."
888888
self.llm_logger.error(err_msg)
@@ -1010,7 +1010,7 @@ def _fetch_request():
10101010
)
10111011
]
10121012
)
1013-
main_process_metrics.reschedule_req_num.inc()
1013+
main_process_metrics.inc_value("reschedule_req_num")
10141014
need_delete_tasks.append(task)
10151015
continue
10161016
for tmp_task in need_delete_tasks:
@@ -1118,7 +1118,7 @@ def _fetch_request():
11181118
f"preallocated request. req:{task.request_id} "
11191119
)
11201120
self.llm_logger.error(msg)
1121-
main_process_metrics.reschedule_req_num.inc()
1121+
main_process_metrics.inc_value("reschedule_req_num")
11221122
self.scheduler.put_results(
11231123
[
11241124
RequestOutput(
@@ -1325,7 +1325,7 @@ def _insert_zmq_task_to_scheduler(self):
13251325
request = Request.from_dict(data)
13261326

13271327
request.metrics.scheduler_recv_req_time = time.time()
1328-
main_process_metrics.requests_number.inc()
1328+
main_process_metrics.inc_value("requests_number")
13291329
trace_carrier = data.get("trace_carrier")
13301330
if trace_carrier:
13311331
request_id = get_base_request_id(data["request_id"])
@@ -1388,7 +1388,7 @@ def _insert_zmq_task_to_scheduler(self):
13881388
added_requests.pop(request_id)
13891389

13901390
if failed is None:
1391-
main_process_metrics.num_requests_waiting.inc(1)
1391+
main_process_metrics.inc_value("num_requests_waiting", 1)
13921392
continue
13931393

13941394
self._send_error_response(request_id, failed)
@@ -2084,7 +2084,7 @@ def _process_allocate_resource_requests():
20842084
self.llm_logger.debug(f"D has successfully sent cache infos for task {task.request_id}")
20852085
processed_indices.append(idx)
20862086
is_success = True
2087-
main_process_metrics.decode_preallocated_req_num.inc()
2087+
main_process_metrics.inc_value("decode_preallocated_req_num")
20882088
else:
20892089
if self.resource_manager.is_resource_sufficient(task.prompt_token_ids_len):
20902090
self.llm_logger.debug(f"D Resource available, processing task {task.request_id}")
@@ -2161,7 +2161,7 @@ def _process_prefilled_requests():
21612161
else:
21622162
for req_output in ready_request_outputs:
21632163
request_id = req_output.request_id
2164-
main_process_metrics.decode_preallocated_req_num.dec()
2164+
main_process_metrics.dec_value("decode_preallocated_req_num")
21652165
trace_print(LoggingEventName.DECODE_PROCESS_PREFILLED_REQUEST_END, request_id, "")
21662166
if envs.FD_ENABLE_INTERNAL_ADAPTER and not req_output.outputs.token_ids:
21672167
# first token is eos in Prefill, just recycle resource and continue
@@ -2176,7 +2176,7 @@ def _process_prefilled_requests():
21762176
self.llm_logger.warning(
21772177
f"{request_id} prefill failed with msg:{req_output.error_msg}, recycle resource."
21782178
)
2179-
main_process_metrics.failed_recv_first_token_req_num.inc()
2179+
main_process_metrics.inc_value("failed_recv_first_token_req_num")
21802180
self.resource_manager.pre_recycle_resource(request_id)
21812181
if request_id in self.token_processor.tokens_counter:
21822182
del self.token_processor.tokens_counter[request_id]

fastdeploy/engine/resource_manager.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def __init__(
7070
self.real_bsz = 0
7171
self.abort_req_ids_set = set()
7272
llm_logger.info(f"{self.info()}")
73-
main_process_metrics.max_batch_size.set(max_num_seqs)
73+
main_process_metrics.set_value("max_batch_size", max_num_seqs)
7474

7575
def reset_cache_config(self, cfg):
7676
"""
@@ -180,7 +180,7 @@ def _recycle_block_tables(self, task):
180180
ori_number = self.available_block_num()
181181
self.cache_manager.recycle_gpu_blocks(block_tables)
182182
cur_number = self.available_block_num()
183-
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
183+
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
184184
llm_logger.info(f"recycle {req_id} {cur_number - ori_number} blocks.")
185185

186186
def available_batch(self):
@@ -322,14 +322,14 @@ def allocate_resources_for_new_tasks(self, tasks):
322322

323323
# record batch size here
324324
num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
325-
main_process_metrics.available_gpu_block_num.set(self.total_block_number() - num_blocks_used_by_tasks)
326-
main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
327-
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
325+
main_process_metrics.set_value("available_gpu_block_num", self.total_block_number() - num_blocks_used_by_tasks)
326+
main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch())
327+
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
328328
llm_logger.info(
329329
f"Number of allocated requests: {len(tasks)}, number of " f"running requests in worker: {self.real_bsz}"
330330
)
331331
llm_logger.info(f"{self.info()}")
332-
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
332+
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
333333

334334
return processed_tasks
335335

@@ -357,9 +357,9 @@ def _record_request_cache_info(self, task, common_block_ids, unique_block_ids, h
357357
task.cache_info = (cache_block_num, no_cache_block_num)
358358

359359
# Report the number of cached tokens to Prometheus metrics
360-
main_process_metrics.prefix_cache_token_num.inc(task.num_cached_tokens)
361-
main_process_metrics.prefix_gpu_cache_token_num.inc(task.gpu_cache_token_num)
362-
main_process_metrics.prefix_cpu_cache_token_num.inc(task.cpu_cache_token_num)
360+
main_process_metrics.inc_value("prefix_cache_token_num", task.num_cached_tokens)
361+
main_process_metrics.inc_value("prefix_gpu_cache_token_num", task.gpu_cache_token_num)
362+
main_process_metrics.inc_value("prefix_cpu_cache_token_num", task.cpu_cache_token_num)
363363

364364
cached_len = len(common_block_ids) * self.cfg.block_size
365365
task.block_tables = common_block_ids + unique_block_ids

fastdeploy/engine/sched/resource_manager_v1.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l
200200
self.finish_execution_pool = ThreadPoolExecutor(max_workers=1)
201201
self.lock = threading.Lock()
202202
self.to_be_rescheduled_request_id_set = set()
203-
main_process_metrics.max_batch_size.set(max_num_seqs)
203+
main_process_metrics.set_value("max_batch_size", max_num_seqs)
204204

205205
self.using_extend_tables_req_id = set()
206206
self.reuse_block_num_map = dict()
@@ -1466,9 +1466,9 @@ def get_prefix_cached_blocks(self, request: Request):
14661466
request.metrics.storage_cache_prepare_time = metrics["storage_cache_prepare_time"]
14671467
request.metrics.prompt_token_ids_len = request.prompt_token_ids_len
14681468

1469-
main_process_metrics.prefix_cache_token_num.inc(request.num_computed_tokens)
1470-
main_process_metrics.prefix_gpu_cache_token_num.inc(request.metrics.gpu_cache_token_num)
1471-
main_process_metrics.prefix_cpu_cache_token_num.inc(request.metrics.cpu_cache_token_num)
1469+
main_process_metrics.inc_value("prefix_cache_token_num", request.num_computed_tokens)
1470+
main_process_metrics.inc_value("prefix_gpu_cache_token_num", request.metrics.gpu_cache_token_num)
1471+
main_process_metrics.inc_value("prefix_cpu_cache_token_num", request.metrics.cpu_cache_token_num)
14721472

14731473
trace_print(LoggingEventName.PREPARE_PREFIX_CACHE_END, request.request_id, getattr(request, "user", ""))
14741474

@@ -1743,12 +1743,14 @@ def update_metrics(self, verbose=False):
17431743
if task is not None:
17441744
blocks_used_by_tasks.update(getattr(task, "block_tables", []))
17451745
blocks_used_by_tasks.update(getattr(task, "extend_block_tables", []))
1746-
main_process_metrics.available_gpu_block_num.set(self.total_block_number() - len(blocks_used_by_tasks))
1747-
main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
1748-
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
1749-
main_process_metrics.num_requests_running.set(num_requests_running)
1750-
main_process_metrics.num_requests_waiting.set(num_requests_waiting)
1751-
main_process_metrics.num_requests_queuing.set(num_requests_queuing)
1746+
main_process_metrics.set_value(
1747+
"available_gpu_block_num", self.total_block_number() - len(blocks_used_by_tasks)
1748+
)
1749+
main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch())
1750+
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
1751+
main_process_metrics.set_value("num_requests_running", num_requests_running)
1752+
main_process_metrics.set_value("num_requests_waiting", num_requests_waiting)
1753+
main_process_metrics.set_value("num_requests_queuing", num_requests_queuing)
17521754
if verbose:
17531755
llm_logger.info(
17541756
f"update metrics: running={num_requests_running}, "

fastdeploy/entrypoints/engine_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -369,9 +369,9 @@ async def add_requests(self, task):
369369

370370
if "messages" in task:
371371
task["messages"] = None
372-
main_process_metrics.request_params_max_tokens.observe(task["max_tokens"])
373-
main_process_metrics.prompt_tokens_total.inc(input_ids_len)
374-
main_process_metrics.request_prompt_tokens.observe(input_ids_len)
372+
main_process_metrics.obs_value("request_params_max_tokens", task["max_tokens"])
373+
main_process_metrics.inc_value("prompt_tokens_total", input_ids_len)
374+
main_process_metrics.obs_value("request_prompt_tokens", input_ids_len)
375375
except Exception as e:
376376
log_request_error(
377377
message="request[{request_id}] add_requests error: {error}, {traceback}",

fastdeploy/entrypoints/openai/serving_chat.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -479,8 +479,8 @@ async def chat_completion_stream_generator(
479479
if "trace_carrier" in res:
480480
del res["trace_carrier"]
481481
num_choices -= 1
482-
main_process_metrics.e2e_request_latency.observe(
483-
time.time() - res["metrics"]["request_start_time"]
482+
main_process_metrics.obs_value(
483+
"e2e_request_latency", time.time() - res["metrics"]["request_start_time"]
484484
)
485485
if previous_num_tokens[idx] != max_tokens:
486486
choice.finish_reason = "stop"
@@ -829,8 +829,8 @@ async def _create_chat_completion_choice(
829829
return_completion_token_ids = True
830830

831831
if output is not None and output.get("metrics") and output["metrics"].get("request_start_time"):
832-
main_process_metrics.e2e_request_latency.observe(
833-
time.time() - data.get("metrics").get("request_start_time")
832+
main_process_metrics.obs_value(
833+
"e2e_request_latency", time.time() - data.get("metrics").get("request_start_time")
834834
)
835835
message = ChatMessage(
836836
role="assistant",

0 commit comments

Comments
 (0)