Skip to content

Commit 09f6761

Browse files
Surface the per-schedule audit history stream in the Python SDK
Adds Client.get_schedule_history(schedule_id, *, limit, after_sequence) for one-shot page fetches and Client.iter_schedule_history(...) for paged iteration that hides the cursor under an AsyncIterator. Both sit on top of the standalone server's GET /api/schedules/{schedule_id}/history endpoint and return dataclasses (ScheduleHistoryPage, ScheduleHistoryEvent) that mirror the HTTP contract: sequence, event_type, recorded_at, payload, workflow_instance_id, workflow_run_id, plus the has_more / next_cursor pagination envelope. ScheduleHandle gets .history(...) and .iter_history(...) convenience methods so callers that already hold a handle don't need to reach back into the Client. History stays available for deleted schedules, matching the other surfaces. Seven pytest cases cover: page parsing with namespace/has_more/cursor plumbing, forwarding of limit and after_sequence as query parameters, local ValueError rejection of invalid limit / negative cursor, 404 mapping to ScheduleNotFound, iter_schedule_history paging across two server responses with the cursor advanced between calls, and the ScheduleHandle .history / .iter_history delegates. Full test suite remains 644/644 green; mypy and ruff are clean.
1 parent 50e2443 commit 09f6761

4 files changed

Lines changed: 359 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
66

77
## [Unreleased]
88

9+
### Added
10+
- `Client.get_schedule_history(schedule_id, *, limit=None, after_sequence=None)`
11+
returns one `ScheduleHistoryPage` of the schedule's audit stream, and
12+
`Client.iter_schedule_history(...)` is an async iterator that walks every
13+
remaining `ScheduleHistoryEvent` with paging hidden. `ScheduleHandle`
14+
exposes the same surface as `handle.history(...)` and
15+
`handle.iter_history(...)`. History remains available for deleted
16+
schedules so post-mortem review still works after a schedule is
17+
removed.
18+
919
## [0.4.2] — 2026-04-24
1020

1121
### Added

src/durable_workflow/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
ScheduleBackfillResult,
2626
ScheduleDescription,
2727
ScheduleHandle,
28+
ScheduleHistoryEvent,
29+
ScheduleHistoryPage,
2830
ScheduleList,
2931
ScheduleSpec,
3032
ScheduleTriggerResult,
@@ -171,6 +173,8 @@
171173
"ScheduleBackfillResult",
172174
"ScheduleDescription",
173175
"ScheduleHandle",
176+
"ScheduleHistoryEvent",
177+
"ScheduleHistoryPage",
174178
"ScheduleList",
175179
"ScheduleNotFound",
176180
"ScheduleSpec",

src/durable_workflow/client.py

Lines changed: 159 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import asyncio
2222
import time
2323
import warnings
24-
from collections.abc import Callable
24+
from collections.abc import AsyncIterator, Callable
2525
from dataclasses import dataclass
2626
from importlib.metadata import PackageNotFoundError
2727
from importlib.metadata import version as _pkg_version
@@ -745,6 +745,42 @@ class ScheduleBackfillResult:
745745
results: list[dict[str, Any]] | None = None
746746

747747

748+
@dataclass
749+
class ScheduleHistoryEvent:
750+
"""One entry in a schedule's audit history stream.
751+
752+
Each event corresponds to a lifecycle transition recorded by the
753+
server (ScheduleCreated, SchedulePaused, ScheduleResumed,
754+
ScheduleUpdated, ScheduleTriggered, ScheduleTriggerSkipped, or
755+
ScheduleDeleted). The ``payload`` mirrors what the workflow engine
756+
recorded, including command-context attribution when the transition
757+
came from a mutating API call.
758+
"""
759+
760+
sequence: int
761+
event_type: str | None = None
762+
recorded_at: str | None = None
763+
workflow_instance_id: str | None = None
764+
workflow_run_id: str | None = None
765+
payload: dict[str, Any] | None = None
766+
id: str | None = None
767+
768+
769+
@dataclass
770+
class ScheduleHistoryPage:
771+
"""One page of a schedule's audit history stream.
772+
773+
``next_cursor`` is the ``after_sequence`` value to request the next
774+
page when ``has_more`` is ``True``; it is ``None`` on the final page.
775+
"""
776+
777+
schedule_id: str
778+
events: list[ScheduleHistoryEvent]
779+
has_more: bool = False
780+
next_cursor: int | None = None
781+
namespace: str | None = None
782+
783+
748784
@dataclass
749785
class BridgeAdapterOutcome:
750786
"""Machine-readable result returned by a bridge adapter event."""
@@ -934,6 +970,32 @@ async def backfill(
934970
self.schedule_id, start_time=start_time, end_time=end_time, overlap_policy=overlap_policy,
935971
)
936972

973+
async def history(
974+
self,
975+
*,
976+
limit: int | None = None,
977+
after_sequence: int | None = None,
978+
) -> ScheduleHistoryPage:
979+
"""Return one page of this schedule's audit history. See :meth:`Client.get_schedule_history`."""
980+
return await self._client.get_schedule_history(
981+
self.schedule_id,
982+
limit=limit,
983+
after_sequence=after_sequence,
984+
)
985+
986+
def iter_history(
987+
self,
988+
*,
989+
limit: int | None = None,
990+
after_sequence: int | None = None,
991+
) -> AsyncIterator[ScheduleHistoryEvent]:
992+
"""Iterate every audit event for this schedule. See :meth:`Client.iter_schedule_history`."""
993+
return self._client.iter_schedule_history(
994+
self.schedule_id,
995+
limit=limit,
996+
after_sequence=after_sequence,
997+
)
998+
937999

9381000
class Client:
9391001
"""Async HTTP client for Durable Workflow control-plane and worker APIs.
@@ -2409,6 +2471,102 @@ async def backfill_schedule(
24092471
results=data.get("results"),
24102472
)
24112473

2474+
async def get_schedule_history(
2475+
self,
2476+
schedule_id: str,
2477+
*,
2478+
limit: int | None = None,
2479+
after_sequence: int | None = None,
2480+
) -> ScheduleHistoryPage:
2481+
"""Return one page of the audit history stream for a schedule.
2482+
2483+
The page is ordered by ``sequence`` ascending. Use
2484+
``after_sequence=page.next_cursor`` to request the next page while
2485+
``page.has_more`` is ``True``, or call :meth:`iter_schedule_history`
2486+
to walk every remaining event with paging hidden.
2487+
2488+
History is available for deleted schedules: the audit stream
2489+
records ``ScheduleDeleted`` and survives the schedule's removal
2490+
exactly so operators can review what happened.
2491+
2492+
``limit`` is clamped by the server between 1 and 500 (default
2493+
100). ``after_sequence`` must be a non-negative integer; invalid
2494+
values raise :class:`~durable_workflow.errors.InvalidArgument`
2495+
through the shared 4xx mapping.
2496+
"""
2497+
if limit is not None and limit < 1:
2498+
raise ValueError("limit must be >= 1")
2499+
if after_sequence is not None and after_sequence < 0:
2500+
raise ValueError("after_sequence must be >= 0")
2501+
2502+
params: dict[str, str] = {}
2503+
if limit is not None:
2504+
params["limit"] = str(limit)
2505+
if after_sequence is not None:
2506+
params["after_sequence"] = str(after_sequence)
2507+
2508+
path = f"/schedules/{schedule_id}/history"
2509+
if params:
2510+
path = f"{path}?{urlencode(params)}"
2511+
2512+
data = await self._request("GET", path, context=schedule_id)
2513+
raw_events = data.get("events") or []
2514+
events = [
2515+
ScheduleHistoryEvent(
2516+
sequence=int(item.get("sequence", 0)),
2517+
event_type=item.get("event_type"),
2518+
recorded_at=item.get("recorded_at"),
2519+
workflow_instance_id=item.get("workflow_instance_id"),
2520+
workflow_run_id=item.get("workflow_run_id"),
2521+
payload=item.get("payload") if isinstance(item.get("payload"), dict) else None,
2522+
id=item.get("id"),
2523+
)
2524+
for item in raw_events
2525+
]
2526+
2527+
raw_cursor = data.get("next_cursor")
2528+
next_cursor: int | None
2529+
if raw_cursor is None:
2530+
next_cursor = None
2531+
else:
2532+
try:
2533+
next_cursor = int(raw_cursor)
2534+
except (TypeError, ValueError):
2535+
next_cursor = None
2536+
2537+
return ScheduleHistoryPage(
2538+
schedule_id=data.get("schedule_id", schedule_id),
2539+
events=events,
2540+
has_more=bool(data.get("has_more", False)),
2541+
next_cursor=next_cursor,
2542+
namespace=data.get("namespace"),
2543+
)
2544+
2545+
async def iter_schedule_history(
2546+
self,
2547+
schedule_id: str,
2548+
*,
2549+
limit: int | None = None,
2550+
after_sequence: int | None = None,
2551+
) -> AsyncIterator[ScheduleHistoryEvent]:
2552+
"""Yield every audit event for a schedule, paging under the hood.
2553+
2554+
Each element is a :class:`ScheduleHistoryEvent`. Paging stops once
2555+
the server reports ``has_more=False``.
2556+
"""
2557+
cursor = after_sequence
2558+
while True:
2559+
page = await self.get_schedule_history(
2560+
schedule_id,
2561+
limit=limit,
2562+
after_sequence=cursor,
2563+
)
2564+
for event in page.events:
2565+
yield event
2566+
if not page.has_more or page.next_cursor is None:
2567+
return
2568+
cursor = page.next_cursor
2569+
24122570
# ── Worker protocol ────────────────────────────────────────────────
24132571
async def register_worker(
24142572
self,

0 commit comments

Comments
 (0)