Skip to content

Commit ba7c667

Browse files
Fail closed on incompatible workflow starts
Reject workflow start and signal-with-start attempts when only incompatible live workers exist. Surface compatibility-blocked outcomes through workflow commands, schedules, and control-plane callers instead of starting runs that cannot execute.
1 parent 59538db commit ba7c667

10 files changed

Lines changed: 467 additions & 19 deletions

src/V2/CommandResult.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,4 +130,9 @@ public function reason(): ?string
130130
{
131131
return $this->command->commandReason();
132132
}
133+
134+
/**
 * Human-readable message attached to the underlying command, if any.
 *
 * Delegates to the wrapped command's commandMessage() accessor and returns
 * whatever it yields (a string, or null when no message is available).
 */
public function message(): ?string
{
    $command = $this->command;

    return $command->commandMessage();
}
133138
}

src/V2/Enums/CommandOutcome.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ enum CommandOutcome: string
2525
case RejectedUnknownSignal = 'rejected_unknown_signal';
2626
case RejectedUnknownUpdate = 'rejected_unknown_update';
2727
case RejectedInvalidArguments = 'rejected_invalid_arguments';
28+
case RejectedCompatibilityBlocked = 'rejected_compatibility_blocked';
2829
case RejectedPendingSignal = 'rejected_pending_signal';
2930
case RejectedWorkflowDefinitionUnavailable = 'rejected_workflow_definition_unavailable';
3031
}

src/V2/Models/WorkflowCommand.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,19 @@ public function commandReason(): ?string
430430
return is_string($reason) && $reason !== '' ? $reason : null;
431431
}
432432

433+
/**
 * Extract the "message" entry from the command payload.
 *
 * @return string|null The payload's `message` value when the payload is an
 *                     array and that value is a non-empty string; null in
 *                     every other case (non-array payload, missing key,
 *                     non-string value, or empty string).
 */
public function commandMessage(): ?string
{
    $data = $this->payloadData();

    // Only an array payload can carry a message; anything else yields null.
    $candidate = is_array($data) ? ($data['message'] ?? null) : null;

    if (! is_string($candidate) || $candidate === '') {
        return null;
    }

    return $candidate;
}
445+
433446
/**
434447
* @return array<string, mixed>|null
435448
*/

src/V2/Support/DefaultWorkflowControlPlane.php

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
use Workflow\V2\Models\WorkflowTask;
3131
use Workflow\V2\UpdateResult;
3232
use Workflow\V2\WorkflowStub;
33+
use Workflow\V2\Support\WorkflowStartGate;
3334

3435
final class DefaultWorkflowControlPlane implements WorkflowControlPlane
3536
{
@@ -156,6 +157,38 @@ public function start(string $workflowType, ?string $instanceId = null, array $o
156157
$queue = $this->classPropertyDefault($resolvedClass, 'queue');
157158
}
158159

160+
$startBlockedReason = WorkflowStartGate::blockedReason(
161+
WorkerCompatibility::current(),
162+
$connection,
163+
$queue,
164+
);
165+
166+
if ($startBlockedReason !== null) {
167+
$blockedMessage = WorkflowStartGate::blockedMessage(
168+
sprintf('Workflow instance [%s] cannot start.', $instance->id),
169+
WorkerCompatibility::current(),
170+
$connection,
171+
$queue,
172+
) ?? sprintf('Workflow instance [%s] cannot start.', $instance->id);
173+
174+
$command = WorkflowCommand::record($instance, null, $this->commandAttributes($commandContext, [
175+
'command_type' => CommandType::Start->value,
176+
'target_scope' => 'instance',
177+
'status' => CommandStatus::Rejected->value,
178+
'outcome' => CommandOutcome::RejectedCompatibilityBlocked->value,
179+
'payload_codec' => CodecRegistry::defaultCodec(),
180+
'payload' => Serializer::serializeWithCodec(CodecRegistry::defaultCodec(), array_filter([
181+
'reason' => $startBlockedReason,
182+
'message' => $blockedMessage,
183+
'arguments_blob' => is_string($arguments) ? $arguments : null,
184+
], static fn (mixed $value): bool => $value !== null)),
185+
'rejection_reason' => $startBlockedReason,
186+
'rejected_at' => now(),
187+
]));
188+
189+
return;
190+
}
191+
159192
if ($instance->workflow_class !== $workflowClass) {
160193
$instance->forceFill([
161194
'workflow_class' => $workflowClass,
@@ -316,6 +349,7 @@ public function start(string $workflowType, ?string $instanceId = null, array $o
316349
'outcome' => $command?->outcome?->value ?? 'unknown',
317350
'task_id' => $task instanceof WorkflowTask ? $task->id : null,
318351
'reason' => $accepted ? null : ($command?->rejection_reason ?? 'start_failed'),
352+
'message' => $accepted ? null : $command?->commandMessage(),
319353
];
320354
}
321355

src/V2/Support/ScheduleManager.php

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Workflow\V2\Enums\RunStatus;
1515
use Workflow\V2\Enums\ScheduleOverlapPolicy;
1616
use Workflow\V2\Enums\ScheduleStatus;
17+
use Workflow\V2\Exceptions\WorkflowExecutionUnavailableException;
1718
use Workflow\V2\Models\WorkflowHistoryEvent;
1819
use Workflow\V2\Models\WorkflowRun;
1920
use Workflow\V2\Models\WorkflowSchedule;
@@ -392,11 +393,20 @@ public static function triggerDetailed(
392393
self::closeExistingRun($schedule, $overlapPolicy);
393394
}
394395

395-
$startResult = self::startRun(
396-
$schedule,
397-
effectiveOverlapPolicy: $overlapPolicy->value,
398-
context: $context,
399-
);
396+
try {
397+
$startResult = self::startRun(
398+
$schedule,
399+
effectiveOverlapPolicy: $overlapPolicy->value,
400+
context: $context,
401+
);
402+
} catch (WorkflowExecutionUnavailableException $exception) {
403+
self::recordSkip($schedule, $exception->blockedReason(), $context);
404+
$schedule->forceFill([
405+
'next_fire_at' => $schedule->computeNextFireAtWithJitter(),
406+
])->save();
407+
408+
return new ScheduleTriggerResult('skipped', null, null, $exception->blockedReason());
409+
}
400410

401411
return new ScheduleTriggerResult('triggered', $startResult->instanceId, $startResult->runId, null);
402412
});
@@ -441,6 +451,15 @@ public static function tick(int $limit = 100): array
441451
'instance_id' => $instanceId,
442452
];
443453
}
454+
} catch (WorkflowExecutionUnavailableException $exception) {
455+
$schedule->refresh();
456+
self::recordSkip($schedule, $exception->blockedReason());
457+
$results[] = [
458+
'schedule_id' => $schedule->schedule_id,
459+
'instance_id' => null,
460+
'outcome' => 'skipped',
461+
'reason' => $exception->blockedReason(),
462+
];
444463
} catch (\Throwable $e) {
445464
$schedule->refresh();
446465
$schedule->recordFailure($e->getMessage());
@@ -617,13 +636,19 @@ private static function triggerForBackfill(
617636
self::closeExistingRun($schedule, $effectivePolicy);
618637
}
619638

620-
return self::startRun(
621-
$schedule,
622-
occurrenceTime: $occurrenceTime,
623-
outcome: 'backfilled',
624-
effectiveOverlapPolicy: $effectivePolicy->value,
625-
context: $context,
626-
)->instanceId;
639+
try {
640+
return self::startRun(
641+
$schedule,
642+
occurrenceTime: $occurrenceTime,
643+
outcome: 'backfilled',
644+
effectiveOverlapPolicy: $effectivePolicy->value,
645+
context: $context,
646+
)->instanceId;
647+
} catch (WorkflowExecutionUnavailableException $exception) {
648+
self::recordSkip($schedule, $exception->blockedReason(), $context);
649+
650+
return null;
651+
}
627652
});
628653
}
629654

@@ -642,6 +667,8 @@ private static function startRun(
642667

643668
try {
644669
$result = $starter->start($schedule, $occurrenceTime, $outcome, $effectiveOverlapPolicy);
670+
} catch (WorkflowExecutionUnavailableException $exception) {
671+
throw $exception;
645672
} catch (\Throwable $e) {
646673
$schedule->recordFailure($e->getMessage());
647674
$schedule->save();
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow\V2\Support;
6+
7+
/**
 * Gate consulted before a workflow start to decide whether the start must be
 * refused because the worker fleet cannot run the required compatibility
 * marker.
 *
 * The gate only blocks when all of the following hold:
 *  - the `workflows.v2.fleet.validation_mode` config value is exactly 'fail';
 *  - a required compatibility marker was supplied;
 *  - WorkerCompatibilityFleet reports a non-zero active worker count for the
 *    connection/queue; and
 *  - WorkerCompatibilityFleet::supports() is falsy for that marker.
 */
final class WorkflowStartGate
{
    /** Reason code returned when a start is blocked by the gate. */
    public const BLOCKED_COMPATIBILITY = 'compatibility_blocked';

    /**
     * Determine why a start would be blocked, if at all.
     *
     * @param string|null $required   Required compatibility marker; null disables the gate.
     * @param string|null $connection Connection forwarded to the fleet lookups.
     * @param string|null $queue      Queue forwarded to the fleet lookups.
     *
     * @return string|null self::BLOCKED_COMPATIBILITY when blocked, otherwise null.
     */
    public static function blockedReason(
        ?string $required,
        ?string $connection = null,
        ?string $queue = null,
    ): ?string {
        $enforced = config('workflows.v2.fleet.validation_mode') === 'fail';

        // Short-circuit keeps the original check order: config first, then the marker.
        if (! $enforced || $required === null) {
            return null;
        }

        $hasActiveWorkers = WorkerCompatibilityFleet::activeWorkerCount($connection, $queue) !== 0;

        // With no active workers reported there is nothing to mismatch against.
        if (! $hasActiveWorkers) {
            return null;
        }

        return WorkerCompatibilityFleet::supports($required, $connection, $queue)
            ? null
            : self::BLOCKED_COMPATIBILITY;
    }

    /**
     * Build the message describing a blocked start.
     *
     * @param string      $prefix     Leading sentence placed before the explanation.
     * @param string|null $required   Required compatibility marker.
     * @param string|null $connection Connection forwarded to the fleet lookups.
     * @param string|null $queue      Queue forwarded to the fleet lookups.
     *
     * @return string|null The formatted message, or null when blockedReason()
     *                     reports that the start is not blocked.
     */
    public static function blockedMessage(
        string $prefix,
        ?string $required,
        ?string $connection = null,
        ?string $queue = null,
    ): ?string {
        $isBlocked = self::blockedReason($required, $connection, $queue) !== null;

        if (! $isBlocked) {
            return null;
        }

        $detail = WorkerCompatibilityFleet::mismatchReason($required, $connection, $queue);

        // Fall back to a generic explanation when the fleet offers no specific reason.
        if ($detail === null) {
            $detail = 'No compatible worker is live for the requested queue.';
        }

        return sprintf('%s Start blocked under fail validation mode. %s', $prefix, $detail);
    }
}

0 commit comments

Comments
 (0)