Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,20 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

N/A
ADDED

- Added `durabletask.scheduled`, a recurring schedule feature built on durable
entities. Use `configure_scheduled_tasks(worker)` to enable it on a worker,
then manage schedules from the client via `ScheduledTaskClient` (and the
per-schedule `ScheduleClient`). Supports creating, describing, listing,
updating, pausing, resuming, and deleting schedules with configurable
`interval`, `start_at`, `end_at`, and `start_immediately_if_late` options.
- Added an optional `signal_time` parameter to `EntityContext.signal_entity`
and `DurableEntity.signal_entity`, allowing an entity signal to be scheduled
for future delivery.
- Added an optional `signal_time` parameter to `OrchestrationContext.signal_entity`
and to the client `signal_entity` methods (sync and async), allowing entity
signals to be scheduled for future delivery from orchestrations and clients.

## v1.6.0

Expand Down
10 changes: 6 additions & 4 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,8 +719,9 @@ def purge_orchestrations_by(self,
def signal_entity(self,
entity_instance_id: EntityInstanceId,
operation_name: str,
input: Any | None = None) -> None:
req = build_signal_entity_req(entity_instance_id, operation_name, input)
input: Any | None = None,
signal_time: datetime | None = None) -> None:
req = build_signal_entity_req(entity_instance_id, operation_name, input, signal_time)
self._logger.info(f"Signaling entity '{entity_instance_id}' operation '{operation_name}'.")
if self._payload_store is not None:
payload_helpers.externalize_payloads(
Expand Down Expand Up @@ -1199,8 +1200,9 @@ async def purge_orchestrations_by(self,
async def signal_entity(self,
entity_instance_id: EntityInstanceId,
operation_name: str,
input: Any | None = None) -> None:
req = build_signal_entity_req(entity_instance_id, operation_name, input)
input: Any | None = None,
signal_time: datetime | None = None) -> None:
req = build_signal_entity_req(entity_instance_id, operation_name, input, signal_time)
self._logger.info(f"Signaling entity '{entity_instance_id}' operation '{operation_name}'.")
if self._payload_store is not None:
await payload_helpers.externalize_payloads_async(
Expand Down
11 changes: 9 additions & 2 deletions durabletask/entities/durable_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Licensed under the MIT License.

from typing import Any, TypeVar, overload
from datetime import datetime

from durabletask.entities.entity_context import EntityContext
from durabletask.entities.entity_instance_id import EntityInstanceId
Expand Down Expand Up @@ -52,7 +53,9 @@ def set_state(self, state: Any):
"""
self.entity_context.set_state(state)

def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str, input: Any | None = None) -> None:
def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str,
input: Any | None = None,
signal_time: datetime | None = None) -> None:
"""Signal another entity to perform an operation.

Parameters
Expand All @@ -63,8 +66,12 @@ def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str, in
The operation to perform on the entity.
input : Any, optional
The input to provide to the entity for the operation.
signal_time : datetime, optional
The time at which the signal should be delivered. If None, the signal is
delivered as soon as possible. Use this to schedule a future operation,
for example to have an entity wake itself up at a later time.
"""
self.entity_context.signal_entity(entity_instance_id, operation, input)
self.entity_context.signal_entity(entity_instance_id, operation, input, signal_time)

def schedule_new_orchestration(self, orchestration_name: str, input: Any | None = None, instance_id: str | None = None) -> str:
"""Schedule a new orchestration instance.
Expand Down
16 changes: 14 additions & 2 deletions durabletask/entities/entity_context.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from datetime import datetime
from typing import Any, TypeVar, overload
import uuid
from google.protobuf import timestamp_pb2
from durabletask.entities.entity_instance_id import EntityInstanceId
from durabletask.internal import helpers, shared
from durabletask.internal.entity_state_shim import StateShim
Expand Down Expand Up @@ -83,7 +85,9 @@ def set_state(self, new_state: Any) -> None:
"""
self._state.set_state(new_state)

def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str, input: Any | None = None) -> None:
def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str,
input: Any | None = None,
signal_time: datetime | None = None) -> None:
"""Signal another entity to perform an operation.

Parameters
Expand All @@ -94,15 +98,23 @@ def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str, in
The operation to perform on the entity.
input : Any, optional
The input to provide to the entity for the operation.
signal_time : datetime, optional
The time at which the signal should be delivered. If None, the signal is
delivered as soon as possible. Use this to schedule a future operation,
for example to have an entity wake itself up at a later time.
"""
encoded_input: str | None = shared.to_json(input) if input is not None else None
scheduled_time: timestamp_pb2.Timestamp | None = None
if signal_time is not None:
scheduled_time = timestamp_pb2.Timestamp()
scheduled_time.FromDatetime(signal_time)
Comment on lines +107 to +110
self._state.add_operation_action(
pb.OperationAction(
sendSignal=pb.SendSignalAction(
instanceId=str(entity_instance_id),
name=operation,
input=helpers.get_string_value(encoded_input),
scheduledTime=None,
scheduledTime=scheduled_time,
requestTime=None,
parentTraceContext=None,
)
Expand Down
6 changes: 4 additions & 2 deletions durabletask/internal/client_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,16 @@ def build_terminate_req(
def build_signal_entity_req(
entity_instance_id: EntityInstanceId,
operation_name: str,
input: Any | None = None) -> pb.SignalEntityRequest:
input: Any | None = None,
signal_time: datetime | None = None) -> pb.SignalEntityRequest:
"""Build a SignalEntityRequest for signaling an entity."""
scheduled_time = helpers.new_timestamp(signal_time) if signal_time is not None else None
return pb.SignalEntityRequest(
instanceId=str(entity_instance_id),
name=operation_name,
input=helpers.get_string_value(shared.to_json(input) if input is not None else None),
requestId=str(uuid.uuid4()),
scheduledTime=None,
scheduledTime=scheduled_time,
parentTraceContext=None,
requestTime=helpers.new_timestamp(datetime.now(timezone.utc))
)
9 changes: 7 additions & 2 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,16 @@ def new_signal_entity_action(id: int,
entity_id: EntityInstanceId,
operation: str,
encoded_input: str | None,
request_id: str) -> pb.OrchestratorAction:
request_id: str,
scheduled_time: datetime | None = None) -> pb.OrchestratorAction:
scheduled_timestamp: timestamp_pb2.Timestamp | None = None
if scheduled_time is not None:
scheduled_timestamp = timestamp_pb2.Timestamp()
scheduled_timestamp.FromDatetime(scheduled_time)
return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationSignaled=pb.EntityOperationSignaledEvent(
Comment on lines +260 to 264
requestId=request_id,
operation=operation,
scheduledTime=None,
scheduledTime=scheduled_timestamp,
input=get_string_value(encoded_input),
targetInstanceId=get_string_value(str(entity_id)),
)))
Expand Down
38 changes: 38 additions & 0 deletions durabletask/scheduled/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Scheduled tasks support for the Durable Task SDK.

This package provides a recurring schedule feature built on top of durable
entities and a helper orchestrator. Register the entity and orchestrator with a
worker via :func:`configure_scheduled_tasks`, then manage schedules from the
client via :class:`ScheduledTaskClient`.
"""

from durabletask.scheduled.client import ScheduleClient, ScheduledTaskClient
from durabletask.scheduled.exceptions import (ScheduleClientValidationError,
ScheduleError,
ScheduleInvalidTransitionError,
ScheduleNotFoundError)
from durabletask.scheduled.models import (ScheduleCreationOptions,
ScheduleDescription, ScheduleQuery,
ScheduleUpdateOptions)
from durabletask.scheduled.registration import configure_scheduled_tasks
from durabletask.scheduled.schedule_status import ScheduleStatus

__all__ = [
"ScheduledTaskClient",
"ScheduleClient",
"ScheduleCreationOptions",
"ScheduleUpdateOptions",
"ScheduleDescription",
"ScheduleQuery",
"ScheduleStatus",
"ScheduleError",
"ScheduleNotFoundError",
"ScheduleClientValidationError",
"ScheduleInvalidTransitionError",
"configure_scheduled_tasks",
]

PACKAGE_NAME = "durabletask.scheduled"
158 changes: 158 additions & 0 deletions durabletask/scheduled/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import logging
from dataclasses import asdict
from typing import Any

from durabletask.client import (EntityQuery, OrchestrationStatus,
TaskHubGrpcClient)
from durabletask.entities import EntityInstanceId
from durabletask.internal import shared
from durabletask.scheduled import transitions
from durabletask.scheduled.exceptions import ScheduleNotFoundError
from durabletask.scheduled.models import (ScheduleCreationOptions,
ScheduleDescription, ScheduleQuery,
ScheduleState, ScheduleUpdateOptions)
from durabletask.scheduled.orchestrator import (
ScheduleOperationRequest, execute_schedule_operation_orchestrator)
from durabletask.scheduled.schedule_entity import (DELETE_OPERATION,
ENTITY_NAME)

logger = logging.getLogger("durabletask.scheduled")


def _parse_state(serialized_state: Any) -> ScheduleState | None:
if serialized_state is None:
return None
data = serialized_state
if isinstance(data, str):
if not data.strip():
# A deleted (or never-initialized) entity reports empty state.
return None
data = shared.from_json(data)
if isinstance(data, dict):
return ScheduleState.from_dict(data)
return None


class ScheduleClient:
"""Client for managing a single schedule instance."""

def __init__(self, client: TaskHubGrpcClient, schedule_id: str,
*, operation_timeout: float = 60):
if not schedule_id:
raise ValueError("schedule_id cannot be empty.")
self._client = client
self._schedule_id = schedule_id
self._entity_id = EntityInstanceId(ENTITY_NAME, schedule_id)
self._operation_timeout = operation_timeout

@property
def schedule_id(self) -> str:
"""Gets the ID of this schedule."""
return self._schedule_id

def _run_operation(self, operation_name: str, input: Any | None = None) -> None:
request = ScheduleOperationRequest(
entity_id=str(self._entity_id),
operation_name=operation_name,
input=input,
)
instance_id = self._client.schedule_new_orchestration(
execute_schedule_operation_orchestrator, input=asdict(request))
state = self._client.wait_for_orchestration_completion(
instance_id, timeout=self._operation_timeout)
if state is None or state.runtime_status != OrchestrationStatus.COMPLETED:
failure = state.failure_details if state else None
message = failure.message if failure else "unknown error"
raise RuntimeError(
f"Failed to '{operation_name}' schedule '{self._schedule_id}': {message}")

def create(self, options: ScheduleCreationOptions) -> None:
"""Create or update this schedule with the given configuration."""
self._run_operation(transitions.CREATE_SCHEDULE, options.to_dict())

def update(self, options: ScheduleUpdateOptions) -> None:
"""Update this schedule's configuration."""
self._run_operation(transitions.UPDATE_SCHEDULE, options.to_dict())

def pause(self) -> None:
"""Pause this schedule."""
self._run_operation(transitions.PAUSE_SCHEDULE)

def resume(self) -> None:
"""Resume this schedule."""
self._run_operation(transitions.RESUME_SCHEDULE)

def delete(self) -> None:
"""Delete this schedule."""
self._run_operation(DELETE_OPERATION)

def describe(self) -> ScheduleDescription:
"""Retrieve the current details of this schedule."""
metadata = self._client.get_entity(self._entity_id, include_state=True)
if metadata is None:
raise ScheduleNotFoundError(self._schedule_id)
state = _parse_state(metadata.get_state())
if state is None:
raise ScheduleNotFoundError(self._schedule_id)
return state.to_description()


class ScheduledTaskClient:
"""Client for managing scheduled tasks in a Durable Task application."""

def __init__(self, client: TaskHubGrpcClient, *, operation_timeout: float = 60):
self._client = client
self._operation_timeout = operation_timeout

def get_schedule_client(self, schedule_id: str) -> ScheduleClient:
"""Get a handle to manage a specific schedule."""
return ScheduleClient(self._client, schedule_id,
operation_timeout=self._operation_timeout)

def create_schedule(self, options: ScheduleCreationOptions) -> ScheduleClient:
"""Create a new schedule and return a client for managing it."""
schedule_client = self.get_schedule_client(options.schedule_id)
schedule_client.create(options)
return schedule_client

def get_schedule(self, schedule_id: str) -> ScheduleDescription | None:
"""Get a schedule description by ID, or None if it does not exist."""
try:
return self.get_schedule_client(schedule_id).describe()
except ScheduleNotFoundError:
return None

def list_schedules(self, filter: ScheduleQuery | None = None) -> list[ScheduleDescription]:
"""List schedules matching the given filter criteria."""
prefix = filter.schedule_id_prefix if filter and filter.schedule_id_prefix else ""
page_size = filter.page_size if filter and filter.page_size else ScheduleQuery.DEFAULT_PAGE_SIZE
query = EntityQuery(
instance_id_starts_with=f"@{ENTITY_NAME}@{prefix}",
include_state=True,
page_size=page_size,
)
results: list[ScheduleDescription] = []
for metadata in self._client.get_all_entities(query):
state = _parse_state(metadata.get_state())
if state is None or state.schedule_configuration is None:
continue
if not self._matches_filter(state, filter):
continue
results.append(state.to_description())
return results
Comment on lines +128 to +145

@staticmethod
def _matches_filter(state: ScheduleState, filter: ScheduleQuery | None) -> bool:
if filter is None:
return True
if filter.status is not None and state.status != filter.status:
return False
created_at = state.schedule_created_at
if filter.created_from is not None and not (created_at and created_at > filter.created_from):
return False
if filter.created_to is not None and not (created_at and created_at < filter.created_to):
return False
return True
Comment on lines +148 to +158
36 changes: 36 additions & 0 deletions durabletask/scheduled/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.


class ScheduleError(Exception):
"""Base class for schedule-related errors."""


class ScheduleNotFoundError(ScheduleError):
"""Raised when a requested schedule does not exist."""

def __init__(self, schedule_id: str):
self.schedule_id = schedule_id
super().__init__(f"Schedule with ID '{schedule_id}' was not found.")


class ScheduleClientValidationError(ScheduleError):
"""Raised when a schedule operation fails client-side validation."""

def __init__(self, schedule_id: str, message: str):
self.schedule_id = schedule_id
super().__init__(f"Validation failed for schedule '{schedule_id}': {message}")


class ScheduleInvalidTransitionError(ScheduleError):
"""Raised when an operation is not valid for the schedule's current status."""

def __init__(self, schedule_id: str, from_status: object, to_status: object, operation_name: str):
self.schedule_id = schedule_id
self.from_status = from_status
self.to_status = to_status
self.operation_name = operation_name
super().__init__(
f"Invalid state transition for schedule '{schedule_id}': operation "
f"'{operation_name}' cannot transition from '{from_status}' to '{to_status}'."
)
Loading
Loading