Skip to content

Commit 820476e

Browse files
#362: stamp row-local payload_codec on signal/update history exports
Mixed-codec runs (JSON run payload with an Avro signal or update) could previously produce history-export bundles where the update and signal rows lacked their own payload_codec field. collectCodecSchemas() already scanned those rows for Avro, so an Avro signal/update went undetected and the offline consumer never got the avro wrapper schema. - HistoryExport::update() and ::signal() now emit payload_codec alongside the encoded arguments/result, so codec_schemas detection and offline decode both see row-local codec metadata. - Fix latent typo where update export read \$update->name instead of the real \$update->update_name column; the exported 'name' field was always null, including in redaction-context callbacks. - Regression test asserts mixed-codec bundle carries row-local payload_codec and triggers codec_schemas.avro even when the run itself is JSON-coded. Existing full-snapshot test now asserts signal payload_codec round-trips.
1 parent 9cbb14e commit 820476e

2 files changed

Lines changed: 104 additions & 1 deletion

File tree

src/V2/Support/HistoryExport.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -841,7 +841,7 @@ private static function update(WorkflowUpdate $update): array
841841
'command_id' => $update->workflow_command_id,
842842
'command_sequence' => $update->command_sequence,
843843
'workflow_sequence' => $update->workflow_sequence,
844-
'name' => $update->name,
844+
'name' => $update->update_name,
845845
'status' => $update->status->value,
846846
'outcome' => $update->outcome?->value,
847847
'rejection_reason' => $update->rejection_reason,
@@ -851,6 +851,7 @@ private static function update(WorkflowUpdate $update): array
851851
'applied_at' => self::timestamp($update->applied_at),
852852
'rejected_at' => self::timestamp($update->rejected_at),
853853
'closed_at' => self::timestamp($update->closed_at),
854+
'payload_codec' => $update->payload_codec,
854855
'arguments' => $update->arguments,
855856
'result' => $update->result,
856857
];
@@ -879,6 +880,7 @@ private static function signal(WorkflowSignal $signal): array
879880
'applied_at' => self::timestamp($signal->applied_at),
880881
'rejected_at' => self::timestamp($signal->rejected_at),
881882
'closed_at' => self::timestamp($signal->closed_at),
883+
'payload_codec' => $signal->payload_codec,
882884
'arguments' => $signal->arguments,
883885
];
884886
}

tests/Unit/V2/HistoryExportTest.php

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
use Workflow\V2\Models\WorkflowSignal;
3434
use Workflow\V2\Models\WorkflowTask;
3535
use Workflow\V2\Models\WorkflowTimer;
36+
use Workflow\V2\Models\WorkflowUpdate;
3637
use Workflow\V2\Support\ActivitySnapshot;
3738
use Workflow\V2\Support\HistoryExport;
3839
use Workflow\V2\Support\RunLineageProjector;
@@ -329,6 +330,7 @@ public function testItBuildsVersionedReplayBundleFromTypedHistoryAndProjections(
329330
$this->assertSame($signal->id, $bundle['signals'][0]['id']);
330331
$this->assertSame('approved-by', $bundle['signals'][0]['name']);
331332
$this->assertSame('applied', $bundle['signals'][0]['status']);
333+
$this->assertSame(config('workflows.serializer'), $bundle['signals'][0]['payload_codec']);
332334
$this->assertSame($task->id, $bundle['tasks'][0]['id']);
333335
$this->assertSame($activity->id, $bundle['activities'][0]['id']);
334336
$this->assertSame($activity->id, $bundle['activities'][0]['idempotency_key']);
@@ -1486,6 +1488,105 @@ public function testItEmbedsAvroWrapperSchemaWhenBundleContainsAvroPayloads(): v
14861488
$this->assertSame('01', $bundle['codec_schemas']['avro']['typed_prefix_hex']);
14871489
}
14881490

1491+
public function testItStampsRowLocalPayloadCodecOnSignalAndUpdateExportRows(): void
1492+
{
1493+
if (! class_exists(\Apache\Avro\Schema\AvroSchema::class)) {
1494+
$this->markTestSkipped('apache/avro package is not installed in this environment.');
1495+
}
1496+
1497+
config()->set('workflows.serializer', 'json');
1498+
$run = $this->createMinimalCompletedRun('history-export-mixed-codec');
1499+
$run->forceFill(['payload_codec' => 'json'])->save();
1500+
1501+
$instance = $run->instance;
1502+
1503+
$signalCommand = WorkflowCommand::record($instance, $run, [
1504+
'command_type' => CommandType::Signal->value,
1505+
'target_scope' => 'instance',
1506+
'payload_codec' => config('workflows.serializer'),
1507+
'payload' => Serializer::serialize([
1508+
'name' => 'approved-by',
1509+
'arguments' => ['Taylor'],
1510+
]),
1511+
'source' => 'webhook',
1512+
'status' => CommandStatus::Accepted->value,
1513+
'outcome' => CommandOutcome::SignalReceived->value,
1514+
'accepted_at' => now()->subMinute(),
1515+
'applied_at' => now()->subMinute(),
1516+
]);
1517+
1518+
$signal = WorkflowSignal::query()->create([
1519+
'workflow_command_id' => $signalCommand->id,
1520+
'workflow_instance_id' => $instance->id,
1521+
'workflow_run_id' => $run->id,
1522+
'target_scope' => 'instance',
1523+
'resolved_workflow_run_id' => $run->id,
1524+
'signal_name' => 'approved-by',
1525+
'signal_wait_id' => 'signal-wait-avro',
1526+
'status' => 'applied',
1527+
'outcome' => 'signal_received',
1528+
'command_sequence' => $signalCommand->command_sequence,
1529+
'workflow_sequence' => 1,
1530+
'payload_codec' => 'avro',
1531+
'arguments' => 'avro-encoded-signal-blob',
1532+
'received_at' => now()->subMinute(),
1533+
'applied_at' => now()->subMinute(),
1534+
'closed_at' => now()->subMinute(),
1535+
]);
1536+
1537+
$updateCommand = WorkflowCommand::record($instance, $run, [
1538+
'command_type' => CommandType::Update->value,
1539+
'target_scope' => 'instance',
1540+
'payload_codec' => config('workflows.serializer'),
1541+
'payload' => Serializer::serialize([
1542+
'name' => 'mark-approved',
1543+
'arguments' => [true, 'api'],
1544+
]),
1545+
'source' => 'api',
1546+
'status' => CommandStatus::Accepted->value,
1547+
'outcome' => CommandOutcome::UpdateCompleted->value,
1548+
'accepted_at' => now()->subMinute(),
1549+
'applied_at' => now()->subMinute(),
1550+
]);
1551+
1552+
$update = WorkflowUpdate::query()->create([
1553+
'workflow_command_id' => $updateCommand->id,
1554+
'workflow_instance_id' => $instance->id,
1555+
'workflow_run_id' => $run->id,
1556+
'command_sequence' => $updateCommand->command_sequence,
1557+
'workflow_sequence' => 1,
1558+
'update_name' => 'mark-approved',
1559+
'status' => 'completed',
1560+
'outcome' => 'update_completed',
1561+
'payload_codec' => 'avro',
1562+
'arguments' => 'avro-encoded-update-args',
1563+
'result' => 'avro-encoded-update-result',
1564+
'accepted_at' => now()->subMinute(),
1565+
'applied_at' => now()->subMinute(),
1566+
'closed_at' => now()->subMinute(),
1567+
]);
1568+
1569+
$bundle = HistoryExport::forRun($run->refresh(), Carbon::parse('2026-04-17 14:00:00'));
1570+
1571+
$signalRow = collect($bundle['signals'])->firstWhere('id', $signal->id);
1572+
$this->assertIsArray($signalRow);
1573+
$this->assertSame('avro', $signalRow['payload_codec']);
1574+
$this->assertSame('avro-encoded-signal-blob', $signalRow['arguments']);
1575+
1576+
$updateRow = collect($bundle['updates'])->firstWhere('id', $update->id);
1577+
$this->assertIsArray($updateRow);
1578+
$this->assertSame('avro', $updateRow['payload_codec']);
1579+
$this->assertSame('mark-approved', $updateRow['name']);
1580+
$this->assertSame('avro-encoded-update-args', $updateRow['arguments']);
1581+
$this->assertSame('avro-encoded-update-result', $updateRow['result']);
1582+
1583+
// The run itself is JSON-coded, but the Avro signal/update rows must
1584+
// trigger codec_schemas.avro so an offline consumer has the wrapper
1585+
// schema needed to decode those blobs.
1586+
$this->assertArrayHasKey('avro', $bundle['codec_schemas']);
1587+
$this->assertSame('00', $bundle['codec_schemas']['avro']['wrapper_prefix_hex']);
1588+
}
1589+
14891590
public function testItOmitsAvroSchemasWhenBundleHasNoAvroPayloads(): void
14901591
{
14911592
config()->set('workflows.serializer', 'json');

0 commit comments

Comments
 (0)