forked from microsoft/durabletask-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfunction_based_entity.py
More file actions
66 lines (52 loc) · 2.7 KB
/
Copy pathfunction_based_entity.py
File metadata and controls
66 lines (52 loc) · 2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""End-to-end sample that demonstrates how to configure an orchestrator
that calls an activity function in a sequence and prints the outputs."""
import os
from azure.identity import DefaultAzureCredential
from durabletask import client, entities, task
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
def counter(ctx: entities.EntityContext, input: int) -> int | None:
if ctx.operation == "set":
ctx.set_state(input)
elif ctx.operation == "add":
current_state = ctx.get_state(int, 0)
new_state = current_state + (1 if input is None else input)
ctx.set_state(new_state)
return new_state
elif ctx.operation == "get":
return ctx.get_state(int, 0)
else:
raise ValueError(f"Unknown operation '{ctx.operation}'")
def counter_orchestrator(ctx: task.OrchestrationContext, _):
"""Orchestrator function that demonstrates the behavior of the counter entity"""
entity_id = entities.EntityInstanceId("counter", "myCounter")
# Initialize the entity with state 0
ctx.signal_entity(entity_id, "set", 0)
# Increment the counter by 1
yield ctx.call_entity(entity_id, "add", 1)
# Return the entity's current value (should be 1)
return (yield ctx.call_entity(entity_id, "get"))
# Use environment variables if provided, otherwise use default emulator values
taskhub_name = os.getenv("TASKHUB", "default")
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
print(f"Using taskhub: {taskhub_name}")
print(f"Using endpoint: {endpoint}")
# Set credential to None for emulator, or DefaultAzureCredential for Azure
secure_channel = endpoint.startswith("https://")
credential = DefaultAzureCredential() if secure_channel else None
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
taskhub=taskhub_name, token_credential=credential) as w:
w.add_orchestrator(counter_orchestrator)
w.add_entity(counter)
w.start()
# Construct the client and run the orchestrations
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel,
taskhub=taskhub_name, token_credential=credential)
instance_id = c.schedule_new_orchestration(counter_orchestrator)
state = c.wait_for_orchestration_completion(instance_id, timeout=60)
if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
print(f'Orchestration completed! Result: {state.serialized_output}')
elif state:
print(f'Orchestration failed: {state.failure_details}')