-
Notifications
You must be signed in to change notification settings - Fork 57
Expand file tree
/
Copy pathScheduleHandle.php
More file actions
342 lines (306 loc) · 13.7 KB
/
ScheduleHandle.php
File metadata and controls
342 lines (306 loc) · 13.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
<?php
declare(strict_types=1);
namespace Temporal\Client\Schedule;
use Google\Protobuf\Timestamp;
use Temporal\Api\Common\V1\SearchAttributes;
use Temporal\Api\Schedule\V1\BackfillRequest;
use Temporal\Api\Schedule\V1\SchedulePatch;
use Temporal\Api\Schedule\V1\TriggerImmediatelyRequest;
use Temporal\Api\Workflowservice\V1\DeleteScheduleRequest;
use Temporal\Api\Workflowservice\V1\DescribeScheduleRequest;
use Temporal\Api\Workflowservice\V1\ListScheduleMatchingTimesRequest;
use Temporal\Api\Workflowservice\V1\PatchScheduleRequest;
use Temporal\Api\Workflowservice\V1\UpdateScheduleRequest;
use Temporal\Client\ClientOptions;
use Temporal\Client\Common\ClientContextTrait;
use Temporal\Client\GRPC\ServiceClientInterface;
use Temporal\Client\GRPC\StatusCode;
use Temporal\Client\Schedule\Info\ScheduleDescription;
use Temporal\Client\Schedule\Policy\ScheduleOverlapPolicy;
use Temporal\Client\Schedule\Update\ScheduleUpdate;
use Temporal\Client\Schedule\Update\ScheduleUpdateInput;
use Temporal\Common\Uuid;
use Temporal\DataConverter\DataConverterInterface;
use Temporal\Exception\Client\ServiceClientException;
use Temporal\Exception\InvalidArgumentException;
use Temporal\Internal\Mapper\ScheduleMapper;
use Temporal\Internal\Marshaller\MarshallerInterface;
use Temporal\Internal\Marshaller\ProtoToArrayConverter;
final class ScheduleHandle
{
use ClientContextTrait;
/**
* Maximum number of retry attempts for conflict token mismatches when using a closure updater.
*/
private const CONFLICT_TOKEN_MAX_RETRIES = 10;
/**
* Substring matched against a {@see ServiceClientException} message to detect a conflict
* token mismatch returned by the CHASM scheduler. The full server error is
* `serviceerror.NewFailedPrecondition("mismatched conflict token")` — see
* `chasm/lib/scheduler/scheduler.go` (`ErrConflictTokenMismatch`) in temporalio/temporal.
* The legacy V1 signal-based scheduler silently drops the update instead and does not
* produce this error at all.
*/
private const CONFLICT_TOKEN_ERROR_MARKER = 'conflict token';
public function __construct(
ServiceClientInterface $client,
private readonly ClientOptions $clientOptions,
private readonly DataConverterInterface $converter,
private readonly MarshallerInterface $marshaller,
private readonly ProtoToArrayConverter $protoConverter,
private readonly string $namespace,
private readonly string $id,
) {
$this->client = $client;
}
/**
* GetID returns the schedule ID associated with this handle.
*/
public function getID(): string
{
return $this->id;
}
/**
* Update the Schedule.
*
* There are two forms:
*
* - **Closure form** — the closure receives a {@see ScheduleUpdateInput} carrying the current
* {@see ScheduleDescription} and must return a {@see ScheduleUpdate}. The SDK automatically
* fetches a fresh description on every attempt and uses its conflict token, so concurrent
* updates from other clients are retried transparently up to {@see self::CONFLICT_TOKEN_MAX_RETRIES}
* times. If all retries are exhausted, a {@see ServiceClientException} with
* {@see StatusCode::FAILED_PRECONDITION} is raised.
*
* - **Direct form** — a pre-built {@see Schedule} is sent as-is. The optional `$conflictToken`
* argument is the opaque value from {@see ScheduleDescription::$conflictToken}; if supplied,
* the server rejects the update when the schedule has been modified since the describe that
* produced the token. No retry is performed; the caller handles conflicts.
*
* **IMPORTANT:** The closure may be invoked multiple times (once per retry), so it MUST be
* idempotent and free of side effects outside of returning the {@see ScheduleUpdate}.
* Do not increment counters, log business events, or mutate external state from inside it.
*
* Examples:
*
* Add a search attribute using the closure form (auto-retries on conflict):
* ```
* $handle->update(function (ScheduleUpdateInput $input): ScheduleUpdate {
* return ScheduleUpdate::new($input->description->schedule)
* ->withSearchAttributes(
* $input->description->searchAttributes
* ->withValue('foo', 'bar')
* ->withValue('bar', 42),
* );
* });
* ```
*
* Pause a described schedule with an explicit conflict token (no retry):
* ```
* $description = $handle->describe();
* $schedule = $description->schedule;
* $handle->update(
* $schedule->withState($schedule->state->withPaused(true)),
* $description->conflictToken,
* );
* ```
*
* @param Schedule|\Closure(ScheduleUpdateInput): ScheduleUpdate $schedule The new Schedule to
* update to, or an idempotent closure that will be called with the current
* {@see ScheduleUpdateInput} and must return a {@see ScheduleUpdate}.
* @param string|null $conflictToken Only valid with the direct form. Can be the value of
* {@see ScheduleDescription::$conflictToken}, causing the request to fail if the
* schedule has been modified between the {@see self::describe()} and this update.
* If missing, the schedule will be updated unconditionally. MUST be `null` when
* `$schedule` is a closure — in the closure form the token is managed internally by
* the retry loop; passing a non-null value throws {@see InvalidArgumentException}.
*
* @throws InvalidArgumentException When a non-null `$conflictToken` is passed together with a
* closure `$schedule`.
* @throws ServiceClientException On a non-retryable server error, or after retries are
* exhausted in the closure form.
*/
public function update(
Schedule|\Closure $schedule,
?string $conflictToken = null,
): void {
if ($schedule instanceof \Closure) {
if ($conflictToken !== null) {
throw new InvalidArgumentException(
'Passing a conflict token together with a closure updater is not supported: '
. 'in closure form the token is fetched from describe() on every retry. '
. 'Use the direct form `update(Schedule, ?string $conflictToken)` if you need '
. 'to pin the update to a specific token.',
);
}
$this->updateWithClosure($schedule);
return;
}
$this->doUpdate($schedule, $conflictToken);
}
/**
* Describe fetches the Schedule's description from the Server
*/
public function describe(): ScheduleDescription
{
$request = (new DescribeScheduleRequest())
->setScheduleId($this->id)
->setNamespace($this->namespace);
$response = $this->client->DescribeSchedule($request);
$values = $this->protoConverter->convert($response);
$dto = new ScheduleDescription();
return $this->marshaller->unmarshal($values, $dto);
}
/**
* Lists matching times within a range.
*
* @return \Countable&\Traversable<int, \DateTimeImmutable>
*/
public function listScheduleMatchingTimes(
\DateTimeInterface $startTime,
\DateTimeInterface $endTime,
): \Countable&\Traversable {
$request = (new ListScheduleMatchingTimesRequest())
->setScheduleId($this->id)
->setNamespace($this->namespace)
->setStartTime((new Timestamp())->setSeconds($startTime->getTimestamp()))
->setEndTime((new Timestamp())->setSeconds($endTime->getTimestamp()));
$response = $this->client->ListScheduleMatchingTimes($request);
/** @var list<\DateTimeInterface> $list */
$list = [];
foreach ($response->getStartTime() as $timestamp) {
$list[] = new \DateTimeImmutable("@{$timestamp->getSeconds()}");
}
return new \ArrayIterator($list);
}
/**
* Backfill the schedule by going though the specified time periods and taking Actions as if that
* time passed by right now, all at once.
*
* @param iterable<BackfillPeriod> $periods Time periods to backfill the schedule.
*/
public function backfill(iterable $periods): void
{
$backfill = [];
foreach ($periods as $period) {
$period instanceof BackfillPeriod or throw new InvalidArgumentException(
'Backfill periods must be of type BackfillPeriod.',
);
$backfill[] = (new BackfillRequest())
->setOverlapPolicy($period->overlapPolicy->value)
->setStartTime((new Timestamp())->setSeconds($period->startTime->getTimestamp()))
->setEndTime((new Timestamp())->setSeconds($period->endTime->getTimestamp()));
}
$request = $this->patch((new SchedulePatch())->setBackfillRequest($backfill));
$this->client->PatchSchedule($request);
}
/**
* Trigger an Action to be taken immediately. Will override the schedules default policy
* with the one specified here. If overlap is {@see ScheduleOverlapPolicy::Unspecified} the Schedule
* policy will be used.
*
* @param ScheduleOverlapPolicy $overlapPolicy If specified, policy to override the Schedules
* default overlap policy.
*/
public function trigger(ScheduleOverlapPolicy $overlapPolicy = ScheduleOverlapPolicy::Unspecified): void
{
$request = $this->patch(
(new SchedulePatch())->setTriggerImmediately(
(new TriggerImmediatelyRequest())->setOverlapPolicy($overlapPolicy->value),
),
);
$this->client->PatchSchedule($request);
}
/**
* Pause the Schedule will also overwrite the Schedules current note with the new note.
*
* @param string $note Informative human-readable message with contextual notes.
* @psalm-assert non-empty-string $note
*/
public function pause(string $note = 'Paused via PHP SDK'): void
{
$note === '' and throw new InvalidArgumentException('Pause note cannot be empty.');
$request = $this->patch((new SchedulePatch())->setPause($note));
$this->client->PatchSchedule($request);
}
/**
* Unpause the Schedule will also overwrite the Schedules current note with the new note.
*
* @param string $note Informative human-readable message with contextual notes.
* @psalm-assert non-empty-string $note
*/
public function unpause(string $note = 'Unpaused via PHP SDK'): void
{
$note === '' and throw new InvalidArgumentException('Unpause note cannot be empty.');
$request = $this->patch((new SchedulePatch())->setUnpause($note));
$this->client->PatchSchedule($request);
}
/**
* Delete the Schedule.
*/
public function delete(): void
{
$request = (new DeleteScheduleRequest())
->setNamespace($this->namespace)
->setScheduleId($this->id)
->setIdentity($this->clientOptions->identity);
$this->client->DeleteSchedule($request);
}
private function updateWithClosure(\Closure $updater): void
{
for ($attempt = 0; $attempt < self::CONFLICT_TOKEN_MAX_RETRIES; $attempt++) {
$description = $this->describe();
$update = $updater(new ScheduleUpdateInput($description));
$update instanceof ScheduleUpdate or throw new InvalidArgumentException(
'Closure for the schedule update method must return a ScheduleUpdate.',
);
try {
$this->doUpdate($update->schedule, $description->conflictToken, $update);
return;
} catch (ServiceClientException $e) {
if ($e->getCode() !== StatusCode::FAILED_PRECONDITION
|| !\str_contains($e->getMessage(), self::CONFLICT_TOKEN_ERROR_MARKER)
) {
throw $e;
}
// Conflict token mismatch — retry with a fresh describe
}
}
throw new ServiceClientException((object) [
'code' => StatusCode::FAILED_PRECONDITION,
'details' => \sprintf(
'Schedule update conflict token mismatch after %d retries',
self::CONFLICT_TOKEN_MAX_RETRIES,
),
'metadata' => [],
]);
}
private function doUpdate(Schedule $schedule, ?string $conflictToken, ?ScheduleUpdate $update = null): void
{
$request = (new UpdateScheduleRequest())
->setScheduleId($this->id)
->setNamespace($this->namespace)
->setConflictToken((string) $conflictToken)
->setIdentity($this->clientOptions->identity)
->setRequestId(Uuid::v4());
// Search attributes from closure-based update
if ($update?->searchAttributes !== null) {
$update->searchAttributes->setDataConverter($this->converter);
$payloads = $update->searchAttributes->toPayloadArray();
$encodedSa = (new SearchAttributes())->setIndexedFields($payloads);
$request->setSearchAttributes($encodedSa);
}
$mapper = new ScheduleMapper($this->converter, $this->marshaller);
$scheduleMessage = $mapper->toMessage($schedule);
$request->setSchedule($scheduleMessage);
$this->client->UpdateSchedule($request);
}
private function patch(SchedulePatch $patch): PatchScheduleRequest
{
return (new PatchScheduleRequest())
->setScheduleId($this->id)
->setNamespace($this->namespace)
->setRequestId(Uuid::v4())
->setPatch($patch);
}
}