Skip to content

Commit 4d119cf

Browse files
TD-S097: worker-session activities are unsafe during mixed server rollouts (#117)
1 parent 4ddf411 commit 4d119cf

18 files changed

Lines changed: 345 additions & 56 deletions

README.md

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ curl -X POST http://localhost:8080/api/worker/register \
4444
-H "Authorization: Bearer $DW_AUTH_TOKEN" \
4545
-H "Content-Type: application/json" \
4646
-H "X-Namespace: default" \
47-
-H "X-Durable-Workflow-Protocol-Version: 1.0" \
47+
-H "X-Durable-Workflow-Protocol-Version: 1.2" \
4848
-d '{"worker_id":"quickstart-worker","task_queue":"quickstart","runtime":"python"}'
4949
```
5050

@@ -93,7 +93,7 @@ curl -X POST http://localhost:8080/api/worker/register \
9393
-H "Authorization: Bearer $DW_AUTH_TOKEN" \
9494
-H "Content-Type: application/json" \
9595
-H "X-Namespace: default" \
96-
-H "X-Durable-Workflow-Protocol-Version: 1.0" \
96+
-H "X-Durable-Workflow-Protocol-Version: 1.2" \
9797
-d '{"worker_id":"compose-worker","task_queue":"compose","runtime":"python"}'
9898
```
9999

@@ -333,7 +333,7 @@ curl -X POST $SERVER/api/worker/register \
333333
-H "Authorization: Bearer $WORKER_TOKEN" \
334334
-H "Content-Type: application/json" \
335335
-H "X-Namespace: default" \
336-
-H "X-Durable-Workflow-Protocol-Version: 1.0" \
336+
-H "X-Durable-Workflow-Protocol-Version: 1.2" \
337337
-d '{
338338
"worker_id": "worker-1",
339339
"task_queue": "order-workers",
@@ -363,7 +363,7 @@ curl -X POST $SERVER/api/worker/workflow-tasks/poll \
363363
-H "Authorization: Bearer $WORKER_TOKEN" \
364364
-H "Content-Type: application/json" \
365365
-H "X-Namespace: default" \
366-
-H "X-Durable-Workflow-Protocol-Version: 1.0" \
366+
-H "X-Durable-Workflow-Protocol-Version: 1.2" \
367367
-d '{
368368
"worker_id": "worker-1",
369369
"task_queue": "order-workers"
@@ -374,7 +374,7 @@ The response includes the task, its history events, and lease metadata:
374374

375375
```json
376376
{
377-
"protocol_version": "1.0",
377+
"protocol_version": "1.2",
378378
"task": {
379379
"task_id": "task-xyz",
380380
"workflow_id": "order-42",
@@ -400,7 +400,7 @@ curl -X POST $SERVER/api/worker/workflow-tasks/task-xyz/complete \
400400
-H "Authorization: Bearer $WORKER_TOKEN" \
401401
-H "Content-Type: application/json" \
402402
-H "X-Namespace: default" \
403-
-H "X-Durable-Workflow-Protocol-Version: 1.0" \
403+
-H "X-Durable-Workflow-Protocol-Version: 1.2" \
404404
-d '{
405405
"lease_owner": "worker-1",
406406
"workflow_task_attempt": 1,
@@ -422,7 +422,7 @@ curl -X POST $SERVER/api/worker/workflow-tasks/task-xyz/complete \
422422
-H "Authorization: Bearer $WORKER_TOKEN" \
423423
-H "Content-Type: application/json" \
424424
-H "X-Namespace: default" \
425-
-H "X-Durable-Workflow-Protocol-Version: 1.0" \
425+
-H "X-Durable-Workflow-Protocol-Version: 1.2" \
426426
-d '{
427427
"lease_owner": "worker-1",
428428
"workflow_task_attempt": 1,
@@ -445,15 +445,15 @@ curl -X POST $SERVER/api/worker/activity-tasks/poll \
445445
-H "Authorization: Bearer $WORKER_TOKEN" \
446446
-H "Content-Type: application/json" \
447447
-H "X-Namespace: default" \
448-
-H "X-Durable-Workflow-Protocol-Version: 1.0" \
448+
-H "X-Durable-Workflow-Protocol-Version: 1.2" \
449449
-d '{"worker_id": "worker-1", "task_queue": "order-workers"}'
450450

451451
# Complete (use task_id and activity_attempt_id from the poll response)
452452
curl -X POST $SERVER/api/worker/activity-tasks/TASK_ID/complete \
453453
-H "Authorization: Bearer $WORKER_TOKEN" \
454454
-H "Content-Type: application/json" \
455455
-H "X-Namespace: default" \
456-
-H "X-Durable-Workflow-Protocol-Version: 1.0" \
456+
-H "X-Durable-Workflow-Protocol-Version: 1.2" \
457457
-d '{
458458
"activity_attempt_id": "ATTEMPT_ID",
459459
"lease_owner": "worker-1",
@@ -667,8 +667,8 @@ manifests should fail closed.
667667
- `POST /api/worker/activity-tasks/{id}/fail` — Fail activity task
668668
- `POST /api/worker/activity-tasks/{id}/heartbeat` — Activity heartbeat
669669

670-
Worker-plane requests must send `X-Durable-Workflow-Protocol-Version: 1.0`, and
671-
worker-plane responses always echo the same header plus `protocol_version: "1.0"`.
670+
Worker-plane requests must send `X-Durable-Workflow-Protocol-Version: 1.2`, and
671+
worker-plane responses always echo the same header plus `protocol_version: "1.2"`.
672672
Worker requests with bodies follow the same JSON media-type requirement as the
673673
control plane and return a worker-protocol 415 response for XML, form, or other
674674
non-JSON body formats.
@@ -811,7 +811,7 @@ future carriers can validate parser behavior without repository-local fixture
811811
paths. A human-readable summary lives in
812812
`docs/contracts/external-task-result.md`.
813813

814-
Within worker protocol version `1.0`, `worker_protocol.version`,
814+
Within worker protocol version `1.2`, `worker_protocol.version`,
815815
`server_capabilities.long_poll_timeout`, and
816816
`server_capabilities.supported_workflow_task_commands` are stable contract
817817
fields. The command-option booleans under `server_capabilities` are additive

app/Http/Controllers/Api/WorkerController.php

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,15 @@ public function completeWorkflowTask(Request $request, string $taskId): JsonResp
524524

525525
$this->validateWorkflowTaskCommandScopes($commands);
526526

527+
if ($response = $this->guardWorkerSessionCommandsAvailable(
528+
$request,
529+
$taskId,
530+
(int) $validated['workflow_task_attempt'],
531+
$commands,
532+
)) {
533+
return $response;
534+
}
535+
527536
if ($response = $this->guardWorkflowTaskOwnership(
528537
$request,
529538
$namespace,
@@ -566,6 +575,54 @@ public function completeWorkflowTask(Request $request, string $taskId): JsonResp
566575
], $this->workflowOutcomeStatus($outcome['reason']));
567576
}
568577

578+
/**
579+
* @param list<array<string, mixed>> $commands
580+
*/
581+
private function guardWorkerSessionCommandsAvailable(
582+
Request $request,
583+
string $taskId,
584+
int $workflowTaskAttempt,
585+
array $commands,
586+
): ?JsonResponse {
587+
if (! $this->commandsUseWorkerSessions($commands) || WorkerProtocol::workerSessionsSupported()) {
588+
return null;
589+
}
590+
591+
$minimum = WorkerProtocol::workerSessionMinimumProtocolVersion();
592+
593+
return WorkerProtocol::json([
594+
'task_id' => $taskId,
595+
'workflow_task_attempt' => $workflowTaskAttempt,
596+
'outcome' => 'rejected',
597+
'recorded' => false,
598+
'reason' => 'worker_sessions_unavailable',
599+
'error' => sprintf(
600+
'Worker-session activity commands require worker protocol %s or newer.',
601+
$minimum,
602+
),
603+
'requested_version' => WorkerProtocol::requestVersion($request),
604+
'minimum_protocol_version' => $minimum,
605+
'remediation' => sprintf(
606+
'Complete worker-session workflow tasks through a server node advertising worker protocol %s or newer.',
607+
$minimum,
608+
),
609+
], 409);
610+
}
611+
612+
/**
613+
* @param list<array<string, mixed>> $commands
614+
*/
615+
private function commandsUseWorkerSessions(array $commands): bool
616+
{
617+
foreach ($commands as $command) {
618+
if ($this->hasCommandValue($command, 'worker_session')) {
619+
return true;
620+
}
621+
}
622+
623+
return false;
624+
}
625+
569626
/**
570627
* @param list<array<string, mixed>> $commands
571628
*

app/Http/Controllers/Api/WorkerSessionController.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ public function create(Request $request): JsonResponse
2121
return $response;
2222
}
2323

24+
if ($response = WorkerProtocol::rejectWorkerSessionsUnavailable($request)) {
25+
return $response;
26+
}
27+
2428
$namespace = (string) $request->attributes->get('namespace');
2529

2630
$validated = $this->validateSessionRequest($request, workerIdRequired: true);
@@ -59,6 +63,10 @@ public function heartbeat(Request $request, string $sessionId): JsonResponse
5963
return $response;
6064
}
6165

66+
if ($response = WorkerProtocol::rejectWorkerSessionsUnavailable($request)) {
67+
return $response;
68+
}
69+
6270
$namespace = (string) $request->attributes->get('namespace');
6371

6472
$validated = $request->validate([
@@ -82,6 +90,10 @@ public function close(Request $request, string $sessionId): JsonResponse
8290
return $response;
8391
}
8492

93+
if ($response = WorkerProtocol::rejectWorkerSessionsUnavailable($request)) {
94+
return $response;
95+
}
96+
8597
$namespace = (string) $request->attributes->get('namespace');
8698

8799
$validated = $request->validate([

app/Support/ActivityTaskPoller.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ private function claimReadyTask(
161161
);
162162

163163
if ($workerSession !== null) {
164+
if (! WorkerProtocol::workerSessionsSupported()) {
165+
return null;
166+
}
167+
164168
$admission = $this->workerSessions->admitActivity(
165169
$namespace,
166170
$worker,

app/Support/WorkerProtocol.php

Lines changed: 92 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,51 @@ public static function rejectUnsupported(Request $request): ?JsonResponse
6868
], 400);
6969
}
7070

71+
public static function workerSessionMinimumProtocolVersion(): string
72+
{
73+
$semantics = self::baseWorkerSessionSemantics();
74+
$minimum = $semantics['minimum_protocol_version']
75+
?? $semantics['min_worker_protocol_version']
76+
?? ($semantics['rollout_safety']['minimum_protocol_version'] ?? null)
77+
?? self::VERSION;
78+
79+
return is_string($minimum) && trim($minimum) !== ''
80+
? trim($minimum)
81+
: self::VERSION;
82+
}
83+
84+
public static function workerSessionsSupported(): bool
85+
{
86+
$configured = (string) config('server.worker_protocol.version', self::VERSION);
87+
88+
return version_compare($configured, self::workerSessionMinimumProtocolVersion(), '>=');
89+
}
90+
91+
public static function rejectWorkerSessionsUnavailable(Request $request): ?JsonResponse
92+
{
93+
if (self::workerSessionsSupported()) {
94+
return null;
95+
}
96+
97+
$configured = (string) config('server.worker_protocol.version', self::VERSION);
98+
$minimum = self::workerSessionMinimumProtocolVersion();
99+
100+
return self::json([
101+
'error' => sprintf(
102+
'Worker sessions require worker protocol %s or newer.',
103+
$minimum,
104+
),
105+
'reason' => 'worker_sessions_unavailable',
106+
'supported_version' => $configured,
107+
'requested_version' => self::requestVersion($request),
108+
'minimum_protocol_version' => $minimum,
109+
'remediation' => sprintf(
110+
'Route worker-session clients only to server nodes advertising worker protocol %s or newer.',
111+
$minimum,
112+
),
113+
], 409);
114+
}
115+
71116
/**
72117
* @return list<string>
73118
*/
@@ -108,6 +153,8 @@ public static function supportedWorkflowTaskCommands(): array
108153
*/
109154
public static function serverCapabilities(): array
110155
{
156+
$workerSessionSupported = self::workerSessionsSupported();
157+
111158
return [
112159
'long_poll_timeout' => (int) config(
113160
'server.polling.timeout',
@@ -128,18 +175,8 @@ public static function serverCapabilities(): array
128175
'activity_retry_policy' => true,
129176
'activity_timeouts' => true,
130177
'local_activities' => self::localActivitySemantics(),
131-
'worker_session_verbs' => method_exists(WorkerProtocolVersion::class, 'workerSessionVerbs')
132-
? WorkerProtocolVersion::workerSessionVerbs()
133-
: ['create', 'heartbeat', 'close'],
134-
'worker_sessions' => method_exists(WorkerProtocolVersion::class, 'workerSessionSemantics')
135-
? WorkerProtocolVersion::workerSessionSemantics()
136-
: [
137-
'command_field' => 'worker_session',
138-
'activity_options_field' => 'worker_session',
139-
'lifecycle' => 'lazy_create_on_first_admitted_activity',
140-
'ownership' => 'single_worker_lease_owner',
141-
'verbs' => ['create', 'heartbeat', 'close'],
142-
],
178+
'worker_session_verbs' => $workerSessionSupported ? self::workerSessionVerbs() : [],
179+
'worker_sessions' => self::workerSessionSemantics($workerSessionSupported),
143180
'child_workflow_retry_policy' => true,
144181
'child_workflow_timeouts' => true,
145182
'parent_close_policy' => true,
@@ -178,6 +215,49 @@ public static function serverCapabilities(): array
178215
];
179216
}
180217

218+
/**
219+
* @return list<string>
220+
*/
221+
private static function workerSessionVerbs(): array
222+
{
223+
return method_exists(WorkerProtocolVersion::class, 'workerSessionVerbs')
224+
? WorkerProtocolVersion::workerSessionVerbs()
225+
: ['create', 'heartbeat', 'close'];
226+
}
227+
228+
/**
229+
* @return array<string, mixed>
230+
*/
231+
private static function workerSessionSemantics(bool $supported): array
232+
{
233+
$minimum = self::workerSessionMinimumProtocolVersion();
234+
235+
return [
236+
...self::baseWorkerSessionSemantics(),
237+
'supported' => $supported,
238+
'minimum_protocol_version' => $minimum,
239+
...($supported ? [] : [
240+
'unavailable_reason' => 'worker_protocol_version_below_worker_session_minimum',
241+
]),
242+
];
243+
}
244+
245+
/**
246+
* @return array<string, mixed>
247+
*/
248+
private static function baseWorkerSessionSemantics(): array
249+
{
250+
return method_exists(WorkerProtocolVersion::class, 'workerSessionSemantics')
251+
? WorkerProtocolVersion::workerSessionSemantics()
252+
: [
253+
'command_field' => 'worker_session',
254+
'activity_options_field' => 'worker_session',
255+
'lifecycle' => 'lazy_create_on_first_admitted_activity',
256+
'ownership' => 'single_worker_lease_owner',
257+
'verbs' => ['create', 'heartbeat', 'close'],
258+
];
259+
}
260+
181261
/**
182262
* @return array<string, mixed>
183263
*/

composer.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/Feature/ActivityTimeoutTest.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
use App\Models\WorkerRegistration;
88
use App\Support\NamespaceWorkflowScope;
9+
use App\Support\WorkerProtocol;
910
use Illuminate\Foundation\Testing\RefreshDatabase;
1011
use Illuminate\Support\Facades\Queue;
1112
use Tests\Feature\Concerns\ServerTestHelpers;
@@ -284,7 +285,7 @@ private function workerHeaders(string $namespace = 'default'): array
284285
return [
285286
'X-Namespace' => $namespace,
286287
'X-Durable-Workflow-Control-Plane-Version' => '2',
287-
'X-Durable-Workflow-Protocol-Version' => '1.0',
288+
WorkerProtocol::HEADER => WorkerProtocol::VERSION,
288289
];
289290
}
290291

0 commit comments

Comments
 (0)