Skip to content

Commit 168a616

Browse files
[cross-repo from server#136] Worker Versioning: pin in-flight workflows to the worker version that started them (Temporal-parity) (#537)
1 parent ee2cd45 commit 168a616

4 files changed

Lines changed: 193 additions & 5 deletions

File tree

docs/architecture/worker-compatibility.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,16 @@ Guarantees:
156156

157157
Compatibility flows through the run lifecycle as follows:
158158

159-
- **Start** — a new run is stamped with
160-
`WorkerCompatibility::current()` on the starter process and the
161-
value is written to `workflow_runs.compatibility` in the same
159+
- **Start** — a new run is stamped with the start-time pin in this
160+
resolution order: the explicit `build_id` (or its `compatibility`
161+
alias) supplied in the start options wins; otherwise the value of
162+
`WorkerCompatibility::current()` on the starter process is used.
163+
The marker is written to `workflow_runs.compatibility` in the same
162164
transaction as `WorkflowStarted`. See `DefaultWorkflowControlPlane`
163-
for the dispatch site.
165+
for the dispatch site. The explicit option is the surface a server
166+
or external orchestrator uses to bind a new run to whichever build
167+
the operator has routed new starts to, without requiring the
168+
starter process to also advertise that build.
164169
- **Workflow tasks** — each `workflow_tasks` row carries a
165170
`compatibility` column. Existing tasks are synced to the owning run's
166171
compatibility on claim via `TaskCompatibility::sync()` so repair and

src/V2/Contracts/WorkflowControlPlane.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ interface WorkflowControlPlane
3838
* - memo: array<string, mixed>|null — non-indexed metadata
3939
* - command_context: \Workflow\V2\CommandContext|null — recorded command attribution/context
4040
* - duplicate_start_policy: 'reject_duplicate'|'return_existing_active'
41+
* - build_id: string|null — pin the new run to a specific worker build
42+
* so replay only dispatches to workers advertising the same build.
43+
* Accepted as `compatibility` too. When omitted, the run inherits
44+
* the {@see WorkerCompatibility::current()} marker from the calling
45+
* worker context (legacy behavior); a server-driven start should
46+
* resolve the active build from its rollout state and pass it here
47+
* so subsequent versioned worker pools cannot break replay.
4148
*
4249
* @return array{
4350
* started: bool,

src/V2/Support/DefaultWorkflowControlPlane.php

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public function start(string $workflowType, ?string $instanceId = null, array $o
5555
$duplicatePolicy = ($options['duplicate_start_policy'] ?? null) === 'return_existing_active'
5656
? DuplicateStartPolicy::ReturnExistingActive
5757
: DuplicateStartPolicy::RejectDuplicate;
58+
$pinnedCompatibility = self::normalizeCompatibilityOption($options);
5859

5960
$workflowClass = $resolvedClass ?? $workflowType;
6061

@@ -81,6 +82,7 @@ public function start(string $workflowType, ?string $instanceId = null, array $o
8182
$runTimeoutSeconds,
8283
$duplicatePolicy,
8384
$payloadCodec,
85+
$pinnedCompatibility,
8486
&$command,
8587
&$task,
8688
&$instance,
@@ -220,7 +222,7 @@ public function start(string $workflowType, ?string $instanceId = null, array $o
220222
'execution_deadline_at' => $executionDeadlineAt,
221223
'run_deadline_at' => $runDeadlineAt,
222224
'status' => RunStatus::Pending->value,
223-
'compatibility' => WorkerCompatibility::current(),
225+
'compatibility' => $pinnedCompatibility ?? WorkerCompatibility::current(),
224226
'payload_codec' => $payloadCodec,
225227
'arguments' => is_string($arguments) ? $arguments : null,
226228
'connection' => $connection,
@@ -288,6 +290,10 @@ public function start(string $workflowType, ?string $instanceId = null, array $o
288290
'execution_deadline_at' => $executionDeadlineAt?->toIso8601String(),
289291
'run_deadline_at' => $runDeadlineAt?->toIso8601String(),
290292
'workflow_definition_fingerprint' => $fingerprint,
293+
// Pinned compatibility marker — the run is bound to this
294+
// worker build at start, so replay only dispatches to
295+
// workers that advertise the same build.
296+
'compatibility' => $run->compatibility,
291297
], static fn (mixed $v): bool => $v !== null);
292298

293299
if ($commandContract !== null) {
@@ -1060,4 +1066,31 @@ private static function visibilityMetadataPayload(?array $values): ?array
10601066
{
10611067
return is_array($values) && $values !== [] ? $values : null;
10621068
}
1069+
1070+
/**
1071+
* Resolve the start-time pinning marker the caller wants stamped on
1072+
* the run. Operators (or the server's start-time router) pass the
1073+
* marker as `build_id` or `compatibility`; either name is honored
1074+
* so SDKs can pick whichever feels natural without bespoke logic.
1075+
*
1076+
* @param array<string, mixed> $options
1077+
*/
1078+
private static function normalizeCompatibilityOption(array $options): ?string
1079+
{
1080+
foreach (['build_id', 'compatibility'] as $key) {
1081+
$value = $options[$key] ?? null;
1082+
1083+
if (! is_string($value)) {
1084+
continue;
1085+
}
1086+
1087+
$value = trim($value);
1088+
1089+
if ($value !== '') {
1090+
return $value;
1091+
}
1092+
}
1093+
1094+
return null;
1095+
}
10631096
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Feature\V2;
6+
7+
use Illuminate\Support\Facades\Queue;
8+
use Tests\Fixtures\V2\TestGreetingWorkflow;
9+
use Tests\TestCase;
10+
use Workflow\V2\Contracts\WorkflowControlPlane;
11+
use Workflow\V2\Models\WorkflowRun;
12+
use Workflow\V2\Models\WorkflowTask;
13+
use Workflow\V2\Support\WorkerCompatibility;
14+
use Workflow\V2\Support\WorkerCompatibilityFleet;
15+
16+
/**
17+
* Replay 2026 worker-versioning parity: a server (or other caller)
18+
* driving a start through the control plane must be able to pin the
19+
* new run to a specific worker build id, so subsequent worker pools
20+
* running a different build cannot break replay.
21+
*
22+
* The pin is expressed as the `build_id` option on the start
23+
* contract; the legacy `WorkerCompatibility::current()` config
24+
* fallback is preserved when no pin is supplied.
25+
*/
26+
final class V2WorkflowVersionPinningTest extends TestCase
27+
{
28+
protected function setUp(): void
29+
{
30+
parent::setUp();
31+
32+
config()
33+
->set('workflows.v2.compatibility.namespace', null);
34+
config()
35+
->set('workflows.v2.types.workflows', [
36+
'test-greeting-workflow' => TestGreetingWorkflow::class,
37+
]);
38+
WorkerCompatibilityFleet::clear();
39+
Queue::fake();
40+
}
41+
42+
public function test_start_option_pins_run_and_first_task_to_supplied_build_id(): void
43+
{
44+
// No worker context — WorkerCompatibility::current() returns null,
45+
// so without an explicit pin the run would stay unversioned.
46+
$this->assertNull(WorkerCompatibility::current());
47+
48+
$controlPlane = $this->app->make(WorkflowControlPlane::class);
49+
50+
$result = $controlPlane->start('test-greeting-workflow', 'pin-by-build-id', [
51+
'build_id' => 'v2026.05.01-rc1',
52+
'arguments' => null,
53+
]);
54+
55+
$this->assertTrue($result['started']);
56+
57+
$run = WorkflowRun::query()
58+
->where('workflow_instance_id', 'pin-by-build-id')
59+
->firstOrFail();
60+
61+
$this->assertSame('v2026.05.01-rc1', $run->compatibility);
62+
63+
$task = WorkflowTask::query()
64+
->where('workflow_run_id', $run->id)
65+
->firstOrFail();
66+
67+
$this->assertSame('v2026.05.01-rc1', $task->compatibility);
68+
}
69+
70+
public function test_start_option_accepts_compatibility_alias(): void
71+
{
72+
$controlPlane = $this->app->make(WorkflowControlPlane::class);
73+
74+
$result = $controlPlane->start('test-greeting-workflow', 'pin-by-compat', [
75+
'compatibility' => 'v2026.05.01-rc1',
76+
'arguments' => null,
77+
]);
78+
79+
$this->assertTrue($result['started']);
80+
81+
$run = WorkflowRun::query()
82+
->where('workflow_instance_id', 'pin-by-compat')
83+
->firstOrFail();
84+
85+
$this->assertSame('v2026.05.01-rc1', $run->compatibility);
86+
}
87+
88+
public function test_start_falls_back_to_worker_compatibility_current_when_no_pin_supplied(): void
89+
{
90+
config()
91+
->set('workflows.v2.compatibility.current', 'build-from-worker-context');
92+
93+
$controlPlane = $this->app->make(WorkflowControlPlane::class);
94+
95+
$controlPlane->start('test-greeting-workflow', 'pin-fallback', [
96+
'arguments' => null,
97+
]);
98+
99+
$run = WorkflowRun::query()
100+
->where('workflow_instance_id', 'pin-fallback')
101+
->firstOrFail();
102+
103+
$this->assertSame('build-from-worker-context', $run->compatibility);
104+
}
105+
106+
public function test_explicit_pin_overrides_worker_compatibility_current(): void
107+
{
108+
config()
109+
->set('workflows.v2.compatibility.current', 'build-from-worker-context');
110+
111+
$controlPlane = $this->app->make(WorkflowControlPlane::class);
112+
113+
$controlPlane->start('test-greeting-workflow', 'pin-overrides', [
114+
'build_id' => 'operator-routed-build',
115+
'arguments' => null,
116+
]);
117+
118+
$run = WorkflowRun::query()
119+
->where('workflow_instance_id', 'pin-overrides')
120+
->firstOrFail();
121+
122+
$this->assertSame('operator-routed-build', $run->compatibility);
123+
}
124+
125+
public function test_blank_pin_value_falls_through_to_legacy_resolution(): void
126+
{
127+
config()
128+
->set('workflows.v2.compatibility.current', 'build-from-worker-context');
129+
130+
$controlPlane = $this->app->make(WorkflowControlPlane::class);
131+
132+
$controlPlane->start('test-greeting-workflow', 'pin-blank', [
133+
'build_id' => ' ',
134+
'arguments' => null,
135+
]);
136+
137+
$run = WorkflowRun::query()
138+
->where('workflow_instance_id', 'pin-blank')
139+
->firstOrFail();
140+
141+
$this->assertSame('build-from-worker-context', $run->compatibility);
142+
}
143+
}

0 commit comments

Comments
 (0)