forked from microsoft/durabletask-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfunction_based_entity_actions.py
More file actions
83 lines (66 loc) · 3.64 KB
/
Copy pathfunction_based_entity_actions.py
File metadata and controls
83 lines (66 loc) · 3.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""End-to-end sample that demonstrates how to configure an orchestrator
that signals and calls a function-based counter entity and prints the result."""
import os
from collections.abc import Generator
from typing import Any
from azure.identity import DefaultAzureCredential
from durabletask import client, task, entities
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
def counter(ctx: entities.EntityContext, input: int | None = None) -> int | None:
if ctx.operation == "set":
ctx.set_state(input)
elif ctx.operation == "add":
current_state = ctx.get_state(int, 0)
new_state = current_state + (1 if input is None else input)
ctx.set_state(new_state)
return new_state
elif ctx.operation == "get":
return ctx.get_state(int, 0)
elif ctx.operation == "update_parent":
parent_entity_id = entities.EntityInstanceId("counter", "parentCounter")
if ctx.entity_id == parent_entity_id:
return # Prevent self-update
ctx.signal_entity(parent_entity_id, "set", ctx.get_state(int, 0))
elif ctx.operation == "start_hello":
ctx.schedule_new_orchestration("hello_orchestrator")
else:
raise ValueError(f"Unknown operation '{ctx.operation}'")
def counter_orchestrator(ctx: task.OrchestrationContext, _: Any) -> Generator[task.Task[Any], Any, Any]:
    """Orchestrator function that demonstrates the behavior of the counter entity.

    Drives two ``counter`` entities, ``myCounter`` and ``parentCounter``:

    1. Signals ``myCounter`` with ``"start_hello"`` so the entity schedules
       an orchestration.
    2. Signals ``myCounter`` with ``"set"`` 0, calls ``"add"`` 1 and then
       ``"update_parent"`` so one entity signals another.
    3. Calls ``"get"`` on ``parentCounter`` and returns the result.

    Args:
        ctx: Orchestration context used to signal and call the entities.
        _: Orchestration input; unused.

    Returns:
        The value returned by ``"get"`` on ``parentCounter`` (should be 1).
    """
    # Build the IDs from the ``entities`` module, matching the counter entity,
    # rather than relying on ``task`` exposing the same name.
    entity_id = entities.EntityInstanceId("counter", "myCounter")
    parent_entity_id = entities.EntityInstanceId("counter", "parentCounter")

    # Use counter to demonstrate starting an orchestration from an entity
    ctx.signal_entity(entity_id, "start_hello")

    # Use counter to demonstrate signaling an entity from another entity.
    # Initialize myCounter with state 0, increment it by 1, and set the state of parentCounter using
    # update_parent on myCounter. Retrieve and return the state of parentCounter (should be 1).
    ctx.signal_entity(entity_id, "set", 0)
    yield ctx.call_entity(entity_id, "add", 1)
    yield ctx.call_entity(entity_id, "update_parent")

    return (yield ctx.call_entity(parent_entity_id, "get"))
def hello_orchestrator(ctx: task.OrchestrationContext, _: Any) -> str:
    """Orchestrator that returns a fixed greeting without scheduling any work.

    The counter entity schedules it by name for its "start_hello" operation.
    Neither parameter is used.
    """
    greeting = "Hello world!"
    return greeting
# Use environment variables if provided, otherwise use default emulator values
taskhub_name = os.getenv("TASKHUB", "default")
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")

print(f"Using taskhub: {taskhub_name}")
print(f"Using endpoint: {endpoint}")

# Set credential to None for emulator, or DefaultAzureCredential for Azure.
# An "https://" endpoint is treated as Azure; anything else as the emulator.
secure_channel = endpoint.startswith("https://")
credential = DefaultAzureCredential() if secure_channel else None

with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
                                taskhub=taskhub_name, token_credential=credential) as w:
    # Register everything the sample needs before starting the worker.
    w.add_orchestrator(counter_orchestrator)
    w.add_orchestrator(hello_orchestrator)
    w.add_entity(counter)
    w.start()

    # Construct the client and run the orchestrations.
    # This stays inside the ``with`` block so the worker is still active while
    # the orchestration executes and we wait for its result.
    c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel,
                                   taskhub=taskhub_name, token_credential=credential)
    instance_id = c.schedule_new_orchestration(counter_orchestrator)
    state = c.wait_for_orchestration_completion(instance_id, timeout=60)

    if not state:
        # Previously this case produced no output at all; report it explicitly.
        print(f'No orchestration state was returned for instance: {instance_id}')
    elif state.runtime_status == client.OrchestrationStatus.COMPLETED:
        print(f'Orchestration completed! Result: {state.serialized_output}')
    else:
        print(f'Orchestration failed: {state.failure_details}')