Commit 8367be5

seherv and sicoyle authored
Add support for async workflow activities (#1053)
* Add support for async activities
* Address Copilot feedback (1)
* Address Copilot feedback (2)
* Address Copilot feedback (3)
* Address Copilot feedback (4)
* Fix linter
* Address Copilot feedback (5)
* Address Copilot feedback (6)
* Reword warning
* Cleanup
* Remove strands-agents-tools dependency
* Redo benchmarks and add performance regression tests
* Relax performance thresholds for CI
* More async detection tests
* Create gRPC channel in the caller's event loop
* Silence gRPC error spam on EAGAIN
* Remove async benchmark code
* Address PR feedback (2)
* Update docs to match new dapr[ext] structure

---------

Signed-off-by: Sergio Herrera <627709+seherv@users.noreply.github.com>
Co-authored-by: Sam <sam@diagrid.io>
1 parent b642701 commit 8367be5

19 files changed

Lines changed: 1378 additions & 157 deletions

dapr/ext/workflow/AGENTS.md

Lines changed: 22 additions & 1 deletion
@@ -107,6 +107,26 @@ The entry point for registration and lifecycle:
 
 Internally wraps user functions: workflow functions get a `DaprWorkflowContext`, activity functions get a `WorkflowActivityContext`. Tracks registration state via `_workflow_registered` / `_activity_registered` attributes on functions to prevent double registration.
 
+#### Sync and async activities
+
+Activities can be either `def my_activity(ctx, inp)` or `async def my_activity(ctx, inp)`. At registration, `_make_activity_wrapper` calls `_is_async_callable(fn)` to detect async-ness. That helper unwraps `functools.partial`, `@functools.wraps` chains, and callable-class `__call__` so common decorator patterns route correctly. The wrapper is built `async def` or `def` to match, then stored in the registry.
+
+At dispatch time (the gRPC stream loop in `_durabletask/worker.py`), `is_async_callable(activity_fn)` on the wrapper selects between two handlers.
+
+- **Async activities** go through `_execute_activity_async`, then `_ActivityExecutor.execute_async`, which awaits `fn(...)` directly on the event loop. The gRPC response is delivered via `loop.run_in_executor(self._async_worker_manager.thread_pool, stub.CompleteActivityTask, ...)` — the same pool sync activities use, sized by `maximum_thread_pool_workers`.
+- **Sync activities** go through `_execute_activity`, dispatched to the thread pool by `_AsyncWorkerManager._run_func`. The activity runs on a worker thread, and the response is delivered from the same thread.
+
+Workflow (orchestrator) functions must remain generators (`def` with `yield`). They cannot be `async def` because durabletask's deterministic replay depends on synchronous generator semantics. Only activities support async.
+
+**Decorator ordering gotcha.** Wrapping `@wfr.activity` over `@alternate_name(...)` over `async def` works because `@alternate_name` now emits an `async def innerfn` when the wrapped function is async. A user-written decorator that wraps an async function in a sync `def` (without `@functools.wraps` exposing `__wrapped__`) defeats `_is_async_callable`, routes the activity to the sync path, and produces an un-awaited coroutine. Such decorators should use `@functools.wraps(fn)` so the unwrap walks through them.
+
+**`maximum_thread_pool_workers` covers both paths.** This knob sizes the worker thread pool used for sync-activity bodies and for async-activity gRPC response sends. Mixed workloads with long-running sync activities can starve async response delivery (and vice versa) since they share the pool — size to the sum of peak sync activity concurrency and peak in-flight async response sends.
+
+**Concurrency sizing and load characterization.** See `docs/concurrency.md` for sizing recommendations (`maximum_concurrent_activity_work_items`, `maximum_thread_pool_workers`) and an async-vs-sync decision tree. `tests/ext/workflow/durabletask/test_async_dispatch_regression.py` (marked `perf`) guards the core invariant: a batch of async activities overlaps on the event loop instead of serializing through the thread pool.
+
+**grpc.aio poller log noise.** The async client can emit benign `BlockingIOError: [Errno 11]` ERROR lines from `grpc.aio`'s `PollerCompletionQueue` under load. It is harmless and retried. `get_grpc_aio_channel` installs an internal `asyncio`-logger filter (`_silence_grpc_aio_poller_noise`) that drops only those records, so the SDK suppresses it automatically with no user action.
+
+
 ### DaprWorkflowClient (`dapr_workflow_client.py`)
 
 Client for workflow lifecycle management:
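
The decorator ordering gotcha described in this hunk can be reproduced with the standard library alone. The following is a minimal sketch, not the SDK's code: `opaque`, `transparent`, `fetch`, and `looks_async` are hypothetical names, and `looks_async` is a simplified stand-in for the detection step that only follows `__wrapped__` chains.

```python
import asyncio
import functools
import inspect


def opaque(fn):
    # Hypothetical decorator: a sync wrapper without functools.wraps, so there
    # is no __wrapped__ attribute leading back to the async function.
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)  # returns a coroutine object, never awaited here

    return wrapper


def transparent(fn):
    # Same wrapper, but functools.wraps sets wrapper.__wrapped__ = fn.
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    return wrapper


async def fetch(ctx, inp):
    await asyncio.sleep(0)
    return inp * 2


def looks_async(fn):
    # Simplified stand-in for async detection: follow __wrapped__, then check.
    return inspect.iscoroutinefunction(inspect.unwrap(fn))


hidden = opaque(fetch)
visible = transparent(fetch)

print(looks_async(hidden))   # False: the sync wrapper hides the coroutine function
print(looks_async(visible))  # True: unwrap reaches the original async def

# Calling the opaque wrapper from a sync path yields a coroutine object.
leaked = hidden(None, 21)
print(inspect.iscoroutine(leaked))  # True
leaked.close()  # close it explicitly to avoid a "never awaited" warning
```

A decorator that must stay synchronous can keep working with async activities as long as it preserves `__wrapped__`; one that returns an `async def` wrapper for async inputs avoids the issue entirely.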
@@ -165,7 +185,7 @@ Retry configuration for activities and child workflows:
 1. **Registration**: User decorates functions with `@wfr.workflow` / `@wfr.activity`. The runtime wraps them and stores them in the durabletask worker's registry.
 2. **Startup**: `wfr.start()` opens a gRPC stream to the Dapr sidecar. The worker polls for work items.
 3. **Scheduling**: Client calls `schedule_new_workflow(fn, input=...)`. The function's name (or `_dapr_alternate_name`) is sent to the backend.
-4. **Execution**: The durabletask engine dispatches work items. Workflow functions are Python **generators** that `yield` tasks (activity calls, timers, child workflows). The engine records history; on replay, yielded tasks return cached results without re-executing.
+4. **Execution**: The durabletask engine dispatches work items. Workflow functions are Python **generators** that `yield` tasks (activity calls, timers, child workflows). Activity functions are either sync (dispatched to the worker's thread pool) or `async def` (awaited directly on the worker's event loop). The engine records history; on replay, yielded tasks return cached results without re-executing.
 5. **Determinism**: Workflows must be deterministic — no random, no wall-clock time, no I/O. Use `ctx.current_utc_datetime` instead of `datetime.now()`. Use `ctx.is_replaying` to guard side effects like logging.
 6. **Completion**: Client polls via `wait_for_workflow_completion()` or `get_workflow_state()`.
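
The split described in the Execution step (sync activities on a thread pool, `async def` activities awaited on the event loop) can be sketched as below. This is an illustration under stated assumptions, not the worker's implementation: `dispatch`, `sync_activity`, and `async_activity` are hypothetical names, and the pool size of 2 is arbitrary.

```python
import asyncio
import inspect
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def sync_activity(inp):
    time.sleep(0.01)  # blocking work; must not run on the event loop thread
    return ('sync', inp, threading.get_ident())


async def async_activity(inp):
    await asyncio.sleep(0.01)  # non-blocking wait; other activities can overlap
    return ('async', inp, threading.get_ident())


async def dispatch(fn, inp, pool):
    # Hypothetical dispatcher: await coroutine functions directly, and hand
    # plain functions to the thread pool.
    if inspect.iscoroutinefunction(fn):
        return await fn(inp)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, fn, inp)


async def main():
    loop_thread = threading.get_ident()
    with ThreadPoolExecutor(max_workers=2) as pool:
        results = await asyncio.gather(
            dispatch(sync_activity, 1, pool),
            *(dispatch(async_activity, i, pool) for i in range(2, 5)),
        )
    return loop_thread, results


loop_thread, results = asyncio.run(main())
for kind, inp, thread_id in results:
    where = 'event loop thread' if thread_id == loop_thread else 'worker thread'
    print(kind, inp, where)
```

In this sketch the three async activities share the event loop thread and overlap their waits, while the sync activity occupies one pool worker for its full duration.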

@@ -193,6 +213,7 @@ Two example directories exercise workflows:
 - `cross-app1.py`, `cross-app2.py`, `cross-app3.py` — cross-app calls
 - `versioning.py` — workflow versioning with `is_patched()`
 - `simple_aio_client.py` — async client variant
+- `async_activities.py` — `async def` activities (fan-out/fan-in with simulated I/O, configurable payload sizes)
 
 ## Testing

dapr/ext/workflow/_durabletask/aio/client.py

Lines changed: 32 additions & 18 deletions
@@ -71,18 +71,32 @@ def __init__(
         else:
             interceptors = None
 
-        channel = get_grpc_aio_channel(
-            host_address=host_address,
-            secure_channel=secure_channel,
-            interceptors=interceptors,
-            options=channel_options,
-        )
-        self._channel = channel
-        self._stub = stubs.TaskHubSidecarServiceStub(channel)
+        self._host_address = host_address
+        self._secure_channel = secure_channel
+        self._interceptors = interceptors
+        self._channel_options = channel_options
+        self._channel: grpc.aio.Channel | None = None
+        self._stub: stubs.TaskHubSidecarServiceStub | None = None
         self._logger = shared.get_logger('client', log_handler, log_formatter)
 
+    def _get_stub(self) -> stubs.TaskHubSidecarServiceStub:
+        """Lazily create the channel and stub on first use.
+
+        Async grpc binds a channel to the loop active at creation, deferring it avoids binding to the wrong loop.
+        """
+        if self._stub is None:
+            self._channel = get_grpc_aio_channel(
+                host_address=self._host_address,
+                secure_channel=self._secure_channel,
+                interceptors=self._interceptors,
+                options=self._channel_options,
+            )
+            self._stub = stubs.TaskHubSidecarServiceStub(self._channel)
+        return self._stub
+
     async def aclose(self):
-        await self._channel.close()
+        if self._channel is not None:
+            await self._channel.close()
 
     async def __aenter__(self):
         return self
@@ -113,14 +127,14 @@ async def schedule_new_orchestration(
         )
 
         self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
-        res: pb.CreateInstanceResponse = await self._stub.StartInstance(req)
+        res: pb.CreateInstanceResponse = await self._get_stub().StartInstance(req)
         return res.instanceId
 
     async def get_orchestration_state(
         self, instance_id: str, *, fetch_payloads: bool = True
     ) -> Optional[WorkflowState]:
         req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
-        res: pb.GetInstanceResponse = await self._stub.GetInstance(req)
+        res: pb.GetInstanceResponse = await self._get_stub().GetInstance(req)
         return new_orchestration_state(req.instanceId, res)
 
     async def wait_for_orchestration_start(
@@ -132,7 +146,7 @@ async def wait_for_orchestration_start(
         )
 
         async def _call(grpc_timeout):
-            res: pb.GetInstanceResponse = await self._stub.WaitForInstanceStart(
+            res: pb.GetInstanceResponse = await self._get_stub().WaitForInstanceStart(
                 req, timeout=grpc_timeout
             )
             return new_orchestration_state(req.instanceId, res)
@@ -151,7 +165,7 @@ async def wait_for_orchestration_completion(
         )
 
         async def _call(grpc_timeout):
-            res: pb.GetInstanceResponse = await self._stub.WaitForInstanceCompletion(
+            res: pb.GetInstanceResponse = await self._get_stub().WaitForInstanceCompletion(
                 req, timeout=grpc_timeout
             )
             state = new_orchestration_state(req.instanceId, res)
@@ -262,7 +276,7 @@ async def raise_orchestration_event(
         )
 
         self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
-        await self._stub.RaiseEvent(req)
+        await self._get_stub().RaiseEvent(req)
 
     async def terminate_orchestration(
         self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True
@@ -274,19 +288,19 @@ async def terminate_orchestration(
         )
 
         self._logger.info(f"Terminating instance '{instance_id}'.")
-        await self._stub.TerminateInstance(req)
+        await self._get_stub().TerminateInstance(req)
 
     async def suspend_orchestration(self, instance_id: str):
         req = pb.SuspendRequest(instanceId=instance_id)
         self._logger.info(f"Suspending instance '{instance_id}'.")
-        await self._stub.SuspendInstance(req)
+        await self._get_stub().SuspendInstance(req)
 
     async def resume_orchestration(self, instance_id: str):
         req = pb.ResumeRequest(instanceId=instance_id)
         self._logger.info(f"Resuming instance '{instance_id}'.")
-        await self._stub.ResumeInstance(req)
+        await self._get_stub().ResumeInstance(req)
 
     async def purge_orchestration(self, instance_id: str, recursive: bool = True):
         req = pb.PurgeInstancesRequest(instanceId=instance_id, recursive=recursive)
         self._logger.info(f"Purging instance '{instance_id}'.")
-        await self._stub.PurgeInstances(req)
+        await self._get_stub().PurgeInstances(req)
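
The lazy-creation pattern in `_get_stub` can be illustrated without gRPC. The sketch below is an assumption-laden stand-in: `LoopBoundResource` and `LazyClient` are hypothetical classes, and the resource merely records the running loop to mimic an object that binds to the loop active at creation.

```python
import asyncio


class LoopBoundResource:
    # Hypothetical stand-in for a channel that remembers its creation loop.
    def __init__(self):
        # get_running_loop() raises RuntimeError when no loop is running.
        self.loop = asyncio.get_running_loop()


class LazyClient:
    def __init__(self):
        # No event loop is needed here; creation is deferred to first use.
        self._resource = None

    def _get_resource(self):
        if self._resource is None:
            self._resource = LoopBoundResource()
        return self._resource

    async def call(self):
        # First call creates the resource inside the caller's running loop.
        resource = self._get_resource()
        return resource.loop is asyncio.get_running_loop()

    async def aclose(self):
        # Mirrors the None guard: closing an unused client is a no-op.
        if self._resource is not None:
            self._resource = None


client = LazyClient()  # constructed in sync code, before any loop exists


async def main():
    first = await client.call()
    second = await client.call()  # reuses the resource created by the first call
    await client.aclose()
    return first, second


result = asyncio.run(main())
print(result)
```

Constructing `LoopBoundResource()` eagerly in the same position as `LazyClient()` would fail here, which is the sync-constructor situation the deferred creation is meant to accommodate.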

dapr/ext/workflow/_durabletask/aio/internal/shared.py

Lines changed: 27 additions & 0 deletions
@@ -9,6 +9,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 from typing import Optional, Sequence, Union
 
 import grpc
@@ -28,6 +29,30 @@
     grpc_aio.StreamStreamClientInterceptor,
 ]
 
+_POLLER_NOISE_MARKER = 'PollerCompletionQueue._handle_events'
+
+
+class _GrpcAioPollerNoiseFilter(logging.Filter):
+    """Drops the harmless grpc.aio poller BlockingIOError (EAGAIN) records.
+
+    The poller does a non-blocking read on its wake-up fd and can get EAGAIN, which
+    asyncio logs at ERROR even though the read is retried and nothing is lost.
+    """
+
+    def filter(self, record: logging.LogRecord) -> bool:
+        exc = record.exc_info[1] if record.exc_info else None
+        is_poller_noise = isinstance(exc, BlockingIOError) and (
+            _POLLER_NOISE_MARKER in record.getMessage()
+        )
+        return not is_poller_noise
+
+
+def _silence_grpc_aio_poller_noise() -> None:
+    """Install the poller-noise filter on the asyncio logger if not already present."""
+    asyncio_logger = logging.getLogger('asyncio')
+    if not any(isinstance(f, _GrpcAioPollerNoiseFilter) for f in asyncio_logger.filters):
+        asyncio_logger.addFilter(_GrpcAioPollerNoiseFilter())
+
 
 def get_grpc_aio_channel(
     host_address: Optional[str],
@@ -43,6 +68,8 @@ def get_grpc_aio_channel(
         interceptors: Optional sequence of client interceptors to apply to the channel.
         options: Optional sequence of gRPC channel options as (key, value) tuples. Keys defined in https://grpc.github.io/grpc/core/group__grpc__arg__keys.html
     """
+    _silence_grpc_aio_poller_noise()
+
     if host_address is None:
         host_address = get_default_host_address()
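
The filter's selectivity (drop only `BlockingIOError` records that mention the poller callback) can be checked in isolation. This sketch assumes a throwaway logger named `demo.poller` and a hypothetical `ListHandler` for capturing output; it re-creates the filter logic locally rather than importing the SDK's private class.

```python
import logging

MARKER = 'PollerCompletionQueue._handle_events'  # same marker string as in the diff


class PollerNoiseFilter(logging.Filter):
    # Local re-creation of the filter logic for demonstration purposes.
    def filter(self, record):
        exc = record.exc_info[1] if record.exc_info else None
        noisy = isinstance(exc, BlockingIOError) and MARKER in record.getMessage()
        return not noisy


class ListHandler(logging.Handler):
    # Hypothetical handler that collects formatted messages in memory.
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


demo_logger = logging.getLogger('demo.poller')  # hypothetical logger name
demo_logger.propagate = False
demo_logger.addFilter(PollerNoiseFilter())
handler = ListHandler()
demo_logger.addHandler(handler)


def log_with_exception(message, exc):
    # Log at ERROR with the active exception attached, as asyncio does.
    try:
        raise exc
    except Exception:
        demo_logger.error(message, exc_info=True)


eagain = BlockingIOError(11, 'Resource temporarily unavailable')
log_with_exception(f'Exception in callback {MARKER}()', eagain)                  # dropped
log_with_exception(f'Exception in callback {MARKER}()', ValueError('real bug'))  # kept
log_with_exception('Exception in callback other()', eagain)                      # kept

print(handler.messages)
```

Both conditions must hold for a record to be dropped, so an unrelated exception from the poller callback, or an EAGAIN from elsewhere, still reaches the handlers.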

dapr/ext/workflow/_durabletask/internal/shared.py

Lines changed: 28 additions & 0 deletions
@@ -10,6 +10,8 @@
 # limitations under the License.
 
 import dataclasses
+import functools
+import inspect
 import json
 import logging
 import os
@@ -20,6 +22,32 @@
 
 from dapr.ext.workflow import _model_protocol
 
+logger = logging.getLogger(__name__)
+
+
+def is_async_callable(fn: Any) -> bool:
+    """Return True if ``fn`` is async. Catches ``functools.partial`` of coroutines,
+    sync decorators that wrap async functions, and callable instances with ``async __call__``.
+    """
+    candidate = fn
+    while isinstance(candidate, functools.partial):
+        candidate = candidate.func
+    if callable(candidate):
+        try:
+            candidate = inspect.unwrap(candidate)
+        except ValueError:
+            # Cyclic ``__wrapped__`` chain from a malformed decorator. Fall back to the
+            # outermost callable; misclassification is preferable to crashing dispatch.
+            logger.warning(
+                f'Cyclic __wrapped__ on {fn!r}, using outermost callable for async detection.'
+            )
+    if inspect.iscoroutinefunction(candidate):
+        return True
+    if not inspect.isfunction(candidate) and hasattr(candidate, '__call__'):
+        return inspect.iscoroutinefunction(candidate.__call__)
+    return False
+
+
 ClientInterceptor = Union[
     grpc.UnaryUnaryClientInterceptor,
     grpc.UnaryStreamClientInterceptor,
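
The standard-library behaviors that `is_async_callable` relies on can be observed individually. The sketch below uses only `functools` and `inspect`; `work`, `AsyncCallable`, and `looped` are hypothetical names introduced for illustration, and the snippet does not import the SDK helper itself.

```python
import functools
import inspect


async def work(ctx, inp):
    return inp


class AsyncCallable:
    async def __call__(self, ctx, inp):
        return inp


# functools.partial: peel off .func until a non-partial callable remains.
bound = functools.partial(functools.partial(work, None), 1)
inner = bound
while isinstance(inner, functools.partial):
    inner = inner.func
partial_is_async = inspect.iscoroutinefunction(inner)

# Callable instance: the instance is not a coroutine function, but its
# bound __call__ method is.
obj = AsyncCallable()
instance_is_async = inspect.iscoroutinefunction(obj)
call_is_async = inspect.iscoroutinefunction(obj.__call__)


# Cyclic __wrapped__ chain: inspect.unwrap raises ValueError instead of looping.
def looped(ctx, inp):
    return inp


looped.__wrapped__ = looped
try:
    inspect.unwrap(looped)
    cycle_detected = False
except ValueError:
    cycle_detected = True

print(partial_is_async, instance_is_async, call_is_async, cycle_detected)
```

The instance-versus-`__call__` difference is why a separate `__call__` check is needed for callable classes, and the `ValueError` case is the one the helper catches and logs rather than letting it crash dispatch.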
