Skip to content

Commit a30587f

Browse files
authored
fix: Catch can't start new thread (#453)
* fix: Catch `can't start new thread` Signed-off-by: oliver könig <okoenig@nvidia.com> * add exponential backoff Signed-off-by: oliver könig <okoenig@nvidia.com> --------- Signed-off-by: oliver könig <okoenig@nvidia.com>
1 parent 7e01fdf commit a30587f

2 files changed

Lines changed: 84 additions & 1 deletion

File tree

nemo_run/run/torchx_backend/launcher.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,22 @@ def wait_and_exit(
153153

154154
tries = 0
155155
status = None
156+
thread_retries = 0
157+
thread_retry_delay = 20
156158
while tries < timeout:
157-
status = runner.wait(app_handle, wait_interval=2)
159+
try:
160+
status = runner.wait(app_handle, wait_interval=2)
161+
except RuntimeError as e:
162+
if "can't start new thread" in str(e) and thread_retries < 5:
163+
thread_retries += 1
164+
logger.warning(
165+
f"Thread limit reached while waiting for job {app_id}, "
166+
f"retrying ({thread_retries}/5) in {thread_retry_delay}s..."
167+
)
168+
time.sleep(thread_retry_delay)
169+
thread_retry_delay = min(thread_retry_delay * 2, 300)
170+
continue
171+
raise
158172
if status:
159173
break
160174
tries += 1

test/run/torchx_backend/test_launcher.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,75 @@ def test_function():
162162
assert isinstance(thread.ctx, contextvars.Context)
163163

164164

165+
def test_wait_and_exit_retries_on_thread_limit(mock_runner):
166+
mock_app_handle = "dummy://nemo_run/my-test-run"
167+
success_status = MagicMock(spec=AppStatus, state="SUCCEEDED")
168+
mock_runner.wait.side_effect = [
169+
RuntimeError("can't start new thread"),
170+
RuntimeError("can't start new thread"),
171+
success_status,
172+
]
173+
174+
with patch("nemo_run.run.torchx_backend.launcher.time.sleep"):
175+
result = wait_and_exit(app_handle=mock_app_handle, log=False, runner=mock_runner)
176+
177+
assert mock_runner.wait.call_count == 3
178+
assert result.state == "SUCCEEDED"
179+
180+
181+
def test_wait_and_exit_thread_limit_backoff(mock_runner):
182+
mock_app_handle = "dummy://nemo_run/my-test-run"
183+
success_status = MagicMock(spec=AppStatus, state="SUCCEEDED")
184+
mock_runner.wait.side_effect = [
185+
RuntimeError("can't start new thread"),
186+
RuntimeError("can't start new thread"),
187+
RuntimeError("can't start new thread"),
188+
success_status,
189+
]
190+
191+
sleep_calls = []
192+
with patch(
193+
"nemo_run.run.torchx_backend.launcher.time.sleep",
194+
side_effect=lambda t: sleep_calls.append(t),
195+
):
196+
wait_and_exit(app_handle=mock_app_handle, log=False, runner=mock_runner)
197+
198+
# backoff: 20, 40, 80
199+
assert sleep_calls[:3] == [20, 40, 80]
200+
201+
202+
def test_wait_and_exit_thread_limit_does_not_count_as_timeout(mock_runner):
203+
mock_app_handle = "dummy://nemo_run/my-test-run"
204+
success_status = MagicMock(spec=AppStatus, state="SUCCEEDED")
205+
# Fail with thread error 5 times (max retries), then succeed — should not time out
206+
mock_runner.wait.side_effect = [RuntimeError("can't start new thread")] * 5 + [success_status]
207+
208+
with patch("nemo_run.run.torchx_backend.launcher.time.sleep"):
209+
result = wait_and_exit(app_handle=mock_app_handle, log=False, runner=mock_runner, timeout=3)
210+
211+
assert result.state == "SUCCEEDED"
212+
213+
214+
def test_wait_and_exit_thread_limit_exceeded_raises(mock_runner):
215+
mock_app_handle = "dummy://nemo_run/my-test-run"
216+
# Fail with thread error more than 5 times — should re-raise after 5 retries
217+
mock_runner.wait.side_effect = RuntimeError("can't start new thread")
218+
219+
with patch("nemo_run.run.torchx_backend.launcher.time.sleep"):
220+
with pytest.raises(RuntimeError, match="can't start new thread"):
221+
wait_and_exit(app_handle=mock_app_handle, log=False, runner=mock_runner)
222+
223+
assert mock_runner.wait.call_count == 6 # 1 initial + 5 retries
224+
225+
226+
def test_wait_and_exit_other_runtime_error_propagates(mock_runner):
227+
mock_app_handle = "dummy://nemo_run/my-test-run"
228+
mock_runner.wait.side_effect = RuntimeError("some other error")
229+
230+
with pytest.raises(RuntimeError, match="some other error"):
231+
wait_and_exit(app_handle=mock_app_handle, log=False, runner=mock_runner)
232+
233+
165234
@patch("threading.Thread.run")
166235
def test_context_thread_run(mocked_run, setup_and_teardown):
167236
def test_function():

0 commit comments

Comments
 (0)