forked from microsoft/durabletask-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclass_based_entity.py
More file actions
65 lines (50 loc) · 2.62 KB
/
Copy pathclass_based_entity.py
File metadata and controls
65 lines (50 loc) · 2.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
"""End-to-end sample that demonstrates how to define a class-based durable
entity (a counter) and drive it from an orchestrator, printing the result."""
import os
from collections.abc import Generator
from typing import Any
from azure.identity import DefaultAzureCredential
from durabletask import client, task, entities
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
class Counter(entities.DurableEntity):
    """Entity that keeps a single integer counter as its state.

    The orchestrator in this sample invokes the methods below by name
    ("set", "add" and "get").
    """

    def set(self, input: int):
        """Store ``input`` as the entity state."""
        self.set_state(input)

    def add(self, input: int | None = None):
        """Increase the counter and return the updated value.

        The counter goes up by 1 when ``input`` is ``None``, otherwise by
        ``input``. The current value is read with ``get_state(int, 0)``
        (the trailing 0 is presumably the fallback when no state exists
        yet -- confirm against the DurableEntity API).
        """
        step = input if input is not None else 1
        total = self.get_state(int, 0) + step
        self.set_state(total)
        return total

    def get(self):
        """Return the value read via ``get_state(int, 0)``."""
        return self.get_state(int, 0)
def counter_orchestrator(ctx: task.OrchestrationContext, _: Any) -> Generator[task.Task[Any], Any, Any]:
    """Exercise the "Counter" entity: set it to 0, add 1, then read it back.

    Args:
        ctx: Orchestration context used to signal and call the entity.
        _: Orchestration input; not used.

    Returns:
        The value produced by the entity's "get" operation (expected to be 1).
    """
    counter_id = task.EntityInstanceId("Counter", "myCounter")

    # Send "set" with 0; nothing is yielded here, so no result is collected.
    ctx.signal_entity(counter_id, "set", 0)

    # Yield on the "add" call; the value it sends back is discarded.
    yield ctx.call_entity(counter_id, "add", 1)

    # Yield on "get" and hand its result back as the orchestration output.
    current_value = yield ctx.call_entity(counter_id, "get")
    return current_value
# Use environment variables if provided, otherwise use default emulator values
taskhub_name = os.getenv("TASKHUB", "default")
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")

print(f"Using taskhub: {taskhub_name}")
print(f"Using endpoint: {endpoint}")

# Set credential to None for emulator, or DefaultAzureCredential for Azure
secure_channel = endpoint.startswith("https://")
credential = DefaultAzureCredential() if secure_channel else None

# The worker hosts the orchestrator and the entity. The client calls below stay
# inside the `with` block so the worker is still running while the client
# waits for the orchestration to finish.
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
                                taskhub=taskhub_name, token_credential=credential) as w:
    w.add_orchestrator(counter_orchestrator)
    w.add_entity(Counter)
    w.start()

    # Construct the client and run the orchestrations
    c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel,
                                   taskhub=taskhub_name, token_credential=credential)
    instance_id = c.schedule_new_orchestration(counter_orchestrator)
    state = c.wait_for_orchestration_completion(instance_id, timeout=60)
    if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
        print(f'Orchestration completed! Result: {state.serialized_output}')
    elif state:
        print(f'Orchestration failed: {state.failure_details}')
    else:
        # Previously a falsy state fell through both branches and the script
        # ended without printing anything; report it explicitly instead.
        print(f'No orchestration state was returned for instance: {instance_id}')