Skip to content

Commit 3f9b447

Browse files
committed
History export extension - V1
1 parent ccbff41 commit 3f9b447

30 files changed

Lines changed: 4959 additions & 0 deletions

CHANGELOG.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,31 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
99

1010
ADDED
1111

12+
- Added `durabletask.extensions.history_export` for exporting the event history of
13+
terminal orchestrations to an external destination. Includes
14+
`ExportHistoryClient`, a per-job `ExportHistoryJobClient` returned by
15+
`get_job_client(...)`, and `list_jobs(...)` for enumerating jobs by status
16+
or last-modified window. Ships with a bundled `AzureBlobHistoryExportWriter`
17+
(installed with `pip install durabletask[history-export-azure]`) and a
18+
`HistoryWriter` protocol for plugging in custom destinations. Supports both
19+
`ExportMode.BATCH` (export a window and complete) and `ExportMode.CONTINUOUS`
20+
(tail terminal instances indefinitely until stopped via `delete_job`).
21+
Exported blobs are self-describing: each blob carries an explicit
22+
`schema_version`, the orchestration's `OrchestrationState` metadata, and
23+
the full ordered event list. The export workflow retries each instance up
24+
to 3 times with exponential backoff (15s/30s/60s), retries failed batches
25+
up to 3 times, caps in-flight exports via `max_parallel_exports`
26+
(default 32), continues-as-new every 5 page cycles to bound orchestrator
27+
history, and re-fetches entity state at the top of every page loop so
28+
external delete or mark-failed signals stop the orchestrator cleanly.
29+
Job state lives in a durable entity with an explicit state-transition
30+
matrix (PENDING / ACTIVE / COMPLETED / FAILED); invalid transitions raise
31+
`ExportJobInvalidTransitionError`. Persisted entity state uses a
32+
versioned, schema-stable JSON shape (`STATE_SCHEMA_VERSION`) with no
33+
embedded Python type metadata. Each export job's driving orchestrator
34+
uses a deterministic instance ID (`export-job-{job_id}`, exposed via
35+
`orchestrator_instance_id_for(...)`) so callers can correlate a job ID
36+
with its orchestrator for logging, monitoring, and restart.
1237
- Added `ReplaySafeLogger` and `OrchestrationContext.create_replay_safe_logger()`
1338
for suppressing duplicate log messages during orchestrator replay
1439
- Added `GrpcChannelOptions` and `GrpcRetryPolicyOptions` for configuring

docs/features.md

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,162 @@ class MyPayloadStore(PayloadStore):
412412

413413
See the [large payload example](../examples/large_payload/) for a complete working sample.
414414

415+
### Orchestration history export
416+
417+
The optional `durabletask.extensions.history_export` package provides a workflow for exporting the
418+
full event history of terminal orchestrations to an external destination (for example Azure Blob
419+
Storage). It is modeled after the .NET SDK's `ExportHistory` package.
420+
421+
An export job scans a time window of terminal instances, fetches each instance's history through
422+
the standard client API, serializes it, and writes it through a pluggable `HistoryWriter`. Job
423+
state is owned by a durable entity so progress survives worker restarts.
424+
425+
#### Installation
426+
427+
The core extension has no extra dependencies beyond the SDK. The bundled Azure Blob writer requires
428+
an optional dependency:
429+
430+
```bash
431+
pip install durabletask[history-export-azure]
432+
```
433+
434+
#### Configuring an export job
435+
436+
```python
437+
from datetime import datetime, timedelta, timezone
438+
439+
from durabletask import client, worker
440+
from durabletask.extensions.history_export import (
441+
ExportDestination,
442+
ExportFormat,
443+
ExportFormatKind,
444+
ExportHistoryClient,
445+
ExportJobCreationOptions,
446+
ExportMode,
447+
)
448+
from durabletask.extensions.history_export.azure_blob import (
449+
AzureBlobHistoryExportWriter,
450+
AzureBlobHistoryExportWriterOptions,
451+
)
452+
453+
writer = AzureBlobHistoryExportWriter(
454+
AzureBlobHistoryExportWriterOptions(
455+
container_name="orchestration-history",
456+
connection_string="DefaultEndpointsProtocol=https;...",
457+
)
458+
)
459+
dt_client = client.TaskHubGrpcClient(host_address="localhost:4001")
460+
export_client = ExportHistoryClient(dt_client, writer)
461+
462+
with worker.TaskHubGrpcWorker(host_address="localhost:4001") as w:
463+
export_client.register_worker(w)
464+
w.start()
465+
466+
now = datetime.now(timezone.utc)
467+
desc = export_client.create_job(ExportJobCreationOptions(
468+
mode=ExportMode.BATCH,
469+
completed_time_from=now - timedelta(days=1),
470+
completed_time_to=now,
471+
destination=ExportDestination(container="orchestration-history", prefix="2026-05"),
472+
format=ExportFormat(kind=ExportFormatKind.JSONL_GZIP),
473+
max_instances_per_batch=100,
474+
))
475+
final = export_client.wait_for_job(desc.job_id, timeout=600)
476+
print(final.status, final.exported_instances, final.failed_instances)
477+
```
478+
479+
#### Output formats
480+
481+
| `ExportFormatKind` | Per-instance blob extension | Content-Type | Content-Encoding |
482+
|---|---|---|---|
483+
| `JSON` | `.json` | `application/json` | (none) |
484+
| `JSONL_GZIP` | `.jsonl.gz` | `application/x-ndjson` | `gzip` |
485+
486+
The JSONL format prepends a metadata line and writes one event per line, which streams well for
487+
large histories.
488+
489+
#### Modes
490+
491+
Two `ExportMode` values are supported:
492+
493+
- `BATCH` exports a fixed time window (`completed_time_from` .. `completed_time_to`) and then
494+
marks the job `Completed`. This is the default and is appropriate for one-off backfills.
495+
- `CONTINUOUS` tails terminal instances indefinitely, sleeping between empty pages. The job
496+
has no natural completion; stop it by calling `export_client.delete_job(job_id)` (or signalling
497+
`mark_failed`). The orchestrator re-reads entity state at the top of each page loop, so the
498+
next iteration after the delete observes the missing entity and exits cleanly.
499+
500+
#### Listing and managing jobs
501+
502+
Use `list_jobs(ExportJobQuery(...))` to enumerate existing jobs, optionally filtered by status
503+
or last-modified window:
504+
505+
```python
506+
from durabletask.extensions.history_export import ExportJobQuery, ExportJobStatus
507+
508+
for desc in export_client.list_jobs(
509+
ExportJobQuery(status=[ExportJobStatus.FAILED])
510+
):
511+
print(desc.job_id, desc.last_error)
512+
```
513+
514+
Use `get_job_client(job_id)` for a per-job convenience wrapper that exposes `describe()`,
515+
`wait(timeout=...)`, and `delete()` directly:
516+
517+
```python
518+
job_client = export_client.get_job_client(desc.job_id)
519+
final = job_client.wait(timeout=600)
520+
print(final.status.value, final.exported_instances)
521+
job_client.delete()
522+
```
523+
524+
#### Custom destinations
525+
526+
The Azure Blob writer is one implementation of the
527+
`HistoryWriter` extension point. Implement the protocol (no
528+
inheritance required — it's a `typing.Protocol`) to send exports to
529+
any destination (S3, GCS, SFTP, local filesystem, a database, etc.):
530+
531+
```python
532+
from typing import Optional
533+
534+
from durabletask.extensions.history_export import HistoryWriter
535+
536+
537+
class LocalFileSystemHistoryWriter:
538+
def __init__(self, root_dir: str) -> None:
539+
self._root = root_dir
540+
541+
def write(
542+
self,
543+
*,
544+
instance_id: str,
545+
blob_name: str,
546+
payload: bytes,
547+
content_type: str,
548+
content_encoding: Optional[str],
549+
) -> None:
550+
import os
551+
path = os.path.join(self._root, blob_name)
552+
os.makedirs(os.path.dirname(path), exist_ok=True)
553+
with open(path, "wb") as fp:
554+
fp.write(payload)
555+
556+
557+
export_client = ExportHistoryClient(
558+
dt_client, LocalFileSystemHistoryWriter("/var/exports")
559+
)
560+
```
561+
562+
> [!TIP]
563+
> The bundled `AzureBlobHistoryExportWriter` lives in the optional
564+
> `durabletask.extensions.history_export.azure_blob` submodule and
565+
> requires `pip install durabletask[history-export-azure]`. The
566+
> core history-export package has no third-party runtime
567+
> dependencies — only the bundled destination does. Future
568+
> first-party destinations (S3, GCS, etc.) will be packaged as
569+
> additional optional extras using the same pattern.
570+
415571
### Logging configuration
416572

417573
Both the TaskHubGrpcWorker and TaskHubGrpcClient (as well as DurableTaskSchedulerWorker and

0 commit comments

Comments
 (0)