Skip to content

Commit 17fbc2d

Browse files
TD-S099: Server does not prove unsupported worker payload codecs fail closed (#120)
1 parent 4d119cf commit 17fbc2d

2 files changed

Lines changed: 316 additions & 1 deletion

File tree

app/Http/Controllers/Api/WorkerController.php

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use Workflow\V2\Exceptions\StructuralLimitExceededException;
2020
use Workflow\V2\Models\WorkflowTask;
2121
use Workflow\V2\Support\HistoryPayloadCompression;
22+
use Workflow\V2\Support\PayloadEnvelopeResolver;
2223
use Workflow\V2\Support\StandaloneWorkerVisibility;
2324
use Workflow\V2\Support\WorkerProtocolVersion;
2425
use Workflow\V2\Support\WorkflowCommandNormalizer;
@@ -967,13 +968,27 @@ public function completeQueryTask(Request $request, string $queryTaskId): JsonRe
967968
'result_envelope.blob' => ['required_with:result_envelope', 'string'],
968969
]);
969970

971+
$resultEnvelope = null;
972+
973+
if (($validated['result_envelope'] ?? null) !== null) {
974+
$resolved = PayloadEnvelopeResolver::resolveCommandPayloadWithCodec([
975+
'codec' => $validated['result_envelope']['codec'] ?? null,
976+
'blob' => $validated['result_envelope']['blob'] ?? null,
977+
], 'result_envelope');
978+
979+
$resultEnvelope = [
980+
'codec' => $resolved['codec'],
981+
'blob' => $resolved['payload'],
982+
];
983+
}
984+
970985
$outcome = $this->queryTasks->complete(
971986
$namespace,
972987
$queryTaskId,
973988
$validated['lease_owner'],
974989
(int) $validated['query_task_attempt'],
975990
$validated['result'] ?? null,
976-
$validated['result_envelope'] ?? null,
991+
$resultEnvelope,
977992
);
978993

979994
return WorkerProtocol::json(
Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
<?php
2+
3+
namespace Tests\Feature;
4+
5+
use App\Support\WorkflowQueryTaskBroker;
6+
use Illuminate\Foundation\Testing\RefreshDatabase;
7+
use Illuminate\Support\Facades\Queue;
8+
use Tests\Feature\Concerns\ServerTestHelpers;
9+
use Tests\TestCase;
10+
use Workflow\Serializers\Serializer;
11+
use Workflow\V2\Enums\ActivityStatus;
12+
use Workflow\V2\Enums\HistoryEventType;
13+
use Workflow\V2\Enums\TaskStatus;
14+
use Workflow\V2\Models\ActivityExecution;
15+
use Workflow\V2\Models\WorkflowHistoryEvent;
16+
use Workflow\V2\Models\WorkflowRun;
17+
use Workflow\V2\Models\WorkflowTask;
18+
19+
class UnsupportedWorkerPayloadCodecTest extends TestCase
20+
{
21+
use RefreshDatabase;
22+
use ServerTestHelpers;
23+
24+
protected function setUp(): void
25+
{
26+
parent::setUp();
27+
28+
config([
29+
'server.polling.timeout' => 0,
30+
'server.query_tasks.timeout' => 0,
31+
]);
32+
33+
$this->createNamespace('default');
34+
}
35+
36+
public function test_worker_workflow_task_payload_codec_failure_records_task_failure(): void
37+
{
38+
Queue::fake();
39+
40+
$run = $this->startRemoteWorkflow('wf-worker-unsupported-codec');
41+
$run->forceFill(['payload_codec' => 'zstd'])->save();
42+
43+
$this->registerWorker(
44+
'python-codec-workflow',
45+
'python-workflows',
46+
supportedWorkflowTypes: ['python.codec-workflow'],
47+
);
48+
49+
$poll = $this->withHeaders($this->workerHeaders())
50+
->postJson('/api/worker/workflow-tasks/poll', [
51+
'worker_id' => 'python-codec-workflow',
52+
'task_queue' => 'python-workflows',
53+
]);
54+
55+
$poll->assertOk()
56+
->assertJsonPath('task.payload_codec', 'zstd')
57+
->assertJsonPath('task.arguments.codec', 'zstd');
58+
59+
$taskId = (string) $poll->json('task.task_id');
60+
$attempt = (int) $poll->json('task.workflow_task_attempt');
61+
62+
$fail = $this->withHeaders($this->workerHeaders())
63+
->postJson("/api/worker/workflow-tasks/{$taskId}/fail", [
64+
'lease_owner' => 'python-codec-workflow',
65+
'workflow_task_attempt' => $attempt,
66+
'failure' => [
67+
'message' => "Unsupported payload codec 'zstd'.",
68+
'type' => 'ValueError',
69+
'stack_trace' => 'at worker.decode_payload',
70+
],
71+
]);
72+
73+
$fail->assertOk()
74+
->assertJsonPath('outcome', 'failed')
75+
->assertJsonPath('recorded', true);
76+
77+
$task = WorkflowTask::query()->findOrFail($taskId);
78+
79+
$this->assertSame(TaskStatus::Failed, $task->status);
80+
$this->assertSame("Unsupported payload codec 'zstd'.", $task->last_error);
81+
$this->assertFalse(
82+
WorkflowHistoryEvent::query()
83+
->where('workflow_run_id', $run->id)
84+
->where('event_type', HistoryEventType::WorkflowCompleted->value)
85+
->exists(),
86+
);
87+
}
88+
89+
public function test_worker_activity_payload_codec_failure_is_non_retryable_even_without_details(): void
90+
{
91+
Queue::fake();
92+
93+
$run = $this->startRemoteWorkflow('wf-activity-unsupported-codec');
94+
$this->registerWorker(
95+
'python-codec-scheduler',
96+
'python-workflows',
97+
supportedWorkflowTypes: ['python.codec-workflow'],
98+
);
99+
100+
$workflowPoll = $this->withHeaders($this->workerHeaders())
101+
->postJson('/api/worker/workflow-tasks/poll', [
102+
'worker_id' => 'python-codec-scheduler',
103+
'task_queue' => 'python-workflows',
104+
]);
105+
106+
$workflowPoll->assertOk();
107+
108+
$schedule = $this->withHeaders($this->workerHeaders())
109+
->postJson(sprintf('/api/worker/workflow-tasks/%s/complete', $workflowPoll->json('task.task_id')), [
110+
'lease_owner' => $workflowPoll->json('task.lease_owner'),
111+
'workflow_task_attempt' => $workflowPoll->json('task.workflow_task_attempt'),
112+
'commands' => [
113+
[
114+
'type' => 'schedule_activity',
115+
'activity_type' => 'python.codec-activity',
116+
'arguments' => Serializer::serializeWithCodec('avro', ['Ada']),
117+
'queue' => 'python-activities',
118+
'retry_policy' => [
119+
'max_attempts' => 3,
120+
'backoff_seconds' => [1],
121+
],
122+
],
123+
],
124+
]);
125+
126+
$schedule->assertOk()
127+
->assertJsonPath('outcome', 'completed');
128+
129+
$run->refresh()->forceFill(['payload_codec' => 'zstd'])->save();
130+
131+
$this->registerWorker(
132+
'python-codec-activity',
133+
'python-activities',
134+
supportedActivityTypes: ['python.codec-activity'],
135+
);
136+
137+
$activityPoll = $this->withHeaders($this->workerHeaders())
138+
->postJson('/api/worker/activity-tasks/poll', [
139+
'worker_id' => 'python-codec-activity',
140+
'task_queue' => 'python-activities',
141+
]);
142+
143+
$activityPoll->assertOk()
144+
->assertJsonPath('task.payload_codec', 'zstd')
145+
->assertJsonPath('task.arguments.codec', 'zstd');
146+
147+
$fail = $this->withHeaders($this->workerHeaders())
148+
->postJson(sprintf('/api/worker/activity-tasks/%s/fail', $activityPoll->json('task.task_id')), [
149+
'activity_attempt_id' => $activityPoll->json('task.activity_attempt_id'),
150+
'lease_owner' => $activityPoll->json('task.lease_owner'),
151+
'failure' => [
152+
'message' => "Unsupported payload codec 'zstd'.",
153+
'type' => 'ValueError',
154+
'stack_trace' => 'at worker.decode_payload',
155+
'non_retryable' => true,
156+
],
157+
]);
158+
159+
$fail->assertOk()
160+
->assertJsonPath('outcome', 'failed')
161+
->assertJsonPath('recorded', true)
162+
->assertJsonPath('next_task_id', null);
163+
164+
$execution = ActivityExecution::query()
165+
->findOrFail((string) $activityPoll->json('task.activity_execution_id'));
166+
167+
$this->assertSame(ActivityStatus::Failed, $execution->status);
168+
$this->assertTrue(
169+
WorkflowHistoryEvent::query()
170+
->where('workflow_run_id', $run->id)
171+
->where('event_type', HistoryEventType::ActivityFailed->value)
172+
->exists(),
173+
);
174+
$this->assertFalse(
175+
WorkflowHistoryEvent::query()
176+
->where('workflow_run_id', $run->id)
177+
->where('event_type', HistoryEventType::ActivityRetryScheduled->value)
178+
->exists(),
179+
);
180+
}
181+
182+
public function test_worker_query_payload_codec_failure_records_failed_query_task(): void
183+
{
184+
Queue::fake();
185+
186+
$run = $this->startRemoteWorkflow('wf-query-unsupported-codec');
187+
$run->forceFill(['payload_codec' => 'zstd'])->save();
188+
189+
$this->registerWorker(
190+
'python-codec-query',
191+
'python-workflows',
192+
supportedWorkflowTypes: ['python.codec-workflow'],
193+
);
194+
195+
/** @var WorkflowQueryTaskBroker $broker */
196+
$broker = app(WorkflowQueryTaskBroker::class);
197+
$task = $broker->enqueue('default', $run, 'status', [
198+
'codec' => 'zstd',
199+
'blob' => 'opaque-query-arguments',
200+
]);
201+
202+
$poll = $this->withHeaders($this->workerHeaders())
203+
->postJson('/api/worker/query-tasks/poll', [
204+
'worker_id' => 'python-codec-query',
205+
'task_queue' => 'python-workflows',
206+
]);
207+
208+
$poll->assertOk()
209+
->assertJsonPath('task.query_task_id', $task['query_task_id'])
210+
->assertJsonPath('task.payload_codec', 'zstd')
211+
->assertJsonPath('task.workflow_arguments.codec', 'zstd')
212+
->assertJsonPath('task.query_arguments.codec', 'zstd');
213+
214+
$fail = $this->withHeaders($this->workerHeaders())
215+
->postJson("/api/worker/query-tasks/{$task['query_task_id']}/fail", [
216+
'lease_owner' => 'python-codec-query',
217+
'query_task_attempt' => 1,
218+
'failure' => [
219+
'message' => "Unsupported payload codec 'zstd'.",
220+
'reason' => 'query_payload_decode_failed',
221+
'type' => 'ValueError',
222+
'stack_trace' => 'at worker.decode_payload',
223+
],
224+
]);
225+
226+
$fail->assertOk()
227+
->assertJsonPath('outcome', 'failed')
228+
->assertJsonPath('reason', 'query_payload_decode_failed');
229+
230+
$stored = $broker->task((string) $task['query_task_id']);
231+
232+
$this->assertIsArray($stored);
233+
$this->assertSame('failed', $stored['status'] ?? null);
234+
$this->assertSame('query_payload_decode_failed', $stored['reason'] ?? null);
235+
$this->assertSame("Unsupported payload codec 'zstd'.", $stored['message'] ?? null);
236+
}
237+
238+
public function test_worker_query_result_envelope_rejects_unsupported_codec_without_completing(): void
239+
{
240+
Queue::fake();
241+
242+
$run = $this->startRemoteWorkflow('wf-query-result-unsupported-codec');
243+
$this->registerWorker(
244+
'python-codec-query-result',
245+
'python-workflows',
246+
supportedWorkflowTypes: ['python.codec-workflow'],
247+
);
248+
249+
/** @var WorkflowQueryTaskBroker $broker */
250+
$broker = app(WorkflowQueryTaskBroker::class);
251+
$task = $broker->enqueue('default', $run, 'status', [
252+
'codec' => 'avro',
253+
'blob' => Serializer::serializeWithCodec('avro', ['summary']),
254+
]);
255+
256+
$poll = $this->withHeaders($this->workerHeaders())
257+
->postJson('/api/worker/query-tasks/poll', [
258+
'worker_id' => 'python-codec-query-result',
259+
'task_queue' => 'python-workflows',
260+
]);
261+
262+
$poll->assertOk()
263+
->assertJsonPath('task.query_task_id', $task['query_task_id']);
264+
265+
$complete = $this->withHeaders($this->workerHeaders())
266+
->postJson("/api/worker/query-tasks/{$task['query_task_id']}/complete", [
267+
'lease_owner' => 'python-codec-query-result',
268+
'query_task_attempt' => 1,
269+
'result' => ['status' => 'ready'],
270+
'result_envelope' => [
271+
'codec' => 'zstd',
272+
'blob' => 'opaque-result',
273+
],
274+
]);
275+
276+
$complete->assertStatus(422)
277+
->assertJsonValidationErrors(['result_envelope.codec']);
278+
279+
$stored = $broker->task((string) $task['query_task_id']);
280+
281+
$this->assertIsArray($stored);
282+
$this->assertSame('leased', $stored['status'] ?? null);
283+
$this->assertArrayNotHasKey('result_envelope', $stored);
284+
}
285+
286+
private function startRemoteWorkflow(string $workflowId): WorkflowRun
287+
{
288+
$start = $this->withHeaders($this->apiHeaders())
289+
->postJson('/api/workflows', [
290+
'workflow_id' => $workflowId,
291+
'workflow_type' => 'python.codec-workflow',
292+
'task_queue' => 'python-workflows',
293+
'input' => ['Ada'],
294+
]);
295+
296+
$start->assertCreated();
297+
298+
return WorkflowRun::query()->findOrFail((string) $start->json('run_id'));
299+
}
300+
}

0 commit comments

Comments
 (0)