Skip to content

Commit de5c291

Browse files
v2 executor: pin activity result encode and child run codec to parent run
Two paired drifts surfaced once #330 made avro the default codec: - ActivityOutcomeRecorder serialized activity results with the package default (Serializer::serialize) and stamped no codec on the resulting activity row, so when the workflow side decoded the result via the run's pinned codec, the bytes did not match the codec — for plain-array starts the run is json but the result was avro bytes labeled json. The recorder now uses the parent run's payload_codec to encode results (and exception payloads) when the worker has not specified one. A worker-supplied codec still wins (the worker is sending opaque bytes it serialized itself). - WorkflowExecutor::startChildWorkflow encoded child arguments with the parent run codec but stamped the child run with CodecRegistry::defaultCodec(). When parent and default differ (e.g. parent is json from a plain-array start and default is avro), the child run row labels json bytes as avro and the next read fails with "Failed to base64-decode Avro payload." The child run now inherits the parent's codec so the bytes and the label agree. - WorkflowExecutor::activityResult and ::sideEffectResult took an event with no run context, so they unpacked the result blob with the codec-blind Serializer::unserialize. Both now accept the active run and dispatch through the run's pinned codec when available. Each path falls back to the legacy codec-blind sniffer when the row has no codec stamped, so pre-pinned history continues to replay. Part of #331 (thread Avro through every payload surface). Retires the two server activity- and child-workflow tests that started failing when avro became the default. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent c6f1513 commit de5c291

2 files changed

Lines changed: 61 additions & 13 deletions

File tree

src/V2/Support/ActivityOutcomeRecorder.php

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,19 @@ public static function record(
106106
return self::recorded(null);
107107
}
108108

109+
$runCodec = is_string($run->payload_codec) && $run->payload_codec !== ''
110+
? $run->payload_codec
111+
: null;
112+
109113
if (in_array($run->status, [RunStatus::Completed, RunStatus::Failed], true)) {
110114
$lockedExecution->forceFill([
111115
'status' => $throwable === null ? ActivityStatus::Completed : ActivityStatus::Failed,
112-
'result' => $throwable === null ? Serializer::serialize($result) : $lockedExecution->result,
116+
'result' => $throwable === null
117+
? self::serializeWithCodec($result, $codec, $runCodec)
118+
: $lockedExecution->result,
113119
'exception' => $throwable === null
114120
? $lockedExecution->exception
115-
: Serializer::serialize(FailureFactory::payload($throwable)),
121+
: self::serializeWithCodec(FailureFactory::payload($throwable), null, $runCodec),
116122
'closed_at' => $lockedExecution->closed_at ?? now(),
117123
])->save();
118124

@@ -138,9 +144,7 @@ public static function record(
138144
$parallelMetadata = ParallelChildGroup::payloadForPath($parallelMetadataPath);
139145

140146
if ($throwable === null) {
141-
$serializedResult = $codec !== null
142-
? $result
143-
: Serializer::serialize($result);
147+
$serializedResult = self::serializeWithCodec($result, $codec, $runCodec);
144148

145149
$lockedExecution->forceFill([
146150
'status' => ActivityStatus::Completed,
@@ -435,4 +439,27 @@ private static function shouldRetry(Throwable $throwable, int $attemptCount, int
435439
return ! $throwable instanceof NonRetryableExceptionContract
436440
&& $attemptCount < $maxAttempts;
437441
}
442+
443+
/**
444+
* Serialize an activity payload, preferring the worker-supplied codec
445+
* (treats $value as already-serialized bytes), then the parent run's
446+
* codec, then falling back to the package default.
447+
*
448+
* The encoded blob is stamped on the activity row; the workflow side
449+
* later decodes it with the run codec (see ActivityExecution::activityResult).
450+
* Pinning to the run codec keeps the bytes and the decode codec aligned
451+
* even when the package default differs from the run codec.
452+
*/
453+
private static function serializeWithCodec(mixed $value, ?string $workerCodec, ?string $runCodec): string
454+
{
455+
if (is_string($workerCodec) && $workerCodec !== '' && is_string($value)) {
456+
return $value;
457+
}
458+
459+
if (is_string($runCodec) && $runCodec !== '') {
460+
return Serializer::serializeWithCodec($runCodec, $value);
461+
}
462+
463+
return Serializer::serialize($value);
464+
}
438465
}

src/V2/Support/WorkflowExecutor.php

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public function run(WorkflowRun $run, WorkflowTask $task): ?WorkflowTask
142142
try {
143143
$this->syncWorkflowCursor($workflow, $sequence + 1);
144144
if ($activityCompletion->event_type === HistoryEventType::ActivityCompleted) {
145-
$current = $workflowExecution->send($this->activityResult($activityCompletion));
145+
$current = $workflowExecution->send($this->activityResult($activityCompletion, $run));
146146
} else {
147147
$failureId = $activityCompletion->payload['failure_id'] ?? null;
148148

@@ -411,7 +411,7 @@ public function run(WorkflowRun $run, WorkflowTask $task): ?WorkflowTask
411411
}
412412

413413
$this->syncWorkflowCursor($workflow, $sequence + 1);
414-
$current = $workflowExecution->send($this->sideEffectResult($sideEffectEvent));
414+
$current = $workflowExecution->send($this->sideEffectResult($sideEffectEvent, $run));
415415
} catch (Throwable $throwable) {
416416
$this->failRun($run, $task, $throwable, 'workflow_run', $run->id);
417417

@@ -909,7 +909,7 @@ public function run(WorkflowRun $run, WorkflowTask $task): ?WorkflowTask
909909

910910
if ($activityCompletion !== null) {
911911
if ($activityCompletion->event_type === HistoryEventType::ActivityCompleted) {
912-
$results[$offset] = $this->activityResult($activityCompletion);
912+
$results[$offset] = $this->activityResult($activityCompletion, $run);
913913

914914
continue;
915915
}
@@ -1426,7 +1426,14 @@ private function scheduleChildWorkflow(
14261426
$commandContract = RunCommandContract::snapshot($childWorkflowCall->workflow);
14271427
$now = now();
14281428

1429-
$serializedChildArguments = Serializer::serializeWithCodec($run->payload_codec, $metadata->arguments);
1429+
// Inherit the parent's run codec for the child so the encoded
1430+
// arguments (next line) match the codec stamped on the child run.
1431+
// Falling back to the package default when the parent has none
1432+
// keeps pre-pinned parents working.
1433+
$childCodec = is_string($run->payload_codec) && $run->payload_codec !== ''
1434+
? $run->payload_codec
1435+
: CodecRegistry::defaultCodec();
1436+
$serializedChildArguments = Serializer::serializeWithCodec($childCodec, $metadata->arguments);
14301437
StructuralLimits::guardPayloadSize($serializedChildArguments);
14311438

14321439
/** @var WorkflowInstance $childInstance */
@@ -1448,7 +1455,7 @@ private function scheduleChildWorkflow(
14481455
'visibility_labels' => null,
14491456
'status' => RunStatus::Pending->value,
14501457
'compatibility' => $run->compatibility ?? WorkerCompatibility::current(),
1451-
'payload_codec' => CodecRegistry::defaultCodec(),
1458+
'payload_codec' => $childCodec,
14521459
'arguments' => $serializedChildArguments,
14531460
'connection' => RoutingResolver::workflowConnection($childWorkflowCall->workflow, $metadata),
14541461
'queue' => RoutingResolver::workflowQueue($childWorkflowCall->workflow, $metadata),
@@ -3120,25 +3127,39 @@ private function recordClosedParallelChildResolutions(
31203127
return true;
31213128
}
31223129

3123-
private function activityResult(WorkflowHistoryEvent $event): mixed
3130+
private function activityResult(WorkflowHistoryEvent $event, ?WorkflowRun $run = null): mixed
31243131
{
31253132
$serialized = $event->payload['result'] ?? null;
31263133

31273134
if (! is_string($serialized)) {
31283135
return null;
31293136
}
31303137

3131-
return Serializer::unserialize($serialized);
3138+
return $this->unserializePayloadWithRun($serialized, $run);
31323139
}
31333140

3134-
private function sideEffectResult(WorkflowHistoryEvent $event): mixed
3141+
private function sideEffectResult(WorkflowHistoryEvent $event, ?WorkflowRun $run = null): mixed
31353142
{
31363143
$serialized = $event->payload['result'] ?? null;
31373144

31383145
if (! is_string($serialized)) {
31393146
return null;
31403147
}
31413148

3149+
return $this->unserializePayloadWithRun($serialized, $run);
3150+
}
3151+
3152+
/**
3153+
* Decode a payload bytes string using the run's pinned codec, falling
3154+
* back to the legacy codec-blind sniffer when no run codec is available
3155+
* (history events written before payload_codec was populated).
3156+
*/
3157+
private function unserializePayloadWithRun(string $serialized, ?WorkflowRun $run): mixed
3158+
{
3159+
if ($run !== null && is_string($run->payload_codec) && $run->payload_codec !== '') {
3160+
return Serializer::unserializeWithCodec($run->payload_codec, $serialized);
3161+
}
3162+
31423163
return Serializer::unserialize($serialized);
31433164
}
31443165

0 commit comments

Comments
 (0)