Skip to content

Commit f744141

Browse files
authored
Add Glue interchange and wire format header codec (#37019)
Add Glue wire format decoding to `mz-interchange`. This only supports uncompressed payloads at the moment, just to keep the changes to a minimum. Compression support to be added at a later date.
1 parent 6e38b96 commit f744141

2 files changed

Lines changed: 161 additions & 0 deletions

File tree

src/interchange/src/glue.rs

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
// Copyright Materialize, Inc. and contributors. All rights reserved.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the LICENSE file.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0.
9+
10+
//! Wire-format helpers for the AWS Glue Schema Registry framing.
11+
//!
12+
//! Glue prepends an 18-byte header to each Kafka record payload:
13+
//!
14+
//! | Offset | Bytes | Meaning |
15+
//! |--------|-------|------------------------------------------------------|
16+
//! | 0 | 1 | Header version. Glue currently emits `0x03`. |
17+
//! | 1 | 1 | Compression byte. `0x00` = none, `0x05` = zlib. |
18+
//! | 2..18 | 16 | Schema-version UUID, big-endian. |
19+
//! | 18.. | N | The serialized record payload. |
20+
//!
21+
//! Not documented in any spec (that I could find it), ref to the source
22+
//! [serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/SerializationDataEncoder.java](https://github.com/awslabs/aws-glue-schema-registry/blob/4b9cac477d6876a883e2a8893738a30c072694dc/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/SerializationDataEncoder.java#L54-L70)
23+
//! Materialize only supports the uncompressed framing (`compression =
24+
//! 0x00`). Compressed records are rejected — supporting zlib at the wire
25+
//! layer is straightforward but no consumer in Materialize asks for it yet,
26+
//! and silently decompressing would mask producer misconfiguration.
27+
//!
28+
//! The Confluent analogue lives in [`crate::confluent`].
29+
30+
use anyhow::{Result, bail};
31+
use uuid::Uuid;
32+
33+
/// Glue wire-format header version, written at byte 0.
34+
const HEADER_VERSION: u8 = 0x03;
35+
36+
/// Compression byte indicating an uncompressed payload.
37+
const COMPRESSION_NONE: u8 = 0x00;
38+
39+
/// Length of the Glue header in bytes (version + compression + UUID).
40+
pub const HEADER_LEN: usize = 1 + 1 + 16;
41+
42+
/// Parse the Glue Avro header from the front of `buf`, returning the
43+
/// schema-version UUID and a subslice covering the record payload.
44+
///
45+
/// Returns an error if the buffer is shorter than the fixed header, if the
46+
/// header-version byte is not `0x03`, or if the compression byte is
47+
/// anything other than `0x00`.
48+
pub fn extract_avro_header(buf: &[u8]) -> Result<(Uuid, &[u8])> {
49+
if buf.len() < HEADER_LEN {
50+
bail!(
51+
"Glue-style avro datum is too few bytes: expected at least {} bytes, got {}",
52+
HEADER_LEN,
53+
buf.len()
54+
);
55+
}
56+
let version = buf[0];
57+
if version != HEADER_VERSION {
58+
bail!(
59+
"wrong Glue-style avro serialization header version: expected {:#04x}, got {:#04x}",
60+
HEADER_VERSION,
61+
version
62+
);
63+
}
64+
let compression = buf[1];
65+
if compression != COMPRESSION_NONE {
66+
bail!(
67+
"unsupported Glue-style avro compression byte: \
68+
expected {:#04x} (uncompressed), got {:#04x}",
69+
COMPRESSION_NONE,
70+
compression
71+
);
72+
}
73+
// `Uuid::from_slice` only fails on length mismatch, which we've already
74+
// validated above; the unwrap is sound.
75+
let uuid = Uuid::from_slice(&buf[2..HEADER_LEN]).expect("18-byte header validated above");
76+
Ok((uuid, &buf[HEADER_LEN..]))
77+
}
78+
79+
/// Frame `payload` with the Glue Avro header, producing a buffer suitable
80+
/// to publish to Kafka. The header is laid down using the uncompressed
81+
/// framing (`compression = 0x00`).
82+
pub fn prepend_avro_header(schema_version_id: Uuid, payload: &[u8]) -> Vec<u8> {
83+
let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
84+
out.push(HEADER_VERSION);
85+
out.push(COMPRESSION_NONE);
86+
out.extend_from_slice(schema_version_id.as_bytes());
87+
out.extend_from_slice(payload);
88+
out
89+
}
90+
91+
#[cfg(test)]
92+
mod tests {
93+
use super::*;
94+
95+
fn fixture_uuid() -> Uuid {
96+
// Fixed value so the encoded byte layout is exact in assertions.
97+
Uuid::parse_str("12345678-1234-5678-1234-567812345678").unwrap()
98+
}
99+
100+
#[mz_ore::test]
101+
fn roundtrip() {
102+
let uuid = fixture_uuid();
103+
let payload = b"avro-bytes-here";
104+
let framed = prepend_avro_header(uuid, payload);
105+
assert_eq!(framed.len(), HEADER_LEN + payload.len());
106+
let (parsed_uuid, rest) = extract_avro_header(&framed).unwrap();
107+
assert_eq!(parsed_uuid, uuid);
108+
assert_eq!(rest, payload);
109+
}
110+
111+
#[mz_ore::test]
112+
fn header_byte_layout() {
113+
let uuid = fixture_uuid();
114+
let framed = prepend_avro_header(uuid, &[]);
115+
assert_eq!(framed[0], HEADER_VERSION);
116+
assert_eq!(framed[1], COMPRESSION_NONE);
117+
assert_eq!(&framed[2..HEADER_LEN], uuid.as_bytes());
118+
}
119+
120+
#[mz_ore::test]
121+
fn rejects_buffer_too_short() {
122+
// 17 bytes — one short of the minimum header.
123+
let buf = [0u8; HEADER_LEN - 1];
124+
let err = extract_avro_header(&buf).unwrap_err();
125+
assert!(err.to_string().contains("too few bytes"), "{err}");
126+
}
127+
128+
#[mz_ore::test]
129+
fn rejects_wrong_header_version() {
130+
let mut buf = prepend_avro_header(fixture_uuid(), b"payload");
131+
buf[0] = 0x02;
132+
let err = extract_avro_header(&buf).unwrap_err();
133+
assert!(
134+
err.to_string()
135+
.contains("wrong Glue-style avro serialization header version"),
136+
"{err}"
137+
);
138+
}
139+
140+
#[mz_ore::test]
141+
fn rejects_compressed_payload() {
142+
let mut buf = prepend_avro_header(fixture_uuid(), b"payload");
143+
buf[1] = 0x05; // zlib
144+
let err = extract_avro_header(&buf).unwrap_err();
145+
assert!(
146+
err.to_string()
147+
.contains("unsupported Glue-style avro compression byte"),
148+
"{err}"
149+
);
150+
}
151+
152+
#[mz_ore::test]
153+
fn empty_payload_is_legal() {
154+
let uuid = fixture_uuid();
155+
let framed = prepend_avro_header(uuid, &[]);
156+
let (parsed_uuid, rest) = extract_avro_header(&framed).unwrap();
157+
assert_eq!(parsed_uuid, uuid);
158+
assert!(rest.is_empty());
159+
}
160+
}

src/interchange/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub mod avro;
1515
pub mod confluent;
1616
pub mod encode;
1717
pub mod envelopes;
18+
pub mod glue;
1819
pub mod json;
1920
pub mod protobuf;
2021
pub mod text_binary;

0 commit comments

Comments
 (0)