forked from microsoft/durabletask-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain_app.py
More file actions
127 lines (101 loc) · 5.09 KB
/
Copy pathmain_app.py
File metadata and controls
127 lines (101 loc) · 5.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Declarer app for the Durable Task Scheduler sandbox activities sample."""
import os
from azure.core.credentials import TokenCredential
from azure.identity import DefaultAzureCredential
from durabletask import client, task
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.preview.sandboxes import SandboxActivitiesClient
from durabletask.azuremanaged.preview.sandboxes import SandboxWorkerProfile
from durabletask.azuremanaged.preview.sandboxes import sandbox_worker_profile
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
from activities import REMOTE_HELLO
WORKER_PROFILE_ID = "remote-hello-profile"
def hello_orchestrator(ctx: task.OrchestrationContext, name: str):
"""Orchestrator that calls an activity executed by the remote worker image."""
return (yield ctx.call_activity(REMOTE_HELLO.name, input=name))
def _get_required_env(name: str) -> str:
value = os.getenv(name)
if value:
return value
raise RuntimeError(f"Set {name} before running the sandbox sample.")
def _parse_scheduler_connection_string(connection_string: str) -> dict[str, str]:
settings: dict[str, str] = {}
for segment in connection_string.split(";"):
if not segment.strip():
continue
key, separator, value = segment.partition("=")
if not separator or not key.strip():
raise RuntimeError(
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING must use key=value segments.")
settings[key.strip().lower()] = value.strip()
return settings
def _resolve_scheduler_connection() -> tuple[str, str, bool, TokenCredential | None]:
settings = _parse_scheduler_connection_string(
_get_required_env("DURABLE_TASK_SCHEDULER_CONNECTION_STRING"))
endpoint = settings.get("endpoint")
taskhub = settings.get("taskhub")
if not endpoint:
raise RuntimeError("DURABLE_TASK_SCHEDULER_CONNECTION_STRING must include Endpoint.")
if not taskhub:
raise RuntimeError("DURABLE_TASK_SCHEDULER_CONNECTION_STRING must include TaskHub.")
authentication = settings.get("authentication", "").lower()
if authentication in ("", "none"):
credential = None
elif authentication == "defaultazure":
credential = DefaultAzureCredential()
else:
raise RuntimeError(
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING Authentication must be DefaultAzure or None.")
endpoint = endpoint.strip()
secure_channel = endpoint.lower().startswith(("https://", "grpcs://"))
return endpoint, taskhub.strip(), secure_channel, credential
endpoint, taskhub_name, secure_channel, credential = _resolve_scheduler_connection()
container_image = _get_required_env("DTS_SANDBOX_CONTAINER_IMAGE")
image_pull_managed_identity_client_id = _get_required_env("DTS_SANDBOX_IMAGE_PULL_UMI_CLIENT_ID")
scheduler_managed_identity_client_id = _get_required_env("DTS_SANDBOX_SCHEDULER_UMI_CLIENT_ID")
sample_input = os.getenv("DTS_SAMPLE_HELLO_INPUT", "sandbox Python")
sample_timeout_seconds = int(os.getenv("DTS_SAMPLE_TIMEOUT_SECONDS", "300"))
@sandbox_worker_profile(WORKER_PROFILE_ID)
class RemoteWorkerProfile(SandboxWorkerProfile):
"""Sandbox worker profile used by the sample remote activity."""
def configure(self, options) -> None:
options.image.image_ref = container_image
options.image.managed_identity_client_id = image_pull_managed_identity_client_id
options.scheduler_managed_identity_client_id = scheduler_managed_identity_client_id
options.cpu = "1000m"
options.memory = "2048Mi"
options.max_concurrent_activities = 1
options.environment_variables["SANDBOX_SAMPLE_MARKER"] = "sandboxes-python-sample-marker"
options.add_activity(REMOTE_HELLO.name)
print(f"Using taskhub: {taskhub_name}")
print(f"Using endpoint: {endpoint}")
print(f"Declaring sandbox activity image: {container_image}")
with SandboxActivitiesClient(
host_address=endpoint,
secure_channel=secure_channel,
taskhub=taskhub_name,
token_credential=credential) as sandboxes_client:
sandboxes_client.enable_sandbox_activities()
with DurableTaskSchedulerWorker(
host_address=endpoint,
secure_channel=secure_channel,
taskhub=taskhub_name,
token_credential=credential) as worker:
worker.add_orchestrator(hello_orchestrator)
worker.use_work_item_filters()
worker.start()
durable_client = DurableTaskSchedulerClient(
host_address=endpoint,
secure_channel=secure_channel,
taskhub=taskhub_name,
token_credential=credential)
instance_id = durable_client.schedule_new_orchestration(
hello_orchestrator,
input=sample_input)
state = durable_client.wait_for_orchestration_completion(instance_id, timeout=sample_timeout_seconds)
if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
print(f"Orchestration completed! Result: {state.serialized_output}")
elif state:
print(f"Orchestration failed: {state.failure_details}")