Skip to content

Commit d1d1e8b

Browse files
committed
Address PR feedback (2)
Signed-off-by: Sergio Herrera <627709+seherv@users.noreply.github.com>
1 parent b1e0c3f commit d1d1e8b

5 files changed

Lines changed: 96 additions & 18 deletions

File tree

ext/dapr-ext-workflow/AGENTS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ At dispatch time (the gRPC stream loop in `_durabletask/worker.py`), `is_async_c
116116

117117
Workflow (orchestrator) functions must remain generators (`def` with `yield`). They cannot be `async def` because durabletask's deterministic replay depends on synchronous generator semantics. Only activities support async.
118118

119-
**Decorator ordering gotcha.** Stacking `@wfr.activity` over `@alternate_name(...)` over `async def` works because `@alternate_name` now emits an `async def innerfn` when the wrapped function is async. A user-written decorator that wraps an async function in a sync `def` (without `@functools.wraps` exposing `__wrapped__`) defeats `_is_async_callable`, routes the activity to the sync path, and produces an un-awaited coroutine. Such decorators should use `@functools.wraps(fn)` so the unwrap walks through them.
119+
**Decorator ordering gotcha.** Wrapping `@wfr.activity` over `@alternate_name(...)` over `async def` works because `@alternate_name` now emits an `async def innerfn` when the wrapped function is async. A user-written decorator that wraps an async function in a sync `def` (without `@functools.wraps` exposing `__wrapped__`) defeats `_is_async_callable`, routes the activity to the sync path, and produces an un-awaited coroutine. Such decorators should use `@functools.wraps(fn)` so the unwrap walks through them.
120120

121121
**`maximum_thread_pool_workers` covers both paths.** This knob sizes the worker thread pool used for sync-activity bodies and for async-activity gRPC response sends. Mixed workloads with long-running sync activities can starve async response delivery (and vice versa) since they share the pool — size to the sum of peak sync activity concurrency and peak in-flight async response sends.
122122

ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/shared.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import grpc
2222
from dapr.ext.workflow import _model_protocol
2323

24+
logger = logging.getLogger(__name__)
25+
2426

2527
def is_async_callable(fn: Any) -> bool:
2628
"""Return True if ``fn`` is async. Catches ``functools.partial`` of coroutines,
@@ -35,7 +37,9 @@ def is_async_callable(fn: Any) -> bool:
3537
except ValueError:
3638
# Cyclic ``__wrapped__`` chain from a malformed decorator. Fall back to the
3739
# outermost callable; misclassification is preferable to crashing dispatch.
38-
pass
40+
logger.debug(
41+
f'Cyclic __wrapped__ on {fn!r}, using outermost callable for async detection.'
42+
)
3943
if inspect.iscoroutinefunction(candidate):
4044
return True
4145
if not inspect.isfunction(candidate) and hasattr(candidate, '__call__'):

ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,11 @@ def _log_failure(ctx: task.ActivityContext, exc: Exception) -> None:
7878
activity_id = getattr(ctx, 'task_id', 'unknown')
7979
logger.warning(f'Activity execution failed - task_id: {activity_id}, error: {exc}')
8080

81-
if _is_async_callable(fn):
81+
is_async = _is_async_callable(fn)
82+
activity_name = getattr(fn, '__name__', repr(fn))
83+
kind = 'async' if is_async else 'sync'
84+
logger.debug(f"Registering activity '{activity_name}' on the {kind} dispatch path.")
85+
if is_async:
8286

8387
async def async_activity_wrapper(
8488
ctx: task.ActivityContext, inp: object | None = None

ext/dapr-ext-workflow/tests/durabletask/test_async_dispatch_regression.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
pytestmark = pytest.mark.perf
2626

27-
ACTIVITY_S = 0.02
27+
ACTIVITY_DURATION_SECONDS = 0.02
2828
N_ITEMS = 1000
2929
SEMAPHORE_CAP = 2000
3030
THREAD_POOL = 16
@@ -68,9 +68,10 @@ async def _run_async_batch(n_items: int, timeout_s: float) -> int:
6868
stub = _MockSidecarStub()
6969

7070
async def activity(ctx, _inp) -> None:
71-
await asyncio.sleep(ACTIVITY_S)
71+
await asyncio.sleep(ACTIVITY_DURATION_SECONDS)
7272

7373
worker_task = asyncio.create_task(manager.run())
74+
# Non-blocking poll: yield to the event loop until the worker creates the activity queue
7475
while manager.activity_queue is None:
7576
await asyncio.sleep(0)
7677
try:
@@ -96,7 +97,7 @@ def test_async_activities_overlap_instead_of_serializing():
9697
try:
9798
completions = asyncio.run(_run_async_batch(N_ITEMS, TIMEOUT_S))
9899
except asyncio.TimeoutError:
99-
serial_s = N_ITEMS * ACTIVITY_S
100+
serial_s = N_ITEMS * ACTIVITY_DURATION_SECONDS
100101
pytest.fail(
101102
f'{N_ITEMS} async activities did not drain within {TIMEOUT_S:.1f}s. Serialized'
102103
f' they would cost ~{serial_s:.0f}s, so the async path is not overlapping I/O.'

0 commit comments

Comments
 (0)