forked from microsoft/durabletask-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathentity_locking.py
More file actions
68 lines (52 loc) · 2.8 KB
/
Copy pathentity_locking.py
File metadata and controls
68 lines (52 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""End-to-end sample that demonstrates how to configure an orchestrator
that calls an activity function in a sequence and prints the outputs."""
import os
from azure.identity import DefaultAzureCredential
from durabletask import client, task, entities
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
class Counter(entities.DurableEntity):
def set(self, input: int):
self.set_state(input)
def add(self, input: int):
current_state = self.get_state(int, 0)
new_state = current_state + (1 if input is None else input)
self.set_state(new_state)
return new_state
def get(self):
return self.get_state(int, 0)
def counter_orchestrator(ctx: task.OrchestrationContext, _):
"""Orchestrator function that demonstrates the behavior of the counter entity"""
entity_id = entities.EntityInstanceId("Counter", "myCounter")
# Initialize the entity with state 0, increment the counter by 1, and get the entity state using
# entity locking to ensure no other orchestrator can modify the entity state between the calls to call_entity
with (yield ctx.lock_entities([entity_id])):
yield ctx.call_entity(entity_id, "set", 0)
yield ctx.call_entity(entity_id, "add", 1)
result = yield ctx.call_entity(entity_id, "get")
# Return the entity's current value (will be 1)
return result
# Use environment variables if provided, otherwise use default emulator values
taskhub_name = os.getenv("TASKHUB", "default")
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
print(f"Using taskhub: {taskhub_name}")
print(f"Using endpoint: {endpoint}")
# Set credential to None for emulator, or DefaultAzureCredential for Azure
secure_channel = endpoint.startswith("https://")
credential = DefaultAzureCredential() if secure_channel else None
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
taskhub=taskhub_name, token_credential=credential) as w:
w.add_orchestrator(counter_orchestrator)
w.add_entity(Counter)
w.start()
# Construct the client and run the orchestrations
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel,
taskhub=taskhub_name, token_credential=credential)
instance_id = c.schedule_new_orchestration(counter_orchestrator)
state = c.wait_for_orchestration_completion(instance_id, timeout=60)
if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
print(f'Orchestration completed! Result: {state.serialized_output}')
elif state:
print(f'Orchestration failed: {state.failure_details}')