Skip to content

Commit 5d9485c

Browse files
Cache verified external payload bytes
1 parent 2d4b8ca commit 5d9485c

5 files changed

Lines changed: 149 additions & 3 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ failures surfaced as workflow failure commands.
205205
- **Polyglot**: Works alongside PHP workers on the same task queue
206206
- **HTTP/JSON protocol**: No gRPC, no protobuf dependencies
207207
- **Codec envelopes**: Avro payloads by default, with JSON decode compatibility for existing history
208-
- **External payload references**: opt-in reference envelopes and a local filesystem driver for large-payload offload experiments
208+
- **External payload references**: opt-in reference envelopes, a local filesystem driver, and a bounded verified-byte cache for large-payload offload experiments
209209
- **Payload-size warnings**: Structured warnings before oversized workflow, activity, schedule, signal, update, query, or search-attribute payloads reach the server
210210
- **Workflow definition guard**: Worker registration refuses same-id hot reloads when a workflow class definition changed
211211
- **Deterministic workflow helpers**: `ctx.now()`, `ctx.random()`, `ctx.uuid4()`, and `ctx.uuid7()` replay from workflow state

src/durable_workflow/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
)
5757
from .external_storage import (
5858
EXTERNAL_PAYLOAD_REFERENCE_SCHEMA,
59+
ExternalPayloadCache,
5960
ExternalPayloadIntegrityError,
6061
ExternalPayloadReference,
6162
ExternalStorageDriver,
@@ -171,6 +172,7 @@
171172
"workflow",
172173
"DurableWorkflowError",
173174
"EXTERNAL_PAYLOAD_REFERENCE_SCHEMA",
175+
"ExternalPayloadCache",
174176
"ExternalPayloadIntegrityError",
175177
"ExternalPayloadReference",
176178
"ExternalStorageDriver",

src/durable_workflow/external_storage.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
import hashlib
6+
from collections import OrderedDict
67
from dataclasses import dataclass
78
from pathlib import Path
89
from typing import Protocol
@@ -76,6 +77,63 @@ def from_dict(cls, data: object) -> ExternalPayloadReference:
7677
return cls(uri=uri, sha256=sha256, size_bytes=size_bytes, codec=codec, schema=schema)
7778

7879

80+
class ExternalPayloadCache:
    """Bounded cache for verified external payload bytes during replay.

    Cache entries are keyed by the complete reference identity. Bytes are
    inserted only by :func:`fetch_external_payload` after size and sha256
    verification has succeeded, so cache hits preserve the same integrity
    contract as a fresh driver fetch.

    Entries are kept in least-recently-used order: a successful ``get`` or a
    ``put`` marks the entry as most recently used, and eviction removes the
    oldest entries until both the entry-count and byte budgets are satisfied.
    The class performs no locking of its own.
    """

    def __init__(self, *, max_entries: int = 128, max_bytes: int = 16 * 1024 * 1024) -> None:
        """Create an empty cache.

        Args:
            max_entries: Maximum number of payloads retained at once.
            max_bytes: Maximum total size, in bytes, of all retained payloads.

        Raises:
            ValueError: If either limit is smaller than 1.
        """
        if max_entries < 1:
            raise ValueError("external payload cache max_entries must be at least 1")
        if max_bytes < 1:
            raise ValueError("external payload cache max_bytes must be at least 1")
        self.max_entries = max_entries
        self.max_bytes = max_bytes
        # Running total of len() over every cached payload.
        self.current_bytes = 0
        # Oldest entry first; the most recently used entry sits at the end.
        self._entries: OrderedDict[tuple[str, str, int, str], bytes] = OrderedDict()

    def get(self, reference: ExternalPayloadReference) -> bytes | None:
        """Return the cached bytes for ``reference`` or ``None`` on a miss.

        A hit refreshes the entry so it becomes the most recently used one.
        """
        key = self._key(reference)
        data = self._entries.get(key)
        if data is None:
            return None
        self._entries.move_to_end(key)
        return data

    def put(self, reference: ExternalPayloadReference, data: bytes) -> None:
        """Cache ``data`` under ``reference`` and evict old entries if needed.

        Payloads larger than ``max_bytes`` are silently skipped because they
        could never fit within the byte budget.
        """
        if len(data) > self.max_bytes:
            return

        # Store an immutable snapshot: if a driver hands back a mutable
        # bytes-like object (for example a bytearray), later mutation by the
        # caller must not change what a cache hit returns. For an exact
        # ``bytes`` instance this is a no-copy identity conversion.
        payload = bytes(data)

        key = self._key(reference)
        existing = self._entries.pop(key, None)
        if existing is not None:
            # Replacing an entry: drop the old size before adding the new one.
            self.current_bytes -= len(existing)

        self._entries[key] = payload
        self.current_bytes += len(payload)
        self._evict()

    def clear(self) -> None:
        """Drop every cached payload and reset the byte counter."""
        self._entries.clear()
        self.current_bytes = 0

    def __len__(self) -> int:
        """Return the number of payloads currently cached."""
        return len(self._entries)

    @staticmethod
    def _key(reference: ExternalPayloadReference) -> tuple[str, str, int, str]:
        """Build the cache key from the reference's uri, sha256, size and codec."""
        return (reference.uri, reference.sha256, reference.size_bytes, reference.codec)

    def _evict(self) -> None:
        """Remove least-recently-used entries until both limits hold."""
        while len(self._entries) > self.max_entries or self.current_bytes > self.max_bytes:
            _, data = self._entries.popitem(last=False)
            self.current_bytes -= len(data)
135+
136+
79137
class LocalFilesystemExternalStorage:
80138
"""Dependency-free external storage driver for development and tests."""
81139

@@ -136,15 +194,24 @@ def store_external_payload(
136194
def fetch_external_payload(
    driver: ExternalStorageDriver,
    reference: ExternalPayloadReference,
    *,
    cache: ExternalPayloadCache | None = None,
) -> bytes:
    """Return the payload bytes for ``reference`` after integrity checks.

    When ``cache`` is supplied and already holds bytes for ``reference``,
    those bytes are returned without contacting ``driver``. Otherwise the
    bytes are read via ``driver.get(reference.uri)``, checked against the
    reference's ``size_bytes`` and ``sha256``, and, only once both checks
    pass, stored in ``cache`` (if given) before being returned.

    Raises:
        ExternalPayloadIntegrityError: If the fetched bytes do not match the
            size or the sha256 digest recorded on the reference.
    """
    hit = cache.get(reference) if cache is not None else None
    if hit is not None:
        return hit

    payload = driver.get(reference.uri)

    # Cheap length comparison first; hashing only happens when sizes agree.
    if len(payload) != reference.size_bytes:
        raise ExternalPayloadIntegrityError("external payload size does not match its reference")

    digest = hashlib.sha256(payload).hexdigest()
    if digest != reference.sha256:
        raise ExternalPayloadIntegrityError("external payload hash does not match its reference")

    # Only verified bytes ever reach the cache.
    if cache is not None:
        cache.put(reference, payload)
    return payload
149216

150217

src/durable_workflow/serializer.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
from . import _avro
3232
from .external_storage import (
33+
ExternalPayloadCache,
3334
ExternalPayloadReference,
3435
ExternalStorageDriver,
3536
fetch_external_payload,
@@ -423,6 +424,7 @@ def decode_envelope(
423424
codec: str | None = None,
424425
*,
425426
external_storage: ExternalStorageDriver | None = None,
427+
external_storage_cache: ExternalPayloadCache | None = None,
426428
) -> Any:
427429
"""Decode a value that may be a ``{codec, blob}`` envelope or a raw blob.
428430
@@ -440,7 +442,7 @@ def decode_envelope(
440442
envelope_codec = value.get("codec")
441443
if envelope_codec is not None and envelope_codec != reference.codec:
442444
raise ValueError("external payload reference codec does not match envelope codec")
443-
blob = fetch_external_payload(external_storage, reference).decode("utf-8")
445+
blob = fetch_external_payload(external_storage, reference, cache=external_storage_cache).decode("utf-8")
444446
return decode(blob, codec=reference.codec)
445447
return decode(value, codec=codec)
446448

@@ -450,13 +452,19 @@ def decode_envelopes(
450452
codec: str | None = None,
451453
*,
452454
external_storage: ExternalStorageDriver | None = None,
455+
external_storage_cache: ExternalPayloadCache | None = None,
453456
) -> list[Any]:
454457
"""Decode several raw blobs or ``{codec, blob}`` envelopes in order."""
455458
if external_storage is not None or any(
456459
isinstance(value, dict) and "external_storage" in value for value in values
457460
):
458461
return [
459-
decode_envelope(value, codec=codec, external_storage=external_storage)
462+
decode_envelope(
463+
value,
464+
codec=codec,
465+
external_storage=external_storage,
466+
external_storage_cache=external_storage_cache,
467+
)
460468
for value in values
461469
]
462470

tests/test_external_storage.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from durable_workflow import serializer
77
from durable_workflow.external_storage import (
88
EXTERNAL_PAYLOAD_REFERENCE_SCHEMA,
9+
ExternalPayloadCache,
910
ExternalPayloadIntegrityError,
1011
ExternalPayloadReference,
1112
LocalFilesystemExternalStorage,
@@ -103,6 +104,74 @@ def test_fetch_external_payload_rejects_mutated_bytes(tmp_path: Path) -> None:
103104
fetch_external_payload(storage, reference)
104105

105106

107+
def test_fetch_external_payload_cache_reuses_verified_bytes(tmp_path: Path) -> None:
    """A second fetch is served from the cache even after the file changes."""
    original = b'{"stable":true}'
    storage = LocalFilesystemExternalStorage(tmp_path)
    cache = ExternalPayloadCache(max_entries=2, max_bytes=1024)
    reference = store_external_payload(storage, original, codec="json")
    stored_file = Path(reference.uri.removeprefix("file://"))

    # First fetch goes through the driver and populates the cache.
    first_read = fetch_external_payload(storage, reference, cache=cache)
    assert first_read == original

    # Mutate the backing file; a fresh driver fetch would now fail verification.
    stored_file.write_bytes(b'{"stable":false}')

    second_read = fetch_external_payload(storage, reference, cache=cache)
    assert second_read == original
    assert len(cache) == 1
118+
119+
120+
def test_fetch_external_payload_does_not_cache_failed_integrity_check(tmp_path: Path) -> None:
    """Bytes that fail verification must never be inserted into the cache."""
    storage = LocalFilesystemExternalStorage(tmp_path)
    cache = ExternalPayloadCache(max_entries=2, max_bytes=1024)
    reference = store_external_payload(storage, b'{"safe":true}', codec="json")

    # Tamper with the stored file before the first fetch ever happens.
    tampered_file = Path(reference.uri.removeprefix("file://"))
    tampered_file.write_bytes(b'{"safe":false}')

    with pytest.raises(ExternalPayloadIntegrityError, match="size|hash"):
        fetch_external_payload(storage, reference, cache=cache)

    assert len(cache) == 0
131+
132+
133+
def test_decode_envelopes_can_share_external_payload_cache(tmp_path: Path) -> None:
    """``decode_envelope`` and ``decode_envelopes`` can reuse one shared cache."""
    storage = LocalFilesystemExternalStorage(tmp_path)
    cache = ExternalPayloadCache(max_entries=2, max_bytes=1024)
    env = serializer.external_storage_envelope(
        {"message": "x" * 64},
        external_storage=storage,
        threshold_bytes=10,
        codec="json",
    )
    offloaded_file = Path(env["external_storage"]["uri"].removeprefix("file://"))

    # Decoding once populates the shared cache with the verified bytes.
    single = serializer.decode_envelope(
        env,
        external_storage=storage,
        external_storage_cache=cache,
    )
    assert single == {"message": "x" * 64}

    # After the backing file is overwritten, the batch decode must still
    # yield the original value, which is only possible via the cache.
    offloaded_file.write_bytes(b'{"message":"mutated"}')

    batch = serializer.decode_envelopes(
        [env],
        external_storage=storage,
        external_storage_cache=cache,
    )
    assert batch == [{"message": "x" * 64}]
154+
155+
156+
def test_external_payload_cache_is_bounded_by_entries_and_bytes(tmp_path: Path) -> None:
    """The cache honours both its entry-count limit and its byte limit."""
    storage = LocalFilesystemExternalStorage(tmp_path)
    cache = ExternalPayloadCache(max_entries=1, max_bytes=20)
    first, second = (
        store_external_payload(storage, body, codec="json")
        for body in (b"first", b"second")
    )

    # With room for a single entry, fetching the second evicts the first.
    for reference in (first, second):
        fetch_external_payload(storage, reference, cache=cache)

    assert cache.get(first) is None
    assert cache.get(second) == b"second"

    # A payload one byte over the byte budget is fetched but never cached.
    oversized = store_external_payload(storage, b"x" * 21, codec="json")
    fetch_external_payload(storage, oversized, cache=cache)

    assert cache.get(oversized) is None
    assert cache.current_bytes <= cache.max_bytes
173+
174+
106175
def test_local_storage_rejects_file_uri_outside_root(tmp_path: Path) -> None:
107176
storage = LocalFilesystemExternalStorage(tmp_path / "root")
108177
outside = tmp_path / "outside"

0 commit comments

Comments
 (0)