Skip to content

Commit 7ba54ec

Browse files
Reject misplaced retry/timeout fields on workflow commands
1 parent 15c4003 commit 7ba54ec

2 files changed

Lines changed: 504 additions & 31 deletions

File tree

src/V2/Support/WorkflowCommandNormalizer.php

Lines changed: 172 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@
1515
* authoritative mapping from raw JSON into canonical command arrays the
1616
* workflow task bridge consumes.
1717
*
18+
* Retry and timeout fields are scope-checked: durable activity retry policy
19+
* and per-attempt/total/heartbeat timeouts only belong on `schedule_activity`,
20+
* durable child workflow retry policy and execution/run timeouts only belong
21+
* on `start_child_workflow`, and the worker-side `non_retryable` failure
22+
* marker only belongs on `fail_workflow` / `fail_update`. Workflow failure
23+
* itself is non-retryable, and the SDK HTTP transport retry policy is a
24+
* client concern that does not appear in the workflow task command stream.
25+
*
1826
* Extracted from App\Http\Controllers\Api\WorkerController so the server
1927
* is no longer the source of truth for the command grammar.
2028
*
@@ -24,6 +32,60 @@
2432
*/
2533
final class WorkflowCommandNormalizer
2634
{
35+
/**
36+
* Scope contract for retry/timeout/failure fields. Each entry lists the
37+
* command types that legitimately accept the field and a short guidance
38+
* sentence that names the correct layer.
39+
*
40+
* @var array<string, array{allowed: list<string>, guidance: string}>
41+
*/
42+
private const FIELD_SCOPES = [
43+
'retry_policy' => [
44+
'allowed' => ['schedule_activity', 'start_child_workflow'],
45+
'guidance' => 'Configure activity retries on a schedule_activity command, or child workflow retries on a start_child_workflow command. Workflow failure itself is non-retryable, and SDK HTTP transport retry is a client concern that does not appear in the workflow task command stream.',
46+
],
47+
'start_to_close_timeout' => [
48+
'allowed' => ['schedule_activity'],
49+
'guidance' => 'start_to_close_timeout limits one activity attempt and only applies to a schedule_activity command.',
50+
],
51+
'schedule_to_start_timeout' => [
52+
'allowed' => ['schedule_activity'],
53+
'guidance' => 'schedule_to_start_timeout limits queue wait before an activity attempt starts and only applies to a schedule_activity command.',
54+
],
55+
'schedule_to_close_timeout' => [
56+
'allowed' => ['schedule_activity'],
57+
'guidance' => 'schedule_to_close_timeout limits the entire activity execution including retries and only applies to a schedule_activity command.',
58+
],
59+
'heartbeat_timeout' => [
60+
'allowed' => ['schedule_activity'],
61+
'guidance' => 'heartbeat_timeout limits the gap between activity heartbeats and only applies to a schedule_activity command.',
62+
],
63+
'execution_timeout_seconds' => [
64+
'allowed' => ['start_child_workflow'],
65+
'guidance' => 'execution_timeout_seconds limits the entire child workflow execution and only applies to a start_child_workflow command.',
66+
],
67+
'run_timeout_seconds' => [
68+
'allowed' => ['start_child_workflow'],
69+
'guidance' => 'run_timeout_seconds limits one child workflow run and only applies to a start_child_workflow command.',
70+
],
71+
'non_retryable' => [
72+
'allowed' => ['fail_workflow', 'fail_update'],
73+
'guidance' => 'non_retryable marks a workflow or update failure as non-retryable and only applies to a fail_workflow or fail_update command. Activity non-retryable error types belong inside the schedule_activity retry_policy.non_retryable_error_types list.',
74+
],
75+
'parent_close_policy' => [
76+
'allowed' => ['start_child_workflow'],
77+
'guidance' => 'parent_close_policy declares how a child workflow reacts when its parent closes and only applies to a start_child_workflow command.',
78+
],
79+
'delay_seconds' => [
80+
'allowed' => ['start_timer'],
81+
'guidance' => 'delay_seconds is the timer delay and only applies to a start_timer command.',
82+
],
83+
'timeout_seconds' => [
84+
'allowed' => ['open_condition_wait'],
85+
'guidance' => 'timeout_seconds only applies to open_condition_wait. For activities use start_to_close_timeout / schedule_to_start_timeout / schedule_to_close_timeout / heartbeat_timeout; for child workflows use execution_timeout_seconds / run_timeout_seconds.',
86+
],
87+
];
88+
2789
/**
2890
* @param list<array<string, mixed>> $commands
2991
* @return list<array<string, mixed>>
@@ -42,6 +104,8 @@ public static function normalize(array $commands): array
42104
continue;
43105
}
44106

107+
self::assertCommandFieldScope($type, is_array($command) ? $command : [], $index, $errors);
108+
45109
if ($type === 'complete_workflow') {
46110
$normalized[] = [
47111
'type' => $type,
@@ -88,6 +152,12 @@ public static function normalize(array $commands): array
88152
}
89153

90154
$retryPolicy = self::optionalRetryPolicy($command, $index, $errors, 'Activity');
155+
$startToClose = self::optionalPositiveInt($command, 'start_to_close_timeout', $index, $errors);
156+
$scheduleToStart = self::optionalPositiveInt($command, 'schedule_to_start_timeout', $index, $errors);
157+
$scheduleToClose = self::optionalPositiveInt($command, 'schedule_to_close_timeout', $index, $errors);
158+
$heartbeat = self::optionalPositiveInt($command, 'heartbeat_timeout', $index, $errors);
159+
160+
self::assertActivityTimeoutOrdering($startToClose, $scheduleToClose, $heartbeat, $index, $errors);
91161

92162
$normalized[] = array_filter([
93163
'type' => $type,
@@ -96,25 +166,10 @@ public static function normalize(array $commands): array
96166
'connection' => self::optionalCommandString($command, 'connection', $index, $errors),
97167
'queue' => self::optionalCommandString($command, 'queue', $index, $errors),
98168
'retry_policy' => $retryPolicy,
99-
'start_to_close_timeout' => self::optionalPositiveInt(
100-
$command,
101-
'start_to_close_timeout',
102-
$index,
103-
$errors,
104-
),
105-
'schedule_to_start_timeout' => self::optionalPositiveInt(
106-
$command,
107-
'schedule_to_start_timeout',
108-
$index,
109-
$errors,
110-
),
111-
'schedule_to_close_timeout' => self::optionalPositiveInt(
112-
$command,
113-
'schedule_to_close_timeout',
114-
$index,
115-
$errors,
116-
),
117-
'heartbeat_timeout' => self::optionalPositiveInt($command, 'heartbeat_timeout', $index, $errors),
169+
'start_to_close_timeout' => $startToClose,
170+
'schedule_to_start_timeout' => $scheduleToStart,
171+
'schedule_to_close_timeout' => $scheduleToClose,
172+
'heartbeat_timeout' => $heartbeat,
118173
], static fn (mixed $value): bool => $value !== null);
119174

120175
continue;
@@ -161,6 +216,11 @@ public static function normalize(array $commands): array
161216
continue;
162217
}
163218

219+
$executionTimeout = self::optionalPositiveInt($command, 'execution_timeout_seconds', $index, $errors);
220+
$runTimeout = self::optionalPositiveInt($command, 'run_timeout_seconds', $index, $errors);
221+
222+
self::assertChildWorkflowTimeoutOrdering($executionTimeout, $runTimeout, $index, $errors);
223+
164224
$normalized[] = array_filter([
165225
'type' => $type,
166226
'workflow_type' => trim($command['workflow_type']),
@@ -169,18 +229,8 @@ public static function normalize(array $commands): array
169229
'queue' => self::optionalCommandString($command, 'queue', $index, $errors),
170230
'parent_close_policy' => $parentClosePolicy,
171231
'retry_policy' => $retryPolicy,
172-
'execution_timeout_seconds' => self::optionalPositiveInt(
173-
$command,
174-
'execution_timeout_seconds',
175-
$index,
176-
$errors,
177-
),
178-
'run_timeout_seconds' => self::optionalPositiveInt(
179-
$command,
180-
'run_timeout_seconds',
181-
$index,
182-
$errors,
183-
),
232+
'execution_timeout_seconds' => $executionTimeout,
233+
'run_timeout_seconds' => $runTimeout,
184234
], static fn (mixed $value): bool => $value !== null);
185235

186236
continue;
@@ -532,4 +582,95 @@ private static function optionalRetryPolicy(array $command, int $index, array &$
532582

533583
return $policy === [] ? null : $policy;
534584
}
585+
586+
/**
587+
* Reject retry/timeout/failure fields that have been placed on a command
588+
* type that does not accept them. The check fires only when the field is
589+
* populated (non-null and present), so omitting fields stays valid.
590+
*
591+
* @param array<string, mixed> $command
592+
* @param array<string, list<string>> $errors
593+
*/
594+
private static function assertCommandFieldScope(string $type, array $command, int $index, array &$errors): void
595+
{
596+
foreach (self::FIELD_SCOPES as $field => $scope) {
597+
if (! array_key_exists($field, $command) || $command[$field] === null) {
598+
continue;
599+
}
600+
601+
if (in_array($type, $scope['allowed'], true)) {
602+
continue;
603+
}
604+
605+
$errors["commands.{$index}.{$field}"] = [
606+
sprintf(
607+
'Workflow task command field [%s] is not valid on a %s command. %s',
608+
$field,
609+
$type,
610+
$scope['guidance'],
611+
),
612+
];
613+
}
614+
}
615+
616+
/**
617+
* Activity timeout fields must form a coherent envelope:
618+
* schedule_to_close covers the whole execution including retries, so it
619+
* cannot be smaller than start_to_close, which limits one attempt; and
620+
* heartbeat_timeout polices liveness within an attempt, so it cannot
621+
* exceed start_to_close.
622+
*
623+
* @param array<string, list<string>> $errors
624+
*/
625+
private static function assertActivityTimeoutOrdering(
626+
?int $startToClose,
627+
?int $scheduleToClose,
628+
?int $heartbeat,
629+
int $index,
630+
array &$errors,
631+
): void {
632+
if ($startToClose !== null && $scheduleToClose !== null && $scheduleToClose < $startToClose) {
633+
$errors["commands.{$index}.schedule_to_close_timeout"] = [
634+
sprintf(
635+
'schedule_to_close_timeout (%d) must be greater than or equal to start_to_close_timeout (%d). schedule_to_close covers the whole activity execution including retries; one attempt cannot exceed the total budget.',
636+
$scheduleToClose,
637+
$startToClose,
638+
),
639+
];
640+
}
641+
642+
if ($startToClose !== null && $heartbeat !== null && $heartbeat > $startToClose) {
643+
$errors["commands.{$index}.heartbeat_timeout"] = [
644+
sprintf(
645+
'heartbeat_timeout (%d) must be less than or equal to start_to_close_timeout (%d). heartbeat_timeout polices liveness within one attempt and cannot exceed the per-attempt budget.',
646+
$heartbeat,
647+
$startToClose,
648+
),
649+
];
650+
}
651+
}
652+
653+
/**
654+
* Child workflow timeout fields must form a coherent envelope:
655+
* execution_timeout_seconds covers the whole child execution across runs,
656+
* so it cannot be smaller than run_timeout_seconds, which limits one run.
657+
*
658+
* @param array<string, list<string>> $errors
659+
*/
660+
private static function assertChildWorkflowTimeoutOrdering(
661+
?int $executionTimeout,
662+
?int $runTimeout,
663+
int $index,
664+
array &$errors,
665+
): void {
666+
if ($executionTimeout !== null && $runTimeout !== null && $executionTimeout < $runTimeout) {
667+
$errors["commands.{$index}.execution_timeout_seconds"] = [
668+
sprintf(
669+
'execution_timeout_seconds (%d) must be greater than or equal to run_timeout_seconds (%d). execution_timeout_seconds covers the whole child workflow execution; one run cannot exceed the total budget.',
670+
$executionTimeout,
671+
$runTimeout,
672+
),
673+
];
674+
}
675+
}
535676
}

0 commit comments

Comments
 (0)