Add scheduled tasks feature and delayed entity signals#160
Open
andystaples wants to merge 1 commit into
Open
Conversation
Add a recurring schedule feature (durabletask.scheduled) for parity with durabletask-dotnet, built on durable entities and a helper orchestrator. Enable it on a worker with configure_scheduled_tasks(worker) and manage schedules from the client via ScheduledTaskClient / ScheduleClient (create, describe, list, update, pause, resume, delete). To support the schedule entity's self-rearming, add an optional signal_time parameter to entity, orchestration-context, and client signal_entity methods, and make the in-memory backend honor delayed (scheduled) entity signals. Includes unit tests, in-memory E2E tests, and live DTS E2E tests for both the schedule feature and delayed signals, plus an example and changelog entries.
Contributor
There was a problem hiding this comment.
Pull request overview
This PR introduces a new scheduled tasks capability (durabletask.scheduled) to the core Durable Task Python SDK, implemented on top of durable entities plus a helper orchestrator. As a prerequisite, it also adds delayed (scheduled) entity signals end-to-end (entity APIs, orchestration context, client APIs, and the in-memory test backend) to support time-based schedule re-arming.
Changes:
- Added the
durabletask.scheduledpackage (schedule entity, client surface, models/DTOs, transition rules, registration helper, and error types). - Added a
signal_timeparameter across entity/orchestration/clientsignal_entityAPIs and mapped it to the protobufscheduledTimefield. - Updated the in-memory backend to defer entity operations scheduled in the future; added unit/E2E coverage plus a runnable example and changelog entry.
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/durabletask/scheduled/test_transitions.py | Unit tests for schedule status transition rules. |
| tests/durabletask/scheduled/test_scheduled_e2e.py | In-memory E2E coverage for schedule lifecycle and runs. |
| tests/durabletask/scheduled/test_schedule_entity.py | Unit tests for schedule entity behavior/actions via executor harness. |
| tests/durabletask/scheduled/test_models.py | Unit tests for scheduled-task models, validation, and serialization. |
| tests/durabletask/scheduled/init.py | Test package init. |
| tests/durabletask/entities/test_delayed_signals_e2e.py | In-memory E2E coverage for delayed entity signals (client + orchestrator). |
| tests/durabletask-azuremanaged/scheduled/test_dts_scheduled_e2e.py | Live DTS E2E coverage for scheduled tasks. |
| tests/durabletask-azuremanaged/scheduled/init.py | Azuremanaged test package init. |
| tests/durabletask-azuremanaged/entities/test_dts_delayed_signals_e2e.py | Live DTS E2E coverage for delayed entity signals. |
| examples/scheduled_tasks.py | End-to-end sample showing schedule creation and management. |
| durabletask/worker.py | Extends orchestration-context signal_entity plumbing to accept signal_time. |
| durabletask/testing/in_memory_backend.py | Implements delayed entity operation delivery in the in-memory backend. |
| durabletask/task.py | Updates orchestration-context API contract/docs to include signal_time. |
| durabletask/scheduled/transitions.py | Defines valid schedule state transitions keyed by operation. |
| durabletask/scheduled/schedule_status.py | Introduces ScheduleStatus enum. |
| durabletask/scheduled/schedule_entity.py | Implements the schedule entity state machine and re-arming via delayed signals. |
| durabletask/scheduled/registration.py | Adds configure_scheduled_tasks(worker) registration helper. |
| durabletask/scheduled/orchestrator.py | Adds helper orchestrator for “awaitable” schedule entity operations. |
| durabletask/scheduled/models.py | Adds public DTOs/options plus internal config/state serialization. |
| durabletask/scheduled/exceptions.py | Adds scheduled-task exception types. |
| durabletask/scheduled/client.py | Adds ScheduledTaskClient and per-schedule ScheduleClient. |
| durabletask/scheduled/init.py | Exposes the scheduled tasks public surface. |
| durabletask/internal/helpers.py | Adds scheduled time support to entity-signal orchestrator actions. |
| durabletask/internal/client_helpers.py | Adds scheduled time support to client SignalEntityRequest. |
| durabletask/entities/entity_context.py | Adds signal_time to entity-side signal_entity API and action emission. |
| durabletask/entities/durable_entity.py | Adds signal_time to DurableEntity.signal_entity facade. |
| durabletask/client.py | Adds signal_time to sync + async client signal_entity APIs. |
| CHANGELOG.md | Documents the new scheduled tasks feature and delayed entity signals. |
Comment on lines
+1805
to
+1808
| with self._lock: | ||
| if self._shutdown_event.is_set(): | ||
| return | ||
| self._queue_entity_operation(entity_id, event) |
| input: Any | None = None, | ||
| signal_time: datetime | None = None) -> pb.SignalEntityRequest: | ||
| """Build a SignalEntityRequest for signaling an entity.""" | ||
| scheduled_time = helpers.new_timestamp(signal_time) if signal_time is not None else None |
Comment on lines
+107
to
+110
| scheduled_time: timestamp_pb2.Timestamp | None = None | ||
| if signal_time is not None: | ||
| scheduled_time = timestamp_pb2.Timestamp() | ||
| scheduled_time.FromDatetime(signal_time) |
Comment on lines
+260
to
264
| scheduled_timestamp: timestamp_pb2.Timestamp | None = None | ||
| if scheduled_time is not None: | ||
| scheduled_timestamp = timestamp_pb2.Timestamp() | ||
| scheduled_timestamp.FromDatetime(scheduled_time) | ||
| return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationSignaled=pb.EntityOperationSignaledEvent( |
Comment on lines
+128
to
+145
| def list_schedules(self, filter: ScheduleQuery | None = None) -> list[ScheduleDescription]: | ||
| """List schedules matching the given filter criteria.""" | ||
| prefix = filter.schedule_id_prefix if filter and filter.schedule_id_prefix else "" | ||
| page_size = filter.page_size if filter and filter.page_size else ScheduleQuery.DEFAULT_PAGE_SIZE | ||
| query = EntityQuery( | ||
| instance_id_starts_with=f"@{ENTITY_NAME}@{prefix}", | ||
| include_state=True, | ||
| page_size=page_size, | ||
| ) | ||
| results: list[ScheduleDescription] = [] | ||
| for metadata in self._client.get_all_entities(query): | ||
| state = _parse_state(metadata.get_state()) | ||
| if state is None or state.schedule_configuration is None: | ||
| continue | ||
| if not self._matches_filter(state, filter): | ||
| continue | ||
| results.append(state.to_description()) | ||
| return results |
Comment on lines
+148
to
+158
| def _matches_filter(state: ScheduleState, filter: ScheduleQuery | None) -> bool: | ||
| if filter is None: | ||
| return True | ||
| if filter.status is not None and state.status != filter.status: | ||
| return False | ||
| created_at = state.schedule_created_at | ||
| if filter.created_from is not None and not (created_at and created_at > filter.created_from): | ||
| return False | ||
| if filter.created_to is not None and not (created_at and created_at < filter.created_to): | ||
| return False | ||
| return True |
Comment on lines
+5
to
+6
| from durabletask.scheduled.orchestrator import \ | ||
| execute_schedule_operation_orchestrator |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds a scheduled tasks feature (
durabletask.scheduled) to the Python SDK,bringing it to parity with the
Microsoft.DurableTask.ScheduledTaskspackage indurabletask-dotnet. A schedule periodically starts a target orchestration based
on a configurable interval and time window. The feature is built entirely on top
of durable entities plus a small helper orchestrator — no new sidecar/protocol
requirements.
To make schedules work, this PR also adds delayed (scheduled) entity signals
across the entity, orchestration-context, and client
signal_entityAPIs, andteaches the in-memory test backend to honor them.
What's new
durabletask.scheduledpackageconfigure_scheduled_tasks(worker)— registers the schedule entity andoperation orchestrator on a worker (parity with .NET's
UseScheduledTasks).ScheduledTaskClient—create_schedule,get_schedule,get_schedule_client,list_schedules.ScheduleClient— per-schedulecreate,describe,update,pause,resume,delete.ScheduleCreationOptions,ScheduleUpdateOptions,ScheduleDescription,ScheduleQuery,ScheduleStatus.ScheduleNotFoundError,ScheduleInvalidTransitionError,ScheduleClientValidationError.Schedules support
interval(≥ 1s),start_at,end_at, andstart_immediately_if_late, with a state machine (Uninitialized→Active⇄
Paused) and an execution-token mechanism that cancels stale timer signals,mirroring the .NET design.
Delayed entity signals (prerequisite)
signal_timeparameter to:EntityContext.signal_entity/DurableEntity.signal_entityOrchestrationContext.signal_entitysignal_entity(sync and async)scheduledTimefield. The public parametername
signal_timematches the .NET public API (SignalEntityOptions.SignalTime).scheduledTimeis inthe future (via a background timer thread), on both the client-signal and
orchestration-emitted-signal paths.
Tests
and state/options serialization round-trips.
prefix/status filters, delete; plus delayed-signal deferral via client and
orchestrator.
pytest.mark.dts): scheduled-task lifecycle and delayedsignals, verified against the DTS emulator.
Docs & examples
examples/scheduled_tasks.py.CHANGELOG.mdentries under Unreleased.Notes
durabletaskpackage (not a separate distribution as in.NET) for simpler installation and testing.
schedule, so list filtering usesthe
@schedule@<prefix>instance-ID prefix.