forked from microsoft/durabletask-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
170 lines (133 loc) · 6.32 KB
/
Copy pathapp.py
File metadata and controls
170 lines (133 loc) · 6.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Distributed tracing example using OpenTelemetry and Jaeger.
This example demonstrates how to configure OpenTelemetry distributed tracing
with the Durable Task Python SDK. The orchestration showcases timers,
activities, and a sub-orchestration, all producing correlated trace spans
visible in the Jaeger UI.
Prerequisites:
- DTS emulator running on localhost:8080
- Jaeger running on localhost:4317 (OTLP gRPC) / localhost:16686 (UI)
- pip install -r requirements.txt
"""
import os
import time
from collections.abc import Generator
from datetime import timedelta
from typing import Any
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from azure.identity import DefaultAzureCredential
from durabletask import client, task
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
# ---------------------------------------------------------------------------
# OpenTelemetry configuration — MUST be done before any spans are created
# ---------------------------------------------------------------------------
# Collector address: taken from OTEL_EXPORTER_OTLP_ENDPOINT when set,
# otherwise the local default also printed by the entry point.
OTEL_ENDPOINT = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")

# Every span exported by this process carries this service name.
resource = Resource.create({"service.name": "durabletask-tracing-example"})
provider = TracerProvider(resource=resource)

# Spans are batched and shipped over OTLP/gRPC. The exporter is created with
# insecure=True (presumably plaintext, matching a local Jaeger — confirm
# before pointing this at a TLS-only collector).
_span_exporter = OTLPSpanExporter(endpoint=OTEL_ENDPOINT, insecure=True)
_span_processor = BatchSpanProcessor(_span_exporter)
provider.add_span_processor(_span_processor)

# Install the provider globally so spans created afterwards use it.
trace.set_tracer_provider(provider)
# ---------------------------------------------------------------------------
# Activity functions
# ---------------------------------------------------------------------------
def get_weather(ctx: task.ActivityContext, city: str) -> str:
    """Return a canned weather report for ``city``.

    Stands in for a call to a real weather service: the report is read from
    a small in-memory table, and a city that is not in the table yields
    ``"Unknown"``. The outcome is printed to stdout before it is returned.

    Args:
        ctx: Activity context supplied by the caller; not used here.
        city: Name of the city to look up (exact match).

    Returns:
        The report string for the city, or ``"Unknown"``.
    """
    # In a real app this would call an external API
    canned_reports = dict(
        Tokyo="Sunny, 22°C",
        Seattle="Rainy, 12°C",
        London="Cloudy, 15°C",
    )
    result = canned_reports[city] if city in canned_reports else "Unknown"
    print(f" [Activity] get_weather({city}) -> {result}")
    return result
def summarize(ctx: task.ActivityContext, reports: list[str]) -> str:
    """Join per-city weather reports into one line.

    The reports are concatenated in the order given, separated by ``" | "``.
    The combined text is printed to stdout and then returned.

    Args:
        ctx: Activity context supplied by the caller; not used here.
        reports: Report strings to combine.

    Returns:
        The joined summary; an empty string when ``reports`` is empty.
    """
    separator = " | "
    summary = separator.join(reports)
    print(f" [Activity] summarize -> {summary}")
    return summary
# ---------------------------------------------------------------------------
# Sub-orchestration
# ---------------------------------------------------------------------------
def collect_weather(ctx: task.OrchestrationContext, cities: list[str]) -> Generator[task.Task[Any], Any, list[str]]:
    """Sub-orchestration that gathers one weather report per city.

    For each city, in order, the ``get_weather`` activity is scheduled and
    its result is awaited before moving on to the next city.

    Args:
        ctx: Orchestration context used to schedule the activity calls.
        cities: City names to fetch weather for.

    Returns:
        One ``"<city>: <weather>"`` string per input city, in input order.
    """
    collected: list[str] = []
    for city in cities:
        # Schedule the activity, then hand the task to the runtime and resume
        # with the activity's result.
        activity_call = ctx.call_activity(get_weather, input=city)
        weather = yield activity_call
        collected.append(f"{city}: {weather}")
    return collected
# ---------------------------------------------------------------------------
# Main orchestration
# ---------------------------------------------------------------------------
def weather_report_orchestrator(ctx: task.OrchestrationContext, cities: list[str]) -> Generator[task.Task[Any], Any, str]:
    """Top-level orchestration combining a timer, a sub-orchestration, and an activity.

    Flow:
    1. Wait on a 100 ms timer (simulating a scheduled delay).
    2. Run the ``collect_weather`` sub-orchestration over ``cities``.
    3. Run the ``summarize`` activity over the collected reports.

    Args:
        ctx: Orchestration context used to create the timer and schedule work.
        cities: City names forwarded to the sub-orchestration.

    Returns:
        The summary string produced by the ``summarize`` activity.
    """
    # Step 1 — Timer: wait briefly before starting work
    startup_delay = timedelta(milliseconds=100)
    yield ctx.create_timer(startup_delay)

    # Only announce the timer when this is not a replay of earlier history.
    if not ctx.is_replaying:
        print(" [Orchestrator] Timer fired — starting weather collection")

    # Step 2 — Sub-orchestration: delegate city-level work
    collection_task = ctx.call_sub_orchestrator(collect_weather, input=cities)
    reports = yield collection_task

    # Step 3 — Activity: summarize the collected reports
    summary_task = ctx.call_activity(summarize, input=reports)
    summary = yield summary_task
    return summary
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """Run the tracing example end to end.

    Reads ``TASKHUB`` and ``ENDPOINT`` from the environment (defaulting to
    ``"default"`` and ``"http://localhost:8080"``), starts a worker with this
    example's orchestrators and activities registered, schedules
    ``weather_report_orchestrator`` for three cities, waits for it with
    ``timeout=60``, and prints the outcome.

    Buffered spans are flushed in a ``finally`` block, so the explicit flush
    also runs when scheduling or waiting raises. Such exceptions still
    propagate to the caller.
    """
    # Use environment variables if provided, otherwise use default emulator values
    taskhub_name = os.getenv("TASKHUB", "default")
    endpoint = os.getenv("ENDPOINT", "http://localhost:8080")

    print(f"Using taskhub: {taskhub_name}")
    print(f"Using endpoint: {endpoint}")
    print(f"OTLP endpoint: {OTEL_ENDPOINT}")

    # Set credential to None for emulator, or DefaultAzureCredential for Azure
    secure_channel = endpoint.startswith("https://")
    credential = DefaultAzureCredential() if secure_channel else None

    try:
        with DurableTaskSchedulerWorker(
            host_address=endpoint,
            secure_channel=secure_channel,
            taskhub=taskhub_name,
            token_credential=credential,
        ) as worker:
            # Register orchestrators and activities
            worker.add_orchestrator(weather_report_orchestrator)
            worker.add_orchestrator(collect_weather)
            worker.add_activity(get_weather)
            worker.add_activity(summarize)
            worker.start()
            print("Worker started.")

            # Create client, schedule the orchestration, and wait for completion.
            # This stays inside the worker context so the worker is still
            # running while the client waits.
            scheduler_client = DurableTaskSchedulerClient(
                host_address=endpoint,
                secure_channel=secure_channel,
                taskhub=taskhub_name,
                token_credential=credential,
            )
            cities = ["Tokyo", "Seattle", "London"]
            instance_id = scheduler_client.schedule_new_orchestration(
                weather_report_orchestrator, input=cities,
            )
            print(f"Orchestration started: {instance_id}")

            state = scheduler_client.wait_for_orchestration_completion(instance_id, timeout=60)
            if state is None:
                # Previously this case printed nothing at all.
                print(f"Orchestration {instance_id} returned no state.")
            elif state.runtime_status == client.OrchestrationStatus.COMPLETED:
                print(f"Orchestration completed! Result: {state.serialized_output}")
            else:
                print(f"Orchestration failed: {state.failure_details}")
    finally:
        # Flush any remaining spans to the exporter, even on error, so the
        # trace of a failed run is still exported.
        provider.force_flush()
        # Short pause kept from the original script after the flush.
        time.sleep(1)

    print("Done. Open Jaeger at http://localhost:16686 to view traces.")


if __name__ == "__main__":
    main()