Skip to content

Commit 872e75c

Browse files
committed
Add support for async activities
Signed-off-by: Sergio Herrera <627709+seherv@users.noreply.github.com>
1 parent 099d670 commit 872e75c

11 files changed

Lines changed: 2586 additions & 123 deletions

File tree

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2026 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
"""Async activities running alongside sync ones in a single workflow.
14+
15+
Starts three async activities that do an HTTP request, then a sync activity that
16+
sums up the results. Shows that sync and async activities work side by side.
17+
18+
Run with:
19+
20+
dapr run --app-id async-activities --app-protocol grpc --dapr-grpc-port 50001 \\
21+
-- python async_activities.py
22+
"""
23+
24+
from __future__ import annotations
25+
26+
from time import sleep
27+
28+
import dapr.ext.workflow as wf
29+
import httpx
30+
from pydantic import BaseModel
31+
32+
wfr = wf.WorkflowRuntime()
33+
34+
35+
class FetchRequest(BaseModel):
36+
url: str
37+
timeout_seconds: float = 5.0
38+
39+
40+
class FetchResult(BaseModel):
41+
url: str
42+
status_code: int
43+
body_length: int
44+
45+
46+
@wfr.workflow(name='parallel_fetch_workflow')
47+
def parallel_fetch_workflow(ctx: wf.DaprWorkflowContext, urls: list[str]):
48+
fetch_tasks = [
49+
ctx.call_activity(fetch_url, input=FetchRequest(url=url).model_dump()) for url in urls
50+
]
51+
results = yield wf.when_all(fetch_tasks)
52+
summary = yield ctx.call_activity(summarize_fetches, input=results)
53+
return summary
54+
55+
56+
@wfr.activity(name='fetch_url')
57+
async def fetch_url(ctx: wf.WorkflowActivityContext, request: FetchRequest) -> dict:
58+
"""Async activity: fetch a URL with httpx. Multiple instances run concurrently."""
59+
async with httpx.AsyncClient(timeout=request.timeout_seconds) as client:
60+
response = await client.get(request.url)
61+
result = FetchResult(
62+
url=request.url,
63+
status_code=response.status_code,
64+
body_length=len(response.content),
65+
)
66+
print(
67+
f'[async] fetched {result.url} -> {result.status_code} ({result.body_length}B)', flush=True
68+
)
69+
return result.model_dump()
70+
71+
72+
@wfr.activity(name='summarize_fetches')
73+
def summarize_fetches(ctx: wf.WorkflowActivityContext, results: list[dict]) -> str:
74+
"""Sync activity: runs in the sync-fallback thread pool. Unchanged from before."""
75+
total_bytes = sum(r['body_length'] for r in results)
76+
summary = f'fetched {len(results)} URLs, total {total_bytes} bytes'
77+
print(f'[sync] {summary}', flush=True)
78+
return summary
79+
80+
81+
def main() -> None:
82+
urls = [
83+
'https://httpbin.org/uuid',
84+
'https://httpbin.org/get',
85+
'https://httpbin.org/headers',
86+
]
87+
88+
wfr.start()
89+
sleep(5) # wait for workflow runtime to start
90+
91+
wf_client = wf.DaprWorkflowClient()
92+
instance_id = wf_client.schedule_new_workflow(workflow=parallel_fetch_workflow, input=urls)
93+
print(f'Workflow started. Instance ID: {instance_id}')
94+
95+
state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
96+
assert state is not None
97+
print(f'Workflow completed! Status: {state.runtime_status.name}')
98+
print(f'Workflow result: {state.serialized_output.strip(chr(34))}')
99+
100+
wfr.shutdown()
101+
102+
103+
if __name__ == '__main__':
104+
main()

ext/dapr-ext-workflow/AGENTS.md

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,24 @@ The entry point for registration and lifecycle:
105105

106106
Internally wraps user functions: workflow functions get a `DaprWorkflowContext`, activity functions get a `WorkflowActivityContext`. Tracks registration state via `_workflow_registered` / `_activity_registered` attributes on functions to prevent double registration.
107107

108+
#### Sync and async activities
109+
110+
Activities can be either `def my_activity(ctx, inp)` or `async def my_activity(ctx, inp)`. At registration, `_make_activity_wrapper` calls `_is_async_callable(fn)` to detect async-ness. That helper unwraps `functools.partial`, `@functools.wraps` chains, and callable-class `__call__` so common decorator patterns route correctly. The wrapper is built `async def` or `def` to match, then stored in the registry.
111+
112+
At dispatch time (the gRPC stream loop in `_durabletask/worker.py`), `inspect.iscoroutinefunction(activity_fn)` on the wrapper selects between two handlers.
113+
114+
- **Async activities** go through `_execute_activity_async`, then `_ActivityExecutor.execute_async`, which awaits `fn(...)` directly on the event loop. No thread pool involvement. The gRPC response is delivered via `loop.run_in_executor(None, stub.CompleteActivityTask, ...)` (asyncio's default executor).
115+
- **Sync activities** go through `_execute_activity`, dispatched to the thread pool by `_AsyncWorkerManager._run_func`. The activity runs on a worker thread, and the response is delivered from the same thread. The thread pool size is controlled by `maximum_thread_pool_workers`.
116+
117+
Workflow (orchestrator) functions must remain generators (`def` with `yield`). They cannot be `async def` because durabletask's deterministic replay depends on synchronous generator semantics. Only activities support async.
118+
119+
**Decorator ordering gotcha.** Stacking `@wfr.activity` over `@alternate_name(...)` over `async def` works because `@alternate_name` now emits an `async def innerfn` when the wrapped function is async. A user-written decorator that wraps an async function in a sync `def` (without `@functools.wraps` exposing `__wrapped__`) defeats `_is_async_callable`, routes the activity to the sync path, and produces an un-awaited coroutine. Such decorators should use `@functools.wraps(fn)` so the unwrap walks through them.
120+
121+
**`maximum_thread_pool_workers` gotcha.** This knob sizes the sync-activity thread pool only. Async-activity response delivery uses asyncio's default executor (process-wide, lazily sized to `min(32, cpu_count + 4)`), which is not capped by this knob. Strict thread-count bounds for async response delivery require calling `asyncio.get_event_loop().set_default_executor(ThreadPoolExecutor(max_workers=N))` before `wfr.start()`. A future PR may migrate the worker to `grpc.aio` and remove this caveat by sending responses without any thread pool.
122+
123+
**Concurrency sizing and load characterization.** See `docs/concurrency.md` for sizing recommendations (`maximum_concurrent_activity_work_items`, `maximum_thread_pool_workers`), an async-vs-sync decision tree, and the default-executor caveat with a worked example. The `benchmarks/` directory ships `bench_async_activities.py` and the generated `RESULTS.md`; re-run it locally before claiming a perf regression — the report captures the run environment so a reader can tell whether a number applies to their hardware.
124+
125+
108126
### DaprWorkflowClient (`dapr_workflow_client.py`)
109127

110128
Client for workflow lifecycle management:
@@ -163,7 +181,7 @@ Retry configuration for activities and child workflows:
163181
1. **Registration**: User decorates functions with `@wfr.workflow` / `@wfr.activity`. The runtime wraps them and stores them in the durabletask worker's registry.
164182
2. **Startup**: `wfr.start()` opens a gRPC stream to the Dapr sidecar. The worker polls for work items.
165183
3. **Scheduling**: Client calls `schedule_new_workflow(fn, input=...)`. The function's name (or `_dapr_alternate_name`) is sent to the backend.
166-
4. **Execution**: The durabletask engine dispatches work items. Workflow functions are Python **generators** that `yield` tasks (activity calls, timers, child workflows). The engine records history; on replay, yielded tasks return cached results without re-executing.
184+
4. **Execution**: The durabletask engine dispatches work items. Workflow functions are Python **generators** that `yield` tasks (activity calls, timers, child workflows). Activity functions are either sync (dispatched to the worker's thread pool) or `async def` (awaited directly on the worker's event loop). The engine records history; on replay, yielded tasks return cached results without re-executing.
167185
5. **Determinism**: Workflows must be deterministic — no random, no wall-clock time, no I/O. Use `ctx.current_utc_datetime` instead of `datetime.now()`. Use `ctx.is_replaying` to guard side effects like logging.
168186
6. **Completion**: Client polls via `wait_for_workflow_completion()` or `get_workflow_state()`.
169187

@@ -191,6 +209,7 @@ Two example directories exercise workflows:
191209
- `cross-app1.py`, `cross-app2.py`, `cross-app3.py` — cross-app calls
192210
- `versioning.py` — workflow versioning with `is_patched()`
193211
- `simple_aio_client.py` — async client variant
212+
- `async_activities.py``async def` activities (HTTP fan-out with `httpx.AsyncClient`)
194213

195214
## Testing
196215

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
# Async-activity load benchmark results
2+
3+
Generated by `bench_async_activities.py`. Re-run with:
4+
5+
```bash
6+
uv run python ext/dapr-ext-workflow/benchmarks/bench_async_activities.py
7+
```
8+
9+
## Run environment
10+
11+
- **Timestamp**: 2026-05-25 20:40:09 UTC
12+
- **Git commit**: `8f13da0-dirty`
13+
- **Python**: CPython 3.13.12
14+
- **OS**: Darwin 25.5.0 (arm64) on Apple M3 Pro (12 logical cores), 36.0 GB
15+
- **asyncio default executor**: `max_workers=16` (`min(32, cpu_count + 4)`)
16+
- **CI environment**: no
17+
18+
Numbers are specific to this hardware. Re-run locally to compare. The shape of
19+
the curves (throughput plateau, p99 inflection, drift) is what to compare
20+
across machines.
21+
22+
Each scenario drives `TaskHubGrpcWorker._execute_activity_async` through
23+
`_AsyncWorkerManager` against a mock `CompleteActivityTask` stub. End-to-end
24+
latency is measured from `submit_activity` to the mock stub seeing the response.
25+
26+
## 1. Concurrency win (issue #897 repro)
27+
28+
100 × 1 s HTTP fetches. Async runs them concurrently on the loop, sync gates
29+
them through the thread pool.
30+
31+
| Scenario | Wallclock (s) | Tput/s | Peak tasks | Peak RSS Δ (MB) |
32+
| --- | ---: | ---: | ---: | ---: |
33+
| Async fan-out | 1.47 | 68.1 | 305 | 86.4 |
34+
| Sync baseline | 13.34 | 7.5 | 121 | 2.4 |
35+
36+
## 2. Throughput scaling
37+
38+
Async fan-out, 50 ms activity, sem=5000, pool=16. Throughput plateaus around
39+
N=2500.
40+
41+
| N | Wallclock (s) | Tput/s | p50 ms | p95 ms | p99 ms | Peak tasks | Peak RSS Δ (MB) |
42+
| ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |
43+
| 100 | 0.06 | 1542.3 | 62.0 | 64.1 | 64.1 | 105 | 0.0 |
44+
| 500 | 0.08 | 5931.1 | 78.6 | 79.6 | 79.6 | 505 | 0.4 |
45+
| 1000 | 0.11 | 8956.5 | 102.9 | 106.2 | 106.3 | 1005 | 2.9 |
46+
| 2500 | 0.24 | 10532.0 | 218.8 | 225.3 | 225.9 | 2505 | 10.0 |
47+
| 5000 | 0.57 | 8696.7 | 543.8 | 557.2 | 558.7 | 5005 | 25.2 |
48+
49+
## 3. Semaphore-cap sensitivity
50+
51+
N=2500, 50 ms activity, pool=16. Caps below ~500 starve the loop. Gains
52+
compress above ~1000.
53+
54+
| Sem | Wallclock (s) | Tput/s | p50 ms | p95 ms | p99 ms |
55+
| ---: | ---: | ---: | ---: | ---: | ---: |
56+
| 50 | 2.69 | 928.6 | 1422.7 | 2583.5 | 2687.0 |
57+
| 100 | 1.42 | 1758.2 | 794.9 | 1360.7 | 1412.0 |
58+
| 500 | 0.40 | 6229.5 | 279.2 | 387.9 | 392.3 |
59+
| 1000 | 0.30 | 8322.3 | 235.6 | 286.9 | 290.2 |
60+
| 5000 | 0.23 | 10720.7 | 215.0 | 222.3 | 222.8 |
61+
62+
## 4. Failure threshold (queue-wait inflection)
63+
64+
Cap=1000, ramp N, 50 ms activity. p99 first exceeds 2× server latency at
65+
**N=1000** (p99 = 104.7 ms).
66+
67+
| N | Wallclock (s) | Tput/s | p50 ms | p95 ms | p99 ms |
68+
| ---: | ---: | ---: | ---: | ---: | ---: |
69+
| 500 | 0.08 | 6264.2 | 70.6 | 77.4 | 77.5 |
70+
| 1000 | 0.11 | 9145.3 | 94.5 | 104.2 | 104.7 |
71+
| 2500 | 0.31 | 8086.8 | 243.6 | 294.2 | 298.0 |
72+
| 5000 | 0.72 | 6983.2 | 584.2 | 691.1 | 700.5 |
73+
| 10000 | 2.08 | 4813.1 | 1801.7 | 2019.3 | 2046.2 |
74+
75+
## 5. Sidecar response delivery overhead
76+
77+
N=1000, sem=1000, pool=16, 50 ms activity. Mock `CompleteActivityTask` given
78+
an artificial delay. Async responses go through `loop.run_in_executor(None, ...)`,
79+
sharing asyncio's default executor (`max_workers=16` here). Delays past ~5 ms
80+
saturate that pool.
81+
82+
| Delivery | Wallclock (s) | Tput/s | p50 ms | p95 ms | p99 ms |
83+
| ---: | ---: | ---: | ---: | ---: | ---: |
84+
| 0 ms | 0.11 | 9497.2 | 98.2 | 101.3 | 101.5 |
85+
| 1 ms | 0.18 | 5699.8 | 133.0 | 167.7 | 171.0 |
86+
| 5 ms | 0.48 | 2077.9 | 287.7 | 458.6 | 473.4 |
87+
| 10 ms | 0.86 | 1162.5 | 494.1 | 820.4 | 843.5 |
88+
89+
## 6. Sustained load
90+
91+
200/s for 120 s, 50 ms activity. Submitted/completed: 24 000 / 24 000.
92+
Wallclock 120.05 s (effective 199.9/s).
93+
94+
- p50 50.2 ms, p95 50.6 ms, p99 50.8 ms, max 62.8 ms.
95+
- First-25% p99 50.8 ms, last-25% p99 50.7 ms. No drift.
96+
- Peak tasks 19, peak queue depth 3, peak RSS Δ 5.8 MB.
97+
98+
## 7. Real HTTP workload
99+
100+
Each activity opens a fresh `httpx.AsyncClient` and GETs an aiohttp endpoint
101+
sleeping 50 ms. Mirrors `examples/workflow/async_activities.py`. Pool=16 for
102+
all rows.
103+
104+
| Scenario | N | Sem | Wallclock (s) | Tput/s | p50 ms | p95 ms | p99 ms | Peak tasks | Peak RSS Δ (MB) |
105+
| --- | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: | ---: |
106+
| Async | 100 | 1000 | 0.49 | 205.3 | 485.1 | 485.4 | 485.5 | 305 | 0.0 |
107+
| Async | 500 | 1000 | 2.06 | 243.2 | 1990.2 | 2052.6 | 2053.0 | 1376 | 308.1 |
108+
| Async | 1000 | 1000 | 4.28 | 233.4 | 4200.5 | 4274.9 | 4280.5 | 2555 | 398.5 |
109+
| Async | 2500 | 5000 | 15.16 | 165.0 | 10240.9 | 13260.9 | 15111.6 | 5776 | 1219.1 |
110+
| Sync | 100 | 1000 | 0.51 | 194.2 | 324.6 | 458.5 | 514.4 | 137 | 0.7 |
111+
112+
## 8. Real HTTP sustained load
113+
114+
Open-loop 100/s for 60 s with real `httpx.AsyncClient`. Submitted/completed:
115+
6000 / 6000. Wallclock 60.05 s (effective 99.9/s).
116+
117+
- p50 56.1 ms, p95 68.9 ms, p99 76.0 ms, max 145.2 ms.
118+
- First-25% p99 75.7 ms, last-25% p99 76.2 ms. No drift.
119+
- Peak tasks 45, peak queue depth 6, peak RSS Δ 0.0 MB.
120+
121+
## 9. OOM safety
122+
123+
10 000 in-flight async activities, 50 ms, sem=1000, pool=8. ~9 000 Tasks
124+
parked on the semaphore. Peak RSS Δ stays well under the 500 MB budget.
125+
126+
| N | Sem | Wallclock (s) | Tput/s | Peak tasks | Peak RSS Δ (MB) |
127+
| ---: | ---: | ---: | ---: | ---: | ---: |
128+
| 10000 | 1000 | 2.03 | 4918.2 | 10005 | 0.0 |
129+
130+
## Operational guidance
131+
132+
See `ext/dapr-ext-workflow/docs/concurrency.md` for sizing recommendations.

0 commit comments

Comments
 (0)