Commit 3866768

#362: loud typed codec-mismatch errors at ingress and self-describing Avro export
Two pieces of the release-gating Avro parity surface:

1. Loud, typed codec-mismatch ingress errors

Add Workflow\Serializers\CodecDecodeException, a typed exception that names the declared codec, what the decoder actually saw, and a remediation hint. The Avro and Json codecs now throw it instead of a generic RuntimeException when the bytes don't match the declared codec.

The ingress diagnosers cover the two most common cross-codec mistakes:

- JSON bytes labeled `avro` (producer JSON-encoded but tagged it Avro)
- Avro bytes labeled `json` (producer Avro-encoded but tagged it JSON)

In both cases the message names the codec, describes the symptom, and tells the operator how to fix it ("change the codec tag to X" or "re-encode the payload"). The wrapping ValidationException at the HTTP ingress (PayloadEnvelopeResolver) preserves the typed message verbatim so it shows up in API responses without losing context.

Covers acceptance criteria from the issue:

- "Sending JSON bytes under an `avro` codec tag produces a loud, typed error — not silent garbage"
- "Sending Avro bytes under a `json` codec tag produces a loud, typed error"
- "Avro decode failure names the codec, expected schema, and a remediation — not a generic RuntimeException"

2. Self-describing Avro history-export bundles

HistoryExport bundles now include a top-level `codec_schemas` map. When the bundle contains any payload encoded with the Avro codec (whether on the run, commands, updates, signals, or tasks), the map embeds the generic-wrapper schema JSON plus the documented prefix bytes (0x00 generic wrapper, 0x01 typed schema). An offline consumer reading the export, without the workflow runtime in scope, can decode the 0x00-prefixed Avro payloads using only the bundle.

Bundles with no Avro payloads keep `codec_schemas: {}` so the field is always present and shape-stable, simplifying consumer code.

Tests: 7 new ingress-mismatch unit tests; 2 new HistoryExport tests covering the Avro-present and Avro-absent codec_schemas cases. All 130 tests across the changed surface pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 362cc63 commit 3866768

6 files changed

Lines changed: 440 additions & 17 deletions

src/Serializers/Avro.php

Lines changed: 125 additions & 16 deletions
@@ -10,7 +10,7 @@
 use Apache\Avro\Datum\AvroIODatumWriter;
 use Apache\Avro\IO\AvroStringIO;
 use Apache\Avro\Schema\AvroSchema;
-use RuntimeException;
+use Throwable;
 
 /**
  * Avro binary codec with optional schema support.
@@ -37,6 +37,24 @@ final class Avro implements SerializerInterface
      */
     private const WRAPPER_SCHEMA = '{"type":"record","name":"Payload","namespace":"durable_workflow","fields":[{"name":"json","type":"string"},{"name":"version","type":"int","default":1}]}';
 
+    /**
+     * Stable wire-protocol prefix bytes documented for SDK / export consumers.
+     */
+    public const PREFIX_GENERIC_WRAPPER = "\x00";
+    public const PREFIX_TYPED_SCHEMA = "\x01";
+
+    /**
+     * The generic-wrapper schema as canonical JSON.
+     *
+     * Exposed so that history-export bundles and similar self-describing
+     * artifacts can embed the schema needed to decode `0x00`-prefixed
+     * Avro payloads offline, without coupling consumers to this class.
+     */
+    public static function wrapperSchemaJson(): string
+    {
+        return self::WRAPPER_SCHEMA;
+    }
+
     private static ?AvroSchema $wrapperSchema = null;
 
     /** @var AvroSchema|null Typed schema set by the caller for the current encode/decode. */
@@ -127,21 +145,46 @@ private static function decodeWithSchema(string $data, AvroSchema $schema): mixe
         return self::suppressDeprecations(function () use ($data, $schema): mixed {
             $bytes = base64_decode($data, true);
             if ($bytes === false) {
-                throw new RuntimeException('Failed to base64-decode Avro payload.');
+                self::failWithIngressDiagnosis($data);
             }
 
             $io = new AvroStringIO($bytes);
 
             // Read and verify prefix
             $prefix = $io->read(1);
             if ($prefix !== "\x01") {
-                throw new RuntimeException('Expected typed Avro payload (prefix 0x01), got: 0x' . bin2hex($prefix));
+                $schemaName = method_exists($schema, 'fullname') ? $schema->fullname() : null;
+                throw new CodecDecodeException(
+                    'avro',
+                    sprintf(
+                        'Expected typed Avro payload (prefix 0x01) for schema "%s", got prefix 0x%s.',
+                        $schemaName ?: 'inline',
+                        bin2hex($prefix),
+                    ),
+                    'Re-encode the payload with the typed Avro path against the matching writer schema, or change the codec tag to match the bytes you are sending.',
+                );
             }
 
-            $reader = new AvroIODatumReader($schema);
-            $decoder = new AvroIOBinaryDecoder($io);
-
-            return $reader->read($decoder);
+            try {
+                $reader = new AvroIODatumReader($schema);
+                $decoder = new AvroIOBinaryDecoder($io);
+
+                return $reader->read($decoder);
+            } catch (CodecDecodeException $e) {
+                throw $e;
+            } catch (Throwable $e) {
+                $schemaName = method_exists($schema, 'fullname') ? $schema->fullname() : null;
+                throw new CodecDecodeException(
+                    'avro',
+                    sprintf(
+                        'Avro datum reader failed against schema "%s": %s',
+                        $schemaName ?: 'inline',
+                        $e->getMessage(),
+                    ),
+                    'Verify the writer schema matches the bytes (resolution: writer→reader compatibility per Avro spec). If you intended a different schema, supply it via Avro::withSchema() before decoding.',
+                    $e,
+                );
+            }
         });
     }
 
@@ -175,29 +218,95 @@ private static function decodeWrapped(string $data): mixed
         return self::suppressDeprecations(function () use ($data): mixed {
             $bytes = base64_decode($data, true);
             if ($bytes === false) {
-                throw new RuntimeException('Failed to base64-decode Avro payload.');
+                self::failWithIngressDiagnosis($data);
             }
 
             $io = new AvroStringIO($bytes);
 
             // Read prefix
             $prefix = $io->read(1);
             if ($prefix === "\x01") {
-                throw new RuntimeException('Typed Avro payload requires a schema context. Call Avro::withSchema() before unserialize().');
+                throw new CodecDecodeException(
+                    'avro',
+                    'Typed Avro payload (prefix 0x01) decoded without a schema context.',
+                    'Call Avro::withSchema($writerSchema) before unserialize() so the typed payload can be read with its writer schema.',
+                );
             }
             if ($prefix !== "\x00") {
-                throw new RuntimeException('Unknown Avro payload prefix: 0x' . bin2hex($prefix));
+                throw new CodecDecodeException(
+                    'avro',
+                    sprintf(
+                        'Unknown Avro payload prefix: 0x%s (expected 0x00 generic wrapper or 0x01 typed schema).',
+                        bin2hex($prefix),
+                    ),
+                    'These bytes were not produced by Workflow\\Serializers\\Avro::serialize(). Re-encode the payload with the Avro codec, or change the codec tag if the producer used a different codec.',
+                );
             }
 
-            $schema = self::wrapperSchema();
-            $reader = new AvroIODatumReader($schema);
-            $decoder = new AvroIOBinaryDecoder($io);
-            $record = $reader->read($decoder);
-
-            return json_decode($record['json'], true, 512, JSON_THROW_ON_ERROR);
+            try {
+                $schema = self::wrapperSchema();
+                $reader = new AvroIODatumReader($schema);
+                $decoder = new AvroIOBinaryDecoder($io);
+                $record = $reader->read($decoder);
+
+                return json_decode($record['json'], true, 512, JSON_THROW_ON_ERROR);
+            } catch (CodecDecodeException $e) {
+                throw $e;
+            } catch (Throwable $e) {
+                throw new CodecDecodeException(
+                    'avro',
+                    'Generic Avro wrapper decode failed: ' . $e->getMessage(),
+                    'Re-encode the payload with the Avro codec (generic wrapper produces a JSON-string-inside-Avro envelope), or change the codec tag if the producer used a different codec.',
+                    $e,
+                );
+            }
         });
     }
 
+    /**
+     * Diagnose why the bytes labeled as Avro could not be base64-decoded
+     * and throw a {@see CodecDecodeException} with the most actionable hint.
+     *
+     * The most common ingress mistakes are:
+     * - A producer JSON-encoded the payload but tagged it `avro`. The bytes
+     *   will start with a JSON character ({, [, ", -, digit, t, f, n) and
+     *   base64_decode() in strict mode rejects them outright.
+     * - A producer sent raw binary Avro bytes without base64-encoding them.
+     */
+    private static function failWithIngressDiagnosis(string $data): never
+    {
+        if (self::looksLikeJson($data)) {
+            throw new CodecDecodeException(
+                'avro',
+                'Payload bytes look like JSON, not base64-encoded Avro.',
+                'The producer appears to have JSON-encoded the payload but tagged it with codec "avro". Either change the codec tag to "json", or re-encode the payload with Workflow\\Serializers\\Avro::serialize() before tagging it "avro".',
+            );
+        }
+
+        throw new CodecDecodeException(
+            'avro',
+            'Failed to base64-decode Avro payload bytes.',
+            'Avro payloads on the wire must be base64-encoded bytes whose first byte is 0x00 (generic wrapper) or 0x01 (typed schema). Re-encode the payload, or change the codec tag if the producer used a different codec.',
+        );
+    }
+
+    private static function looksLikeJson(string $data): bool
+    {
+        if ($data === '') {
+            return false;
+        }
+
+        $first = $data[0];
+        if ($first === '{' || $first === '[' || $first === '"') {
+            return true;
+        }
+        if ($first === '-' || ($first >= '0' && $first <= '9')) {
+            return true;
+        }
+
+        return in_array($data, ['true', 'false', 'null'], true);
+    }
+
     private static function wrapperSchema(): AvroSchema
     {
         if (self::$wrapperSchema === null) {
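
The ingress diagnosis above only runs when strict base64 decoding fails, so it helps to see which JSON-looking inputs actually reach it. The following is a minimal sketch, not code from the commit; `strictBase64Rejects` is a hypothetical helper name introduced only for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of the commit): true when strict base64
 * decoding rejects the input, which is the condition under which the diff
 * calls failWithIngressDiagnosis().
 */
function strictBase64Rejects(string $data): bool
{
    return base64_decode($data, true) === false;
}

// '{', '"' and ':' are outside the base64 alphabet, so a JSON object is rejected.
var_dump(strictBase64Rejects('{"a":1}')); // bool(true)

// '-' and '.' are outside the standard alphabet as well.
var_dump(strictBase64Rejects('-1.5'));    // bool(true)

// The bare literal "true" is four valid base64 characters and decodes cleanly.
var_dump(strictBase64Rejects('true'));    // bool(false)
```

One consequence, reading only the diff: a JSON scalar built entirely from base64-alphabet characters passes the base64 step, so for such input the mismatch would presumably surface through the prefix check ("Unknown Avro payload prefix") rather than through the "looks like JSON" message.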
Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+<?php
+
+declare(strict_types=1);
+
+namespace Workflow\Serializers;
+
+use RuntimeException;
+use Throwable;
+
+/**
+ * Thrown when a payload labelled with a specific codec cannot be decoded.
+ *
+ * The exception names the declared codec, describes what the decoder
+ * actually saw, and includes a remediation hint so that operators looking
+ * at a wire-protocol or HTTP-API failure can reach the right answer
+ * without spelunking through the codec internals.
+ *
+ * Loud, typed ingress failures are required by the Avro release-gating
+ * acceptance criteria — a JSON blob arriving under an `avro` codec tag
+ * (or vice versa) must surface as a clearly attributable error instead
+ * of a generic RuntimeException with binary noise.
+ *
+ * @see https://github.com/zorporation/durable-workflow/issues/362
+ */
+final class CodecDecodeException extends RuntimeException
+{
+    public function __construct(
+        public readonly string $declaredCodec,
+        public readonly string $detail,
+        public readonly string $remediation,
+        ?Throwable $previous = null,
+    ) {
+        parent::__construct(
+            sprintf('%s Remediation: %s', $detail, $remediation),
+            0,
+            $previous,
+        );
+    }
+}
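
Because the exception exposes its three parts as public readonly properties, a caller can build a structured error instead of parsing the message string. The sketch below assumes PHP 8.1+ and uses a stand-in class, `SketchCodecDecodeException`, that copies the constructor shape from the diff so the example runs on its own; `toErrorBody` is a hypothetical mapper, not something the commit adds.

```php
<?php
declare(strict_types=1);

// Stand-in mirroring the constructor of the class in the diff (assumption:
// same property names and message format), so the sketch is self-contained.
final class SketchCodecDecodeException extends RuntimeException
{
    public function __construct(
        public readonly string $declaredCodec,
        public readonly string $detail,
        public readonly string $remediation,
        ?Throwable $previous = null,
    ) {
        parent::__construct(sprintf('%s Remediation: %s', $detail, $remediation), 0, $previous);
    }
}

/**
 * Hypothetical mapper: turn the typed exception into an array that an
 * HTTP layer could serialize into an error response.
 *
 * @return array<string, string>
 */
function toErrorBody(SketchCodecDecodeException $e): array
{
    return [
        'codec' => $e->declaredCodec,
        'detail' => $e->detail,
        'remediation' => $e->remediation,
        'message' => $e->getMessage(),
    ];
}

try {
    throw new SketchCodecDecodeException(
        'avro',
        'Payload bytes look like JSON, not base64-encoded Avro.',
        'Change the codec tag to "json".',
    );
} catch (SketchCodecDecodeException $e) {
    $body = toErrorBody($e);
    echo $body['codec'], ': ', $body['message'], PHP_EOL;
}
```

Note that the composed message contains only the detail and the remediation; the declared codec is available through the `declaredCodec` property.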

src/Serializers/Json.php

Lines changed: 39 additions & 1 deletion
@@ -59,7 +59,45 @@ public static function unserialize(string $data)
         try {
             return json_decode($data, true, 512, JSON_THROW_ON_ERROR);
         } catch (JsonException $e) {
-            throw new RuntimeException('Failed to JSON-decode payload: ' . $e->getMessage(), 0, $e);
+            if (self::looksLikeBase64Avro($data)) {
+                throw new CodecDecodeException(
+                    'json',
+                    'Payload bytes look like base64-encoded Avro, not JSON: ' . $e->getMessage(),
+                    'The blob is valid base64 starting with an Avro framing prefix (0x00 generic wrapper or 0x01 typed schema). Either change the codec tag to "avro", or re-encode the payload as JSON.',
+                    $e,
+                );
+            }
+
+            throw new CodecDecodeException(
+                'json',
+                'Failed to JSON-decode payload: ' . $e->getMessage(),
+                'Re-encode the payload as valid UTF-8 JSON (RFC 8259), or change the codec tag if a different codec produced these bytes.',
+                $e,
+            );
+        }
+    }
+
+    /**
+     * Heuristic: do these bytes look like base64-encoded Avro?
+     *
+     * The cheapest reliable check: pure base64 alphabet, base64_decode in
+     * strict mode succeeds, and the first decoded byte is 0x00 (generic
+     * Avro wrapper) or 0x01 (typed Avro schema). JSON ASCII text never
+     * decodes to bytes leading with 0x00/0x01 because base64 alphabet
+     * cannot represent control characters in source form, so this check
+     * has effectively no false positives on misformatted JSON.
+     */
+    private static function looksLikeBase64Avro(string $data): bool
+    {
+        if ($data === '' || preg_match('/^[A-Za-z0-9+\/]+={0,2}$/', $data) !== 1) {
+            return false;
+        }
+
+        $decoded = base64_decode($data, true);
+        if ($decoded === false || $decoded === '') {
+            return false;
         }
+
+        return $decoded[0] === "\x00" || $decoded[0] === "\x01";
     }
 }
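
To see what the heuristic above accepts and rejects, here is a standalone copy of its logic as a free function. This is a sketch for experimentation only; in the commit the method is private to the Json codec and is reached only after JSON decoding has already failed.

```php
<?php
declare(strict_types=1);

/**
 * Standalone copy of the heuristic from the diff: pure base64 alphabet,
 * strict decode succeeds, and the first decoded byte is 0x00 or 0x01.
 */
function looksLikeBase64Avro(string $data): bool
{
    if ($data === '' || preg_match('/^[A-Za-z0-9+\/]+={0,2}$/', $data) !== 1) {
        return false;
    }

    $decoded = base64_decode($data, true);
    if ($decoded === false || $decoded === '') {
        return false;
    }

    return $decoded[0] === "\x00" || $decoded[0] === "\x01";
}

// Bytes framed with the generic-wrapper or typed-schema prefix match.
var_dump(looksLikeBase64Avro(base64_encode("\x00" . 'payload'))); // bool(true)
var_dump(looksLikeBase64Avro(base64_encode("\x01" . 'payload'))); // bool(true)

// Truncated JSON contains characters outside the base64 alphabet.
var_dump(looksLikeBase64Avro('{"a":'));                           // bool(false)

// Valid base64 whose first decoded byte is ordinary text does not match.
var_dump(looksLikeBase64Avro(base64_encode('hello')));            // bool(false)
```

The check keys on the first decoded byte only, so any base64-alphabet text that happens to decode to a leading 0x00 or 0x01 (for example `AAAA`) also matches. Since such text is not valid JSON either way, the effect is limited to which remediation hint the operator sees.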

src/V2/Support/HistoryExport.php

Lines changed: 57 additions & 0 deletions
@@ -5,6 +5,7 @@
 namespace Workflow\V2\Support;
 
 use Carbon\CarbonInterface;
+use Workflow\Serializers\Avro;
 use Workflow\Serializers\CodecRegistry;
 use Closure;
 use InvalidArgumentException;
@@ -163,9 +164,65 @@ public static function forRun(
             ],
         ];
 
+        $bundle['codec_schemas'] = self::collectCodecSchemas($bundle);
+
         return self::withIntegrity(self::withRedaction($bundle, $run, $redactor));
     }
 
+    /**
+     * Collect the well-known wire schemas needed to decode the payloads in
+     * this bundle offline.
+     *
+     * For Avro: embeds the generic-wrapper schema (used by every codec=avro
+     * payload that does not carry a typed schema). Consumers reading the
+     * export without the workflow runtime can use this to decode the
+     * `0x00`-prefixed Avro payloads.
+     *
+     * For JSON and other self-describing codecs: the map is empty.
+     *
+     * @param array<string, mixed> $bundle
+     * @return array<string, array<string, string>>
+     */
+    private static function collectCodecSchemas(array $bundle): array
+    {
+        $schemas = [];
+
+        if (self::bundleUsesCodec($bundle, 'avro')) {
+            $schemas['avro'] = [
+                'wrapper_schema' => Avro::wrapperSchemaJson(),
+                'wrapper_prefix_hex' => '00',
+                'typed_prefix_hex' => '01',
+            ];
+        }
+
+        return $schemas;
+    }
+
+    /**
+     * @param array<string, mixed> $bundle
+     */
+    private static function bundleUsesCodec(array $bundle, string $codec): bool
+    {
+        $payloadsCodec = $bundle['payloads']['codec'] ?? null;
+        if ($payloadsCodec === $codec) {
+            return true;
+        }
+
+        foreach (['commands', 'updates', 'signals', 'tasks'] as $section) {
+            if (! isset($bundle[$section]) || ! is_array($bundle[$section])) {
+                continue;
+            }
+
+            foreach ($bundle[$section] as $row) {
+                if (is_array($row) && ($row['payload_codec'] ?? null) === $codec) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
     /**
      * @param array<string, mixed> $bundle
      *
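
An offline consumer of the `codec_schemas` map might look roughly like the sketch below. It assumes, based on the decode path in Avro.php, that a payload is a base64 string wrapping one prefix byte followed by the Avro binary encoding of the wrapper record (`json` string, then `version` int). Where payloads sit inside the bundle is not shown in this diff, so the payload is passed in explicitly. `readAvroLong` and `decodeWrappedPayload` are hypothetical names, and the field order is hardcoded from the wrapper schema rather than parsed from `wrapper_schema`, which a real consumer would more likely hand to an Avro library.

```php
<?php
declare(strict_types=1);

/** Read one Avro zigzag varint (int/long) at $offset and advance $offset. */
function readAvroLong(string $bytes, int &$offset): int
{
    $shift = 0;
    $raw = 0;
    do {
        if ($offset >= strlen($bytes)) {
            throw new UnexpectedValueException('Truncated Avro varint.');
        }
        $byte = ord($bytes[$offset++]);
        $raw |= ($byte & 0x7F) << $shift;
        $shift += 7;
    } while (($byte & 0x80) !== 0);

    return ($raw >> 1) ^ -($raw & 1); // undo zigzag encoding
}

/**
 * Decode one generic-wrapper payload using only the bundle's metadata.
 *
 * @param array<string, mixed> $bundle
 */
function decodeWrappedPayload(array $bundle, string $base64Payload): mixed
{
    $avro = $bundle['codec_schemas']['avro'] ?? null;
    if (!is_array($avro)) {
        throw new UnexpectedValueException('Bundle has no codec_schemas.avro entry.');
    }

    $bytes = base64_decode($base64Payload, true);
    if ($bytes === false || $bytes === '') {
        throw new UnexpectedValueException('Payload is not base64-encoded bytes.');
    }
    if (bin2hex($bytes[0]) !== $avro['wrapper_prefix_hex']) {
        throw new UnexpectedValueException('Only generic-wrapper payloads are handled here.');
    }

    $offset = 1;
    $length = readAvroLong($bytes, $offset);   // length of the "json" string field
    $json = substr($bytes, $offset, $length);
    if (strlen($json) !== $length) {
        throw new UnexpectedValueException('Truncated "json" field.');
    }
    $offset += $length;
    readAvroLong($bytes, $offset);             // "version" int field, read and ignored

    return json_decode($json, true, 512, JSON_THROW_ON_ERROR);
}

// Hand-built sample: prefix 0x00, zigzag length 7 (0x0e), the JSON text, version 1 (0x02).
$bundle = ['codec_schemas' => ['avro' => ['wrapper_prefix_hex' => '00', 'typed_prefix_hex' => '01']]];
$payload = base64_encode("\x00\x0e" . '{"a":1}' . "\x02");
var_dump(decodeWrappedPayload($bundle, $payload));
```

Payloads carrying the typed prefix (0x01) are deliberately rejected in this sketch: per the diff, the bundle embeds only the generic-wrapper schema, so decoding them needs a writer schema obtained elsewhere.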
