Detailed implementation patterns for the Durable Task Python SDK.
Sequential execution where each step depends on the previous:
def hello(ctx: task.ActivityContext, name: str) -> str:
"""Activity function that returns a greeting"""
return f'Hello {name}!'
def chained_workflow(ctx: task.OrchestrationContext, _):
"""Orchestrator that chains activity calls sequentially"""
result1 = yield ctx.call_activity(hello, input='Tokyo')
result2 = yield ctx.call_activity(hello, input='Seattle')
result3 = yield ctx.call_activity(hello, input='London')
return [result1, result2, result3]
# Registration
worker.add_orchestrator(chained_workflow)
worker.add_activity(hello)Parallel processing with aggregated results:
import random
from durabletask import task
def get_work_items(ctx: task.ActivityContext, _) -> list[str]:
"""Activity that returns a list of work items"""
count = random.randint(2, 10)
return [f'work item {i}' for i in range(count)]
def process_work_item(ctx: task.ActivityContext, item: str) -> int:
"""Activity that processes a single work item"""
return random.randint(0, 10)
def fanout_fanin_workflow(ctx: task.OrchestrationContext, _):
"""Orchestrator that fans out work and fans in results"""
# Get work items
work_items: list[str] = yield ctx.call_activity(get_work_items)
# Fan-out: schedule all work items in parallel
tasks = [ctx.call_activity(process_work_item, input=item) for item in work_items]
# Fan-in: wait for all to complete
results: list[int] = yield task.when_all(tasks)
# Return aggregated results
return {
'work_items': work_items,
'results': results,
'total': sum(results)
}
# Registration
worker.add_orchestrator(fanout_fanin_workflow)
worker.add_activity(get_work_items)
worker.add_activity(process_work_item)For large numbers of items, process in batches:
def batched_fanout_workflow(ctx: task.OrchestrationContext, _):
"""Process work items in batches to control parallelism"""
work_items = yield ctx.call_activity(get_work_items)
batch_size = 10
all_results = []
for i in range(0, len(work_items), batch_size):
batch = work_items[i:i + batch_size]
tasks = [ctx.call_activity(process_work_item, input=item) for item in batch]
batch_results = yield task.when_all(tasks)
all_results.extend(batch_results)
return {'total': sum(all_results)}Workflow that waits for external approval with timeout:
from collections import namedtuple
from dataclasses import dataclass
from datetime import timedelta
from durabletask import task
@dataclass
class Order:
cost: float
product: str
quantity: int
def send_approval_request(ctx: task.ActivityContext, order: Order) -> None:
"""Send notification requesting approval"""
print(f"Approval needed for {order.product} (${order.cost})")
def place_order(ctx: task.ActivityContext, order: Order) -> str:
"""Place the order after approval"""
return f"Order placed: {order.quantity}x {order.product}"
def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order):
"""Orchestrator that implements human approval pattern"""
# Auto-approve small orders
if order.cost < 1000:
return "Auto-approved"
# Request approval for larger orders
yield ctx.call_activity(send_approval_request, input=order)
# Wait for approval OR timeout (whichever comes first)
approval_event = ctx.wait_for_external_event("approval_received")
timeout_event = ctx.create_timer(timedelta(hours=24))
winner = yield task.when_any([approval_event, timeout_event])
if winner == timeout_event:
return "Canceled - approval timeout"
# Order was approved
yield ctx.call_activity(place_order, input=order)
approval_details = approval_event.get_result()
return f"Approved by '{approval_details.approver}'"
# Raising the approval event from client
Approval = namedtuple("Approval", ["approver"])
approval_data = Approval("manager@company.com")
client.raise_orchestration_event(instance_id, "approval_received", data=approval_data)Schedule delayed execution:
from datetime import timedelta
def delayed_workflow(ctx: task.OrchestrationContext, _):
"""Orchestrator that waits before continuing"""
yield ctx.call_activity(start_activity)
# Wait for 5 minutes (survives restarts)
yield ctx.create_timer(timedelta(minutes=5))
yield ctx.call_activity(continue_activity)
return "Done"Execute at a specific time:
from datetime import datetime, timedelta
def scheduled_workflow(ctx: task.OrchestrationContext, scheduled_time: datetime):
"""Execute activity at a specific scheduled time"""
# Calculate delay from current orchestration time
delay = scheduled_time - ctx.current_utc_datetime
if delay > timedelta(0):
yield ctx.create_timer(delay)
result = yield ctx.call_activity(scheduled_activity)
return resultCompose orchestrations from smaller pieces:
def child_orchestration(ctx: task.OrchestrationContext, data: str):
"""Child orchestration that can be called from parents"""
result1 = yield ctx.call_activity(activity_a, input=data)
result2 = yield ctx.call_activity(activity_b, input=result1)
return result2
def parent_orchestration(ctx: task.OrchestrationContext, items: list[str]):
"""Parent orchestration that calls child orchestrations"""
# Call child orchestrations in parallel
tasks = [
ctx.call_sub_orchestrator(child_orchestration, input=item)
for item in items
]
results = yield task.when_all(tasks)
return results
# Registration - both must be registered
worker.add_orchestrator(parent_orchestration)
worker.add_orchestrator(child_orchestration)
worker.add_activity(activity_a)
worker.add_activity(activity_b)Stateful objects with operations:
from durabletask import entities
def counter(ctx: entities.EntityContext, input: int):
"""Function-based entity for a counter"""
state = ctx.get_state(int, 0) # Get state with default 0
if ctx.operation == "add":
state += input
ctx.set_state(state)
elif ctx.operation == "subtract":
state -= input
ctx.set_state(state)
elif ctx.operation == "get":
return state
elif ctx.operation == "reset":
ctx.set_state(0)from durabletask import entities
class Counter(entities.DurableEntity):
"""Class-based entity for a counter"""
def __init__(self):
self.set_state(0)
def add(self, amount: int):
current = self.get_state(int, 0)
self.set_state(current + amount)
def subtract(self, amount: int):
current = self.get_state(int, 0)
self.set_state(current - amount)
def get(self) -> int:
return self.get_state(int, 0)
def reset(self):
self.set_state(0)def workflow_with_entity(ctx: task.OrchestrationContext, _):
"""Orchestration that interacts with entities"""
entity_id = entities.EntityInstanceId("counter", "my-counter")
# Signal entity (fire-and-forget) - uses entity_id and operation_name params
ctx.signal_entity(entity_id=entity_id, operation_name="add", input=5)
# Call entity and wait for result - uses entity and operation params (different from signal!)
value = yield ctx.call_entity(entity=entity_id, operation="get")
return f"Counter value: {value}"entity_id = entities.EntityInstanceId("counter", "my-counter")
client.signal_entity(entity_id, "add", input=10)Ensure exclusive access to multiple entities:
def workflow_with_locks(ctx: task.OrchestrationContext, _):
"""Lock multiple entities for atomic operations"""
entity_id_1 = entities.EntityInstanceId("account", "account-1")
entity_id_2 = entities.EntityInstanceId("account", "account-2")
# Lock entities for exclusive access
with (yield ctx.lock_entities([entity_id_1, entity_id_2])):
# Perform atomic operations on both entities
# Note: call_entity uses 'entity' and 'operation' params
balance1 = yield ctx.call_entity(entity=entity_id_1, operation="get_balance")
balance2 = yield ctx.call_entity(entity=entity_id_2, operation="get_balance")
# Transfer between accounts
yield ctx.call_entity(entity=entity_id_1, operation="withdraw", input=100)
yield ctx.call_entity(entity=entity_id_2, operation="deposit", input=100)
return "Transfer complete"Long-running processes that periodically restart:
from datetime import timedelta
def eternal_orchestration(ctx: task.OrchestrationContext, iteration: int):
"""Orchestration that runs forever, restarting periodically"""
# Do periodic work
yield ctx.call_activity(periodic_work, input=iteration)
# Wait before next iteration
yield ctx.create_timer(timedelta(minutes=5))
# Restart with new iteration count
# This prevents history from growing unbounded
ctx.continue_as_new(iteration + 1)
# Start with iteration 0
client.schedule_new_orchestration(eternal_orchestration, input=0)Periodic polling with flexible exit conditions:
from datetime import timedelta
def monitoring_workflow(ctx: task.OrchestrationContext, job_id: str):
"""Monitor a job until completion or timeout"""
max_attempts = 10
polling_interval = timedelta(seconds=30)
for attempt in range(max_attempts):
status = yield ctx.call_activity(check_job_status, input=job_id)
if status == "completed":
return {"status": "success", "attempts": attempt + 1}
if status == "failed":
return {"status": "failed", "attempts": attempt + 1}
# Wait before next poll
yield ctx.create_timer(polling_interval)
return {"status": "timeout", "attempts": max_attempts}Handle breaking changes gracefully using orchestration versioning. Version is set when scheduling the orchestration and read via ctx.version.
# Schedule orchestration with a specific version
instance_id = client.schedule_new_orchestration(
"versioned_orchestration",
input="data",
version="2.0.0" # Version is set here
)Use the packaging module for semantic version comparison:
from packaging import version
def compare_version(v1: str | None, v2: str) -> int:
"""Compare two version strings.
Returns: -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2
"""
if v1 is None:
return -1
try:
ver1 = version.parse(v1)
ver2 = version.parse(v2)
if ver1 < ver2:
return -1
elif ver1 > ver2:
return 1
return 0
except Exception:
# Fall back to string comparison
return (v1 > v2) - (v1 < v2)def versioned_orchestration(ctx: task.OrchestrationContext, name: str):
"""Orchestrator that handles multiple versions.
Version history:
- v1.0.0: Basic hello greeting
- v2.0.0: Added goodbye greeting
"""
results = []
orch_version = ctx.version # Read version set during scheduling
# v1.0.0+: Always run this
hello = yield ctx.call_activity(say_hello, input=name)
results.append(hello)
# v2.0.0+: Added in version 2
if compare_version(orch_version, "2.0.0") >= 0:
goodbye = yield ctx.call_activity(say_goodbye, input=name)
results.append(goodbye)
return {"version": orch_version, "results": results}Without versioning, changing orchestration logic causes non-deterministic errors:
- v1 orchestration starts (calls activity A, then B)
- You deploy v2 code (calls A, C, then B)
- v1 orchestration replays but hits new code path
- ERROR: History doesn't match - expected B, got C
With versioning:
- v1 orchestrations continue using v1 code path
- v2 orchestrations use v2 code path
- Both run on the same worker without conflict