Skip to content

Commit fefd619

Browse files
authored
Merge branch 'main' into remove-mechanical-markdown
Signed-off-by: seherv <627709+seherv@users.noreply.github.com>
2 parents 4e458b1 + 13e1e07 commit fefd619

5 files changed

Lines changed: 97 additions & 138 deletions

File tree

.github/scripts/automerge.py

Lines changed: 0 additions & 73 deletions
This file was deleted.

.github/workflows/backport.yaml

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#
1313

1414
name: Backport
15+
1516
on:
1617
pull_request_target:
1718
types:
@@ -20,18 +21,6 @@ on:
2021

2122
jobs:
2223
backport:
23-
name: Backport
24-
runs-on: ubuntu-latest
25-
if: >
26-
github.event.pull_request.merged
27-
&& (
28-
github.event.action == 'closed'
29-
|| (
30-
github.event.action == 'labeled'
31-
&& contains(github.event.label.name, 'backport')
32-
)
33-
)
34-
steps:
35-
- uses: tibdex/backport@9565281eda0731b1d20c4025c43339fb0a23812e
36-
with:
37-
github_token: ${{ secrets.DAPR_BOT_TOKEN }}
24+
uses: dapr/.github/.github/workflows/backport.yaml@main
25+
secrets:
26+
dapr_bot_token: ${{ secrets.DAPR_BOT_TOKEN }}

.github/workflows/dapr-bot-schedule.yml

Lines changed: 15 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -17,53 +17,21 @@ on:
1717
schedule:
1818
- cron: '*/10 * * * *'
1919
workflow_dispatch:
20+
2021
jobs:
2122
automerge:
22-
if: github.repository_owner == 'dapr'
23-
name: Automerge and update PRs.
24-
runs-on: ubuntu-latest
25-
steps:
26-
- name: Checkout repo
27-
uses: actions/checkout@v6
28-
- name: Install dependencies
29-
run: pip install PyGithub
30-
- name: Automerge and update
31-
env:
32-
MAINTAINERS: berndverst,wcs1only
33-
GITHUB_TOKEN: ${{ secrets.DAPR_BOT_TOKEN }}
34-
run: python ./.github/scripts/automerge.py
23+
uses: dapr/.github/.github/workflows/automerge.yaml@main
24+
with:
25+
maintainer-teams: python-sdk-maintainers
26+
secrets:
27+
dapr_bot_token: ${{ secrets.DAPR_BOT_TOKEN }}
28+
3529
prune_stale:
36-
name: Prune Stale
37-
runs-on: ubuntu-latest
38-
steps:
39-
- name: Prune Stale
40-
uses: actions/stale@v5
41-
with:
42-
repo-token: ${{ secrets.DAPR_BOT_TOKEN }}
43-
# Different amounts of days for issues/PRs are not currently supported but there is a PR
44-
# open for it: https://github.com/actions/stale/issues/214
45-
days-before-stale: 60
46-
days-before-close: 7
47-
stale-issue-message: >
48-
This issue has been automatically marked as stale because it has not had activity in the
49-
last 60 days. It will be closed in the next 7 days unless it is tagged (pinned, good first issue, help wanted or triaged/resolved) or other activity
50-
occurs. Thank you for your contributions.
51-
close-issue-message: >
52-
This issue has been automatically closed because it has not had activity in the
53-
last 67 days. If this issue is still valid, please ping a maintainer and ask them to label it as pinned, good first issue, help wanted or triaged/resolved.
54-
Thank you for your contributions.
55-
stale-pr-message: >
56-
This pull request has been automatically marked as stale because it has not had
57-
activity in the last 60 days. It will be closed in 7 days if no further activity occurs. Please
58-
feel free to give a status update now, ping for review, or re-open when it's ready.
59-
Thank you for your contributions!
60-
close-pr-message: >
61-
This pull request has been automatically closed because it has not had
62-
activity in the last 67 days. Please feel free to give a status update now, ping for review, or re-open when it's ready.
63-
Thank you for your contributions!
64-
stale-issue-label: 'stale'
65-
exempt-issue-labels: 'pinned,good first issue,help wanted,triaged/resolved'
66-
stale-pr-label: 'stale'
67-
exempt-pr-labels: 'pinned'
68-
operations-per-run: 500
69-
ascending: true
30+
uses: dapr/.github/.github/workflows/prune-stale.yaml@main
31+
with:
32+
days-before-issue-stale: 60
33+
days-before-pr-stale: 60
34+
days-before-issue-close: 7
35+
days-before-pr-close: 7
36+
secrets:
37+
dapr_bot_token: ${{ secrets.DAPR_BOT_TOKEN }}

ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ def __init__(
331331
self._current_channel: Optional[grpc.Channel] = None # Store channel reference for cleanup
332332
self._channel_cleanup_threads: list[threading.Thread] = [] # Deferred channel close threads
333333
self._stream_ready = threading.Event()
334+
self._runLoop: Optional[Thread] = None
334335
# Use provided concurrency options or create default ones
335336
self._concurrency_options = (
336337
concurrency_options if concurrency_options is not None else ConcurrencyOptions()
@@ -387,8 +388,13 @@ def run_loop():
387388
self._logger.info(f'Starting gRPC worker that connects to {self._host_address}')
388389
self._runLoop = Thread(target=run_loop, name='WorkerRunLoop')
389390
self._runLoop.start()
390-
if not self._stream_ready.wait(timeout=10):
391-
raise RuntimeError('Failed to establish work item stream connection within 10 seconds')
391+
while not self._stream_ready.wait(timeout=1):
392+
if self._shutdown.is_set():
393+
raise RuntimeError('Worker was stopped before the work item stream was established')
394+
if not self._runLoop.is_alive():
395+
raise RuntimeError(
396+
'Worker run loop exited before the work item stream was established'
397+
)
392398
self._is_running = True
393399

394400
async def _keepalive_loop(self, stub):
@@ -801,7 +807,9 @@ def _deferred_close():
801807

802808
def stop(self):
803809
"""Stops the worker and waits for any pending work items to complete."""
804-
if not self._is_running:
810+
# Guards on _runLoop rather than _is_running so stop() can unblock a start()
811+
# that is still waiting for the work item stream to be established.
812+
if self._runLoop is None:
805813
return
806814

807815
self._logger.info('Stopping gRPC worker...')
@@ -833,6 +841,7 @@ def stop(self):
833841
self._async_worker_manager.shutdown()
834842
self._logger.info('Worker shutdown completed')
835843
self._is_running = False
844+
self._runLoop = None
836845

837846
# TODO: This should be removed in the future as we do handle grpc errs
838847
def _handle_grpc_execution_error(self, rpc_error: grpc.RpcError, request_type: str):

ext/dapr-ext-workflow/tests/durabletask/test_worker_stop.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1+
import asyncio
2+
import threading
3+
import time
14
from unittest.mock import MagicMock, patch
25

36
import grpc
7+
import pytest
48
from dapr.ext.workflow._durabletask.worker import TaskHubGrpcWorker
59

610

@@ -146,3 +150,65 @@ def test_deferred_close_prunes_finished_threads():
146150
worker._channel_cleanup_threads[-1].join(timeout=2)
147151
# Only the still-alive (or just-finished ch2) thread remains; ch1's was pruned
148152
assert len(worker._channel_cleanup_threads) <= 1
153+
154+
155+
def test_stop_before_start_is_noop():
156+
"""stop() is safe to call before start() — _runLoop is None, no AttributeError."""
157+
worker = TaskHubGrpcWorker()
158+
with patch.object(worker._shutdown, 'set') as shutdown_set:
159+
worker.stop()
160+
shutdown_set.assert_not_called()
161+
162+
163+
def test_stop_is_idempotent():
164+
"""A second stop() returns early because _runLoop was cleared by the first."""
165+
worker = _make_running_worker()
166+
worker._current_channel = MagicMock()
167+
worker.stop()
168+
assert worker._runLoop is None
169+
with patch.object(worker._shutdown, 'set') as shutdown_set:
170+
worker.stop()
171+
shutdown_set.assert_not_called()
172+
173+
174+
def test_start_raises_when_run_loop_exits_early():
175+
"""start() raises RuntimeError if the run loop thread exits before _stream_ready is set."""
176+
worker = TaskHubGrpcWorker()
177+
178+
async def fast_exit():
179+
return
180+
181+
with patch.object(worker, '_async_run_loop', side_effect=fast_exit):
182+
with pytest.raises(RuntimeError, match='Worker run loop exited'):
183+
worker.start()
184+
185+
186+
def test_start_raises_when_stopped_during_startup():
187+
"""stop() unblocks a start() that is waiting for _stream_ready; start() raises."""
188+
worker = TaskHubGrpcWorker()
189+
190+
async def wait_for_shutdown():
191+
# Block without setting _stream_ready so start() stays in its wait loop.
192+
while not worker._shutdown.is_set():
193+
await asyncio.sleep(0.05)
194+
195+
errors = []
196+
197+
def _start():
198+
try:
199+
worker.start()
200+
except Exception as e: # noqa: BLE001
201+
errors.append(e)
202+
203+
with patch.object(worker, '_async_run_loop', side_effect=wait_for_shutdown):
204+
t = threading.Thread(target=_start)
205+
t.start()
206+
# Let start() enter its wait loop (timeout=1 per iteration).
207+
time.sleep(1.2)
208+
worker.stop()
209+
t.join(timeout=5)
210+
211+
assert not t.is_alive(), 'start() did not return after stop()'
212+
assert len(errors) == 1, f'Expected exactly one error, got: {errors}'
213+
assert isinstance(errors[0], RuntimeError)
214+
assert 'Worker was stopped' in str(errors[0])

0 commit comments

Comments
 (0)