# worker.py
# Durable Task "monitoring" pattern worker sample.
# (Original listing: 144 lines, 116 loc, 4.92 KB)
import asyncio
import datetime
import logging
import time
import os
from azure.identity import DefaultAzureCredential
from durabletask import task
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
# Configure root logging once at import time; INFO so the sample's
# progress messages are visible on the console.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the activity, the orchestrator, and main().
logger = logging.getLogger(__name__)
# Activity functions
def check_job_status(ctx, job_data: dict) -> dict:
    """Simulate polling an external job for its current status.

    A real application would call an external API or service here.
    This simulation reports "Running" until three checks have already
    been performed, then reports "Completed".

    Args:
        ctx: Activity context supplied by the Durable Task runtime.
        job_data: Dict carrying "job_id" and the running "check_count".

    Returns:
        Dict with the job id, simulated status, incremented check count,
        and the wall-clock time of this check.
    """
    job_id = job_data.get("job_id", "unknown")
    checks_so_far = job_data.get("check_count", 0)

    logger.info(f"Checking status for job: {job_id} (check #{checks_so_far + 1})")

    # The job "completes" once it has already been checked three times.
    status = "Completed" if checks_so_far >= 3 else "Running"

    return {
        "job_id": job_id,
        "status": status,
        "check_count": checks_so_far + 1,
        "last_check_time": datetime.datetime.now().isoformat(),
    }
# Orchestrator function
def monitoring_job_orchestrator(ctx, job_data: dict) -> dict:
    """Orchestrator demonstrating the monitoring pattern.

    Repeatedly invokes the check_job_status activity, sleeping between
    polls on a durable timer, until the job reports "Completed" or the
    configured timeout elapses.

    Args:
        ctx: Orchestration context (provides deterministic time, activity
            calls, timers, and custom status).
        job_data: Dict with "job_id" and optional
            "polling_interval_seconds" (default 5) and
            "timeout_seconds" (default 30).

    Returns:
        Summary dict: job id, final status, number of checks performed,
        and total monitoring duration in seconds.
    """
    job_id = job_data.get("job_id")
    poll_seconds = job_data.get("polling_interval_seconds", 5)
    timeout_seconds = job_data.get("timeout_seconds", 30)

    logger.info(f"Starting monitoring orchestration for job {job_id}")
    logger.info(f"Polling interval: {poll_seconds} seconds")
    logger.info(f"Timeout: {timeout_seconds} seconds")

    # Use the orchestration's deterministic clock, not wall time.
    started_at = ctx.current_utc_datetime
    deadline = started_at + datetime.timedelta(seconds=timeout_seconds)

    # Monitoring state carried across polls.
    status_snapshot = {
        "job_id": job_id,
        "status": "Unknown",
        "check_count": 0,
    }

    # Poll until the job completes or the deadline passes.
    while True:
        status_snapshot = yield ctx.call_activity(
            "check_job_status",
            input={
                "job_id": job_id,
                "check_count": status_snapshot.get("check_count", 0),
            },
        )
        # Surface progress to clients querying the orchestration.
        ctx.set_custom_status(status_snapshot)

        if status_snapshot["status"] == "Completed":
            logger.info(f"Job {job_id} completed after {status_snapshot['check_count']} checks")
            break

        now = ctx.current_utc_datetime
        if now >= deadline:
            logger.info(f"Monitoring for job {job_id} timed out after {timeout_seconds} seconds")
            status_snapshot["status"] = "Timeout"
            break

        # Sleep until the next poll, never past the deadline.
        wake_at = min(now + datetime.timedelta(seconds=poll_seconds), deadline)
        logger.info(f"Waiting {poll_seconds} seconds before next check of job {job_id}")
        yield ctx.create_timer(wake_at)

    return {
        "job_id": job_id,
        "final_status": status_snapshot["status"],
        "checks_performed": status_snapshot["check_count"],
        "monitoring_duration_seconds": (ctx.current_utc_datetime - started_at).total_seconds(),
    }
async def main():
    """Main entry point for the worker process.

    Reads the task hub name and scheduler endpoint from the environment,
    creates a DurableTaskSchedulerWorker, registers the activity and
    orchestrator, and keeps the process alive until interrupted.
    """
    logger.info("Starting Monitoring pattern worker...")

    # Get environment variables for taskhub and endpoint with defaults
    taskhub_name = os.getenv("TASKHUB", "default")
    endpoint = os.getenv("ENDPOINT", "http://localhost:8080")

    # Consistency fix: route startup info through the configured logger
    # instead of bare print(), like the rest of this module.
    logger.info(f"Using taskhub: {taskhub_name}")
    logger.info(f"Using endpoint: {endpoint}")

    # The local emulator endpoint needs no credential and no TLS;
    # anything else is assumed to be Azure (DefaultAzureCredential).
    # Hoisted so the literal is compared exactly once.
    is_emulator = endpoint == "http://localhost:8080"
    credential = None if is_emulator else DefaultAzureCredential()

    # Create a worker using Azure Managed Durable Task and start it with a context manager
    with DurableTaskSchedulerWorker(
        host_address=endpoint,
        secure_channel=not is_emulator,
        taskhub=taskhub_name,
        token_credential=credential,
    ) as worker:
        # Register activities and orchestrators
        worker.add_activity(check_job_status)
        worker.add_orchestrator(monitoring_job_orchestrator)

        # Start the worker (without awaiting); the context manager
        # handles shutdown when the block exits.
        worker.start()

        try:
            # Keep the worker running
            while True:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            logger.info("Worker shutdown initiated")

    logger.info("Worker stopped")


if __name__ == "__main__":
    asyncio.run(main())