Skip to content

Commit 1832ab3

Browse files
committed
[https://nvbugs/6336747][fix] Fail fast when executor worker stalls
* Why? A stuck or disconnected executor worker left the proxy blocked indefinitely: the request queue uses an unbounded send HWM with no send timeout, so request_queue.put -> socket.send never returned once the worker stopped draining, and the error monitor never tripped. In CI this could surface as a ~1h hang ending in an opaque timeout kill. The stall itself is non-deterministic and not yet root-caused. * What? Make the failure fast and legible instead: - Bound request submission: poll the socket for send-readiness and check worker liveness, raising RequestError if the worker has not accepted the request within a timeout. - Add a progress watchdog to the error monitor that marks the worker stalled and aborts in-flight requests when no result arrives while requests are outstanding. - Honor the previously-ignored timeout in GenerationResult.result() and bound the per-request wait in the VideoMME evaluator. - On a detected stall, signal the worker (SIGUSR1/faulthandler) to dump all thread stacks so the next occurrence is diagnosable. This mitigates the hang and captures worker state; it does not fix the underlying intermittent stall. Signed-off-by: William Zhang <133824995+2ez4bz@users.noreply.github.com>
1 parent 07270b7 commit 1832ab3

5 files changed

Lines changed: 158 additions & 7 deletions

File tree

tensorrt_llm/executor/ipc.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,22 @@ def poll(self, timeout: int) -> bool:
181181
else:
182182
return False
183183

184+
def poll_send(self, timeout: float) -> bool:
185+
"""Return True if a message can be sent within *timeout* seconds.
186+
187+
Args:
188+
timeout (float): Timeout in seconds.
189+
190+
Returns:
191+
For a PAIR socket whose peer has disconnected, the socket enters mute state, and this
192+
returns `False` instead of blocking, letting callers detect a dead/disconnected peer
193+
rather than blocking forever in `send()`.
194+
"""
195+
self.setup_lazily()
196+
self._check_thread_safety()
197+
return bool(
198+
self.socket.poll(timeout=int(timeout * 1000), flags=zmq.POLLOUT))
199+
184200
def put(self, obj: Any, routing_id: Optional[bytes] = None):
185201
self.setup_lazily()
186202
self._check_thread_safety()

tensorrt_llm/executor/proxy.py

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
import concurrent.futures
1717
import json
1818
import os
19+
import signal
1920
import threading
21+
import time
2022
import weakref
2123
from queue import Empty
2224
from typing import Dict, List, Optional, Union
@@ -122,6 +124,23 @@ def __init__(
122124

123125
self._results: Dict[int, GenerationResult] = {}
124126

127+
# --- liveness / stall detection state ---
128+
# Time of the last sign of worker progress (a request submitted or a result received). The
129+
# error monitor uses this to detect a worker that has silently stopped servicing requests.
130+
self._last_progress_time = time.monotonic()
131+
# Max time to wait for the worker to accept a submitted request before declaring it
132+
# dead/stalled. With an unbounded send HWM the send only blocks when the worker disconnected
133+
# or stopped draining.
134+
self._submit_timeout_secs = float(
135+
os.environ.get("TLLM_EXECUTOR_SUBMIT_TIMEOUT_SECS", "300"))
136+
# Max time with requests in flight but no result before treating the worker as stalled.
137+
self._stall_timeout_secs = float(
138+
os.environ.get("TLLM_EXECUTOR_STALL_TIMEOUT_SECS", "300"))
139+
# PID of the leader worker process, learned from the init handshake; used to request a
140+
# thread-stack dump (SIGUSR1) when a stall is detected. Stays `None` for remote/out-of-host
141+
# worker sessions.
142+
self._worker_pid: Optional[int] = None
143+
125144
self.model_world_size = model_world_size
126145

127146
_llm_args = worker_kwargs.get("llm_args", None)
@@ -262,13 +281,50 @@ def _error_monitor_loop(self) -> None:
262281
self._drain_error_queue()
263282
if self._fatal_error is not None:
264283
return
284+
285+
# Progress watchdog: a worker that silently stops servicing requests (no result for
286+
# a long time while requests are in flight) is treated as a fatal stall so callers
287+
# fail fast instead of hanging indefinitely.
288+
if self._results and (
289+
time.monotonic() -
290+
self._last_progress_time) > self._stall_timeout_secs:
291+
logger.error(
292+
f"Error monitor: no result progress for {self._stall_timeout_secs:.2f}s "
293+
f"with {len(self._results)} request(s) in flight; "
294+
"treating worker as stalled.")
295+
self._maybe_dump_worker_traceback()
296+
self._set_fatal_error(
297+
RuntimeError(
298+
f"Worker stalled: no result for {self._stall_timeout_secs:.2f}s "
299+
f"with {len(self._results)} request(s) in flight."))
300+
self.pre_shutdown()
301+
return
265302
except Exception as exc:
266303
logger.debug(f"Error monitor: unexpected exception (ignored): "
267304
f"{exc!r}")
268305

269306
# Wait up to 5s, but wake immediately if _shutdown_event is set
270307
self._shutdown_event.wait(timeout=5.0)
271308

309+
def _maybe_dump_worker_traceback(self) -> None:
310+
"""Best-effort: ask the worker process to dump all thread stacks.
311+
312+
The worker registers a SIGUSR1 handler (faulthandler) that writes a full traceback of every
313+
thread to its stderr / traceback file. This can be useful for diagnosing a stalled worker,
314+
whose state is otherwise lost when the process is killed at teardown. Only works when the
315+
worker runs on the same host (the in-process `MpiPoolSession` case).
316+
"""
317+
pid = self._worker_pid
318+
if pid is None or not hasattr(signal, "SIGUSR1"):
319+
return
320+
try:
321+
os.kill(pid, signal.SIGUSR1)
322+
logger.error(
323+
f"Sent SIGUSR1 to worker pid {pid} requesting a thread-stack "
324+
f"dump for stall diagnosis.")
325+
except OSError as e:
326+
logger.debug(f"Could not signal worker pid {pid}: {e!r}")
327+
272328
def _setup_queues(self) -> WorkerCommIpcAddrs:
273329

274330
self.request_queue = IpcQueue(is_server=True,
@@ -319,6 +375,9 @@ def dispatch_result_task(self) -> bool:
319375
if (res := self.result_queue.get()) is None:
320376
return False # shutdown the thread
321377

378+
# A result arrived: the worker is making progress.
379+
self._last_progress_time = time.monotonic()
380+
322381
async_queues = []
323382
event_loop = None
324383

@@ -421,7 +480,8 @@ def mpi_done_callback(future: concurrent.futures.Future):
421480

422481
while True:
423482
if self.worker_init_status_queue.poll(1):
424-
ready_signal, error_trace = self.worker_init_status_queue.get()
483+
ready_signal, ready_payload = self.worker_init_status_queue.get(
484+
)
425485
# Send ACK to the worker
426486
self.worker_init_status_queue.put("ACK")
427487
logger.info("get signal from executor worker")
@@ -432,11 +492,18 @@ def mpi_done_callback(future: concurrent.futures.Future):
432492
self._handle_background_error()
433493

434494
if ready_signal != GenerationExecutorProxy.READY_SIGNAL:
435-
logger.error(f"Executor worker initialization error: {error_trace}")
495+
# On the error path the payload is the worker's traceback string.
496+
logger.error(
497+
f"Executor worker initialization error: {ready_payload}")
436498
self.mpi_session.shutdown_abort(reason=ready_signal)
437499
raise RuntimeError(
438500
"Executor worker returned error") from ready_signal
439501

502+
# On success the worker sends its PID as the payload so we can signal it (SIGUSR1 ->
503+
# thread-stack dump) if it later stalls.
504+
if isinstance(ready_payload, int):
505+
self._worker_pid = ready_payload
506+
440507
def _abort_all_requests(self):
441508
# The results can be finished during this loop, so self._results may be changed.
442509
for result in list(self._results.values()):
@@ -549,12 +616,41 @@ def submit(self, request: GenerationRequest) -> GenerationResult:
549616
self._results[request.id] = result
550617

551618
with nvtx_range_debug("request_queue.put"):
552-
self.request_queue.put(request)
619+
self._submit_request(request)
553620

554621
self._handle_background_error()
555622

556623
return result
557624

625+
def _submit_request(self, request: GenerationRequest) -> None:
626+
"""Send a request to the worker with a bounded wait.
627+
628+
This is so a dead or stuck worker surfaces as a fast error instead of blocking forever.
629+
"""
630+
# With an unbounded send HWM, `socket.send` only blocks when the worker has disconnected
631+
# (PAIR mute state) or has stopped draining. We poll for send-readiness and, while we cannot
632+
# send, check whether the worker has died or a fatal error was recorded, giving up after
633+
# `_submit_timeout_secs`.
634+
deadline = time.monotonic() + self._submit_timeout_secs
635+
while not self.request_queue.poll_send(timeout=1.0):
636+
# Surface any error already recorded by the monitor / callbacks.
637+
self._handle_background_error()
638+
if self._check_mpi_futures():
639+
raise RequestError(
640+
"Executor worker exited before the request could be "
641+
"submitted.")
642+
if time.monotonic() >= deadline:
643+
self._maybe_dump_worker_traceback()
644+
err = RequestError(
645+
f"Worker did not accept request {request.id} within "
646+
f"{self._submit_timeout_secs:.0f}s; it appears stalled or "
647+
f"disconnected.")
648+
self._set_fatal_error(err)
649+
self.pre_shutdown()
650+
raise err
651+
self._last_progress_time = time.monotonic()
652+
self.request_queue.put(request)
653+
558654
def collective_rpc(
559655
self,
560656
method: str,

tensorrt_llm/executor/result.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -965,7 +965,12 @@ def _handle_ray_response(self, response: Any):
965965
return response
966966

967967
def _result_step(self, timeout: Optional[float] = None):
968-
response = self.queue.get()
968+
try:
969+
response = self.queue.get(timeout=timeout)
970+
except Empty:
971+
raise TimeoutError(
972+
f"Request {self.request_id} timed out after {timeout}s "
973+
f"waiting for a response from the executor worker.")
969974
self._handle_response(response)
970975

971976
async def _aresult_step(self):

tensorrt_llm/executor/worker.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
import faulthandler
12
import gc
23
import os
4+
import signal
5+
import sys
36
import threading
47
import time
58
import traceback
@@ -185,6 +188,30 @@ def _print_stacks():
185188
daemon=True)
186189
print_stacks_thread.start()
187190

191+
# Install a faulthandler so the worker's thread stacks can be captured on demand (SIGUSR1, e.g.
192+
# triggered by the proxy's stall watchdog) or on a fatal signal. A stalled worker's state is
193+
# otherwise lost when it is killed at teardown. Optionally dump to a durable file (when worker
194+
# stderr is not captured) via TLLM_WORKER_TRACEBACK_DIR.
195+
faulthandler.enable()
196+
_traceback_file = sys.stderr
197+
_traceback_dir = os.getenv("TLLM_WORKER_TRACEBACK_DIR")
198+
if _traceback_dir:
199+
try:
200+
os.makedirs(_traceback_dir, exist_ok=True)
201+
_traceback_file = open(
202+
os.path.join(_traceback_dir,
203+
f"worker_traceback_{os.getpid()}.log"), "a")
204+
except OSError:
205+
_traceback_file = sys.stderr
206+
if hasattr(signal, "SIGUSR1"):
207+
try:
208+
faulthandler.register(signal.SIGUSR1,
209+
file=_traceback_file,
210+
all_threads=True,
211+
chain=False)
212+
except (ValueError, OSError):
213+
pass
214+
188215
mpi_comm().barrier()
189216

190217
if llm_args is not None and llm_args.env_overrides:
@@ -335,8 +362,9 @@ def notify_proxy_threads_to_quit():
335362
else:
336363
worker.set_result_queue(result_queue)
337364

338-
# Send ready signal with confirmation
339-
ready_msg = (ready_signal, None)
365+
# Send ready signal with confirmation. The payload carries the worker PID so the
366+
# proxy can signal it (SIGUSR1 -> thread-stack dump) if it later stalls.
367+
ready_msg = (ready_signal, os.getpid())
340368
if not worker_init_status_queue.notify_with_retry(ready_msg):
341369
logger.warning(
342370
"Failed to deliver ready signal to proxy, continuing anyway"

tests/integration/defs/accuracy/video_mme.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,13 @@ def evaluate(
151151
streaming=streaming,
152152
)
153153
)
154-
outputs = [future.result() for future in tqdm(futures, desc="Fetching responses")]
154+
# Bound the per-request wait so a stalled/dead worker fails the test fast instead of hanging
155+
# until the outer CI timeout. No healthy single request should come close to this budget.
156+
result_timeout = 300.0
157+
outputs = [
158+
future.result(timeout=result_timeout)
159+
for future in tqdm(futures, desc="Fetching responses")
160+
]
155161

156162
if self.output_dir:
157163
dump_inference_results(self.output_dir, outputs, getattr(llm, "tokenizer", None))

0 commit comments

Comments
 (0)