Skip to content

Commit 41680d8

Browse files
Add object-store external payload drivers
Add object-store external payload drivers
1 parent 2071623 commit 41680d8

4 files changed

Lines changed: 356 additions & 3 deletions

File tree

README.md

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,14 +198,39 @@ infers the workflow type and input from that event; otherwise pass
198198
contains the commands the workflow would emit next, including determinism
199199
failures surfaced as workflow failure commands.
200200

201+
## External payload storage
202+
203+
Large payload offload is opt-in. `serializer.external_storage_envelope(...)`
204+
keeps small encoded payloads inline and stores larger bytes through an
205+
`ExternalStorageDriver`, returning a stable reference envelope with URI, codec,
206+
size, and SHA-256 integrity metadata. `serializer.decode_envelope(...)` fetches
207+
referenced bytes through the same driver and verifies size/hash before decode.
208+
209+
The SDK includes a local filesystem driver for development plus dependency-free
210+
S3, GCS, and Azure Blob adapters. Cloud SDKs stay application-owned: pass an
211+
already-configured boto3-compatible S3 client, google-cloud-storage client, or
212+
azure-storage-blob container client when your deployment enables external
213+
payload storage.
214+
215+
```python
216+
from durable_workflow import S3ExternalStorage, serializer
217+
218+
storage = S3ExternalStorage(s3_client, bucket="workflow-payloads", prefix="prod")
219+
payload = serializer.external_storage_envelope(
220+
{"large": "value"},
221+
external_storage=storage,
222+
threshold_bytes=2 * 1024 * 1024,
223+
)
224+
```
225+
201226
## Features
202227

203228
- **Async-first**: Built on `httpx` and `asyncio`
204229
- **Type-safe**: Full type hints, passes `mypy --strict`
205230
- **Polyglot**: Works alongside PHP workers on the same task queue
206231
- **HTTP/JSON protocol**: No gRPC, no protobuf dependencies
207232
- **Codec envelopes**: Avro payloads by default, with JSON decode compatibility for existing history
208-
- **External payload references**: opt-in reference envelopes, a local filesystem driver, and a bounded verified-byte cache for large-payload offload experiments
233+
- **External payload references**: opt-in reference envelopes, local filesystem/S3/GCS/Azure Blob drivers, and a bounded verified-byte cache for large-payload offload experiments
209234
- **Payload-size warnings**: Structured warnings before oversized workflow, activity, schedule, signal, update, query, or search-attribute payloads reach the server
210235
- **Workflow definition guard**: Worker registration refuses same-id hot reloads when a workflow class definition changed
211236
- **Deterministic workflow helpers**: `ctx.now()`, `ctx.random()`, `ctx.uuid4()`, and `ctx.uuid7()` replay from workflow state

src/durable_workflow/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,14 @@
5757
)
5858
from .external_storage import (
5959
EXTERNAL_PAYLOAD_REFERENCE_SCHEMA,
60+
AzureBlobExternalStorage,
6061
ExternalPayloadCache,
6162
ExternalPayloadIntegrityError,
6263
ExternalPayloadReference,
6364
ExternalStorageDriver,
65+
GCSExternalStorage,
6466
LocalFilesystemExternalStorage,
67+
S3ExternalStorage,
6568
)
6669
from .external_task_input import (
6770
EXTERNAL_TASK_INPUT_CONTRACT_SCHEMA,
@@ -175,6 +178,7 @@
175178
"workflow",
176179
"DurableWorkflowError",
177180
"EXTERNAL_PAYLOAD_REFERENCE_SCHEMA",
181+
"AzureBlobExternalStorage",
178182
"ExternalPayloadCache",
179183
"ExternalPayloadIntegrityError",
180184
"ExternalPayloadReference",
@@ -194,6 +198,7 @@
194198
"InvalidArgument",
195199
"InMemoryMetrics",
196200
"InvocableActivityHandler",
201+
"GCSExternalStorage",
197202
"LocalFilesystemExternalStorage",
198203
"MetricsRecorder",
199204
"NamespaceNotFound",
@@ -206,6 +211,7 @@
206211
"QueryTaskHandler",
207212
"QueryTaskInterceptorContext",
208213
"RetryPolicy",
214+
"S3ExternalStorage",
209215
"ServerError",
210216
"TransportRetryPolicy",
211217
"Unauthorized",

src/durable_workflow/external_storage.py

Lines changed: 169 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
from collections import OrderedDict
77
from dataclasses import dataclass
88
from pathlib import Path
9-
from typing import Protocol
10-
from urllib.parse import unquote, urlparse
9+
from typing import Any, Protocol
10+
from urllib.parse import quote, unquote, urlparse
1111

1212
EXTERNAL_PAYLOAD_REFERENCE_SCHEMA = "durable-workflow.v2.external-payload-reference.v1"
1313

@@ -174,6 +174,130 @@ def _path_from_uri(self, uri: str) -> Path:
174174
return path
175175

176176

177+
class S3ExternalStorage:
178+
"""External storage driver backed by a boto3-compatible S3 client.
179+
180+
The SDK does not depend on boto3. Applications that need S3 pass an
181+
already-configured client exposing ``put_object``, ``get_object``, and
182+
``delete_object``.
183+
"""
184+
185+
def __init__(self, client: Any, *, bucket: str, prefix: str = "") -> None:
186+
if not bucket:
187+
raise ValueError("s3 external storage bucket must be non-empty")
188+
self.client = client
189+
self.bucket = bucket
190+
self.prefix = _normalize_object_prefix(prefix)
191+
192+
def put(self, data: bytes, *, sha256: str, codec: str) -> str:
193+
key = _object_key(self.prefix, sha256=sha256, codec=codec)
194+
self.client.put_object(
195+
Bucket=self.bucket,
196+
Key=key,
197+
Body=data,
198+
ContentType="application/octet-stream",
199+
Metadata={"sha256": sha256, "codec": codec},
200+
)
201+
return _object_uri("s3", self.bucket, key)
202+
203+
def get(self, uri: str) -> bytes:
204+
bucket, key = _parse_object_uri(uri, scheme="s3", expected_bucket=self.bucket, expected_prefix=self.prefix)
205+
response = self.client.get_object(Bucket=bucket, Key=key)
206+
body = response["Body"]
207+
data = body.read() if hasattr(body, "read") else body
208+
if not isinstance(data, bytes):
209+
raise ValueError("s3 external storage client returned non-bytes payload")
210+
return data
211+
212+
def delete(self, uri: str) -> None:
213+
bucket, key = _parse_object_uri(uri, scheme="s3", expected_bucket=self.bucket, expected_prefix=self.prefix)
214+
self.client.delete_object(Bucket=bucket, Key=key)
215+
216+
217+
class GCSExternalStorage:
218+
"""External storage driver backed by a google-cloud-storage client.
219+
220+
The SDK does not depend on google-cloud-storage. Applications pass a
221+
configured client exposing ``bucket(name).blob(key)``.
222+
"""
223+
224+
def __init__(self, client: Any, *, bucket: str, prefix: str = "") -> None:
225+
if not bucket:
226+
raise ValueError("gcs external storage bucket must be non-empty")
227+
self.client = client
228+
self.bucket = bucket
229+
self.prefix = _normalize_object_prefix(prefix)
230+
231+
def put(self, data: bytes, *, sha256: str, codec: str) -> str:
232+
key = _object_key(self.prefix, sha256=sha256, codec=codec)
233+
blob = self.client.bucket(self.bucket).blob(key)
234+
blob.metadata = {"sha256": sha256, "codec": codec}
235+
blob.upload_from_string(data, content_type="application/octet-stream")
236+
return _object_uri("gs", self.bucket, key)
237+
238+
def get(self, uri: str) -> bytes:
239+
bucket, key = _parse_object_uri(uri, scheme="gs", expected_bucket=self.bucket, expected_prefix=self.prefix)
240+
data = self.client.bucket(bucket).blob(key).download_as_bytes()
241+
if not isinstance(data, bytes):
242+
raise ValueError("gcs external storage client returned non-bytes payload")
243+
return data
244+
245+
def delete(self, uri: str) -> None:
246+
bucket, key = _parse_object_uri(uri, scheme="gs", expected_bucket=self.bucket, expected_prefix=self.prefix)
247+
self.client.bucket(bucket).blob(key).delete()
248+
249+
250+
class AzureBlobExternalStorage:
251+
"""External storage driver backed by an azure-storage-blob container client.
252+
253+
The SDK does not depend on azure-storage-blob. Applications pass a
254+
configured container client exposing ``upload_blob``, ``download_blob``,
255+
and ``delete_blob``.
256+
"""
257+
258+
def __init__(self, container_client: Any, *, container: str, prefix: str = "") -> None:
259+
if not container:
260+
raise ValueError("azure external storage container must be non-empty")
261+
self.container_client = container_client
262+
self.container = container
263+
self.prefix = _normalize_object_prefix(prefix)
264+
265+
def put(self, data: bytes, *, sha256: str, codec: str) -> str:
266+
key = _object_key(self.prefix, sha256=sha256, codec=codec)
267+
self.container_client.upload_blob(
268+
name=key,
269+
data=data,
270+
overwrite=True,
271+
metadata={"sha256": sha256, "codec": codec},
272+
)
273+
return _object_uri("azure-blob", self.container, key)
274+
275+
def get(self, uri: str) -> bytes:
276+
container, key = _parse_object_uri(
277+
uri,
278+
scheme="azure-blob",
279+
expected_bucket=self.container,
280+
expected_prefix=self.prefix,
281+
)
282+
data = self.container_client.download_blob(key).readall()
283+
if not isinstance(data, bytes):
284+
raise ValueError("azure external storage client returned non-bytes payload")
285+
if container != self.container:
286+
raise ValueError("azure external storage URI uses a different container")
287+
return data
288+
289+
def delete(self, uri: str) -> None:
290+
container, key = _parse_object_uri(
291+
uri,
292+
scheme="azure-blob",
293+
expected_bucket=self.container,
294+
expected_prefix=self.prefix,
295+
)
296+
if container != self.container:
297+
raise ValueError("azure external storage URI uses a different container")
298+
self.container_client.delete_blob(key)
299+
300+
177301
def store_external_payload(
178302
driver: ExternalStorageDriver,
179303
data: bytes,
@@ -230,3 +354,46 @@ def _safe_codec_segment(codec: str) -> str:
230354
if not all(char.isalnum() or char in {"-", "_", "."} for char in codec):
231355
raise ValueError("codec contains characters that are unsafe for local storage paths")
232356
return codec
357+
358+
359+
def _normalize_object_prefix(prefix: str) -> str:
360+
cleaned = prefix.strip("/")
361+
if not cleaned:
362+
return ""
363+
parts = cleaned.split("/")
364+
if any(part in {"", ".", ".."} for part in parts):
365+
raise ValueError("external storage prefix contains an unsafe path segment")
366+
return "/".join(quote(part, safe="-_.~") for part in parts)
367+
368+
369+
def _object_key(prefix: str, *, sha256: str, codec: str) -> str:
370+
_validate_sha256(sha256)
371+
codec_segment = quote(_safe_codec_segment(codec), safe="-_.~")
372+
key = f"{codec_segment}/{sha256[:2]}/{sha256}"
373+
return f"{prefix}/{key}" if prefix else key
374+
375+
376+
def _object_uri(scheme: str, bucket: str, key: str) -> str:
377+
return f"{scheme}://{bucket}/{key}"
378+
379+
380+
def _parse_object_uri(
381+
uri: str,
382+
*,
383+
scheme: str,
384+
expected_bucket: str,
385+
expected_prefix: str,
386+
) -> tuple[str, str]:
387+
parsed = urlparse(uri)
388+
if parsed.scheme != scheme or parsed.netloc != expected_bucket:
389+
raise ValueError(f"{scheme} external storage URI uses a different bucket or container")
390+
391+
key = parsed.path.lstrip("/")
392+
if not key:
393+
raise ValueError(f"{scheme} external storage URI must include an object key")
394+
parts = key.split("/")
395+
if any(part in {"", ".", ".."} for part in parts):
396+
raise ValueError(f"{scheme} external storage URI contains an unsafe object key")
397+
if expected_prefix and not (key == expected_prefix or key.startswith(f"{expected_prefix}/")):
398+
raise ValueError(f"{scheme} external storage URI is outside the configured prefix")
399+
return parsed.netloc, key

0 commit comments

Comments
 (0)