Skip to content

Commit 537ef6f

Browse files
TD-089: pin activity exception encode/decode to the run codec
Activity retry and final-failure paths in ActivityOutcomeRecorder were writing ActivityExecution.exception via codec-blind Serializer::serialize, which silently uses the package default codec. With Avro as the package default, that would store Avro-wrapped bytes on JSON-pinned runs (and the reverse on Avro-pinned runs under a JSON default). Encode the exception blob with serializeWithCodec(..., $runCodec) so the bytes match the run's persisted payload_codec, matching the terminal Completed/Failed branch that already did this. Thread the same codec into the fallback decode path: WorkflowExecutor and QueryStateReplayer now decode ActivityExecution.exception through the run codec, falling back to the legacy sniffer only when no run codec is available (pre-pinning rows). Adds regression coverage that drives the retry and final-failure paths through ActivityOutcomeRecorder with a codec mismatch between the run and the package default, for both JSON-pinned and Avro-pinned runs. Closes #363 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 799f5d8 commit 537ef6f

4 files changed

Lines changed: 297 additions & 4 deletions

File tree

src/V2/Support/ActivityOutcomeRecorder.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public static function record(
183183

184184
$lockedExecution->forceFill([
185185
'status' => ActivityStatus::Pending,
186-
'exception' => Serializer::serialize($exceptionPayload),
186+
'exception' => self::serializeWithCodec($exceptionPayload, null, $runCodec),
187187
'last_heartbeat_at' => null,
188188
])->save();
189189

@@ -260,7 +260,7 @@ public static function record(
260260

261261
$lockedExecution->forceFill([
262262
'status' => ActivityStatus::Failed,
263-
'exception' => Serializer::serialize($exceptionPayload),
263+
'exception' => self::serializeWithCodec($exceptionPayload, null, $runCodec),
264264
'closed_at' => now(),
265265
])->save();
266266

src/V2/Support/QueryStateReplayer.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -752,7 +752,7 @@ private function activityException(
752752
$payload = is_array($event?->payload['exception'] ?? null)
753753
? $event->payload['exception']
754754
: (is_string($execution?->exception)
755-
? Serializer::unserialize($execution->exception)
755+
? $this->unserializeExceptionWithRun($execution->exception, $run)
756756
: []);
757757

758758
if (! is_array($payload) && $event !== null && $run !== null) {
@@ -791,6 +791,15 @@ private function activityException(
791791
return FailureFactory::restoreForReplay($payload, $fallbackClass, $fallbackMessage, $fallbackCode);
792792
}
793793

794+
private function unserializeExceptionWithRun(string $serialized, ?WorkflowRun $run): mixed
795+
{
796+
if ($run !== null && is_string($run->payload_codec) && $run->payload_codec !== '') {
797+
return Serializer::unserializeWithCodec($run->payload_codec, $serialized);
798+
}
799+
800+
return Serializer::unserialize($serialized);
801+
}
802+
794803
private function applyRecordedUpdates(
795804
WorkflowRun $run,
796805
\Workflow\V2\Workflow $workflow,

src/V2/Support/WorkflowExecutor.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3500,7 +3500,7 @@ private function activityException(
35003500
$payload = is_array($event?->payload['exception'] ?? null)
35013501
? $event->payload['exception']
35023502
: (is_string($execution?->exception)
3503-
? Serializer::unserialize($execution->exception)
3503+
? $this->unserializePayloadWithRun($execution->exception, $run)
35043504
: []);
35053505

35063506
if (! is_array($payload) && $event !== null && $run !== null) {
Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Feature\V2;
6+
7+
use Illuminate\Support\Str;
8+
use RuntimeException;
9+
use Tests\Fixtures\V2\TestGreetingActivity;
10+
use Tests\Fixtures\V2\TestGreetingWorkflow;
11+
use Tests\TestCase;
12+
use Workflow\Serializers\Serializer;
13+
use Workflow\V2\Enums\ActivityAttemptStatus;
14+
use Workflow\V2\Enums\ActivityStatus;
15+
use Workflow\V2\Enums\HistoryEventType;
16+
use Workflow\V2\Enums\RunStatus;
17+
use Workflow\V2\Enums\TaskStatus;
18+
use Workflow\V2\Enums\TaskType;
19+
use Workflow\V2\Models\ActivityAttempt;
20+
use Workflow\V2\Models\ActivityExecution;
21+
use Workflow\V2\Models\WorkflowHistoryEvent;
22+
use Workflow\V2\Models\WorkflowInstance;
23+
use Workflow\V2\Models\WorkflowRun;
24+
use Workflow\V2\Models\WorkflowTask;
25+
use Workflow\V2\Support\ActivityOutcomeRecorder;
26+
27+
/**
28+
* TD-089 regression: activity exception rows must be encoded with the
29+
* run's pinned payload codec, not the package default. Any fallback
30+
* reader that decodes the blob must use the run codec too, so that
31+
* Avro-default deployments do not corrupt JSON-pinned runs (and vice
32+
* versa).
33+
*/
34+
final class V2ActivityExceptionCodecTest extends TestCase
35+
{
36+
public function testRetryPathStoresExceptionUnderRunCodecWhenDefaultDiffers(): void
37+
{
38+
// Package default differs from the run's pinned codec: this is the
39+
// exact mismatch that the old codec-blind Serializer::serialize()
40+
// would silently mis-encode.
41+
config()->set('workflows.serializer', 'avro');
42+
43+
[$run, $execution, $task, $attempt] = $this->scaffoldLeasedAttempt(
44+
pinnedCodec: 'json',
45+
maxAttempts: 2,
46+
instanceId: 'td089-retry-json',
47+
);
48+
49+
$outcome = ActivityOutcomeRecorder::record(
50+
taskId: $task->id,
51+
attemptId: $attempt->id,
52+
attemptCount: 1,
53+
result: null,
54+
throwable: new RuntimeException('retry boom', 7),
55+
maxAttempts: 2,
56+
backoffSeconds: 1,
57+
);
58+
59+
$this->assertTrue($outcome['recorded']);
60+
61+
$execution->refresh();
62+
63+
// Retry path must have flipped status back to Pending and stashed
64+
// the exception bytes for the reattempt.
65+
$this->assertSame(ActivityStatus::Pending, $execution->status);
66+
$this->assertIsString($execution->exception);
67+
68+
$this->assertExceptionBytesDecodeAs(
69+
bytes: $execution->exception,
70+
runCodec: 'json',
71+
otherCodec: 'avro',
72+
expectedMessage: 'retry boom',
73+
);
74+
}
75+
76+
public function testFinalFailurePathStoresExceptionUnderRunCodecWhenDefaultDiffers(): void
77+
{
78+
config()->set('workflows.serializer', 'avro');
79+
80+
[$run, $execution, $task, $attempt] = $this->scaffoldLeasedAttempt(
81+
pinnedCodec: 'json',
82+
maxAttempts: 1,
83+
instanceId: 'td089-final-json',
84+
);
85+
86+
$outcome = ActivityOutcomeRecorder::record(
87+
taskId: $task->id,
88+
attemptId: $attempt->id,
89+
attemptCount: 1,
90+
result: null,
91+
throwable: new RuntimeException('final boom', 9),
92+
maxAttempts: 1,
93+
backoffSeconds: 0,
94+
);
95+
96+
$this->assertTrue($outcome['recorded']);
97+
98+
$execution->refresh();
99+
100+
$this->assertSame(ActivityStatus::Failed, $execution->status);
101+
$this->assertIsString($execution->exception);
102+
103+
$this->assertExceptionBytesDecodeAs(
104+
bytes: $execution->exception,
105+
runCodec: 'json',
106+
otherCodec: 'avro',
107+
expectedMessage: 'final boom',
108+
);
109+
}
110+
111+
public function testFinalFailurePathStoresExceptionUnderAvroRunCodecWhenDefaultIsJson(): void
112+
{
113+
// Mirror case: Avro-pinned run under a JSON package default still
114+
// has to write Avro bytes, not JSON.
115+
config()->set('workflows.serializer', 'json');
116+
117+
[$run, $execution, $task, $attempt] = $this->scaffoldLeasedAttempt(
118+
pinnedCodec: 'avro',
119+
maxAttempts: 1,
120+
instanceId: 'td089-final-avro',
121+
);
122+
123+
$outcome = ActivityOutcomeRecorder::record(
124+
taskId: $task->id,
125+
attemptId: $attempt->id,
126+
attemptCount: 1,
127+
result: null,
128+
throwable: new RuntimeException('avro boom', 5),
129+
maxAttempts: 1,
130+
backoffSeconds: 0,
131+
);
132+
133+
$this->assertTrue($outcome['recorded']);
134+
135+
$execution->refresh();
136+
137+
$this->assertSame(ActivityStatus::Failed, $execution->status);
138+
$this->assertIsString($execution->exception);
139+
140+
$decoded = Serializer::unserializeWithCodec('avro', $execution->exception);
141+
142+
$this->assertIsArray($decoded);
143+
$this->assertSame(RuntimeException::class, $decoded['class']);
144+
$this->assertSame('avro boom', $decoded['message']);
145+
}
146+
147+
/**
148+
* @return array{0: WorkflowRun, 1: ActivityExecution, 2: WorkflowTask, 3: ActivityAttempt}
149+
*/
150+
private function scaffoldLeasedAttempt(string $pinnedCodec, int $maxAttempts, string $instanceId): array
151+
{
152+
$now = now();
153+
154+
$instance = WorkflowInstance::query()->create([
155+
'id' => $instanceId,
156+
'workflow_class' => TestGreetingWorkflow::class,
157+
'workflow_type' => 'test-greeting-workflow',
158+
'run_count' => 1,
159+
'reserved_at' => $now,
160+
'started_at' => $now,
161+
]);
162+
163+
$run = WorkflowRun::query()->create([
164+
'workflow_instance_id' => $instance->id,
165+
'run_number' => 1,
166+
'workflow_class' => TestGreetingWorkflow::class,
167+
'workflow_type' => 'test-greeting-workflow',
168+
'status' => RunStatus::Waiting->value,
169+
'arguments' => Serializer::serializeWithCodec($pinnedCodec, ['Taylor']),
170+
'payload_codec' => $pinnedCodec,
171+
'connection' => null,
172+
'queue' => null,
173+
'started_at' => $now,
174+
'last_progress_at' => $now,
175+
]);
176+
177+
$instance->forceFill(['current_run_id' => $run->id])->save();
178+
179+
$attemptId = (string) Str::ulid();
180+
181+
$execution = ActivityExecution::query()->create([
182+
'workflow_run_id' => $run->id,
183+
'sequence' => 1,
184+
'activity_class' => TestGreetingActivity::class,
185+
'activity_type' => 'test-greeting-activity',
186+
'status' => ActivityStatus::Running->value,
187+
'attempt_count' => 1,
188+
'current_attempt_id' => $attemptId,
189+
'arguments' => Serializer::serializeWithCodec($pinnedCodec, ['Taylor']),
190+
'connection' => null,
191+
'queue' => null,
192+
'started_at' => $now,
193+
'retry_policy' => [
194+
'snapshot_version' => 1,
195+
'max_attempts' => $maxAttempts,
196+
'backoff_seconds' => [1],
197+
'start_to_close_timeout' => 60,
198+
'schedule_to_start_timeout' => null,
199+
],
200+
]);
201+
202+
$task = WorkflowTask::query()->create([
203+
'workflow_run_id' => $run->id,
204+
'task_type' => TaskType::Activity->value,
205+
'status' => TaskStatus::Leased->value,
206+
'available_at' => $now,
207+
'payload' => [
208+
'activity_execution_id' => $execution->id,
209+
],
210+
'connection' => null,
211+
'queue' => null,
212+
'leased_at' => $now,
213+
'lease_expires_at' => $now->copy()->addMinutes(5),
214+
'attempt_count' => 1,
215+
]);
216+
217+
$attempt = ActivityAttempt::query()->create([
218+
'id' => $attemptId,
219+
'workflow_run_id' => $run->id,
220+
'activity_execution_id' => $execution->id,
221+
'workflow_task_id' => $task->id,
222+
'attempt_number' => 1,
223+
'status' => ActivityAttemptStatus::Running->value,
224+
'lease_owner' => $task->id,
225+
'started_at' => $now,
226+
'lease_expires_at' => $now->copy()->addMinutes(5),
227+
]);
228+
229+
WorkflowHistoryEvent::record($run, HistoryEventType::ActivityScheduled, [
230+
'activity_execution_id' => $execution->id,
231+
'activity_class' => $execution->activity_class,
232+
'activity_type' => $execution->activity_type,
233+
'sequence' => 1,
234+
]);
235+
236+
WorkflowHistoryEvent::record($run, HistoryEventType::ActivityStarted, [
237+
'activity_execution_id' => $execution->id,
238+
'activity_attempt_id' => $attemptId,
239+
'activity_class' => $execution->activity_class,
240+
'activity_type' => $execution->activity_type,
241+
'sequence' => 1,
242+
'attempt_number' => 1,
243+
], $task);
244+
245+
return [$run, $execution, $task, $attempt];
246+
}
247+
248+
private function assertExceptionBytesDecodeAs(
249+
string $bytes,
250+
string $runCodec,
251+
string $otherCodec,
252+
string $expectedMessage,
253+
): void {
254+
$decoded = Serializer::unserializeWithCodec($runCodec, $bytes);
255+
256+
$this->assertIsArray($decoded);
257+
$this->assertSame(RuntimeException::class, $decoded['class']);
258+
$this->assertSame($expectedMessage, $decoded['message']);
259+
260+
// Under the bug, the bytes would have been encoded with the package
261+
// default ($otherCodec), so decoding as $runCodec would have thrown
262+
// or produced garbage. Sanity-check that the bytes are NOT in the
263+
// other codec's shape to catch silent regressions if someone ever
264+
// re-wires serializeWithCodec to fall back to the package default.
265+
$otherDecoded = null;
266+
try {
267+
$otherDecoded = Serializer::unserializeWithCodec($otherCodec, $bytes);
268+
} catch (\Throwable $ignored) {
269+
// Expected: bytes in one codec should not decode as the other.
270+
return;
271+
}
272+
273+
// If the other codec *did* decode, the payload must look different
274+
// from a faithful Throwable array (proving the bytes are still
275+
// canonically from the run codec, not the package default).
276+
$this->assertTrue(
277+
! is_array($otherDecoded)
278+
|| ($otherDecoded['class'] ?? null) !== RuntimeException::class
279+
|| ($otherDecoded['message'] ?? null) !== $expectedMessage,
280+
'Exception bytes were ambiguously decodable as both codecs; '
281+
. 'encoder is likely ignoring the pinned run codec.',
282+
);
283+
}
284+
}

0 commit comments

Comments
 (0)