Commit 5b048fe

Add durabletask.extensions.history_export (#147)
* History export extension - V1
* Strict typing, type correctness
* PR feedback
* Use relative import for _test_helpers (CI fix)

  The absolute 'from tests.durabletask...' import worked locally because pip install -e .[dev] adds the repo root to sys.path, but CI runs pytest without that and pytest's rootdir doesn't include a 'tests' top-level package. Switch to a relative import inside the history_export test package (which already has __init__.py).
* PR Feedback 2
* Schema back to 1.0, drop PENDING state
* PR Feedback 3
* Hashed blob names
* Address latest PR review comments from berndverst
1 parent 5189878 commit 5b048fe

30 files changed

Lines changed: 5563 additions & 0 deletions

CHANGELOG.md

Lines changed: 48 additions & 0 deletions
@@ -5,6 +5,54 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project
 adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

+## Unreleased
+
+ADDED
+
+- Added `durabletask.extensions.history_export` for exporting the event history of
+  terminal orchestrations to an external destination. Includes
+  `ExportHistoryClient`, a per-job `ExportHistoryJobClient` returned by
+  `get_job_client(...)`, and `list_jobs(...)` for enumerating jobs by status
+  or last-modified window. Ships with a bundled `AzureBlobHistoryExportWriter`
+  (installed with `pip install durabletask[history-export-azure]`) and a
+  `HistoryWriter` protocol for plugging in custom destinations. Supports both
+  `ExportMode.BATCH` (export a window and complete) and `ExportMode.CONTINUOUS`
+  (tail terminal instances indefinitely until stopped via `delete_job`).
+  Exported blobs are self-describing: each blob carries an explicit
+  `schema_version`, the orchestration's `OrchestrationState` metadata, and
+  the full ordered event list. Blob names are a lowercase-hex SHA-256 of
+  ``{last_updated_at}|{instance_id}`` with the format extension appended
+  (matches the .NET `ExportInstanceHistoryActivity` naming scheme), so
+  re-exporting an instance after a later terminal update lands at a new
+  blob path rather than overwriting the previous one, and instance IDs
+  that differ only by `/` no longer collide. Each exported blob also
+  carries `{"instance_id": <id>}` as destination-side metadata (the Azure
+  writer persists this as Azure Blob metadata) so consumers can scan a
+  container without parsing each blob body. The export workflow retries each instance up
+  to 3 times with exponential backoff (15s/30s/60s), retries failed batches
+  up to 3 times, caps in-flight exports via `max_parallel_exports`
+  (default 32), continues-as-new every 5 page cycles to bound orchestrator
+  history while preserving cumulative totals across continue-as-new segments,
+  and re-fetches entity state at the top of every page loop so
+  external delete or mark-failed signals stop the orchestrator cleanly.
+  Empty-page BATCH checkpoints no longer reset the persisted resume cursor,
+  and duplicate `mark_failed` signals are now idempotent no-ops when a job
+  is already failed to reduce transition-noise logs.
+  `delete_job` actively tears the job down: it clears the entity state,
+  terminates the driving orchestrator, waits briefly for it to settle, and
+  purges its orchestration history so a re-created job with the same ID
+  starts from a clean slate. Per-instance exports refuse to write a blob
+  when the target instance has been purged or has re-entered a non-terminal
+  state, surfacing the skipped instance as a per-batch failure.
+  Job state lives in a durable entity with an explicit state-transition
+  matrix (ACTIVE / COMPLETED / FAILED); invalid transitions raise
+  `ExportJobInvalidTransitionError`. Persisted entity state uses a
+  versioned, schema-stable JSON shape (`STATE_SCHEMA_VERSION`) with no
+  embedded Python type metadata. Each export job's driving orchestrator
+  uses a deterministic instance ID (`export-job-{job_id}`, exposed via
+  `orchestrator_instance_id_for(...)`) so callers can correlate a job ID
+  with its orchestrator for logging, monitoring, and restart.
+
 ## v1.5.0

 BREAKING CHANGES (type-level only — no runtime impact for typical users)
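
Reviewer note on the blob-naming scheme in the CHANGELOG entry above: the entry describes names as a lowercase-hex SHA-256 of `{last_updated_at}|{instance_id}` with the format extension appended. The following is a minimal sketch of that idea, not the extension's actual code. The helper name `blob_name_for` is hypothetical, and rendering the timestamp as a UTC ISO-8601 string before hashing is an assumption; the commit text does not say how the timestamp is formatted.

```python
import hashlib
from datetime import datetime, timezone


def blob_name_for(last_updated_at: datetime, instance_id: str, extension: str) -> str:
    """Hypothetical helper: hash '{last_updated_at}|{instance_id}', append the extension."""
    # Assumption: the timestamp is serialized as UTC ISO-8601 before hashing.
    stamp = last_updated_at.astimezone(timezone.utc).isoformat()
    # hexdigest() returns lowercase hexadecimal characters.
    digest = hashlib.sha256(f"{stamp}|{instance_id}".encode("utf-8")).hexdigest()
    return f"{digest}{extension}"


first = datetime(2026, 5, 1, 12, 0, tzinfo=timezone.utc)
later = datetime(2026, 5, 2, 12, 0, tzinfo=timezone.utc)

name_a = blob_name_for(first, "order/123", ".json")
name_b = blob_name_for(later, "order/123", ".json")    # same instance, later update
name_c = blob_name_for(first, "order123", ".json")     # ID differs only by "/"
```

Under these assumptions, a later terminal update yields a different name (`name_a != name_b`), and IDs that differ only by `/` hash to different names (`name_a != name_c`), which is the property the entry calls out.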

docs/features.md

Lines changed: 159 additions & 0 deletions
@@ -412,6 +412,165 @@ class MyPayloadStore(PayloadStore):

 See the [large payload example](../examples/large_payload/) for a complete working sample.

+### Orchestration history export
+
+The optional `durabletask.extensions.history_export` package provides a workflow for exporting the
+full event history of terminal orchestrations to an external destination (for example Azure Blob
+Storage). It is modeled after the .NET SDK's `ExportHistory` package.
+
+An export job scans a time window of terminal instances, fetches each instance's history through
+the standard client API, serializes it, and writes it through a pluggable `HistoryWriter`. Job
+state is owned by a durable entity so progress survives worker restarts.
+
+#### Installation
+
+The core extension has no extra dependencies beyond the SDK. The bundled Azure Blob writer requires
+an optional dependency:
+
+```bash
+pip install durabletask[history-export-azure]
+```
+
+#### Configuring an export job
+
+```python
+from datetime import datetime, timedelta, timezone
+
+from durabletask import client, worker
+from durabletask.extensions.history_export import (
+    ExportDestination,
+    ExportFormat,
+    ExportFormatKind,
+    ExportHistoryClient,
+    ExportJobCreationOptions,
+    ExportMode,
+)
+from durabletask.extensions.history_export.azure_blob import (
+    AzureBlobHistoryExportWriter,
+    AzureBlobHistoryExportWriterOptions,
+)
+
+writer = AzureBlobHistoryExportWriter(
+    AzureBlobHistoryExportWriterOptions(
+        container_name="orchestration-history",
+        connection_string="DefaultEndpointsProtocol=https;...",
+    )
+)
+dt_client = client.TaskHubGrpcClient(host_address="localhost:4001")
+export_client = ExportHistoryClient(dt_client, writer)
+
+with worker.TaskHubGrpcWorker(host_address="localhost:4001") as w:
+    export_client.register_worker(w)
+    w.start()
+
+    now = datetime.now(timezone.utc)
+    desc = export_client.create_job(ExportJobCreationOptions(
+        mode=ExportMode.BATCH,
+        completed_time_from=now - timedelta(days=1),
+        completed_time_to=now,
+        destination=ExportDestination(container="orchestration-history", prefix="2026-05"),
+        format=ExportFormat(kind=ExportFormatKind.JSONL_GZIP),
+        max_instances_per_batch=100,
+    ))
+    final = export_client.wait_for_job(desc.job_id, timeout=600)
+    print(final.status, final.exported_instances, final.failed_instances)
+```
+
+#### Output formats
+
+| `ExportFormatKind` | Per-instance blob extension | Content-Type | Content-Encoding |
+|---|---|---|---|
+| `JSON` | `.json` | `application/json` | (none) |
+| `JSONL_GZIP` | `.jsonl.gz` | `application/x-ndjson` | `gzip` |
+
+The JSONL format prepends a metadata line and writes one event per line, which streams well for
+large histories.
+
+#### Modes
+
+Two `ExportMode` values are supported:
+
+- `BATCH` exports a fixed time window (`completed_time_from` .. `completed_time_to`) and then
+  marks the job `Completed`. This is the default and is appropriate for one-off backfills.
+- `CONTINUOUS` tails terminal instances indefinitely, sleeping between empty pages. The job
+  has no natural completion; stop it by calling `export_client.delete_job(job_id)` (or signalling
+  `mark_failed`). The orchestrator re-reads entity state at the top of each page loop, so the
+  next iteration after the delete observes the missing entity and exits cleanly.
+
+#### Listing and managing jobs
+
+Use `list_jobs(ExportJobQuery(...))` to enumerate existing jobs, optionally filtered by status
+or last-modified window:
+
+```python
+from durabletask.extensions.history_export import ExportJobQuery, ExportJobStatus
+
+for desc in export_client.list_jobs(
+    ExportJobQuery(status=[ExportJobStatus.FAILED])
+):
+    print(desc.job_id, desc.last_error)
+```
+
+Use `get_job_client(job_id)` for a per-job convenience wrapper that exposes `describe()`,
+`wait(timeout=...)`, and `delete()` directly:
+
+```python
+job_client = export_client.get_job_client(desc.job_id)
+final = job_client.wait(timeout=600)
+print(final.status.value, final.exported_instances)
+job_client.delete()
+```
+
+#### Custom destinations
+
+The Azure Blob writer is one implementation of the
+`HistoryWriter` extension point. Implement the protocol (no
+inheritance required — it's a `typing.Protocol`) to send exports to
+any destination (S3, GCS, SFTP, local filesystem, a database, etc.):
+
+```python
+from durabletask.extensions.history_export import HistoryWriter
+
+
+class LocalFileSystemHistoryWriter:
+    def __init__(self, root_dir: str) -> None:
+        self._root = root_dir
+
+    def write(
+        self,
+        *,
+        instance_id: str,
+        container: str,
+        blob_name: str,
+        payload: bytes,
+        content_type: str,
+        content_encoding: str | None,
+    ) -> None:
+        import os
+        # ``container`` is the destination's logical container name
+        # (an ExportDestination.container). Per-job routing writers
+        # combine it with ``blob_name``; writers that pin to a fixed
+        # location at construction time may ignore it.
+        path = os.path.join(self._root, container, blob_name)
+        os.makedirs(os.path.dirname(path), exist_ok=True)
+        with open(path, "wb") as fp:
+            fp.write(payload)
+
+
+export_client = ExportHistoryClient(
+    dt_client, LocalFileSystemHistoryWriter("/var/exports")
+)
+```
+
+> [!TIP]
+> The bundled `AzureBlobHistoryExportWriter` lives in the optional
+> `durabletask.extensions.history_export.azure_blob` submodule and
+> requires `pip install durabletask[history-export-azure]`. The
+> core history-export package has no third-party runtime
+> dependencies — only the bundled destination does. Future
+> first-party destinations (S3, GCS, etc.) will be packaged as
+> additional optional extras using the same pattern.
+
 ### Logging configuration

 Both the TaskHubGrpcWorker and TaskHubGrpcClient (as well as DurableTaskSchedulerWorker and
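
Reviewer note on the "Output formats" section added in this hunk: the docs say the `JSONL_GZIP` format prepends a metadata line and writes one event per line, gzip-compressed. The sketch below illustrates that layout using only the standard library. The function names and the dictionary keys (`schema_version`, `instance_id`, `event_id`, `type`) are illustrative assumptions; the real serializer and its field names are not shown on this page.

```python
from __future__ import annotations

import gzip
import json


def encode_jsonl_gzip(metadata: dict, events: list[dict]) -> bytes:
    """Hypothetical encoder: metadata on line 1, then one JSON event per line."""
    lines = [json.dumps(metadata)] + [json.dumps(event) for event in events]
    return gzip.compress(("\n".join(lines) + "\n").encode("utf-8"))


def decode_jsonl_gzip(payload: bytes) -> tuple[dict, list[dict]]:
    """Inverse of the sketch above: split the first line from the event lines."""
    text = gzip.decompress(payload).decode("utf-8")
    records = [json.loads(line) for line in text.splitlines() if line]
    return records[0], records[1:]


# Illustrative payload; the keys here are assumptions, not the real schema.
meta = {"schema_version": "1.0", "instance_id": "order-123"}
history = [
    {"event_id": 0, "type": "ExecutionStarted"},
    {"event_id": 1, "type": "ExecutionCompleted"},
]

blob = encode_jsonl_gzip(meta, history)
decoded_meta, decoded_events = decode_jsonl_gzip(blob)
```

Because each event sits on its own line, a consumer could decompress and parse the blob incrementally instead of loading one large JSON document, which is presumably what "streams well for large histories" refers to.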
Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+"""Orchestration history export for Durable Task.
+
+This optional extension package provides a workflow for exporting
+orchestration history from terminal instances to a configured
+destination, modeled after the durabletask-dotnet ``ExportHistory``
+package.
+
+The core building blocks (models, durable entity, activities,
+orchestrator, and the public client) live in this package and have
+no required runtime dependencies beyond the core SDK. Specific
+destinations (for example Azure Blob Storage) may require optional
+dependencies; see the destination module documentation for details.
+"""
+
+from durabletask.extensions.history_export._constants import (
+    ENTITY_NAME,
+    ORCHESTRATOR_NAME,
+    orchestrator_instance_id_for,
+)
+from durabletask.extensions.history_export.activities import (
+    HistoryExportContext,
+)
+from durabletask.extensions.history_export.client import (
+    ExportHistoryClient,
+    ExportHistoryJobClient,
+)
+from durabletask.extensions.history_export.entity import ExportJobEntity
+from durabletask.extensions.history_export.exceptions import (
+    ExportJobError,
+    ExportJobInvalidTransitionError,
+    ExportJobNotFoundError,
+)
+from durabletask.extensions.history_export.models import (
+    STATE_SCHEMA_VERSION,
+    ExportCheckpoint,
+    ExportDestination,
+    ExportFailure,
+    ExportFilter,
+    ExportFormat,
+    ExportFormatKind,
+    ExportJobConfiguration,
+    ExportJobCreationOptions,
+    ExportJobDescription,
+    ExportJobQuery,
+    ExportJobState,
+    ExportJobStatus,
+    ExportMode,
+)
+from durabletask.extensions.history_export.writer import HistoryWriter
+
+__all__ = [
+    "ENTITY_NAME",
+    "ORCHESTRATOR_NAME",
+    "STATE_SCHEMA_VERSION",
+    "ExportCheckpoint",
+    "ExportDestination",
+    "ExportFailure",
+    "ExportFilter",
+    "ExportFormat",
+    "ExportFormatKind",
+    "ExportHistoryClient",
+    "ExportHistoryJobClient",
+    "ExportJobConfiguration",
+    "ExportJobCreationOptions",
+    "ExportJobDescription",
+    "ExportJobEntity",
+    "ExportJobError",
+    "ExportJobInvalidTransitionError",
+    "ExportJobNotFoundError",
+    "ExportJobQuery",
+    "ExportJobState",
+    "ExportJobStatus",
+    "ExportMode",
+    "HistoryExportContext",
+    "HistoryWriter",
+    "orchestrator_instance_id_for",
+]
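
Reviewer note on the `HistoryWriter` export above: the docs in this commit describe it as a `typing.Protocol`, so any class with a matching `write(...)` method qualifies without inheriting from it. The sketch below demonstrates that structural-typing idea in isolation. `HistoryWriterLike` is a local stand-in that mirrors the keyword-only signature shown in `docs/features.md`; it is not the real protocol, and whether the real one is decorated with `runtime_checkable` is not shown in this diff. `InMemoryHistoryWriter` is likewise hypothetical, intended as the kind of test double a custom destination might start from.

```python
from __future__ import annotations

from typing import Protocol, runtime_checkable


@runtime_checkable
class HistoryWriterLike(Protocol):
    """Local stand-in mirroring the write() signature shown in docs/features.md."""

    def write(
        self,
        *,
        instance_id: str,
        container: str,
        blob_name: str,
        payload: bytes,
        content_type: str,
        content_encoding: str | None,
    ) -> None: ...


class InMemoryHistoryWriter:
    """Hypothetical writer that keeps payloads in a dict; no base class needed."""

    def __init__(self) -> None:
        self.blobs: dict[tuple[str, str], bytes] = {}

    def write(
        self,
        *,
        instance_id: str,
        container: str,
        blob_name: str,
        payload: bytes,
        content_type: str,
        content_encoding: str | None,
    ) -> None:
        # Key by (container, blob_name), as a routing writer would.
        self.blobs[(container, blob_name)] = payload


writer = InMemoryHistoryWriter()
writer.write(
    instance_id="order-123",
    container="orchestration-history",
    blob_name="abc123.json",
    payload=b"{}",
    content_type="application/json",
    content_encoding=None,
)

# runtime_checkable only verifies that a write attribute exists, not its signature.
conforms = isinstance(writer, HistoryWriterLike)
```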
Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+"""Stable, cross-cutting constants for the history-export extension.
+
+These constants are imported by both the entity and the orchestrator
+modules. Keeping them in their own module avoids the circular import
+that would arise if either of those modules imported the other.
+"""
+
+from __future__ import annotations
+
+ENTITY_NAME = "ExportJobEntity"
+"""Logical name of the export-job durable entity."""
+
+ORCHESTRATOR_NAME = "export_job_orchestrator"
+"""Function-derived name of the export-job orchestrator."""
+
+ORCHESTRATOR_INSTANCE_ID_PREFIX = "export-job-"
+"""Prefix applied to deterministic orchestrator instance IDs."""
+
+
+def orchestrator_instance_id_for(job_id: str) -> str:
+    """Return the deterministic orchestrator instance ID for *job_id*.
+
+    All export-job orchestrators share a stable instance-ID pattern so
+    that public clients can reliably correlate a job ID with the
+    orchestrator driving it (for logs, monitoring, restart, etc.).
+    Matches the .NET ``ExportHistoryConstants.GetOrchestratorInstanceId``
+    pattern.
+    """
+    if not job_id:
+        raise ValueError("job_id must be a non-empty string")
+    return f"{ORCHESTRATOR_INSTANCE_ID_PREFIX}{job_id}"
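
Reviewer note on `orchestrator_instance_id_for` above: a short behavior check may help readers see what "deterministic" means here. To stay runnable without the package installed, the snippet restates the prefix and the function from this file; the job ID `nightly-backfill` is just an example value.

```python
# Restated from the module above so the snippet runs without durabletask installed.
ORCHESTRATOR_INSTANCE_ID_PREFIX = "export-job-"


def orchestrator_instance_id_for(job_id: str) -> str:
    if not job_id:
        raise ValueError("job_id must be a non-empty string")
    return f"{ORCHESTRATOR_INSTANCE_ID_PREFIX}{job_id}"


# The same job ID always maps to the same orchestrator instance ID.
instance_id = orchestrator_instance_id_for("nightly-backfill")
print(instance_id)  # export-job-nightly-backfill

# Empty job IDs are rejected rather than producing the bare prefix.
try:
    orchestrator_instance_id_for("")
    rejected = False
except ValueError:
    rejected = True
```

With the real package installed, the equivalent import would be `from durabletask.extensions.history_export import orchestrator_instance_id_for`, as listed in the package's `__all__`.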
Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+"""Package-internal helpers shared across the history-export modules.
+
+Names here have no leading underscore so they can be imported by sibling
+modules without tripping pyright's ``reportPrivateUsage`` check. They
+remain package-private by convention: nothing in this module is exported
+from :mod:`durabletask.extensions.history_export.__init__`.
+"""
+
+from __future__ import annotations
+
+from datetime import datetime, timezone
+
+
+def dt_to_iso(value: datetime | None) -> str | None:
+    """Normalize *value* to a UTC ISO-8601 string (or ``None``)."""
+    if value is None:
+        return None
+    if value.tzinfo is None:
+        value = value.replace(tzinfo=timezone.utc)
+    else:
+        value = value.astimezone(timezone.utc)
+    return value.isoformat()
+
+
+def dt_from_iso(value: str | None) -> datetime | None:
+    """Parse *value* as an ISO-8601 timestamp, defaulting naive values to UTC."""
+    if value is None:
+        return None
+    parsed = datetime.fromisoformat(value)
+    if parsed.tzinfo is None:
+        parsed = parsed.replace(tzinfo=timezone.utc)
+    return parsed
+
+
+__all__ = ["dt_from_iso", "dt_to_iso"]
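
Reviewer note on the two branches in `dt_to_iso` above: naive values are labeled as UTC without moving the clock time, while aware values are converted, which does move it. The standard-library snippet below shows the difference using the same `replace` and `astimezone` calls; the sample timestamps are arbitrary.

```python
from datetime import datetime, timedelta, timezone

naive = datetime(2026, 5, 1, 12, 0, 0)
aware = datetime(2026, 5, 1, 14, 0, 0, tzinfo=timezone(timedelta(hours=2)))

# Naive branch: attach UTC, clock time stays at 12:00.
naive_utc = naive.replace(tzinfo=timezone.utc)

# Aware branch: convert to UTC, so 14:00+02:00 becomes 12:00+00:00.
aware_utc = aware.astimezone(timezone.utc)

print(naive_utc.isoformat())  # 2026-05-01T12:00:00+00:00
print(aware_utc.isoformat())  # 2026-05-01T12:00:00+00:00

# The ISO string round-trips through fromisoformat, as dt_from_iso relies on.
round_tripped = datetime.fromisoformat(naive_utc.isoformat())
```

One consequence worth noting for callers: a naive local-time value passed in is treated as if it were already UTC, so it is not shifted.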
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+"""Shared logger for the history-export extension.
+
+Submodules that emit log records should import :data:`logger` from
+this module rather than calling :func:`logging.getLogger` themselves.
+This keeps every emit attributed to the same logger name so that
+callers can configure / filter the extension's output in one place.
+"""
+
+from __future__ import annotations
+
+import logging
+
+logger = logging.getLogger("durabletask.extensions.history_export")
+"""Module-wide logger for the history-export extension."""
+
+__all__ = ["logger"]
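
Reviewer note on the shared logger above: because every record is emitted under the single name `durabletask.extensions.history_export`, an application can adjust the extension's output with the standard `logging` API alone. A minimal sketch follows; the DEBUG level and the format string are arbitrary choices for illustration, not recommendations from the commit.

```python
import logging

# Look up the extension's logger by the name defined in the module above.
export_logger = logging.getLogger("durabletask.extensions.history_export")

# Raise verbosity for this extension only; other loggers are unaffected.
export_logger.setLevel(logging.DEBUG)

# Attach a dedicated handler with an illustrative format.
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)
export_logger.addHandler(handler)
```

`logging.getLogger` returns the same object for the same name, so this configures the very logger instance the extension's submodules import.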
