Skip to content

Commit 8b20fc6

Browse files
Bind workflow claim paths through a replaceable history/projection role
Bind the Phase 4 history/projection claim seam
1 parent 04ee3ed commit 8b20fc6

10 files changed

Lines changed: 251 additions & 38 deletions

docs/architecture/control-plane-split.md

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,11 @@ It does not cover:
7676
first-class role so topologies can host it explicitly.
7777
- **History/projection role** — the role that owns `HistoryEvent`
7878
persistence, visibility projection via
79-
`Workflow\V2\Support\RunSummaryProjector`, and the observability
80-
surface via `Workflow\V2\Support\DefaultOperatorObservabilityRepository`.
79+
`Workflow\V2\Support\RunSummaryProjector`, the
80+
`Workflow\V2\Contracts\HistoryProjectionRole` /
81+
`Workflow\V2\Support\DefaultHistoryProjectionRole` binding seam,
82+
and the observability surface via
83+
`Workflow\V2\Support\DefaultOperatorObservabilityRepository`.
8184
- **Scheduler role** — the role that evaluates active
8285
`WorkflowSchedule` rows, resolves cron/interval triggers to
8386
workflow starts, and hands the start to the control plane through
@@ -203,7 +206,8 @@ Authority:
203206
recording path inside the transaction that produced the event.
204207
- projecting run, activity, timer, update, and signal state into
205208
`WorkflowRunSummary` and the operator observability surfaces via
206-
`Workflow\V2\Support\RunSummaryProjector` and
209+
`Workflow\V2\Support\RunSummaryProjector`,
210+
`Workflow\V2\Contracts\HistoryProjectionRole`, and
207211
`Workflow\V2\Support\DefaultOperatorObservabilityRepository`.
208212
- exporting redacted history through
209213
`Workflow\V2\Support\HistoryExport` and the
@@ -463,10 +467,13 @@ each step independently.
463467
an out-of-process adapter can replace the binding without
464468
patching the package. Today's bindings are
465469
`WorkflowControlPlane`, `OperatorObservabilityRepository`,
466-
`WorkflowTaskBridge`, `ActivityTaskBridge`, `LongPollWakeStore`,
467-
and the scheduler's `ScheduleWorkflowStarter`; Phase 4 adds a
468-
binding for the history/projection role when it moves out of
469-
process.
470+
`HistoryProjectionRole`, `WorkflowTaskBridge`,
471+
`ActivityTaskBridge`, `LongPollWakeStore`, and the scheduler's
472+
`ScheduleWorkflowStarter`. The history/projection role now
473+
crosses the matching seam through
474+
`DefaultHistoryProjectionRole`, so a future out-of-process
475+
adapter can replace that binding without patching the claim
476+
paths.
470477
3. **Introduce the dedicated matching shape.** The Phase 3
471478
contract already allows a dedicated matching role; Phase 4
472479
provides the deployment guidance for running it as a separate

src/Providers/WorkflowServiceProvider.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use Workflow\Commands\V2ScheduleTickCommand;
2121
use Workflow\Commands\WorkflowMakeCommand;
2222
use Workflow\V2\Contracts\ActivityTaskBridge;
23+
use Workflow\V2\Contracts\HistoryProjectionRole;
2324
use Workflow\V2\Contracts\LongPollWakeStore;
2425
use Workflow\V2\Contracts\OperatorObservabilityRepository;
2526
use Workflow\V2\Contracts\ScheduleWorkflowStarter;
@@ -36,6 +37,7 @@
3637
use Workflow\V2\Support\CacheLongPollWakeStore;
3738
use Workflow\V2\Support\ConfiguredV2Models;
3839
use Workflow\V2\Support\DefaultActivityTaskBridge;
40+
use Workflow\V2\Support\DefaultHistoryProjectionRole;
3941
use Workflow\V2\Support\DefaultOperatorObservabilityRepository;
4042
use Workflow\V2\Support\DefaultWorkflowControlPlane;
4143
use Workflow\V2\Support\DefaultWorkflowTaskBridge;
@@ -57,6 +59,8 @@ public function register(): void
5759
DefaultOperatorObservabilityRepository::class,
5860
);
5961

62+
$this->app->singletonIf(HistoryProjectionRole::class, DefaultHistoryProjectionRole::class);
63+
6064
$this->app->singleton(WorkflowTaskBridge::class, DefaultWorkflowTaskBridge::class);
6165

6266
$this->app->singleton(ActivityTaskBridge::class, DefaultActivityTaskBridge::class);
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow\V2\Contracts;
6+
7+
use Workflow\V2\Models\ActivityAttempt;
8+
use Workflow\V2\Models\ActivityExecution;
9+
use Workflow\V2\Models\WorkflowRun;
10+
use Workflow\V2\Models\WorkflowRunSummary;
11+
use Workflow\V2\Models\WorkflowTask;
12+
13+
/**
14+
* Binding seam for the history/projection role.
15+
*
16+
* Claim and command paths use this contract when they must synchronously
17+
* record durable history side effects and refresh operator-visible
18+
* projections without hard-coding the in-process implementation.
19+
*/
20+
interface HistoryProjectionRole
21+
{
22+
public function projectRun(WorkflowRun $run): WorkflowRunSummary;
23+
24+
public function recordActivityStarted(
25+
WorkflowRun $run,
26+
ActivityExecution $execution,
27+
ActivityAttempt $attempt,
28+
WorkflowTask $task,
29+
): WorkflowRunSummary;
30+
}

src/V2/Support/ActivityTaskClaimer.php

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,13 @@
66

77
use Illuminate\Support\Facades\DB;
88
use Illuminate\Support\Str;
9+
use Workflow\V2\Contracts\HistoryProjectionRole;
910
use Workflow\V2\Enums\ActivityAttemptStatus;
1011
use Workflow\V2\Enums\ActivityStatus;
11-
use Workflow\V2\Enums\HistoryEventType;
1212
use Workflow\V2\Enums\TaskStatus;
1313
use Workflow\V2\Enums\TaskType;
1414
use Workflow\V2\Models\ActivityAttempt;
1515
use Workflow\V2\Models\ActivityExecution;
16-
use Workflow\V2\Models\WorkflowHistoryEvent;
1716
use Workflow\V2\Models\WorkflowRun;
1817
use Workflow\V2\Models\WorkflowTask;
1918

@@ -100,13 +99,13 @@ public static function claimDetailed(
10099
$backendError = TaskBackendCapabilities::recordClaimFailureIfUnsupported($task);
101100

102101
if ($backendError !== null) {
103-
RunSummaryProjector::project($run->fresh(['instance', 'tasks', 'activityExecutions', 'failures']));
102+
self::historyProjectionRole()->projectRun(self::projectionRun($run));
104103

105104
return self::claimFailure('backend_unsupported', null, $backendError);
106105
}
107106

108107
if (! TaskCompatibility::supported($task, $run)) {
109-
RunSummaryProjector::project($run->fresh(['instance', 'tasks', 'activityExecutions', 'failures']));
108+
self::historyProjectionRole()->projectRun(self::projectionRun($run));
110109

111110
return self::claimFailure(
112111
'compatibility_unsupported',
@@ -170,29 +169,7 @@ public static function claimDetailed(
170169
'lease_expires_at' => $task->lease_expires_at,
171170
]);
172171

173-
$parallelMetadataPath = ParallelChildGroup::metadataPathForSequence($run, (int) $execution->sequence);
174-
$parallelMetadata = ParallelChildGroup::payloadForPath($parallelMetadataPath);
175-
176-
WorkflowHistoryEvent::record($run, HistoryEventType::ActivityStarted, array_merge([
177-
'activity_execution_id' => $execution->id,
178-
'activity_attempt_id' => $attemptId,
179-
'activity_class' => $execution->activity_class,
180-
'activity_type' => $execution->activity_type,
181-
'sequence' => $execution->sequence,
182-
'attempt_number' => $attemptCount,
183-
'activity' => ActivitySnapshot::fromExecution($execution),
184-
], $parallelMetadata ?? []), $task);
185-
186-
LifecycleEventDispatcher::activityStarted(
187-
$run,
188-
(string) $execution->id,
189-
(string) ($execution->activity_type ?? $execution->activity_class),
190-
(string) $execution->activity_class,
191-
(int) $execution->sequence,
192-
$attemptCount,
193-
);
194-
195-
RunSummaryProjector::project($run->fresh(['instance', 'tasks', 'activityExecutions', 'failures']));
172+
self::historyProjectionRole()->recordActivityStarted($run, $execution, $attempt, $task);
196173

197174
return self::claimSuccess(new ActivityTaskClaim($task, $run, $execution, $attempt));
198175
});
@@ -249,6 +226,19 @@ private static function releaseDelaySeconds(WorkflowTask $task): int
249226
return (int) ceil($remainingMilliseconds / 1000);
250227
}
251228

229+
private static function historyProjectionRole(): HistoryProjectionRole
230+
{
231+
/** @var HistoryProjectionRole $role */
232+
$role = app(HistoryProjectionRole::class);
233+
234+
return $role;
235+
}
236+
237+
private static function projectionRun(WorkflowRun $run): WorkflowRun
238+
{
239+
return $run->fresh(['instance', 'tasks', 'activityExecutions', 'failures']) ?? $run;
240+
}
241+
252242
private static function nonEmptyString(mixed $value): ?string
253243
{
254244
return is_string($value) && $value !== ''
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow\V2\Support;
6+
7+
use Workflow\V2\Contracts\HistoryProjectionRole;
8+
use Workflow\V2\Enums\HistoryEventType;
9+
use Workflow\V2\Models\ActivityAttempt;
10+
use Workflow\V2\Models\ActivityExecution;
11+
use Workflow\V2\Models\WorkflowHistoryEvent;
12+
use Workflow\V2\Models\WorkflowRun;
13+
use Workflow\V2\Models\WorkflowRunSummary;
14+
use Workflow\V2\Models\WorkflowTask;
15+
16+
final class DefaultHistoryProjectionRole implements HistoryProjectionRole
17+
{
18+
public function projectRun(WorkflowRun $run): WorkflowRunSummary
19+
{
20+
return RunSummaryProjector::project($run);
21+
}
22+
23+
public function recordActivityStarted(
24+
WorkflowRun $run,
25+
ActivityExecution $execution,
26+
ActivityAttempt $attempt,
27+
WorkflowTask $task,
28+
): WorkflowRunSummary {
29+
$parallelMetadataPath = ParallelChildGroup::metadataPathForSequence($run, (int) $execution->sequence);
30+
$parallelMetadata = ParallelChildGroup::payloadForPath($parallelMetadataPath);
31+
32+
WorkflowHistoryEvent::record($run, HistoryEventType::ActivityStarted, array_merge([
33+
'activity_execution_id' => $execution->id,
34+
'activity_attempt_id' => $attempt->id,
35+
'activity_class' => $execution->activity_class,
36+
'activity_type' => $execution->activity_type,
37+
'sequence' => $execution->sequence,
38+
'attempt_number' => $attempt->attempt_number,
39+
'activity' => ActivitySnapshot::fromExecution($execution),
40+
], $parallelMetadata ?? []), $task);
41+
42+
LifecycleEventDispatcher::activityStarted(
43+
$run,
44+
(string) $execution->id,
45+
(string) ($execution->activity_type ?? $execution->activity_class),
46+
(string) $execution->activity_class,
47+
(int) $execution->sequence,
48+
(int) $attempt->attempt_number,
49+
);
50+
51+
return $this->projectRun($run->fresh(['instance', 'tasks', 'activityExecutions', 'failures']) ?? $run);
52+
}
53+
}

src/V2/Support/DefaultWorkflowTaskBridge.php

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use RuntimeException;
1111
use Throwable;
1212
use Workflow\Serializers\CodecRegistry;
13+
use Workflow\V2\Contracts\HistoryProjectionRole;
1314
use Workflow\V2\Contracts\WorkflowTaskBridge;
1415
use Workflow\V2\Enums\ActivityStatus;
1516
use Workflow\V2\Enums\ChildCallStatus;
@@ -120,7 +121,9 @@ public function poll(
120121

121122
public function claimStatus(string $taskId, ?string $leaseOwner = null): array
122123
{
123-
return DB::transaction(static function () use ($taskId, $leaseOwner): array {
124+
$historyProjectionRole = $this->historyProjectionRole();
125+
126+
return DB::transaction(function () use ($taskId, $leaseOwner, $historyProjectionRole): array {
124127
/** @var WorkflowTask|null $task */
125128
$task = ConfiguredV2Models::query('task_model', WorkflowTask::class)
126129
->lockForUpdate()
@@ -158,7 +161,7 @@ public function claimStatus(string $taskId, ?string $leaseOwner = null): array
158161
TaskCompatibility::sync($task, $run);
159162

160163
if (TaskBackendCapabilities::recordClaimFailureIfUnsupported($task) !== null) {
161-
RunSummaryProjector::project($run->fresh(['instance', 'tasks', 'activityExecutions', 'failures']));
164+
$historyProjectionRole->projectRun($this->projectionRun($run));
162165

163166
return self::claimRejected(
164167
$taskId,
@@ -168,7 +171,7 @@ public function claimStatus(string $taskId, ?string $leaseOwner = null): array
168171
}
169172

170173
if (! TaskCompatibility::supported($task, $run)) {
171-
RunSummaryProjector::project($run->fresh(['instance', 'tasks', 'activityExecutions', 'failures']));
174+
$historyProjectionRole->projectRun($this->projectionRun($run));
172175

173176
$mismatch = TaskCompatibility::mismatchReason($task, $run);
174177

@@ -193,7 +196,7 @@ public function claimStatus(string $taskId, ?string $leaseOwner = null): array
193196
'last_claim_error' => null,
194197
])->save();
195198

196-
RunSummaryProjector::project($run->fresh(['instance', 'tasks', 'activityExecutions', 'failures']));
199+
$historyProjectionRole->projectRun($this->projectionRun($run));
197200

198201
return [
199202
'claimed' => true,
@@ -2977,6 +2980,19 @@ private static function nonEmptyString(mixed $value): ?string
29772980
: null;
29782981
}
29792982

2983+
private function historyProjectionRole(): HistoryProjectionRole
2984+
{
2985+
/** @var HistoryProjectionRole $role */
2986+
$role = app(HistoryProjectionRole::class);
2987+
2988+
return $role;
2989+
}
2990+
2991+
private function projectionRun(WorkflowRun $run): WorkflowRun
2992+
{
2993+
return $run->fresh(['instance', 'tasks', 'activityExecutions', 'failures']) ?? $run;
2994+
}
2995+
29802996
private function inheritTypedVisibilityMetadata(WorkflowRun $sourceRun, WorkflowRun $targetRun): void
29812997
{
29822998
app(MemoUpsertService::class)->inheritFromParent($sourceRun, $targetRun, 1);

tests/Feature/V2/V2ActivityTaskBridgeTest.php

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use Tests\TestCase;
1111
use Workflow\Serializers\Serializer;
1212
use Workflow\V2\Contracts\ActivityTaskBridge;
13+
use Workflow\V2\Contracts\HistoryProjectionRole;
1314
use Workflow\V2\Enums\ActivityAttemptStatus;
1415
use Workflow\V2\Enums\ActivityStatus;
1516
use Workflow\V2\Enums\HistoryEventType;
@@ -23,6 +24,7 @@
2324
use Workflow\V2\Models\WorkflowRun;
2425
use Workflow\V2\Models\WorkflowTask;
2526
use Workflow\V2\Support\DefaultActivityTaskBridge;
27+
use Workflow\V2\Support\DefaultHistoryProjectionRole;
2628

2729
final class V2ActivityTaskBridgeTest extends TestCase
2830
{
@@ -160,6 +162,48 @@ public function testClaimStatusClaimsReadyTask(): void
160162
$this->assertNull($result['reason']);
161163
}
162164

165+
public function testClaimStatusUsesHistoryProjectionRoleBinding(): void
166+
{
167+
[$run, $execution, $task] = $this->createActivityTask();
168+
169+
$customRole = new class(new DefaultHistoryProjectionRole()) implements HistoryProjectionRole {
170+
public array $calls = [];
171+
172+
public function __construct(
173+
private readonly DefaultHistoryProjectionRole $delegate,
174+
) {
175+
}
176+
177+
public function projectRun(WorkflowRun $run): \Workflow\V2\Models\WorkflowRunSummary
178+
{
179+
$this->calls[] = ['projectRun', $run->id];
180+
181+
return $this->delegate->projectRun($run);
182+
}
183+
184+
public function recordActivityStarted(
185+
WorkflowRun $run,
186+
ActivityExecution $execution,
187+
ActivityAttempt $attempt,
188+
WorkflowTask $task,
189+
): \Workflow\V2\Models\WorkflowRunSummary {
190+
$this->calls[] = ['recordActivityStarted', $run->id, $execution->id, $attempt->id, $task->id];
191+
192+
return $this->delegate->recordActivityStarted($run, $execution, $attempt, $task);
193+
}
194+
};
195+
196+
$this->app->instance(HistoryProjectionRole::class, $customRole);
197+
198+
$result = $this->bridge->claimStatus($task->id, 'server-worker-1');
199+
200+
$this->assertTrue($result['claimed']);
201+
$this->assertSame(
202+
[['recordActivityStarted', $run->id, $execution->id, $result['activity_attempt_id'], $task->id]],
203+
$customRole->calls,
204+
);
205+
}
206+
163207
public function testClaimStatusRejectsNonExistentTask(): void
164208
{
165209
$result = $this->bridge->claimStatus('nonexistent-task-id');

0 commit comments

Comments
 (0)