This sample demonstrates the fan out/fan in pattern with the Azure Durable Task Scheduler using the Python SDK. This pattern is used for parallel processing of multiple items, followed by an aggregation of the results.
In this sample:
- The orchestrator receives a list of work items as input
- It "fans out" by creating parallel tasks for each work item (calling
process_work_itemfor each one) - It waits for all tasks to complete using
task.when_all - It then "fans in" by aggregating the results with the
aggregate_resultsactivity - The final aggregated result is returned to the client
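The same fan-out/fan-in flow can be sketched with plain asyncio. This is an illustrative analogy, not the Durable Task SDK: the function names mirror the sample's `process_work_item` and `aggregate_results` activities, and the simulated delays match the behavior described below.

```python
import asyncio
import random

async def process_work_item(item: int) -> int:
    # Simulate varying processing times (the sample uses 0.5-2 second delays)
    await asyncio.sleep(random.uniform(0.5, 2.0))
    return item * item  # each item's result is the square of its value

def aggregate_results(results: list[int]) -> dict:
    # Fan in: combine all parallel results into one summary
    return {
        "items_processed": len(results),
        "sum": sum(results),
        "average": sum(results) / len(results),
    }

async def orchestrate(items: list[int]) -> dict:
    # Fan out: start one task per work item
    tasks = [asyncio.create_task(process_work_item(i)) for i in items]
    # Wait for all tasks to finish (analogous to task.when_all)
    results = await asyncio.gather(*tasks)
    return aggregate_results(list(results))

if __name__ == "__main__":
    print(asyncio.run(orchestrate(list(range(1, 11)))))
```

In the real sample, the Durable Task runtime replaces `asyncio.gather`: the orchestrator schedules the activities durably, so progress survives process restarts.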
This pattern is useful for:
- Processing multiple items concurrently to improve throughput
- Performing calculations on batches of data
- Running operations in parallel that don't depend on each other
- Aggregating results from multiple parallel operations
- Python 3.9+
- Docker (for running the emulator) installed
- Azure CLI (if using a deployed Durable Task Scheduler)
There are two ways to run this sample locally:
The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning.
- Pull the Docker image for the emulator:

  ```bash
  docker pull mcr.microsoft.com/dts/dts-emulator:latest
  ```

- Run the emulator:

  ```bash
  docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
  ```

  Wait a few seconds for the container to be ready.
Note: The example code automatically uses the default emulator settings (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables.
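For reference, a default-aware way the sample's code might read this configuration (a sketch; the actual variable names in worker.py and client.py may differ):

```python
import os

# Fall back to the emulator defaults when the variables are not set
endpoint = os.environ.get("ENDPOINT", "http://localhost:8080")
taskhub = os.environ.get("TASKHUB", "default")

print(f"Connecting to {endpoint} (task hub: {taskhub})")
```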
Local development with a deployed scheduler:
- Install the durable task scheduler CLI extension:

  ```bash
  az upgrade
  az extension add --name durabletask --allow-preview true
  ```
- Create a resource group in a region where the Durable Task Scheduler is available:

  ```bash
  az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table
  az group create --name my-resource-group --location <location>
  ```
- Create a durable task scheduler resource:

  ```bash
  az durabletask scheduler create \
    --resource-group my-resource-group \
    --name my-scheduler \
    --ip-allowlist '["0.0.0.0/0"]' \
    --sku-name "Dedicated" \
    --sku-capacity 1 \
    --tags "{'myattribute':'myvalue'}"
  ```
- Create a task hub within the scheduler resource:

  ```bash
  az durabletask taskhub create \
    --resource-group my-resource-group \
    --scheduler-name my-scheduler \
    --name "my-taskhub"
  ```
- Grant the current user permission to connect to the `my-taskhub` task hub:

  ```bash
  subscriptionId=$(az account show --query "id" -o tsv)
  loggedInUser=$(az account show --query "user.name" -o tsv)

  az role assignment create \
    --assignee $loggedInUser \
    --role "Durable Task Data Contributor" \
    --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub"
  ```
Once you have set up either the emulator or deployed scheduler, follow these steps to run the sample:
- First, activate your Python virtual environment (if you're using one):

  ```bash
  python -m venv venv
  source venv/bin/activate  # On Windows, use: venv\Scripts\activate
  ```

- If you're using a deployed scheduler, set the following environment variables:
  ```bash
  export ENDPOINT=$(az durabletask scheduler show \
    --resource-group my-resource-group \
    --name my-scheduler \
    --query "properties.endpoint" \
    --output tsv)

  export TASKHUB="my-taskhub"
  ```
- Install the required packages:

  ```bash
  pip install -r requirements.txt
  ```
- Start the worker in a terminal:

  ```bash
  python worker.py
  ```
You should see output indicating the worker has started and registered the orchestration and activities.
- In a new terminal (with the virtual environment activated if applicable), run the client:

  Note: Remember to set the environment variables again if you're using a deployed scheduler.

  ```bash
  python client.py [number_of_items]
  ```

  You can optionally provide the number of work items as an argument. If not provided, 10 items are used by default.
Learn how to set up identity-based authentication when you deploy the app to Azure.
When you run the sample, you'll see output from both the worker and client processes:
The worker shows:
- Registration of the orchestrator and activities
- Status messages when processing each work item in parallel, showing that they're executing concurrently
- Random delays for each work item (between 0.5 and 2 seconds) to simulate varying processing times
- A final message showing the aggregation of results
The client shows:
- Starting the orchestration with the specified number of work items
- The unique orchestration instance ID
- The final aggregated result, which includes:
- Total number of items processed
- Sum of all results (each item result is the square of its value)
- Average of all results
The example demonstrates how multiple items can be processed in parallel, with the results gathered and aggregated once all parallel tasks are complete.
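To make the aggregation concrete, here is the arithmetic for a hypothetical run where the work items are the integers 1 through 10 (the sample's actual item values may differ):

```python
def expected_aggregate(values: list[int]) -> dict:
    # Each item's result is the square of its value, per the sample's activity
    results = [v * v for v in values]
    return {
        "items_processed": len(results),
        "sum": sum(results),
        "average": sum(results) / len(results),
    }

# Squares of 1..10 sum to 385, so the average is 38.5
print(expected_aggregate(list(range(1, 11))))
```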
To access the Durable Task Scheduler Dashboard and review your orchestration:
- Navigate to http://localhost:8082 in your web browser
- Click on the "default" task hub
- You'll see the orchestration instance in the list
- Click on the instance ID to view the execution details, which will show:
  - The parallel execution of multiple `process_work_item` activities
  - The wait for all tasks to complete using `task.when_all`
  - The final call to `aggregate_results` with the collected results
  - The inputs and outputs for each activity
If you're using a deployed scheduler instead:
- Navigate to the Scheduler resource in the Azure portal
- Go to the Task Hub subresource that you're using
- Click on the dashboard URL in the top right corner
- Search for your orchestration instance ID
- Review the execution details
The dashboard visualizes the concurrent execution of the tasks, allowing you to see how the fan-out/fan-in pattern improves throughput by processing items in parallel.