Skip to content

Commit 50bb4a5

Browse files
committed
PR feedback
1 parent a2dc68c commit 50bb4a5

11 files changed

Lines changed: 75 additions & 25 deletions

File tree

docs/features.md

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -529,8 +529,6 @@ inheritance required — it's a `typing.Protocol`) to send exports to
529529
any destination (S3, GCS, SFTP, local filesystem, a database, etc.):
530530

531531
```python
532-
from typing import Optional
533-
534532
from durabletask.extensions.history_export import HistoryWriter
535533

536534

@@ -542,13 +540,18 @@ class LocalFileSystemHistoryWriter:
542540
self,
543541
*,
544542
instance_id: str,
543+
container: str,
545544
blob_name: str,
546545
payload: bytes,
547546
content_type: str,
548-
content_encoding: Optional[str],
547+
content_encoding: str | None,
549548
) -> None:
550549
import os
551-
path = os.path.join(self._root, blob_name)
550+
# ``container`` is the destination's logical container name
551+
# (an ExportDestination.container). Per-job routing writers
552+
# combine it with ``blob_name``; writers that pin to a fixed
553+
# location at construction time may ignore it.
554+
path = os.path.join(self._root, container, blob_name)
552555
os.makedirs(os.path.dirname(path), exist_ok=True)
553556
with open(path, "wb") as fp:
554557
fp.write(payload)

durabletask/extensions/history_export/activities.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@ def export_instance_history(
149149
raise TypeError("format must be a mapping")
150150
fmt = ExportFormat.from_dict(cast("Mapping[str, Any]", fmt_input))
151151
destination_raw: Mapping[str, Any] = input.get("destination") or {}
152+
container_raw: Any = destination_raw.get("container")
153+
if not container_raw:
154+
raise ValueError("destination.container is required")
155+
container: str = str(container_raw)
152156
prefix_raw: Any = destination_raw.get("prefix")
153157
prefix: str | None = str(prefix_raw) if prefix_raw is not None else None
154158

@@ -169,6 +173,7 @@ def export_instance_history(
169173
blob_name = _blob_name_for(instance_id=instance_id, prefix=prefix, fmt=fmt)
170174
ctx.writer.write(
171175
instance_id=instance_id,
176+
container=container,
172177
blob_name=blob_name,
173178
payload=payload,
174179
content_type=content_type_for(fmt),

durabletask/extensions/history_export/azure_blob.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,19 @@ class AzureBlobHistoryExportWriterOptions:
6565
def __post_init__(self) -> None:
6666
if not self.container_name:
6767
raise ValueError("container_name is required")
68+
if self.connection_string and self.account_url:
69+
raise ValueError(
70+
"'connection_string' and 'account_url' are mutually exclusive"
71+
)
6872
if not self.connection_string and not self.account_url:
6973
raise ValueError(
7074
"Either 'connection_string' or 'account_url' (with 'credential') "
7175
"must be provided"
7276
)
77+
if self.account_url and self.credential is None:
78+
raise ValueError(
79+
"'credential' is required when 'account_url' is provided"
80+
)
7381

7482

7583
class AzureBlobHistoryExportWriter:
@@ -116,19 +124,33 @@ def write(
116124
self,
117125
*,
118126
instance_id: str,
127+
container: str,
119128
blob_name: str,
120129
payload: bytes,
121130
content_type: str,
122131
content_encoding: str | None,
123132
) -> None:
124133
del instance_id # included by the protocol but not needed here
134+
# This writer pins to the container configured at construction
135+
# time and ignores the per-call ``container`` argument; the
136+
# configured value is authoritative for any given writer
137+
# instance. Run a separate writer per destination container
138+
# if you need per-job routing.
139+
del container
125140
self._ensure_container()
126141
container_client = self._service.get_container_client(
127142
self._options.container_name
128143
)
129-
content_settings = ContentSettings(
130-
content_type=content_type,
131-
content_encoding=content_encoding or "",
144+
# Only set Content-Encoding if the format actually compresses
145+
# the payload; an empty header value would be persisted on
146+
# the blob and confuse downstream clients.
147+
content_settings = (
148+
ContentSettings(
149+
content_type=content_type,
150+
content_encoding=content_encoding,
151+
)
152+
if content_encoding
153+
else ContentSettings(content_type=content_type)
132154
)
133155
container_client.upload_blob(
134156
name=blob_name,

durabletask/extensions/history_export/client.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,10 @@ def list_jobs(
219219
instance_id_starts_with=_ENTITY_ID_PREFIX,
220220
last_modified_from=query.last_modified_from,
221221
last_modified_to=query.last_modified_to,
222-
include_state=query.include_state,
222+
# list_jobs always needs the persisted state to populate
223+
# ExportJobDescription; an entity-only view doesn't carry
224+
# status or progress and would always be filtered out.
225+
include_state=True,
223226
page_size=query.page_size,
224227
)
225228
status_filter = set(query.status) if query.status else None
@@ -230,7 +233,7 @@ def list_jobs(
230233
# explicit entity-name check.
231234
if meta.id.entity != ENTITY_NAME.lower():
232235
continue
233-
raw = meta.get_state(str) if meta.includes_state else None
236+
raw = meta.get_state(str)
234237
if not raw:
235238
continue
236239
try:

durabletask/extensions/history_export/models.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,12 @@ class ExportJobStatus(Enum):
6464
Status meanings
6565
---------------
6666
``PENDING``
67-
The job has been created but the entity has not yet processed
68-
the ``create`` signal, *or* the entity has accepted the
69-
configuration but has not yet kicked off its driving
70-
orchestrator. The value is reserved for the forthcoming
71-
``run`` operation (see the .NET ``ExportJob.Run`` pattern);
72-
the current implementation transitions directly from creation
73-
to :attr:`ACTIVE`, so jobs are not persisted in ``Pending``
74-
today.
67+
The job has been created and persisted but the entity has not
68+
yet kicked off its driving orchestrator. Jobs sit in this
69+
state briefly between the ``create`` and ``run`` signals
70+
(the public client sends both in immediate succession), or
71+
for longer if ``run`` is never invoked or if a caller revives
72+
a previously terminal job via ``create``.
7573
``ACTIVE``
7674
The job is running and the driving orchestrator is making
7775
progress through pages of terminal instances.
@@ -305,15 +303,12 @@ class ExportJobQuery:
305303
this timestamp are returned.
306304
page_size: Backend page size used to enumerate the underlying
307305
entities.
308-
include_state: Whether to fetch full job state (set ``False``
309-
to retrieve job IDs and metadata only).
310306
"""
311307

312308
status: list[ExportJobStatus] | None = None
313309
last_modified_from: datetime | None = None
314310
last_modified_to: datetime | None = None
315311
page_size: int | None = None
316-
include_state: bool = True
317312

318313

319314
@dataclass

durabletask/extensions/history_export/writer.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,18 @@ def write(
2626
self,
2727
*,
2828
instance_id: str,
29+
container: str,
2930
blob_name: str,
3031
payload: bytes,
3132
content_type: str,
3233
content_encoding: str | None,
3334
) -> None:
3435
import os
35-
path = os.path.join(self._root, blob_name)
36+
# The ``container`` value comes from the export job's
37+
# ExportDestination.container and is the logical
38+
# bucket / subdirectory the caller asked the job to
39+
# write into.
40+
path = os.path.join(self._root, container, blob_name)
3641
os.makedirs(os.path.dirname(path), exist_ok=True)
3742
with open(path, "wb") as fp:
3843
fp.write(payload)
@@ -63,6 +68,7 @@ def write(
6368
self,
6469
*,
6570
instance_id: str,
71+
container: str,
6672
blob_name: str,
6773
payload: bytes,
6874
content_type: str,
@@ -74,8 +80,17 @@ def write(
7480
instance_id: The orchestration instance whose history this
7581
payload represents. Provided so destinations may use
7682
it as a key, metadata, or sharding hint.
83+
container: The destination container / bucket name the
84+
job's :class:`ExportDestination` declared. Writers
85+
that want to honour per-job container routing should
86+
use this value; writers that pin to a fixed container
87+
at construction time (such as the bundled Azure Blob
88+
writer) may ignore it.
7789
blob_name: Destination-relative path / key, including any
7890
configured destination prefix and file extension.
91+
Does NOT include the ``container`` component — a
92+
writer that routes per-container is expected to
93+
combine the two.
7994
payload: The serialized history bytes. Already compressed
8095
if the configured format calls for it.
8196
content_type: The HTTP-style content type appropriate for

examples/history_export/app.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,12 @@ def sample_orchestrator(ctx: task.OrchestrationContext, n: int):
5959

6060
def main() -> None:
6161
print(f"Using container: {CONTAINER_NAME}")
62-
print(f"Using storage connection: {AZURITE_CONN_STR}")
62+
# Avoid printing the raw connection string — a real Azure Storage
63+
# connection string contains the account key.
64+
if AZURITE_CONN_STR == "UseDevelopmentStorage=true":
65+
print("Using storage connection: Azurite (UseDevelopmentStorage=true)")
66+
else:
67+
print("Using storage connection: (redacted)")
6368

6469
backend = create_test_backend(port=50300)
6570
try:

tests/durabletask/extensions/history_export/test_activities.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def __init__(self) -> None:
4646
self._lock = threading.Lock()
4747
self.blobs: dict[str, dict] = {}
4848

49-
def write(self, *, instance_id, blob_name, payload, content_type, content_encoding):
49+
def write(self, *, instance_id, container, blob_name, payload, content_type, content_encoding):
5050
with self._lock:
5151
self.blobs[blob_name] = {
5252
"instance_id": instance_id,

tests/durabletask/extensions/history_export/test_azure_blob_writer_e2e.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ def test_write_json_blob(writer):
9393
blob_name = f"json/{uuid.uuid4().hex}.json"
9494
writer.write(
9595
instance_id="inst-json",
96+
container=TEST_CONTAINER,
9697
blob_name=blob_name,
9798
payload=payload,
9899
content_type=content_type_for(fmt),
@@ -118,6 +119,7 @@ def test_write_jsonl_gzip_blob(writer):
118119
blob_name = f"gz/{uuid.uuid4().hex}.jsonl.gz"
119120
writer.write(
120121
instance_id="inst-gz",
122+
container=TEST_CONTAINER,
121123
blob_name=blob_name,
122124
payload=payload,
123125
content_type=content_type_for(fmt),

tests/durabletask/extensions/history_export/test_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def __init__(self) -> None:
4646
self._lock = threading.Lock()
4747
self.blobs: dict[str, dict] = {}
4848

49-
def write(self, *, instance_id, blob_name, payload, content_type, content_encoding):
49+
def write(self, *, instance_id, container, blob_name, payload, content_type, content_encoding):
5050
with self._lock:
5151
self.blobs[blob_name] = {
5252
"instance_id": instance_id,

0 commit comments

Comments
 (0)