Skip to content

Add scheduled tasks feature and delayed entity signals#160

Open
andystaples wants to merge 1 commit into
microsoft:mainfrom
andystaples:andystaples/add-scheduled-tasks
Open

Add scheduled tasks feature and delayed entity signals#160
andystaples wants to merge 1 commit into
microsoft:mainfrom
andystaples:andystaples/add-scheduled-tasks

Conversation

@andystaples

Copy link
Copy Markdown
Contributor

Summary

Adds a scheduled tasks feature (durabletask.scheduled) to the Python SDK,
bringing it to parity with the Microsoft.DurableTask.ScheduledTasks package in
durabletask-dotnet. A schedule periodically starts a target orchestration based
on a configurable interval and time window. The feature is built entirely on top
of durable entities plus a small helper orchestrator — no new sidecar/protocol
requirements.

To make schedules work, this PR also adds delayed (scheduled) entity signals
across the entity, orchestration-context, and client signal_entity APIs, and
teaches the in-memory test backend to honor them.

What's new

durabletask.scheduled package

  • configure_scheduled_tasks(worker) — registers the schedule entity and
    operation orchestrator on a worker (parity with .NET's UseScheduledTasks).
  • ScheduledTaskClientcreate_schedule, get_schedule, get_schedule_client,
    list_schedules.
  • ScheduleClient — per-schedule create, describe, update, pause,
    resume, delete.
  • Public DTOs: ScheduleCreationOptions, ScheduleUpdateOptions,
    ScheduleDescription, ScheduleQuery, ScheduleStatus.
  • Errors: ScheduleNotFoundError, ScheduleInvalidTransitionError,
    ScheduleClientValidationError.

Schedules support interval (≥ 1s), start_at, end_at, and
start_immediately_if_late, with a state machine (UninitializedActive
Paused) and an execution-token mechanism that cancels stale timer signals,
mirroring the .NET design.

Delayed entity signals (prerequisite)

  • Added an optional signal_time parameter to:
    • EntityContext.signal_entity / DurableEntity.signal_entity
    • OrchestrationContext.signal_entity
    • client signal_entity (sync and async)
  • These map to the existing protobuf scheduledTime field. The public parameter
    name signal_time matches the .NET public API (SignalEntityOptions.SignalTime).
  • The in-memory backend now defers entity operations whose scheduledTime is in
    the future (via a background timer thread), on both the client-signal and
    orchestration-emitted-signal paths.

Tests

  • Unit: schedule state machine, transition rules, config validation/diff,
    and state/options serialization round-trips.
  • In-memory E2E: create/describe/run, pause/resume, update, list with
    prefix/status filters, delete; plus delayed-signal deferral via client and
    orchestrator.
  • Live DTS E2E (pytest.mark.dts): scheduled-task lifecycle and delayed
    signals, verified against the DTS emulator.

Docs & examples

  • New examples/scheduled_tasks.py.
  • CHANGELOG.md entries under Unreleased.

Notes

  • Placed in the core durabletask package (not a separate distribution as in
    .NET) for simpler installation and testing.
  • Field names are Pythonic snake_case; class names stay aligned with .NET.
  • The schedule entity name is lowercased to schedule, so list filtering uses
    the @schedule@<prefix> instance-ID prefix.

Add a recurring schedule feature (durabletask.scheduled) for parity with
durabletask-dotnet, built on durable entities and a helper orchestrator.
Enable it on a worker with configure_scheduled_tasks(worker) and manage
schedules from the client via ScheduledTaskClient / ScheduleClient
(create, describe, list, update, pause, resume, delete).

To support the schedule entity's self-rearming, add an optional signal_time
parameter to entity, orchestration-context, and client signal_entity methods,
and make the in-memory backend honor delayed (scheduled) entity signals.

Includes unit tests, in-memory E2E tests, and live DTS E2E tests for both the
schedule feature and delayed signals, plus an example and changelog entries.
Copilot AI review requested due to automatic review settings June 26, 2026 17:08

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a new scheduled tasks capability (durabletask.scheduled) to the core Durable Task Python SDK, implemented on top of durable entities plus a helper orchestrator. As a prerequisite, it also adds delayed (scheduled) entity signals end-to-end (entity APIs, orchestration context, client APIs, and the in-memory test backend) to support time-based schedule re-arming.

Changes:

  • Added the durabletask.scheduled package (schedule entity, client surface, models/DTOs, transition rules, registration helper, and error types).
  • Added a signal_time parameter across entity/orchestration/client signal_entity APIs and mapped it to the protobuf scheduledTime field.
  • Updated the in-memory backend to defer entity operations scheduled in the future; added unit/E2E coverage plus a runnable example and changelog entry.

Reviewed changes

Copilot reviewed 28 out of 28 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
tests/durabletask/scheduled/test_transitions.py Unit tests for schedule status transition rules.
tests/durabletask/scheduled/test_scheduled_e2e.py In-memory E2E coverage for schedule lifecycle and runs.
tests/durabletask/scheduled/test_schedule_entity.py Unit tests for schedule entity behavior/actions via executor harness.
tests/durabletask/scheduled/test_models.py Unit tests for scheduled-task models, validation, and serialization.
tests/durabletask/scheduled/init.py Test package init.
tests/durabletask/entities/test_delayed_signals_e2e.py In-memory E2E coverage for delayed entity signals (client + orchestrator).
tests/durabletask-azuremanaged/scheduled/test_dts_scheduled_e2e.py Live DTS E2E coverage for scheduled tasks.
tests/durabletask-azuremanaged/scheduled/init.py Azuremanaged test package init.
tests/durabletask-azuremanaged/entities/test_dts_delayed_signals_e2e.py Live DTS E2E coverage for delayed entity signals.
examples/scheduled_tasks.py End-to-end sample showing schedule creation and management.
durabletask/worker.py Extends orchestration-context signal_entity plumbing to accept signal_time.
durabletask/testing/in_memory_backend.py Implements delayed entity operation delivery in the in-memory backend.
durabletask/task.py Updates orchestration-context API contract/docs to include signal_time.
durabletask/scheduled/transitions.py Defines valid schedule state transitions keyed by operation.
durabletask/scheduled/schedule_status.py Introduces ScheduleStatus enum.
durabletask/scheduled/schedule_entity.py Implements the schedule entity state machine and re-arming via delayed signals.
durabletask/scheduled/registration.py Adds configure_scheduled_tasks(worker) registration helper.
durabletask/scheduled/orchestrator.py Adds helper orchestrator for “awaitable” schedule entity operations.
durabletask/scheduled/models.py Adds public DTOs/options plus internal config/state serialization.
durabletask/scheduled/exceptions.py Adds scheduled-task exception types.
durabletask/scheduled/client.py Adds ScheduledTaskClient and per-schedule ScheduleClient.
durabletask/scheduled/init.py Exposes the scheduled tasks public surface.
durabletask/internal/helpers.py Adds scheduled time support to entity-signal orchestrator actions.
durabletask/internal/client_helpers.py Adds scheduled time support to client SignalEntityRequest.
durabletask/entities/entity_context.py Adds signal_time to entity-side signal_entity API and action emission.
durabletask/entities/durable_entity.py Adds signal_time to DurableEntity.signal_entity facade.
durabletask/client.py Adds signal_time to sync + async client signal_entity APIs.
CHANGELOG.md Documents the new scheduled tasks feature and delayed entity signals.

Comment on lines +1805 to +1808
with self._lock:
if self._shutdown_event.is_set():
return
self._queue_entity_operation(entity_id, event)
input: Any | None = None,
signal_time: datetime | None = None) -> pb.SignalEntityRequest:
"""Build a SignalEntityRequest for signaling an entity."""
scheduled_time = helpers.new_timestamp(signal_time) if signal_time is not None else None
Comment on lines +107 to +110
scheduled_time: timestamp_pb2.Timestamp | None = None
if signal_time is not None:
scheduled_time = timestamp_pb2.Timestamp()
scheduled_time.FromDatetime(signal_time)
Comment on lines +260 to 264
scheduled_timestamp: timestamp_pb2.Timestamp | None = None
if scheduled_time is not None:
scheduled_timestamp = timestamp_pb2.Timestamp()
scheduled_timestamp.FromDatetime(scheduled_time)
return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationSignaled=pb.EntityOperationSignaledEvent(
Comment on lines +128 to +145
def list_schedules(self, filter: ScheduleQuery | None = None) -> list[ScheduleDescription]:
"""List schedules matching the given filter criteria."""
prefix = filter.schedule_id_prefix if filter and filter.schedule_id_prefix else ""
page_size = filter.page_size if filter and filter.page_size else ScheduleQuery.DEFAULT_PAGE_SIZE
query = EntityQuery(
instance_id_starts_with=f"@{ENTITY_NAME}@{prefix}",
include_state=True,
page_size=page_size,
)
results: list[ScheduleDescription] = []
for metadata in self._client.get_all_entities(query):
state = _parse_state(metadata.get_state())
if state is None or state.schedule_configuration is None:
continue
if not self._matches_filter(state, filter):
continue
results.append(state.to_description())
return results
Comment on lines +148 to +158
def _matches_filter(state: ScheduleState, filter: ScheduleQuery | None) -> bool:
if filter is None:
return True
if filter.status is not None and state.status != filter.status:
return False
created_at = state.schedule_created_at
if filter.created_from is not None and not (created_at and created_at > filter.created_from):
return False
if filter.created_to is not None and not (created_at and created_at < filter.created_to):
return False
return True
Comment on lines +5 to +6
from durabletask.scheduled.orchestrator import \
execute_schedule_operation_orchestrator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants