Skip to content

Commit 50388a9

Browse files
Store activity execution payload codecs
Store activity execution payload codecs
1 parent 5e2698b commit 50388a9

5 files changed

Lines changed: 88 additions & 53 deletions

File tree

src/V2/Models/ActivityExecution.php

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public function activityArguments(): array
6666
}
6767

6868
/** @var array<int, mixed> $arguments */
69-
$arguments = $this->unserializeWithRunCodec($this->arguments);
69+
$arguments = $this->unserializeWithRowCodec($this->arguments);
7070

7171
return $arguments;
7272
}
@@ -77,25 +77,20 @@ public function activityResult(): mixed
7777
return null;
7878
}
7979

80-
return $this->unserializeWithRunCodec($this->result);
80+
return $this->unserializeWithRowCodec($this->result);
8181
}
8282

8383
/**
84-
* Decode an activity payload. Activity executions do not carry their own
85-
* payload_codec column because the scheduling path may fall back from
86-
* Avro to the legacy Y codec when arguments carry PHP-only values (see
87-
* WorkflowExecutor::scheduleActivity and #429). The blob is
88-
* self-describing — Avro's base64-plus-prefix envelope and PHP
89-
* serialize's `O:`/`a:`/… header byte are disjoint, so the legacy
90-
* sniff-based unserialize path picks the right codec regardless of
91-
* which one was chosen at write time.
92-
*
93-
* The run's `payload_codec` remains the authority for the rest of the
94-
* run state (command payloads, history, etc.) — only the activity
95-
* arguments/result blob gets the sniff treatment.
84+
* Decode an activity payload with the codec persisted beside the row.
85+
* Null is kept as a defensive fallback for rows written before this
86+
* column existed in unreleased v2 builds.
9687
*/
97-
private function unserializeWithRunCodec(string $blob): mixed
88+
private function unserializeWithRowCodec(string $blob): mixed
9889
{
90+
if (is_string($this->payload_codec) && $this->payload_codec !== '') {
91+
return Serializer::unserializeWithCodec($this->payload_codec, $blob);
92+
}
93+
9994
return Serializer::unserialize($blob);
10095
}
10196
}

src/V2/Support/ActivityOutcomeRecorder.php

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -113,15 +113,17 @@ public static function record(
113113
$runCodec = is_string($run->payload_codec) && $run->payload_codec !== ''
114114
? $run->payload_codec
115115
: null;
116-
$serializedSuccessfulResult = null;
117-
$resultCodec = null;
116+
$encodedSuccessfulResult = null;
118117

119118
if ($throwable === null) {
120-
$serializedSuccessfulResult = self::serializeWithCodec($result, $codec, $runCodec);
121-
$resultCodec = self::payloadCodec($codec, $runCodec);
119+
$encodedSuccessfulResult = self::serializeWithCodec(
120+
$result,
121+
$codec,
122+
self::preferredPayloadCodec($lockedExecution, $runCodec),
123+
);
122124

123125
StructuralLimits::logWarning(
124-
StructuralLimits::warnApproachingPayloadSize($serializedSuccessfulResult),
126+
StructuralLimits::warnApproachingPayloadSize($encodedSuccessfulResult['blob']),
125127
[
126128
'workflow_run_id' => $run->id,
127129
'workflow_type' => $run->workflow_type,
@@ -133,23 +135,25 @@ public static function record(
133135
);
134136

135137
try {
136-
StructuralLimits::guardPayloadSize($serializedSuccessfulResult);
138+
StructuralLimits::guardPayloadSize($encodedSuccessfulResult['blob']);
137139
} catch (StructuralLimitExceededException $limitExceeded) {
138140
$throwable = $limitExceeded;
139-
$serializedSuccessfulResult = null;
140-
$resultCodec = null;
141+
$encodedSuccessfulResult = null;
141142
}
142143
}
143144

144145
if (in_array($run->status, [RunStatus::Completed, RunStatus::Failed], true)) {
145146
$lockedExecution->forceFill([
146147
'status' => $throwable === null ? ActivityStatus::Completed : ActivityStatus::Failed,
147148
'result' => $throwable === null
148-
? $serializedSuccessfulResult
149+
? $encodedSuccessfulResult['blob']
149150
: $lockedExecution->result,
151+
'payload_codec' => $throwable === null
152+
? $encodedSuccessfulResult['codec']
153+
: $lockedExecution->payload_codec,
150154
'exception' => $throwable === null
151155
? $lockedExecution->exception
152-
: self::serializeWithCodec(self::failurePayload($throwable, $codec), null, $runCodec),
156+
: self::serializeWithCodec(self::failurePayload($throwable, $codec), null, $runCodec)['blob'],
153157
'closed_at' => $lockedExecution->closed_at ?? now(),
154158
])->save();
155159

@@ -178,7 +182,8 @@ public static function record(
178182
if ($throwable === null) {
179183
$lockedExecution->forceFill([
180184
'status' => ActivityStatus::Completed,
181-
'result' => $serializedSuccessfulResult,
185+
'result' => $encodedSuccessfulResult['blob'],
186+
'payload_codec' => $encodedSuccessfulResult['codec'],
182187
'exception' => null,
183188
'closed_at' => now(),
184189
])->save();
@@ -191,7 +196,7 @@ public static function record(
191196
'sequence' => $lockedExecution->sequence,
192197
'attempt_number' => $attemptCount,
193198
'result' => $lockedExecution->result,
194-
'payload_codec' => $resultCodec,
199+
'payload_codec' => $encodedSuccessfulResult['codec'],
195200
'activity' => ActivitySnapshot::fromExecution($lockedExecution),
196201
], $parallelMetadata ?? []), $task);
197202

@@ -214,7 +219,7 @@ public static function record(
214219

215220
$lockedExecution->forceFill([
216221
'status' => ActivityStatus::Pending,
217-
'exception' => self::serializeWithCodec($exceptionPayload, null, $runCodec),
222+
'exception' => self::serializeWithCodec($exceptionPayload, null, $runCodec)['blob'],
218223
'last_heartbeat_at' => null,
219224
])->save();
220225

@@ -293,7 +298,7 @@ public static function record(
293298

294299
$lockedExecution->forceFill([
295300
'status' => ActivityStatus::Failed,
296-
'exception' => self::serializeWithCodec($exceptionPayload, null, $runCodec),
301+
'exception' => self::serializeWithCodec($exceptionPayload, null, $runCodec)['blob'],
297302
'closed_at' => now(),
298303
])->save();
299304

@@ -509,25 +514,32 @@ private static function shouldRetry(
509514
* codec (with a chooseCodecForData PHP-only fallback), then the package
510515
* default.
511516
*
512-
* The encoded blob is stamped on the activity row; the workflow side
513-
* later decodes it with a codec sniff so a Y-encoded fallback round-
514-
* trips even when the run's codec tag says Avro. Keeps parity with
515-
* activity-argument scheduling (see WorkflowExecutor::scheduleActivity
516-
* and #429).
517+
* @return array{blob: string, codec: string}
517518
*/
518-
private static function serializeWithCodec(mixed $value, ?string $workerCodec, ?string $runCodec): string
519+
private static function serializeWithCodec(mixed $value, ?string $workerCodec, ?string $preferredCodec): array
519520
{
520521
if (is_string($workerCodec) && $workerCodec !== '' && is_string($value)) {
521-
return $value;
522+
return [
523+
'blob' => $value,
524+
'codec' => CodecRegistry::canonicalize($workerCodec),
525+
];
522526
}
523527

524-
if (is_string($runCodec) && $runCodec !== '') {
525-
$chosenCodec = Serializer::chooseCodecForData($runCodec, $value);
528+
if (is_string($preferredCodec) && $preferredCodec !== '') {
529+
$chosenCodec = Serializer::chooseCodecForData($preferredCodec, $value);
526530

527-
return Serializer::serializeWithCodec($chosenCodec, $value);
531+
return [
532+
'blob' => Serializer::serializeWithCodec($chosenCodec, $value),
533+
'codec' => $chosenCodec,
534+
];
528535
}
529536

530-
return Serializer::serialize($value);
537+
$chosenCodec = Serializer::chooseCodecForData(CodecRegistry::defaultCodec(), $value);
538+
539+
return [
540+
'blob' => Serializer::serializeWithCodec($chosenCodec, $value),
541+
'codec' => $chosenCodec,
542+
];
531543
}
532544

533545
/**
@@ -549,10 +561,10 @@ private static function failurePayload(Throwable $throwable, ?string $workerCode
549561
return $payload;
550562
}
551563

552-
private static function payloadCodec(?string $workerCodec, ?string $runCodec): string
564+
private static function preferredPayloadCodec(ActivityExecution $execution, ?string $runCodec): ?string
553565
{
554-
if (is_string($workerCodec) && $workerCodec !== '') {
555-
return $workerCodec;
566+
if (is_string($execution->payload_codec) && $execution->payload_codec !== '') {
567+
return $execution->payload_codec;
556568
}
557569

558570
if (is_string($runCodec) && $runCodec !== '') {

src/V2/Support/WorkflowExecutor.php

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1349,14 +1349,8 @@ private function scheduleActivity(
13491349

13501350
// Activity arguments often contain consumer-side PHP objects (messages,
13511351
// DTOs, value objects) that the v2 default Avro codec can only encode
1352-
// by round-tripping through JSON — which drops class identity and
1353-
// hands the activity a plain associative array. Apply the same
1354-
// chooseCodecForData fallback child workflow scheduling uses so PHP-
1355-
// only arguments serialize with the legacy Y codec and arrive at the
1356-
// activity as the typed objects the producer passed in. The blob is
1357-
// self-describing (Avro's wrapper prefix / PHP-serialize's "O:"/"a:"
1358-
// shapes are disjoint), so the decode path can sniff without needing
1359-
// a stored per-execution codec column. See #429.
1352+
// by round-tripping through JSON. Persist the chosen codec beside the
1353+
// activity row so later reads never depend on payload sniffing.
13601354
$argumentsCodec = Serializer::chooseCodecForData($run->payload_codec, $activityCall->arguments);
13611355
$serializedArguments = Serializer::serializeWithCodec($argumentsCodec, $activityCall->arguments);
13621356
$this->logApproachingLimit(
@@ -1377,6 +1371,7 @@ private function scheduleActivity(
13771371
'activity_type' => TypeRegistry::for($activityCall->activity),
13781372
'status' => ActivityStatus::Pending->value,
13791373
'attempt_count' => 0,
1374+
'payload_codec' => $argumentsCodec,
13801375
'arguments' => $serializedArguments,
13811376
'connection' => RoutingResolver::activityConnection($activityCall->activity, $run, $options),
13821377
'queue' => RoutingResolver::activityQueue($activityCall->activity, $run, $options),

src/migrations/2026_04_05_000104_create_activity_executions_table.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ public function up(): void
1818
$table->string('activity_class');
1919
$table->string('activity_type');
2020
$table->string('status');
21+
$table->string('payload_codec')
22+
->nullable();
2123
$table->longText('arguments')
2224
->nullable();
2325
$table->longText('result')

tests/Feature/V2/V2ActivityArgumentCodecTest.php

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@
66

77
use Illuminate\Support\Facades\Queue;
88
use Tests\Fixtures\V2\TestActivityArgumentObject;
9+
use Tests\Fixtures\V2\TestActivityArgumentObjectActivity;
910
use Tests\Fixtures\V2\TestActivityArgumentObjectWorkflow;
1011
use Tests\TestCase;
12+
use Workflow\Serializers\CodecDecodeException;
13+
use Workflow\V2\Enums\ActivityStatus;
1114
use Workflow\V2\Models\ActivityExecution;
1215
use Workflow\V2\Models\WorkflowRun;
1316
use Workflow\V2\WorkflowStub;
@@ -19,8 +22,8 @@
1922
* json_encode / json_decode and hands decoders a plain associative array.
2023
* scheduleActivity now applies the same chooseCodecForData fallback child
2124
* workflow scheduling uses so PHP-only arguments round-trip through the
22-
* legacy Y codec. The decode path sniffs the blob so the run codec tag can
23-
* stay Avro without breaking round-trip.
25+
* legacy Y codec. The selected codec is stored on the activity row so the
26+
* decode path does not depend on sniffing disjoint blob shapes.
2427
*/
2528
final class V2ActivityArgumentCodecTest extends TestCase
2629
{
@@ -55,13 +58,41 @@ public function testActivityReceivesTypedObjectWhenRunCodecIsAvro(): void
5558
->where('workflow_run_id', $run->id)
5659
->firstOrFail();
5760

61+
$this->assertSame('workflow-serializer-y', $execution->payload_codec);
62+
$this->assertSame(sprintf('hello:3:%s', TestActivityArgumentObject::class), $execution->activityResult());
63+
5864
[$argument] = $execution->activityArguments();
5965

6066
$this->assertInstanceOf(TestActivityArgumentObject::class, $argument);
6167
$this->assertSame('hello', $argument->tag);
6268
$this->assertSame(3, $argument->count);
6369
}
6470

71+
public function testActivityArgumentsUseStoredCodecInsteadOfSniffFallback(): void
72+
{
73+
$workflow = WorkflowStub::make(TestActivityArgumentObjectWorkflow::class, 'activity-arg-stored-codec');
74+
$workflow->start();
75+
76+
/** @var WorkflowRun $run */
77+
$run = WorkflowRun::query()->findOrFail($workflow->runId());
78+
79+
/** @var ActivityExecution $execution */
80+
$execution = ActivityExecution::query()->create([
81+
'workflow_run_id' => $run->id,
82+
'sequence' => 99,
83+
'activity_class' => TestActivityArgumentObjectActivity::class,
84+
'activity_type' => TestActivityArgumentObjectActivity::class,
85+
'status' => ActivityStatus::Pending->value,
86+
'attempt_count' => 0,
87+
'payload_codec' => 'avro',
88+
'arguments' => json_encode(['sniffable-json'], JSON_THROW_ON_ERROR),
89+
]);
90+
91+
$this->expectException(CodecDecodeException::class);
92+
93+
$execution->activityArguments();
94+
}
95+
6596
private function drainReadyTasks(): void
6697
{
6798
$deadline = microtime(true) + 10;

0 commit comments

Comments
 (0)