Skip to content

Commit 75a1b18

Browse files
Route workflow loop wakes and repair passes through a replaceable matching role binding
Expose a replaceable matching role seam
1 parent 4efdc19 commit 75a1b18

8 files changed

Lines changed: 223 additions & 10 deletions

File tree

docs/architecture/control-plane-split.md

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -467,13 +467,16 @@ each step independently.
467467
an out-of-process adapter can replace the binding without
468468
patching the package. Today's bindings are
469469
`WorkflowControlPlane`, `OperatorObservabilityRepository`,
470-
`HistoryProjectionRole`, `WorkflowTaskBridge`,
470+
`MatchingRole`, `HistoryProjectionRole`, `WorkflowTaskBridge`,
471471
`ActivityTaskBridge`, `LongPollWakeStore`, and the scheduler's
472-
`ScheduleWorkflowStarter`. The history/projection role now
473-
crosses the matching seam through
474-
`DefaultHistoryProjectionRole`, so a future out-of-process
475-
adapter can replace that binding without patching the claim
476-
paths.
472+
`ScheduleWorkflowStarter`. The matching role now crosses the
473+
queue-loop wake and dedicated daemon entrypoints through
474+
`DefaultMatchingRole`, so a future out-of-process adapter can
475+
replace that binding without patching `Looping` listeners or
476+
`workflow:v2:repair-pass`. The history/projection role now
477+
crosses the matching seam through `DefaultHistoryProjectionRole`,
478+
so a future out-of-process adapter can replace that binding
479+
without patching the claim paths.
477480
3. **Introduce the dedicated matching shape.** The Phase 3
478481
contract already allows a dedicated matching role; Phase 4
479482
provides the deployment guidance for running it as a separate

src/Commands/V2RepairPassCommand.php

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
use Illuminate\Console\Command;
88
use JsonException;
99
use Symfony\Component\Console\Attribute\AsCommand;
10+
use Workflow\V2\Contracts\MatchingRole;
1011
use Workflow\V2\Support\TaskRepairPolicy;
11-
use Workflow\V2\TaskWatchdog;
1212

1313
#[AsCommand(name: 'workflow:v2:repair-pass')]
1414
class V2RepairPassCommand extends Command
@@ -32,6 +32,12 @@ class V2RepairPassCommand extends Command
3232
*/
3333
private bool $shouldStop = false;
3434

35+
public function __construct(
36+
private readonly MatchingRole $matchingRole,
37+
) {
38+
parent::__construct();
39+
}
40+
3541
public function handle(): int
3642
{
3743
if ((bool) $this->option('loop')) {
@@ -59,7 +65,7 @@ protected function sleepBetweenIterations(int $sleepSeconds): void
5965
*/
6066
private function runOnce(?bool $respectThrottleOverride): int
6167
{
62-
$report = TaskWatchdog::runPass(
68+
$report = $this->matchingRole->runPass(
6369
$this->stringOption('connection'),
6470
$this->stringOption('queue'),
6571
respectThrottle: $respectThrottleOverride ?? (bool) $this->option('respect-throttle'),

src/Providers/WorkflowServiceProvider.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
use Workflow\V2\Contracts\ActivityTaskBridge;
2323
use Workflow\V2\Contracts\HistoryProjectionRole;
2424
use Workflow\V2\Contracts\LongPollWakeStore;
25+
use Workflow\V2\Contracts\MatchingRole;
2526
use Workflow\V2\Contracts\OperatorObservabilityRepository;
2627
use Workflow\V2\Contracts\ScheduleWorkflowStarter;
2728
use Workflow\V2\Contracts\WorkflowControlPlane;
@@ -38,14 +39,14 @@
3839
use Workflow\V2\Support\ConfiguredV2Models;
3940
use Workflow\V2\Support\DefaultActivityTaskBridge;
4041
use Workflow\V2\Support\DefaultHistoryProjectionRole;
42+
use Workflow\V2\Support\DefaultMatchingRole;
4143
use Workflow\V2\Support\DefaultOperatorObservabilityRepository;
4244
use Workflow\V2\Support\DefaultWorkflowControlPlane;
4345
use Workflow\V2\Support\DefaultWorkflowTaskBridge;
4446
use Workflow\V2\Support\LongPollCacheValidator;
4547
use Workflow\V2\Support\PhpClassScheduleStarter;
4648
use Workflow\V2\Support\TypeRegistry;
4749
use Workflow\V2\Support\WorkflowModeGuard;
48-
use Workflow\V2\TaskWatchdog;
4950
use Workflow\Watchdog;
5051

5152
final class WorkflowServiceProvider extends ServiceProvider
@@ -59,6 +60,8 @@ public function register(): void
5960
DefaultOperatorObservabilityRepository::class,
6061
);
6162

63+
$this->app->singletonIf(MatchingRole::class, DefaultMatchingRole::class);
64+
6265
$this->app->singletonIf(HistoryProjectionRole::class, DefaultHistoryProjectionRole::class);
6366

6467
$this->app->singleton(WorkflowTaskBridge::class, DefaultWorkflowTaskBridge::class);
@@ -115,7 +118,7 @@ public function boot(): void
115118
Watchdog::wake($event->connectionName, $event->queue);
116119

117120
if (config('workflows.v2.matching_role.queue_wake_enabled', true)) {
118-
TaskWatchdog::wake($event->connectionName, $event->queue);
121+
app(MatchingRole::class)->wake($event->connectionName, $event->queue);
119122
}
120123
});
121124
}

src/V2/Contracts/MatchingRole.php

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow\V2\Contracts;
6+
7+
/**
8+
* Binding seam for the matching role.
9+
*
10+
* Queue-loop wake hooks and the dedicated repair-pass daemon use this
11+
* contract when they need the canonical matching-role implementation
12+
* without hard-coding the in-process TaskWatchdog.
13+
*/
14+
interface MatchingRole
15+
{
16+
public function wake(?string $connection = null, ?string $queue = null): void;
17+
18+
/**
19+
* @param list<string> $runIds
20+
* @return array{
21+
* connection: string|null,
22+
* queue: string|null,
23+
* run_ids: list<string>,
24+
* instance_id: string|null,
25+
* respect_throttle: bool,
26+
* throttled: bool,
27+
* selected_existing_task_candidates: int,
28+
* selected_missing_task_candidates: int,
29+
* selected_total_candidates: int,
30+
* repaired_existing_tasks: int,
31+
* repaired_missing_tasks: int,
32+
* dispatched_tasks: int,
33+
* existing_task_failures: list<array{candidate_id: string, message: string}>,
34+
* missing_run_failures: list<array{run_id: string, message: string}>,
35+
* deadline_expired_candidates: int,
36+
* deadline_expired_tasks_created: int,
37+
* deadline_expired_failures: list<array{run_id: string, message: string}>,
38+
* activity_timeout_candidates: int,
39+
* activity_timeouts_enforced: int,
40+
* activity_timeout_failures: list<array{execution_id: string, message: string}>
41+
* }
42+
*/
43+
public function runPass(
44+
?string $connection = null,
45+
?string $queue = null,
46+
bool $respectThrottle = false,
47+
array $runIds = [],
48+
?string $instanceId = null,
49+
): array;
50+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow\V2\Support;
6+
7+
use Workflow\V2\Contracts\MatchingRole;
8+
use Workflow\V2\TaskWatchdog;
9+
10+
final class DefaultMatchingRole implements MatchingRole
11+
{
12+
public function wake(?string $connection = null, ?string $queue = null): void
13+
{
14+
TaskWatchdog::wake($connection, $queue);
15+
}
16+
17+
public function runPass(
18+
?string $connection = null,
19+
?string $queue = null,
20+
bool $respectThrottle = false,
21+
array $runIds = [],
22+
?string $instanceId = null,
23+
): array {
24+
return TaskWatchdog::runPass($connection, $queue, $respectThrottle, $runIds, $instanceId);
25+
}
26+
}

tests/Unit/Commands/V2RepairPassCommandTest.php

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Tests\TestCase;
1212
use Workflow\Serializers\CodecRegistry;
1313
use Workflow\Serializers\Serializer;
14+
use Workflow\V2\Contracts\MatchingRole;
1415
use Workflow\V2\Enums\RunStatus;
1516
use Workflow\V2\Enums\SignalStatus;
1617
use Workflow\V2\Enums\TaskStatus;
@@ -172,6 +173,103 @@ public function testItReportsExistingTaskRecoveryCountsInHumanOutput(): void
172173
);
173174
}
174175

176+
public function testItUsesTheMatchingRoleBindingForRepairPasses(): void
177+
{
178+
$fake = new class() implements MatchingRole {
179+
/**
180+
* @var array{connection: string|null, queue: string|null, respectThrottle: bool, runIds: list<string>, instanceId: string|null}|null
181+
*/
182+
public ?array $lastRunPassArguments = null;
183+
184+
public function wake(?string $connection = null, ?string $queue = null): void
185+
{
186+
}
187+
188+
public function runPass(
189+
?string $connection = null,
190+
?string $queue = null,
191+
bool $respectThrottle = false,
192+
array $runIds = [],
193+
?string $instanceId = null,
194+
): array {
195+
$this->lastRunPassArguments = [
196+
'connection' => $connection,
197+
'queue' => $queue,
198+
'respectThrottle' => $respectThrottle,
199+
'runIds' => $runIds,
200+
'instanceId' => $instanceId,
201+
];
202+
203+
return [
204+
'connection' => $connection,
205+
'queue' => $queue,
206+
'run_ids' => $runIds,
207+
'instance_id' => $instanceId,
208+
'respect_throttle' => $respectThrottle,
209+
'throttled' => false,
210+
'selected_existing_task_candidates' => 0,
211+
'selected_missing_task_candidates' => 0,
212+
'selected_total_candidates' => 0,
213+
'repaired_existing_tasks' => 0,
214+
'repaired_missing_tasks' => 0,
215+
'dispatched_tasks' => 0,
216+
'existing_task_failures' => [],
217+
'missing_run_failures' => [],
218+
'deadline_expired_candidates' => 0,
219+
'deadline_expired_tasks_created' => 0,
220+
'deadline_expired_failures' => [],
221+
'activity_timeout_candidates' => 0,
222+
'activity_timeouts_enforced' => 0,
223+
'activity_timeout_failures' => [],
224+
];
225+
}
226+
};
227+
228+
$this->app->instance(MatchingRole::class, $fake);
229+
230+
$expected = [
231+
'connection' => 'redis',
232+
'queue' => 'critical',
233+
'run_ids' => ['run-a', 'run-b'],
234+
'instance_id' => 'instance-42',
235+
'respect_throttle' => true,
236+
'throttled' => false,
237+
'selected_existing_task_candidates' => 0,
238+
'selected_missing_task_candidates' => 0,
239+
'selected_total_candidates' => 0,
240+
'repaired_existing_tasks' => 0,
241+
'repaired_missing_tasks' => 0,
242+
'dispatched_tasks' => 0,
243+
'existing_task_failures' => [],
244+
'missing_run_failures' => [],
245+
'deadline_expired_candidates' => 0,
246+
'deadline_expired_tasks_created' => 0,
247+
'deadline_expired_failures' => [],
248+
'activity_timeout_candidates' => 0,
249+
'activity_timeouts_enforced' => 0,
250+
'activity_timeout_failures' => [],
251+
];
252+
253+
$this->artisan('workflow:v2:repair-pass', [
254+
'--connection' => 'redis',
255+
'--queue' => 'critical',
256+
'--run-id' => ['run-a', 'run-b'],
257+
'--instance-id' => 'instance-42',
258+
'--respect-throttle' => true,
259+
'--json' => true,
260+
])
261+
->expectsOutput(json_encode($expected, JSON_UNESCAPED_SLASHES))
262+
->assertSuccessful();
263+
264+
$this->assertSame([
265+
'connection' => 'redis',
266+
'queue' => 'critical',
267+
'respectThrottle' => true,
268+
'runIds' => ['run-a', 'run-b'],
269+
'instanceId' => 'instance-42',
270+
], $fake->lastRunPassArguments);
271+
}
272+
175273
public function testRunIdScopeRepairsOnlyTheSelectedMissingTaskRun(): void
176274
{
177275
Queue::fake();

tests/Unit/Providers/WorkflowServiceProviderTest.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Workflow\Serializers\Serializer;
1919
use Workflow\States\WorkflowPendingStatus;
2020
use Workflow\V2\Contracts\HistoryProjectionRole;
21+
use Workflow\V2\Contracts\MatchingRole;
2122
use Workflow\V2\Contracts\OperatorObservabilityRepository;
2223
use Workflow\V2\Enums\RunStatus;
2324
use Workflow\V2\Enums\TaskStatus;
@@ -110,6 +111,18 @@ public function testHistoryProjectionRoleBindingDefersToAppBinding(): void
110111
$this->assertSame($custom, $this->app->make(HistoryProjectionRole::class));
111112
}
112113

114+
public function testMatchingRoleBindingDefersToAppBinding(): void
115+
{
116+
$custom = $this->createMock(MatchingRole::class);
117+
118+
$this->app->offsetUnset(MatchingRole::class);
119+
$this->app->singleton(MatchingRole::class, static fn () => $custom);
120+
121+
(new WorkflowServiceProvider($this->app))->register();
122+
123+
$this->assertSame($custom, $this->app->make(MatchingRole::class));
124+
}
125+
113126
public function testProviderMergesV2DefaultsIntoLegacyPublishedConfig(): void
114127
{
115128
config()->set('workflows', [
@@ -391,6 +404,18 @@ public function testLoopingEventRepairsOverdueV2Task(): void
391404
$this->assertNotNull($task->last_dispatched_at);
392405
}
393406

407+
public function testLoopingEventRoutesV2WakeThroughMatchingRoleBinding(): void
408+
{
409+
$matchingRole = $this->createMock(MatchingRole::class);
410+
$matchingRole->expects($this->once())
411+
->method('wake')
412+
->with('redis', 'high,default');
413+
414+
$this->app->instance(MatchingRole::class, $matchingRole);
415+
416+
Event::dispatch(new Looping('redis', 'high,default'));
417+
}
418+
394419
public function testLoopingEventSkipsTaskWatchdogWakeWhenMatchingRoleDisabled(): void
395420
{
396421
Queue::fake();

tests/Unit/V2/ControlPlaneSplitDocumentationTest.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ final class ControlPlaneSplitDocumentationTest extends TestCase
6363
private const REQUIRED_REFERENCED_CLASSES = [
6464
'WorkflowControlPlane',
6565
'DefaultWorkflowControlPlane',
66+
'MatchingRole',
67+
'DefaultMatchingRole',
6668
'HistoryProjectionRole',
6769
'DefaultHistoryProjectionRole',
6870
'RunSummaryProjector',

0 commit comments

Comments
 (0)