Skip to content

Commit de7d3ba

Browse files
Add PHP invocable activity handler support
1 parent 88122e4 commit de7d3ba

4 files changed

Lines changed: 1050 additions & 0 deletions

File tree

Lines changed: 391 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,391 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow\V2\Support;
6+
7+
final class ExternalTaskInput
8+
{
9+
public const SCHEMA = 'durable-workflow.v2.external-task-input';
10+
11+
public const VERSION = 1;
12+
13+
public const KIND_ACTIVITY_TASK = 'activity_task';
14+
15+
public const KIND_WORKFLOW_TASK = 'workflow_task';
16+
17+
/**
18+
* @param array<string, mixed> $task
19+
* @param array<string, mixed> $workflow
20+
* @param array<string, mixed> $lease
21+
* @param array<string, mixed> $payloads
22+
* @param array<string, mixed> $headers
23+
* @param array<string, mixed>|null $deadlines
24+
* @param array<string, mixed>|null $history
25+
*/
26+
private function __construct(
27+
public readonly string $kind,
28+
public readonly array $task,
29+
public readonly array $workflow,
30+
public readonly array $lease,
31+
public readonly array $payloads,
32+
public readonly array $headers,
33+
public readonly ?array $deadlines = null,
34+
public readonly ?array $history = null,
35+
) {
36+
}
37+
38+
/**
39+
* @param array<string, mixed> $envelope
40+
*/
41+
public static function parse(array $envelope): self
42+
{
43+
self::requireValue($envelope, 'schema', self::SCHEMA);
44+
self::requireValue($envelope, 'version', self::VERSION);
45+
46+
$taskFields = self::requireMap($envelope, 'task');
47+
$kind = self::requireTaskKind($taskFields, 'kind');
48+
$task = self::parseTask($taskFields, $kind);
49+
$workflow = self::parseWorkflow(self::requireMap($envelope, 'workflow'), $kind);
50+
$lease = self::parseLease(self::requireMap($envelope, 'lease'));
51+
$payloads = self::parsePayloads(self::requireMap($envelope, 'payloads'));
52+
$headers = self::requireMap($envelope, 'headers');
53+
54+
if ($kind === self::KIND_ACTIVITY_TASK) {
55+
return new self(
56+
kind: $kind,
57+
task: $task,
58+
workflow: $workflow,
59+
lease: $lease,
60+
payloads: $payloads,
61+
headers: $headers,
62+
deadlines: self::parseDeadlines(self::requireMap($envelope, 'deadlines')),
63+
);
64+
}
65+
66+
return new self(
67+
kind: $kind,
68+
task: $task,
69+
workflow: $workflow,
70+
lease: $lease,
71+
payloads: $payloads,
72+
headers: $headers,
73+
history: self::parseHistory(self::requireMap($envelope, 'history')),
74+
);
75+
}
76+
77+
public function isActivityTask(): bool
78+
{
79+
return $this->kind === self::KIND_ACTIVITY_TASK;
80+
}
81+
82+
public function isWorkflowTask(): bool
83+
{
84+
return $this->kind === self::KIND_WORKFLOW_TASK;
85+
}
86+
87+
public function taskId(): string
88+
{
89+
return (string) $this->task['id'];
90+
}
91+
92+
public function attempt(): int
93+
{
94+
return (int) $this->task['attempt'];
95+
}
96+
97+
public function handler(): ?string
98+
{
99+
$handler = $this->task['handler'] ?? null;
100+
101+
return is_string($handler) ? $handler : null;
102+
}
103+
104+
public function idempotencyKey(): string
105+
{
106+
return (string) $this->task['idempotency_key'];
107+
}
108+
109+
public function leaseExpiresAt(): string
110+
{
111+
return (string) $this->lease['expires_at'];
112+
}
113+
114+
public function argumentsPayload(): ?array
115+
{
116+
$arguments = $this->payloads['arguments'] ?? null;
117+
118+
return is_array($arguments) ? $arguments : null;
119+
}
120+
121+
/**
122+
* @return array<string, string>
123+
*/
124+
public function deadlineCandidates(): array
125+
{
126+
$candidates = [
127+
'lease.expires_at' => $this->leaseExpiresAt(),
128+
];
129+
130+
foreach ($this->deadlines ?? [] as $name => $expiresAt) {
131+
if (is_string($name) && is_string($expiresAt)) {
132+
$candidates['deadlines.' . $name] = $expiresAt;
133+
}
134+
}
135+
136+
return $candidates;
137+
}
138+
139+
/**
140+
* @param array<string, mixed> $task
141+
* @return array<string, mixed>
142+
*/
143+
private static function parseTask(array $task, string $kind): array
144+
{
145+
$attempt = self::requireInt($task, 'attempt');
146+
if ($attempt < 1) {
147+
throw new ExternalTaskInputException('External task input task.attempt must be >= 1.');
148+
}
149+
150+
$parsed = [
151+
'id' => self::requireString($task, 'id'),
152+
'kind' => $kind,
153+
'attempt' => $attempt,
154+
'task_queue' => self::requireString($task, 'task_queue'),
155+
'connection' => self::requireOptionalString($task, 'connection'),
156+
'idempotency_key' => self::requireString($task, 'idempotency_key'),
157+
];
158+
159+
if ($kind === self::KIND_ACTIVITY_TASK) {
160+
$parsed['activity_attempt_id'] = self::requireString($task, 'activity_attempt_id');
161+
$parsed['handler'] = self::requireString($task, 'handler');
162+
163+
return $parsed;
164+
}
165+
166+
$parsed['handler'] = self::requireOptionalString($task, 'handler');
167+
$parsed['compatibility'] = self::requireOptionalString($task, 'compatibility');
168+
169+
return $parsed;
170+
}
171+
172+
/**
173+
* @param array<string, mixed> $workflow
174+
* @return array<string, mixed>
175+
*/
176+
private static function parseWorkflow(array $workflow, string $kind): array
177+
{
178+
$parsed = [
179+
'id' => self::requireString($workflow, 'id'),
180+
'run_id' => self::requireString($workflow, 'run_id'),
181+
];
182+
183+
if ($kind === self::KIND_WORKFLOW_TASK) {
184+
$parsed['status'] = self::requireOptionalString($workflow, 'status');
185+
$parsed['resume'] = self::requireMap($workflow, 'resume');
186+
}
187+
188+
return $parsed;
189+
}
190+
191+
/**
192+
* @param array<string, mixed> $lease
193+
* @return array<string, string>
194+
*/
195+
private static function parseLease(array $lease): array
196+
{
197+
return [
198+
'owner' => self::requireString($lease, 'owner'),
199+
'expires_at' => self::requireString($lease, 'expires_at'),
200+
'heartbeat_endpoint' => self::requireString($lease, 'heartbeat_endpoint'),
201+
];
202+
}
203+
204+
/**
205+
* @param array<string, mixed> $payloads
206+
* @return array<string, mixed>
207+
*/
208+
private static function parsePayloads(array $payloads): array
209+
{
210+
self::requireNullableMap($payloads, 'arguments');
211+
212+
return $payloads;
213+
}
214+
215+
/**
216+
* @param array<string, mixed> $deadlines
217+
* @return array<string, mixed>
218+
*/
219+
private static function parseDeadlines(array $deadlines): array
220+
{
221+
foreach (['schedule_to_start', 'start_to_close', 'schedule_to_close', 'heartbeat'] as $key) {
222+
self::requireOptionalString($deadlines, $key);
223+
}
224+
225+
return $deadlines;
226+
}
227+
228+
/**
229+
* @param array<string, mixed> $history
230+
* @return array<string, mixed>
231+
*/
232+
private static function parseHistory(array $history): array
233+
{
234+
self::requireList($history, 'events');
235+
self::requireInt($history, 'last_sequence');
236+
self::requireOptionalString($history, 'next_page_token');
237+
self::requireOptionalString($history, 'encoding');
238+
239+
return $history;
240+
}
241+
242+
/**
243+
* @param array<string, mixed> $value
244+
* @return array<string, mixed>
245+
*/
246+
private static function requireMap(array $value, string $key): array
247+
{
248+
if (! array_key_exists($key, $value)) {
249+
throw new ExternalTaskInputException(sprintf('External task input is missing required field [%s].', $key));
250+
}
251+
252+
$item = $value[$key];
253+
if (! is_array($item) || array_is_list($item)) {
254+
throw new ExternalTaskInputException(sprintf('External task input field [%s] must be an object.', $key));
255+
}
256+
257+
return $item;
258+
}
259+
260+
/**
261+
* @param array<string, mixed> $value
262+
*/
263+
private static function requireNullableMap(array $value, string $key): ?array
264+
{
265+
if (! array_key_exists($key, $value)) {
266+
throw new ExternalTaskInputException(sprintf('External task input is missing required field [%s].', $key));
267+
}
268+
269+
if ($value[$key] === null) {
270+
return null;
271+
}
272+
273+
$item = $value[$key];
274+
if (! is_array($item) || array_is_list($item)) {
275+
throw new ExternalTaskInputException(sprintf(
276+
'External task input field [%s] must be an object or null.',
277+
$key
278+
));
279+
}
280+
281+
return $item;
282+
}
283+
284+
/**
285+
* @param array<string, mixed> $value
286+
* @return list<mixed>
287+
*/
288+
private static function requireList(array $value, string $key): array
289+
{
290+
if (! array_key_exists($key, $value)) {
291+
throw new ExternalTaskInputException(sprintf('External task input is missing required field [%s].', $key));
292+
}
293+
294+
$item = $value[$key];
295+
if (! is_array($item) || ! array_is_list($item)) {
296+
throw new ExternalTaskInputException(sprintf('External task input field [%s] must be an array.', $key));
297+
}
298+
299+
return $item;
300+
}
301+
302+
/**
303+
* @param array<string, mixed> $value
304+
*/
305+
private static function requireTaskKind(array $value, string $key): string
306+
{
307+
$kind = self::requireString($value, $key);
308+
if ($kind === self::KIND_ACTIVITY_TASK || $kind === self::KIND_WORKFLOW_TASK) {
309+
return $kind;
310+
}
311+
312+
throw new ExternalTaskInputException(sprintf('Unsupported external task input kind [%s].', $kind));
313+
}
314+
315+
/**
316+
* @param array<string, mixed> $value
317+
*/
318+
private static function requireString(array $value, string $key): string
319+
{
320+
if (! array_key_exists($key, $value)) {
321+
throw new ExternalTaskInputException(sprintf('External task input is missing required field [%s].', $key));
322+
}
323+
324+
$item = $value[$key];
325+
if (! is_string($item)) {
326+
throw new ExternalTaskInputException(sprintf('External task input field [%s] must be a string.', $key));
327+
}
328+
329+
return $item;
330+
}
331+
332+
/**
333+
* @param array<string, mixed> $value
334+
*/
335+
private static function requireOptionalString(array $value, string $key): ?string
336+
{
337+
if (! array_key_exists($key, $value)) {
338+
throw new ExternalTaskInputException(sprintf('External task input is missing required field [%s].', $key));
339+
}
340+
341+
$item = $value[$key];
342+
if ($item === null) {
343+
return null;
344+
}
345+
346+
if (! is_string($item)) {
347+
throw new ExternalTaskInputException(sprintf(
348+
'External task input field [%s] must be a string or null.',
349+
$key
350+
));
351+
}
352+
353+
return $item;
354+
}
355+
356+
/**
357+
* @param array<string, mixed> $value
358+
*/
359+
private static function requireInt(array $value, string $key): int
360+
{
361+
if (! array_key_exists($key, $value)) {
362+
throw new ExternalTaskInputException(sprintf('External task input is missing required field [%s].', $key));
363+
}
364+
365+
$item = $value[$key];
366+
if (! is_int($item)) {
367+
throw new ExternalTaskInputException(sprintf('External task input field [%s] must be an integer.', $key));
368+
}
369+
370+
return $item;
371+
}
372+
373+
/**
374+
* @param array<string, mixed> $value
375+
*/
376+
private static function requireValue(array $value, string $key, mixed $expected): void
377+
{
378+
if (! array_key_exists($key, $value)) {
379+
throw new ExternalTaskInputException(sprintf('External task input is missing required field [%s].', $key));
380+
}
381+
382+
if ($value[$key] !== $expected) {
383+
throw new ExternalTaskInputException(sprintf(
384+
'External task input field [%s] must be %s; got %s.',
385+
$key,
386+
var_export($expected, true),
387+
var_export($value[$key], true),
388+
));
389+
}
390+
}
391+
}

0 commit comments

Comments
 (0)