Skip to content

Commit fdf908a

Browse files
committed
remove default worker profile
1 parent 950c3ef commit fdf908a

9 files changed

Lines changed: 223 additions & 9 deletions

File tree

durabletask-azuremanaged/durabletask/azuremanaged/preview/sandboxes/declarations.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
)
1414

1515

16-
DEFAULT_WORKER_PROFILE_ID = "default"
1716
DEFAULT_CPU = "1000m"
1817
DEFAULT_MEMORY = "2048Mi"
1918
DEFAULT_MAX_CONCURRENT_ACTIVITIES = 100
@@ -87,7 +86,7 @@ def _build_sandbox_activity_declaration(
8786
*,
8887
activity_names: str | Iterable[str],
8988
scheduler_managed_identity_client_id: Optional[str],
90-
worker_profile_id: str = DEFAULT_WORKER_PROFILE_ID,
89+
worker_profile_id: str,
9190
container_image: Optional[str] = None,
9291
image_pull_managed_identity_client_id: Optional[str] = None,
9392
cpu: str = DEFAULT_CPU,

durabletask-azuremanaged/durabletask/azuremanaged/preview/sandboxes/worker.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
from durabletask.azuremanaged.preview.sandboxes.helpers import resolve_activity_names
1616
from durabletask.azuremanaged.preview.sandboxes.declarations import (
1717
DEFAULT_MAX_CONCURRENT_ACTIVITIES,
18-
DEFAULT_WORKER_PROFILE_ID,
1918
build_sandbox_worker_heartbeat,
2019
build_sandbox_worker_start,
2120
)
@@ -192,9 +191,12 @@ def _resolve_secure_channel(host_address: str) -> bool:
192191

193192

194193
def _resolve_worker_profile_id() -> str:
195-
resolved_worker_profile_id = (
196-
os.getenv("DTS_WORKER_PROFILE_ID")
197-
or DEFAULT_WORKER_PROFILE_ID)
194+
resolved_worker_profile_id = os.getenv("DTS_WORKER_PROFILE_ID")
195+
if not resolved_worker_profile_id or not resolved_worker_profile_id.strip():
196+
raise ValueError(
197+
"Sandbox worker requires DTS_WORKER_PROFILE_ID to be injected in the "
198+
"sandbox environment.")
199+
198200
return resolved_worker_profile_id.strip()
199201

200202

examples/sandboxes/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ Bash:
2323
~~~bash
2424
export DTS_ENDPOINT="<scheduler endpoint>"
2525
export DTS_TASK_HUB="<task hub name>"
26-
export DTS_WORKER_PROFILE_ID="default"
26+
export DTS_WORKER_PROFILE_ID="python-sandbox-worker"
2727
export DTS_SANDBOX_CONTAINER_IMAGE="<container image reference>"
2828
export DTS_SANDBOX_IMAGE_PULL_UMI_CLIENT_ID="<image-pull UMI client ID>"
2929
export DTS_SANDBOX_SCHEDULER_UMI_CLIENT_ID="<scheduler UMI client ID>"
@@ -34,7 +34,7 @@ PowerShell:
3434
~~~powershell
3535
$env:DTS_ENDPOINT = "<scheduler endpoint>"
3636
$env:DTS_TASK_HUB = "<task hub name>"
37-
$env:DTS_WORKER_PROFILE_ID = "default"
37+
$env:DTS_WORKER_PROFILE_ID = "python-sandbox-worker"
3838
$env:DTS_SANDBOX_CONTAINER_IMAGE = "<container image reference>"
3939
$env:DTS_SANDBOX_IMAGE_PULL_UMI_CLIENT_ID = "<image-pull UMI client ID>"
4040
$env:DTS_SANDBOX_SCHEDULER_UMI_CLIENT_ID = "<scheduler UMI client ID>"

examples/sandboxes/main_app.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def _get_required_env(name: str) -> str:
2828

2929
taskhub_name = os.getenv("DTS_TASK_HUB") or "SandboxPocHub"
3030
endpoint = os.getenv("DTS_ENDPOINT", "http://localhost:8080")
31-
worker_profile_id = os.getenv("DTS_WORKER_PROFILE_ID", "default")
31+
worker_profile_id = _get_required_env("DTS_WORKER_PROFILE_ID")
3232
container_image = (
3333
os.getenv("DTS_SANDBOX_CONTAINER_IMAGE")
3434
or "sandboxes-remote-worker:local")

examples/serverless/Containerfile

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
FROM python:3.12-slim AS runtime
2+
3+
WORKDIR /app
4+
5+
COPY . /src
6+
RUN pip install --no-cache-dir /src /src/durabletask-azuremanaged azure-identity
7+
8+
COPY examples/serverless/remote_worker.py /app/remote_worker.py
9+
COPY examples/serverless/activity_names.py /app/activity_names.py
10+
11+
EXPOSE 8080
12+
ENTRYPOINT ["python", "/app/remote_worker.py"]

examples/serverless/README.md

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# DTS serverless activities sample
2+
3+
This sample mirrors the .NET serverless sample with two customer-owned pieces:
4+
5+
1. A **declarer app** (`main_app.py`) that declares which activity should run
6+
serverlessly, starts the orchestration, and waits for the result.
7+
2. A **remote worker image** (`remote_worker.py` plus `Containerfile`) that
8+
DTS starts in a sandbox to execute the declared activity.
9+
3. A tiny shared module (`activity_names.py`) that keeps the declarer and remote
10+
worker on the same activity name constants.
11+
12+
Reference .NET template:
13+
<https://github.com/microsoft/durabletask-dotnet/compare/wangbill/serverless-private-preview>
14+
under `samples/serverless`.
15+
16+
> [!NOTE]
17+
> Until the serverless extension is published in a preview package, the worker
18+
> image installs the SDK from this source tree. After publication, replace that
19+
> Containerfile step with `pip install durabletask.azuremanaged==<preview-version>`.
20+
21+
## Environment variables
22+
23+
Set these before running the declarer app:
24+
25+
```powershell
26+
$env:DTS_ENDPOINT = "<scheduler endpoint>"
27+
$env:DTS_TASK_HUB = "<task hub name>"
28+
$env:DTS_WORKER_PROFILE_ID = "python-serverless-worker"
29+
$env:DTS_SERVERLESS_CONTAINER_IMAGE = "<public container image reference>"
30+
```
31+
32+
After pushing the remote worker image, set `DTS_SERVERLESS_CONTAINER_IMAGE` to
33+
the pushed image reference. `RemoteWorkerProfile.configure()` declares CPU,
34+
memory, max concurrency, customer environment variables, and serverless activity
35+
names with `options.add_activity(...)`. The declarer and remote worker both use
36+
`activity_names.py` so they stay in sync.
37+
38+
The remote worker code cannot pass DTS runtime settings to the SDK. In a
39+
sandbox, `ServerlessWorker()` reads `DTS_ENDPOINT`,
40+
`DTS_TASK_HUB`, `DTS_WORKER_PROFILE_ID`, `DTS_SERVERLESS_MAX_ACTIVITIES`,
41+
`DTS_SUBSTRATE`, and `DTS_SANDBOX_ID` from environment variables injected by
42+
DTS. The worker reports its registered activity names when it connects, and
43+
DTS validates they match the declaration before advertising worker capacity.
44+
45+
## Build the remote worker image
46+
47+
From the repository root:
48+
49+
```powershell
50+
docker build `
51+
-f examples\serverless\Containerfile `
52+
-t <public container image reference> `
53+
.
54+
docker push <public container image reference>
55+
```
56+
57+
Private preview requires the image to be publicly pullable by ADC/DTS.
58+
59+
## Run the declarer app
60+
61+
Install local packages from the repository root:
62+
63+
```powershell
64+
pip install -e . -e .\durabletask-azuremanaged
65+
```
66+
67+
Then run:
68+
69+
```powershell
70+
python examples\serverless\main_app.py
71+
```
72+
73+
The declarer app registers the serverless activity metadata, starts
74+
`hello_orchestrator`, and the remote worker sandbox executes `remote_hello`.
75+
The result includes `SERVERLESS_SAMPLE_MARKER=serverless-python-sample-marker`,
76+
proving the customer environment variable declared on the worker profile reached
77+
the sandbox.

examples/serverless/main_app.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""Declarer app for the DTS serverless activities sample."""
2+
3+
import os
4+
5+
from azure.identity import DefaultAzureCredential
6+
7+
from durabletask import client, task
8+
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
9+
from durabletask.azuremanaged.extensions.serverless import ServerlessActivitiesClient
10+
from durabletask.azuremanaged.extensions.serverless import ServerlessWorkerProfile
11+
from durabletask.azuremanaged.extensions.serverless import serverless_worker_profile
12+
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
13+
14+
from activity_names import REMOTE_HELLO
15+
16+
17+
def hello_orchestrator(ctx: task.OrchestrationContext, name: str):
18+
"""Orchestrator that calls an activity executed by the remote worker image."""
19+
return (yield ctx.call_activity(REMOTE_HELLO, input=name))
20+
21+
22+
def _get_required_env(name: str) -> str:
23+
value = os.getenv(name)
24+
if value:
25+
return value
26+
raise RuntimeError(f"Set {name} before running the serverless sample.")
27+
28+
29+
taskhub_name = os.getenv("DTS_TASK_HUB") or "ServerlessPocHub"
30+
endpoint = os.getenv("DTS_ENDPOINT", "http://localhost:8080")
31+
worker_profile_id = _get_required_env("DTS_WORKER_PROFILE_ID")
32+
container_image = os.getenv("DTS_SERVERLESS_CONTAINER_IMAGE", "serverless-remote-worker:local")
33+
sample_input = os.getenv("DTS_SAMPLE_HELLO_INPUT", "serverless Python")
34+
35+
36+
@serverless_worker_profile(worker_profile_id)
37+
class RemoteWorkerProfile(ServerlessWorkerProfile):
38+
"""Serverless worker profile used by the sample remote activity."""
39+
40+
def configure(self, options) -> None:
41+
options.container_image = container_image
42+
options.cpu = "1000m"
43+
options.memory = "2048Mi"
44+
options.max_concurrent_activities = 1
45+
options.environment_variables["SERVERLESS_SAMPLE_MARKER"] = "serverless-python-sample-marker"
46+
options.add_activity(REMOTE_HELLO)
47+
48+
49+
print(f"Using taskhub: {taskhub_name}")
50+
print(f"Using endpoint: {endpoint}")
51+
print(f"Declaring serverless activity image: {container_image}")
52+
53+
secure_channel = endpoint.startswith("https://") or endpoint.startswith("grpcs://")
54+
credential = DefaultAzureCredential() if secure_channel else None
55+
56+
serverless_client = ServerlessActivitiesClient(
57+
host_address=endpoint,
58+
secure_channel=secure_channel,
59+
taskhub=taskhub_name,
60+
token_credential=credential)
61+
serverless_client.enable_serverless_activities()
62+
63+
with DurableTaskSchedulerWorker(
64+
host_address=endpoint,
65+
secure_channel=secure_channel,
66+
taskhub=taskhub_name,
67+
token_credential=credential) as worker:
68+
worker.add_orchestrator(hello_orchestrator)
69+
worker.use_work_item_filters()
70+
worker.start()
71+
72+
durable_client = DurableTaskSchedulerClient(
73+
host_address=endpoint,
74+
secure_channel=secure_channel,
75+
taskhub=taskhub_name,
76+
token_credential=credential)
77+
instance_id = durable_client.schedule_new_orchestration(
78+
hello_orchestrator,
79+
input=sample_input)
80+
state = durable_client.wait_for_orchestration_completion(instance_id, timeout=120)
81+
if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
82+
print(f"Orchestration completed! Result: {state.serialized_output}")
83+
elif state:
84+
print(f"Orchestration failed: {state.failure_details}")
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
"""Remote worker image entrypoint for the DTS serverless activities sample."""
2+
3+
import os
4+
import time
5+
6+
from durabletask import task
7+
from durabletask.azuremanaged.extensions.serverless import ServerlessWorker
8+
9+
from activity_names import REMOTE_HELLO
10+
11+
12+
def _remote_hello(ctx: task.ActivityContext, name: str) -> str:
13+
"""Activity function that runs inside the serverless worker container."""
14+
sandbox_id = os.getenv("DTS_SANDBOX_ID", "unknown-sandbox")
15+
marker = os.getenv("SERVERLESS_SAMPLE_MARKER", "<missing>")
16+
return f"Hello {name} from Python serverless worker {sandbox_id}! SERVERLESS_SAMPLE_MARKER={marker}"
17+
18+
19+
_remote_hello.__name__ = REMOTE_HELLO
20+
21+
22+
with ServerlessWorker() as worker:
23+
worker.add_activity(_remote_hello)
24+
worker.start()
25+
print("Python serverless remote worker is running. Press Ctrl+C to stop.")
26+
try:
27+
while True:
28+
time.sleep(3600)
29+
except KeyboardInterrupt:
30+
pass

tests/durabletask-azuremanaged/test_sandboxes_extension.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ def test_sandbox_activities_client_does_not_expose_worker_registration_rpc() ->
309309
def test_sandbox_worker_does_not_own_legacy_wakeup_server(monkeypatch) -> None:
310310
monkeypatch.setenv("DTS_ENDPOINT", "http://localhost:8080")
311311
monkeypatch.setenv("DTS_TASK_HUB", "env-hub")
312+
monkeypatch.setenv("DTS_WORKER_PROFILE_ID", "env-profile")
312313
monkeypatch.setenv("DTS_SANDBOX_PROVIDER", "Sandbox")
313314

314315
worker = SandboxWorker()
@@ -359,6 +360,7 @@ def OtherActivity(_ctx, value):
359360
def test_sandbox_worker_stop_keeps_handle_for_still_running_registration_thread(monkeypatch) -> None:
360361
monkeypatch.setenv("DTS_ENDPOINT", "http://localhost:8080")
361362
monkeypatch.setenv("DTS_TASK_HUB", "env-hub")
363+
monkeypatch.setenv("DTS_WORKER_PROFILE_ID", "env-profile")
362364
monkeypatch.setenv("DTS_SANDBOX_PROVIDER", "Sandbox")
363365

364366
class StillRunningThread:
@@ -385,6 +387,7 @@ def is_alive(self):
385387
def test_sandbox_worker_uses_scheduler_channel_without_credential(monkeypatch) -> None:
386388
monkeypatch.setenv("DTS_ENDPOINT", "https://example.scheduler")
387389
monkeypatch.setenv("DTS_TASK_HUB", "env-hub")
390+
monkeypatch.setenv("DTS_WORKER_PROFILE_ID", "env-profile")
388391
monkeypatch.setenv("DTS_SANDBOX_PROVIDER", "Sandbox")
389392

390393
worker = SandboxWorker()
@@ -396,6 +399,7 @@ def test_sandbox_worker_uses_scheduler_channel_without_credential(monkeypatch) -
396399
def test_sandbox_worker_ignores_legacy_max_activities(monkeypatch) -> None:
397400
monkeypatch.setenv("DTS_ENDPOINT", "https://example.scheduler")
398401
monkeypatch.setenv("DTS_TASK_HUB", "env-hub")
402+
monkeypatch.setenv("DTS_WORKER_PROFILE_ID", "env-profile")
399403
monkeypatch.setenv("DTS_SANDBOX_PROVIDER", "Sandbox")
400404
monkeypatch.delenv("DTS_SANDBOX_MAX_ACTIVITIES", raising=False)
401405
monkeypatch.setenv("DTS_" + "SERVER" + "LESS_MAX_ACTIVITIES", "7")
@@ -408,6 +412,7 @@ def test_sandbox_worker_ignores_legacy_max_activities(monkeypatch) -> None:
408412
def test_sandbox_worker_tracks_active_activity_count_with_hooks(monkeypatch) -> None:
409413
monkeypatch.setenv("DTS_ENDPOINT", "https://example.scheduler")
410414
monkeypatch.setenv("DTS_TASK_HUB", "env-hub")
415+
monkeypatch.setenv("DTS_WORKER_PROFILE_ID", "env-profile")
411416
monkeypatch.setenv("DTS_SANDBOX_PROVIDER", "Sandbox")
412417

413418
worker = SandboxWorker()
@@ -425,6 +430,7 @@ def test_sandbox_worker_tracks_active_activity_count_with_hooks(monkeypatch) ->
425430
def test_sandbox_worker_uses_managed_identity_credential_when_injected(monkeypatch) -> None:
426431
monkeypatch.setenv("DTS_ENDPOINT", "https://example.scheduler")
427432
monkeypatch.setenv("DTS_TASK_HUB", "env-hub")
433+
monkeypatch.setenv("DTS_WORKER_PROFILE_ID", "env-profile")
428434
monkeypatch.setenv("DTS_SANDBOX_PROVIDER", "Sandbox")
429435
monkeypatch.setenv("DTS_AUTHENTICATION", "ManagedIdentity")
430436
monkeypatch.setenv("DTS_UMI_CLIENT_ID", "worker-client-id")
@@ -440,6 +446,7 @@ def test_sandbox_worker_uses_managed_identity_credential_when_injected(monkeypat
440446
def test_sandbox_worker_requires_managed_identity_client_id_when_auth_enabled(monkeypatch) -> None:
441447
monkeypatch.setenv("DTS_ENDPOINT", "https://example.scheduler")
442448
monkeypatch.setenv("DTS_TASK_HUB", "env-hub")
449+
monkeypatch.setenv("DTS_WORKER_PROFILE_ID", "env-profile")
443450
monkeypatch.setenv("DTS_SANDBOX_PROVIDER", "Sandbox")
444451
monkeypatch.setenv("DTS_AUTHENTICATION", "ManagedIdentity")
445452
monkeypatch.delenv("DTS_UMI_CLIENT_ID", raising=False)
@@ -455,6 +462,7 @@ def test_sandbox_worker_requires_managed_identity_client_id_when_auth_enabled(mo
455462
def test_sandbox_worker_requires_registered_activities(monkeypatch) -> None:
456463
monkeypatch.setenv("DTS_ENDPOINT", "http://localhost:8080")
457464
monkeypatch.setenv("DTS_TASK_HUB", "env-hub")
465+
monkeypatch.setenv("DTS_WORKER_PROFILE_ID", "env-profile")
458466
monkeypatch.setenv("DTS_SANDBOX_PROVIDER", "Sandbox")
459467

460468
worker = SandboxWorker()
@@ -470,6 +478,7 @@ def test_sandbox_worker_requires_registered_activities(monkeypatch) -> None:
470478
def test_sandbox_worker_requires_injected_sandbox_provider(monkeypatch) -> None:
471479
monkeypatch.setenv("DTS_ENDPOINT", "https://example.scheduler")
472480
monkeypatch.setenv("DTS_TASK_HUB", "env-hub")
481+
monkeypatch.setenv("DTS_WORKER_PROFILE_ID", "env-profile")
473482
monkeypatch.delenv("DTS_SANDBOX_PROVIDER", raising=False)
474483

475484
try:
@@ -483,6 +492,7 @@ def test_sandbox_worker_requires_injected_sandbox_provider(monkeypatch) -> None:
483492
def test_sandbox_worker_rejects_invalid_sandbox_provider(monkeypatch) -> None:
484493
monkeypatch.setenv("DTS_ENDPOINT", "https://example.scheduler")
485494
monkeypatch.setenv("DTS_TASK_HUB", "env-hub")
495+
monkeypatch.setenv("DTS_WORKER_PROFILE_ID", "env-profile")
486496
monkeypatch.setenv("DTS_SANDBOX_PROVIDER", "ContainerApp")
487497

488498
try:

0 commit comments

Comments
 (0)