-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathworker.py
More file actions
45 lines (33 loc) · 1.37 KB
/
worker.py
File metadata and controls
45 lines (33 loc) · 1.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import os
from datetime import timedelta
import time
from azure.identity import DefaultAzureCredential
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
def cleanup_task(ctx, _) -> str:
"""Activity function that performs cleanup"""
print('Performing cleanup...')
# Simulate cleanup process
time.sleep(5)
return 'Cleanup completed'
def periodic_cleanup(ctx, counter):
yield ctx.call_activity(cleanup_task)
yield ctx.create_timer(timedelta(seconds=15))
if counter < 5:
ctx.continue_as_new(counter + 1)
# Get environment variables for taskhub and endpoint with defaults
taskhub_name = os.getenv("TASKHUB", "default")
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
print(f"Using taskhub: {taskhub_name}")
print(f"Using endpoint: {endpoint}")
# Set credential to None for emulator, or DefaultAzureCredential for Azure
credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential()
# Configure and start the worker
with DurableTaskSchedulerWorker(host_address=endpoint,
secure_channel=endpoint != "http://localhost:8080",
taskhub=taskhub_name,
token_credential=credential) as w:
w.add_orchestrator(periodic_cleanup)
w.add_activity(cleanup_task)
w.start()
while True:
time.sleep(1)