-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathclient.py
More file actions
158 lines (134 loc) · 6.59 KB
/
Copy pathclient.py
File metadata and controls
158 lines (134 loc) · 6.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import logging
from dataclasses import asdict
from typing import Any
from durabletask.client import (EntityQuery, OrchestrationStatus,
TaskHubGrpcClient)
from durabletask.entities import EntityInstanceId
from durabletask.internal import shared
from durabletask.scheduled import transitions
from durabletask.scheduled.exceptions import ScheduleNotFoundError
from durabletask.scheduled.models import (ScheduleCreationOptions,
ScheduleDescription, ScheduleQuery,
ScheduleState, ScheduleUpdateOptions)
from durabletask.scheduled.orchestrator import (
ScheduleOperationRequest, execute_schedule_operation_orchestrator)
from durabletask.scheduled.schedule_entity import (DELETE_OPERATION,
ENTITY_NAME)
logger = logging.getLogger("durabletask.scheduled")
def _parse_state(serialized_state: Any) -> ScheduleState | None:
if serialized_state is None:
return None
data = serialized_state
if isinstance(data, str):
if not data.strip():
# A deleted (or never-initialized) entity reports empty state.
return None
data = shared.from_json(data)
if isinstance(data, dict):
return ScheduleState.from_dict(data)
return None
class ScheduleClient:
"""Client for managing a single schedule instance."""
def __init__(self, client: TaskHubGrpcClient, schedule_id: str,
*, operation_timeout: float = 60):
if not schedule_id:
raise ValueError("schedule_id cannot be empty.")
self._client = client
self._schedule_id = schedule_id
self._entity_id = EntityInstanceId(ENTITY_NAME, schedule_id)
self._operation_timeout = operation_timeout
@property
def schedule_id(self) -> str:
"""Gets the ID of this schedule."""
return self._schedule_id
def _run_operation(self, operation_name: str, input: Any | None = None) -> None:
request = ScheduleOperationRequest(
entity_id=str(self._entity_id),
operation_name=operation_name,
input=input,
)
instance_id = self._client.schedule_new_orchestration(
execute_schedule_operation_orchestrator, input=asdict(request))
state = self._client.wait_for_orchestration_completion(
instance_id, timeout=self._operation_timeout)
if state is None or state.runtime_status != OrchestrationStatus.COMPLETED:
failure = state.failure_details if state else None
message = failure.message if failure else "unknown error"
raise RuntimeError(
f"Failed to '{operation_name}' schedule '{self._schedule_id}': {message}")
def create(self, options: ScheduleCreationOptions) -> None:
"""Create or update this schedule with the given configuration."""
self._run_operation(transitions.CREATE_SCHEDULE, options.to_dict())
def update(self, options: ScheduleUpdateOptions) -> None:
"""Update this schedule's configuration."""
self._run_operation(transitions.UPDATE_SCHEDULE, options.to_dict())
def pause(self) -> None:
"""Pause this schedule."""
self._run_operation(transitions.PAUSE_SCHEDULE)
def resume(self) -> None:
"""Resume this schedule."""
self._run_operation(transitions.RESUME_SCHEDULE)
def delete(self) -> None:
"""Delete this schedule."""
self._run_operation(DELETE_OPERATION)
def describe(self) -> ScheduleDescription:
"""Retrieve the current details of this schedule."""
metadata = self._client.get_entity(self._entity_id, include_state=True)
if metadata is None:
raise ScheduleNotFoundError(self._schedule_id)
state = _parse_state(metadata.get_state())
if state is None:
raise ScheduleNotFoundError(self._schedule_id)
return state.to_description()
class ScheduledTaskClient:
"""Client for managing scheduled tasks in a Durable Task application."""
def __init__(self, client: TaskHubGrpcClient, *, operation_timeout: float = 60):
self._client = client
self._operation_timeout = operation_timeout
def get_schedule_client(self, schedule_id: str) -> ScheduleClient:
"""Get a handle to manage a specific schedule."""
return ScheduleClient(self._client, schedule_id,
operation_timeout=self._operation_timeout)
def create_schedule(self, options: ScheduleCreationOptions) -> ScheduleClient:
"""Create a new schedule and return a client for managing it."""
schedule_client = self.get_schedule_client(options.schedule_id)
schedule_client.create(options)
return schedule_client
def get_schedule(self, schedule_id: str) -> ScheduleDescription | None:
"""Get a schedule description by ID, or None if it does not exist."""
try:
return self.get_schedule_client(schedule_id).describe()
except ScheduleNotFoundError:
return None
def list_schedules(self, filter: ScheduleQuery | None = None) -> list[ScheduleDescription]:
"""List schedules matching the given filter criteria."""
prefix = filter.schedule_id_prefix if filter and filter.schedule_id_prefix else ""
page_size = filter.page_size if filter and filter.page_size else ScheduleQuery.DEFAULT_PAGE_SIZE
query = EntityQuery(
instance_id_starts_with=f"@{ENTITY_NAME}@{prefix}",
include_state=True,
page_size=page_size,
)
results: list[ScheduleDescription] = []
for metadata in self._client.get_all_entities(query):
state = _parse_state(metadata.get_state())
if state is None or state.schedule_configuration is None:
continue
if not self._matches_filter(state, filter):
continue
results.append(state.to_description())
return results
@staticmethod
def _matches_filter(state: ScheduleState, filter: ScheduleQuery | None) -> bool:
if filter is None:
return True
if filter.status is not None and state.status != filter.status:
return False
created_at = state.schedule_created_at
if filter.created_from is not None and not (created_at and created_at > filter.created_from):
return False
if filter.created_to is not None and not (created_at and created_at < filter.created_to):
return False
return True