Commit 9eb7841

Add support for async activities
Signed-off-by: Sergio Herrera <627709+seherv@users.noreply.github.com>
1 parent 099d670 commit 9eb7841

11 files changed

Lines changed: 2829 additions & 122 deletions

Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
# Copyright 2026 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Async activities running alongside sync ones in a single workflow.

Starts three async activities that do an HTTP request, then a sync activity that
sums up the results. Shows that sync and async activities work side by side.

Run with:

    dapr run --app-id async-activities --app-protocol grpc --dapr-grpc-port 50001 \\
        -- python async_activities.py
"""

from __future__ import annotations

from time import sleep

import dapr.ext.workflow as wf
import httpx
from pydantic import BaseModel

wfr = wf.WorkflowRuntime()


class FetchRequest(BaseModel):
    url: str
    timeout_seconds: float = 5.0


class FetchResult(BaseModel):
    url: str
    status_code: int
    body_length: int


@wfr.workflow(name='parallel_fetch_workflow')
def parallel_fetch_workflow(ctx: wf.DaprWorkflowContext, urls: list[str]):
    fetch_tasks = [
        ctx.call_activity(fetch_url, input=FetchRequest(url=url).model_dump()) for url in urls
    ]
    results = yield wf.when_all(fetch_tasks)
    summary = yield ctx.call_activity(summarize_fetches, input=results)
    return summary


@wfr.activity(name='fetch_url')
async def fetch_url(ctx: wf.WorkflowActivityContext, request: FetchRequest) -> dict:
    """Async activity: fetch a URL with httpx. Multiple instances run concurrently."""
    async with httpx.AsyncClient(timeout=request.timeout_seconds) as client:
        response = await client.get(request.url)
    result = FetchResult(
        url=request.url,
        status_code=response.status_code,
        body_length=len(response.content),
    )
    print(f'[async] fetched {result.url} -> {result.status_code} ({result.body_length}B)', flush=True)
    return result.model_dump()


@wfr.activity(name='summarize_fetches')
def summarize_fetches(ctx: wf.WorkflowActivityContext, results: list[dict]) -> str:
    """Sync activity: runs in the sync-fallback thread pool. Unchanged from before."""
    total_bytes = sum(r['body_length'] for r in results)
    summary = f'fetched {len(results)} URLs, total {total_bytes} bytes'
    print(f'[sync] {summary}', flush=True)
    return summary


def main() -> None:
    urls = [
        'https://httpbin.org/uuid',
        'https://httpbin.org/get',
        'https://httpbin.org/headers',
    ]

    wfr.start()
    sleep(5)  # wait for workflow runtime to start

    wf_client = wf.DaprWorkflowClient()
    instance_id = wf_client.schedule_new_workflow(workflow=parallel_fetch_workflow, input=urls)
    print(f'Workflow started. Instance ID: {instance_id}')

    state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
    assert state is not None
    print(f'Workflow completed! Status: {state.runtime_status.name}')
    print(f'Workflow result: {state.serialized_output.strip(chr(34))}')

    wfr.shutdown()


if __name__ == '__main__':
    main()

ext/dapr-ext-workflow/AGENTS.md

Lines changed: 20 additions & 1 deletion
@@ -105,6 +105,24 @@ The entry point for registration and lifecycle:
Internally wraps user functions: workflow functions get a `DaprWorkflowContext`, activity functions get a `WorkflowActivityContext`. Tracks registration state via `_workflow_registered` / `_activity_registered` attributes on functions to prevent double registration.

#### Sync and async activities

Activities can be either `def my_activity(ctx, inp)` or `async def my_activity(ctx, inp)`. At registration, `_make_activity_wrapper` calls `_is_async_callable(fn)` to detect async-ness. That helper unwraps `functools.partial`, `@functools.wraps` chains, and callable-class `__call__` so common decorator patterns route correctly. The wrapper is built `async def` or `def` to match, then stored in the registry.
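The detection described above can be sketched roughly as follows. This is an illustrative stand-in, not the body of `_is_async_callable`: the name `looks_async` and the sample callables are hypothetical, and the real helper may order its checks differently or handle more edge cases.

```python
import functools
import inspect


def looks_async(fn) -> bool:
    """Hypothetical stand-in for the async detection described above."""
    # Peel functools.partial layers and functools.wraps chains until neither applies.
    while True:
        if isinstance(fn, functools.partial):
            fn = fn.func
        elif hasattr(fn, '__wrapped__'):
            fn = fn.__wrapped__
        else:
            break
    if inspect.iscoroutinefunction(fn):
        return True
    # Callable-class instance: look at the class's __call__ instead of the instance.
    call = getattr(type(fn), '__call__', None)
    return call is not None and inspect.iscoroutinefunction(call)


async def async_activity(ctx, inp):
    return inp


def sync_activity(ctx, inp):
    return inp


class CallableActivity:
    async def __call__(self, ctx, inp):
        return inp


detected = {
    'async def': looks_async(async_activity),
    'def': looks_async(sync_activity),
    'partial': looks_async(functools.partial(async_activity, None)),
    'callable class': looks_async(CallableActivity()),
}
```

Only the plain `def` entry is reported as sync; the partial and the callable-class instance are both recognized as async after unwrapping.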

At dispatch time (the gRPC stream loop in `_durabletask/worker.py`), `inspect.iscoroutinefunction(activity_fn)` on the wrapper selects between two handlers.

- **Async activities** go through `_execute_activity_async`, then `_ActivityExecutor.execute_async`, which awaits `fn(...)` directly on the event loop. No thread pool involvement. The gRPC response is delivered via `loop.run_in_executor(None, stub.CompleteActivityTask, ...)` (asyncio's default executor).
- **Sync activities** go through `_execute_activity`, dispatched to the thread pool by `_AsyncWorkerManager._run_func`. The activity runs on a worker thread, and the response is delivered from the same thread. The thread pool size is controlled by `maximum_thread_pool_workers`.
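A minimal sketch of that two-way split, using only the standard library. The `dispatch` function and both sample activities are hypothetical; the worker's real handlers also build and deliver the gRPC response, which is omitted here.

```python
import asyncio
import inspect
import threading
from concurrent.futures import ThreadPoolExecutor


async def dispatch(activity_fn, pool, *args):
    """Hypothetical dispatcher: await coroutine functions, offload plain functions."""
    if inspect.iscoroutinefunction(activity_fn):
        # Async path: awaited directly, so it runs on the event-loop thread.
        return await activity_fn(*args)
    # Sync path: handed to the thread pool so it cannot block the loop.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, activity_fn, *args)


async def async_activity(x):
    await asyncio.sleep(0)
    return threading.get_ident()


def sync_activity(x):
    return threading.get_ident()


async def demo():
    loop_thread = threading.get_ident()
    with ThreadPoolExecutor(max_workers=2) as pool:
        async_thread = await dispatch(async_activity, pool, 1)
        sync_thread = await dispatch(sync_activity, pool, 2)
    # Report whether each activity ran on the event-loop thread.
    return async_thread == loop_thread, sync_thread == loop_thread


ran_on_loop = asyncio.run(demo())
```

`ran_on_loop` records, for the async and sync activity in turn, whether the body executed on the event-loop thread.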

Workflow (orchestrator) functions must remain generators (`def` with `yield`). They cannot be `async def` because durabletask's deterministic replay depends on synchronous generator semantics. Only activities support async.

**Decorator ordering gotcha.** Stacking `@wfr.activity` over `@alternate_name(...)` over `async def` works because `@alternate_name` now emits an `async def innerfn` when the wrapped function is async. A user-written decorator that wraps an async function in a sync `def` (without `@functools.wraps` exposing `__wrapped__`) defeats `_is_async_callable`, routes the activity to the sync path, and produces an un-awaited coroutine. Such decorators should use `@functools.wraps(fn)` so the unwrap walks through them.
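The failure mode can be reproduced without the workflow runtime. In this sketch the decorators `opaque` and `transparent` and the activity `fetch` are hypothetical, and `inspect.unwrap` plus `inspect.iscoroutinefunction` stand in for whatever `_is_async_callable` does internally.

```python
import asyncio
import functools
import inspect


def opaque(fn):
    # Sync wrapper WITHOUT functools.wraps: nothing links back to the async fn.
    def inner(*args, **kwargs):
        return fn(*args, **kwargs)
    return inner


def transparent(fn):
    @functools.wraps(fn)  # sets __wrapped__, so unwrapping can reach fn
    def inner(*args, **kwargs):
        return fn(*args, **kwargs)
    return inner


async def fetch(x):
    return x * 2


hidden = opaque(fetch)
visible = transparent(fetch)

hidden_detected = inspect.iscoroutinefunction(inspect.unwrap(hidden))
visible_detected = inspect.iscoroutinefunction(inspect.unwrap(visible))

# Calling the opaque wrapper from sync code hands back a coroutine object,
# which is what a sync dispatch path would receive instead of a result.
leaked = hidden(21)
leaked_is_coroutine = inspect.iscoroutine(leaked)
result = asyncio.run(leaked)  # awaited here only to avoid a "never awaited" warning
```

The opaque wrapper is classified as sync even though calling it yields a coroutine, while the `functools.wraps` variant is classified as async.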

**`maximum_thread_pool_workers` gotcha.** This knob sizes the sync-activity thread pool only. Async-activity response delivery uses asyncio's default executor (one per event loop, created lazily with up to `min(32, cpu_count + 4)` workers), which is not capped by this knob. Strict thread-count bounds for async response delivery require calling `asyncio.get_event_loop().set_default_executor(ThreadPoolExecutor(max_workers=N))` before `wfr.start()`. A future PR may migrate the worker to `grpc.aio` and remove this caveat by sending responses without any thread pool.
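The effect of replacing the default executor can be seen in isolation. This sketch sets the executor on the running loop inside `asyncio.run`; how that maps onto the loop the Dapr worker actually uses is not shown here, and `deliver` is a hypothetical stand-in for the blocking `CompleteActivityTask` call.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def deliver(delay_s):
    """Stand-in for a blocking response call; returns the worker thread's name."""
    time.sleep(delay_s)
    return threading.current_thread().name


async def main(max_workers, calls):
    loop = asyncio.get_running_loop()
    # Replace the lazily created default executor with an explicitly bounded one.
    loop.set_default_executor(
        ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='delivery')
    )
    names = await asyncio.gather(
        *(loop.run_in_executor(None, deliver, 0.01) for _ in range(calls))
    )
    return set(names)


threads_used = asyncio.run(main(max_workers=2, calls=8))
```

Eight `run_in_executor(None, ...)` calls are served by at most two threads, all carrying the `delivery` prefix, which is the bound the default executor would not have enforced.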

**Concurrency sizing and load characterization.** See `docs/concurrency.md` for sizing recommendations (`maximum_concurrent_activity_work_items`, `maximum_thread_pool_workers`), an async-vs-sync decision tree, and the default-executor caveat with a worked example. The `benchmarks/` directory ships `bench_async_activities.py` and the generated `RESULTS.md`; re-run it locally before claiming a perf regression, since the report captures the run environment so a reader can tell whether a number applies to their hardware.


### DaprWorkflowClient (`dapr_workflow_client.py`)

Client for workflow lifecycle management:
@@ -163,7 +181,7 @@ Retry configuration for activities and child workflows:
1. **Registration**: User decorates functions with `@wfr.workflow` / `@wfr.activity`. The runtime wraps them and stores them in the durabletask worker's registry.
2. **Startup**: `wfr.start()` opens a gRPC stream to the Dapr sidecar. The worker polls for work items.
3. **Scheduling**: Client calls `schedule_new_workflow(fn, input=...)`. The function's name (or `_dapr_alternate_name`) is sent to the backend.
-4. **Execution**: The durabletask engine dispatches work items. Workflow functions are Python **generators** that `yield` tasks (activity calls, timers, child workflows). The engine records history; on replay, yielded tasks return cached results without re-executing.
+4. **Execution**: The durabletask engine dispatches work items. Workflow functions are Python **generators** that `yield` tasks (activity calls, timers, child workflows). Activity functions are either sync (dispatched to the worker's thread pool) or `async def` (awaited directly on the worker's event loop). The engine records history; on replay, yielded tasks return cached results without re-executing.
5. **Determinism**: Workflows must be deterministic: no random, no wall-clock time, no I/O. Use `ctx.current_utc_datetime` instead of `datetime.now()`. Use `ctx.is_replaying` to guard side effects like logging.
6. **Completion**: Client polls via `wait_for_workflow_completion()` or `get_workflow_state()`.
@@ -191,6 +209,7 @@ Two example directories exercise workflows:
- `cross-app1.py`, `cross-app2.py`, `cross-app3.py` — cross-app calls
- `versioning.py` — workflow versioning with `is_patched()`
- `simple_aio_client.py` — async client variant
- `async_activities.py` — `async def` activities (HTTP fan-out with `httpx.AsyncClient`)

## Testing
Lines changed: 137 additions & 0 deletions
@@ -0,0 +1,137 @@
# Async-activity load benchmark results

Generated by `bench_async_activities.py`. Re-run with:

```bash
uv run python ext/dapr-ext-workflow/benchmarks/bench_async_activities.py
```

## Run environment

- **Timestamp**: 2026-05-25 20:40:09 UTC
- **Git commit**: `8f13da0-dirty`
- **Python**: CPython 3.13.12
- **OS**: Darwin 25.5.0 (arm64)
- **Platform**: `macOS-26.5-arm64-arm-64bit-Mach-O`
- **CPU**: Apple M3 Pro (12 logical cores)
- **Memory**: 36.0 GB
- **asyncio default executor**: max_workers = 16 (`min(32, cpu_count + 4)`)
- **CI environment**: no

**Numbers from this report are specific to this machine.** Re-run the benchmark on your hardware before drawing conclusions; on a small CI runner or a busy workstation they will diverge. The shape of the curves (throughput plateau, p99 inflection, drift) is what to compare across machines.


Each scenario drives the production dispatch path (`TaskHubGrpcWorker._execute_activity_async`) through `_AsyncWorkerManager` against a mock `CompleteActivityTask` stub. End-to-end latency is measured from `submit_activity` to the mock stub receiving the response, so queue wait, semaphore acquisition, activity work, response build, and `run_in_executor` delivery are all included.

## 1. Concurrency win (issue #897 repro)

Shows that async activities run concurrently on the loop, while the sync path is gated by the thread pool. Both rows reuse the original repro at 100 × 1 s HTTP fetches.

| Scenario | N | Sem | Pool | Latency (s) | Wallclock (s) | Tput/s | Peak tasks | Peak queue | Peak RSS Δ (MB) | Notes |
| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- |
| Async fan-out (issue #897 repro) | 100 | 1000 | 8 | 1.000 | 1.47 | 68.1 | 305 | 0 | 86.4 | 100 awaits run concurrently on the loop |
| Sync baseline (pre-#897 behavior) | 100 | 1000 | 8 | 1.000 | 13.34 | 7.5 | 121 | 0 | 2.4 | gated by thread pool size, demonstrates the bug from #897 |
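The gap between the two rows matches a simple batching estimate. The sketch below is back-of-the-envelope arithmetic, not part of the benchmark: it assumes each sync fetch occupies one of the 8 pool threads for the full 1 s, that the async path is bounded only by the semaphore cap of 1000, and the function name `batch_floor_s` is made up for illustration.

```python
import math


def batch_floor_s(n_items, slots, latency_s):
    """Lower bound on wallclock when at most `slots` items run at once."""
    return math.ceil(n_items / slots) * latency_s


sync_floor = batch_floor_s(100, 8, 1.0)      # 13 batches of 1 s each
async_floor = batch_floor_s(100, 1000, 1.0)  # a single batch: all fetches overlap

# Measured values from the table above, for comparison.
measured_sync, measured_async = 13.34, 1.47
overhead_sync = measured_sync - sync_floor
overhead_async = measured_async - async_floor
```

Under these assumptions the floors are 13 s and 1 s, so both measured wallclocks sit within about half a second of the estimate.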

## 2. Throughput scaling

Async fan-out at 50 ms server latency, semaphore cap 5000, thread pool 16. Throughput is reported as items completed per wallclock second; the sweep shows where the curve flattens.

| Scenario | N | Sem | Pool | Latency (s) | Wallclock (s) | Tput/s | p50 ms | p95 ms | p99 ms | Peak tasks | Peak queue | Peak RSS Δ (MB) | Notes |
| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- |
| Throughput N=100 | 100 | 5000 | 16 | 0.050 | 0.06 | 1542.3 | 62.0 | 64.1 | 64.1 | 105 | 0 | 0.0 | full _execute_activity_async path + mock CompleteActivityTask |
| Throughput N=500 | 500 | 5000 | 16 | 0.050 | 0.08 | 5931.1 | 78.6 | 79.6 | 79.6 | 505 | 0 | 0.4 | full _execute_activity_async path + mock CompleteActivityTask |
| Throughput N=1000 | 1000 | 5000 | 16 | 0.050 | 0.11 | 8956.5 | 102.9 | 106.2 | 106.3 | 1005 | 0 | 2.9 | full _execute_activity_async path + mock CompleteActivityTask |
| Throughput N=2500 | 2500 | 5000 | 16 | 0.050 | 0.24 | 10532.0 | 218.8 | 225.3 | 225.9 | 2505 | 0 | 10.0 | full _execute_activity_async path + mock CompleteActivityTask |
| Throughput N=5000 | 5000 | 5000 | 16 | 0.050 | 0.57 | 8696.7 | 543.8 | 557.2 | 558.7 | 5005 | 0 | 25.2 | full _execute_activity_async path + mock CompleteActivityTask |

## 3. Semaphore-cap sensitivity

N=2500 async activities at 50 ms server latency. Cap below ~500 starves the loop and inflates queue wait. Above that, gains compress.

| Scenario | N | Sem | Pool | Latency (s) | Wallclock (s) | Tput/s | p50 ms | p95 ms | p99 ms | Peak tasks | Peak queue | Peak RSS Δ (MB) | Notes |
| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- |
| Sem cap=50 | 2500 | 50 | 16 | 0.050 | 2.69 | 928.6 | 1422.7 | 2583.5 | 2687.0 | 2505 | 0 | 0.0 | lower caps serialize the batch through fewer parallel slots |
| Sem cap=100 | 2500 | 100 | 16 | 0.050 | 1.42 | 1758.2 | 794.9 | 1360.7 | 1412.0 | 2505 | 0 | 0.0 | lower caps serialize the batch through fewer parallel slots |
| Sem cap=500 | 2500 | 500 | 16 | 0.050 | 0.40 | 6229.5 | 279.2 | 387.9 | 392.3 | 2505 | 0 | 0.0 | caps above N x latency yield no further gain |
| Sem cap=1000 | 2500 | 1000 | 16 | 0.050 | 0.30 | 8322.3 | 235.6 | 286.9 | 290.2 | 2505 | 0 | 0.0 | caps above N x latency yield no further gain |
| Sem cap=5000 | 2500 | 5000 | 16 | 0.050 | 0.23 | 10720.7 | 215.0 | 222.3 | 222.8 | 2505 | 0 | 0.0 | caps above N x latency yield no further gain |
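How a cap serializes a batch can be illustrated with a plain `asyncio.Semaphore`. This is a toy model, not the benchmark's code: `run_batch` is a hypothetical helper, `asyncio.sleep` stands in for the activity's 50 ms of work, and the sizes are shrunk so it finishes quickly.

```python
import asyncio


async def run_batch(n_items, cap, latency_s):
    """Run n_items sleeps behind a semaphore; return (completed, peak in flight)."""
    sem = asyncio.Semaphore(cap)
    in_flight = 0
    peak = 0
    completed = 0

    async def one():
        nonlocal in_flight, peak, completed
        async with sem:  # at most `cap` coroutines get past this point at once
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(latency_s)
            in_flight -= 1
            completed += 1

    # All tasks are created up front; the ones over the cap park on the semaphore.
    await asyncio.gather(*(one() for _ in range(n_items)))
    return completed, peak


completed, peak = asyncio.run(run_batch(n_items=40, cap=10, latency_s=0.01))
```

With a cap of 50 and N=2500, the same reasoning gives 50 waves of roughly 50 ms each, about 2.5 s, which is close to the 2.69 s measured in the first row.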

## 4. Failure threshold (queue-wait inflection)

Cap held at 1000, ramp N. Until N approaches cap, p99 stays close to server latency. Past it, queue wait dominates and p99 grows ~linearly with `N / cap`.

| Scenario | N | Sem | Pool | Latency (s) | Wallclock (s) | Tput/s | p50 ms | p95 ms | p99 ms | Peak tasks | Peak queue | Peak RSS Δ (MB) | Notes |
| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- |
| Threshold N=500 (cap=1000) | 500 | 1000 | 16 | 0.050 | 0.08 | 6264.2 | 70.6 | 77.4 | 77.5 | 505 | 0 | 0.0 | N > cap forces queue wait; p99 grows linearly |
| Threshold N=1000 (cap=1000) | 1000 | 1000 | 16 | 0.050 | 0.11 | 9145.3 | 94.5 | 104.2 | 104.7 | 1005 | 0 | 0.0 | N > cap forces queue wait; p99 grows linearly |
| Threshold N=2500 (cap=1000) | 2500 | 1000 | 16 | 0.050 | 0.31 | 8086.8 | 243.6 | 294.2 | 298.0 | 2505 | 0 | 0.0 | N > cap forces queue wait; p99 grows linearly |
| Threshold N=5000 (cap=1000) | 5000 | 1000 | 16 | 0.050 | 0.72 | 6983.2 | 584.2 | 691.1 | 700.5 | 5005 | 0 | 0.0 | N > cap forces queue wait; p99 grows linearly |
| Threshold N=10000 (cap=1000) | 10000 | 1000 | 16 | 0.050 | 2.08 | 4813.1 | 1801.7 | 2019.3 | 2046.2 | 10005 | 0 | 1.1 | N > cap forces queue wait; p99 grows linearly |

**Threshold**: p99 first exceeds 2x server latency (100.0 ms) at **N=1000** with cap=1000 (p99 = 104.7 ms).

## 5. Sidecar response delivery overhead

Mock `CompleteActivityTask` is given an artificial delay. Async responses go through `loop.run_in_executor(None, ...)`, so they share asyncio's default executor (at most `min(32, cpu_count + 4)` workers; on this run `cpu_count=12`, so 16). Once the delay per response times the number of pending responses exceeds what those workers can absorb, deliveries queue behind one another and tail latency inflates. For example, 1000 responses at 5 ms each across 16 workers need at least 1000 × 5 ms / 16 ≈ 313 ms of delivery time.

| Scenario | N | Sem | Pool | Latency (s) | Wallclock (s) | Tput/s | p50 ms | p95 ms | p99 ms | Peak tasks | Peak queue | Peak RSS Δ (MB) | Notes |
| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- |
| Delivery latency=0ms | 1000 | 1000 | 16 | 0.050 | 0.11 | 9497.2 | 98.2 | 101.3 | 101.5 | 1005 | 0 | 0.0 | asyncio default executor caps response delivery at min(32, cpu+4) workers |
| Delivery latency=1ms | 1000 | 1000 | 16 | 0.050 | 0.18 | 5699.8 | 133.0 | 167.7 | 171.0 | 1005 | 0 | 0.0 | asyncio default executor caps response delivery at min(32, cpu+4) workers |
| Delivery latency=5ms | 1000 | 1000 | 16 | 0.050 | 0.48 | 2077.9 | 287.7 | 458.6 | 473.4 | 1005 | 0 | 0.0 | asyncio default executor caps response delivery at min(32, cpu+4) workers |
| Delivery latency=10ms | 1000 | 1000 | 16 | 0.050 | 0.86 | 1162.5 | 494.1 | 820.4 | 843.5 | 1005 | 0 | 0.0 | asyncio default executor caps response delivery at min(32, cpu+4) workers |

## 6. Sustained load

- **Target rate**: 200/s for 120 s
- **Submitted / completed**: 24000 / 24000
- **Wallclock**: 120.05 s (effective throughput 199.9/s)
- **Latency (overall)**: p50 50.2 ms, p95 50.6 ms, p99 50.8 ms, max 62.8 ms
- **Latency (first 25%)**: p99 50.8 ms
- **Latency (last 25%)**: p99 50.7 ms
- **Peak tasks**: 19, peak queue depth: 3, peak RSS Δ: 5.8 MB


## 7. Real HTTP workload (production shape)

Each activity opens a fresh `httpx.AsyncClient` and GETs a local aiohttp endpoint that sleeps 50 ms. Mirrors `examples/workflow/async_activities.py`. The sync row at N=100 shows the same workload throttled by the thread pool and is directly comparable to the rest of the table.

| Scenario | N | Sem | Pool | Latency (s) | Wallclock (s) | Tput/s | p50 ms | p95 ms | p99 ms | Peak tasks | Peak queue | Peak RSS Δ (MB) | Notes |
| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- |
| Real HTTP async N=100 | 100 | 1000 | 16 | 0.050 | 0.49 | 205.3 | 485.1 | 485.4 | 485.5 | 305 | 0 | 0.0 | httpx.AsyncClient → aiohttp server (50 ms) |
| Real HTTP async N=500 | 500 | 1000 | 16 | 0.050 | 2.06 | 243.2 | 1990.2 | 2052.6 | 2053.0 | 1376 | 0 | 308.1 | httpx.AsyncClient → aiohttp server (50 ms) |
| Real HTTP async N=1000 | 1000 | 1000 | 16 | 0.050 | 4.28 | 233.4 | 4200.5 | 4274.9 | 4280.5 | 2555 | 0 | 398.5 | httpx.AsyncClient → aiohttp server (50 ms) |
| Real HTTP async N=2500 | 2500 | 5000 | 16 | 0.050 | 15.16 | 165.0 | 10240.9 | 13260.9 | 15111.6 | 5776 | 0 | 1219.1 | httpx.AsyncClient → aiohttp server (50 ms) |
| Real HTTP sync N=100 | 100 | 1000 | 16 | 0.050 | 0.51 | 194.2 | 324.6 | 458.5 | 514.4 | 137 | 0 | 0.7 | httpx.Client → aiohttp server, throttled by thread pool |

## 8. Real HTTP sustained load

Open-loop submission of real `httpx.AsyncClient` fetches at 100/s. Confirms steady state under genuine I/O, not synthetic sleep.

- **Target rate**: 100/s for 60 s
- **Submitted / completed**: 6000 / 6000
- **Wallclock**: 60.05 s (effective throughput 99.9/s)
- **Latency (overall)**: p50 56.1 ms, p95 68.9 ms, p99 76.0 ms, max 145.2 ms
- **Latency (first 25%)**: p99 75.7 ms
- **Latency (last 25%)**: p99 76.2 ms
- **Peak tasks**: 45, peak queue depth: 6, peak RSS Δ: 0.0 MB
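Open-loop submission means items are released on a fixed schedule whether or not earlier ones have finished. The sketch below shows one way to do that with asyncio; it is an assumption about the general technique, not the benchmark's implementation, and `open_loop` and `fake_fetch` are hypothetical names. The last line applies Little's law (in-flight ≈ rate × latency), using the p50 above as a rough stand-in for mean latency.

```python
import asyncio
import time


async def open_loop(rate_per_s, duration_s, work):
    """Submit work() on a fixed schedule, independent of completion times."""
    interval = 1.0 / rate_per_s
    total = round(rate_per_s * duration_s)
    start = time.perf_counter()
    tasks = []
    for i in range(total):
        # Sleep until this item's slot so slow items never delay later submissions.
        delay = start + i * interval - time.perf_counter()
        if delay > 0:
            await asyncio.sleep(delay)
        tasks.append(asyncio.create_task(work()))
    latencies = await asyncio.gather(*tasks)
    return total, latencies


async def fake_fetch():
    t0 = time.perf_counter()
    await asyncio.sleep(0.005)  # stand-in for a real HTTP round trip
    return time.perf_counter() - t0


submitted, latencies = asyncio.run(open_loop(rate_per_s=200, duration_s=0.1, work=fake_fetch))

# Little's law estimate for this section: 100 requests/s at about 56.1 ms each.
expected_in_flight = 100 * 0.0561
```

The estimate comes to roughly 5.6 activities in flight. The reported peak of 45 counts every live `asyncio.Task` in the process, so it would be expected to sit above that figure.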


## 9. OOM safety

10 000 in-flight async activities at 50 ms with a 1 000-cap semaphore. The ~9 000 Tasks parked on the semaphore are the design-discussion concern. Peak RSS delta stays well under the 500 MB budget, so the unbounded-pending-Task pattern is fine in practice.

| Scenario | N | Sem | Pool | Latency (s) | Wallclock (s) | Tput/s | Peak tasks | Peak queue | Peak RSS Δ (MB) | Notes |
| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | --- |
| OOM safety (10k tasks, 1k semaphore) | 10000 | 1000 | 8 | 0.050 | 2.03 | 4918.2 | 10005 | 0 | 0.0 | ~9k tasks blocked on the semaphore. Peak RSS delta budget is 500 MB. |

## How to read this report

- **Tput/s** is the closed-loop throughput (items completed / wallclock). For the sustained scenario it is the steady-state value over the full run.
- **p99 ms** is the end-to-end latency for the 99th-percentile item: time from `submit_activity` to the mock stub seeing the response.
- **Peak queue** is the maximum depth of the manager's `activity_queue` during the run. Non-zero peak queue means submission temporarily outran the semaphore.
- **Peak tasks** is the maximum number of live `asyncio.Task` objects in the process, which doubles as a sanity check on the unbounded-pending-Task pattern.
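For readers recomputing these columns from raw samples, a minimal sketch follows. The report does not say which percentile method the benchmark uses, so nearest-rank is an assumption here, and both function names and the sample data are made up.

```python
import math


def percentile_nearest_rank(samples_ms, pct):
    """Nearest-rank percentile: the smallest sample with at least pct% at or below it."""
    ordered = sorted(samples_ms)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def closed_loop_tput(n_items, wallclock_s):
    """Items completed per wallclock second."""
    return n_items / wallclock_s


latencies_ms = list(range(1, 101))  # made-up samples: 1 ms .. 100 ms
p50 = percentile_nearest_rank(latencies_ms, 50)
p99 = percentile_nearest_rank(latencies_ms, 99)

# Recomputing Tput/s for the N=2500 throughput row from its rounded wallclock.
tput = closed_loop_tput(2500, 0.24)
```

Recomputing from the rounded wallclock gives about 10417/s rather than the reported 10532.0, which suggests the report divides by the unrounded wallclock; small mismatches of this kind are expected.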

## Operational guidance

See `ext/dapr-ext-workflow/docs/concurrency.md` for the full operational write-up, including sizing recommendations for `maximum_concurrent_activity_work_items`, `maximum_thread_pool_workers`, and the asyncio default-executor caveat.
