Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/activity_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
that calls an activity function in a sequence and prints the outputs."""
import logging
import os
from collections.abc import Generator
from typing import Any

from azure.identity import DefaultAzureCredential

Expand All @@ -17,7 +19,7 @@ def hello(ctx: task.ActivityContext, name: str) -> str:
return f'Hello {name}!'


def sequence(ctx: task.OrchestrationContext, _):
def sequence(ctx: task.OrchestrationContext, _: Any) -> Generator[task.Task[Any], Any, list[str]]:
"""Orchestrator function that calls the 'hello' activity function in a sequence"""
# Create a replay-safe logger to avoid duplicate log messages during replay
replay_logger = ctx.create_replay_safe_logger(logger)
Expand Down
10 changes: 6 additions & 4 deletions examples/distributed-tracing/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

import os
import time
from collections.abc import Generator
from datetime import timedelta
from typing import Any

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
Expand Down Expand Up @@ -64,7 +66,7 @@ def get_weather(ctx: task.ActivityContext, city: str) -> str:
return result


def summarize(ctx: task.ActivityContext, reports: list) -> str:
def summarize(ctx: task.ActivityContext, reports: list[str]) -> str:
"""Combine individual weather reports into a summary string."""
summary = " | ".join(reports)
print(f" [Activity] summarize -> {summary}")
Expand All @@ -75,9 +77,9 @@ def summarize(ctx: task.ActivityContext, reports: list) -> str:
# Sub-orchestration
# ---------------------------------------------------------------------------

def collect_weather(ctx: task.OrchestrationContext, cities: list):
def collect_weather(ctx: task.OrchestrationContext, cities: list[str]) -> Generator[task.Task[Any], Any, list[str]]:
"""Sub-orchestration that collects weather for a list of cities."""
results = []
results: list[str] = []
for city in cities:
weather = yield ctx.call_activity(get_weather, input=city)
results.append(f"{city}: {weather}")
Expand All @@ -88,7 +90,7 @@ def collect_weather(ctx: task.OrchestrationContext, cities: list):
# Main orchestration
# ---------------------------------------------------------------------------

def weather_report_orchestrator(ctx: task.OrchestrationContext, cities: list):
def weather_report_orchestrator(ctx: task.OrchestrationContext, cities: list[str]) -> Generator[task.Task[Any], Any, str]:
"""Top-level orchestration demonstrating timers, activities, and sub-orchestrations.

Flow:
Expand Down
6 changes: 4 additions & 2 deletions examples/entities/class_based_entity.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""End-to-end sample that demonstrates how to configure an orchestrator
that calls an activity function in a sequence and prints the outputs."""
import os
from collections.abc import Generator
from typing import Any

from azure.identity import DefaultAzureCredential

Expand All @@ -13,7 +15,7 @@ class Counter(entities.DurableEntity):
def set(self, input: int):
self.set_state(input)

def add(self, input: int):
def add(self, input: int | None = None):
current_state = self.get_state(int, 0)
new_state = current_state + (1 if input is None else input)
self.set_state(new_state)
Expand All @@ -23,7 +25,7 @@ def get(self):
return self.get_state(int, 0)


def counter_orchestrator(ctx: task.OrchestrationContext, _):
def counter_orchestrator(ctx: task.OrchestrationContext, _: Any) -> Generator[task.Task[Any], Any, Any]:
"""Orchestrator function that demonstrates the behavior of the counter entity"""

entity_id = task.EntityInstanceId("Counter", "myCounter")
Expand Down
8 changes: 5 additions & 3 deletions examples/entities/class_based_entity_actions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""End-to-end sample that demonstrates how to configure an orchestrator
that calls an activity function in a sequence and prints the outputs."""
import os
from collections.abc import Generator
from typing import Any

from azure.identity import DefaultAzureCredential

Expand All @@ -13,7 +15,7 @@ class Counter(entities.DurableEntity):
def set(self, input: int):
self.set_state(input)

def add(self, input: int):
def add(self, input: int | None = None):
current_state = self.get_state(int, 0)
new_state = current_state + (1 if input is None else input)
self.set_state(new_state)
Expand All @@ -32,7 +34,7 @@ def get(self):
return self.get_state(int, 0)


def counter_orchestrator(ctx: task.OrchestrationContext, _):
def counter_orchestrator(ctx: task.OrchestrationContext, _: Any) -> Generator[task.Task[Any], Any, Any]:
"""Orchestrator function that demonstrates the behavior of the counter entity"""

entity_id = task.EntityInstanceId("Counter", "myCounter")
Expand All @@ -51,7 +53,7 @@ def counter_orchestrator(ctx: task.OrchestrationContext, _):
return (yield ctx.call_entity(parent_entity_id, "get"))


def hello_orchestrator(ctx: task.OrchestrationContext, _):
def hello_orchestrator(ctx: task.OrchestrationContext, _: Any) -> str:
return "Hello world!"


Expand Down
6 changes: 4 additions & 2 deletions examples/entities/entity_locking.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""End-to-end sample that demonstrates how to configure an orchestrator
that calls an activity function in a sequence and prints the outputs."""
import os
from collections.abc import Generator
from typing import Any

from azure.identity import DefaultAzureCredential

Expand All @@ -13,7 +15,7 @@ class Counter(entities.DurableEntity):
def set(self, input: int):
self.set_state(input)

def add(self, input: int):
def add(self, input: int | None = None):
current_state = self.get_state(int, 0)
new_state = current_state + (1 if input is None else input)
self.set_state(new_state)
Expand All @@ -23,7 +25,7 @@ def get(self):
return self.get_state(int, 0)


def counter_orchestrator(ctx: task.OrchestrationContext, _):
def counter_orchestrator(ctx: task.OrchestrationContext, _: Any) -> Generator[task.Task[Any], Any, Any]:
"""Orchestrator function that demonstrates the behavior of the counter entity"""

entity_id = entities.EntityInstanceId("Counter", "myCounter")
Expand Down
6 changes: 4 additions & 2 deletions examples/entities/function_based_entity.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""End-to-end sample that demonstrates how to configure an orchestrator
that calls an activity function in a sequence and prints the outputs."""
import os
from collections.abc import Generator
from typing import Any

from azure.identity import DefaultAzureCredential

Expand All @@ -9,7 +11,7 @@
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker


def counter(ctx: entities.EntityContext, input: int) -> int | None:
def counter(ctx: entities.EntityContext, input: int | None = None) -> int | None:
if ctx.operation == "set":
ctx.set_state(input)
elif ctx.operation == "add":
Expand All @@ -23,7 +25,7 @@ def counter(ctx: entities.EntityContext, input: int) -> int | None:
raise ValueError(f"Unknown operation '{ctx.operation}'")


def counter_orchestrator(ctx: task.OrchestrationContext, _):
def counter_orchestrator(ctx: task.OrchestrationContext, _: Any) -> Generator[task.Task[Any], Any, Any]:
"""Orchestrator function that demonstrates the behavior of the counter entity"""

entity_id = entities.EntityInstanceId("counter", "myCounter")
Expand Down
8 changes: 5 additions & 3 deletions examples/entities/function_based_entity_actions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""End-to-end sample that demonstrates how to configure an orchestrator
that calls an activity function in a sequence and prints the outputs."""
import os
from collections.abc import Generator
from typing import Any

from azure.identity import DefaultAzureCredential

Expand All @@ -9,7 +11,7 @@
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker


def counter(ctx: entities.EntityContext, input: int) -> int | None:
def counter(ctx: entities.EntityContext, input: int | None = None) -> int | None:
if ctx.operation == "set":
ctx.set_state(input)
elif ctx.operation == "add":
Expand All @@ -30,7 +32,7 @@ def counter(ctx: entities.EntityContext, input: int) -> int | None:
raise ValueError(f"Unknown operation '{ctx.operation}'")


def counter_orchestrator(ctx: task.OrchestrationContext, _):
def counter_orchestrator(ctx: task.OrchestrationContext, _: Any) -> Generator[task.Task[Any], Any, Any]:
"""Orchestrator function that demonstrates the behavior of the counter entity"""

entity_id = task.EntityInstanceId("counter", "myCounter")
Expand All @@ -49,7 +51,7 @@ def counter_orchestrator(ctx: task.OrchestrationContext, _):
return (yield ctx.call_entity(parent_entity_id, "get"))


def hello_orchestrator(ctx: task.OrchestrationContext, _):
def hello_orchestrator(ctx: task.OrchestrationContext, _: Any) -> str:
return "Hello world!"


Expand Down
8 changes: 5 additions & 3 deletions examples/fanout_fanin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import os
import random
import time
from collections.abc import Generator
from typing import Any

from azure.identity import DefaultAzureCredential

Expand All @@ -12,7 +14,7 @@
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker


def get_work_items(ctx: task.ActivityContext, _) -> list[str]:
def get_work_items(ctx: task.ActivityContext, _: Any) -> list[str]:
"""Activity function that returns a list of work items"""
# return a random number of work items
count = random.randint(2, 10)
Expand All @@ -31,15 +33,15 @@ def process_work_item(ctx: task.ActivityContext, item: str) -> int:
return random.randint(0, 10)


def orchestrator(ctx: task.OrchestrationContext, _):
def orchestrator(ctx: task.OrchestrationContext, _: Any) -> Generator[task.Task[Any], Any, dict[str, Any]]:
"""Orchestrator function that calls the 'get_work_items' and 'process_work_item'
activity functions in parallel, waits for them all to complete, and prints
an aggregate summary of the outputs"""

work_items: list[str] = yield ctx.call_activity(get_work_items)

# execute the work-items in parallel and wait for them all to return
tasks = [ctx.call_activity(process_work_item, input=item) for item in work_items]
tasks: list[task.Task[int]] = [ctx.call_activity(process_work_item, input=item) for item in work_items]
results: list[int] = yield task.when_all(tasks)

# return an aggregate summary of the results
Expand Down
4 changes: 3 additions & 1 deletion examples/history_export/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import os
import time
from collections.abc import Generator
from datetime import datetime, timedelta, timezone
from typing import Any

from durabletask import client, task, worker
from durabletask.extensions.history_export import (
Expand Down Expand Up @@ -50,7 +52,7 @@ def square(_: task.ActivityContext, n: int) -> int:
return n * n


def sample_orchestrator(ctx: task.OrchestrationContext, n: int):
def sample_orchestrator(ctx: task.OrchestrationContext, n: int) -> Generator[task.Task[Any], Any, int]:
result = yield ctx.call_activity(square, input=n)
return result

Expand Down
14 changes: 10 additions & 4 deletions examples/human_interaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
import os
import threading
import time
from collections import namedtuple
from collections.abc import Generator
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, NamedTuple

from azure.identity import DefaultAzureCredential

Expand All @@ -17,6 +18,11 @@
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker


class Approval(NamedTuple):
"""Represents an approval event payload"""
approver: str


@dataclass
class Order:
"""Represents a purchase order"""
Expand All @@ -39,7 +45,7 @@ def place_order(_: task.ActivityContext, order: Order) -> None:
print(f'*** Placing order: {order}')


def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order):
def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order) -> Generator[task.Task[Any], Any, str]:
"""Orchestrator function that represents a purchase order workflow"""
# Orders under $1000 are auto-approved
if order.Cost < 1000:
Expand Down Expand Up @@ -87,7 +93,7 @@ def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order):

def prompt_for_approval():
input("Press [ENTER] to approve the order...\n")
approval_event = namedtuple("Approval", ["approver"])(args.approver)
approval_event = Approval(args.approver)
c.raise_orchestration_event(instance_id, "approval_received", data=approval_event)

# Prompt the user for approval on a background thread
Expand Down Expand Up @@ -133,7 +139,7 @@ def prompt_for_approval():

def prompt_for_approval():
input("Press [ENTER] to approve the order...\n")
approval_event = namedtuple("Approval", ["approver"])(args.approver)
approval_event = Approval(args.approver)
c.raise_orchestration_event(instance_id, "approval_received", data=approval_event)

# Prompt the user for approval on a background thread
Expand Down
10 changes: 6 additions & 4 deletions examples/in_memory_backend_example/src/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
access (``item["quantity"]``) for nested data.
"""

from collections.abc import Generator
from dataclasses import dataclass
from datetime import timedelta
from typing import Any

from durabletask import task

Expand Down Expand Up @@ -50,7 +52,7 @@ class Order:
# ---------------------------------------------------------------------------


def validate_order(ctx: task.ActivityContext, order) -> None:
def validate_order(ctx: task.ActivityContext, order: Any) -> None:
"""Validate that the order has items and all quantities/prices are valid.

Raises ``ValueError`` on invalid input.
Expand All @@ -66,7 +68,7 @@ def validate_order(ctx: task.ActivityContext, order) -> None:
f"Invalid price for '{item['name']}': {item['unit_price']}")


def calculate_total(ctx: task.ActivityContext, items: list) -> float:
def calculate_total(ctx: task.ActivityContext, items: list[Any]) -> float:
"""Return the total cost for a list of item dicts."""
return sum(item["quantity"] * item["unit_price"] for item in items)

Expand Down Expand Up @@ -95,7 +97,7 @@ def ship_item(ctx: task.ActivityContext, item_name: str) -> str:
# ---------------------------------------------------------------------------


def process_order(ctx: task.OrchestrationContext, order):
def process_order(ctx: task.OrchestrationContext, order: Any) -> Generator[task.Task[Any], Any, dict[str, Any]]:
"""Process a complete order: validate, pay, ship items in parallel, confirm.

Demonstrates:
Expand Down Expand Up @@ -135,7 +137,7 @@ def process_order(ctx: task.OrchestrationContext, order):
}


def order_with_approval(ctx: task.OrchestrationContext, order):
def order_with_approval(ctx: task.OrchestrationContext, order: Any) -> Generator[task.Task[Any], Any, dict[str, Any]]:
"""Order workflow that requires manager approval for high-value orders.

Demonstrates:
Expand Down
4 changes: 3 additions & 1 deletion examples/large_payload/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
"""

import os
from collections.abc import Generator
from typing import Any

from azure.identity import DefaultAzureCredential

Expand All @@ -48,7 +50,7 @@ def summarize(ctx: task.ActivityContext, report: str) -> str:

# --------------- Orchestrator ---------------

def large_payload_orchestrator(ctx: task.OrchestrationContext, num_records: int):
def large_payload_orchestrator(ctx: task.OrchestrationContext, num_records: int) -> Generator[task.Task[Any], Any, str]:
Comment thread
andystaples marked this conversation as resolved.
"""Orchestrator that generates a large report and then summarizes it.

Both the report (activity output) and the orchestration input are
Expand Down
Loading
Loading