Skip to content

Commit 7c69bfc

Browse files
committed
fix(router): output TPS via per-req deltas, skip aborted reqs in stats
Two correctness fixes flagged in PR review: 1. count_output_tokens(len(running_batch.reqs)) once per router loop is wrong — the router loop polls on schedule_time_interval, decoupled from inference, so this overcounts when the loop is faster than decode and undercounts when slower, and includes paused/prefill-only reqs. Track shm_cur_output_len per request and accumulate the delta each tick (with a tail settlement when the req is filtered out so we don't lose its last tokens to the post-final-tick window). 2. on_request_completed() and router_statics.update() now both run for aborted requests, whose candetoken_out_len is a short partial value. Restore the prior `if not req.is_aborted` guard so disconnects don't bias the output-length EMA used by KV-budget estimators.
1 parent 6c50c63 commit 7c69bfc

1 file changed

Lines changed: 34 additions & 10 deletions

File tree

lightllm/server/router/manager.py

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ def __init__(self, args: StartArgs):
6767
# 初始化 radix_cache_client 用于读取 prompt cache 的管理信息
6868
self.radix_cache_client = None
6969
self.status_reporter = None
70+
# Track shm_cur_output_len per running request to compute per-tick deltas
71+
# for accurate output TPS regardless of router schedule interval.
72+
self._req_last_output_len: Dict[int, int] = {}
7073

7174
# 共享变量,用于存储router端调度分析得到的机器负载信息
7275
self.shared_token_load = TokenLoad(f"{get_unique_server_name()}_shared_token_load", self.dp_size_in_node)
@@ -246,8 +249,18 @@ async def loop_for_fwd(
246249
await self._step()
247250
counter_count += 1
248251
if self.running_batch is not None:
249-
# Count output tokens (each running req produces ~1 token per decode step)
250-
self.status_reporter.count_output_tokens(len(self.running_batch.reqs))
252+
# Count output tokens via per-request shm_cur_output_len deltas, since the
253+
# router loop runs on schedule_time_interval and len(reqs) is not a per-step
254+
# token count.
255+
new_output_tokens = 0
256+
for req in self.running_batch.reqs:
257+
cur_out_len = req.shm_cur_output_len
258+
prev_out_len = self._req_last_output_len.get(req.request_id, 0)
259+
if cur_out_len > prev_out_len:
260+
new_output_tokens += cur_out_len - prev_out_len
261+
self._req_last_output_len[req.request_id] = cur_out_len
262+
if new_output_tokens:
263+
self.status_reporter.count_output_tokens(new_output_tokens)
251264
if counter_count % 100 == 0:
252265
self.metric_client.gauge_set("lightllm_batch_pause_size", self._get_paused_req_num())
253266
# pd decode mode need to update token_load more frequently
@@ -342,14 +355,25 @@ def _filter_reqs_from_running_batch(self):
342355
if self.running_batch is not None:
343356
# Capture finished req stats before filtering
344357
for req in self.running_batch.reqs:
345-
if req.shm_infer_released:
346-
self.status_reporter.on_request_completed(
347-
input_len=req.input_len,
348-
output_len=req.shm_cur_output_len,
349-
cache_len=req.prompt_cache_len,
350-
mtp_accepted=req.mtp_accepted_token_num,
351-
)
352-
self.router_statics.update(req.candetoken_out_len)
358+
if not req.shm_infer_released:
359+
continue
360+
# Settle any output-token delta produced after the last router tick
361+
# so windowed TPS does not lose the request's tail tokens.
362+
cur_out_len = req.shm_cur_output_len
363+
prev_out_len = self._req_last_output_len.pop(req.request_id, 0)
364+
if cur_out_len > prev_out_len:
365+
self.status_reporter.count_output_tokens(cur_out_len - prev_out_len)
366+
# Aborted/disconnected requests can leave a partial output_len that
367+
# would bias the EMA toward shorter generations; skip them.
368+
if req.is_aborted:
369+
continue
370+
self.status_reporter.on_request_completed(
371+
input_len=req.input_len,
372+
output_len=cur_out_len,
373+
cache_len=req.prompt_cache_len,
374+
mtp_accepted=req.mtp_accepted_token_num,
375+
)
376+
self.router_statics.update(req.candetoken_out_len)
353377
self.running_batch.filter_out_finished_req(self.shm_req_manager)
354378
if self.running_batch.is_clear():
355379
self.running_batch = None

0 commit comments

Comments
 (0)