Skip to content

Commit bc0f51e

Browse files
[cross-repo from workflow#130] GitHub #481: server + workflow + sdk-python + cloud: invocable carrier for activity-grade external execution (#30)
1 parent 249bfa9 commit bc0f51e

4 files changed

Lines changed: 243 additions & 4 deletions

File tree

docs/reference/invocable.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,32 @@ async def handle_request(request_json: dict) -> dict:
2020
return await adapter.handle(request_json)
2121
```
2222

23+
For HTTP endpoints or API Gateway/Lambda style handlers, use the packaged
24+
adapters so content type, body parsing, and result serialization stay aligned
25+
with the carrier contract:
26+
27+
```python
28+
from durable_workflow import handle_invocable_http_request, lambda_invocable_activity_handler
29+
30+
31+
async def handle_http(body: bytes):
32+
return await handle_invocable_http_request(body, {"billing.charge-card": charge_card})
33+
34+
35+
lambda_handler = lambda_invocable_activity_handler({"billing.charge-card": charge_card})
36+
```
37+
38+
`handle_invocable_http_request` returns an `InvocableHttpResponse` with
39+
`status_code`, `headers`, and `body`. On success (HTTP 200) the body is the
40+
external-task result envelope with `Content-Type:
41+
application/vnd.durable-workflow.external-task-result+json`. On a bad or
42+
unparseable request body it returns HTTP 400 with a JSON error object — no
43+
durable task identity is available in that case so no structured result envelope
44+
can be built.
45+
46+
`lambda_invocable_activity_handler` wraps the async helper in a synchronous
47+
AWS Lambda / API Gateway handler. It decodes base64 bodies when
48+
`isBase64Encoded` is `true` so it works with both REST API and HTTP API
49+
integrations without additional configuration.
50+
2351
::: durable_workflow.invocable

src/durable_workflow/__init__.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,14 @@
119119
WorkflowTaskHandler,
120120
WorkflowTaskInterceptorContext,
121121
)
122-
from .invocable import InvocableActivityHandler, handle_invocable_activity
122+
from .invocable import (
123+
InvocableActivityHandler,
124+
InvocableHttpResponse,
125+
handle_invocable_activity,
126+
handle_invocable_http_request,
127+
handle_invocable_lambda_event,
128+
lambda_invocable_activity_handler,
129+
)
123130
from .metrics import (
124131
InMemoryMetrics,
125132
MetricsRecorder,
@@ -249,6 +256,7 @@
249256
"InvalidArgument",
250257
"InMemoryMetrics",
251258
"InvocableActivityHandler",
259+
"InvocableHttpResponse",
252260
"GCSExternalStorage",
253261
"LocalFilesystemExternalStorage",
254262
"MetricsRecorder",
@@ -286,6 +294,9 @@
286294
"parse_external_task_input_artifact",
287295
"parse_external_task_result",
288296
"handle_invocable_activity",
297+
"handle_invocable_http_request",
298+
"handle_invocable_lambda_event",
299+
"lambda_invocable_activity_handler",
289300
"store_external_payload",
290301
"to_avro_payload_value",
291302
"to_avro_payload_values",

src/durable_workflow/invocable.py

Lines changed: 129 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,25 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import base64
45
import inspect
6+
import json
57
import time
68
import traceback
79
from collections.abc import Awaitable, Callable, Mapping
10+
from dataclasses import dataclass
811
from datetime import datetime, timezone
9-
from typing import Any
12+
from typing import Any, cast
1013

1114
from . import serializer
1215
from .errors import ActivityCancelled, NonRetryableError
1316
from .external_storage import ExternalPayloadCache, ExternalStorageDriver
1417
from .external_task_input import ExternalTaskInput, parse_external_task_input
15-
from .external_task_result import EXTERNAL_TASK_RESULT_SCHEMA, EXTERNAL_TASK_RESULT_VERSION
18+
from .external_task_result import (
19+
EXTERNAL_TASK_RESULT_MEDIA_TYPE,
20+
EXTERNAL_TASK_RESULT_SCHEMA,
21+
EXTERNAL_TASK_RESULT_VERSION,
22+
)
1623

1724
InvocableActivityCallable = Callable[..., Any | Awaitable[Any]]
1825

@@ -315,3 +322,123 @@ async def handle_invocable_activity(
315322
"""Handle one invocable activity task with a temporary adapter instance."""
316323

317324
return await InvocableActivityHandler(handlers, **options).handle(envelope)
325+
326+
327+
@dataclass(frozen=True)
328+
class InvocableHttpResponse:
329+
"""Structured HTTP response from an invocable activity carrier endpoint."""
330+
331+
status_code: int
332+
headers: Mapping[str, str]
333+
body: str
334+
335+
def json(self) -> dict[str, Any]:
336+
decoded = json.loads(self.body)
337+
if not isinstance(decoded, dict):
338+
raise ValueError("invocable HTTP response body is not a JSON object")
339+
return cast(dict[str, Any], decoded)
340+
341+
342+
async def handle_invocable_http_request(
343+
body: bytes | str | Mapping[str, Any],
344+
handlers: Mapping[str, InvocableActivityCallable],
345+
**options: Any,
346+
) -> InvocableHttpResponse:
347+
"""Handle one HTTP-addressed invocable activity request.
348+
349+
The server expects a structured external-task result envelope on HTTP 200.
350+
Bad request bodies return HTTP 400 because no durable task identity can be
351+
recovered for a valid failure envelope.
352+
"""
353+
354+
try:
355+
envelope = _coerce_json_object(body)
356+
except (TypeError, ValueError) as exc:
357+
return InvocableHttpResponse(
358+
status_code=400,
359+
headers={"Content-Type": "application/json"},
360+
body=_json_dump({"error": "invalid_invocable_request", "message": str(exc)}),
361+
)
362+
363+
try:
364+
result = await handle_invocable_activity(envelope, handlers, **options)
365+
except Exception as exc:
366+
return InvocableHttpResponse(
367+
status_code=400,
368+
headers={"Content-Type": "application/json"},
369+
body=_json_dump({"error": "invalid_invocable_request", "message": str(exc)}),
370+
)
371+
372+
return InvocableHttpResponse(
373+
status_code=200,
374+
headers={"Content-Type": EXTERNAL_TASK_RESULT_MEDIA_TYPE},
375+
body=_json_dump(result),
376+
)
377+
378+
379+
async def handle_invocable_lambda_event(
380+
event: Mapping[str, Any],
381+
handlers: Mapping[str, InvocableActivityCallable],
382+
**options: Any,
383+
) -> dict[str, Any]:
384+
"""Handle an AWS Lambda / API Gateway style invocable activity event."""
385+
386+
try:
387+
body = _lambda_event_body(event)
388+
except (TypeError, ValueError) as exc:
389+
response = InvocableHttpResponse(
390+
status_code=400,
391+
headers={"Content-Type": "application/json"},
392+
body=_json_dump({"error": "invalid_invocable_request", "message": str(exc)}),
393+
)
394+
else:
395+
response = await handle_invocable_http_request(body, handlers, **options)
396+
397+
return {
398+
"statusCode": response.status_code,
399+
"headers": dict(response.headers),
400+
"body": response.body,
401+
"isBase64Encoded": False,
402+
}
403+
404+
405+
def lambda_invocable_activity_handler(
406+
handlers: Mapping[str, InvocableActivityCallable],
407+
**options: Any,
408+
) -> Callable[[Mapping[str, Any], Any], dict[str, Any]]:
409+
"""Build a synchronous AWS Lambda handler for invocable activities."""
410+
411+
def _handler(event: Mapping[str, Any], context: Any) -> dict[str, Any]:
412+
return asyncio.run(handle_invocable_lambda_event(event, handlers, **options))
413+
414+
return _handler
415+
416+
417+
def _coerce_json_object(body: bytes | str | Mapping[str, Any]) -> dict[str, Any]:
418+
if isinstance(body, Mapping):
419+
return dict(body)
420+
421+
if isinstance(body, bytes):
422+
body = body.decode("utf-8")
423+
424+
if not isinstance(body, str):
425+
raise TypeError("request body must be bytes, str, or a JSON object")
426+
427+
decoded = json.loads(body)
428+
if not isinstance(decoded, dict):
429+
raise ValueError("request body must decode to a JSON object")
430+
431+
return cast(dict[str, Any], decoded)
432+
433+
434+
def _lambda_event_body(event: Mapping[str, Any]) -> bytes | str | Mapping[str, Any]:
435+
body = event.get("body")
436+
if isinstance(body, str) and event.get("isBase64Encoded") is True:
437+
return base64.b64decode(body)
438+
if isinstance(body, (bytes, str, Mapping)):
439+
return cast(bytes | str | Mapping[str, Any], body)
440+
raise ValueError("Lambda event body must contain the invocable request JSON")
441+
442+
443+
def _json_dump(value: Mapping[str, Any]) -> str:
444+
return json.dumps(value, separators=(",", ":"), sort_keys=True)

tests/test_invocable.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import base64
23
import copy
34
import json
45
import time
@@ -11,7 +12,12 @@
1112
from durable_workflow import serializer
1213
from durable_workflow.errors import NonRetryableError
1314
from durable_workflow.external_task_result import parse_external_task_result
14-
from durable_workflow.invocable import InvocableActivityHandler, handle_invocable_activity
15+
from durable_workflow.invocable import (
16+
InvocableActivityHandler,
17+
handle_invocable_activity,
18+
handle_invocable_http_request,
19+
lambda_invocable_activity_handler,
20+
)
1521

1622
FIXTURES = Path(__file__).parent / "fixtures" / "external-task-input"
1723

@@ -187,3 +193,70 @@ async def test_invocable_activity_handler_rejects_workflow_task_inputs() -> None
187193
async def test_invocable_activity_handler_validates_result_codec() -> None:
188194
with pytest.raises(ValueError, match="unsupported invocable result codec"):
189195
InvocableActivityHandler({}, result_codec="protobuf")
196+
197+
198+
async def test_invocable_http_request_helper_returns_result_response() -> None:
199+
response = await handle_invocable_http_request(
200+
json.dumps(activity_input(42)),
201+
{"billing.charge-card": lambda amount: {"amount": amount}},
202+
result_codec=serializer.JSON_CODEC,
203+
)
204+
205+
assert response.status_code == 200
206+
assert response.headers["Content-Type"] == "application/vnd.durable-workflow.external-task-result+json"
207+
208+
result = parse_external_task_result(response.json())
209+
assert result.succeeded is True
210+
assert result.result is not None
211+
assert serializer.decode_envelope(result.result["payload"]) == {"amount": 42}
212+
213+
214+
async def test_invocable_http_request_helper_accepts_bytes_body() -> None:
215+
response = await handle_invocable_http_request(
216+
json.dumps(activity_input(7)).encode(),
217+
{"billing.charge-card": lambda amount: {"amount": amount}},
218+
result_codec=serializer.JSON_CODEC,
219+
)
220+
221+
assert response.status_code == 200
222+
result = parse_external_task_result(response.json())
223+
assert result.succeeded is True
224+
225+
226+
async def test_invocable_http_request_helper_rejects_invalid_json_body() -> None:
227+
response = await handle_invocable_http_request("{", {})
228+
229+
assert response.status_code == 400
230+
assert response.json()["error"] == "invalid_invocable_request"
231+
232+
233+
def test_lambda_invocable_activity_handler_accepts_base64_api_gateway_body() -> None:
234+
handler = lambda_invocable_activity_handler(
235+
{"billing.charge-card": lambda amount: {"amount": amount}},
236+
result_codec=serializer.JSON_CODEC,
237+
)
238+
body = json.dumps(activity_input(99)).encode()
239+
240+
response = handler(
241+
{
242+
"body": base64.b64encode(body).decode(),
243+
"isBase64Encoded": True,
244+
},
245+
None,
246+
)
247+
248+
assert response["statusCode"] == 200
249+
assert response["headers"]["Content-Type"] == "application/vnd.durable-workflow.external-task-result+json"
250+
251+
result = parse_external_task_result(json.loads(response["body"]))
252+
assert result.succeeded is True
253+
assert result.result is not None
254+
assert serializer.decode_envelope(result.result["payload"]) == {"amount": 99}
255+
256+
257+
def test_lambda_invocable_activity_handler_rejects_missing_body() -> None:
258+
handler = lambda_invocable_activity_handler({})
259+
response = handler({"body": None}, None)
260+
261+
assert response["statusCode"] == 400
262+
assert json.loads(response["body"])["error"] == "invalid_invocable_request"

0 commit comments

Comments
 (0)