Skip to content

Commit e8c4c05

Browse files
committed
Address Copilot feedback (5)
Signed-off-by: Sergio Herrera <627709+seherv@users.noreply.github.com>
1 parent 8c4ce88 commit e8c4c05

8 files changed

Lines changed: 325 additions & 93 deletions

File tree

examples/workflow/async_activities.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ def summarize_fetches(ctx: wf.WorkflowActivityContext, results: list[dict]) -> s
8080

8181
def main() -> None:
8282
urls = [
83-
'https://httpbin.org/uuid',
84-
'https://httpbin.org/get',
85-
'https://httpbin.org/headers',
83+
'https://example.com',
84+
'https://example.org',
85+
'https://example.net',
8686
]
8787

8888
wfr.start()

ext/dapr-ext-strands/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ classifiers = [
2121
dependencies = [
2222
"dapr",
2323
"strands-agents>=1.30.0,<2.0.0",
24+
"strands-agents-tools>=0.2.22,<1.0.0",
2425
"python-ulid>=3.0.0,<4.0.0",
2526
"msgpack-python>=0.4.5,<1.0.0",
2627
]

ext/dapr-ext-workflow/AGENTS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ Internally wraps user functions: workflow functions get a `DaprWorkflowContext`,
109109

110110
Activities can be either `def my_activity(ctx, inp)` or `async def my_activity(ctx, inp)`. At registration, `_make_activity_wrapper` calls `_is_async_callable(fn)` to detect async-ness. That helper unwraps `functools.partial`, `@functools.wraps` chains, and callable-class `__call__` so common decorator patterns route correctly. The wrapper is built `async def` or `def` to match, then stored in the registry.
111111

112-
At dispatch time (the gRPC stream loop in `_durabletask/worker.py`), `inspect.iscoroutinefunction(activity_fn)` on the wrapper selects between two handlers.
112+
At dispatch time (the gRPC stream loop in `_durabletask/worker.py`), `is_async_callable(activity_fn)` on the wrapper selects between two handlers.
113113

114114
- **Async activities** go through `_execute_activity_async`, then `_ActivityExecutor.execute_async`, which awaits `fn(...)` directly on the event loop. The gRPC response is delivered via `loop.run_in_executor(self._async_worker_manager.thread_pool, stub.CompleteActivityTask, ...)` — the same pool sync activities use, sized by `maximum_thread_pool_workers`.
115115
- **Sync activities** go through `_execute_activity`, dispatched to the thread pool by `_AsyncWorkerManager._run_func`. The activity runs on a worker thread, and the response is delivered from the same thread.

ext/dapr-ext-workflow/benchmarks/bench_async_activities.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import math
3838
import os
3939
import platform
40-
import resource
4140
import shutil
4241
import socket
4342
import statistics
@@ -165,9 +164,19 @@ def _percentile(sorted_samples_ms: list[float], q: float) -> float:
165164
return sorted_samples_ms[lo] + frac * (sorted_samples_ms[hi] - sorted_samples_ms[lo])
166165

167166

167+
try:
168+
import resource as _resource # POSIX only
169+
except ImportError:
170+
_resource = None
171+
172+
168173
def _current_rss_kb() -> int:
169-
"""Process RSS in KB. macOS returns bytes from getrusage; Linux returns KB."""
170-
rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
174+
"""Process RSS in KB. macOS returns bytes from getrusage; Linux returns KB.
175+
Returns 0 on Windows since `resource` is unavailable there.
176+
"""
177+
if _resource is None:
178+
return 0
179+
rss = _resource.getrusage(_resource.RUSAGE_SELF).ru_maxrss
171180
if IS_DARWIN:
172181
return rss // 1024
173182
return rss

ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2141,7 +2141,7 @@ def execute(
21412141
"""Run a sync activity function and return the serialized result, if any.
21422142
21432143
Raises ``RuntimeError`` if the activity returns a coroutine, which happens when
2144-
``_is_async_callable`` fails to detect an async callable at registration.
2144+
``is_async_callable`` fails to detect an async callable at registration.
21452145
"""
21462146
resolved_fn, ctx, activity_input = self._resolve(
21472147
fn,
@@ -2157,7 +2157,9 @@ def execute(
21572157
activity_output.close()
21582158
raise RuntimeError(
21592159
f"Activity '{name}' returned a coroutine on the sync path. "
2160-
f'Declare it with ``async def`` so the worker dispatches it on the event loop.'
2160+
f'Declare it with ``async def``, or if it already is, check that any wrapping '
2161+
f'decorator uses ``@functools.wraps(fn)`` and does not wrap the async callable '
2162+
f'in a sync ``def`` (``is_async_callable`` needs to see through the decorator).'
21612163
)
21622164
return self._encode_output(orchestration_id, name, task_id, activity_output)
21632165

ext/dapr-ext-workflow/docs/concurrency.md

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,17 @@ backpressure.
2727

2828
## Sizing the thread pool
2929

30-
Two distinct uses of threads exist.
30+
The worker thread pool, sized by `maximum_thread_pool_workers`, has two uses.
3131

3232
**Sync activity execution.** Each `def` activity holds one thread for its
3333
duration. Size to peak concurrent sync-activity count.
3434

3535
**Async response delivery.** Each async activity, on completion, schedules
36-
`stub.CompleteActivityTask` on the worker thread pool to avoid blocking the loop
37-
during the gRPC send. The pool is the same one sync activities use, sized by
38-
`maximum_thread_pool_workers`. If the sidecar takes >5 ms to acknowledge and the
39-
worker runs many concurrent async activities, response delivery can serialize
40-
through the pool and tail latency inflates. Raise `maximum_thread_pool_workers`
41-
to widen response-delivery throughput.
36+
`stub.CompleteActivityTask` on the same pool to avoid blocking the loop during
37+
the gRPC send. If the sidecar takes >5 ms to acknowledge and the worker runs
38+
many concurrent async activities, response delivery can serialize through the
39+
pool and tail latency inflates. Raise `maximum_thread_pool_workers` to widen
40+
response-delivery throughput.
4241

4342
Mixed workloads with long-running sync activities can starve async response
4443
delivery (and vice versa) since they share the pool. If that becomes an issue,

ext/dapr-ext-workflow/tests/durabletask/test_activity_dispatch_routing.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
1414
The work-item dispatcher at the top of ``worker.py``'s gRPC loop selects between
1515
``_execute_activity`` (sync, runs in the thread pool) and ``_execute_activity_async``
16-
(coroutine, awaited on the event loop) using ``inspect.iscoroutinefunction(handler)``
17-
via ``_AsyncWorkerManager._run_func``. These tests pin the async-ness of each handler so
16+
(coroutine, awaited on the event loop) using ``is_async_callable(handler)`` via
17+
``_AsyncWorkerManager._run_func``. These tests pin the async-ness of each handler so
1818
the dispatch routing stays correct.
1919
"""
2020

0 commit comments

Comments
 (0)