Skip to content

Commit ec20a32

Browse files
Add v2 patch marker workflow helpers
Issue: zorporation/durable-workflow#446 Loop-ID: build-02
1 parent 653b0ba commit ec20a32

10 files changed

Lines changed: 255 additions & 7 deletions

docs/api-stability.md

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ guarantee:
8787
`Workflow\V2\functions.php`: `activity`, `executeActivity`, `child`,
8888
`executeChildWorkflow`, `async`, `all`, `parallel`, `await`,
8989
`awaitWithTimeout`, `awaitSignal`, `timer`, `sideEffect`,
90-
`continueAsNew`, `getVersion`, `upsertMemo`, `upsertSearchAttributes`,
90+
`continueAsNew`, `getVersion`, `patched`, `deprecatePatch`,
91+
`upsertMemo`, `upsertSearchAttributes`,
9192
and the timer sugar `seconds`/`minutes`/`hours`/`days`/`weeks`/
9293
`months`/`years`.
9394

@@ -117,14 +118,21 @@ the PHP class that produced the event is `@internal`.
117118

118119
### `VersionMarkerRecorded`
119120

120-
This marker records the result of `Workflow::getVersion()` (PHP) and
121-
`workflow.get_version()` (Python SDK). The moment an operational
121+
This marker records the result of `Workflow::getVersion()`, `Workflow::patched()`,
122+
or `Workflow::deprecatePatch()` (PHP) and `workflow.get_version()`,
123+
`workflow.patched()`, or `workflow.deprecate_patch()` (Python SDK). The moment an operational
122124
workflow writes a `VersionMarkerRecorded` event, every replayer for
123125
the rest of that workflow's lifetime must continue to decode the same
124126
payload. See PHP `Workflow\V2\Support\DefaultWorkflowTaskBridge::applyRecordVersionMarker()`
125127
and Python `durable_workflow.workflow._workflow_state` for the
126128
authoritative emission and replay sites.
127129

130+
`patched(change_id)` and `deprecatePatch(change_id)` / `deprecate_patch(change_id)`
131+
are additive sugar over this same frozen shape. They do not introduce a new
132+
event type: patched markers use `min_supported = -1`, `max_supported = 1`,
133+
and `version = 1`; replaying version `-1` means the workflow reached the patch
134+
site before the patch marker existed.
135+
128136
**Payload shape — frozen:**
129137

130138
| key | type | meaning |

src/V2/Support/QueryStateReplayer.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,10 @@ public function replayState(WorkflowRun $run): ReplayState
235235
$resolution = VersionResolver::resolve($run, $versionEvent, $current, $sequence);
236236

237237
$this->syncWorkflowCursor($workflow, $sequence + ($resolution->advancesSequence ? 1 : 0));
238-
$current = $workflowExecution->send($resolution->version, $versionEvent?->recorded_at);
238+
$current = $workflowExecution->send(
239+
$current->resolveValue($resolution->version),
240+
$versionEvent?->recorded_at
241+
);
239242

240243
if ($resolution->advancesSequence) {
241244
++$sequence;
@@ -592,7 +595,7 @@ public function replayState(WorkflowRun $run): ReplayState
592595

593596
$this->syncWorkflowCursor($workflow, $sequence);
594597
throw new UnsupportedWorkflowYieldException(sprintf(
595-
'Workflow %s yielded %s. v2 currently supports activity(), child(), async(), all(), await(), signal(), timer(), sideEffect(), continueAsNew(), getVersion(), upsertMemo(), and upsertSearchAttributes() only.',
598+
'Workflow %s yielded %s. v2 currently supports activity(), child(), async(), all(), await(), signal(), timer(), sideEffect(), continueAsNew(), getVersion(), patched(), deprecatePatch(), upsertMemo(), and upsertSearchAttributes() only.',
596599
$run->workflow_class,
597600
get_debug_type($current),
598601
));

src/V2/Support/VersionCall.php

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,21 @@
66

77
use LogicException;
88
use Workflow\V2\Contracts\YieldedCommand;
9+
use Workflow\V2\WorkflowStub;
910

1011
final class VersionCall implements YieldedCommand
1112
{
13+
public const RESULT_VERSION = 'version';
14+
15+
public const RESULT_PATCHED = 'patched';
16+
17+
public const RESULT_DEPRECATE_PATCH = 'deprecate_patch';
18+
1219
public function __construct(
1320
public readonly string $changeId,
1421
public readonly int $minSupported,
1522
public readonly int $maxSupported,
23+
public readonly string $resultKind = self::RESULT_VERSION,
1624
) {
1725
if ($this->changeId === '') {
1826
throw new LogicException('V2 version change ids must be non-empty strings.');
@@ -26,5 +34,36 @@ public function __construct(
2634
$this->maxSupported,
2735
));
2836
}
37+
38+
if (! in_array($this->resultKind, [
39+
self::RESULT_VERSION,
40+
self::RESULT_PATCHED,
41+
self::RESULT_DEPRECATE_PATCH,
42+
], true)) {
43+
throw new LogicException(sprintf(
44+
'V2 version change [%s] has unsupported result kind [%s].',
45+
$this->changeId,
46+
$this->resultKind,
47+
));
48+
}
49+
}
50+
51+
public static function patched(string $changeId): self
52+
{
53+
return new self($changeId, WorkflowStub::DEFAULT_VERSION, 1, self::RESULT_PATCHED);
54+
}
55+
56+
public static function deprecatePatch(string $changeId): self
57+
{
58+
return new self($changeId, WorkflowStub::DEFAULT_VERSION, 1, self::RESULT_DEPRECATE_PATCH);
59+
}
60+
61+
public function resolveValue(int $version): mixed
62+
{
63+
return match ($this->resultKind) {
64+
self::RESULT_PATCHED => $version === 1,
65+
self::RESULT_DEPRECATE_PATCH => null,
66+
default => $version,
67+
};
2968
}
3069
}

src/V2/Support/WorkflowExecutor.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,10 @@ public function run(WorkflowRun $run, WorkflowTask $task): ?WorkflowTask
487487
}
488488

489489
$this->syncWorkflowCursor($workflow, $sequence + ($resolution->advancesSequence ? 1 : 0));
490-
$current = $workflowExecution->send($version, $versionMarkerEvent?->recorded_at);
490+
$current = $workflowExecution->send(
491+
$current->resolveValue($version),
492+
$versionMarkerEvent?->recorded_at
493+
);
491494
} catch (Throwable $throwable) {
492495
$this->failRun($run, $task, $throwable, 'workflow_run', $run->id);
493496

@@ -1307,7 +1310,7 @@ public function run(WorkflowRun $run, WorkflowTask $task): ?WorkflowTask
13071310
$run,
13081311
$task,
13091312
new UnsupportedWorkflowYieldException(sprintf(
1310-
'Workflow %s yielded %s. v2 currently supports activity(), child(), async(), all(), await(), signal(), timer(), sideEffect(), continueAsNew(), getVersion(), upsertMemo(), and upsertSearchAttributes() only.',
1313+
'Workflow %s yielded %s. v2 currently supports activity(), child(), async(), all(), await(), signal(), timer(), sideEffect(), continueAsNew(), getVersion(), patched(), deprecatePatch(), upsertMemo(), and upsertSearchAttributes() only.',
13111314
$run->workflow_class,
13121315
get_debug_type($current),
13131316
)),

src/V2/Workflow.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,26 @@ public static function getVersion(
350350
return getVersion($changeId, $minSupported, $maxSupported);
351351
}
352352

353+
/**
354+
* Return whether this workflow run has crossed a replay-safe code patch.
355+
*
356+
* @see patched()
357+
*/
358+
public static function patched(string $changeId): mixed
359+
{
360+
return patched($changeId);
361+
}
362+
363+
/**
364+
* Keep a patch marker alive after the old workflow branch is removed.
365+
*
366+
* @see deprecatePatch()
367+
*/
368+
public static function deprecatePatch(string $changeId): mixed
369+
{
370+
return deprecatePatch($changeId);
371+
}
372+
353373
/**
354374
* Upsert non-indexed memo metadata on the current workflow run.
355375
*

src/V2/functions.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,20 @@ function getVersion(
224224
}
225225
}
226226

227+
if (! function_exists(__NAMESPACE__ . '\\patched')) {
228+
function patched(string $changeId): mixed
229+
{
230+
return WorkflowFiberContext::suspend(VersionCall::patched($changeId));
231+
}
232+
}
233+
234+
if (! function_exists(__NAMESPACE__ . '\\deprecatePatch')) {
235+
function deprecatePatch(string $changeId): mixed
236+
{
237+
return WorkflowFiberContext::suspend(VersionCall::deprecatePatch($changeId));
238+
}
239+
}
240+
227241
// Timer sugar
228242

229243
if (! function_exists(__NAMESPACE__ . '\\seconds')) {

tests/Feature/V2/V2VersionWorkflowTest.php

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
namespace Tests\Feature\V2;
66

77
use Illuminate\Support\Facades\Queue;
8+
use Tests\Fixtures\V2\TestDeprecatedPatchWorkflow;
9+
use Tests\Fixtures\V2\TestPatchedWorkflow;
810
use Tests\Fixtures\V2\TestVersionAfterSignalWorkflow;
911
use Tests\Fixtures\V2\TestVersionBeforeSignalWorkflow;
1012
use Tests\Fixtures\V2\TestVersionMinSupportedWorkflow;
@@ -136,6 +138,95 @@ public function testVersionMarkersReuseRecordedHistoryValue(): void
136138
$this->assertSame('v2_result', $workflow->output()['result']);
137139
}
138140

141+
public function testPatchedRecordsMarkerAndResolvesTrueForCurrentRuns(): void
142+
{
143+
Queue::fake();
144+
145+
$workflow = WorkflowStub::make(TestPatchedWorkflow::class, 'patched-current');
146+
$workflow->start();
147+
148+
$this->drainReadyTasks();
149+
150+
$this->assertTrue($workflow->refresh()->completed());
151+
$this->assertSame([
152+
'patched' => true,
153+
'branch' => 'patched',
154+
], $workflow->output());
155+
156+
/** @var WorkflowHistoryEvent $marker */
157+
$marker = WorkflowHistoryEvent::query()
158+
->where('workflow_run_id', $workflow->runId())
159+
->where('event_type', HistoryEventType::VersionMarkerRecorded->value)
160+
->firstOrFail();
161+
162+
$this->assertSame('patch-1', $marker->payload['change_id'] ?? null);
163+
$this->assertSame(1, $marker->payload['version'] ?? null);
164+
$this->assertSame(WorkflowStub::DEFAULT_VERSION, $marker->payload['min_supported'] ?? null);
165+
$this->assertSame(1, $marker->payload['max_supported'] ?? null);
166+
}
167+
168+
public function testPatchedResolvesFalseForRecordedLegacyMarker(): void
169+
{
170+
Queue::fake();
171+
172+
$workflow = WorkflowStub::make(TestPatchedWorkflow::class, 'patched-legacy');
173+
$workflow->start();
174+
175+
/** @var WorkflowRun $run */
176+
$run = WorkflowRun::query()->findOrFail($workflow->runId());
177+
178+
WorkflowHistoryEvent::query()->create([
179+
'workflow_run_id' => $run->id,
180+
'sequence' => 3,
181+
'event_type' => HistoryEventType::VersionMarkerRecorded->value,
182+
'payload' => [
183+
'sequence' => 1,
184+
'change_id' => 'patch-1',
185+
'version' => WorkflowStub::DEFAULT_VERSION,
186+
'min_supported' => WorkflowStub::DEFAULT_VERSION,
187+
'max_supported' => 1,
188+
],
189+
'recorded_at' => now(),
190+
]);
191+
192+
$run->forceFill([
193+
'last_history_sequence' => 3,
194+
])->save();
195+
196+
$this->drainReadyTasks();
197+
198+
$this->assertTrue($workflow->refresh()->completed());
199+
$this->assertSame([
200+
'patched' => false,
201+
'branch' => 'legacy',
202+
], $workflow->output());
203+
}
204+
205+
public function testDeprecatePatchRecordsMarkerAndReturnsNull(): void
206+
{
207+
Queue::fake();
208+
209+
$workflow = WorkflowStub::make(TestDeprecatedPatchWorkflow::class, 'patch-deprecated');
210+
$workflow->start();
211+
212+
$this->drainReadyTasks();
213+
214+
$this->assertTrue($workflow->refresh()->completed());
215+
$this->assertSame([
216+
'marker_result' => null,
217+
'branch' => 'patched',
218+
], $workflow->output());
219+
220+
/** @var WorkflowHistoryEvent $marker */
221+
$marker = WorkflowHistoryEvent::query()
222+
->where('workflow_run_id', $workflow->runId())
223+
->where('event_type', HistoryEventType::VersionMarkerRecorded->value)
224+
->firstOrFail();
225+
226+
$this->assertSame('patch-1', $marker->payload['change_id'] ?? null);
227+
$this->assertSame(1, $marker->payload['version'] ?? null);
228+
}
229+
139230
public function testVersionMarkersFailRunWhenRecordedVersionIsNotSupported(): void
140231
{
141232
Queue::fake();
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Fixtures\V2;
6+
7+
use Workflow\V2\Attributes\Type;
8+
use function Workflow\V2\deprecatePatch;
9+
use Workflow\V2\Workflow;
10+
11+
#[Type('test-deprecated-patch-workflow')]
12+
final class TestDeprecatedPatchWorkflow extends Workflow
13+
{
14+
public function handle(): array
15+
{
16+
$result = deprecatePatch('patch-1');
17+
18+
return [
19+
'marker_result' => $result,
20+
'branch' => 'patched',
21+
];
22+
}
23+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Fixtures\V2;
6+
7+
use Workflow\V2\Attributes\Type;
8+
use function Workflow\V2\patched;
9+
use Workflow\V2\Workflow;
10+
11+
#[Type('test-patched-workflow')]
12+
final class TestPatchedWorkflow extends Workflow
13+
{
14+
public function handle(): array
15+
{
16+
$patched = patched('patch-1');
17+
18+
return [
19+
'patched' => $patched,
20+
'branch' => $patched ? 'patched' : 'legacy',
21+
];
22+
}
23+
}

tests/Unit/V2/WorkflowFacadeTest.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use Workflow\V2\Support\UpsertSearchAttributesCall;
2020
use Workflow\V2\Support\VersionCall;
2121
use Workflow\V2\Workflow;
22+
use Workflow\V2\WorkflowStub;
2223

2324
/**
2425
* The static facade on Workflow\V2\Workflow is a thin delegate to the
@@ -122,6 +123,27 @@ public function testGetVersionReturnsAVersionCall(): void
122123
$this->assertInstanceOf(VersionCall::class, $call);
123124
}
124125

126+
public function testPatchedReturnsAVersionCallWithBooleanResultKind(): void
127+
{
128+
$call = Workflow::patched('change-one');
129+
130+
$this->assertInstanceOf(VersionCall::class, $call);
131+
$this->assertSame('change-one', $call->changeId);
132+
$this->assertSame(WorkflowStub::DEFAULT_VERSION, $call->minSupported);
133+
$this->assertSame(1, $call->maxSupported);
134+
$this->assertTrue($call->resolveValue(1));
135+
$this->assertFalse($call->resolveValue(WorkflowStub::DEFAULT_VERSION));
136+
}
137+
138+
public function testDeprecatePatchReturnsAVersionCallWithNullResultKind(): void
139+
{
140+
$call = Workflow::deprecatePatch('change-one');
141+
142+
$this->assertInstanceOf(VersionCall::class, $call);
143+
$this->assertSame('change-one', $call->changeId);
144+
$this->assertNull($call->resolveValue(1));
145+
}
146+
125147
public function testAllReturnsAnAllCall(): void
126148
{
127149
$call = Workflow::all([
@@ -183,6 +205,8 @@ public function testEveryFacadeMethodIsStatic(): void
183205
'sideEffect',
184206
'continueAsNew',
185207
'getVersion',
208+
'patched',
209+
'deprecatePatch',
186210
'upsertMemo',
187211
'upsertSearchAttributes',
188212
'seconds',

0 commit comments

Comments
 (0)