Skip to content

Commit 00f2701

Browse files
TD-S001: Worker-session admission mutates leases before task claims commit (#118)
1 parent 4671706 commit 00f2701

3 files changed

Lines changed: 191 additions & 20 deletions

File tree

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<?php
2+
3+
namespace App\Support;
4+
5+
use RuntimeException;
6+
7+
/**
 * Marker exception used to abort an activity-task claim from inside its
 * database transaction.
 *
 * ActivityTaskPoller::claimReadyTask() throws it from within the
 * DB::transaction() closure after the task claim has succeeded but the
 * worker session cannot be used (worker sessions unsupported, or the
 * admission result is not "admitted"). The poller catches it immediately
 * outside the transaction and treats the claim as null.
 *
 * It carries no message or state of its own; only its type matters.
 */
final class ActivityTaskClaimRolledBack extends RuntimeException
8+
{
9+
}

app/Support/ActivityTaskPoller.php

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -153,34 +153,42 @@ private function claimReadyTask(
153153
continue;
154154
}
155155

156-
$claim = DB::transaction(function () use ($namespace, $worker, $readyTask, $taskId, $leaseOwner): ?array {
157-
$workerSession = $this->workerSessions->optionsForExecution(
158-
is_string($readyTask['activity_execution_id'] ?? null)
159-
? $readyTask['activity_execution_id']
160-
: null,
161-
);
156+
try {
157+
$claim = DB::transaction(function () use ($namespace, $worker, $taskId, $leaseOwner): ?array {
158+
$claim = $this->bridge->claimStatus($taskId, $leaseOwner);
162159

163-
if ($workerSession !== null) {
164-
if (! WorkerProtocol::workerSessionsSupported()) {
160+
if (($claim['claimed'] ?? false) !== true) {
165161
return null;
166162
}
167163

168-
$admission = $this->workerSessions->admitActivity(
169-
$namespace,
170-
$worker,
171-
$workerSession,
172-
$taskId,
164+
$workerSession = $this->workerSessions->optionsForExecution(
165+
is_string($claim['activity_execution_id'] ?? null)
166+
? $claim['activity_execution_id']
167+
: null,
173168
);
174169

175-
if (($admission['admitted'] ?? false) !== true) {
176-
return null;
170+
if ($workerSession !== null) {
171+
if (! WorkerProtocol::workerSessionsSupported()) {
172+
throw new ActivityTaskClaimRolledBack;
173+
}
174+
175+
$admission = $this->workerSessions->admitActivity(
176+
$namespace,
177+
$worker,
178+
$workerSession,
179+
$taskId,
180+
);
181+
182+
if (($admission['admitted'] ?? false) !== true) {
183+
throw new ActivityTaskClaimRolledBack;
184+
}
177185
}
178-
}
179-
180-
$claim = $this->bridge->claimStatus($taskId, $leaseOwner);
181186

182-
return ($claim['claimed'] ?? false) === true ? $claim : null;
183-
});
187+
return $claim;
188+
});
189+
} catch (ActivityTaskClaimRolledBack) {
190+
$claim = null;
191+
}
184192

185193
if ($claim !== null) {
186194
return $claim;

tests/Feature/WorkerSessionProtocolTest.php

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,18 @@
55
namespace Tests\Feature;
66

77
use App\Models\WorkerRegistration;
8+
use App\Models\WorkerSessionLease;
89
use App\Support\WorkerProtocol;
910
use Illuminate\Foundation\Testing\RefreshDatabase;
1011
use Illuminate\Support\Facades\Queue;
12+
use Mockery\MockInterface;
1113
use Tests\Feature\Concerns\ServerTestHelpers;
1214
use Tests\Fixtures\ExternalGreetingWorkflow;
1315
use Tests\TestCase;
1416
use Workflow\Serializers\Serializer;
17+
use Workflow\V2\Contracts\ActivityTaskBridge as ActivityTaskBridgeContract;
18+
use Workflow\V2\Models\ActivityExecution;
19+
use Workflow\V2\Models\WorkflowTask;
1520

1621
class WorkerSessionProtocolTest extends TestCase
1722
{
@@ -202,6 +207,155 @@ public function test_activity_poll_admits_worker_session_only_to_capable_holder(
202207
->assertJsonPath('task', null);
203208
}
204209

210+
/**
 * Regression test: a failed activity-task claim must not leave a
 * worker-session lease behind.
 *
 * Flow: start a workflow, complete its workflow task with a
 * schedule_activity command that carries worker_session options, mock the
 * activity task bridge so that poll() offers the task but claimStatus()
 * reports claimed=false, then poll as an activity worker. The poll must be
 * empty and no WorkerSessionLease row may exist for the session.
 */
public function test_activity_poll_does_not_admit_worker_session_when_task_claim_fails(): void
211+
{
212+
Queue::fake();
213+
214+
$this->configureWorkflowTypes([
215+
'tests.external-greeting-workflow' => ExternalGreetingWorkflow::class,
216+
]);
217+
218+
$workflowId = 'wf-worker-session-lost-claim';
219+
220+
// Start a workflow run on the external-workflows task queue.
$start = $this->postJson('/api/workflows', [
221+
'workflow_id' => $workflowId,
222+
'workflow_type' => 'tests.external-greeting-workflow',
223+
'task_queue' => 'external-workflows',
224+
'input' => ['Ada'],
225+
], $this->apiHeaders());
226+
227+
$start->assertCreated();
228+
229+
// Register a workflow worker and poll for the run's workflow task.
$this->registerWorkerThroughProtocol(
230+
workerId: 'workflow-worker-lost-claim',
231+
taskQueue: 'external-workflows',
232+
supportedWorkflowTypes: ['tests.external-greeting-workflow'],
233+
);
234+
235+
$workflowPoll = $this->postJson('/api/worker/workflow-tasks/poll', [
236+
'worker_id' => 'workflow-worker-lost-claim',
237+
'task_queue' => 'external-workflows',
238+
], $this->workerHeaders());
239+
240+
$workflowPoll->assertOk();
241+
242+
// Complete the workflow task by scheduling an activity on the
// gpu-activities queue that requests the 'gpu-lost-claim' worker session.
$this->postJson(
243+
sprintf('/api/worker/workflow-tasks/%s/complete', $workflowPoll->json('task.task_id')),
244+
[
245+
'lease_owner' => $workflowPoll->json('task.lease_owner'),
246+
'workflow_task_attempt' => $workflowPoll->json('task.workflow_task_attempt'),
247+
'commands' => [
248+
[
249+
'type' => 'schedule_activity',
250+
'activity_type' => 'tests.external-greeting-activity',
251+
'arguments' => Serializer::serializeWithCodec(
252+
(string) config('workflows.serializer'),
253+
['Ada'],
254+
),
255+
'worker_session' => [
256+
'session_id' => 'gpu-lost-claim',
257+
'queue' => 'gpu-activities',
258+
'requirements' => ['gpu:nvidia-l4'],
259+
'lease_seconds' => 120,
260+
'ttl_seconds' => 600,
261+
'max_concurrent_activities' => 1,
262+
],
263+
],
264+
],
265+
],
266+
$this->workerHeaders(),
267+
)->assertOk();
268+
269+
$runId = (string) $start->json('run_id');
270+
271+
// Look up the activity task and activity execution rows for this run;
// their ids feed the mocked bridge responses below.
$task = WorkflowTask::query()
272+
->where('workflow_run_id', $runId)
273+
->where('task_type', 'activity')
274+
->firstOrFail();
275+
276+
$execution = ActivityExecution::query()
277+
->where('workflow_run_id', $runId)
278+
->firstOrFail();
279+
280+
$recordedAt = now()->toJSON();
281+
282+
// Replace the activity task bridge with a mock: poll() offers the task as
// ready, but the subsequent claim is reported as lost.
$this->mock(ActivityTaskBridgeContract::class, function (MockInterface $mock) use (
283+
$execution,
284+
$recordedAt,
285+
$runId,
286+
$task,
287+
$workflowId,
288+
): void {
289+
$mock->shouldReceive('poll')
290+
->once()
291+
->with(null, 'gpu-activities', 10, null, 'default', ['tests.external-greeting-activity'])
292+
->andReturn([
293+
[
294+
'task_id' => $task->id,
295+
'workflow_run_id' => $runId,
296+
'workflow_instance_id' => $workflowId,
297+
'activity_execution_id' => $execution->id,
298+
'activity_type' => 'tests.external-greeting-activity',
299+
'activity_class' => null,
300+
'connection' => null,
301+
'queue' => 'gpu-activities',
302+
'compatibility' => null,
303+
'available_at' => $recordedAt,
304+
],
305+
]);
306+
307+
// claimStatus() is expected exactly once, for this task and worker, and
// answers claimed=false with reason 'task_not_ready'.
$mock->shouldReceive('claimStatus')
308+
->once()
309+
->with($task->id, 'gpu-worker-lost-claim')
310+
->andReturn([
311+
'claimed' => false,
312+
'task_id' => $task->id,
313+
'workflow_instance_id' => null,
314+
'workflow_run_id' => null,
315+
'activity_execution_id' => null,
316+
'activity_attempt_id' => null,
317+
'attempt_number' => null,
318+
'activity_type' => null,
319+
'activity_class' => null,
320+
'idempotency_key' => null,
321+
'payload_codec' => null,
322+
'arguments' => null,
323+
'retry_policy' => null,
324+
'connection' => null,
325+
'queue' => null,
326+
'lease_owner' => null,
327+
'lease_expires_at' => null,
328+
'reason' => 'task_not_ready',
329+
'reason_detail' => 'The task is no longer ready.',
330+
'retry_after_seconds' => null,
331+
'backend_error' => null,
332+
'compatibility_reason' => null,
333+
]);
334+
});
335+
336+
// Register an activity worker whose capabilities list the same
// 'gpu:nvidia-l4' entry that the scheduled worker session requires.
$this->registerWorkerThroughProtocol(
337+
workerId: 'gpu-worker-lost-claim',
338+
taskQueue: 'gpu-activities',
339+
supportedActivityTypes: ['tests.external-greeting-activity'],
340+
capabilities: ['gpu:nvidia-l4'],
341+
);
342+
343+
// The activity poll must come back empty because the claim failed.
$this->postJson('/api/worker/activity-tasks/poll', [
344+
'worker_id' => 'gpu-worker-lost-claim',
345+
'task_queue' => 'gpu-activities',
346+
], $this->workerHeaders())
347+
->assertOk()
348+
->assertJsonPath('poll_status', 'empty')
349+
->assertJsonPath('task', null);
350+
351+
// No lease row may exist for the session in the 'default' namespace.
$this->assertFalse(
352+
WorkerSessionLease::query()
353+
->where('namespace', 'default')
354+
->where('session_id', 'gpu-lost-claim')
355+
->exists(),
356+
);
357+
}
358+
205359
public function test_worker_session_activity_commands_are_protocol_fenced_for_mixed_server_rollouts(): void
206360
{
207361
Queue::fake();

0 commit comments

Comments
 (0)