Skip to content

Commit 9ac2f98

Browse files
Add Python order processing sample
1 parent 144349f commit 9ac2f98

13 files changed

Lines changed: 638 additions & 12 deletions

File tree

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
66

77
## [Unreleased]
88

9+
### Added
10+
- `Worker.run_until(workflow_id=..., timeout=...)` for examples, smoke tests,
11+
and single-workflow scripts that need to run a worker until one workflow
12+
reaches a terminal state.
13+
- A Docker Compose order-processing example under `examples/order_processing`
14+
that starts a local server and runs a multi-activity Python workflow
15+
end-to-end.
16+
917
### Changed
1018
- Worker compatibility checks now use `/api/cluster/info` protocol manifests
1119
as the authority instead of the top-level server app version. SDK 0.2.x
@@ -14,6 +22,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1422
`durable-workflow.v2.control-plane-request.contract` version `1`, and
1523
`worker_protocol.version: "1.0"`. Missing, unknown, or undiscoverable
1624
compatibility states fail closed.
25+
- `Client.get_result()` now decodes `WorkflowCompleted` output with the event
26+
or workflow payload codec instead of assuming JSON.
1727

1828
## [0.2.0] — 2026-04-17
1929

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,15 @@ async def main():
4545
task_queue="python-workers",
4646
input=["world"],
4747
)
48-
await worker.run_until(workflow_id="greet-1")
48+
await worker.run_until(workflow_id="greet-1", timeout=30.0)
4949
result = await client.get_result(handle)
5050
print(result) # "hello, world"
5151
```
5252

53+
For a fuller deployable example, see
54+
[`examples/order_processing`](examples/order_processing), which runs a
55+
multi-activity order workflow against a local server with Docker Compose.
56+
5357
## Features
5458

5559
- **Async-first**: Built on `httpx` and `asyncio`

examples/greeter.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,10 @@ async def main() -> int:
5151
task_queue=task_queue,
5252
workflows=[GreeterWorkflow],
5353
activities=[greet],
54-
poll_timeout=5.0,
5554
)
5655

57-
log.info("running worker for 30s...")
58-
worker_task = asyncio.create_task(worker.run())
59-
await asyncio.sleep(30.0)
60-
worker.stop()
61-
worker_task.cancel()
62-
try:
63-
await worker_task
64-
except (asyncio.CancelledError, Exception):
65-
pass
56+
log.info("running worker until workflow completes...")
57+
await worker.run_until(workflow_id=wf_id, timeout=30.0)
6658

6759
log.info("worker stopped, checking result...")
6860
async with Client(url, token=token, namespace="default") as client2:
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
FROM python:3.12-slim
2+
3+
ENV PYTHONDONTWRITEBYTECODE=1 \
4+
PYTHONUNBUFFERED=1
5+
6+
WORKDIR /app
7+
8+
COPY pyproject.toml README.md LICENSE ./
9+
COPY src ./src
10+
11+
RUN pip install --no-cache-dir .
12+
13+
COPY examples ./examples
14+
15+
CMD ["python", "-m", "examples.order_processing.app"]
16+
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Order Processing Example
2+
3+
This sample runs a Python-authored workflow against a local Durable Workflow
4+
server. It is intentionally larger than the greeter example: one workflow
5+
coordinates inventory reservation, payment authorization, shipment creation,
6+
and customer confirmation activities.
7+
8+
## Run
9+
10+
From this directory:
11+
12+
```bash
13+
docker compose up --build --exit-code-from python-worker python-worker
14+
docker compose down -v
15+
```
16+
17+
The `python-worker` service starts a workflow, runs the Python worker until that
18+
workflow reaches a terminal state, prints the completed workflow result as JSON,
19+
and exits with a non-zero code if the workflow fails.
20+
21+
## Configuration
22+
23+
The Compose file uses `durableworkflow/server:0.2.2` by default. Override it
24+
when testing a specific server release:
25+
26+
```bash
27+
SERVER_IMAGE=durableworkflow/server:0.1.9 docker compose up --build --exit-code-from python-worker python-worker
28+
```
29+
30+
Useful environment variables:
31+
32+
| Variable | Default | Purpose |
33+
| --- | --- | --- |
34+
| `SERVER_IMAGE` | `durableworkflow/server:0.2.2` | Server image used by bootstrap, API, and queue worker services |
35+
| `SERVER_PORT` | `8080` | Host port for the server API |
36+
| `WORKFLOW_SERVER_AUTH_TOKEN` | `sample-token` | Token accepted by the local server and sent by the Python SDK |
37+
| `ORDER_ID` | generated | Order id used as the workflow id |
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
"""Order-processing example package."""
2+

examples/order_processing/app.py

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
"""Non-trivial Python SDK example: process an ecommerce order.
2+
3+
The workflow uses four sequential activities so replay has to advance through
4+
several task completions before producing a final result.
5+
"""
6+
from __future__ import annotations
7+
8+
import asyncio
9+
import json
10+
import logging
11+
import os
12+
import sys
13+
import uuid
14+
from typing import Any
15+
16+
from durable_workflow import Client, Worker, activity, workflow
17+
18+
TASK_QUEUE = "orders-python"
19+
20+
21+
def sample_order(order_id: str | None = None) -> dict[str, Any]:
22+
oid = order_id or f"order-{uuid.uuid4().hex[:8]}"
23+
return {
24+
"order_id": oid,
25+
"customer": {
26+
"id": "cust-1001",
27+
"email": "ada@example.com",
28+
"tier": "gold",
29+
"shipping_region": "US-CA",
30+
},
31+
"payment": {
32+
"token": "tok_sample_visa",
33+
"currency": "USD",
34+
},
35+
"items": [
36+
{"sku": "dwf-shirt", "quantity": 2, "unit_price_cents": 3200},
37+
{"sku": "dwf-mug", "quantity": 1, "unit_price_cents": 1800},
38+
],
39+
}
40+
41+
42+
def total_order_cents(order: dict[str, Any]) -> int:
43+
return sum(int(item["quantity"]) * int(item["unit_price_cents"]) for item in order["items"])
44+
45+
46+
@activity.defn(name="orders.reserve_inventory")
47+
def reserve_inventory(order_id: str, items: list[dict[str, Any]]) -> dict[str, Any]:
48+
reservations = [
49+
{
50+
"sku": item["sku"],
51+
"quantity": int(item["quantity"]),
52+
"reservation_id": f"res-{order_id}-{item['sku']}",
53+
}
54+
for item in items
55+
]
56+
return {
57+
"order_id": order_id,
58+
"status": "reserved",
59+
"reservations": reservations,
60+
"total_units": sum(item["quantity"] for item in reservations),
61+
}
62+
63+
64+
@activity.defn(name="orders.charge_payment")
65+
def charge_payment(order_id: str, amount_cents: int, payment: dict[str, Any]) -> dict[str, Any]:
66+
return {
67+
"order_id": order_id,
68+
"status": "authorized",
69+
"amount_cents": amount_cents,
70+
"currency": payment["currency"],
71+
"authorization_id": f"auth-{order_id}",
72+
}
73+
74+
75+
@activity.defn(name="orders.create_shipment")
76+
def create_shipment(
77+
order_id: str,
78+
customer: dict[str, Any],
79+
inventory: dict[str, Any],
80+
) -> dict[str, Any]:
81+
return {
82+
"order_id": order_id,
83+
"status": "label_created",
84+
"shipment_id": f"ship-{order_id}",
85+
"region": customer["shipping_region"],
86+
"packages": max(1, int(inventory["total_units"])),
87+
}
88+
89+
90+
@activity.defn(name="orders.send_confirmation")
91+
def send_confirmation(
92+
order_id: str,
93+
customer: dict[str, Any],
94+
payment: dict[str, Any],
95+
shipment: dict[str, Any],
96+
) -> dict[str, Any]:
97+
return {
98+
"order_id": order_id,
99+
"status": "sent",
100+
"email": customer["email"],
101+
"template": "order-confirmed",
102+
"summary": f"{payment['currency']} {payment['amount_cents'] / 100:.2f} via {shipment['shipment_id']}",
103+
}
104+
105+
106+
@workflow.defn(name="orders.process")
107+
class ProcessOrderWorkflow:
108+
def run(self, ctx, order): # type: ignore[no-untyped-def]
109+
amount_cents = total_order_cents(order)
110+
inventory = yield ctx.schedule_activity(
111+
"orders.reserve_inventory",
112+
[order["order_id"], order["items"]],
113+
)
114+
payment = yield ctx.schedule_activity(
115+
"orders.charge_payment",
116+
[order["order_id"], amount_cents, order["payment"]],
117+
)
118+
shipment = yield ctx.schedule_activity(
119+
"orders.create_shipment",
120+
[order["order_id"], order["customer"], inventory],
121+
)
122+
confirmation = yield ctx.schedule_activity(
123+
"orders.send_confirmation",
124+
[order["order_id"], order["customer"], payment, shipment],
125+
)
126+
return {
127+
"order_id": order["order_id"],
128+
"status": "confirmed",
129+
"amount_cents": amount_cents,
130+
"inventory": inventory,
131+
"payment": payment,
132+
"shipment": shipment,
133+
"confirmation": confirmation,
134+
}
135+
136+
137+
async def run_order() -> dict[str, Any]:
138+
server_url = os.environ.get("SERVER_URL", "http://localhost:8080")
139+
token = os.environ.get("WORKFLOW_TOKEN", "sample-token")
140+
namespace = os.environ.get("WORKFLOW_NAMESPACE", "default")
141+
task_queue = os.environ.get("TASK_QUEUE", TASK_QUEUE)
142+
order = sample_order(os.environ.get("ORDER_ID"))
143+
workflow_id = os.environ.get("WORKFLOW_ID", order["order_id"])
144+
145+
async with Client(server_url, token=token, namespace=namespace) as client:
146+
handle = await client.start_workflow(
147+
workflow_type="orders.process",
148+
workflow_id=workflow_id,
149+
task_queue=task_queue,
150+
input=[order],
151+
memo={
152+
"sample": "order_processing",
153+
"customer_id": order["customer"]["id"],
154+
"total_cents": total_order_cents(order),
155+
},
156+
)
157+
158+
worker = Worker(
159+
client,
160+
task_queue=task_queue,
161+
workflows=[ProcessOrderWorkflow],
162+
activities=[
163+
reserve_inventory,
164+
charge_payment,
165+
create_shipment,
166+
send_confirmation,
167+
],
168+
shutdown_timeout=10.0,
169+
)
170+
await worker.run_until(workflow_id=workflow_id, timeout=90.0, poll_interval=0.5)
171+
result = await handle.result(timeout=10.0, poll_interval=0.5)
172+
if not isinstance(result, dict):
173+
raise TypeError(f"expected workflow result object, got {type(result).__name__}")
174+
return result
175+
176+
177+
async def main() -> int:
178+
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")
179+
log = logging.getLogger("order_processing")
180+
try:
181+
result = await run_order()
182+
except Exception:
183+
log.exception("order workflow failed")
184+
return 1
185+
186+
print(json.dumps(result, indent=2, sort_keys=True), flush=True)
187+
return 0
188+
189+
190+
if __name__ == "__main__":
191+
sys.exit(asyncio.run(main()))

0 commit comments

Comments
 (0)