Skip to content

Commit d29ca88

Browse files
committed
[None][fix] Restore benchmark-disagg immediate fail-fast in _prepare_and_schedule_batch
The cherry-pick of the "Fix deepseekv4 stall" change (from the #14042 era) replaced main's immediate "Insufficient KV cache for gen-only benchmark mode" guard with a time-based gen-count stall watchdog. On current main that watchdog is both superseded and incompatible. It is superseded because main already handles the ADP fill-completion case via _is_benchmark_disagg_fill_complete (per-rank allgather). It is incompatible because a time-based watchdog can never fire within a single scheduling iteration. As a result, it regressed tests/unittest/_torch/executor/test_benchmark_disagg.py (TestFailFastDuringBenchmarkFill, TestFillPhaseEndToEnd), which assert an immediate fail-fast when all benchmark requests are fetched (or the fill phase is over) and the scheduler can fit no INIT request. This commit restores main's immediate guard (fail once all requests are fetched and no INIT request fits, suppressed during warmup) and drops the now-unused benchmark_fill_stall_timeout_s. Verified on 8xB200: test_benchmark_disagg.py, test_py_executor.py, and the tp4_pp2_dp_both transceiver cases all pass (240 passed, 0 failed). Signed-off-by: Shixiaowei02 <39303645+Shixiaowei02@users.noreply.github.com>
1 parent d257bf1 commit d29ca88

1 file changed

Lines changed: 24 additions & 63 deletions

File tree

tensorrt_llm/_torch/pyexecutor/py_executor.py

Lines changed: 24 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -504,8 +504,6 @@ def __init__(
504504
self.num_scheduled_requests: int = 0
505505
self.benchmark_req_queues_size = int(
506506
os.environ.get("TLLM_BENCHMARK_REQ_QUEUES_SIZE", 0))
507-
self.benchmark_fill_stall_timeout_s = float(
508-
os.environ.get("TLLM_BENCHMARK_FILL_STALL_TIMEOUT_S", 60.0))
509507

510508
# list of requests in each PP micro batch
511509
self.num_micro_batches = max(self.dist.pp_size,
@@ -2672,67 +2670,30 @@ def _prepare_and_schedule_batch(self):
26722670
# scheduler could not allocate KV for any of them, the benchmark
26732671
# will hang forever because in-progress generation requests won't
26742672
# release their KV cache.
2675-
#
2676-
# Only watch during the fill phase: once fill completes the count
2677-
# stays at its target value through the entire decode, which would
2678-
# otherwise look like a stall. With ADP, requests are sharded
2679-
# across TP ranks so the comparison must use the global count
2680-
# (allgather) against the global target.
2681-
if (self.is_benchmark_disagg and self._benchmark_fill_phase_active
2682-
and not self.is_warmup):
2683-
# NOTE: keep the gate condition free of any per-rank state
2684-
# (e.g. `fitting_disagg_gen_init_requests`). The
2685-
# `tp_allgather` below is a collective and every ADP rank
2686-
# must participate together; otherwise ranks desync and a
2687-
# later allgather mixes payload shapes (list[int] from
2688-
# gather_all_rank_states vs int from the gate's
2689-
# _is_benchmark_disagg_fill_complete), producing TypeErrors
2690-
# like "argument after * must be an iterable, not int" or
2691-
# "unsupported operand type(s) for +: 'int' and 'list'".
2692-
# The per-rank "still has fitting requests" hint is folded
2693-
# into the same allgather so we can suppress the stall
2694-
# check globally when any rank is still making progress.
2695-
local_ready_gen = sum(
2696-
1 for req in self.active_requests if req.state in (
2697-
LlmRequestState.DISAGG_GENERATION_TRANS_COMPLETE,
2698-
LlmRequestState.GENERATION_IN_PROGRESS,
2699-
))
2700-
local_has_fitting = 1 if fitting_disagg_gen_init_requests else 0
2701-
if self.enable_attention_dp:
2702-
responses = self.dist.tp_allgather(
2703-
[local_ready_gen, local_has_fitting])
2704-
total_ready_gen = sum(r[0] for r in responses)
2705-
any_rank_has_fitting = any(r[1] for r in responses)
2706-
else:
2707-
total_ready_gen = local_ready_gen
2708-
any_rank_has_fitting = bool(local_has_fitting)
2709-
2710-
if not any_rank_has_fitting:
2711-
now = time.time()
2712-
last_count = getattr(self, "_bench_disagg_last_gen_count",
2713-
None)
2714-
last_change_time = getattr(
2715-
self, "_bench_disagg_last_gen_count_time", None)
2716-
if (last_count != total_ready_gen
2717-
or last_change_time is None):
2718-
self._bench_disagg_last_gen_count = total_ready_gen
2719-
self._bench_disagg_last_gen_count_time = now
2720-
elif (now - last_change_time
2721-
> self.benchmark_fill_stall_timeout_s
2722-
and total_ready_gen < self.benchmark_req_queues_size):
2723-
error_msg = (
2724-
f"Benchmark gen request count stalled at "
2725-
f"{total_ready_gen} "
2726-
f"for {now - last_change_time:.0f}s "
2727-
f"(target {self.benchmark_req_queues_size}, "
2728-
f"fetched={self.num_fetch_requests}). "
2729-
f"Likely causes: KV transfer stuck, KV cache pool "
2730-
f"too small, or transceiver deadlock. Aborting all "
2731-
f"active requests.")
2732-
logger.error(error_msg)
2733-
self._handle_errors(error_msg,
2734-
requests=self.active_requests)
2735-
return None, None
2673+
if (self.benchmark_req_queues_size > 0 and not self.is_warmup
2674+
and not fitting_disagg_gen_init_requests):
2675+
stuck_init_requests = [
2676+
req for req in self.active_requests
2677+
if req.is_disagg_generation_init_state
2678+
]
2679+
# Only fail once all benchmark requests have been fetched
2680+
# so that _handle_errors covers every request and every
2681+
# client receives an error response.
2682+
if (stuck_init_requests and self.num_fetch_requests
2683+
>= self.benchmark_req_queues_size):
2684+
error_msg = (
2685+
f"Insufficient KV cache for gen-only benchmark mode: "
2686+
f"{len(stuck_init_requests)} request(s) are waiting for "
2687+
f"KV cache allocation but the scheduler could not fit "
2688+
f"any of them. Increase free_gpu_memory_fraction or "
2689+
f"reduce TLLM_BENCHMARK_REQ_QUEUES_SIZE (currently "
2690+
f"{self.benchmark_req_queues_size}).")
2691+
logger.error(error_msg)
2692+
# Fail all active and waiting requests so every
2693+
# client receives an error instead of hanging.
2694+
self._handle_errors(error_msg,
2695+
requests=self.active_requests)
2696+
return None, None
27362697

27372698
self.num_scheduled_requests = scheduled_batch.batch_size
27382699
logger.debug(

0 commit comments

Comments
 (0)