Skip to content

Commit 13e1e07

Browse files
authored
worker: infinitely wait for start (#983)
* worker: infinitely wait for start. Instead of returning an error on worker start after 10s, wait indefinitely until the worker can start or until shutdown has been signalled. This is important for environments where you spin up 100-500 workers at the same time and the cluster takes some time to settle. Signed-off-by: joshvanl <me@joshvanl.dev> * Address comments Signed-off-by: joshvanl <me@joshvanl.dev> * review comments Signed-off-by: joshvanl <me@joshvanl.dev> * Adds unit test for change Signed-off-by: joshvanl <me@joshvanl.dev> * lint Signed-off-by: joshvanl <me@joshvanl.dev> --------- Signed-off-by: joshvanl <me@joshvanl.dev>
1 parent dc44685 commit 13e1e07

2 files changed

Lines changed: 78 additions & 3 deletions

File tree

ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py

Lines changed: 12 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -331,6 +331,7 @@ def __init__(
331331
self._current_channel: Optional[grpc.Channel] = None # Store channel reference for cleanup
332332
self._channel_cleanup_threads: list[threading.Thread] = [] # Deferred channel close threads
333333
self._stream_ready = threading.Event()
334+
self._runLoop: Optional[Thread] = None
334335
# Use provided concurrency options or create default ones
335336
self._concurrency_options = (
336337
concurrency_options if concurrency_options is not None else ConcurrencyOptions()
@@ -387,8 +388,13 @@ def run_loop():
387388
self._logger.info(f'Starting gRPC worker that connects to {self._host_address}')
388389
self._runLoop = Thread(target=run_loop, name='WorkerRunLoop')
389390
self._runLoop.start()
390-
if not self._stream_ready.wait(timeout=10):
391-
raise RuntimeError('Failed to establish work item stream connection within 10 seconds')
391+
while not self._stream_ready.wait(timeout=1):
392+
if self._shutdown.is_set():
393+
raise RuntimeError('Worker was stopped before the work item stream was established')
394+
if not self._runLoop.is_alive():
395+
raise RuntimeError(
396+
'Worker run loop exited before the work item stream was established'
397+
)
392398
self._is_running = True
393399

394400
async def _keepalive_loop(self, stub):
@@ -801,7 +807,9 @@ def _deferred_close():
801807

802808
def stop(self):
803809
"""Stops the worker and waits for any pending work items to complete."""
804-
if not self._is_running:
810+
# Guards on _runLoop rather than _is_running so stop() can unblock a start()
811+
# that is still waiting for the work item stream to be established.
812+
if self._runLoop is None:
805813
return
806814

807815
self._logger.info('Stopping gRPC worker...')
@@ -833,6 +841,7 @@ def stop(self):
833841
self._async_worker_manager.shutdown()
834842
self._logger.info('Worker shutdown completed')
835843
self._is_running = False
844+
self._runLoop = None
836845

837846
# TODO: This should be removed in the future as we do handle grpc errs
838847
def _handle_grpc_execution_error(self, rpc_error: grpc.RpcError, request_type: str):

ext/dapr-ext-workflow/tests/durabletask/test_worker_stop.py

Lines changed: 66 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,10 @@
1+
import asyncio
2+
import threading
3+
import time
14
from unittest.mock import MagicMock, patch
25

36
import grpc
7+
import pytest
48
from dapr.ext.workflow._durabletask.worker import TaskHubGrpcWorker
59

610

@@ -146,3 +150,65 @@ def test_deferred_close_prunes_finished_threads():
146150
worker._channel_cleanup_threads[-1].join(timeout=2)
147151
# Only the still-alive (or just-finished ch2) thread remains; ch1's was pruned
148152
assert len(worker._channel_cleanup_threads) <= 1
153+
154+
155+
def test_stop_before_start_is_noop():
    """Calling stop() on a never-started worker returns early.

    A freshly constructed worker still has ``_runLoop`` set to ``None``, so
    stop() must bail out before touching the shutdown event (and without
    raising AttributeError).
    """
    fresh_worker = TaskHubGrpcWorker()
    with patch.object(fresh_worker._shutdown, 'set') as set_spy:
        fresh_worker.stop()
    # The early return means the shutdown event was never signalled.
    set_spy.assert_not_called()
161+
162+
163+
def test_stop_is_idempotent():
    """A repeated stop() is a no-op.

    The first stop() clears ``_runLoop``; the second call sees ``None`` and
    returns before signalling shutdown again.
    """
    running_worker = _make_running_worker()
    running_worker._current_channel = MagicMock()

    # First stop performs the real shutdown and resets the run-loop handle.
    running_worker.stop()
    assert running_worker._runLoop is None

    # Second stop must take the early-return path.
    with patch.object(running_worker._shutdown, 'set') as set_spy:
        running_worker.stop()
    set_spy.assert_not_called()
172+
173+
174+
def test_start_raises_when_run_loop_exits_early():
    """start() fails fast when the run loop dies before the stream is ready.

    The patched ``_async_run_loop`` returns immediately without ever setting
    ``_stream_ready``, so start() is expected to raise RuntimeError.
    """
    worker_under_test = TaskHubGrpcWorker()

    async def _return_immediately():
        # Finish straight away; the stream-ready event is never set.
        return None

    run_loop_patch = patch.object(
        worker_under_test, '_async_run_loop', side_effect=_return_immediately
    )
    with run_loop_patch, pytest.raises(RuntimeError, match='Worker run loop exited'):
        worker_under_test.start()
184+
185+
186+
def test_start_raises_when_stopped_during_startup():
    """stop() releases a start() that is still waiting for the stream.

    start() runs on a helper thread while the patched run loop idles without
    ever setting ``_stream_ready``. Calling stop() from the test thread must
    make start() return by raising RuntimeError.
    """
    worker_under_test = TaskHubGrpcWorker()
    errors = []

    async def _idle_until_shutdown():
        # Keep the run loop alive, but never mark the stream as ready, so
        # start() remains inside its wait loop until shutdown is signalled.
        while not worker_under_test._shutdown.is_set():
            await asyncio.sleep(0.05)

    def _run_start():
        # Record whatever start() raises so the main thread can inspect it.
        try:
            worker_under_test.start()
        except Exception as exc:  # noqa: BLE001
            errors.append(exc)

    with patch.object(
        worker_under_test, '_async_run_loop', side_effect=_idle_until_shutdown
    ):
        starter = threading.Thread(target=_run_start)
        starter.start()
        # Give start() time to enter its wait loop (timeout=1 per iteration).
        time.sleep(1.2)
        worker_under_test.stop()
        starter.join(timeout=5)

    assert not starter.is_alive(), 'start() did not return after stop()'
    assert len(errors) == 1, f'Expected exactly one error, got: {errors}'
    raised = errors[0]
    assert isinstance(raised, RuntimeError)
    assert 'Worker was stopped' in str(raised)

0 commit comments

Comments
 (0)