Skip to content
2 changes: 1 addition & 1 deletion custom_ops/xpu_ops/test/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,4 @@ addopts =
--ignore=test_set_data_ipc.py
--ignore=test_read_data_ipc.py
--ignore=test_set_get_data_ipc.py
--ignore=test_draft_model_preprocess.py
--ignore=test_draft_model_preprocess.py
8 changes: 4 additions & 4 deletions fastdeploy/cache_manager/cache_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ def _update_history_hit_metrics(self):
self.gpu_hit_token_ratio = self.total_gpu_matched_token_num / self.total_token_num
self.storage_hit_token_ratio = self.total_storage_matched_token_num / self.total_token_num

main_process_metrics.hit_req_rate.set(self.hit_req_ratio)
main_process_metrics.hit_token_rate.set(self.hit_token_ratio)
main_process_metrics.cpu_hit_token_rate.set(self.cpu_hit_token_ratio)
main_process_metrics.gpu_hit_token_rate.set(self.gpu_hit_token_ratio)
main_process_metrics.set_value("hit_req_rate", self.hit_req_ratio)
main_process_metrics.set_value("hit_token_rate", self.hit_token_ratio)
main_process_metrics.set_value("cpu_hit_token_rate", self.cpu_hit_token_ratio)
main_process_metrics.set_value("gpu_hit_token_rate", self.gpu_hit_token_ratio)

logger.info(
f"Metrics for all requests: req_count {self.req_count} hit_req_count {self.hit_req_count}"
Expand Down
34 changes: 17 additions & 17 deletions fastdeploy/cache_manager/prefix_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ def __init__(
f"{self.cache_config.bytes_per_token_per_layer / self.config.parallel_config.tensor_parallel_size}"
)

main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.max_cpu_block_num.set(self.num_cpu_blocks)
main_process_metrics.available_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.available_gpu_resource.set(1.0)
main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("max_cpu_block_num", self.num_cpu_blocks)
main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("available_gpu_resource", 1.0)

def _get_kv_cache_shape(self, max_block_num):
from fastdeploy.model_executor.layers.attention import get_attention_backend
Expand Down Expand Up @@ -462,11 +462,11 @@ def update_cache_config(self, cache_config):
heapq.heapify(self.gpu_free_block_list)
self.node_id_pool = list(range(self.num_gpu_blocks + self.num_cpu_blocks))

main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.max_cpu_block_num.set(self.num_cpu_blocks)
main_process_metrics.available_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.available_gpu_resource.set(1.0)
main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("max_cpu_block_num", self.num_cpu_blocks)
main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("available_gpu_resource", 1.0)

def can_allocate_gpu_blocks(self, num_blocks: int, try_free_gpu_blocks: bool = True):
"""
Expand Down Expand Up @@ -494,8 +494,8 @@ def allocate_gpu_blocks(self, num_blocks, req_id=None):
logger.info(
f"req_id:{req_id} allocate_gpu_blocks: {allocated_block_ids}, len(self.gpu_free_block_list) {len(self.gpu_free_block_list)}"
)
main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)
return allocated_block_ids

def recycle_gpu_blocks(self, gpu_block_ids, req_id=None):
Expand Down Expand Up @@ -529,8 +529,8 @@ def recycle_gpu_blocks(self, gpu_block_ids, req_id=None):
else:
heapq.heappush(self.gpu_free_block_list, gpu_block_ids)
logger.debug(f"req_id:{req_id} recycle blocks end")
main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)

def allocate_cpu_blocks(self, num_blocks):
"""
Expand Down Expand Up @@ -2296,9 +2296,9 @@ def reset(self, wait_for_tasks_done=False):

# reset metrics
self.metrics.reset_metrics()
main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
main_process_metrics.available_gpu_block_num.set(len(self.gpu_free_block_list))
main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
main_process_metrics.set_value("available_gpu_block_num", len(self.gpu_free_block_list))
main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)

def clear_prefix_cache(self):
"""
Expand Down
18 changes: 9 additions & 9 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -893,8 +893,8 @@ def _schedule_request_to_worker(self):
else:
continue

main_process_metrics.num_requests_waiting.dec(len(tasks))
main_process_metrics.num_requests_running.inc(len(tasks))
main_process_metrics.dec_value("num_requests_waiting", len(tasks))
main_process_metrics.inc_value("num_requests_running", len(tasks))
except Exception as e:
err_msg = f"Error happened while insert task to engine: {e}, {traceback.format_exc()!s}."
self.llm_logger.error(err_msg)
Expand Down Expand Up @@ -1022,7 +1022,7 @@ def _fetch_request():
)
]
)
main_process_metrics.reschedule_req_num.inc()
main_process_metrics.inc_value("reschedule_req_num")
need_delete_tasks.append(task)
continue
for tmp_task in need_delete_tasks:
Expand Down Expand Up @@ -1130,7 +1130,7 @@ def _fetch_request():
f"preallocated request. req:{task.request_id} "
)
self.llm_logger.error(msg)
main_process_metrics.reschedule_req_num.inc()
main_process_metrics.inc_value("reschedule_req_num")
self.scheduler.put_results(
[
RequestOutput(
Expand Down Expand Up @@ -1337,7 +1337,7 @@ def _insert_zmq_task_to_scheduler(self):
request = Request.from_dict(data)

request.metrics.scheduler_recv_req_time = time.time()
main_process_metrics.requests_number.inc()
main_process_metrics.inc_value("requests_number")
trace_carrier = data.get("trace_carrier")
if trace_carrier:
request_id = get_base_request_id(data["request_id"])
Expand Down Expand Up @@ -1400,7 +1400,7 @@ def _insert_zmq_task_to_scheduler(self):
added_requests.pop(request_id)

if failed is None:
main_process_metrics.num_requests_waiting.inc(1)
main_process_metrics.inc_value("num_requests_waiting", 1)
continue

self._send_error_response(request_id, failed)
Expand Down Expand Up @@ -2096,7 +2096,7 @@ def _process_allocate_resource_requests():
self.llm_logger.debug(f"D has successfully sent cache infos for task {task.request_id}")
processed_indices.append(idx)
is_success = True
main_process_metrics.decode_preallocated_req_num.inc()
main_process_metrics.inc_value("decode_preallocated_req_num")
else:
if self.resource_manager.is_resource_sufficient(task.prompt_token_ids_len):
self.llm_logger.debug(f"D Resource available, processing task {task.request_id}")
Expand Down Expand Up @@ -2173,7 +2173,7 @@ def _process_prefilled_requests():
else:
for req_output in ready_request_outputs:
request_id = req_output.request_id
main_process_metrics.decode_preallocated_req_num.dec()
main_process_metrics.dec_value("decode_preallocated_req_num")
trace_print(LoggingEventName.DECODE_PROCESS_PREFILLED_REQUEST_END, request_id, "")
if envs.FD_ENABLE_INTERNAL_ADAPTER and not req_output.outputs.token_ids:
# first token is eos in Prefill, just recycle resource and continue
Expand All @@ -2188,7 +2188,7 @@ def _process_prefilled_requests():
self.llm_logger.warning(
f"{request_id} prefill failed with msg:{req_output.error_msg}, recycle resource."
)
main_process_metrics.failed_recv_first_token_req_num.inc()
main_process_metrics.inc_value("failed_recv_first_token_req_num")
self.resource_manager.pre_recycle_resource(request_id)
if request_id in self.token_processor.tokens_counter:
del self.token_processor.tokens_counter[request_id]
Expand Down
18 changes: 9 additions & 9 deletions fastdeploy/engine/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(
self.real_bsz = 0
self.abort_req_ids_set = set()
llm_logger.info(f"{self.info()}")
main_process_metrics.max_batch_size.set(max_num_seqs)
main_process_metrics.set_value("max_batch_size", max_num_seqs)

def reset_cache_config(self, cfg):
"""
Expand Down Expand Up @@ -180,7 +180,7 @@ def _recycle_block_tables(self, task):
ori_number = self.available_block_num()
self.cache_manager.recycle_gpu_blocks(block_tables)
cur_number = self.available_block_num()
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
llm_logger.info(f"recycle {req_id} {cur_number - ori_number} blocks.")

def available_batch(self):
Expand Down Expand Up @@ -322,14 +322,14 @@ def allocate_resources_for_new_tasks(self, tasks):

# record batch size here
num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
main_process_metrics.available_gpu_block_num.set(self.total_block_number() - num_blocks_used_by_tasks)
main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
main_process_metrics.set_value("available_gpu_block_num", self.total_block_number() - num_blocks_used_by_tasks)
main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
llm_logger.info(
f"Number of allocated requests: {len(tasks)}, number of " f"running requests in worker: {self.real_bsz}"
)
llm_logger.info(f"{self.info()}")
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())

return processed_tasks

Expand Down Expand Up @@ -357,9 +357,9 @@ def _record_request_cache_info(self, task, common_block_ids, unique_block_ids, h
task.cache_info = (cache_block_num, no_cache_block_num)

# Report the number of cached tokens to Prometheus metrics
main_process_metrics.prefix_cache_token_num.inc(task.num_cached_tokens)
main_process_metrics.prefix_gpu_cache_token_num.inc(task.gpu_cache_token_num)
main_process_metrics.prefix_cpu_cache_token_num.inc(task.cpu_cache_token_num)
main_process_metrics.inc_value("prefix_cache_token_num", task.num_cached_tokens)
main_process_metrics.inc_value("prefix_gpu_cache_token_num", task.gpu_cache_token_num)
main_process_metrics.inc_value("prefix_cpu_cache_token_num", task.cpu_cache_token_num)

cached_len = len(common_block_ids) * self.cfg.block_size
task.block_tables = common_block_ids + unique_block_ids
Expand Down
22 changes: 12 additions & 10 deletions fastdeploy/engine/sched/resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l
self.finish_execution_pool = ThreadPoolExecutor(max_workers=1)
self.lock = threading.Lock()
self.to_be_rescheduled_request_id_set = set()
main_process_metrics.max_batch_size.set(max_num_seqs)
main_process_metrics.set_value("max_batch_size", max_num_seqs)

self.using_extend_tables_req_id = set()
self.reuse_block_num_map = dict()
Expand Down Expand Up @@ -1466,9 +1466,9 @@ def get_prefix_cached_blocks(self, request: Request):
request.metrics.storage_cache_prepare_time = metrics["storage_cache_prepare_time"]
request.metrics.prompt_token_ids_len = request.prompt_token_ids_len

main_process_metrics.prefix_cache_token_num.inc(request.num_computed_tokens)
main_process_metrics.prefix_gpu_cache_token_num.inc(request.metrics.gpu_cache_token_num)
main_process_metrics.prefix_cpu_cache_token_num.inc(request.metrics.cpu_cache_token_num)
main_process_metrics.inc_value("prefix_cache_token_num", request.num_computed_tokens)
main_process_metrics.inc_value("prefix_gpu_cache_token_num", request.metrics.gpu_cache_token_num)
main_process_metrics.inc_value("prefix_cpu_cache_token_num", request.metrics.cpu_cache_token_num)

trace_print(LoggingEventName.PREPARE_PREFIX_CACHE_END, request.request_id, getattr(request, "user", ""))

Expand Down Expand Up @@ -1743,12 +1743,14 @@ def update_metrics(self, verbose=False):
if task is not None:
blocks_used_by_tasks.update(getattr(task, "block_tables", []))
blocks_used_by_tasks.update(getattr(task, "extend_block_tables", []))
main_process_metrics.available_gpu_block_num.set(self.total_block_number() - len(blocks_used_by_tasks))
main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
main_process_metrics.num_requests_running.set(num_requests_running)
main_process_metrics.num_requests_waiting.set(num_requests_waiting)
main_process_metrics.num_requests_queuing.set(num_requests_queuing)
main_process_metrics.set_value(
"available_gpu_block_num", self.total_block_number() - len(blocks_used_by_tasks)
)
main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
main_process_metrics.set_value("num_requests_running", num_requests_running)
main_process_metrics.set_value("num_requests_waiting", num_requests_waiting)
main_process_metrics.set_value("num_requests_queuing", num_requests_queuing)
if verbose:
llm_logger.info(
f"update metrics: running={num_requests_running}, "
Expand Down
6 changes: 3 additions & 3 deletions fastdeploy/entrypoints/engine_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,9 @@ async def add_requests(self, task):

if "messages" in task:
task["messages"] = None
main_process_metrics.request_params_max_tokens.observe(task["max_tokens"])
main_process_metrics.prompt_tokens_total.inc(input_ids_len)
main_process_metrics.request_prompt_tokens.observe(input_ids_len)
main_process_metrics.obs_value("request_params_max_tokens", task["max_tokens"])
main_process_metrics.inc_value("prompt_tokens_total", input_ids_len)
main_process_metrics.obs_value("request_prompt_tokens", input_ids_len)
except Exception as e:
log_request_error(
message="request[{request_id}] add_requests error: {error}, {traceback}",
Expand Down
8 changes: 4 additions & 4 deletions fastdeploy/entrypoints/openai/serving_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,8 @@ async def chat_completion_stream_generator(
if "trace_carrier" in res:
del res["trace_carrier"]
num_choices -= 1
main_process_metrics.e2e_request_latency.observe(
time.time() - res["metrics"]["request_start_time"]
main_process_metrics.obs_value(
"e2e_request_latency", time.time() - res["metrics"]["request_start_time"]
)
if previous_num_tokens[idx] != max_tokens:
choice.finish_reason = "stop"
Expand Down Expand Up @@ -829,8 +829,8 @@ async def _create_chat_completion_choice(
return_completion_token_ids = True

if output is not None and output.get("metrics") and output["metrics"].get("request_start_time"):
main_process_metrics.e2e_request_latency.observe(
time.time() - data.get("metrics").get("request_start_time")
main_process_metrics.obs_value(
"e2e_request_latency", time.time() - data.get("metrics").get("request_start_time")
)
message = ChatMessage(
role="assistant",
Expand Down
8 changes: 5 additions & 3 deletions fastdeploy/entrypoints/openai/v1/serving_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ async def _build_stream_response(

if request_output.finished:
if request_output.metrics and request_output.metrics.request_start_time:
main_process_metrics.e2e_request_latency.observe(
time.time() - request_output.metrics.request_start_time
main_process_metrics.obs_value(
"e2e_request_latency", time.time() - request_output.metrics.request_start_time
)
max_tokens = request.max_completion_tokens or request.max_tokens
choice_completion_tokens = response_ctx.choice_completion_tokens_dict[output.index]
Expand Down Expand Up @@ -393,7 +393,9 @@ async def _create_chat_completion_choice(
message.reasoning_content = output.reasoning_content
message.tool_calls = request_output.accumulate_tool_calls if request_output.accumulate_tool_calls else None
if output is not None and request_output.metrics and request_output.metrics.request_start_time:
main_process_metrics.e2e_request_latency.observe(time.time() - request_output.metrics.request_start_time)
main_process_metrics.obs_value(
"e2e_request_latency", time.time() - request_output.metrics.request_start_time
)

if request.return_token_ids:
message.prompt_token_ids = request_output.prompt_token_ids
Expand Down
4 changes: 4 additions & 0 deletions fastdeploy/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ def _validate_split_kv_size(value: int) -> int:
# When set to 1, print which op / shape enters the block-wise CUDA Graph
# during the capture phase. Defaults to 0 (silent).
"FD_BLOCK_WISE_DEBUG": lambda: bool(int(os.getenv("FD_BLOCK_WISE_DEBUG", "0"))),
# Default label values for Prometheus metrics, specified as a JSON dict string.
# When set to a valid JSON dict, metric labels are automatically enabled.
# Example: '{"model_id":"my_model"}' adds model_id label to all metrics.
"FD_DEFAULT_METRIC_LABEL_VALUES": lambda: os.getenv("FD_DEFAULT_METRIC_LABEL_VALUES", "{}"),
}


Expand Down
Loading
Loading