Skip to content

Commit c38a31d

Browse files
Repair signal timeout task drift
1 parent 21253ba commit c38a31d

5 files changed

Lines changed: 494 additions & 12 deletions

File tree

src/V2/Support/RunSummaryProjector.php

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,17 @@ public static function project(WorkflowRun $run): WorkflowRunSummary
195195
$resumeSourceId = $openConditionWait['resume_source_id'];
196196
} elseif ($openSignalWait !== null) {
197197
$waitKind = 'signal';
198-
$waitReason = ($openSignalWait['timeout_seconds'] ?? null) === null
199-
? sprintf('Waiting for signal %s', $openSignalWait['name'])
200-
: sprintf('Waiting for signal %s or timeout', $openSignalWait['name']);
198+
$waitReason = match (true) {
199+
self::timestamp($openSignalWait['timeout_fired_at'] ?? null) !== null => sprintf(
200+
'Waiting to apply signal %s timeout',
201+
$openSignalWait['name'],
202+
),
203+
($openSignalWait['timeout_seconds'] ?? null) === null => sprintf(
204+
'Waiting for signal %s',
205+
$openSignalWait['name'],
206+
),
207+
default => sprintf('Waiting for signal %s or timeout', $openSignalWait['name']),
208+
};
201209
$waitStartedAt = $openSignalWait['opened_at'];
202210
$waitDeadlineAt = $openSignalWait['deadline_at'];
203211
$openWaitId = $openSignalWait['id'];
@@ -632,6 +640,43 @@ private static function liveness(
632640
return ['waiting_for_condition', 'Waiting for a condition-changing durable input.'];
633641
}
634642

643+
if ($openSignalWait !== null && $openSignalWait['timer_id'] !== null) {
644+
if (self::timestamp($openSignalWait['timeout_fired_at'] ?? null) !== null) {
645+
if ($nextTask !== null) {
646+
return self::taskLiveness($nextTask, $run, 'Signal timeout');
647+
}
648+
649+
return [
650+
'repair_needed',
651+
sprintf('Signal wait %s has a fired timeout without an open workflow task.', $openSignalWait['id']),
652+
];
653+
}
654+
655+
if ($nextTask !== null) {
656+
if (
657+
TaskRepairPolicy::leaseExpired($nextTask)
658+
|| TaskRepairPolicy::readyTaskNeedsRedispatch($nextTask)
659+
|| TaskRepairPolicy::claimFailed($nextTask)
660+
) {
661+
return self::taskLiveness($nextTask, $run, 'Signal timeout');
662+
}
663+
664+
return [
665+
'waiting_for_signal',
666+
sprintf(
667+
'Waiting for signal %s or timeout at %s.',
668+
$openSignalWait['name'],
669+
$openSignalWait['deadline_at']?->toJSON() ?? 'an unknown time',
670+
),
671+
];
672+
}
673+
674+
return [
675+
'repair_needed',
676+
sprintf('Signal wait %s is open without an open timeout task.', $openSignalWait['id']),
677+
];
678+
}
679+
635680
if ($openTimer !== null) {
636681
if ($nextTask !== null) {
637682
if (
@@ -933,7 +978,9 @@ private static function conditionLabel(array $conditionWait): string
933978
* name: string,
934979
* opened_at: \Carbon\CarbonInterface,
935980
* deadline_at: \Carbon\CarbonInterface|null,
981+
* timeout_fired_at: \Carbon\CarbonInterface|null,
936982
* timeout_seconds: int|null,
983+
* timer_id: string|null,
937984
* resume_source_kind: string,
938985
* resume_source_id: string|null
939986
* }|null
@@ -942,7 +989,8 @@ private static function openSignalWait(WorkflowRun $run): ?array
942989
{
943990
$openSignals = array_values(array_filter(
944991
SignalWaits::forRun($run),
945-
static fn (array $wait): bool => $wait['status'] === 'open',
992+
static fn (array $wait): bool => $wait['status'] === 'open'
993+
|| $wait['source_status'] === 'timed_out',
946994
));
947995

948996
if ($openSignals === []) {
@@ -969,15 +1017,19 @@ private static function openSignalWait(WorkflowRun $run): ?array
9691017

9701018
/** @var array{id: string, name: string, opened_at: \Carbon\CarbonInterface} $signal */
9711019
$signal = end($openSignals);
1020+
$timerId = self::nonEmptyString($signal['timer_id'] ?? null);
1021+
$timeoutFiredAt = self::timestamp($signal['timeout_fired_at'] ?? null);
9721022

9731023
return [
9741024
'id' => $signal['signal_wait_id'],
9751025
'name' => $signal['signal_name'],
9761026
'opened_at' => $signal['opened_at'],
9771027
'deadline_at' => self::timestamp($signal['deadline_at'] ?? null),
1028+
'timeout_fired_at' => $timeoutFiredAt,
9781029
'timeout_seconds' => self::intValue($signal['timeout_seconds'] ?? null),
979-
'resume_source_kind' => 'signal',
980-
'resume_source_id' => null,
1030+
'timer_id' => $timerId,
1031+
'resume_source_kind' => $timerId === null ? 'signal' : 'timer',
1032+
'resume_source_id' => $timerId,
9811033
];
9821034
}
9831035

src/V2/Support/RunTaskView.php

Lines changed: 110 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,12 +206,19 @@ private static function missingTransportRows(WorkflowRun $run, Collection $activ
206206
$hasUnsupportedWait = self::hasUnsupportedWait($waits);
207207

208208
foreach ($waits as $wait) {
209-
if (($wait['status'] ?? null) !== 'open' || ($wait['task_backed'] ?? false) === true) {
209+
$kind = self::stringValue($wait['kind'] ?? null);
210+
$isSignalTimeoutApplication = $kind === 'signal'
211+
&& self::stringValue($wait['source_status'] ?? null) === 'timed_out'
212+
&& self::stringValue($wait['resume_source_kind'] ?? null) === 'timer'
213+
&& self::stringValue($wait['resume_source_id'] ?? null) !== null;
214+
215+
if (
216+
(($wait['status'] ?? null) !== 'open' && ! $isSignalTimeoutApplication)
217+
|| ($wait['task_backed'] ?? false) === true
218+
) {
210219
continue;
211220
}
212221

213-
$kind = self::stringValue($wait['kind'] ?? null);
214-
215222
if (
216223
$kind === 'activity'
217224
&& self::stringValue($wait['source_status'] ?? null) === ActivityStatus::Pending->value
@@ -269,6 +276,28 @@ private static function missingTransportRows(WorkflowRun $run, Collection $activ
269276
continue;
270277
}
271278

279+
if (
280+
$kind === 'signal'
281+
&& self::stringValue($wait['resume_source_kind'] ?? null) === 'timer'
282+
&& self::stringValue($wait['resume_source_id'] ?? null) !== null
283+
) {
284+
if (self::timestamp($wait['timeout_fired_at'] ?? null) !== null) {
285+
$rows[] = self::missingSignalTimeoutWorkflowTaskRow($run, $wait);
286+
287+
continue;
288+
}
289+
290+
$timerId = self::stringValue($wait['resume_source_id'] ?? null);
291+
292+
/** @var array<string, mixed>|null $timer */
293+
$timer = $timerId === null ? null : $timers->get($timerId);
294+
$rows[] = self::missingSignalTimeoutTimerTaskRow($run, $timer ?? [
295+
'id' => $timerId,
296+
], $wait);
297+
298+
continue;
299+
}
300+
272301
if ($kind === 'update') {
273302
$rows[] = self::missingWorkflowTaskRow(
274303
$run,
@@ -457,6 +486,43 @@ private static function missingConditionTimeoutWorkflowTaskRow(WorkflowRun $run,
457486
return $row;
458487
}
459488

489+
/**
 * Build the placeholder row for a workflow task that should exist to apply
 * a signal wait's timeout but is missing.
 *
 * The row id is derived from the wait's `signal_wait_id`, falling back to
 * its `id` and finally to the literal `signal`. Availability is the first
 * non-null of the wait's `timeout_fired_at`, `deadline_at` and `opened_at`.
 *
 * @param array<string, mixed> $wait
 * @return array<string, mixed>
 */
private static function missingSignalTimeoutWorkflowTaskRow(WorkflowRun $run, array $wait): array
{
    $waitId = self::stringValue($wait['signal_wait_id'] ?? null)
        ?? self::stringValue($wait['id'] ?? null)
        ?? 'signal';
    $resumeTimerId = self::stringValue($wait['resume_source_id'] ?? null);
    $name = self::stringValue($wait['target_name'] ?? null);

    $base = self::missingTaskBase(
        $run,
        sprintf('missing:workflow:signal-timeout:%s', $waitId),
        TaskType::Workflow->value,
        $run->connection,
        $run->queue,
        self::timestamp($wait['timeout_fired_at'] ?? null)
            ?? self::timestamp($wait['deadline_at'] ?? null)
            ?? self::timestamp($wait['opened_at'] ?? null),
    );

    // The signal name is optional; without it the summary reads "signal timeout".
    $nameSuffix = $name === null ? '' : sprintf(' %s', $name);

    // The same sequence value is reported for both the timer and the workflow columns.
    $sequence = self::intValue($wait['sequence'] ?? null);

    // array_replace overwrites keys already present in the base row in place
    // and appends new ones in the order listed, matching key-by-key assignment.
    return array_replace($base, [
        'summary' => sprintf('Workflow task missing to apply signal%s timeout.', $nameSuffix),
        'timer_id' => $resumeTimerId,
        'timer_sequence' => $sequence,
        'timer_fire_at' => self::timestamp($wait['deadline_at'] ?? null),
        'signal_wait_id' => $waitId,
        'signal_name' => $name,
        'workflow_wait_kind' => 'signal',
        'workflow_open_wait_id' => $waitId,
        'workflow_resume_source_kind' => 'timer',
        'workflow_resume_source_id' => $resumeTimerId,
        'workflow_sequence' => $sequence,
    ]);
}
525+
460526
/**
461527
* @param array<string, mixed> $timer
462528
* @param array<string, mixed> $wait
@@ -506,6 +572,47 @@ private static function missingTimerTaskRow(
506572
return $row;
507573
}
508574

575+
/**
 * Build the placeholder row for a timer task that should be backing a
 * signal wait's timeout but is missing.
 *
 * Values are taken from the wait first and from the timer second, except
 * for the timer id and the sequence, where the timer is consulted first.
 * The timer id falls back to the literal `timer` when neither source has one.
 *
 * @param array<string, mixed> $timer
 * @param array<string, mixed> $wait
 * @return array<string, mixed>
 */
private static function missingSignalTimeoutTimerTaskRow(WorkflowRun $run, array $timer, array $wait): array
{
    $resolvedTimerId = self::stringValue($timer['id'] ?? null)
        ?? self::stringValue($wait['resume_source_id'] ?? null)
        ?? 'timer';

    // The wait's deadline wins over the timer's own fire time.
    $fireAt = self::timestamp($wait['deadline_at'] ?? null)
        ?? self::timestamp($timer['fire_at'] ?? null);

    $waitId = self::stringValue($wait['signal_wait_id'] ?? null)
        ?? self::stringValue($timer['signal_wait_id'] ?? null)
        ?? self::stringValue($wait['id'] ?? null);

    $name = self::stringValue($wait['target_name'] ?? null)
        ?? self::stringValue($timer['signal_name'] ?? null);

    $base = self::missingTaskBase(
        $run,
        sprintf('missing:timer:%s', $resolvedTimerId),
        TaskType::Timer->value,
        $run->connection,
        $run->queue,
        $fireAt,
    );

    // The signal name is optional; the summary omits the "for ..." part without it.
    $nameSuffix = $name === null ? '' : sprintf(' for %s', $name);

    // array_replace overwrites keys already present in the base row in place
    // and appends new ones in the order listed, matching key-by-key assignment.
    return array_replace($base, [
        'summary' => sprintf('Signal timeout task missing%s.', $nameSuffix),
        'timer_id' => $resolvedTimerId,
        'timer_sequence' => self::intValue($timer['sequence'] ?? null)
            ?? self::intValue($wait['sequence'] ?? null),
        'timer_fire_at' => $fireAt,
        'signal_wait_id' => $waitId,
        'signal_name' => $name,
    ]);
}
615+
509616
/**
510617
* @return array<string, mixed>
511618
*/

src/V2/Support/RunWaitView.php

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,8 @@ private static function signalWaits(WorkflowRun $run, array $taskByTimerId): arr
252252
$timeoutSeconds = self::intValue($wait['timeout_seconds'] ?? null);
253253
$timerId = self::stringValue($wait['timer_id'] ?? null);
254254
$task = $timerId === null ? null : ($taskByTimerId[$timerId] ?? null);
255+
$resumeSourceKind = $timerId !== null ? 'timer' : 'signal';
256+
$resumeSourceId = $timerId !== null ? $timerId : $wait['command_id'];
255257

256258
$summary = match ($wait['status']) {
257259
'open' => $timeoutSeconds === null
@@ -270,7 +272,11 @@ private static function signalWaits(WorkflowRun $run, array $taskByTimerId): arr
270272
default => 'Signal wait ended when the run failed.',
271273
},
272274
default => $wait['source_status'] === 'timed_out'
273-
? sprintf('Signal %s timed out after %s.', $wait['signal_name'], self::durationLabel($timeoutSeconds ?? 0))
275+
? sprintf(
276+
'Signal %s timed out after %s.',
277+
$wait['signal_name'],
278+
self::durationLabel($timeoutSeconds ?? 0)
279+
)
274280
: sprintf('Signal %s received.', $wait['signal_name']),
275281
};
276282

@@ -292,8 +298,8 @@ private static function signalWaits(WorkflowRun $run, array $taskByTimerId): arr
292298
'target_type' => null,
293299
'task_backed' => self::isOpenTask($task),
294300
'external_only' => $timerId === null,
295-
'resume_source_kind' => $wait['source_status'] === 'timed_out' ? 'timer' : 'signal',
296-
'resume_source_id' => $wait['source_status'] === 'timed_out' ? $timerId : $wait['command_id'],
301+
'resume_source_kind' => $resumeSourceKind,
302+
'resume_source_id' => $resumeSourceId,
297303
'task_id' => $task?->id,
298304
'task_type' => $task?->task_type?->value,
299305
'task_status' => $task?->status?->value,

src/V2/Support/TaskRepair.php

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,66 @@ private static function createMissingTask(WorkflowRun $run, WorkflowRunSummary $
376376
}
377377
}
378378

379+
if ($summary->wait_kind === 'signal' && $summary->resume_source_kind === 'timer') {
380+
$signalWait = self::openSignalWait($run, $summary);
381+
$timerId = self::nonEmptyString($signalWait['timer_id'] ?? null)
382+
?? self::nonEmptyString($summary->resume_source_id);
383+
$timer = $timerId === null ? null : TimerRecovery::restore($run, $timerId);
384+
$availableAt = self::timestamp($signalWait['deadline_at'] ?? null)
385+
?? $timer?->fire_at
386+
?? now();
387+
$timeoutFiredAt = self::timestamp($signalWait['timeout_fired_at'] ?? null)
388+
?? $timer?->fired_at;
389+
390+
if ($timerId !== null && ($timeoutFiredAt !== null || $timer?->status === TimerStatus::Fired)) {
391+
/** @var WorkflowTask $task */
392+
$task = WorkflowTask::query()->create([
393+
'workflow_run_id' => $run->id,
394+
'task_type' => TaskType::Workflow->value,
395+
'status' => TaskStatus::Ready->value,
396+
'available_at' => $timeoutFiredAt ?? now(),
397+
'payload' => array_filter([
398+
'workflow_wait_kind' => 'signal',
399+
'open_wait_id' => self::nonEmptyString($signalWait['signal_wait_id'] ?? null)
400+
?? self::nonEmptyString($summary->open_wait_id),
401+
'resume_source_kind' => 'timer',
402+
'resume_source_id' => $timerId,
403+
'timer_id' => $timerId,
404+
'signal_wait_id' => self::nonEmptyString($signalWait['signal_wait_id'] ?? null),
405+
'signal_name' => self::nonEmptyString($signalWait['signal_name'] ?? null),
406+
'workflow_sequence' => self::intValue($signalWait['sequence'] ?? null),
407+
], static fn (mixed $value): bool => $value !== null),
408+
'connection' => $run->connection,
409+
'queue' => $run->queue,
410+
'compatibility' => $run->compatibility,
411+
'repair_count' => 1,
412+
]);
413+
414+
return $task;
415+
}
416+
417+
if ($timer instanceof WorkflowTimer && $timerId !== null) {
418+
/** @var WorkflowTask $task */
419+
$task = WorkflowTask::query()->create([
420+
'workflow_run_id' => $run->id,
421+
'task_type' => TaskType::Timer->value,
422+
'status' => TaskStatus::Ready->value,
423+
'available_at' => $availableAt->isFuture() ? $availableAt : now(),
424+
'payload' => array_filter([
425+
'timer_id' => $timerId,
426+
'signal_wait_id' => self::nonEmptyString($signalWait['signal_wait_id'] ?? null),
427+
'signal_name' => self::nonEmptyString($signalWait['signal_name'] ?? null),
428+
], static fn (mixed $value): bool => $value !== null),
429+
'connection' => $run->connection,
430+
'queue' => $run->queue,
431+
'compatibility' => $run->compatibility,
432+
'repair_count' => 1,
433+
]);
434+
435+
return $task;
436+
}
437+
}
438+
379439
/** @var WorkflowTask $task */
380440
$task = WorkflowTask::query()->create([
381441
'workflow_run_id' => $run->id,
@@ -508,6 +568,40 @@ private static function openConditionWait(WorkflowRun $run, WorkflowRunSummary $
508568
return null;
509569
}
510570

571+
/**
 * Find the signal wait that the given run summary refers to.
 *
 * Only waits whose `status` is `open` or whose `source_status` is
 * `timed_out` are considered. Among those, the first wait whose
 * `signal_wait_id` equals the summary's `open_wait_id` is returned. If none
 * matches, the first wait whose `timer_id` equals the summary's non-empty
 * `resume_source_id` is returned instead.
 *
 * @return array<string, mixed>|null The matching wait, or null when no candidate matches.
 */
private static function openSignalWait(WorkflowRun $run, WorkflowRunSummary $summary): ?array
{
    // Project and filter the waits once; both lookups below reuse this list
    // instead of calling SignalWaits::forRun() a second time.
    $candidates = array_filter(
        SignalWaits::forRun($run),
        static fn (array $wait): bool => ($wait['status'] ?? null) === 'open'
            || ($wait['source_status'] ?? null) === 'timed_out',
    );

    // Preferred match: the wait id recorded on the summary.
    foreach ($candidates as $wait) {
        if (($wait['signal_wait_id'] ?? null) === $summary->open_wait_id) {
            return $wait;
        }
    }

    $timerId = self::nonEmptyString($summary->resume_source_id);

    if ($timerId === null) {
        return null;
    }

    // Fallback match: the timer recorded as the summary's resume source.
    foreach ($candidates as $wait) {
        if (($wait['timer_id'] ?? null) === $timerId) {
            return $wait;
        }
    }

    return null;
}
604+
511605
private static function latestRetryScheduledEvent(
512606
WorkflowRun $run,
513607
ActivityExecution $execution,

0 commit comments

Comments
 (0)