Skip to content

Commit 3f607ea

Browse files
committed
perf(router): sweep output-token deltas once per print interval
Move the per-running-req shm_cur_output_len delta tracking out of the router tick (~33 Hz) and into SystemStatusReporter.maybe_print, where the sweep executes only once per log_stats_interval (>= 5s); calls made before the interval has elapsed return early. The reporter now owns the per-req snapshot dict and exposes discard_req(req), which settles a req's tail tokens when it leaves the running batch, so the router loop's hot path no longer walks the batch on every schedule cycle. Output TPS accuracy is unchanged: it is still based on real shm_cur_output_len deltas, with tail tokens settled at completion.
1 parent 7c69bfc commit 3f607ea

2 files changed

Lines changed: 31 additions & 25 deletions

File tree

lightllm/server/router/manager.py

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,6 @@ def __init__(self, args: StartArgs):
6767
# 初始化 radix_cache_client 用于读取 prompt cache 的管理信息
6868
self.radix_cache_client = None
6969
self.status_reporter = None
70-
# Track shm_cur_output_len per running request to compute per-tick deltas
71-
# for accurate output TPS regardless of router schedule interval.
72-
self._req_last_output_len: Dict[int, int] = {}
7370

7471
# 共享变量,用于存储router端调度分析得到的机器负载信息
7572
self.shared_token_load = TokenLoad(f"{get_unique_server_name()}_shared_token_load", self.dp_size_in_node)
@@ -249,18 +246,8 @@ async def loop_for_fwd(
249246
await self._step()
250247
counter_count += 1
251248
if self.running_batch is not None:
252-
# Count output tokens via per-request shm_cur_output_len deltas, since the
253-
# router loop runs on schedule_time_interval and len(reqs) is not a per-step
254-
# token count.
255-
new_output_tokens = 0
256-
for req in self.running_batch.reqs:
257-
cur_out_len = req.shm_cur_output_len
258-
prev_out_len = self._req_last_output_len.get(req.request_id, 0)
259-
if cur_out_len > prev_out_len:
260-
new_output_tokens += cur_out_len - prev_out_len
261-
self._req_last_output_len[req.request_id] = cur_out_len
262-
if new_output_tokens:
263-
self.status_reporter.count_output_tokens(new_output_tokens)
249+
# Output-token counting is done in bulk at the print-window boundary
250+
# inside SystemStatusReporter.maybe_print, so the router tick stays cheap.
264251
if counter_count % 100 == 0:
265252
self.metric_client.gauge_set("lightllm_batch_pause_size", self._get_paused_req_num())
266253
# pd decode mode need to update token_load more frequently
@@ -357,19 +344,16 @@ def _filter_reqs_from_running_batch(self):
357344
for req in self.running_batch.reqs:
358345
if not req.shm_infer_released:
359346
continue
360-
# Settle any output-token delta produced after the last router tick
361-
# so windowed TPS does not lose the request's tail tokens.
362-
cur_out_len = req.shm_cur_output_len
363-
prev_out_len = self._req_last_output_len.pop(req.request_id, 0)
364-
if cur_out_len > prev_out_len:
365-
self.status_reporter.count_output_tokens(cur_out_len - prev_out_len)
347+
# Settle any output-token tail produced after the last window boundary,
348+
# so windowed TPS does not lose the req's last tokens.
349+
self.status_reporter.discard_req(req)
366350
# Aborted/disconnected requests can leave a partial output_len that
367351
# would bias the EMA toward shorter generations; skip them.
368352
if req.is_aborted:
369353
continue
370354
self.status_reporter.on_request_completed(
371355
input_len=req.input_len,
372-
output_len=cur_out_len,
356+
output_len=req.shm_cur_output_len,
373357
cache_len=req.prompt_cache_len,
374358
mtp_accepted=req.mtp_accepted_token_num,
375359
)

lightllm/server/router/stats.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import time
22
import logging
3+
from typing import Dict
34
from lightllm.server.core.objs import StartArgs
45
from lightllm.utils.log_utils import init_system_status_logger
56

@@ -31,13 +32,23 @@ def __init__(self, args, max_total_token_num, dp_size_in_node):
3132
self.global_mtp_output_total = 0
3233
self.global_mtp_accepted_total = 0
3334

35+
# Per-req shm_cur_output_len snapshot at the previous window boundary,
36+
# used to compute the windowed output-token count without per-tick scans.
37+
self._req_last_output_len: Dict[int, int] = {}
38+
3439
def count_prompt_tokens(self, num_tokens: int):
3540
if self.enabled:
3641
self.prompt_tokens += num_tokens
3742

38-
def count_output_tokens(self, num_tokens: int):
39-
if self.enabled:
40-
self.output_tokens += num_tokens
43+
def discard_req(self, req):
44+
"""Settle a finished/aborted req's tail output tokens (those produced after the last
45+
window-boundary sweep) and drop its tracking entry."""
46+
if not self.enabled:
47+
return
48+
cur_out_len = req.shm_cur_output_len
49+
prev_out_len = self._req_last_output_len.pop(req.request_id, 0)
50+
if cur_out_len > prev_out_len:
51+
self.output_tokens += cur_out_len - prev_out_len
4152

4253
def on_request_completed(self, input_len: int, output_len: int, cache_len: int, mtp_accepted: int):
4354
if self.enabled:
@@ -64,6 +75,17 @@ def maybe_print(
6475
if elapsed < self.interval:
6576
return
6677

78+
# Single bulk sweep at the window boundary: account for output tokens produced
79+
# by every still-running req since the previous boundary, and refresh their
80+
# snapshots. Reqs that finished in this window already settled via discard_req.
81+
if running_batch is not None:
82+
for req in running_batch.reqs:
83+
cur_out_len = req.shm_cur_output_len
84+
prev_out_len = self._req_last_output_len.get(req.request_id, 0)
85+
if cur_out_len > prev_out_len:
86+
self.output_tokens += cur_out_len - prev_out_len
87+
self._req_last_output_len[req.request_id] = cur_out_len
88+
6789
total_tps = (self.prompt_tokens + self.output_tokens) / elapsed
6890
input_tps = self.prompt_tokens / elapsed
6991
output_tps = self.output_tokens / elapsed

0 commit comments

Comments
 (0)