Durable Task Scheduler Setup and Deployment
Local Development with Emulator
# Pull the emulator image
docker pull mcr.microsoft.com/dts/dts-emulator:latest
# Run the emulator
docker run -d \
-p 8080:8080 \
-p 8082:8082 \
--name dts-emulator \
mcr.microsoft.com/dts/dts-emulator:latest
# Emulator endpoints:
# - gRPC: http://localhost:8080
# - Dashboard: http://localhost:8082
version : ' 3.8'
services :
dts-emulator :
image : mcr.microsoft.com/dts/dts-emulator:latest
ports :
- " 8080:8080" # gRPC endpoint
- " 8082:8082" # Dashboard
restart : unless-stopped
Default Emulator Configuration
# No authentication needed for local emulator
taskhub = "default"
endpoint = "http://localhost:8080"
credential = None
secure_channel = False
Azure Durable Task Scheduler Provisioning
# Install Azure CLI
# https://learn.microsoft.com/cli/azure/install-azure-cli
# Login to Azure
az login
# Install durabletask extension
az extension add --name durabletask
Create Scheduler and Task Hub
# Set variables
RESOURCE_GROUP=" my-rg"
SCHEDULER_NAME=" my-scheduler"
TASKHUB_NAME=" my-taskhub"
LOCATION=" eastus"
# Create resource group
az group create --name $RESOURCE_GROUP --location $LOCATION
# Create scheduler
az durabletask scheduler create \
--resource-group $RESOURCE_GROUP \
--name $SCHEDULER_NAME \
--location $LOCATION \
--ip-allowlist " [0.0.0.0/0]" \
--sku-capacity 1 \
--sku-name " Dedicated" \
--tags " environment=dev"
# Create task hub
az durabletask taskhub create \
--resource-group $RESOURCE_GROUP \
--scheduler-name $SCHEDULER_NAME \
--name $TASKHUB_NAME
# Get endpoint URL
az durabletask scheduler show \
--resource-group $RESOURCE_GROUP \
--name $SCHEDULER_NAME \
--query " properties.endpoint" -o tsv
# Get your user principal ID
USER_PRINCIPAL_ID=$( az ad signed-in-user show --query id -o tsv)
# Assign Durable Task Contributor role
az role assignment create \
--assignee $USER_PRINCIPAL_ID \
--role " Durable Task Contributor" \
--scope " /subscriptions/{subscription-id}/resourceGroups/$RESOURCE_GROUP /providers/Microsoft.DurableTask/schedulers/$SCHEDULER_NAME "
Application Configuration
# Bash
export ENDPOINT=" https://my-scheduler.region.durabletask.io"
export TASKHUB=" my-taskhub"
# PowerShell
$env :ENDPOINT = " https://my-scheduler.region.durabletask.io"
$env :TASKHUB = " my-taskhub"
import os
from azure .identity import DefaultAzureCredential
def get_connection_config ():
"""Get configuration for DTS connection"""
endpoint = os .getenv ("ENDPOINT" , "http://localhost:8080" )
taskhub = os .getenv ("TASKHUB" , "default" )
is_local = endpoint == "http://localhost:8080"
return {
"host_address" : endpoint ,
"taskhub" : taskhub ,
"secure_channel" : not is_local ,
"token_credential" : None if is_local else DefaultAzureCredential ()
}
# settings.py
import os
from dataclasses import dataclass
from azure .identity import DefaultAzureCredential
@dataclass
class DurableTaskSettings :
endpoint : str
taskhub : str
secure_channel : bool
credential : any
@classmethod
def from_environment (cls ):
endpoint = os .getenv ("ENDPOINT" , "http://localhost:8080" )
taskhub = os .getenv ("TASKHUB" , "default" )
is_local = endpoint == "http://localhost:8080"
return cls (
endpoint = endpoint ,
taskhub = taskhub ,
secure_channel = not is_local ,
credential = None if is_local else DefaultAzureCredential ()
)
# Usage
settings = DurableTaskSettings .from_environment ()
Local Development (No Auth)
credential = None
secure_channel = False
DefaultAzureCredential (Recommended for Azure)
from azure .identity import DefaultAzureCredential
credential = DefaultAzureCredential ()
secure_channel = True
from azure .identity import ManagedIdentityCredential
# System-assigned managed identity
credential = ManagedIdentityCredential ()
# User-assigned managed identity
credential = ManagedIdentityCredential (client_id = "<client-id>" )
Azure CLI Credential (Development)
from azure .identity import AzureCliCredential
credential = AzureCliCredential ()
Worker Application Templates
#!/usr/bin/env python3
"""Durable Task Worker Application"""
import os
import signal
import sys
from azure .identity import DefaultAzureCredential
from durabletask import task
from durabletask .azuremanaged .worker import DurableTaskSchedulerWorker
# Activities
def my_activity (ctx : task .ActivityContext , input : str ) -> str :
return f"Processed: { input } "
# Orchestrations
def my_orchestration (ctx : task .OrchestrationContext , input : str ):
result = yield ctx .call_activity (my_activity , input = input )
return result
def main ():
# Configuration
endpoint = os .getenv ("ENDPOINT" , "http://localhost:8080" )
taskhub = os .getenv ("TASKHUB" , "default" )
is_local = endpoint == "http://localhost:8080"
credential = None if is_local else DefaultAzureCredential ()
secure_channel = not is_local
print (f"Starting worker..." )
print (f" Endpoint: { endpoint } " )
print (f" Task Hub: { taskhub } " )
with DurableTaskSchedulerWorker (
host_address = endpoint ,
secure_channel = secure_channel ,
taskhub = taskhub ,
token_credential = credential
) as worker :
# Register orchestrations and activities
worker .add_orchestrator (my_orchestration )
worker .add_activity (my_activity )
# Handle shutdown signals
def shutdown (signum , frame ):
print ("\n Shutting down worker..." )
sys .exit (0 )
signal .signal (signal .SIGINT , shutdown )
signal .signal (signal .SIGTERM , shutdown )
# Start processing
worker .start ()
print ("Worker started. Press Ctrl+C to stop." )
# Keep running (cross-platform)
import asyncio
try :
asyncio .get_event_loop ().run_forever ()
except KeyboardInterrupt :
pass
if __name__ == "__main__" :
main ()
Flask/FastAPI Integration
from flask import Flask , jsonify , request
from azure .identity import DefaultAzureCredential
from durabletask .azuremanaged .client import DurableTaskSchedulerClient
import os
app = Flask (__name__ )
# Initialize client
endpoint = os .getenv ("ENDPOINT" , "http://localhost:8080" )
taskhub = os .getenv ("TASKHUB" , "default" )
is_local = endpoint == "http://localhost:8080"
client = DurableTaskSchedulerClient (
host_address = endpoint ,
secure_channel = not is_local ,
taskhub = taskhub ,
token_credential = None if is_local else DefaultAzureCredential ()
)
@app .route ('/orchestrations' , methods = ['POST' ])
def start_orchestration ():
"""Start a new orchestration"""
data = request .json
instance_id = client .schedule_new_orchestration (
"my_orchestration" ,
input = data .get ("input" )
)
return jsonify ({"instanceId" : instance_id }), 202
@app .route ('/orchestrations/<instance_id>' , methods = ['GET' ])
def get_status (instance_id ):
"""Get orchestration status"""
state = client .get_orchestration_state (instance_id )
if not state :
return jsonify ({"error" : "Not found" }), 404
return jsonify ({
"instanceId" : instance_id ,
"status" : state .runtime_status .name ,
"output" : state .serialized_output
})
@app .route ('/orchestrations/<instance_id>/events/<event_name>' , methods = ['POST' ])
def raise_event (instance_id , event_name ):
"""Raise an event on an orchestration"""
data = request .json
client .raise_orchestration_event (instance_id , event_name , data = data )
return jsonify ({"status" : "Event raised" }), 202
# container-app.yaml
properties :
configuration :
secrets :
- name : dts-endpoint
value : " https://my-scheduler.region.durabletask.io"
ingress :
external : false
template :
containers :
- image : myregistry.azurecr.io/durable-worker:latest
name : worker
env :
- name : ENDPOINT
secretRef : dts-endpoint
- name : TASKHUB
value : my-taskhub
resources :
cpu : 0.5
memory : 1Gi
scale :
minReplicas : 1
maxReplicas : 10
apiVersion : apps/v1
kind : Deployment
metadata :
name : durable-worker
spec :
replicas : 3
selector :
matchLabels :
app : durable-worker
template :
metadata :
labels :
app : durable-worker
spec :
containers :
- name : worker
image : myregistry.azurecr.io/durable-worker:latest
env :
- name : ENDPOINT
valueFrom :
secretKeyRef :
name : dts-config
key : endpoint
- name : TASKHUB
valueFrom :
configMapKeyRef :
name : dts-config
key : taskhub
resources :
requests :
cpu : 250m
memory : 256Mi
limits :
cpu : 500m
memory : 512Mi
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python" , "worker.py" ]
import logging
# Configure logging
log_handler = logging .FileHandler ('durable.log' , encoding = 'utf-8' )
log_handler .setLevel (logging .DEBUG )
log_formatter = logging .Formatter ('%(asctime)s - %(name)s - %(levelname)s - %(message)s' )
log_handler .setFormatter (log_formatter )
# Apply to worker
with DurableTaskSchedulerWorker (
host_address = endpoint ,
secure_channel = secure_channel ,
taskhub = taskhub ,
token_credential = credential ,
log_handler = log_handler ,
log_formatter = log_formatter
) as worker :
worker .add_orchestrator (my_orchestration )
worker .add_activity (my_activity )
worker .start ()
# ...
Query Orchestration Status
# Get all running orchestrations
# (Note: SDK provides basic queries; use dashboard for advanced filtering)
# Check specific instance
state = client .get_orchestration_state (instance_id )
print (f"Status: { state .runtime_status } " )
print (f"Created: { state .created_time } " )
print (f"Updated: { state .last_updated_time } " )
print (f"Input: { state .serialized_input } " )
print (f"Output: { state .serialized_output } " )