-
Notifications
You must be signed in to change notification settings - Fork 146
Add support for async workflow activities #1053
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
872e75c
413f16f
b52139c
0c64d1a
69ea96e
81fa323
dadaa9a
8c4ce88
e8c4c05
7ec820e
9185482
73add2e
5709bcd
e68141c
feb60db
70c6fad
5fb88e6
6eb9ce0
8cf248b
a73e994
a2d4ad8
b1e0c3f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| # -*- coding: utf-8 -*- | ||
| # Copyright 2026 The Dapr Authors | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """Async activities running alongside a sync one in a fan-out/fan-in workflow. | ||
|
|
||
| Each async activity simulates an I/O-bound call: it takes a payload, awaits a fixed | ||
| delay (standing in for a network round-trip), and returns a result payload. The async | ||
| instances run concurrently on the worker's event loop; a final sync activity aggregates | ||
| the results. Fan-out width, input/output payload sizes, and the delay are configurable | ||
| via environment variables. | ||
|
|
||
| Run with: | ||
|
|
||
| dapr run --app-id async-activities --app-protocol grpc --dapr-grpc-port 50001 \\ | ||
| -- python async_activities.py | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| import os | ||
| import random | ||
| import string | ||
| from time import sleep | ||
|
|
||
| import dapr.ext.workflow as wf | ||
| from pydantic import BaseModel | ||
|
|
||
| FAN_OUT = int(os.environ.get('WORKFLOW_FAN_OUT', '5')) | ||
| INPUT_BYTES = int(os.environ.get('WORKFLOW_INPUT_BYTES', '2048')) | ||
| OUTPUT_BYTES = int(os.environ.get('WORKFLOW_OUTPUT_BYTES', '1024')) | ||
| IO_SECONDS = float(os.environ.get('WORKFLOW_IO_SECONDS', '1.0')) | ||
|
|
||
| wfr = wf.WorkflowRuntime() | ||
|
|
||
|
|
||
| def _random_digits(n: int) -> str: | ||
| return ''.join(random.choices(string.digits, k=n)) | ||
|
|
||
|
|
||
| class Payload(BaseModel): | ||
| index: int | ||
| data: str | ||
|
|
||
|
|
||
| @wfr.workflow(name='fan_out_fan_in_workflow') | ||
| def fan_out_fan_in_workflow(ctx: wf.DaprWorkflowContext, payloads: list[dict]): | ||
| tasks = [ctx.call_activity(process_payload, input=p) for p in payloads] | ||
| results = yield wf.when_all(tasks) | ||
| summary = yield ctx.call_activity(summarize, input=results) | ||
| return summary | ||
|
|
||
|
|
||
| @wfr.activity(name='process_payload') | ||
| async def process_payload(ctx: wf.WorkflowActivityContext, payload: Payload) -> str: | ||
| """Async activity: simulate an I/O-bound call. Instances run concurrently on the loop.""" | ||
| await asyncio.sleep(IO_SECONDS) | ||
| result = _random_digits(OUTPUT_BYTES) | ||
| print( | ||
| f'[async] payload {payload.index}: {len(payload.data)}B in -> {len(result)}B out', | ||
| flush=True, | ||
| ) | ||
| return result | ||
|
|
||
|
|
||
| @wfr.activity(name='summarize') | ||
| def summarize(ctx: wf.WorkflowActivityContext, results: list[str]) -> str: | ||
| """Sync activity: aggregate the fan-out results on the thread pool.""" | ||
| summary = f'{len(results)} results, {sum(len(r) for r in results)} bytes' | ||
| print(f'[sync] {summary}', flush=True) | ||
| return summary | ||
|
|
||
|
|
||
| def main() -> None: | ||
| payloads = [ | ||
| Payload(index=i, data=_random_digits(INPUT_BYTES)).model_dump() for i in range(FAN_OUT) | ||
| ] | ||
|
|
||
| wfr.start() | ||
| sleep(5) # wait for workflow runtime to start | ||
|
|
||
| wf_client = wf.DaprWorkflowClient() | ||
| instance_id = wf_client.schedule_new_workflow(workflow=fan_out_fan_in_workflow, input=payloads) | ||
| print(f'Workflow started. Instance ID: {instance_id}') | ||
|
|
||
| state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60) | ||
| assert state is not None | ||
| print(f'Workflow completed! Status: {state.runtime_status.name}') | ||
| print(f'Workflow result: {state.serialized_output.strip(chr(34))}') | ||
|
|
||
| wfr.shutdown() | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
| main() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| import logging | ||
| from typing import Optional, Sequence, Union | ||
|
|
||
| import grpc | ||
|
|
@@ -27,6 +28,30 @@ | |
| grpc_aio.StreamStreamClientInterceptor, | ||
| ] | ||
|
|
||
| _POLLER_NOISE_MARKER = 'PollerCompletionQueue._handle_events' | ||
|
|
||
|
|
||
| class _GrpcAioPollerNoiseFilter(logging.Filter): | ||
| """Drops the harmless grpc.aio poller BlockingIOError (EAGAIN) records. | ||
|
|
||
| The poller does a non-blocking read on its wake-up fd and can get EAGAIN, which | ||
| asyncio logs at ERROR even though the read is retried and nothing is lost. | ||
| """ | ||
|
|
||
| def filter(self, record: logging.LogRecord) -> bool: | ||
| exc = record.exc_info[1] if record.exc_info else None | ||
| is_poller_noise = isinstance(exc, BlockingIOError) and ( | ||
| _POLLER_NOISE_MARKER in record.getMessage() | ||
| ) | ||
| return not is_poller_noise | ||
|
|
||
|
|
||
| def _silence_grpc_aio_poller_noise() -> None: | ||
| """Install the poller-noise filter on the asyncio logger if not already present.""" | ||
| asyncio_logger = logging.getLogger('asyncio') | ||
| if not any(isinstance(f, _GrpcAioPollerNoiseFilter) for f in asyncio_logger.filters): | ||
| asyncio_logger.addFilter(_GrpcAioPollerNoiseFilter()) | ||
|
|
||
|
|
||
| def get_grpc_aio_channel( | ||
| host_address: Optional[str], | ||
|
|
@@ -42,6 +67,8 @@ def get_grpc_aio_channel( | |
| interceptors: Optional sequence of client interceptors to apply to the channel. | ||
| options: Optional sequence of gRPC channel options as (key, value) tuples. Keys defined in https://grpc.github.io/grpc/core/group__grpc__arg__keys.html | ||
| """ | ||
| _silence_grpc_aio_poller_noise() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this only on the asyncio side of things?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup, grpc.aio spams the error logs when their client is used from multiple event loops, and that was the case for FastAPI applications using this SDK. Nothing was actually an error but the logs got extremely noisy in Linux. It got fixed on their 1.80.0 release, as soon as we update to that dep (in a separate PR ofc) we can delete this |
||
|
|
||
| if host_address is None: | ||
| host_address = get_default_host_address() | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,8 @@ | |
| # limitations under the License. | ||
|
|
||
| import dataclasses | ||
| import functools | ||
| import inspect | ||
| import json | ||
| import logging | ||
| import os | ||
|
|
@@ -19,6 +21,28 @@ | |
| import grpc | ||
| from dapr.ext.workflow import _model_protocol | ||
|
|
||
|
|
||
| def is_async_callable(fn: Any) -> bool: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there a test somewhere for this addition?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup, it's at |
||
| """Return True if ``fn`` is async. Catches ``functools.partial`` of coroutines, | ||
| sync decorators that wrap async functions, and callable instances with ``async __call__``. | ||
| """ | ||
| candidate = fn | ||
| while isinstance(candidate, functools.partial): | ||
| candidate = candidate.func | ||
| if callable(candidate): | ||
| try: | ||
| candidate = inspect.unwrap(candidate) | ||
| except ValueError: | ||
| # Cyclic ``__wrapped__`` chain from a malformed decorator. Fall back to the | ||
| # outermost callable; misclassification is preferable to crashing dispatch. | ||
| pass | ||
|
seherv marked this conversation as resolved.
Comment on lines
+35
to
+38
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we warn log here? |
||
| if inspect.iscoroutinefunction(candidate): | ||
| return True | ||
| if not inspect.isfunction(candidate) and hasattr(candidate, '__call__'): | ||
| return inspect.iscoroutinefunction(candidate.__call__) | ||
| return False | ||
|
seherv marked this conversation as resolved.
|
||
|
|
||
|
|
||
| ClientInterceptor = Union[ | ||
| grpc.UnaryUnaryClientInterceptor, | ||
| grpc.UnaryStreamClientInterceptor, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.