Skip to content

Commit 93aa8ee

Browse files
Carry accepted instance updates across continue-as-new
Carry accepted updates across continue-as-new
1 parent f87e9ac commit 93aa8ee

5 files changed

Lines changed: 187 additions & 3 deletions

File tree

docs/api-stability.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,31 @@ preference; both produce identical `Support\*` Call value objects.
101101
Adding new static methods to the facade is an additive (non-breaking)
102102
change. Removing or renaming a documented method is a major change.
103103

104+
## Continue-As-New Interleaving Contract
105+
106+
Continue-as-new keeps one logical workflow instance while closing one run
107+
and creating the next run. Commands that target the logical instance keep
108+
their ordering and lifecycle across that boundary:
109+
110+
- Signals are ordered by the instance message stream. A signal accepted
111+
before the continue-as-new transition commits remains pending until the
112+
continued run consumes it. Cursor transfer is durable and monotonic.
113+
- Instance-scoped updates accepted before the transition but not yet
114+
applied are carried to the continued run. The update id remains stable,
115+
`inspectUpdate()` follows the same lifecycle row, and the continued run
116+
records the `UpdateApplied` / `UpdateCompleted` history for the update.
117+
- Run-targeted commands are bound to their selected run. They are not
118+
retargeted to a continued run; callers that need logical-workflow
119+
behavior should use the instance-scoped command surface.
120+
- Queries are non-durable reads. A query resolves the current run at the
121+
time the query executes; if the continue-as-new transaction has already
122+
committed, the query reads the continued run, otherwise it reads the
123+
still-current closing run. Queries are not buffered or replayed.
124+
125+
This contract is intentionally instance-first so external server, CLI,
126+
and SDK callers can reason about a stable logical workflow id without
127+
having to retry around the brief run handoff window.
128+
104129
## Pre-existing `Contracts\*` interfaces
105130

106131
Interfaces under `Workflow\V2\Contracts\*` are the preferred extension

src/V2/Support/WorkflowExecutor.php

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2329,6 +2329,7 @@ private function continueAsNew(
23292329
$now,
23302330
$childCallId,
23312331
);
2332+
$this->transferAcceptedUpdatesToContinuedRun($run, $continuedRun);
23322333

23332334
/** @var WorkflowLink $link */
23342335
$link = WorkflowLink::query()->create([
@@ -2527,6 +2528,54 @@ private function continueAsNew(
25272528
return $continuedTask;
25282529
}
25292530

2531+
private function transferAcceptedUpdatesToContinuedRun(WorkflowRun $closingRun, WorkflowRun $continuedRun): void
2532+
{
2533+
$updates = WorkflowUpdate::query()
2534+
->where('workflow_run_id', $closingRun->id)
2535+
->where('target_scope', 'instance')
2536+
->where('status', UpdateStatus::Accepted->value)
2537+
->whereNull('workflow_sequence')
2538+
->lockForUpdate()
2539+
->get();
2540+
2541+
foreach ($updates as $update) {
2542+
if (! $update instanceof WorkflowUpdate) {
2543+
continue;
2544+
}
2545+
2546+
/** @var WorkflowCommand|null $command */
2547+
$command = $update->workflow_command_id === null
2548+
? null
2549+
: WorkflowCommand::query()
2550+
->lockForUpdate()
2551+
->find($update->workflow_command_id);
2552+
2553+
$update->forceFill([
2554+
'workflow_run_id' => $continuedRun->id,
2555+
'resolved_workflow_run_id' => $continuedRun->id,
2556+
])->save();
2557+
2558+
if ($command instanceof WorkflowCommand
2559+
&& $command->command_type === CommandType::Update
2560+
&& $command->status === CommandStatus::Accepted
2561+
) {
2562+
$command->forceFill([
2563+
'workflow_run_id' => $continuedRun->id,
2564+
'resolved_workflow_run_id' => $continuedRun->id,
2565+
])->save();
2566+
}
2567+
2568+
WorkflowHistoryEvent::record($continuedRun, HistoryEventType::UpdateAccepted, [
2569+
'workflow_command_id' => $command?->id,
2570+
'update_id' => $update->id,
2571+
'workflow_instance_id' => $continuedRun->workflow_instance_id,
2572+
'workflow_run_id' => $continuedRun->id,
2573+
'update_name' => $update->update_name,
2574+
'arguments' => $update->arguments,
2575+
], null, $command);
2576+
}
2577+
}
2578+
25302579
private function completeRun(WorkflowRun $run, WorkflowTask $task, mixed $result): void
25312580
{
25322581
$serializedOutput = Serializer::serializeWithCodec($run->payload_codec, $result);

tests/Feature/V2/V2MessageCursorContinueAsNewTest.php

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Workflow\V2\Models\WorkflowInstance;
1818
use Workflow\V2\Models\WorkflowRun;
1919
use Workflow\V2\Models\WorkflowTask;
20+
use Workflow\V2\Models\WorkflowUpdate;
2021
use Workflow\V2\Support\MessageStreamCursor;
2122
use Workflow\V2\WorkflowStub;
2223

@@ -150,6 +151,99 @@ public function testMultipleSignalsBeforeContinueAsNewTransferCursorCorrectly():
150151
$this->assertSame('Alice', $workflow->output()['name']);
151152
}
152153

154+
public function testAcceptedInstanceUpdateTransfersThroughContinueAsNew(): void
155+
{
156+
Queue::fake();
157+
158+
$workflow = WorkflowStub::make(TestConfiguredContinueSignalWorkflow::class, 'cursor-continue-update-1');
159+
$workflow->start(0);
160+
161+
$update = $workflow->submitUpdate('mark-approved', true);
162+
163+
$this->assertTrue($update->accepted());
164+
$this->assertSame('accepted', $update->status());
165+
166+
/** @var WorkflowUpdate $acceptedUpdate */
167+
$acceptedUpdate = WorkflowUpdate::query()->findOrFail($update->updateId());
168+
$firstRunId = $workflow->runId();
169+
170+
$this->assertSame($firstRunId, $acceptedUpdate->workflow_run_id);
171+
172+
$this->drainReadyTasks();
173+
174+
$this->waitFor(static fn (): bool => $workflow->refresh()->status() === 'waiting'
175+
&& $workflow->summary()?->wait_kind === 'signal');
176+
177+
$runs = WorkflowRun::query()
178+
->where('workflow_instance_id', 'cursor-continue-update-1')
179+
->orderBy('run_number')
180+
->get();
181+
182+
$this->assertCount(2, $runs);
183+
184+
$firstRun = $runs[0];
185+
$secondRun = $runs[1];
186+
187+
$acceptedUpdate->refresh();
188+
189+
$this->assertSame($firstRunId, $firstRun->id);
190+
$this->assertSame($secondRun->id, $acceptedUpdate->workflow_run_id);
191+
$this->assertSame($secondRun->id, $acceptedUpdate->resolved_workflow_run_id);
192+
$this->assertSame('completed', $acceptedUpdate->status->value);
193+
$this->assertSame(1, (int) $acceptedUpdate->workflow_sequence);
194+
$this->assertSame([
195+
'approved' => true,
196+
'count' => 1,
197+
], $acceptedUpdate->updateResult());
198+
199+
$command = WorkflowCommand::query()->findOrFail($update->commandId());
200+
201+
$this->assertSame($secondRun->id, $command->workflow_run_id);
202+
$this->assertSame($secondRun->id, $command->resolved_workflow_run_id);
203+
204+
$this->assertSame(1, WorkflowHistoryEvent::query()
205+
->where('workflow_run_id', $firstRun->id)
206+
->where('event_type', HistoryEventType::UpdateAccepted->value)
207+
->count());
208+
$this->assertSame(1, WorkflowHistoryEvent::query()
209+
->where('workflow_run_id', $secondRun->id)
210+
->where('event_type', HistoryEventType::UpdateAccepted->value)
211+
->where('workflow_command_id', $command->id)
212+
->count());
213+
$this->assertSame(1, WorkflowHistoryEvent::query()
214+
->where('workflow_run_id', $secondRun->id)
215+
->where('event_type', HistoryEventType::UpdateApplied->value)
216+
->where('workflow_command_id', $command->id)
217+
->count());
218+
$this->assertSame(1, WorkflowHistoryEvent::query()
219+
->where('workflow_run_id', $secondRun->id)
220+
->where('event_type', HistoryEventType::UpdateCompleted->value)
221+
->where('workflow_command_id', $command->id)
222+
->count());
223+
224+
$query = $workflow->query('current-approval');
225+
226+
$this->assertSame([
227+
'approved' => true,
228+
'count' => 1,
229+
], $query);
230+
231+
$signal = $workflow->signal('name-provided', 'Taylor');
232+
233+
$this->assertTrue($signal->accepted());
234+
235+
$this->drainReadyTasks();
236+
237+
$this->waitFor(static fn (): bool => $workflow->refresh()->completed());
238+
239+
$this->assertSame([
240+
'name' => 'Taylor',
241+
'approved' => true,
242+
'workflow_id' => 'cursor-continue-update-1',
243+
'run_id' => $secondRun->id,
244+
], $workflow->output());
245+
}
246+
153247
private function waitFor(callable $condition): void
154248
{
155249
$deadline = microtime(true) + 30;

tests/Feature/V2/V2WorkflowTest.php

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4153,8 +4153,9 @@ public function testContinueAsNewPersistsResolvedWorkflowClassAndContractsAfterC
41534153
\Workflow\V2\Support\WorkflowDefinition::fingerprint(TestConfiguredContinueSignalWorkflow::class),
41544154
$started->payload['workflow_definition_fingerprint'] ?? null,
41554155
);
4156-
$this->assertSame(['current-count'], $started->payload['declared_queries'] ?? null);
4157-
$this->assertSame('current-count', $started->payload['declared_query_contracts'][0]['name'] ?? null);
4156+
$this->assertSame(['current-approval', 'current-count'], $started->payload['declared_queries'] ?? null);
4157+
$this->assertSame('current-approval', $started->payload['declared_query_contracts'][0]['name'] ?? null);
4158+
$this->assertSame('current-count', $started->payload['declared_query_contracts'][1]['name'] ?? null);
41584159
$this->assertSame(['name-provided'], $started->payload['declared_signals'] ?? null);
41594160
$this->assertSame('name-provided', $started->payload['declared_signal_contracts'][0]['name'] ?? null);
41604161
$this->assertSame(['mark-approved'], $started->payload['declared_updates'] ?? null);
@@ -4176,7 +4177,7 @@ public function testContinueAsNewPersistsResolvedWorkflowClassAndContractsAfterC
41764177
$this->assertTrue($signal->accepted());
41774178
$this->assertSame('signal_received', $signal->outcome());
41784179
$this->assertSame('durable_history', $detail['declared_contract_source']);
4179-
$this->assertSame(['current-count'], $detail['declared_queries']);
4180+
$this->assertSame(['current-approval', 'current-count'], $detail['declared_queries']);
41804181
$this->assertSame(['name-provided'], $detail['declared_signals']);
41814182
$this->assertSame(['mark-approved'], $detail['declared_updates']);
41824183
}

tests/Fixtures/V2/TestConfiguredContinueSignalWorkflow.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ final class TestConfiguredContinueSignalWorkflow extends Workflow
2121
{
2222
private int $count = 0;
2323

24+
private bool $approved = false;
25+
2426
public function handle(int $count = 0): mixed
2527
{
2628
$this->count = $count;
@@ -33,6 +35,7 @@ public function handle(int $count = 0): mixed
3335

3436
return [
3537
'name' => $name,
38+
'approved' => $this->approved,
3639
'workflow_id' => $this->workflowId(),
3740
'run_id' => $this->runId(),
3841
];
@@ -47,8 +50,20 @@ public function currentCount(): int
4750
#[UpdateMethod('mark-approved')]
4851
public function approve(bool $approved): array
4952
{
53+
$this->approved = $approved;
54+
5055
return [
5156
'approved' => $approved,
57+
'count' => $this->count,
58+
];
59+
}
60+
61+
#[QueryMethod('current-approval')]
62+
public function currentApproval(): array
63+
{
64+
return [
65+
'approved' => $this->approved,
66+
'count' => $this->count,
5267
];
5368
}
5469
}

0 commit comments

Comments
 (0)