Skip to content

Commit feb60db

Browse files
committed
Redo benchmarks and add performance regression tests
Signed-off-by: Sergio Herrera <627709+seherv@users.noreply.github.com>
1 parent e68141c commit feb60db

9 files changed

Lines changed: 1197 additions & 1684 deletions

File tree

examples/workflow/async_activities.py

Lines changed: 42 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@
1010
# See the License for the specific language governing permissions and
1111
# limitations under the License.
1212

13-
"""Async activities running alongside sync ones in a single workflow.
13+
"""Async activities running alongside a sync one in a fan-out/fan-in workflow.
1414
15-
Starts three async activities that do an HTTP request, then a sync activity that
16-
sums up the results. Shows that sync and async activities work side by side.
15+
Each async activity simulates an I/O-bound call: it takes a payload, awaits a fixed
16+
delay (standing in for a network round-trip), and returns a result payload. The async
17+
instances run concurrently on the worker's event loop; a final sync activity aggregates
18+
the results. Fan-out width, input/output payload sizes, and the delay are configurable
19+
via environment variables.
1720
1821
Run with:
1922
@@ -23,73 +26,72 @@
2326

2427
from __future__ import annotations
2528

29+
import asyncio
30+
import os
31+
import random
32+
import string
2633
from time import sleep
2734

2835
import dapr.ext.workflow as wf
29-
import httpx
3036
from pydantic import BaseModel
3137

38+
FAN_OUT = int(os.environ.get('WORKFLOW_FAN_OUT', '5'))
39+
INPUT_BYTES = int(os.environ.get('WORKFLOW_INPUT_BYTES', '2048'))
40+
OUTPUT_BYTES = int(os.environ.get('WORKFLOW_OUTPUT_BYTES', '1024'))
41+
IO_SECONDS = float(os.environ.get('WORKFLOW_IO_SECONDS', '1.0'))
42+
3243
wfr = wf.WorkflowRuntime()
3344

3445

35-
class FetchRequest(BaseModel):
36-
url: str
37-
timeout_seconds: float = 5.0
46+
def _random_digits(n: int) -> str:
47+
return ''.join(random.choices(string.digits, k=n))
3848

3949

40-
class FetchResult(BaseModel):
41-
url: str
42-
status_code: int
43-
body_length: int
50+
class Payload(BaseModel):
    # Input record handed to one fan-out activity call; main() builds one per fan-out slot
    # and passes it to the workflow as a plain dict via model_dump().

    # Position of this payload within the fan-out (main() uses range(FAN_OUT)).
    index: int
    # Filler body; main() fills it with INPUT_BYTES random digits.
    data: str
4453

4554

46-
@wfr.workflow(name='parallel_fetch_workflow')
47-
def parallel_fetch_workflow(ctx: wf.DaprWorkflowContext, urls: list[str]):
48-
fetch_tasks = [
49-
ctx.call_activity(fetch_url, input=FetchRequest(url=url).model_dump()) for url in urls
50-
]
51-
results = yield wf.when_all(fetch_tasks)
52-
summary = yield ctx.call_activity(summarize_fetches, input=results)
55+
@wfr.workflow(name='fan_out_fan_in_workflow')
def fan_out_fan_in_workflow(ctx: wf.DaprWorkflowContext, payloads: list[dict]):
    """Schedule one ``process_payload`` activity per payload, then summarize their results."""
    # Fan-out: schedule every activity before waiting on any of them.
    pending = []
    for item in payloads:
        pending.append(ctx.call_activity(process_payload, input=item))
    # Fan-in: resume once all scheduled activities have produced a result.
    outputs = yield wf.when_all(pending)
    digest = yield ctx.call_activity(summarize, input=outputs)
    return digest
5461

5562

56-
@wfr.activity(name='fetch_url')
57-
async def fetch_url(ctx: wf.WorkflowActivityContext, request: FetchRequest) -> dict:
58-
"""Async activity: fetch a URL with httpx. Multiple instances run concurrently."""
59-
async with httpx.AsyncClient(timeout=request.timeout_seconds) as client:
60-
response = await client.get(request.url)
61-
result = FetchResult(
62-
url=request.url,
63-
status_code=response.status_code,
64-
body_length=len(response.content),
65-
)
63+
@wfr.activity(name='process_payload')
64+
async def process_payload(ctx: wf.WorkflowActivityContext, payload: Payload) -> str:
65+
"""Async activity: simulate an I/O-bound call. Instances run concurrently on the loop."""
66+
await asyncio.sleep(IO_SECONDS)
67+
result = _random_digits(OUTPUT_BYTES)
6668
print(
67-
f'[async] fetched {result.url} -> {result.status_code} ({result.body_length}B)', flush=True
69+
f'[async] payload {payload.index}: {len(payload.data)}B in -> {len(result)}B out',
70+
flush=True,
6871
)
69-
return result.model_dump()
72+
return result
7073

7174

72-
@wfr.activity(name='summarize_fetches')
73-
def summarize_fetches(ctx: wf.WorkflowActivityContext, results: list[dict]) -> str:
74-
"""Sync activity: runs in the sync-fallback thread pool. Unchanged from before."""
75-
total_bytes = sum(r['body_length'] for r in results)
76-
summary = f'fetched {len(results)} URLs, total {total_bytes} bytes'
75+
@wfr.activity(name='summarize')
def summarize(ctx: wf.WorkflowActivityContext, results: list[str]) -> str:
    """Sync activity: fold the fan-out results into a one-line summary.

    Reports how many results arrived, their combined length, and how many '0' characters
    they contain. The summary is printed and returned.
    """
    total_bytes = 0
    total_zeros = 0
    for chunk in results:
        total_bytes += len(chunk)
        total_zeros += chunk.count('0')
    summary = f'{len(results)} results, {total_bytes} bytes, {total_zeros} zeros'
    print(f'[sync] {summary}', flush=True)
    return summary
7983

8084

8185
def main() -> None:
82-
urls = [
83-
'https://example.com',
84-
'https://example.org',
85-
'https://example.net',
86+
payloads = [
87+
Payload(index=i, data=_random_digits(INPUT_BYTES)).model_dump() for i in range(FAN_OUT)
8688
]
8789

8890
wfr.start()
8991
sleep(5) # wait for workflow runtime to start
9092

9193
wf_client = wf.DaprWorkflowClient()
92-
instance_id = wf_client.schedule_new_workflow(workflow=parallel_fetch_workflow, input=urls)
94+
instance_id = wf_client.schedule_new_workflow(workflow=fan_out_fan_in_workflow, input=payloads)
9395
print(f'Workflow started. Instance ID: {instance_id}')
9496

9597
state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)

ext/dapr-ext-workflow/AGENTS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ Two example directories exercise workflows:
209209
- `cross-app1.py`, `cross-app2.py`, `cross-app3.py` — cross-app calls
210210
- `versioning.py` — workflow versioning with `is_patched()`
211211
- `simple_aio_client.py` — async client variant
212-
- `async_activities.py` — `async def` activities (HTTP fan-out with `httpx.AsyncClient`)
212+
- `async_activities.py` — `async def` activities (fan-out/fan-in with simulated I/O, configurable payload sizes)
213213

214214
## Testing
215215

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2026 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
"""Run-environment capture and Markdown formatting for the benchmark report."""
14+
15+
from __future__ import annotations
16+
17+
import os
18+
import platform
19+
import shutil
20+
import subprocess
21+
from dataclasses import dataclass
22+
from datetime import datetime, timezone
23+
from pathlib import Path
24+
25+
from dapr.ext.workflow._bench_harness import IS_DARWIN, ScenarioMetrics, SustainedMetrics
26+
27+
28+
def _read_text(path: str) -> str:
29+
try:
30+
return Path(path).read_text(encoding='utf-8', errors='ignore')
31+
except OSError:
32+
return ''
33+
34+
35+
def _cpu_model() -> str:
    """Best-effort CPU model name. Cross-platform; returns a placeholder on failure.

    Sources are tried in order: ``sysctl -n machdep.cpu.brand_string`` when ``IS_DARWIN`` is
    set, the first non-empty ``model name`` entry in ``/proc/cpuinfo``, then
    ``platform.processor()`` and ``platform.machine()``. Returns ``'unknown'`` when every
    source comes back empty.
    """
    if IS_DARWIN:
        sysctl = shutil.which('sysctl')
        if sysctl is not None:
            try:
                out = subprocess.run(
                    [sysctl, '-n', 'machdep.cpu.brand_string'],
                    capture_output=True,
                    text=True,
                    timeout=2,
                )
            except (subprocess.SubprocessError, OSError):
                # Best-effort probe: fall through to the generic sources below.
                pass
            else:
                brand = out.stdout.strip()
                if out.returncode == 0 and brand:
                    return brand
    for line in _read_text('/proc/cpuinfo').splitlines():
        if not line.startswith('model name'):
            continue
        # partition() cannot raise on a line that lacks ':' (split(':', 1)[1] would), and
        # an empty value is skipped so the platform fallbacks still get their turn.
        _, _, value = line.partition(':')
        value = value.strip()
        if value:
            return value
    return platform.processor() or platform.machine() or 'unknown'
56+
57+
58+
def _total_memory_gb() -> float:
    """Best-effort total physical memory in GB. Returns 0 on failure.

    Asks ``sysctl -n hw.memsize`` (a byte count) when ``IS_DARWIN`` is set; otherwise, or if
    that probe fails, reads the ``MemTotal:`` entry from ``/proc/meminfo``.
    """
    # The lookup for the sysctl binary only happens on Darwin.
    sysctl = shutil.which('sysctl') if IS_DARWIN else None
    if sysctl is not None:
        try:
            probe = subprocess.run(
                [sysctl, '-n', 'hw.memsize'],
                capture_output=True,
                text=True,
                timeout=2,
            )
            raw_bytes = probe.stdout.strip()
            if probe.returncode == 0 and raw_bytes.isdigit():
                return int(raw_bytes) / (1024**3)
        except (subprocess.SubprocessError, OSError):
            pass
    for entry in _read_text('/proc/meminfo').splitlines():
        if not entry.startswith('MemTotal:'):
            continue
        fields = entry.split()
        if len(fields) < 2 or not fields[1].isdigit():
            continue
        # The second field is divided by 1024**2 to express it in GB.
        return int(fields[1]) / (1024**2)
    return 0.0
81+
82+
83+
def _git_commit() -> str:
84+
"""Short git commit hash, or 'unknown' if not in a git repo."""
85+
git = shutil.which('git')
86+
if git is None:
87+
return 'unknown'
88+
try:
89+
out = subprocess.run(
90+
[git, 'rev-parse', '--short', 'HEAD'],
91+
capture_output=True,
92+
text=True,
93+
timeout=2,
94+
cwd=Path(__file__).parent,
95+
)
96+
if out.returncode == 0:
97+
commit = out.stdout.strip()
98+
# Mark dirty if there are uncommitted changes.
99+
status = subprocess.run(
100+
[git, 'status', '--porcelain'],
101+
capture_output=True,
102+
text=True,
103+
timeout=2,
104+
cwd=Path(__file__).parent,
105+
)
106+
if status.returncode == 0 and status.stdout.strip():
107+
return f'{commit}-dirty'
108+
return commit
109+
except (subprocess.SubprocessError, OSError):
110+
pass
111+
return 'unknown'
112+
113+
114+
@dataclass(slots=True)
115+
class RunEnvironment:
116+
"""Snapshot of the machine the benchmark ran on."""
117+
118+
timestamp_utc: str
119+
git_commit: str
120+
python_version: str
121+
python_implementation: str
122+
platform: str
123+
os_release: str
124+
cpu_model: str
125+
cpu_logical_cores: int
126+
cpu_physical_cores_hint: int
127+
total_memory_gb: float
128+
is_ci: bool
129+
130+
@classmethod
131+
def capture(cls) -> 'RunEnvironment':
132+
return cls(
133+
timestamp_utc=datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC'),
134+
git_commit=_git_commit(),
135+
python_version=platform.python_version(),
136+
python_implementation=platform.python_implementation(),
137+
platform=platform.platform(),
138+
os_release=f'{platform.system()} {platform.release()} ({platform.machine()})',
139+
cpu_model=_cpu_model(),
140+
cpu_logical_cores=os.cpu_count() or 0,
141+
cpu_physical_cores_hint=os.cpu_count() or 0,
142+
total_memory_gb=_total_memory_gb(),
143+
is_ci=any(os.environ.get(k) for k in ('CI', 'GITHUB_ACTIONS', 'TRAVIS', 'BUILDKITE')),
144+
)
145+
146+
147+
def _format_environment_block(env: RunEnvironment) -> str:
148+
mem_str = f'{env.total_memory_gb:.1f} GB' if env.total_memory_gb > 0 else 'unknown'
149+
return (
150+
'## Run environment\n'
151+
'\n'
152+
f'- **Timestamp**: {env.timestamp_utc}\n'
153+
f'- **Git commit**: `{env.git_commit}`\n'
154+
f'- **Python**: {env.python_implementation} {env.python_version}\n'
155+
f'- **OS**: {env.os_release}\n'
156+
f'- **CPU**: {env.cpu_model} ({env.cpu_logical_cores} logical cores)\n'
157+
f'- **Memory**: {mem_str}\n'
158+
'\n'
159+
'Numbers are specific to this machine; the sync-vs-async gap is what transfers across'
160+
' hardware, not the absolute values.'
161+
)
162+
163+
164+
def _speedup_cell(speedup: float) -> str:
165+
if speedup > 1.2:
166+
dot = '🟢'
167+
elif speedup >= 0.8:
168+
dot = '⚪'
169+
else:
170+
dot = '🔴'
171+
return f'{dot} {speedup:.1f}x'
172+
173+
174+
def _format_comparison_table(
    rows: list[tuple[str, ScenarioMetrics, ScenarioMetrics]],
    key_label: str = 'N',
    show_async_rss: bool = False,
) -> str:
    """Render a sync-vs-async Markdown table, one line per ``(key, sync, async)`` row.

    ``key_label`` heads the first column. Speedup is sync wallclock divided by async
    wallclock, reported as 0.0 when the async wallclock is not positive. With
    ``show_async_rss`` set, an extra column carries the async run's ``peak_rss_delta_mb``.
    """
    if show_async_rss:
        rss_header, rss_rule = ' Async RAM (MB) |', ' ---: |'
    else:
        rss_header = rss_rule = ''
    header = (
        f'| {key_label} | Sync (s) | Async (s) | Speedup |{rss_header}\n'
        f'| ---: | ---: | ---: | ---: |{rss_rule}\n'
    )

    def render(key: str, sync_m: ScenarioMetrics, async_m: ScenarioMetrics) -> str:
        """Format one table line for a single scenario."""
        if async_m.wallclock_s > 0:
            speedup = sync_m.wallclock_s / async_m.wallclock_s
        else:
            speedup = 0.0
        # The RSS attribute is only read when its column is requested.
        rss = f' {async_m.peak_rss_delta_mb:.0f} |' if show_async_rss else ''
        return (
            f'| {key} | {sync_m.wallclock_s:.2f} | {async_m.wallclock_s:.2f} |'
            f' {_speedup_cell(speedup)} |{rss}'
        )

    return header + '\n'.join(render(key, sync_m, async_m) for key, sync_m, async_m in rows)
194+
195+
196+
def _format_sustained_table(sync_m: SustainedMetrics, async_m: SustainedMetrics) -> str:
197+
def row(label: str, sync_val: str, async_val: str) -> str:
198+
return f'| {label} | {sync_val} | {async_val} |'
199+
200+
header = '| Metric | Sync | Async |\n| --- | ---: | ---: |\n'
201+
rows = [
202+
row(
203+
'Effective throughput',
204+
f'{sync_m.throughput_per_s:.0f}/s',
205+
f'{async_m.throughput_per_s:.0f}/s',
206+
),
207+
row(
208+
'p99 latency',
209+
f'{sync_m.latency_overall.p99_ms:.0f} ms',
210+
f'{async_m.latency_overall.p99_ms:.0f} ms',
211+
),
212+
row(
213+
'p99 first quarter',
214+
f'{sync_m.latency_first_quarter.p99_ms:.0f} ms',
215+
f'{async_m.latency_first_quarter.p99_ms:.0f} ms',
216+
),
217+
row(
218+
'p99 last quarter',
219+
f'{sync_m.latency_last_quarter.p99_ms:.0f} ms',
220+
f'{async_m.latency_last_quarter.p99_ms:.0f} ms',
221+
),
222+
]
223+
return header + '\n'.join(rows)

0 commit comments

Comments
 (0)