diff --git a/custom_ops/xpu_ops/test/pytest.ini b/custom_ops/xpu_ops/test/pytest.ini index 15438a3cf7f..62d02c1d64e 100644 --- a/custom_ops/xpu_ops/test/pytest.ini +++ b/custom_ops/xpu_ops/test/pytest.ini @@ -40,4 +40,4 @@ addopts = --ignore=test_set_data_ipc.py --ignore=test_read_data_ipc.py --ignore=test_set_get_data_ipc.py - --ignore=test_draft_model_preprocess.py \ No newline at end of file + --ignore=test_draft_model_preprocess.py diff --git a/fastdeploy/cache_manager/cache_metrics.py b/fastdeploy/cache_manager/cache_metrics.py index 2dd3137d328..ade0718d56b 100644 --- a/fastdeploy/cache_manager/cache_metrics.py +++ b/fastdeploy/cache_manager/cache_metrics.py @@ -57,10 +57,10 @@ def _update_history_hit_metrics(self): self.gpu_hit_token_ratio = self.total_gpu_matched_token_num / self.total_token_num self.storage_hit_token_ratio = self.total_storage_matched_token_num / self.total_token_num - main_process_metrics.hit_req_rate.set(self.hit_req_ratio) - main_process_metrics.hit_token_rate.set(self.hit_token_ratio) - main_process_metrics.cpu_hit_token_rate.set(self.cpu_hit_token_ratio) - main_process_metrics.gpu_hit_token_rate.set(self.gpu_hit_token_ratio) + main_process_metrics.set_value("hit_req_rate", self.hit_req_ratio) + main_process_metrics.set_value("hit_token_rate", self.hit_token_ratio) + main_process_metrics.set_value("cpu_hit_token_rate", self.cpu_hit_token_ratio) + main_process_metrics.set_value("gpu_hit_token_rate", self.gpu_hit_token_ratio) logger.info( f"Metrics for all requests: req_count {self.req_count} hit_req_count {self.hit_req_count}" diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py index d28c1e6f6b0..88e3a9e5391 100644 --- a/fastdeploy/cache_manager/prefix_cache_manager.py +++ b/fastdeploy/cache_manager/prefix_cache_manager.py @@ -131,11 +131,11 @@ def __init__( f"{self.cache_config.bytes_per_token_per_layer / self.config.parallel_config.tensor_parallel_size}" ) - main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks) - main_process_metrics.max_cpu_block_num.set(self.num_cpu_blocks) - main_process_metrics.available_gpu_block_num.set(self.num_gpu_blocks) - main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks) - main_process_metrics.available_gpu_resource.set(1.0) + main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks) + main_process_metrics.set_value("max_cpu_block_num", self.num_cpu_blocks) + main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks) + main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks) + main_process_metrics.set_value("available_gpu_resource", 1.0) def _get_kv_cache_shape(self, max_block_num): from fastdeploy.model_executor.layers.attention import get_attention_backend @@ -462,11 +462,11 @@ def update_cache_config(self, cache_config): heapq.heapify(self.gpu_free_block_list) self.node_id_pool = list(range(self.num_gpu_blocks + self.num_cpu_blocks)) - main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks) - main_process_metrics.max_cpu_block_num.set(self.num_cpu_blocks) - main_process_metrics.available_gpu_block_num.set(self.num_gpu_blocks) - main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks) - main_process_metrics.available_gpu_resource.set(1.0) + main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks) + main_process_metrics.set_value("max_cpu_block_num", self.num_cpu_blocks) + main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks) + main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks) + main_process_metrics.set_value("available_gpu_resource", 1.0) def can_allocate_gpu_blocks(self, num_blocks: int, try_free_gpu_blocks: bool = True): """ @@ -494,8 +494,8 @@ def allocate_gpu_blocks(self, num_blocks, req_id=None): logger.info( f"req_id:{req_id} allocate_gpu_blocks: {allocated_block_ids}, len(self.gpu_free_block_list) {len(self.gpu_free_block_list)}" ) - main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list)) - main_process_metrics.available_gpu_resource.set(self.available_gpu_resource) + main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list)) + main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource) return allocated_block_ids def recycle_gpu_blocks(self, gpu_block_ids, req_id=None): @@ -529,8 +529,8 @@ def recycle_gpu_blocks(self, gpu_block_ids, req_id=None): else: heapq.heappush(self.gpu_free_block_list, gpu_block_ids) logger.debug(f"req_id:{req_id} recycle blocks end") - main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list)) - main_process_metrics.available_gpu_resource.set(self.available_gpu_resource) + main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list)) + main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource) def allocate_cpu_blocks(self, num_blocks): """ @@ -2296,9 +2296,9 @@ def reset(self, wait_for_tasks_done=False): # reset metrics self.metrics.reset_metrics() - main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list)) - main_process_metrics.available_gpu_block_num.set(len(self.gpu_free_block_list)) - main_process_metrics.available_gpu_resource.set(self.available_gpu_resource) + main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list)) + main_process_metrics.set_value("available_gpu_block_num", len(self.gpu_free_block_list)) + main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource) def clear_prefix_cache(self): """ diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 64c0058db06..58aac576194 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -893,8 +893,8 @@ def _schedule_request_to_worker(self): else: continue - main_process_metrics.num_requests_waiting.dec(len(tasks)) - main_process_metrics.num_requests_running.inc(len(tasks)) + main_process_metrics.dec_value("num_requests_waiting", len(tasks)) + main_process_metrics.inc_value("num_requests_running", len(tasks)) except Exception as e: err_msg = f"Error happened while insert task to engine: {e}, {traceback.format_exc()!s}." self.llm_logger.error(err_msg) @@ -1022,7 +1022,7 @@ def _fetch_request(): ) ] ) - main_process_metrics.reschedule_req_num.inc() + main_process_metrics.inc_value("reschedule_req_num") need_delete_tasks.append(task) continue for tmp_task in need_delete_tasks: @@ -1130,7 +1130,7 @@ def _fetch_request(): f"preallocated request. req:{task.request_id} " ) self.llm_logger.error(msg) - main_process_metrics.reschedule_req_num.inc() + main_process_metrics.inc_value("reschedule_req_num") self.scheduler.put_results( [ RequestOutput( @@ -1337,7 +1337,7 @@ def _insert_zmq_task_to_scheduler(self): request = Request.from_dict(data) request.metrics.scheduler_recv_req_time = time.time() - main_process_metrics.requests_number.inc() + main_process_metrics.inc_value("requests_number") trace_carrier = data.get("trace_carrier") if trace_carrier: request_id = get_base_request_id(data["request_id"]) @@ -1400,7 +1400,7 @@ def _insert_zmq_task_to_scheduler(self): added_requests.pop(request_id) if failed is None: - main_process_metrics.num_requests_waiting.inc(1) + main_process_metrics.inc_value("num_requests_waiting", 1) continue self._send_error_response(request_id, failed) @@ -2096,7 +2096,7 @@ def _process_allocate_resource_requests(): self.llm_logger.debug(f"D has successfully sent cache infos for task {task.request_id}") processed_indices.append(idx) is_success = True - main_process_metrics.decode_preallocated_req_num.inc() + main_process_metrics.inc_value("decode_preallocated_req_num") else: if self.resource_manager.is_resource_sufficient(task.prompt_token_ids_len): self.llm_logger.debug(f"D Resource available, processing task {task.request_id}") @@ -2173,7 +2173,7 @@ def _process_prefilled_requests(): else: for req_output in ready_request_outputs: request_id = req_output.request_id - main_process_metrics.decode_preallocated_req_num.dec() + main_process_metrics.dec_value("decode_preallocated_req_num") trace_print(LoggingEventName.DECODE_PROCESS_PREFILLED_REQUEST_END, request_id, "") if envs.FD_ENABLE_INTERNAL_ADAPTER and not req_output.outputs.token_ids: # first token is eos in Prefill, just recycle resource and continue @@ -2188,7 +2188,7 @@ def _process_prefilled_requests(): self.llm_logger.warning( f"{request_id} prefill failed with msg:{req_output.error_msg}, recycle resource." ) - main_process_metrics.failed_recv_first_token_req_num.inc() + main_process_metrics.inc_value("failed_recv_first_token_req_num") self.resource_manager.pre_recycle_resource(request_id) if request_id in self.token_processor.tokens_counter: del self.token_processor.tokens_counter[request_id] diff --git a/fastdeploy/engine/resource_manager.py b/fastdeploy/engine/resource_manager.py index 98b5d7190bf..3c957ffdef0 100644 --- a/fastdeploy/engine/resource_manager.py +++ b/fastdeploy/engine/resource_manager.py @@ -70,7 +70,7 @@ def __init__( self.real_bsz = 0 self.abort_req_ids_set = set() llm_logger.info(f"{self.info()}") - main_process_metrics.max_batch_size.set(max_num_seqs) + main_process_metrics.set_value("max_batch_size", max_num_seqs) def reset_cache_config(self, cfg): """ @@ -180,7 +180,7 @@ def _recycle_block_tables(self, task): ori_number = self.available_block_num() self.cache_manager.recycle_gpu_blocks(block_tables) cur_number = self.available_block_num() - main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc()) + main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc()) llm_logger.info(f"recycle {req_id} {cur_number - ori_number} blocks.") def available_batch(self): @@ -322,14 +322,14 @@ def allocate_resources_for_new_tasks(self, tasks): # record batch size here num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list]) - main_process_metrics.available_gpu_block_num.set(self.total_block_number() - num_blocks_used_by_tasks) - main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch()) - main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc()) + main_process_metrics.set_value("available_gpu_block_num", self.total_block_number() - num_blocks_used_by_tasks) + main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch()) + main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc()) llm_logger.info( f"Number of allocated requests: {len(tasks)}, number of " f"running requests in worker: {self.real_bsz}" ) llm_logger.info(f"{self.info()}") - main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc()) + main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc()) return processed_tasks @@ -357,9 +357,9 @@ def _record_request_cache_info(self, task, common_block_ids, unique_block_ids, h task.cache_info = (cache_block_num, no_cache_block_num) # Report the number of cached tokens to Prometheus metrics - main_process_metrics.prefix_cache_token_num.inc(task.num_cached_tokens) - main_process_metrics.prefix_gpu_cache_token_num.inc(task.gpu_cache_token_num) - main_process_metrics.prefix_cpu_cache_token_num.inc(task.cpu_cache_token_num) + main_process_metrics.inc_value("prefix_cache_token_num", task.num_cached_tokens) + main_process_metrics.inc_value("prefix_gpu_cache_token_num", task.gpu_cache_token_num) + main_process_metrics.inc_value("prefix_cpu_cache_token_num", task.cpu_cache_token_num) cached_len = len(common_block_ids) * self.cfg.block_size task.block_tables = common_block_ids + unique_block_ids diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 1437ebc850f..846dad4ebd8 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -200,7 +200,7 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l self.finish_execution_pool = ThreadPoolExecutor(max_workers=1) self.lock = threading.Lock() self.to_be_rescheduled_request_id_set = set() - main_process_metrics.max_batch_size.set(max_num_seqs) + main_process_metrics.set_value("max_batch_size", max_num_seqs) self.using_extend_tables_req_id = set() self.reuse_block_num_map = dict() @@ -1466,9 +1466,9 @@ def get_prefix_cached_blocks(self, request: Request): request.metrics.storage_cache_prepare_time = metrics["storage_cache_prepare_time"] request.metrics.prompt_token_ids_len = request.prompt_token_ids_len - main_process_metrics.prefix_cache_token_num.inc(request.num_computed_tokens) - main_process_metrics.prefix_gpu_cache_token_num.inc(request.metrics.gpu_cache_token_num) - main_process_metrics.prefix_cpu_cache_token_num.inc(request.metrics.cpu_cache_token_num) + main_process_metrics.inc_value("prefix_cache_token_num", request.num_computed_tokens) + main_process_metrics.inc_value("prefix_gpu_cache_token_num", request.metrics.gpu_cache_token_num) + main_process_metrics.inc_value("prefix_cpu_cache_token_num", request.metrics.cpu_cache_token_num) trace_print(LoggingEventName.PREPARE_PREFIX_CACHE_END, request.request_id, getattr(request, "user", "")) @@ -1743,12 +1743,14 @@ def update_metrics(self, verbose=False): if task is not None: blocks_used_by_tasks.update(getattr(task, "block_tables", [])) blocks_used_by_tasks.update(getattr(task, "extend_block_tables", [])) - main_process_metrics.available_gpu_block_num.set(self.total_block_number() - len(blocks_used_by_tasks)) - main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch()) - main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc()) - main_process_metrics.num_requests_running.set(num_requests_running) - main_process_metrics.num_requests_waiting.set(num_requests_waiting) - main_process_metrics.num_requests_queuing.set(num_requests_queuing) + main_process_metrics.set_value( + "available_gpu_block_num", self.total_block_number() - len(blocks_used_by_tasks) + ) + main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch()) + main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc()) + main_process_metrics.set_value("num_requests_running", num_requests_running) + main_process_metrics.set_value("num_requests_waiting", num_requests_waiting) + main_process_metrics.set_value("num_requests_queuing", num_requests_queuing) if verbose: llm_logger.info( f"update metrics: running={num_requests_running}, " diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py index a650ed4ad04..f27656f56bf 100644 --- a/fastdeploy/entrypoints/engine_client.py +++ b/fastdeploy/entrypoints/engine_client.py @@ -369,9 +369,9 @@ async def add_requests(self, task): if "messages" in task: task["messages"] = None - main_process_metrics.request_params_max_tokens.observe(task["max_tokens"]) - main_process_metrics.prompt_tokens_total.inc(input_ids_len) - main_process_metrics.request_prompt_tokens.observe(input_ids_len) + main_process_metrics.obs_value("request_params_max_tokens", task["max_tokens"]) + main_process_metrics.inc_value("prompt_tokens_total", input_ids_len) + main_process_metrics.obs_value("request_prompt_tokens", input_ids_len) except Exception as e: log_request_error( message="request[{request_id}] add_requests error: {error}, {traceback}", diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index d6429521f05..18bcad5b61c 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -479,8 +479,8 @@ async def chat_completion_stream_generator( if "trace_carrier" in res: del res["trace_carrier"] num_choices -= 1 - main_process_metrics.e2e_request_latency.observe( - time.time() - res["metrics"]["request_start_time"] + main_process_metrics.obs_value( + "e2e_request_latency", time.time() - res["metrics"]["request_start_time"] ) if previous_num_tokens[idx] != max_tokens: choice.finish_reason = "stop" @@ -829,8 +829,8 @@ async def _create_chat_completion_choice( return_completion_token_ids = True if output is not None and output.get("metrics") and output["metrics"].get("request_start_time"): - main_process_metrics.e2e_request_latency.observe( - time.time() - data.get("metrics").get("request_start_time") + main_process_metrics.obs_value( + "e2e_request_latency", time.time() - data.get("metrics").get("request_start_time") ) message = ChatMessage( role="assistant", diff --git a/fastdeploy/entrypoints/openai/v1/serving_chat.py b/fastdeploy/entrypoints/openai/v1/serving_chat.py index a199df1ae6e..a130c3aa9d6 100644 --- a/fastdeploy/entrypoints/openai/v1/serving_chat.py +++ b/fastdeploy/entrypoints/openai/v1/serving_chat.py @@ -301,8 +301,8 @@ async def _build_stream_response( if request_output.finished: if request_output.metrics and request_output.metrics.request_start_time: - main_process_metrics.e2e_request_latency.observe( - time.time() - request_output.metrics.request_start_time + main_process_metrics.obs_value( + "e2e_request_latency", time.time() - request_output.metrics.request_start_time ) max_tokens = request.max_completion_tokens or request.max_tokens choice_completion_tokens = response_ctx.choice_completion_tokens_dict[output.index] @@ -393,7 +393,9 @@ async def _create_chat_completion_choice( message.reasoning_content = output.reasoning_content message.tool_calls = request_output.accumulate_tool_calls if request_output.accumulate_tool_calls else None if output is not None and request_output.metrics and request_output.metrics.request_start_time: - main_process_metrics.e2e_request_latency.observe(time.time() - request_output.metrics.request_start_time) + main_process_metrics.obs_value( + "e2e_request_latency", time.time() - request_output.metrics.request_start_time + ) if request.return_token_ids: message.prompt_token_ids = request_output.prompt_token_ids diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 8e0344384b6..5fd73e1962b 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -298,6 +298,10 @@ def _validate_split_kv_size(value: int) -> int: # When set to 1, print which op / shape enters the block-wise CUDA Graph # during the capture phase. Defaults to 0 (silent). "FD_BLOCK_WISE_DEBUG": lambda: bool(int(os.getenv("FD_BLOCK_WISE_DEBUG", "0"))), + # Default label values for Prometheus metrics, specified as a JSON dict string. + # When set to a valid JSON dict, metric labels are automatically enabled. + # Example: '{"model_id":"my_model"}' adds model_id label to all metrics. + "FD_DEFAULT_METRIC_LABEL_VALUES": lambda: os.getenv("FD_DEFAULT_METRIC_LABEL_VALUES", "{}"), } diff --git a/fastdeploy/metrics/interface.py b/fastdeploy/metrics/interface.py new file mode 100644 index 00000000000..8c32d4e58cb --- /dev/null +++ b/fastdeploy/metrics/interface.py @@ -0,0 +1,72 @@ +""" +# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" + +""" +MetricsManagerInterface provides a unified interface for metric operations. +When FD_DEFAULT_METRIC_LABEL_VALUES is set to a valid JSON dict, metric labels +(e.g. model_id) are automatically applied. Otherwise, operations fall back to +the raw prometheus_client calls. +""" + +from abc import ABC, abstractmethod + + +class MetricsManagerInterface(ABC): + """Abstract base class that defines the unified metrics interface.""" + + @abstractmethod + def set_value(self, name: str, value, labelvalues: dict = None): + """Set a Gauge metric to the given value. + + Args: + name: The attribute name of the metric on the MetricsManager. + value: The value to set. + labelvalues: Optional dict of label key-value pairs. + """ + raise NotImplementedError + + @abstractmethod + def inc_value(self, name: str, value=1, labelvalues: dict = None): + """Increment a Counter or Gauge metric by the given value. + + Args: + name: The attribute name of the metric on the MetricsManager. + value: The amount to increment by (default 1). + labelvalues: Optional dict of label key-value pairs. + """ + raise NotImplementedError + + @abstractmethod + def dec_value(self, name: str, value=1, labelvalues: dict = None): + """Decrement a Gauge metric by the given value. + + Args: + name: The attribute name of the metric on the MetricsManager. + value: The amount to decrement by (default 1). + labelvalues: Optional dict of label key-value pairs. + """ + raise NotImplementedError + + @abstractmethod + def obs_value(self, name: str, value, labelvalues: dict = None): + """Observe a value on a Histogram metric. + + Args: + name: The attribute name of the metric on the MetricsManager. + value: The value to observe. + labelvalues: Optional dict of label key-value pairs. + """ + raise NotImplementedError diff --git a/fastdeploy/metrics/metrics.py b/fastdeploy/metrics/metrics.py index 0daa36ad58a..7cb95afbf76 100644 --- a/fastdeploy/metrics/metrics.py +++ b/fastdeploy/metrics/metrics.py @@ -17,6 +17,8 @@ """ metrics """ +import copy +import json import os from typing import Set @@ -32,11 +34,13 @@ from fastdeploy import envs from fastdeploy.metrics import build_1_2_5_buckets +from fastdeploy.metrics.interface import MetricsManagerInterface from fastdeploy.metrics.prometheus_multiprocess_setup import ( setup_multiprocess_prometheus, ) from fastdeploy.metrics.stats import ZMQMetricsStats from fastdeploy.spec_decode import SpecMethod +from fastdeploy.utils import llm_logger class SimpleCollector(Collector): @@ -127,7 +131,7 @@ def get_filtered_metrics() -> str: ] -class MetricsManager: +class MetricsManager(MetricsManagerInterface): """Prometheus Metrics Manager handles all metric updates""" _instance = None @@ -152,7 +156,7 @@ class MetricsManager: spec_decode_num_accepted_tokens_total: "Gauge" spec_decode_num_draft_tokens_total: "Counter" spec_decode_num_emitted_tokens_total: "Gauge" - spec_decode_draft_single_head_acceptance_rate: "list[Gauge]" + spec_decode_draft_single_head_acceptance_rate: "Gauge" prefix_cache_token_num: "Counter" prefix_gpu_cache_token_num: "Counter" @@ -653,21 +657,57 @@ class MetricsManager: }, } + def _patch_labelnames(self, metrics_dict: dict) -> dict: + """When _enable_labels is True, add keys from _default_labelvalues to + labelnames for all metrics. Does not modify the original dict. + + Returns a deep-copied dict with patched kwargs. + """ + if not self._enable_labels: + return metrics_dict + patched = {} + for name, config in metrics_dict.items(): + new_config = copy.deepcopy(config) + kwargs = new_config["kwargs"] + if "labelnames" in kwargs: + for label in self._default_labelvalues: + if label not in kwargs["labelnames"]: + kwargs["labelnames"].append(label) + else: + kwargs["labelnames"] = list(self._default_labelvalues.keys()) + patched[name] = new_config + return patched + def __init__(self): """Initializes the Prometheus metrics and starts the HTTP server if not already initialized.""" + # 解析 FD_DEFAULT_METRIC_LABEL_VALUES + # 当值为合法 JSON dict 且非空时启用 metric labels + try: + self._default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES) + except (json.JSONDecodeError, TypeError): + self._default_labelvalues = {} + self._enable_labels = isinstance(self._default_labelvalues, dict) and len(self._default_labelvalues) > 0 + if self._enable_labels: + llm_logger.info(f"Metric labels are enabled with default values: {self._default_labelvalues}") + # 在模块加载,指标注册先设置Prometheus环境变量 setup_multiprocess_prometheus() + # 用 _patch_labelnames 处理后的副本创建指标,不修改类级别原始 dict + patched_metrics = self._patch_labelnames(self.METRICS) + patched_gauge_metrics = self._patch_labelnames(self.GAUGE_METRICS) + patched_server_metrics = self._patch_labelnames(self.SERVER_METRICS) + # 动态创建所有非 gauge 型指标 - for metric_name, config in self.METRICS.items(): + for metric_name, config in patched_metrics.items(): setattr( self, metric_name, config["type"](config["name"], config["description"], **config["kwargs"]), ) # 动态创建所有 gauge 型指标,统一配置 multiprocess_mode 为 livesum - for metric_name, config in self.GAUGE_METRICS.items(): + for metric_name, config in patched_gauge_metrics.items(): kwargs = config["kwargs"].copy() if "multiprocess_mode" not in kwargs: kwargs["multiprocess_mode"] = "livesum" @@ -677,13 +717,64 @@ def __init__(self): config["type"](config["name"], config["description"], **kwargs), ) # 动态创建server metrics - for metric_name, config in self.SERVER_METRICS.items(): + for metric_name, config in patched_server_metrics.items(): setattr( self, metric_name, config["type"](config["name"], config["description"], **config["kwargs"]), ) + def _get_metric_and_labels(self, name: str, labelvalues: dict = None): + """Get the metric object and merged labelvalues. + + When _enable_labels is True, returns (metric, merged_labels) where + merged_labels is the union of _default_labelvalues and caller-provided + labelvalues. When False but caller provides labelvalues (for metrics + with their own labelnames like spec_decode_draft_single_head_acceptance_rate), + returns (metric, labelvalues). Otherwise returns (metric, None). + """ + metric = getattr(self, name) + if not self._enable_labels: + if labelvalues: + return metric, labelvalues + return metric, None + merged = dict(self._default_labelvalues) + if labelvalues: + merged.update(labelvalues) + return metric, merged + + def set_value(self, name: str, value, labelvalues: dict = None): + """Set a Gauge metric to the given value.""" + metric, merged = self._get_metric_and_labels(name, labelvalues) + if merged is not None: + metric.labels(**merged).set(value) + else: + metric.set(value) + + def inc_value(self, name: str, value=1, labelvalues: dict = None): + """Increment a Counter or Gauge metric by the given value.""" + metric, merged = self._get_metric_and_labels(name, labelvalues) + if merged is not None: + metric.labels(**merged).inc(value) + else: + metric.inc(value) + + def dec_value(self, name: str, value=1, labelvalues: dict = None): + """Decrement a Gauge metric by the given value.""" + metric, merged = self._get_metric_and_labels(name, labelvalues) + if merged is not None: + metric.labels(**merged).dec(value) + else: + metric.dec(value) + + def obs_value(self, name: str, value, labelvalues: dict = None): + """Observe a value on a Histogram metric.""" + metric, merged = self._get_metric_and_labels(name, labelvalues) + if merged is not None: + metric.labels(**merged).observe(value) + else: + metric.observe(value) + def _init_speculative_metrics(self, speculative_method, num_speculative_tokens): self.SPECULATIVE_METRICS = { "spec_decode_draft_acceptance_rate": { @@ -719,41 +810,33 @@ def _init_speculative_metrics(self, speculative_method, num_speculative_tokens): "kwargs": {}, } self.SPECULATIVE_METRICS["spec_decode_draft_single_head_acceptance_rate"] = { - "type": list[Gauge], + "type": Gauge, "name": "fastdeploy:spec_decode_draft_single_head_acceptance_rate", "description": "Single head acceptance rate of speculative decoding", - "kwargs": {}, + "kwargs": {"labelnames": ["head"]}, } - for metric_name, config in self.SPECULATIVE_METRICS.items(): - if metric_name == "spec_decode_draft_single_head_acceptance_rate": - gauges = [] - for i in range(num_speculative_tokens): - gauges.append( - Gauge( - f"{config['name']}_{i}", - f"{config['description']} (head {i})", - multiprocess_mode="livesum", - ) - ) - setattr(self, metric_name, gauges) - else: - # For Gauge metrics, automatically add multiprocess_mode="livesum" - kwargs = config["kwargs"].copy() - if config["type"] == Gauge and "multiprocess_mode" not in kwargs: - kwargs["multiprocess_mode"] = "livesum" - setattr( - self, - metric_name, - config["type"]( - config["name"], - config["description"], - **kwargs, - ), - ) + + patched_spec_metrics = self._patch_labelnames(self.SPECULATIVE_METRICS) + + for metric_name, config in patched_spec_metrics.items(): + # For Gauge metrics, automatically add multiprocess_mode="livesum" + kwargs = config["kwargs"].copy() + if config["type"] == Gauge and "multiprocess_mode" not in kwargs: + kwargs["multiprocess_mode"] = "livesum" + setattr( + self, + metric_name, + config["type"]( + config["name"], + config["description"], + **kwargs, + ), + ) def init_zmq_metrics(self): - # 动态创建所有指标 - for metric_name, config in self.ZMQ_METRICS.items(): + # 用 _patch_labelnames 处理 ZMQ_METRICS dict 后再创建指标 + patched_zmq_metrics = self._patch_labelnames(self.ZMQ_METRICS) + for metric_name, config in patched_zmq_metrics.items(): setattr( self, metric_name, @@ -769,44 +852,62 @@ def record_zmq_stats(self, zmq_metrics_stats: ZMQMetricsStats, address: str = "u if not self._collect_zmq_metrics: return + # 构建 zmq labelvalues: address + _default_labelvalues + zmq_labels = dict() + if self._enable_labels: + zmq_labels.update(self._default_labelvalues) + zmq_labels.update({"address": address}) + # 记录zmq统计信息 - self.msg_send_total.labels(address=address).inc(zmq_metrics_stats.msg_send_total) - self.msg_send_failed_total.labels(address=address).inc(zmq_metrics_stats.msg_send_failed_total) - self.msg_bytes_send_total.labels(address=address).inc(zmq_metrics_stats.msg_bytes_send_total) - self.msg_recv_total.labels(address=address).inc(zmq_metrics_stats.msg_recv_total) - self.msg_bytes_recv_total.labels(address=address).inc(zmq_metrics_stats.msg_bytes_recv_total) + self.msg_send_total.labels(**zmq_labels).inc(zmq_metrics_stats.msg_send_total) + self.msg_send_failed_total.labels(**zmq_labels).inc(zmq_metrics_stats.msg_send_failed_total) + self.msg_bytes_send_total.labels(**zmq_labels).inc(zmq_metrics_stats.msg_bytes_send_total) + self.msg_recv_total.labels(**zmq_labels).inc(zmq_metrics_stats.msg_recv_total) + self.msg_bytes_recv_total.labels(**zmq_labels).inc(zmq_metrics_stats.msg_bytes_recv_total) if zmq_metrics_stats.zmq_latency > 0.0: # trans to millisecond - self.zmq_latency.labels(address=address).observe(zmq_metrics_stats.zmq_latency * 1000) + self.zmq_latency.labels(**zmq_labels).observe(zmq_metrics_stats.zmq_latency * 1000) def set_cache_config_info(self, obj) -> None: + metrics_info = obj.metrics_info() + if hasattr(self, "cache_config_info") and isinstance(self.cache_config_info, Gauge): - metrics_info = obj.metrics_info() if metrics_info: - self.cache_config_info.labels(**metrics_info).set(1) + # 合并 default labelvalues + merged = dict() + if self._enable_labels: + merged.update(self._default_labelvalues) + merged.update(metrics_info) + self.cache_config_info.labels(**merged).set(1) return - metrics_info = obj.metrics_info() if not metrics_info: return + # 动态创建 cache_config_info gauge,追加 default labelvalues 的 labelnames + labelnames = list(metrics_info.keys()) + if self._enable_labels: + for label in self._default_labelvalues: + if label not in labelnames: + labelnames.append(label) + self.cache_config_info = Gauge( name="fastdeploy:cache_config_info", documentation="Information of the engine's CacheConfig", - labelnames=list(metrics_info.keys()), + labelnames=labelnames, multiprocess_mode="mostrecent", ) - self.cache_config_info.labels(**metrics_info).set(1) + merged = dict() + if self._enable_labels: + merged.update(self._default_labelvalues) + merged.update(metrics_info) # Priority: metrics_info > default + self.cache_config_info.labels(**merged).set(1) def register_speculative_metrics(self, registry: CollectorRegistry): """Register all speculative metrics to the specified registry""" for metric_name in self.SPECULATIVE_METRICS: - if metric_name == "spec_decode_draft_single_head_acceptance_rate": - for gauge in getattr(self, metric_name): - registry.register(gauge) - else: - registry.register(getattr(self, metric_name)) + registry.register(getattr(self, metric_name)) def re_register_speculative_gauge(self, registry: CollectorRegistry): """Re-register gauge metrics from SPECULATIVE_METRICS to the specified registry""" @@ -815,10 +916,7 @@ def re_register_speculative_gauge(self, registry: CollectorRegistry): if not hasattr(self, "spec_decode_draft_acceptance_rate"): return for metric_name, config in self.SPECULATIVE_METRICS.items(): - if metric_name == "spec_decode_draft_single_head_acceptance_rate": - for gauge in getattr(self, metric_name): - registry.register(gauge) - elif config["type"] == Gauge: + if config["type"] == Gauge: registry.register(getattr(self, metric_name)) def re_register_gauge(self, registry: CollectorRegistry): @@ -850,7 +948,7 @@ def get_excluded_metrics(self) -> Set[str]: # Also add gauge metrics from SPECULATIVE_METRICS (if initialized) if hasattr(self, "SPECULATIVE_METRICS"): for config in self.SPECULATIVE_METRICS.values(): - if config["type"] == Gauge or config["type"] == list[Gauge]: + if config["type"] == Gauge: excluded.add(config["name"]) return excluded diff --git a/fastdeploy/metrics/metrics_middleware.py b/fastdeploy/metrics/metrics_middleware.py index 1ab1198e600..883218f2eb9 100644 --- a/fastdeploy/metrics/metrics_middleware.py +++ b/fastdeploy/metrics/metrics_middleware.py @@ -52,7 +52,11 @@ async def dispatch(self, request: Request, call_next): process_time = end_time - start_time # record http metrics - main_process_metrics.http_requests_total.labels(method=method, path=path, status_code=status_code).inc() - main_process_metrics.http_request_duration_seconds.labels(method=method, path=path).observe(process_time) + main_process_metrics.inc_value( + "http_requests_total", labelvalues={"method": method, "path": path, "status_code": status_code} + ) + main_process_metrics.obs_value( + "http_request_duration_seconds", process_time, labelvalues={"method": method, "path": path} + ) return response diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index f0cd22e1309..c8393ac6731 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -287,7 +287,7 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result: preempted_count=getattr(task.metrics, "preempted_count", 0), ) - main_process_metrics.request_token_ratio.observe(token_ratio) + main_process_metrics.obs_value("request_token_ratio", token_ratio) llm_logger.info(self.resource_manager.info()) if self.cfg.speculative_config.method: self._compute_speculative_status() @@ -630,13 +630,13 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False num_blocks_used_by_tasks = sum( [len(task.block_tables) if task else 0 for task in self.resource_manager.tasks_list] ) - main_process_metrics.available_gpu_block_num.set( - self.resource_manager.total_block_number() - num_blocks_used_by_tasks + main_process_metrics.set_value( + "available_gpu_block_num", self.resource_manager.total_block_number() - num_blocks_used_by_tasks ) - main_process_metrics.batch_size.set( - self.resource_manager.max_num_seqs - self.resource_manager.available_batch() + main_process_metrics.set_value( + "batch_size", self.resource_manager.max_num_seqs - self.resource_manager.available_batch() ) - main_process_metrics.available_batch_size.set(self.resource_manager.available_batch()) + main_process_metrics.set_value("available_batch_size", self.resource_manager.available_batch()) if task_id in self.tokens_counter: del self.tokens_counter[task_id] @@ -1062,7 +1062,7 @@ def _process_batch_output(self): preempted_count=getattr(task.metrics, "preempted_count", 0), ) - main_process_metrics.request_token_ratio.observe(token_ratio) + main_process_metrics.obs_value("request_token_ratio", token_ratio) if self.cfg.speculative_config.method: self._compute_speculative_status(result) self._record_completion_metrics(task, current_time) @@ -1100,7 +1100,7 @@ def _record_metrics(self, task, current_time, token_ids): """Record all metrics for a task""" if hasattr(task, "last_token_time") and task.last_token_time is not None: token_gen_time = current_time - task.last_token_time - main_process_metrics.time_per_output_token.observe(token_gen_time) + main_process_metrics.obs_value("time_per_output_token", token_gen_time) if self._benchmark_logger: if not hasattr(task, "_itl_samples"): task._itl_samples = [] @@ -1108,16 +1108,18 @@ def _record_metrics(self, task, current_time, token_ids): task.last_token_time = current_time # Record generation metrics - main_process_metrics.generation_tokens_total.inc(len(token_ids)) + main_process_metrics.inc_value("generation_tokens_total", len(token_ids)) def _record_first_token_metrics(self, task, current_time): """Record metrics for first token""" metrics = task.metrics trace_print(LoggingEventName.FIRST_TOKEN_GENERATED, task.request_id, getattr(task, "user", "")) trace_print(LoggingEventName.DECODE_START, task.request_id, getattr(task, "user", "")) - main_process_metrics.time_to_first_token.observe(current_time - metrics.arrival_time) - main_process_metrics.request_queue_time.observe(metrics.inference_start_time - metrics.preprocess_end_time) - main_process_metrics.request_prefill_time.observe(current_time - metrics.inference_start_time) + main_process_metrics.obs_value("time_to_first_token", current_time - metrics.arrival_time) + main_process_metrics.obs_value( + "request_queue_time", metrics.inference_start_time - metrics.preprocess_end_time + ) + main_process_metrics.obs_value("request_prefill_time", current_time - metrics.inference_start_time) def _record_completion_metrics(self, task, current_time): """Record metrics when request completes""" @@ -1127,7 +1129,7 @@ def _record_completion_metrics(self, task, current_time): if role in ("mixed", "decode"): if metrics.engine_recv_first_token_time: decode_time = current_time - metrics.engine_recv_first_token_time - main_process_metrics.request_decode_time.observe(decode_time) + main_process_metrics.obs_value("request_decode_time", decode_time) trace_print(LoggingEventName.INFERENCE_END, task.request_id, getattr(task, "user", "")) if role == "prefill": @@ -1136,9 +1138,9 @@ def _record_completion_metrics(self, task, current_time): trace_print(LoggingEventName.DECODE_INFERENCE_END, task.request_id, getattr(task, "user", "")) trace_print(LoggingEventName.POSTPROCESSING_START, task.request_id, getattr(task, "user", "")) - main_process_metrics.request_success_total.inc() - main_process_metrics.request_inference_time.observe(current_time - metrics.inference_start_time) - main_process_metrics.request_generation_tokens.observe(self.tokens_counter[task.request_id]) + main_process_metrics.inc_value("request_success_total") + main_process_metrics.obs_value("request_inference_time", current_time - metrics.inference_start_time) + main_process_metrics.obs_value("request_generation_tokens", self.tokens_counter[task.request_id]) if self._benchmark_logger: from fastdeploy.metrics.benchmark_metrics_logger import ( @@ -1173,12 +1175,12 @@ def _record_speculative_decoding_metrics(self, accept_num): if self.num_emitted_tokens == 0: return - main_process_metrics.spec_decode_num_accepted_tokens_total.set(self.num_accepted_tokens) - main_process_metrics.spec_decode_num_emitted_tokens_total.set(self.num_emitted_tokens) + main_process_metrics.set_value("spec_decode_num_accepted_tokens_total", self.num_accepted_tokens) + main_process_metrics.set_value("spec_decode_num_emitted_tokens_total", self.num_emitted_tokens) if self.cfg.speculative_config.method == SpecMethod.NGRAM: - main_process_metrics.spec_decode_draft_acceptance_rate.set( - self.num_accepted_tokens / self.num_emitted_tokens + main_process_metrics.set_value( + "spec_decode_draft_acceptance_rate", self.num_accepted_tokens / self.num_emitted_tokens ) if self.cfg.speculative_config.method == SpecMethod.MTP: @@ -1189,19 +1191,23 @@ def _record_speculative_decoding_metrics(self, accept_num): self.cfg.speculative_config.num_speculative_tokens + 1 ) - main_process_metrics.spec_decode_draft_acceptance_rate.set( - self.num_accepted_tokens / self.num_draft_tokens + main_process_metrics.set_value( + "spec_decode_draft_acceptance_rate", self.num_accepted_tokens / self.num_draft_tokens + ) + main_process_metrics.set_value( + "spec_decode_efficiency", self.num_emitted_tokens / self.max_num_emitted_tokens ) - main_process_metrics.spec_decode_efficiency.set(self.num_emitted_tokens / self.max_num_emitted_tokens) - main_process_metrics.spec_decode_num_draft_tokens_total.inc(num_draft_tokens) + main_process_metrics.inc_value("spec_decode_num_draft_tokens_total", num_draft_tokens) for i in range(1, self.cfg.speculative_config.num_speculative_tokens + 1): if self.accept_token_num_per_head[i - 1] != 0: single_head_acceptance_rate = ( self.accept_token_num_per_head[i] / self.accept_token_num_per_head[i - 1] ) - main_process_metrics.spec_decode_draft_single_head_acceptance_rate[i - 1].set( - single_head_acceptance_rate + main_process_metrics.set_value( + "spec_decode_draft_single_head_acceptance_rate", + single_head_acceptance_rate, + labelvalues={"head": str(i - 1)}, ) def _record_speculative_decoding_accept_num_per_request(self, req_id, accept_num): diff --git a/fastdeploy/splitwise/splitwise_connector.py b/fastdeploy/splitwise/splitwise_connector.py index 8d2091c7d31..6cc4dc930d1 100644 --- a/fastdeploy/splitwise/splitwise_connector.py +++ b/fastdeploy/splitwise/splitwise_connector.py @@ -177,7 +177,7 @@ def _send_message(self, addr, msg_type: str, payload): self.logger.warning(f"_send_message: Send queue full for {addr}") except Exception as e: self.logger.error(f"_send_message: Send to {addr} failed: {e}, {str(traceback.format_exc())}") - main_process_metrics.send_cache_failed_num.inc() + main_process_metrics.inc_value("send_cache_failed_num") self._close_connection(addr) except Exception as e: self.logger.error(f"_send_message: Message preparation failed: {e}, {traceback.format_exc()}") diff --git a/tests/cache_manager/test_prefix_cache_manager.py b/tests/cache_manager/test_prefix_cache_manager.py index 91eceeb9268..4228892f8d4 100644 --- a/tests/cache_manager/test_prefix_cache_manager.py +++ b/tests/cache_manager/test_prefix_cache_manager.py @@ -79,6 +79,26 @@ def __getattr__(self, name): self.metrics[name] = _DummyMetric() return self.metrics[name] + def set_value(self, name, value, labelvalues=None): + if name not in self.metrics: + self.metrics[name] = _DummyMetric() + self.metrics[name].set(value) + + def inc_value(self, name, value=1, labelvalues=None): + if name not in self.metrics: + self.metrics[name] = _DummyMetric() + self.metrics[name].inc(value) + + def dec_value(self, name, value=1, labelvalues=None): + if name not in self.metrics: + self.metrics[name] = _DummyMetric() + self.metrics[name].dec(value) + + def obs_value(self, name, value, labelvalues=None): + if name not in self.metrics: + self.metrics[name] = _DummyMetric() + self.metrics[name].observe(value) + # IPC signal stub that mirrors the real object's surface area. class _DummyIPCSignal: diff --git a/tests/engine/test_common_engine.py b/tests/engine/test_common_engine.py index a3487133bcb..4b539555a34 100644 --- a/tests/engine/test_common_engine.py +++ b/tests/engine/test_common_engine.py @@ -1312,6 +1312,18 @@ def __init__(self): self.requests_number = Mock(inc=Mock()) self.num_requests_waiting = Mock(inc=Mock()) + def inc_value(self, name, value=1, labelvalues=None): + getattr(self, name).inc(value) + + def dec_value(self, name, value=1, labelvalues=None): + getattr(self, name).dec(value) + + def set_value(self, name, value, labelvalues=None): + getattr(self, name).set(value) + + def obs_value(self, name, value, labelvalues=None): + getattr(self, name).observe(value) + class DummyRecv: def __init__(self): self.calls = 0 @@ -3348,6 +3360,18 @@ def __init__(self): self.requests_number = Mock(inc=Mock()) self.num_requests_waiting = Mock(inc=Mock()) + def inc_value(self, name, value=1, labelvalues=None): + getattr(self, name).inc(value) + + def dec_value(self, name, value=1, labelvalues=None): + getattr(self, name).dec(value) + + def set_value(self, name, value, labelvalues=None): + getattr(self, name).set(value) + + def obs_value(self, name, value, labelvalues=None): + getattr(self, name).observe(value) + with ( patch("fastdeploy.engine.common_engine.envs.ZMQ_SEND_BATCH_DATA", True), patch("fastdeploy.engine.common_engine.envs.FD_ENABLE_INTERNAL_ADAPTER", False), diff --git a/tests/engine/test_resource_manager.py b/tests/engine/test_resource_manager.py index 5dadb9861a6..4604829f673 100644 --- a/tests/engine/test_resource_manager.py +++ b/tests/engine/test_resource_manager.py @@ -107,16 +107,15 @@ def _noop_logger(): def _stub_metrics(): m = SimpleNamespace() - for n in ( - "max_batch_size", - "batch_size", - "available_gpu_block_num", - "gpu_cache_usage_perc", - "prefix_cache_token_num", - "prefix_gpu_cache_token_num", - "prefix_cpu_cache_token_num", - ): - setattr(m, n, SimpleNamespace(set=lambda v: None, inc=lambda v: None)) + m._store = {} + + def _get(name): + if name not in m._store: + m._store[name] = SimpleNamespace(value=None) + return m._store[name] + + m.set_value = lambda name, v, **kw: setattr(_get(name), "value", v) + m.inc_value = lambda name, v=1, **kw: setattr(_get(name), "value", (getattr(_get(name), "value") or 0) + v) return m diff --git a/tests/metrics/test_metrics.py b/tests/metrics/test_metrics.py index c4bad6f48af..75839d77cd1 100644 --- a/tests/metrics/test_metrics.py +++ b/tests/metrics/test_metrics.py @@ -67,12 +67,12 @@ def test_speculative_single_head_gauge_returns_single_value_without_pid(self): if not hasattr(main_process_metrics, "spec_decode_draft_acceptance_rate"): main_process_metrics._init_speculative_metrics(SpecMethod.MTP, 2) - metric = main_process_metrics.spec_decode_draft_single_head_acceptance_rate[0] - metric.set(0.6) + main_process_metrics.set_value("spec_decode_draft_single_head_acceptance_rate", 0.6, labelvalues={"head": "0"}) + metric = main_process_metrics.spec_decode_draft_single_head_acceptance_rate result = self._collect_metrics_with_mocked_multiprocess(metric._name, multiprocess_value=1000.6) - self._assert_unique_metric_value(result, metric._name, 0.6) + self.assertIn('head="0"', result) if __name__ == "__main__": diff --git a/tests/metrics/test_metrics_middleware.py b/tests/metrics/test_metrics_middleware.py index 27af6ff22f5..7a092467046 100644 --- a/tests/metrics/test_metrics_middleware.py +++ b/tests/metrics/test_metrics_middleware.py @@ -71,12 +71,14 @@ def test_dispatch_successful_request(mock_request, mock_call_next): mock_call_next.assert_called_once_with(mock_request) assert result == mock_response - # 验证指标记录 - mock_metrics.http_requests_total.labels.assert_called_once_with(method="POST", path="/test", status_code=200) - mock_metrics.http_requests_total.labels().inc.assert_called_once() + # 验证指标记录 (now using inc_value/obs_value interface) + mock_metrics.inc_value.assert_called_once_with( + "http_requests_total", labelvalues={"method": "POST", "path": "/test", "status_code": 200} + ) - mock_metrics.http_request_duration_seconds.labels.assert_called_once_with(method="POST", path="/test") - mock_metrics.http_request_duration_seconds.labels().observe.assert_called_once_with(1.5) + mock_metrics.obs_value.assert_called_once_with( + "http_request_duration_seconds", 1.5, labelvalues={"method": "POST", "path": "/test"} + ) def test_dispatch_with_exception(mock_request, mock_call_next): @@ -95,12 +97,14 @@ def test_dispatch_with_exception(mock_request, mock_call_next): with pytest.raises(Exception, match="Test error"): run_async(middleware.dispatch(mock_request, mock_call_next)) - # 验证即使抛出异常也记录了指标 - mock_metrics.http_requests_total.labels.assert_called_once_with(method="GET", path="/error", status_code=500) - mock_metrics.http_requests_total.labels().inc.assert_called_once() + # 验证即使抛出异常也记录了指标 (now using inc_value/obs_value interface) + mock_metrics.inc_value.assert_called_once_with( + "http_requests_total", labelvalues={"method": "GET", "path": "/error", "status_code": 500} + ) - mock_metrics.http_request_duration_seconds.labels.assert_called_once_with(method="GET", path="/error") - mock_metrics.http_request_duration_seconds.labels().observe.assert_called_once_with(2.0) + mock_metrics.obs_value.assert_called_once_with( + "http_request_duration_seconds", 2.0, labelvalues={"method": "GET", "path": "/error"} + ) def test_all_excluded_paths(mock_request, mock_call_next): diff --git a/tests/metrics/test_new_metrics.py b/tests/metrics/test_new_metrics.py index 030acaf4299..fb0ef22190a 100644 --- a/tests/metrics/test_new_metrics.py +++ b/tests/metrics/test_new_metrics.py @@ -43,11 +43,11 @@ def test_cache_metrics_update_history(self, mock_main_process_metrics): # 调用目标方法 metrics._update_history_hit_metrics() - # 断言 Prometheus 指标的 set 方法是否被正确的值调用 - mock_main_process_metrics.hit_req_rate.set.assert_called_once_with(0.5) # 10 / 20 - mock_main_process_metrics.hit_token_rate.set.assert_called_once_with(0.6) # 600 / 1000 - mock_main_process_metrics.cpu_hit_token_rate.set.assert_called_once_with(0.25) # 250 / 1000 - mock_main_process_metrics.gpu_hit_token_rate.set.assert_called_once_with(0.35) # 350 / 1000 + # 断言 Prometheus 指标的 set_value 方法是否被正确的值调用 + mock_main_process_metrics.set_value.assert_any_call("hit_req_rate", 0.5) # 10 / 20 + mock_main_process_metrics.set_value.assert_any_call("hit_token_rate", 0.6) # 600 / 1000 + mock_main_process_metrics.set_value.assert_any_call("cpu_hit_token_rate", 0.25) # 250 / 1000 + mock_main_process_metrics.set_value.assert_any_call("gpu_hit_token_rate", 0.35) # 350 / 1000 print("Test for CacheMetrics passed.") @@ -98,7 +98,7 @@ def test_recycle_resources_updates_metrics(self, mock_main_process_metrics): self.processor._recycle_resources(task_id=task_id, index=index, task=mock_task, result=None, is_prefill=False) # 核心断言:验证 available_batch_size 指标是否被正确设置 - mock_main_process_metrics.available_batch_size.set.assert_called_once_with(8) + mock_main_process_metrics.set_value.assert_any_call("available_batch_size", 8) print("Test for TokenProcessor passed.") diff --git a/tests/output/test_token_processor.py b/tests/output/test_token_processor.py index 4ca70b9a689..b4df3b26159 100644 --- a/tests/output/test_token_processor.py +++ b/tests/output/test_token_processor.py @@ -177,30 +177,42 @@ def observe(self, v): class _Metrics: def __init__(self): - self.spec_decode_num_accepted_tokens_total = _Metric() - self.spec_decode_num_emitted_tokens_total = _Metric() - self.spec_decode_draft_acceptance_rate = _Metric() - self.spec_decode_efficiency = _Metric() - self.spec_decode_num_draft_tokens_total = _Metric() - self.spec_decode_draft_single_head_acceptance_rate = [_Metric() for _ in range(MAX_DRAFT_TOKENS)] - self.time_per_output_token = _Metric() - self.generation_tokens_total = _Metric() - self.time_to_first_token = _Metric() - self.request_queue_time = _Metric() - self.request_prefill_time = _Metric() - self.request_decode_time = _Metric() - self.request_inference_time = _Metric() - self.request_generation_tokens = _Metric() - self.num_requests_running = _Metric() - self.request_success_total = _Metric() - self.available_gpu_block_num = _Metric() - self.batch_size = _Metric() - self.available_batch_size = _Metric() - self.request_token_ratio = _Metric() + self._metric_store = {} + + def _key(self, name, labelvalues=None): + if labelvalues: + return (name, frozenset(labelvalues.items())) + return (name, None) + + def _get_metric(self, name, labelvalues=None): + key = self._key(name, labelvalues) + if key not in self._metric_store: + self._metric_store[key] = _Metric() + return self._metric_store[key] + + def __getattr__(self, name): + if name.startswith("_"): + raise AttributeError(name) + return self._get_metric(name) + + def __hasattr__(self, name): + return any(k[0] == name for k in self._metric_store) def _init_speculative_metrics(self, method, num_speculative_tokens): return None + def set_value(self, name, value, labelvalues=None): + self._get_metric(name, labelvalues).set(value) + + def inc_value(self, name, value=1, labelvalues=None): + self._get_metric(name, labelvalues).inc(value) + + def dec_value(self, name, value=1, labelvalues=None): + self._get_metric(name, labelvalues).dec(value) + + def obs_value(self, name, value, labelvalues=None): + self._get_metric(name, labelvalues).observe(value) + def test_init_allocates_expected_buffers(): processor, _, _, _ = _make_processor() @@ -547,7 +559,12 @@ def test_record_speculative_decoding_metrics_tracks_acceptance(): assert metrics.spec_decode_num_emitted_tokens_total.value == 5 assert pytest.approx(metrics.spec_decode_draft_acceptance_rate.value) == 0.75 assert pytest.approx(metrics.spec_decode_efficiency.value) == pytest.approx(5 / 6) - assert pytest.approx(metrics.spec_decode_draft_single_head_acceptance_rate[0].value) == 1.5 + assert ( + pytest.approx( + metrics._get_metric("spec_decode_draft_single_head_acceptance_rate", labelvalues={"head": "0"}).value + ) + == 1.5 + ) def test_recycle_resources_prefill_sends_first_token(): @@ -1111,16 +1128,46 @@ def test_record_speculative_metrics_calls_init_when_missing(): class _MinimalMetrics: def __init__(self): + self._metric_store = {} + self._created_attrs = set() self.init_called = False + def _get_metric(self, name, labelvalues=None): + key = (name, frozenset(labelvalues.items()) if labelvalues else None) + if key not in self._metric_store: + self._metric_store[key] = _Metric() + return self._metric_store[key] + + def __getattr__(self, name): + if name.startswith("_"): + raise AttributeError(name) + if name not in self._created_attrs: + raise AttributeError(name) + return self._get_metric(name) + def _init_speculative_metrics(self, method, num_speculative_tokens): - self.spec_decode_num_accepted_tokens_total = _Metric() - self.spec_decode_num_emitted_tokens_total = _Metric() - self.spec_decode_draft_acceptance_rate = _Metric() - self.spec_decode_efficiency = _Metric() - self.spec_decode_num_draft_tokens_total = _Metric() - self.spec_decode_draft_single_head_acceptance_rate = [_Metric() for _ in range(MAX_DRAFT_TOKENS)] self.init_called = True + for n in ( + "spec_decode_num_accepted_tokens_total", + "spec_decode_num_emitted_tokens_total", + "spec_decode_draft_acceptance_rate", + "spec_decode_efficiency", + "spec_decode_num_draft_tokens_total", + "spec_decode_draft_single_head_acceptance_rate", + ): + self._created_attrs.add(n) + + def set_value(self, name, value, labelvalues=None): + self._get_metric(name, labelvalues).set(value) + + def inc_value(self, name, value=1, labelvalues=None): + self._get_metric(name, labelvalues).inc(value) + + def dec_value(self, name, value=1, labelvalues=None): + self._get_metric(name, labelvalues).dec(value) + + def obs_value(self, name, value, labelvalues=None): + self._get_metric(name, labelvalues).observe(value) processor.accept_token_num_per_head = [1, 1] + [0] * (MAX_DRAFT_TOKENS - 2) processor.num_accepted_tokens = 2