Skip to content

Latest commit

 

History

History
187 lines (140 loc) · 6.93 KB

File metadata and controls

187 lines (140 loc) · 6.93 KB

Fan Out/Fan In Pattern

Description of the Sample

This sample demonstrates the fan out/fan in pattern with the Azure Durable Task Scheduler using the Python SDK. This pattern is used for parallel processing of multiple items, followed by an aggregation of the results.

In this sample:

  1. The orchestrator receives a list of work items as input
  2. It "fans out" by creating parallel tasks for each work item (calling process_work_item for each one)
  3. It waits for all tasks to complete using task.when_all
  4. It then "fans in" by aggregating the results with the aggregate_results activity
  5. The final aggregated result is returned to the client

This pattern is useful for:

  • Processing multiple items concurrently to improve throughput
  • Performing calculations on batches of data
  • Running operations in parallel that don't depend on each other
  • Aggregating results from multiple parallel operations

Prerequisites

  1. Python 3.9+
  2. Docker (for running the emulator) installed
  3. Azure CLI (if using a deployed Durable Task Scheduler)

Configuring Durable Task Scheduler

There are two ways to run this sample locally:

Using the Emulator (Recommended)

The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning.

  1. Pull the Docker Image for the Emulator:
docker pull mcr.microsoft.com/dts/dts-emulator:latest
  1. Run the Emulator:
docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest

Wait a few seconds for the container to be ready.

Note: The example code automatically uses the default emulator settings (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables.

Using a Deployed Scheduler and Taskhub in Azure

Local development with a deployed scheduler:

  1. Install the durable task scheduler CLI extension:

    az upgrade
    az extension add --name durabletask --allow-preview true
  2. Create a resource group in a region where the Durable Task Scheduler is available:

    az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table
    az group create --name my-resource-group --location <location>
  3. Create a durable task scheduler resource:

    az durabletask scheduler create \
        --resource-group my-resource-group \
        --name my-scheduler \
        --ip-allowlist '["0.0.0.0/0"]' \
        --sku-name "Dedicated" \
        --sku-capacity 1 \
        --tags "{'myattribute':'myvalue'}"
  4. Create a task hub within the scheduler resource:

    az durabletask taskhub create \
        --resource-group my-resource-group \
        --scheduler-name my-scheduler \
        --name "my-taskhub"
  5. Grant the current user permission to connect to the my-taskhub task hub:

    subscriptionId=$(az account show --query "id" -o tsv)
    loggedInUser=$(az account show --query "user.name" -o tsv)
    
    az role assignment create \
        --assignee $loggedInUser \
        --role "Durable Task Data Contributor" \
        --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub"

How to Run the Sample

Once you have set up either the emulator or deployed scheduler, follow these steps to run the sample:

  1. First, activate your Python virtual environment (if you're using one):
python -m venv venv
source venv/bin/activate  # On Windows, use: venv\Scripts\activate
  1. If you're using a deployed scheduler, you need set Environment Variables:
export ENDPOINT=$(az durabletask scheduler show \
    --resource-group my-resource-group \
    --name my-scheduler \
    --query "properties.endpoint" \
    --output tsv)

export TASKHUB="my-taskhub"
  1. Install the required packages:

    pip install -r requirements.txt
  2. Start the worker in a terminal:

    python worker.py

    You should see output indicating the worker has started and registered the orchestration and activities.

  3. In a new terminal (with the virtual environment activated if applicable), run the client:

Note: Remember to set the environment variables again if you're using a deployed scheduler.

python client.py [number_of_items]

You can optionally provide the number of work items as an argument. If not provided, 10 items will be used by default.

Identity-based authentication

Learn how to set up identity-based authentication when you deploy the app Azure.

Understanding the Output

When you run the sample, you'll see output from both the worker and client processes:

Worker Output

The worker shows:

  • Registration of the orchestrator and activities
  • Status messages when processing each work item in parallel, showing that they're executing concurrently
  • Random delays for each work item (between 0.5 and 2 seconds) to simulate varying processing times
  • A final message showing the aggregation of results

Client Output

The client shows:

  • Starting the orchestration with the specified number of work items
  • The unique orchestration instance ID
  • The final aggregated result, which includes:
    • Total number of items processed
    • Sum of all results (each item result is the square of its value)
    • Average of all results

The example demonstrates how multiple items can be processed in parallel, with the results gathered and aggregated once all parallel tasks are complete.

Reviewing the Orchestration in the Durable Task Scheduler Dashboard

To access the Durable Task Scheduler Dashboard and review your orchestration:

Using the Emulator

  1. Navigate to http://localhost:8082 in your web browser
  2. Click on the "default" task hub
  3. You'll see the orchestration instance in the list
  4. Click on the instance ID to view the execution details, which will show:
    • The parallel execution of multiple process_work_item activities
    • The wait for all tasks to complete using task.when_all
    • The final call to aggregate_results with the collected results
    • The inputs and outputs for each activity

Using a Deployed Scheduler

  1. Navigate to the Scheduler resource in the Azure portal
  2. Go to the Task Hub subresource that you're using
  3. Click on the dashboard URL in the top right corner
  4. Search for your orchestration instance ID
  5. Review the execution details

The dashboard visualizes the concurrent execution of the tasks, allowing you to see how the fan-out/fan-in pattern improves throughput by processing items in parallel.