1616from .batch import Batch , Req
1717from .model_infer .model_rpc import start_model_process , ModelRpcClient
1818from .req_queue import build_req_queue
19+ from .stats import SystemStatusReporter
1920from lightllm .server .core .objs .io_objs import (
2021 GroupReqIndexes ,
2122 AbortedReqCmd ,
2526from .dynamic_prompt .radix_cache import RadixCacheReadOnlyClient
2627from lightllm .server .multi_level_kv_cache .cpu_cache_client import CpuKvCacheClient
2728from lightllm .server .core .objs .shm_objs_io_buffer import ShmObjsIOBuffer
28- from lightllm .utils .log_utils import init_logger , log_time_ready
29+ from lightllm .utils .log_utils import init_logger
2930from lightllm .server .router .token_load import TokenLoad
3031from lightllm .server .metrics .manager import MetricClient
3132from lightllm .common .basemodel .infer_lock import g_router_lock
@@ -65,6 +66,7 @@ def __init__(self, args: StartArgs):
6566 self .read_only_statics_mem_manager = ReadOnlyStaticsMemoryManager ()
6667 # 初始化 radix_cache_client 用于读取 prompt cache 的管理信息
6768 self .radix_cache_client = None
69+ self .status_reporter = None
6870
6971 # 共享变量,用于存储router端调度分析得到的机器负载信息
7072 self .shared_token_load = TokenLoad (f"{ get_unique_server_name ()} _shared_token_load" , self .dp_size_in_node )
@@ -194,6 +196,11 @@ async def wait_to_model_ready(self):
194196 )
195197 self .req_queue = build_req_queue (self .args , self , self .dp_size_in_node )
196198 logger .info (f"use req queue { self .req_queue .__class__ .__name__ } " )
199+ self .status_reporter = SystemStatusReporter (
200+ args = self .args ,
201+ max_total_token_num = self .max_total_token_num ,
202+ dp_size_in_node = self .dp_size_in_node ,
203+ )
197204
198205 if self .args .run_mode == "prefill" :
199206 # 启动 prefill kv move 管理进程
@@ -239,26 +246,11 @@ async def loop_for_fwd(
239246 await self ._step ()
240247 counter_count += 1
241248 if self .running_batch is not None :
249+ # Count output tokens (each running req produces ~1 token per decode step)
250+ self .status_reporter .count_output_tokens (len (self .running_batch .reqs ))
242251 if counter_count % 100 == 0 :
243252 for dp_index in range (self .dp_size_in_node ):
244- token_ratio1 = self .get_used_tokens (dp_index ) / self .max_total_token_num
245- token_ratio2 = (
246- self .max_total_token_num
247- - self .read_only_statics_mem_manager .get_unrefed_token_num (dp_index )
248- ) / self .max_total_token_num
249- d_i = dp_index
250- frozen_token_num = self .shared_token_load .get_frozened_token_count (d_i )
251- estimated_peak_token_count = self .shared_token_load .get_estimated_peak_token_count (d_i )
252- paused_req_num = self ._get_paused_req_num_in_dp_index (dp_index = d_i )
253- logger .debug (
254- f"dp_i { d_i } current batch size: { len (self .running_batch .reqs )} \n "
255- f"dp_i { d_i } paused req num: { paused_req_num } \n "
256- f"dp_i { d_i } frozen token num: { frozen_token_num } \n "
257- f"dp_i { d_i } estimated_peak_token_count: { estimated_peak_token_count } \n "
258- f"dp_i { d_i } token used ratio: { token_ratio1 } not contain prompt cache tree unrefed token\n "
259- f"dp_i { d_i } token used ratio: { token_ratio2 } contain prompt cache tree unrefed token"
260- )
261- logger .debug (self .router_statics .log_str ())
253+ paused_req_num = self ._get_paused_req_num_in_dp_index (dp_index = dp_index )
262254 self .metric_client .gauge_set ("lightllm_batch_pause_size" , paused_req_num )
263255 # pd decode mode need to update token_load more frequently
264256 self .req_queue .update_token_load (self .running_batch , force_update = self .is_pd_decode_mode )
@@ -278,13 +270,15 @@ async def loop_for_fwd(
278270 self .metric_client .gauge_set ("lightllm_batch_pause_size" , 0.0 )
279271 self .metric_client .gauge_set ("lightllm_queue_size" , 0.0 )
280272 self .metric_client .gauge_set ("lightllm_batch_current_max_tokens" , 0.0 )
281- # 60s print once
282- if log_time_ready ("frozen_info" , 60 ):
283- for dp_i in range (self .dp_size_in_node ):
284- frozen_token_num = self .shared_token_load .get_frozened_token_count (dp_i )
285- estimated_peak_token_count = self .shared_token_load .get_estimated_peak_token_count (dp_i )
286- logger .debug (f"dp_i { dp_i } frozen token num: { frozen_token_num } \n " )
287- logger .debug (f"dp_i { dp_i } estimated_peak_token_count: { estimated_peak_token_count } \n " )
273+
274+ self .status_reporter .maybe_print (
275+ running_batch = self .running_batch ,
276+ req_queue = self .req_queue ,
277+ read_only_statics_mem_manager = self .read_only_statics_mem_manager ,
278+ paused_req_num = self ._get_paused_req_num (),
279+ radix_cache_client = self .radix_cache_client ,
280+ disable_dynamic_prompt_cache = self .args .disable_dynamic_prompt_cache ,
281+ )
288282
289283 await asyncio .sleep (self ._get_schedule_time_interval ())
290284
@@ -314,6 +308,7 @@ async def _step(self):
314308
315309 async def _add_batch (self , batch : Batch ):
316310 # 添加新请求
311+ self .status_reporter .count_prompt_tokens (batch .input_tokens ())
317312 reqs = [r .to_router_rpc_obj () for r in batch .reqs ]
318313 while not self .shm_reqs_io_buffer .is_empty ():
319314 await asyncio .sleep (0.02 )
@@ -350,7 +345,16 @@ def _add_new_batch_to_running_batch(self, new_batch: Batch):
350345
351346 def _filter_reqs_from_running_batch (self ):
352347 if self .running_batch is not None :
353- self .running_batch .filter_out_finished_req (self .shm_req_manager , self .router_statics )
348+ # Capture finished req stats before filtering
349+ for req in self .running_batch .reqs :
350+ if req .shm_infer_released :
351+ self .status_reporter .on_request_completed (
352+ input_len = req .input_len ,
353+ output_len = req .shm_cur_output_len ,
354+ cache_len = req .prompt_cache_len ,
355+ mtp_accepted = req .mtp_accepted_token_num ,
356+ )
357+ self .running_batch .filter_out_finished_req (self .shm_req_manager )
354358 if self .running_batch .is_clear ():
355359 self .running_batch = None
356360 return
@@ -422,7 +426,7 @@ def _add_req(self, group_req_indexes: GroupReqIndexes):
422426 req ._router_stop_str_matched = False
423427 req_group .append (req )
424428
425- logger .info (f"router recive req id { req .request_id } cost time { time .time () - req .start_time } s" )
429+ logger .debug (f"router recive req id { req .request_id } cost time { time .time () - req .start_time } s" )
426430 self .req_queue .extend (req_group )
427431 self .send_to_detokenization .send_pyobj (group_req_indexes , protocol = pickle .HIGHEST_PROTOCOL )
428432 return
0 commit comments