Skip to content

Commit f1efe0d

Browse files
Keep v2 scheduler correct when the wake layer fails
Pin the wake-acceleration degraded-mode invariants from the v2 scheduler-correctness contract against the engine: a wake layer that drops every signal, raises on every signal, or is permanently unavailable must not prevent durable tasks from being written, history events from being recorded, or workflows from making progress. Catch publisher exceptions in the workflow-task and history-event observers and report them instead of cascading through the durable write path, and add an integration test that covers lost-signal, unreachable, permanently-unavailable, and partitioned wake behaviour end-to-end.
1 parent 1ee23bc commit f1efe0d

5 files changed

Lines changed: 352 additions & 4 deletions

File tree

src/V2/Observers/WorkflowHistoryEventObserver.php

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44

55
namespace Workflow\V2\Observers;
66

7+
use Throwable;
78
use Workflow\V2\Contracts\LongPollWakeStore;
89
use Workflow\V2\Models\WorkflowHistoryEvent;
10+
use Workflow\V2\Support\CacheLongPollWakeStore;
911

1012
/**
1113
* Triggers long-poll wake signals when history events are created.
@@ -23,6 +25,24 @@ public function __construct(
2325

2426
public function created(WorkflowHistoryEvent $event): void
{
    // Wake signals only accelerate wait_new_event pollers. The durable
    // history row is already persisted when this hook runs, so a failing
    // or dropped publisher is reported instead of breaking the write path.
    try {
        $store = $this->wakeStore;

        if (! $store instanceof CacheLongPollWakeStore) {
            // Other wake stores are signalled per run channel, and only
            // when the event carries a usable run id.
            $runId = $event->workflow_run_id;

            if (is_string($runId) && $runId !== '') {
                $store->signal($store->historyRunChannel($runId));
            }

            return;
        }

        $store->signalHistoryEvent($event);
    } catch (Throwable $failure) {
        report($failure);
    }
}
2848
}

src/V2/Observers/WorkflowTaskObserver.php

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44

55
namespace Workflow\V2\Observers;
66

7+
use Throwable;
78
use Workflow\V2\Contracts\LongPollWakeStore;
89
use Workflow\V2\Models\WorkflowTask;
10+
use Workflow\V2\Support\CacheLongPollWakeStore;
911

1012
/**
1113
* Triggers long-poll wake signals when workflow tasks change.
@@ -23,16 +25,40 @@ public function __construct(
2325

2426
public function created(WorkflowTask $task): void
{
    // Publish through the failure-tolerant helper so a wake-layer error
    // cannot surface from the task insert.
    $this->signalTask($task);
}
2830

2931
public function updated(WorkflowTask $task): void
{
    // Publish through the failure-tolerant helper so a wake-layer error
    // cannot surface from the task update.
    $this->signalTask($task);
}
3335

3436
public function deleted(WorkflowTask $task): void
{
    // Publish through the failure-tolerant helper so a wake-layer error
    // cannot surface from the task delete.
    $this->signalTask($task);
}
40+
41+
private function signalTask(WorkflowTask $task): void
{
    // The acceleration layer is not the correctness boundary. Whatever
    // goes wrong while publishing (unreachable cache, partitioned backend,
    // dropped signal) is reported and swallowed: the durable dispatch row
    // is already persisted and pollers will find it on their next interval.
    try {
        $store = $this->wakeStore;

        if ($store instanceof CacheLongPollWakeStore) {
            $store->signalTask($task);

            return;
        }

        // Other wake stores: derive the poll channels from the task's
        // routing attributes, normalising non-string values.
        $namespace = is_string($task->namespace) ? $task->namespace : '';
        $connection = is_string($task->connection) ? $task->connection : null;
        $queue = is_string($task->queue) ? $task->queue : null;

        $channels = $store->workflowTaskPollChannels($namespace, $connection, $queue);

        $store->signal(...$channels);
    } catch (Throwable $failure) {
        report($failure);
    }
}
3864
}
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Feature\V2;
6+
7+
use Illuminate\Support\Facades\Queue;
8+
use RuntimeException;
9+
use Tests\Fixtures\V2\TestGreetingWorkflow;
10+
use Tests\Support\V2\NullLongPollWakeStore;
11+
use Tests\Support\V2\ThrowingLongPollWakeStore;
12+
use Tests\TestCase;
13+
use Workflow\V2\Contracts\LongPollWakeStore;
14+
use Workflow\V2\Enums\RunStatus;
15+
use Workflow\V2\Enums\TaskStatus;
16+
use Workflow\V2\Enums\TaskType;
17+
use Workflow\V2\Jobs\RunActivityTask;
18+
use Workflow\V2\Jobs\RunTimerTask;
19+
use Workflow\V2\Jobs\RunWorkflowTask;
20+
use Workflow\V2\Models\WorkflowHistoryEvent;
21+
use Workflow\V2\Models\WorkflowRun;
22+
use Workflow\V2\Models\WorkflowTask;
23+
use Workflow\V2\Support\CacheLongPollWakeStore;
24+
use Workflow\V2\WorkflowStub;
25+
26+
/**
 * Covers the degraded-mode scheduler-correctness scenarios pinned in
 * docs/architecture/scheduler-correctness.md. Each scenario verifies
 * that the acceleration layer's failure does not become a correctness
 * failure: tasks still move through their durable lifecycle, history
 * events still persist, and workflows still reach a terminal state
 * using only durable dispatch rows.
 */
final class V2SchedulerDegradedModeTest extends TestCase
{
    protected function setUp(): void
    {
        parent::setUp();

        config()
            ->set('workflows.v2.compatibility.current', 'build-degraded');
        config()
            ->set('workflows.v2.compatibility.supported', ['build-degraded']);
    }

    /**
     * Contract: "Wake backend lost some signals — a subset of
     * published signals never reaches subscribers. Pollers that
     * missed the signal re-poll on their configured interval. No
     * work is lost."
     *
     * A deployment whose wake layer silently drops every signal
     * remains correct; the workflow still runs to completion using
     * durable dispatch state alone.
     */
    public function testScenarioWakeBackendLostSignals(): void
    {
        $this->swapWakeStore(NullLongPollWakeStore::class);

        Queue::fake();

        $workflow = WorkflowStub::make(TestGreetingWorkflow::class, 'degraded-dropped');
        $workflow->start('Taylor');

        $this->drainReadyTasks();

        $run = WorkflowRun::query()
            ->where('workflow_instance_id', 'degraded-dropped')
            ->orderBy('run_number')
            ->firstOrFail();

        $this->assertSame(RunStatus::Completed, $run->status);
        $this->assertSame([], WorkflowTask::query()
            ->where('workflow_run_id', $run->id)
            ->where('status', TaskStatus::Ready->value)
            ->pluck('id')
            ->all());
        $this->assertGreaterThan(
            0,
            WorkflowHistoryEvent::query()
                ->where('workflow_run_id', $run->id)
                ->count(),
            'Durable history must accrue even when no wake signal was delivered.',
        );
    }

    /**
     * Contract: "Wake backend unreachable — signal() calls surface
     * as exceptions or log lines at the publisher; pollers continue
     * to discover work on the next configured poll."
     *
     * The publisher-side failure is logged via report() but MUST
     * NOT cause the durable task write, history write, or workflow
     * progression to fail.
     */
    public function testScenarioWakeBackendUnreachable(): void
    {
        $store = $this->swapWakeStore(ThrowingLongPollWakeStore::class);

        Queue::fake();

        $workflow = WorkflowStub::make(TestGreetingWorkflow::class, 'degraded-unreachable');
        $workflow->start('Taylor');

        $this->drainReadyTasks();

        $run = WorkflowRun::query()
            ->where('workflow_instance_id', 'degraded-unreachable')
            ->orderBy('run_number')
            ->firstOrFail();

        $this->assertSame(RunStatus::Completed, $run->status);
        $this->assertGreaterThan(
            0,
            $store->signalAttempts,
            'Throwing wake store should have been asked to signal at least once.',
        );
    }

    /**
     * Contract: "A node MUST NOT refuse to make progress because
     * the acceleration layer is unavailable."
     *
     * Pins the publisher-resilience invariant directly: a raising
     * wake backend does not break workflow start, and the durable
     * workflow task is committed in the normal path.
     */
    public function testScenarioCacheBackendPermanentlyUnavailableDoesNotBlockTaskCreation(): void
    {
        $store = $this->swapWakeStore(ThrowingLongPollWakeStore::class);

        Queue::fake();

        $workflow = WorkflowStub::make(TestGreetingWorkflow::class, 'degraded-blocked');

        try {
            $workflow->start('Taylor');
        } catch (RuntimeException $throwable) {
            $this->fail('Task creation must not fail when the wake backend raises: ' . $throwable->getMessage());
        }

        // Filter on Ready as well as task type so the assertion checks
        // exactly what its failure message claims: one *ready* workflow task.
        $this->assertSame(
            1,
            WorkflowTask::query()
                ->where('task_type', TaskType::Workflow->value)
                ->where('status', TaskStatus::Ready->value)
                ->count(),
            'Exactly one ready workflow task should have been durably created despite the wake backend raising.',
        );
        $this->assertGreaterThan(0, $store->signalAttempts);
    }

    /**
     * Contract: "Wake backend partitioned — different nodes see
     * different version snapshots. A node that missed a signal
     * re-polls on its configured interval and still finds every
     * eligible task."
     *
     * A node that sees zero advancing signals (effectively
     * partitioned away from the publisher) still discovers ready
     * work through durable dispatch state — workflow_tasks rows.
     */
    public function testScenarioWakeBackendPartitionedFallsBackToDurableDispatchPoll(): void
    {
        $this->swapWakeStore(NullLongPollWakeStore::class);

        Queue::fake();

        $workflow = WorkflowStub::make(TestGreetingWorkflow::class, 'degraded-partitioned');
        $workflow->start('Taylor');

        $readyCountBeforeDrain = WorkflowTask::query()
            ->where('status', TaskStatus::Ready->value)
            ->count();

        $this->assertGreaterThan(
            0,
            $readyCountBeforeDrain,
            'Durable dispatch state must expose a ready task to direct polling even when no wake signal was delivered.',
        );

        $this->drainReadyTasks();

        $this->assertSame(
            0,
            WorkflowTask::query()
                ->where('status', TaskStatus::Ready->value)
                ->count(),
        );
    }

    /**
     * Runs ready tasks one at a time, oldest first, by invoking the
     * matching job's handle() through the container, until no ready task
     * remains. Fails the test if ready tasks are still present after a
     * ten-second wall-clock budget.
     */
    private function drainReadyTasks(): void
    {
        $deadline = microtime(true) + 10;

        while (microtime(true) < $deadline) {
            /** @var WorkflowTask|null $task */
            $task = WorkflowTask::query()
                ->where('status', TaskStatus::Ready->value)
                ->orderBy('created_at')
                ->first();

            if ($task === null) {
                return;
            }

            // No default arm: an unexpected task type raises
            // UnhandledMatchError rather than being silently skipped.
            $job = match ($task->task_type) {
                TaskType::Workflow => new RunWorkflowTask($task->id),
                TaskType::Activity => new RunActivityTask($task->id),
                TaskType::Timer => new RunTimerTask($task->id),
            };

            $this->app->call([$job, 'handle']);
        }

        $this->fail('Timed out draining ready workflow tasks.');
    }

    /**
     * Resolves the given wake-store double from the container and binds
     * that one instance under both the LongPollWakeStore contract and the
     * concrete CacheLongPollWakeStore class.
     *
     * @template TStore of CacheLongPollWakeStore
     *
     * @param class-string<TStore> $storeClass
     * @return TStore
     */
    private function swapWakeStore(string $storeClass): CacheLongPollWakeStore
    {
        $store = $this->app->make($storeClass);
        $this->app->instance(LongPollWakeStore::class, $store);
        $this->app->instance(CacheLongPollWakeStore::class, $store);

        return $store;
    }
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Support\V2;
6+
7+
use Workflow\V2\Support\CacheLongPollWakeStore;
8+
9+
/**
 * Wake store test double that discards every signal it is given.
 *
 * Stands in for the "wake backend silently drops signals" and "cache
 * backend permanently unavailable" degraded-mode scenarios pinned in
 * docs/architecture/scheduler-correctness.md. Because nothing is ever
 * published, snapshots carry only null stamps and `changed()` always
 * answers `false`.
 */
final class NullLongPollWakeStore extends CacheLongPollWakeStore
{
    /**
     * Accepts any channels and does nothing with them.
     */
    public function signal(string ...$channels): void
    {
        // Deliberate no-op: every signal is lost.
    }

    /**
     * Maps each usable channel name (trimmed) to a null stamp; entries
     * that are not strings or are blank after trimming are left out.
     */
    public function snapshot(array $channels): array
    {
        $stamps = [];

        foreach ($channels as $candidate) {
            $name = is_string($candidate) ? trim($candidate) : '';

            if ($name !== '') {
                $stamps[$name] = null;
            }
        }

        return $stamps;
    }

    /**
     * Always reports "nothing changed", whatever snapshot is passed in.
     */
    public function changed(array $snapshot): bool
    {
        return false;
    }
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Support\V2;
6+
7+
use RuntimeException;
8+
use Workflow\V2\Support\CacheLongPollWakeStore;
9+
10+
/**
 * Wake store test double whose every signal attempt ends in an
 * exception.
 *
 * Stands in for the "wake backend unreachable" degraded-mode scenario
 * pinned in docs/architecture/scheduler-correctness.md, where
 * `signal()` calls surface as exceptions at the publisher. The
 * scheduler-correctness contract requires durable dispatch to stay
 * correct under this failure mode; publishers MUST NOT cause task
 * creation, history write, or schedule fire to fail.
 */
final class ThrowingLongPollWakeStore extends CacheLongPollWakeStore
{
    /**
     * Number of times signal() has been invoked on this instance.
     */
    public int $signalAttempts = 0;

    /**
     * Counts the attempt, then fails unconditionally.
     *
     * @throws RuntimeException on every call
     */
    public function signal(string ...$channels): void
    {
        // Count first so tests can observe the attempt even though it throws.
        ++$this->signalAttempts;

        throw new RuntimeException('simulated wake backend unreachable');
    }
}

0 commit comments

Comments
 (0)