Skip to content

Latest commit

 

History

History
452 lines (319 loc) · 12.6 KB

File metadata and controls

452 lines (319 loc) · 12.6 KB

Durable Task Python Patterns

Detailed implementation patterns for the Durable Task Python SDK.

Function Chaining

Sequential execution where each step runs only after the previous one has completed (in a real chain, each step typically consumes the previous step's result):

def hello(ctx: task.ActivityContext, name: str) -> str:
    """Build the greeting for ``name``.

    Args:
        ctx: Activity context supplied by the worker; not used here.
        name: Who to greet.

    Returns:
        The text ``Hello <name>!``.
    """
    greeting = f'Hello {name}!'
    return greeting


def chained_workflow(ctx: task.OrchestrationContext, _):
    """Greet three cities one after another.

    Each ``yield`` hands one ``hello`` activity call to the runtime and
    resumes with that call's result, so the calls run strictly in order.

    Returns:
        The three greetings, in the order the activities were called.
    """
    greetings = []
    for city in ('Tokyo', 'Seattle', 'London'):
        greeting = yield ctx.call_activity(hello, input=city)
        greetings.append(greeting)
    return greetings


# Registration: the worker needs both the orchestrator and the activity it calls
worker.add_orchestrator(chained_workflow)
worker.add_activity(hello)

Fan-Out/Fan-In

Parallel processing with aggregated results:

import random
from durabletask import task


def get_work_items(ctx: task.ActivityContext, _) -> list[str]:
    """Produce a randomly sized batch of work item labels.

    Returns:
        Between 2 and 10 strings (inclusive) of the form ``work item <index>``,
        with indices starting at 0.
    """
    return [f'work item {i}' for i in range(random.randint(2, 10))]


def process_work_item(ctx: task.ActivityContext, item: str) -> int:
    """Stand-in for real processing of one work item.

    Args:
        ctx: Activity context supplied by the worker; not used here.
        item: The work item label; not inspected by this sample.

    Returns:
        A random integer from 0 to 10 inclusive.
    """
    outcome = random.randint(0, 10)
    return outcome


def fanout_fanin_workflow(ctx: task.OrchestrationContext, _):
    """Fan work out to parallel activities, then gather the results.

    Returns:
        A dict with the fetched ``work_items``, the per-item ``results`` and
        their ``total``.
    """
    work_items: list[str] = yield ctx.call_activity(get_work_items)

    # Fan-out: every call is created before anything is awaited
    pending = []
    for item in work_items:
        pending.append(ctx.call_activity(process_work_item, input=item))

    # Fan-in: a single yield waits for the whole set
    results: list[int] = yield task.when_all(pending)

    return dict(work_items=work_items, results=results, total=sum(results))


# Registration: the orchestrator plus both activities it schedules
worker.add_orchestrator(fanout_fanin_workflow)
worker.add_activity(get_work_items)
worker.add_activity(process_work_item)

Batched Fan-Out (Large Scale)

For large numbers of items, process them in fixed-size batches to cap how many activities run in parallel:

def batched_fanout_workflow(ctx: task.OrchestrationContext, _):
    """Process work items ten at a time so parallelism stays bounded.

    Returns:
        A dict whose ``total`` is the sum of every item's result.
    """
    work_items = yield ctx.call_activity(get_work_items)

    batch_size = 10
    collected = []
    start = 0

    while start < len(work_items):
        window = work_items[start:start + batch_size]
        pending = [ctx.call_activity(process_work_item, input=entry) for entry in window]
        # The next batch is not scheduled until this one has fully completed
        finished = yield task.when_all(pending)
        collected.extend(finished)
        start += batch_size

    return {'total': sum(collected)}

Human Interaction

Workflow that waits for external approval with timeout:

from collections import namedtuple
from dataclasses import dataclass
from datetime import timedelta
from durabletask import task


@dataclass
class Order:
    """Purchase order data handed to the approval workflow and its activities."""

    cost: float  # compared against the 1000 auto-approval threshold in purchase_order_workflow
    product: str  # product name shown in the approval and confirmation messages
    quantity: int  # unit count reported by place_order


def send_approval_request(ctx: task.ActivityContext, order: Order) -> None:
    """Announce that ``order`` needs approval.

    This sample only prints the product and cost to stdout.
    """
    notice = f"Approval needed for {order.product} (${order.cost})"
    print(notice)


def place_order(ctx: task.ActivityContext, order: Order) -> str:
    """Confirm the order once it has been approved.

    Returns:
        A confirmation of the form ``Order placed: <quantity>x <product>``.
    """
    confirmation = f"Order placed: {order.quantity}x {order.product}"
    return confirmation


def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order):
    """Place an order, pausing for human approval when it is expensive.

    Orders costing less than 1000 are approved immediately. Anything else
    triggers an approval request and then races the ``approval_received``
    external event against a 24-hour timer.

    Returns:
        ``"Auto-approved"``, ``"Canceled - approval timeout"``, or a message
        naming the approver.
    """
    if order.cost < 1000:
        return "Auto-approved"

    yield ctx.call_activity(send_approval_request, input=order)

    # Both tasks are created up front so they can race each other
    approval_event = ctx.wait_for_external_event("approval_received")
    timeout_event = ctx.create_timer(timedelta(hours=24))
    winner = yield task.when_any([approval_event, timeout_event])

    timed_out = winner == timeout_event
    if timed_out:
        return "Canceled - approval timeout"

    # The approval event won the race: place the order, then report who approved it
    yield ctx.call_activity(place_order, input=order)
    approval_details = approval_event.get_result()
    return f"Approved by '{approval_details.approver}'"


# Client side: deliver the approval to the waiting orchestration instance.
# The payload exposes an `approver` attribute, which the orchestrator reads.
Approval = namedtuple("Approval", "approver")
approval_data = Approval(approver="manager@company.com")
client.raise_orchestration_event(
    instance_id,
    "approval_received",
    data=approval_data,
)

Durable Timers

Schedule delayed execution:

from datetime import timedelta


def delayed_workflow(ctx: task.OrchestrationContext, _):
    """Run one activity, pause on a durable timer, then run a second activity.

    Returns:
        The string ``"Done"`` once the second activity has finished.
    """
    pause = timedelta(minutes=5)

    yield ctx.call_activity(start_activity)

    # Durable five-minute wait (survives restarts)
    yield ctx.create_timer(pause)

    yield ctx.call_activity(continue_activity)
    return "Done"

Scheduled Execution

Execute at a specific time:

from datetime import datetime, timedelta


def scheduled_workflow(ctx: task.OrchestrationContext, scheduled_time: datetime):
    """Run ``scheduled_activity`` no earlier than ``scheduled_time``.

    The wait is measured from the orchestration's own clock
    (``ctx.current_utc_datetime``). If the scheduled time has already been
    reached, no timer is created.

    Returns:
        Whatever ``scheduled_activity`` returns.
    """
    remaining = scheduled_time - ctx.current_utc_datetime

    if remaining > timedelta():
        yield ctx.create_timer(remaining)

    return (yield ctx.call_activity(scheduled_activity))

Sub-Orchestrations

Compose orchestrations from smaller pieces:

def child_orchestration(ctx: task.OrchestrationContext, data: str):
    """Run ``activity_a`` on ``data`` and feed its result into ``activity_b``.

    Returns:
        The result of ``activity_b``.
    """
    intermediate = yield ctx.call_activity(activity_a, input=data)
    return (yield ctx.call_activity(activity_b, input=intermediate))


def parent_orchestration(ctx: task.OrchestrationContext, items: list[str]):
    """Start one child orchestration per item and wait for all of them.

    Returns:
        The list yielded back by ``task.when_all`` for the child calls.
    """
    pending = []
    for item in items:
        # All children are scheduled before any of them is awaited
        pending.append(ctx.call_sub_orchestrator(child_orchestration, input=item))

    return (yield task.when_all(pending))


# Registration - parent and child orchestrators must both be registered,
# along with the two activities the child calls
worker.add_orchestrator(parent_orchestration)
worker.add_orchestrator(child_orchestration)
worker.add_activity(activity_a)
worker.add_activity(activity_b)

Durable Entities

Stateful objects with operations:

Function-Based Entity

from durabletask import entities


def counter(ctx: entities.EntityContext, input: int):
    """Function-based counter entity, dispatched on ``ctx.operation``.

    Operations:
        add / subtract: adjust the stored value by ``input``.
        get: return the stored value (0 when nothing is stored yet).
        reset: store 0.

    Any other operation name is ignored and returns ``None``.
    """
    current = ctx.get_state(int, 0)  # default of 0 when no state is stored
    operation = ctx.operation

    if operation == "get":
        return current

    if operation == "reset":
        ctx.set_state(0)
    elif operation == "add":
        ctx.set_state(current + input)
    elif operation == "subtract":
        ctx.set_state(current - input)
    return None

Class-Based Entity

from durabletask import entities


class Counter(entities.DurableEntity):
    """Class-based entity for a counter.

    The state is a single integer. Every operation reads it with a default
    of 0, so the counter behaves as 0 until the first write.
    """

    # NOTE(review): there is intentionally no __init__ calling set_state(0).
    # The default of 0 on every read makes an initial write redundant, and a
    # write during construction presumably runs before the SDK has attached
    # the entity context, or would overwrite stored state each time an
    # instance is built -- confirm against the DurableEntity implementation.

    def add(self, amount: int):
        """Increase the counter by ``amount``."""
        current = self.get_state(int, 0)
        self.set_state(current + amount)

    def subtract(self, amount: int):
        """Decrease the counter by ``amount``."""
        current = self.get_state(int, 0)
        self.set_state(current - amount)

    def get(self) -> int:
        """Return the current value (0 when nothing has been stored yet)."""
        return self.get_state(int, 0)

    def reset(self):
        """Set the counter back to 0."""
        self.set_state(0)

Using Entities from Orchestrations

def workflow_with_entity(ctx: task.OrchestrationContext, _):
    """Add 5 to the ``my-counter`` entity, then read its value back.

    Note the differing keyword names: ``signal_entity`` takes
    ``entity_id``/``operation_name``, while ``call_entity`` takes
    ``entity``/``operation``.

    Returns:
        A string reporting the counter value returned by the ``get`` call.
    """
    target = entities.EntityInstanceId("counter", "my-counter")

    # Fire-and-forget: nothing is yielded, so the orchestrator does not wait
    ctx.signal_entity(entity_id=target, operation_name="add", input=5)

    # Request/response: yield until the entity replies
    value = yield ctx.call_entity(entity=target, operation="get")

    return f"Counter value: {value}"

Using Entities from Client

# Identify the target entity, then send it an "add" operation with input 10
entity_id = entities.EntityInstanceId("counter", "my-counter")
client.signal_entity(entity_id, "add", input=10)

Entity Locking

Ensure exclusive access to multiple entities:

def workflow_with_locks(ctx: task.OrchestrationContext, _):
    """Move 100 between two account entities while holding a lock on both.

    Returns:
        The string ``"Transfer complete"`` after the locked section exits.
    """
    source = entities.EntityInstanceId("account", "account-1")
    destination = entities.EntityInstanceId("account", "account-2")

    # Acquire the lock on both entities first; it is released when the
    # with-block exits
    lock = yield ctx.lock_entities([source, destination])
    with lock:
        # call_entity takes 'entity' and 'operation' keyword arguments.
        # The balances are fetched but not otherwise used in this sample.
        source_balance = yield ctx.call_entity(entity=source, operation="get_balance")
        destination_balance = yield ctx.call_entity(entity=destination, operation="get_balance")

        # Withdraw from the first account, then deposit into the second
        yield ctx.call_entity(entity=source, operation="withdraw", input=100)
        yield ctx.call_entity(entity=destination, operation="deposit", input=100)

    return "Transfer complete"

Eternal Orchestrations (Continue-As-New)

Long-running processes that periodically restart:

from datetime import timedelta


def eternal_orchestration(ctx: task.OrchestrationContext, iteration: int):
    """Do one round of periodic work, wait, then restart as a new run.

    Args:
        iteration: Counter for the current round; the next run receives
            ``iteration + 1``.
    """
    yield ctx.call_activity(periodic_work, input=iteration)

    # Pause between rounds
    pause = timedelta(minutes=5)
    yield ctx.create_timer(pause)

    # Restarting instead of looping prevents history from growing unbounded
    ctx.continue_as_new(iteration + 1)


# Start with iteration 0; each restart then passes iteration + 1 as the next input
client.schedule_new_orchestration(eternal_orchestration, input=0)

Monitoring Pattern

Periodic polling with flexible exit conditions:

from datetime import timedelta


def monitoring_workflow(ctx: task.OrchestrationContext, job_id: str):
    """Poll a job's status until it finishes or the attempts run out.

    The job is checked up to 10 times, with a 30-second durable timer after
    every check that is neither ``"completed"`` nor ``"failed"``.

    Returns:
        A dict with ``status`` (``"success"``, ``"failed"`` or ``"timeout"``)
        and ``attempts`` (the number of checks made).
    """
    max_attempts = 10
    polling_interval = timedelta(seconds=30)

    for attempts in range(1, max_attempts + 1):
        status = yield ctx.call_activity(check_job_status, input=job_id)

        if status == "completed":
            return {"status": "success", "attempts": attempts}
        if status == "failed":
            return {"status": "failed", "attempts": attempts}

        # Still running: wait before polling again
        yield ctx.create_timer(polling_interval)

    return {"status": "timeout", "attempts": max_attempts}

Version-Aware Orchestration

Handle breaking changes gracefully using orchestration versioning. Version is set when scheduling the orchestration and read via ctx.version.

Setting Version When Scheduling

# Schedule the orchestration by name, pinning the version this instance runs as
instance_id = client.schedule_new_orchestration(
    "versioned_orchestration",
    input="data",
    version="2.0.0"  # Version is set here; the orchestrator reads it back via ctx.version
)

Version Comparison Helper

Use the packaging library to compare version strings numerically (PEP 440 ordering) rather than as plain text:

from packaging import version

def compare_version(v1: str | None, v2: str) -> int:
    """Compare two version strings.
    
    Returns: -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2
    """
    if v1 is None:
        return -1
    try:
        ver1 = version.parse(v1)
        ver2 = version.parse(v2)
        if ver1 < ver2:
            return -1
        elif ver1 > ver2:
            return 1
        return 0
    except Exception:
        # Fall back to string comparison
        return (v1 > v2) - (v1 < v2)

Versioned Orchestration Example

def versioned_orchestration(ctx: task.OrchestrationContext, name: str):
    """Orchestrator whose steps depend on the version it was scheduled with.

    Version history:
    - v1.0.0: Basic hello greeting
    - v2.0.0: Added goodbye greeting

    Returns:
        A dict with the ``version`` that was read and the list of activity
        ``results``.
    """
    orch_version = ctx.version  # the value supplied when the instance was scheduled

    # Every version says hello
    results = [(yield ctx.call_activity(say_hello, input=name))]

    # The goodbye step exists only from 2.0.0 onward
    includes_goodbye = compare_version(orch_version, "2.0.0") >= 0
    if includes_goodbye:
        results.append((yield ctx.call_activity(say_goodbye, input=name)))

    return dict(version=orch_version, results=results)

Why Versioning Matters

Without versioning, changing orchestration logic breaks the replay of in-flight instances with non-determinism errors:

  1. v1 orchestration starts (calls activity A, then B)
  2. You deploy v2 code (calls A, C, then B)
  3. v1 orchestration replays but hits new code path
  4. ERROR: History doesn't match - expected B, got C

With versioning:

  • v1 orchestrations continue using v1 code path
  • v2 orchestrations use v2 code path
  • Both run on the same worker without conflict