Skip to content

Commit 8fc211a

Browse files
feat(avro): optional Avro codec support for worker + client (#362)
Adds the 'avro' payload codec as an optional extra to the Python SDK, matching the wire format produced by the PHP workflow package's Workflow\Serializers\Avro serializer (base64 of 0x00 + Avro binary of generic-wrapper record {json: string, version: int}). - pyproject.toml: [avro] extra pulls in apache/avro ^1.12; mypy override added since upstream does not ship type stubs. - src/durable_workflow/_avro.py: new module encapsulating encode/decode for the generic wrapper format. Loud typed errors for typed-schema (0x01) prefix, unknown prefix, and JSON-tagged-as-avro ingress. - serializer.encode/decode/envelope now take a codec= argument and dispatch to the correct backend. decode_envelope honors the inner codec tag over the caller's codec= hint. - Worker._run_activity_task: decodes inbound arguments with task.payload_codec and echoes the same codec on complete_activity_task so results round-trip under the run's codec. - Client.complete_activity_task: new codec= parameter (defaults to json) plumbs through to serializer.envelope. - errors.AvroNotInstalledError: raised when the extra is missing; subclass of ImportError so existing optional-dep try/except patterns still catch it. Tests verify round-trip for primitives and containers, decode of known PHP-produced fixture blobs (cross-language wire compatibility), typed/ unknown-prefix errors, JSON-tagged-as-avro diagnostic, envelope routing by inner codec, and worker activity echo of inbound codec.
1 parent 551cb98 commit 8fc211a

9 files changed

Lines changed: 366 additions & 28 deletions

File tree

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1414
attempting to drive a future breaking-release server. (#302)
1515
- `Client.get_cluster_info()` — fetches the server version and declared
1616
capability manifest from `/api/cluster/info`.
17+
- Avro payload codec support (optional). Install with
18+
`pip install 'durable-workflow[avro]'` to pull in the Apache `avro` package (`>=1.12,<2`).
19+
`serializer.encode()`, `serializer.decode()`, and
20+
`serializer.envelope()` now accept a `codec=` argument, and
21+
`decode_envelope()` honors the inner codec tag. The Worker decodes
22+
Avro-coded activity arguments and echoes the inbound codec on its
23+
`complete_activity_task` result. Wire format is the Durable Workflow
24+
generic-wrapper (base64 of `0x00` + Avro binary of a `{json: string,
25+
version: int}` record), byte-compatible with the PHP
26+
`Workflow\Serializers\Avro` serializer. (#362)
1727

1828
## [0.1.0] — 2026-04-12
1929

pyproject.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,11 @@ dependencies = [
3535
]
3636

3737
[project.optional-dependencies]
38+
avro = [
39+
"avro>=1.12,<2",
40+
]
3841
dev = [
42+
"avro>=1.12,<2",
3943
"pytest>=8.0",
4044
"pytest-asyncio>=0.23",
4145
"mypy>=1.10",
@@ -64,6 +68,11 @@ strict = true
6468
packages = ["durable_workflow"]
6569
mypy_path = "src"
6670

71+
# apache/avro does not ship type stubs; treat it as Any.
72+
[[tool.mypy.overrides]]
73+
module = ["avro", "avro.*"]
74+
ignore_missing_imports = true
75+
6776
[tool.ruff]
6877
target-version = "py310"
6978
line-length = 120

src/durable_workflow/_avro.py

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
"""Avro codec support for the Durable Workflow Python SDK.
2+
3+
The Durable Workflow server uses an Avro generic-wrapper format on the
4+
wire when the ``payload_codec`` tag is ``"avro"``. The wire layout is:
5+
6+
base64( 0x00 || avro_binary( record{ json: string, version: int } ) )
7+
8+
The ``json`` field carries ``json.dumps(value)``; ``version`` is currently
9+
``1``. A ``0x01`` prefix is reserved for typed-schema payloads — those
10+
are not yet encodable/decodable from this SDK because typed schemas
11+
require a schema registry that is out of scope for the first Avro release.
12+
13+
The ``avro`` third-party package is an *optional* runtime dependency.
14+
Install it with::
15+
16+
pip install 'durable-workflow[avro]'
17+
18+
If the extra is not installed, calling :func:`encode` or :func:`decode`
19+
raises :class:`AvroNotInstalledError` with the install hint.
20+
"""
21+
from __future__ import annotations
22+
23+
import base64
24+
import io
25+
import json
26+
from typing import Any
27+
28+
from .errors import AvroNotInstalledError
29+
30+
WRAPPER_SCHEMA_JSON = (
31+
'{"type":"record","name":"Payload","namespace":"durable_workflow",'
32+
'"fields":[{"name":"json","type":"string"},'
33+
'{"name":"version","type":"int","default":1}]}'
34+
)
35+
WRAPPER_VERSION = 1
36+
_PREFIX_GENERIC_WRAPPER = b"\x00"
37+
_PREFIX_TYPED_SCHEMA = b"\x01"
38+
39+
40+
# Parsed wrapper schema, filled in on the first successful load. The schema
# text is a module constant, so one parse per process is sufficient.
_WRAPPER_SCHEMA_CACHE: Any = None


def _load_avro_schema() -> Any:
    """Return the parsed Avro schema for the generic-wrapper record.

    The ``avro`` package is imported lazily so that importing this module
    never requires the optional extra. The parsed schema is cached after the
    first successful call, because both :func:`encode` and :func:`decode`
    call this helper and re-parsing ``WRAPPER_SCHEMA_JSON`` each time yields
    an identical result.

    Raises:
        AvroNotInstalledError: if the ``avro`` package cannot be imported.
    """
    global _WRAPPER_SCHEMA_CACHE
    if _WRAPPER_SCHEMA_CACHE is None:
        try:
            import avro.schema
        except ImportError as exc:
            raise AvroNotInstalledError(
                "The 'avro' package is required to encode/decode payloads with the 'avro' "
                "codec. Install with: pip install 'durable-workflow[avro]'"
            ) from exc

        _WRAPPER_SCHEMA_CACHE = avro.schema.parse(WRAPPER_SCHEMA_JSON)
    return _WRAPPER_SCHEMA_CACHE
50+
51+
52+
def encode(value: Any) -> str:
    """Serialize *value* into a base64 Avro generic-wrapper blob.

    The value is JSON-encoded and stored in the ``json`` field of the wrapper
    record next to ``version``. The record is written as Avro binary,
    prefixed with the generic-wrapper marker byte, and base64-encoded so it
    can travel under ``payload_codec="avro"``.

    Raises:
        AvroNotInstalledError: if the ``avro`` package cannot be imported.
    """
    try:
        import avro.io
    except ImportError as exc:
        raise AvroNotInstalledError(
            "The 'avro' package is required to encode payloads with the 'avro' "
            "codec. Install with: pip install 'durable-workflow[avro]'"
        ) from exc

    wrapper_schema = _load_avro_schema()
    stream = io.BytesIO()
    binary_encoder = avro.io.BinaryEncoder(stream)
    datum_writer = avro.io.DatumWriter(wrapper_schema)

    record = {
        "json": json.dumps(value, separators=(",", ":"), ensure_ascii=False),
        "version": WRAPPER_VERSION,
    }
    datum_writer.write(record, binary_encoder)

    # Frame the Avro bytes with the one-byte wrapper marker before base64.
    framed = _PREFIX_GENERIC_WRAPPER + stream.getvalue()
    return base64.b64encode(framed).decode("ascii")
77+
78+
79+
def decode(blob: str) -> Any:
    """Decode an Avro ``payload_codec="avro"`` blob into a Python value.

    Accepts the generic-wrapper format: base64 text whose decoded bytes start
    with the ``0x00`` marker followed by the Avro binary of the wrapper
    record. The record's ``json`` field is parsed and returned.

    Raises:
        AvroNotInstalledError: if the ``avro`` package cannot be imported.
        ValueError: if *blob* is not valid base64, decodes to nothing, carries
            the typed-schema prefix ``0x01`` or an unknown prefix, cannot be
            read as the wrapper record, or holds invalid JSON in ``json``.
    """
    try:
        import avro.io
    except ImportError as exc:
        raise AvroNotInstalledError(
            "The 'avro' package is required to decode payloads with the 'avro' "
            "codec. Install with: pip install 'durable-workflow[avro]'"
        ) from exc

    try:
        raw = base64.b64decode(blob, validate=True)
    except (ValueError, TypeError) as exc:
        _diagnose_ingress(blob, exc)
        # _diagnose_ingress is expected to raise. Re-raise explicitly so that
        # ``raw`` can never be read unbound if the helper ever returns.
        raise

    if not raw:
        raise ValueError("Avro payload is empty after base64 decode.")

    prefix = raw[:1]
    if prefix == _PREFIX_TYPED_SCHEMA:
        raise ValueError(
            "Typed Avro payload (prefix 0x01) received without a schema context. "
            "This SDK currently supports only the generic wrapper (prefix 0x00); "
            "typed schemas are not yet implemented."
        )
    if prefix != _PREFIX_GENERIC_WRAPPER:
        raise ValueError(
            f"Unknown Avro payload prefix: 0x{prefix.hex()} "
            f"(expected 0x00 generic wrapper or 0x01 typed schema). "
            f"These bytes were not produced by a Durable Workflow Avro serializer."
        )

    schema = _load_avro_schema()
    reader = avro.io.DatumReader(schema)
    # Skip the one-byte marker; the remainder is the Avro-encoded record.
    decoder = avro.io.BinaryDecoder(io.BytesIO(raw[1:]))
    try:
        record = reader.read(decoder)
    except Exception as exc:
        # Any reader failure is reported as a malformed payload.
        raise ValueError(f"Avro generic-wrapper decode failed: {exc}") from exc

    if not isinstance(record, dict) or "json" not in record:
        raise ValueError(
            "Avro generic-wrapper payload did not decode to a {json, version} record."
        )

    try:
        return json.loads(record["json"])
    except (TypeError, json.JSONDecodeError) as exc:
        raise ValueError(f"Avro generic-wrapper 'json' field is not valid JSON: {exc}") from exc
133+
134+
135+
def _diagnose_ingress(blob: str, cause: Exception) -> None:
136+
"""Re-raise an ingress base64 failure with a typed remediation hint."""
137+
stripped = blob.lstrip() if isinstance(blob, str) else ""
138+
looks_like_json = stripped[:1] in {"{", "[", '"', "-", "t", "f", "n"} or (
139+
stripped[:1].isdigit() if stripped else False
140+
)
141+
if looks_like_json:
142+
raise ValueError(
143+
"Payload bytes look like JSON, not base64-encoded Avro. The producer "
144+
"appears to have JSON-encoded the payload but tagged it with codec "
145+
'"avro". Either change the codec tag to "json", or re-encode the '
146+
'payload with the Avro serializer before tagging it "avro".'
147+
) from cause
148+
149+
raise ValueError(
150+
"Failed to base64-decode Avro payload bytes. Avro payloads on the wire "
151+
"must be base64-encoded bytes whose first byte is 0x00 (generic wrapper) "
152+
"or 0x01 (typed schema). Re-encode the payload, or change the codec tag "
153+
"if the producer used a different codec."
154+
) from cause

src/durable_workflow/client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -827,11 +827,12 @@ async def complete_activity_task(
827827
activity_attempt_id: str,
828828
lease_owner: str,
829829
result: Any,
830+
codec: str = serializer.JSON_CODEC,
830831
) -> Any:
831832
body: dict[str, Any] = {
832833
"activity_attempt_id": activity_attempt_id,
833834
"lease_owner": lease_owner,
834-
"result": serializer.envelope(result),
835+
"result": serializer.envelope(result, codec=codec),
835836
}
836837
return await self._request(
837838
"POST", f"/worker/activity-tasks/{task_id}/complete", worker=True, json=body

src/durable_workflow/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ def __init__(self, message: str, *, cause: Exception | None = None) -> None:
101101
self.__cause__ = cause
102102

103103

104+
class AvroNotInstalledError(DurableWorkflowError, ImportError):
105+
"""Raised when Avro codec is requested but the ``avro`` extra is not installed."""
106+
107+
104108
def _raise_for_status(status: int, body: object, *, context: str = "") -> None:
105109
if status < 400:
106110
return

src/durable_workflow/serializer.py

Lines changed: 52 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,56 @@
22
33
The server exposes a language-neutral payload envelope (see issue #164 and
44
``docs/configuration/worker-protocol.md`` in the docs repo). Every payload on
5-
the wire carries a ``payload_codec`` tag alongside its opaque blob. Python
6-
workers use the ``json`` codec exclusively: the blob is a raw UTF-8 JSON
7-
document.
5+
the wire carries a ``payload_codec`` tag alongside its opaque blob.
6+
7+
Supported codecs:
8+
9+
- ``"json"`` — the blob is a UTF-8 JSON document. Default and always
10+
available.
11+
- ``"avro"`` — the blob is a base64-encoded Avro generic-wrapper payload
12+
(see :mod:`durable_workflow._avro`). Requires the optional ``avro``
13+
extra: ``pip install 'durable-workflow[avro]'``.
814
"""
915
from __future__ import annotations
1016

1117
import json
1218
from typing import Any
1319

20+
from . import _avro
21+
1422
JSON_CODEC = "json"
23+
AVRO_CODEC = "avro"
24+
SUPPORTED_CODECS = (JSON_CODEC, AVRO_CODEC)
1525

1626

17-
def encode(value: Any) -> str:
18-
"""Encode a Python value as a JSON-codec payload blob."""
19-
return json.dumps(value, separators=(",", ":"), ensure_ascii=False)
27+
def encode(value: Any, codec: str = JSON_CODEC) -> str:
28+
"""Encode a Python value as a payload blob for *codec*.
29+
30+
Raises ``ValueError`` for unknown codecs and
31+
:class:`~durable_workflow.errors.AvroNotInstalledError` when the Avro
32+
extra is requested but not installed.
33+
"""
34+
if codec == JSON_CODEC:
35+
return json.dumps(value, separators=(",", ":"), ensure_ascii=False)
36+
if codec == AVRO_CODEC:
37+
return _avro.encode(value)
38+
raise ValueError(
39+
f"Unsupported payload codec {codec!r}; this SDK supports {SUPPORTED_CODECS!r}."
40+
)
2041

2142

22-
def envelope(value: Any) -> dict[str, str]:
43+
def envelope(value: Any, codec: str = JSON_CODEC) -> dict[str, str]:
2344
"""Wrap a value in a ``{codec, blob}`` payload envelope."""
24-
return {"codec": JSON_CODEC, "blob": encode(value)}
45+
return {"codec": codec, "blob": encode(value, codec=codec)}
2546

2647

2748
def decode_envelope(value: Any, codec: str | None = None) -> Any:
28-
"""Decode a value that may be a ``{codec, blob}`` envelope or a raw blob."""
49+
"""Decode a value that may be a ``{codec, blob}`` envelope or a raw blob.
50+
51+
When *value* is an envelope, its inner ``codec`` takes precedence over
52+
the *codec* argument. When *value* is a raw blob, *codec* selects the
53+
decoder (defaulting to JSON).
54+
"""
2955
if isinstance(value, dict) and "codec" in value and "blob" in value:
3056
return decode(value["blob"], codec=value["codec"])
3157
return decode(value, codec=codec)
@@ -34,21 +60,24 @@ def decode_envelope(value: Any, codec: str | None = None) -> Any:
3460
def decode(blob: str | None, codec: str | None = None) -> Any:
3561
"""Decode a payload blob into a Python value.
3662
37-
Raises ``ValueError`` when *codec* names a non-JSON codec or when *blob*
38-
is not valid JSON.
63+
Raises ``ValueError`` for unknown codecs or malformed blobs, and
64+
:class:`~durable_workflow.errors.AvroNotInstalledError` when the Avro
65+
extra is requested but not installed.
3966
"""
4067
if blob is None or blob == "":
4168
return None
4269

43-
if codec is not None and codec != JSON_CODEC:
44-
raise ValueError(
45-
f"Cannot decode payload with codec {codec!r}: this SDK only "
46-
f"supports the {JSON_CODEC!r} codec. Ensure the workflow was "
47-
f"started with a JSON input or an explicit "
48-
f'{{"codec": "json", "blob": "..."}} envelope.'
49-
)
50-
51-
try:
52-
return json.loads(blob)
53-
except (json.JSONDecodeError, TypeError) as exc:
54-
raise ValueError(f"Payload is not valid JSON: {exc}") from exc
70+
if codec is None or codec == JSON_CODEC:
71+
try:
72+
return json.loads(blob)
73+
except (json.JSONDecodeError, TypeError) as exc:
74+
raise ValueError(f"Payload is not valid JSON: {exc}") from exc
75+
76+
if codec == AVRO_CODEC:
77+
return _avro.decode(blob)
78+
79+
raise ValueError(
80+
f"Cannot decode payload with codec {codec!r}: this SDK supports "
81+
f"{SUPPORTED_CODECS!r}. Ensure the workflow was started with a "
82+
f"compatible codec or an explicit {{'codec': '<codec>', 'blob': '...'}} envelope."
83+
)

src/durable_workflow/worker.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,9 @@ async def _run_activity_task(self, task: dict[str, Any]) -> None:
180180
activity_type: str = task.get("activity_type", "")
181181
attempt_number: int = task.get("attempt_number", 1)
182182
raw_args = task.get("arguments")
183-
args = serializer.decode_envelope(raw_args, codec=task.get("payload_codec")) or []
183+
inbound_codec = task.get("payload_codec") or serializer.JSON_CODEC
184+
result_codec = inbound_codec if inbound_codec in serializer.SUPPORTED_CODECS else serializer.JSON_CODEC
185+
args = serializer.decode_envelope(raw_args, codec=inbound_codec) or []
184186
if not isinstance(args, list):
185187
args = [args]
186188

@@ -264,6 +266,7 @@ async def _run_activity_task(self, task: dict[str, Any]) -> None:
264266
activity_attempt_id=attempt_id,
265267
lease_owner=self.worker_id,
266268
result=result,
269+
codec=result_codec,
267270
)
268271
except Exception as e:
269272
log.warning("failed to complete activity task %s: %s", task_id, e)

0 commit comments

Comments
 (0)