Skip to content

Commit 817e0d7

Browse files
Add invocable activity carrier reference handler to Python SDK
Provide a Python adapter for activity-grade invocable carriers that consumes the stable external-task input envelope and returns the stable external-task result envelope. The handler deliberately rejects workflow-task inputs so HTTP or serverless adapters cannot become hidden workflow runtimes, and maps cancellation, deadline, decode, and non-retryable errors onto the existing failure shape.
1 parent 8fed219 commit 817e0d7

4 files changed

Lines changed: 365 additions & 0 deletions

File tree

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,18 @@ decisions for success, retryability, malformed output, cancellation, deadline
420420
exceeded, handler crash, decode failure, and unsupported payload
421421
codec/reference states without treating stderr as a machine signal.
422422

423+
Invocable activity carriers can use `InvocableActivityHandler` as a reference
424+
adapter for HTTP or serverless runtimes. It accepts the same external-task input
425+
envelope, invokes a registered activity handler, and returns the same
426+
external-task result envelope while rejecting workflow-task inputs:
427+
428+
```python
429+
from durable_workflow import InvocableActivityHandler
430+
431+
adapter = InvocableActivityHandler({"billing.charge-card": charge_card})
432+
result_envelope = await adapter.handle(request_json)
433+
```
434+
423435
Bridge adapters can hand bounded webhook ingress into the server through
424436
`Client.send_webhook_bridge_event()`. The method returns the server's typed
425437
bridge outcome for accepted, duplicate, and rejected events, including

src/durable_workflow/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
WorkflowTaskHandler,
9999
WorkflowTaskInterceptorContext,
100100
)
101+
from .invocable import InvocableActivityHandler, handle_invocable_activity
101102
from .metrics import (
102103
InMemoryMetrics,
103104
MetricsRecorder,
@@ -192,6 +193,7 @@
192193
"parse_external_task_result_artifact",
193194
"InvalidArgument",
194195
"InMemoryMetrics",
196+
"InvocableActivityHandler",
195197
"LocalFilesystemExternalStorage",
196198
"MetricsRecorder",
197199
"NamespaceNotFound",
@@ -223,6 +225,7 @@
223225
"parse_external_task_input",
224226
"parse_external_task_input_artifact",
225227
"parse_external_task_result",
228+
"handle_invocable_activity",
226229
"to_avro_payload_value",
227230
"to_avro_payload_values",
228231
"replay",

src/durable_workflow/invocable.py

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import time
5+
import traceback
6+
from collections.abc import Awaitable, Callable, Mapping
7+
from typing import Any
8+
9+
from . import serializer
10+
from .errors import ActivityCancelled, NonRetryableError
11+
from .external_storage import ExternalPayloadCache, ExternalStorageDriver
12+
from .external_task_input import ExternalTaskInput, parse_external_task_input
13+
from .external_task_result import EXTERNAL_TASK_RESULT_SCHEMA, EXTERNAL_TASK_RESULT_VERSION
14+
15+
InvocableActivityCallable = Callable[..., Any | Awaitable[Any]]
16+
17+
18+
class InvocableActivityHandler:
19+
"""Reference adapter for activity-grade invocable carriers.
20+
21+
The handler consumes the stable external-task input envelope and returns
22+
the stable external-task result envelope. It deliberately rejects workflow
23+
tasks so lightweight HTTP/serverless carriers cannot become hidden workflow
24+
runtimes.
25+
"""
26+
27+
def __init__(
28+
self,
29+
handlers: Mapping[str, InvocableActivityCallable],
30+
*,
31+
carrier: str = "python-invocable",
32+
result_codec: str = serializer.AVRO_CODEC,
33+
external_storage: ExternalStorageDriver | None = None,
34+
external_storage_cache: ExternalPayloadCache | None = None,
35+
) -> None:
36+
if result_codec not in serializer.SUPPORTED_CODECS:
37+
raise ValueError(f"unsupported invocable result codec {result_codec!r}")
38+
self.handlers = dict(handlers)
39+
self.carrier = carrier
40+
self.result_codec = result_codec
41+
self.external_storage = external_storage
42+
self.external_storage_cache = external_storage_cache
43+
44+
async def handle(self, envelope: Mapping[str, Any]) -> dict[str, Any]:
45+
started = time.monotonic()
46+
task_input = parse_external_task_input(envelope)
47+
48+
if not task_input.is_activity_task:
49+
return self._failure(
50+
task_input,
51+
started,
52+
kind="application",
53+
classification="application_error",
54+
message="invocable activity handlers only accept activity_task inputs",
55+
failure_type="UnsupportedExternalTaskKind",
56+
retryable=False,
57+
)
58+
59+
handler_name = task_input.task.handler
60+
handler = self.handlers.get(handler_name or "")
61+
if handler is None:
62+
return self._failure(
63+
task_input,
64+
started,
65+
kind="application",
66+
classification="application_error",
67+
message=f"no invocable activity handler registered for {handler_name!r}",
68+
failure_type="UnknownActivityHandler",
69+
retryable=False,
70+
)
71+
72+
try:
73+
args = self._decode_arguments(task_input)
74+
result = handler(*args)
75+
if asyncio.iscoroutine(result):
76+
result = await result
77+
except ActivityCancelled:
78+
return self._failure(
79+
task_input,
80+
started,
81+
kind="cancellation",
82+
classification="cancelled",
83+
message="Activity was cancelled before the handler completed.",
84+
failure_type="ActivityCancelled",
85+
retryable=False,
86+
cancelled=True,
87+
)
88+
except NonRetryableError as exc:
89+
return self._failure(
90+
task_input,
91+
started,
92+
kind="application",
93+
classification="application_error",
94+
message=str(exc),
95+
failure_type=type(exc).__name__,
96+
stack_trace=traceback.format_exc(),
97+
retryable=False,
98+
)
99+
except TimeoutError as exc:
100+
return self._failure(
101+
task_input,
102+
started,
103+
kind="timeout",
104+
classification="deadline_exceeded",
105+
message=str(exc) or "Invocable activity handler exceeded its deadline.",
106+
failure_type=type(exc).__name__,
107+
stack_trace=traceback.format_exc(),
108+
timeout_type="deadline_exceeded",
109+
retryable=True,
110+
)
111+
except (TypeError, ValueError) as exc:
112+
return self._failure(
113+
task_input,
114+
started,
115+
kind="decode_failure",
116+
classification="decode_failure",
117+
message=f"Carrier could not decode or encode the activity payload: {exc}",
118+
failure_type=type(exc).__name__,
119+
stack_trace=traceback.format_exc(),
120+
retryable=False,
121+
details={"codec": self._input_codec(task_input)},
122+
)
123+
except Exception as exc:
124+
return self._failure(
125+
task_input,
126+
started,
127+
kind="application",
128+
classification="application_error",
129+
message=str(exc),
130+
failure_type=type(exc).__name__,
131+
stack_trace=traceback.format_exc(),
132+
retryable=True,
133+
)
134+
135+
return self._success(task_input, started, result)
136+
137+
def _decode_arguments(self, task_input: ExternalTaskInput) -> list[Any]:
138+
raw_args = task_input.payloads["arguments"]
139+
if raw_args is None:
140+
return []
141+
142+
decoded = serializer.decode_envelope(
143+
raw_args,
144+
codec=self._input_codec(task_input),
145+
external_storage=self.external_storage,
146+
external_storage_cache=self.external_storage_cache,
147+
)
148+
if decoded is None:
149+
return []
150+
if isinstance(decoded, list):
151+
return decoded
152+
return [decoded]
153+
154+
@staticmethod
155+
def _input_codec(task_input: ExternalTaskInput) -> str:
156+
raw_args = task_input.payloads.get("arguments")
157+
if isinstance(raw_args, Mapping):
158+
codec = raw_args.get("codec")
159+
if isinstance(codec, str):
160+
return codec
161+
return serializer.JSON_CODEC
162+
163+
def _success(self, task_input: ExternalTaskInput, started: float, result: Any) -> dict[str, Any]:
164+
return {
165+
"schema": EXTERNAL_TASK_RESULT_SCHEMA,
166+
"version": EXTERNAL_TASK_RESULT_VERSION,
167+
"outcome": {
168+
"status": "succeeded",
169+
"recorded": True,
170+
},
171+
"task": self._task_identity(task_input),
172+
"result": {
173+
"payload": serializer.envelope(result, codec=self.result_codec),
174+
"metadata": {
175+
"content_type": "application/vnd.durable-workflow.result+json",
176+
},
177+
},
178+
"metadata": self._metadata(task_input, started),
179+
}
180+
181+
def _failure(
182+
self,
183+
task_input: ExternalTaskInput,
184+
started: float,
185+
*,
186+
kind: str,
187+
classification: str,
188+
message: str,
189+
failure_type: str,
190+
retryable: bool,
191+
stack_trace: str | None = None,
192+
timeout_type: str | None = None,
193+
cancelled: bool = False,
194+
details: Mapping[str, Any] | None = None,
195+
) -> dict[str, Any]:
196+
return {
197+
"schema": EXTERNAL_TASK_RESULT_SCHEMA,
198+
"version": EXTERNAL_TASK_RESULT_VERSION,
199+
"outcome": {
200+
"status": "failed",
201+
"retryable": retryable,
202+
"recorded": True,
203+
},
204+
"task": self._task_identity(task_input),
205+
"failure": {
206+
"kind": kind,
207+
"classification": classification,
208+
"message": message,
209+
"type": failure_type,
210+
"stack_trace": stack_trace,
211+
"timeout_type": timeout_type,
212+
"cancelled": cancelled,
213+
"details": details,
214+
},
215+
"metadata": self._metadata(task_input, started),
216+
}
217+
218+
@staticmethod
219+
def _task_identity(task_input: ExternalTaskInput) -> dict[str, Any]:
220+
return {
221+
"id": task_input.task.id,
222+
"kind": task_input.task.kind,
223+
"attempt": task_input.task.attempt,
224+
"idempotency_key": task_input.task.idempotency_key,
225+
}
226+
227+
def _metadata(self, task_input: ExternalTaskInput, started: float) -> dict[str, Any]:
228+
return {
229+
"handler": task_input.task.handler,
230+
"carrier": self.carrier,
231+
"duration_ms": max(0, int((time.monotonic() - started) * 1000)),
232+
}
233+
234+
235+
async def handle_invocable_activity(
236+
envelope: Mapping[str, Any],
237+
handlers: Mapping[str, InvocableActivityCallable],
238+
**options: Any,
239+
) -> dict[str, Any]:
240+
"""Handle one invocable activity task with a temporary adapter instance."""
241+
242+
return await InvocableActivityHandler(handlers, **options).handle(envelope)

tests/test_invocable.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import copy
2+
import json
3+
from pathlib import Path
4+
from typing import Any
5+
6+
import pytest
7+
8+
from durable_workflow import serializer
9+
from durable_workflow.errors import NonRetryableError
10+
from durable_workflow.external_task_result import parse_external_task_result
11+
from durable_workflow.invocable import InvocableActivityHandler, handle_invocable_activity
12+
13+
FIXTURES = Path(__file__).parent / "fixtures" / "external-task-input"
14+
15+
16+
def load_fixture(name: str) -> dict[str, Any]:
17+
return json.loads((FIXTURES / name).read_text())
18+
19+
20+
def activity_input(*args: Any) -> dict[str, Any]:
21+
envelope = copy.deepcopy(load_fixture("activity-task.v1.json"))
22+
envelope["payloads"]["arguments"] = serializer.envelope(list(args), codec=serializer.JSON_CODEC)
23+
return envelope
24+
25+
26+
async def test_invocable_activity_handler_returns_success_result_envelope() -> None:
27+
handler = InvocableActivityHandler(
28+
{"billing.charge-card": lambda amount, currency: {"approved": True, "amount": amount, "currency": currency}},
29+
carrier="lambda-adapter",
30+
result_codec=serializer.JSON_CODEC,
31+
)
32+
33+
output = await handler.handle(activity_input(4200, "USD"))
34+
result = parse_external_task_result(output)
35+
36+
assert result.succeeded is True
37+
assert result.task.id == "acttask_01HV7D3G3G61TAH2YB5RK45XJS"
38+
assert result.metadata["carrier"] == "lambda-adapter"
39+
assert result.result is not None
40+
assert serializer.decode_envelope(result.result["payload"]) == {
41+
"approved": True,
42+
"amount": 4200,
43+
"currency": "USD",
44+
}
45+
46+
47+
async def test_invocable_activity_handler_awaits_async_handlers() -> None:
48+
async def charge(amount: int) -> dict[str, int]:
49+
return {"amount": amount}
50+
51+
output = await handle_invocable_activity(
52+
activity_input(42),
53+
{"billing.charge-card": charge},
54+
result_codec=serializer.JSON_CODEC,
55+
)
56+
result = parse_external_task_result(output)
57+
58+
assert result.succeeded is True
59+
assert result.result is not None
60+
assert serializer.decode_envelope(result.result["payload"]) == {"amount": 42}
61+
62+
63+
async def test_invocable_activity_handler_fails_closed_for_unknown_handler() -> None:
64+
output = await InvocableActivityHandler({}, result_codec=serializer.JSON_CODEC).handle(activity_input("x"))
65+
result = parse_external_task_result(output)
66+
67+
assert result.failed is True
68+
assert result.retryable is False
69+
assert result.failure_kind == "application"
70+
assert result.failure_classification == "application_error"
71+
assert result.failure is not None
72+
assert "no invocable activity handler registered" in result.failure.message
73+
74+
75+
async def test_invocable_activity_handler_maps_non_retryable_errors() -> None:
76+
def handler() -> None:
77+
raise NonRetryableError("card rejected")
78+
79+
output = await InvocableActivityHandler(
80+
{"billing.charge-card": handler},
81+
result_codec=serializer.JSON_CODEC,
82+
).handle(activity_input())
83+
result = parse_external_task_result(output)
84+
85+
assert result.failed is True
86+
assert result.retryable is False
87+
assert result.failure_kind == "application"
88+
assert result.failure is not None
89+
assert result.failure.message == "card rejected"
90+
91+
92+
async def test_invocable_activity_handler_rejects_workflow_task_inputs() -> None:
93+
output = await InvocableActivityHandler(
94+
{"billing.invoice.workflow": lambda: {"ignored": True}},
95+
result_codec=serializer.JSON_CODEC,
96+
).handle(load_fixture("workflow-task.v1.json"))
97+
result = parse_external_task_result(output)
98+
99+
assert result.failed is True
100+
assert result.retryable is False
101+
assert result.task.kind == "workflow_task"
102+
assert result.failure is not None
103+
assert "only accept activity_task" in result.failure.message
104+
105+
106+
async def test_invocable_activity_handler_validates_result_codec() -> None:
107+
with pytest.raises(ValueError, match="unsupported invocable result codec"):
108+
InvocableActivityHandler({}, result_codec="protobuf")

0 commit comments

Comments
 (0)