diff --git a/src/avro/src/decode.rs b/src/avro/src/decode.rs index d7db61163057c..74ddf824ad2b7 100644 --- a/src/avro/src/decode.rs +++ b/src/avro/src/decode.rs @@ -22,6 +22,7 @@ // of which can be found in the LICENSE file at the root of this repository. use std::cmp; +use std::collections::BTreeSet; use std::fmt::{self, Display}; use std::fs::File; use std::io::{self, Cursor, Read, Seek, SeekFrom}; @@ -31,8 +32,8 @@ use flate2::read::MultiGzDecoder; use crate::error::{DecodeError, Error as AvroError}; use crate::schema::{ - RecordField, ResolvedDefaultValueField, ResolvedRecordField, SchemaNode, SchemaPiece, - SchemaPieceOrNamed, + RecordField, ResolvedDefaultValueField, ResolvedRecordField, Schema, SchemaNode, SchemaPiece, + SchemaPieceOrNamed, SchemaPieceRefOrNamed, }; use crate::types::{Scalar, Value}; use crate::util::{TsUnit, safe_len, zag_i32, zag_i64}; @@ -156,6 +157,152 @@ mod tests { ) ); } + + #[mz_ore::test] + fn array_block_len_bounded_by_remaining_input() { + // A tiny body claiming a huge array block must error, not allocate. A + // small (or hostile) message used to drive an unbounded `Vec` by + // claiming a multi-million-element block whose items decode from ~no + // input (e.g. an empty record). Regression for an OOM found by fuzzing. + use std::str::FromStr; + + use super::{AvroDeserializer, GeneralDeserializer}; + use crate::util::zig_i64; + use crate::{Schema, ValueDecoder}; + + let schema = Schema::from_str(r#"{"type": "array", "items": "long"}"#).unwrap(); + let mut body = Vec::new(); + zig_i64(8_000_000, &mut body); // block count dwarfs the (here, empty) element data + let dsr = GeneralDeserializer { + schema: schema.top_node(), + }; + let mut reader: &[u8] = &body; + let res = dsr.deserialize(&mut reader, ValueDecoder); + assert!( + res.is_err(), + "an array block longer than the remaining input must be rejected, not allocated" + ); + } + + #[mz_ore::test] + fn zero_width_array_elements_decode() { + // The remaining-input bound must not reject valid arrays whose elements + // encode to zero bytes. `null` and empty records have no per-element byte + // floor, so a ten-element block legitimately follows its count with no + // element bytes at all (Materialize's own writer emits `array` of + // ten as `[20, 0]`: block count 10, then the terminating zero block). + use std::str::FromStr; + + use super::{AvroDeserializer, GeneralDeserializer}; + use crate::types::Value; + use crate::util::zig_i64; + use crate::{Schema, ValueDecoder}; + + for (items, want) in [ + (r#""null""#, Value::Null), + ( + r#"{"type": "record", "name": "Empty", "fields": []}"#, + Value::Record(vec![]), + ), + ] { + let schema = + Schema::from_str(&format!(r#"{{"type": "array", "items": {items}}}"#)).unwrap(); + let mut body = Vec::new(); + zig_i64(10, &mut body); // ten elements... + body.push(0); // ...then the terminating zero block. No element bytes. + let dsr = GeneralDeserializer { + schema: schema.top_node(), + }; + let mut reader: &[u8] = &body; + let decoded = dsr + .deserialize(&mut reader, ValueDecoder) + .expect("a zero-width array element type must decode, not be rejected"); + assert_eq!(decoded, Value::Array(vec![want; 10])); + } + } + + #[mz_ore::test] + fn valid_null_array_falsely_rejected() { + // Materialize's encoder emits no element bytes for `null` array + // elements, so decoding must not assume each element consumes at least + // one byte of remaining input. + use std::str::FromStr; + + use super::{AvroDeserializer, GeneralDeserializer}; + use crate::encode::encode_to_vec; + use crate::types::Value; + use crate::{Schema, ValueDecoder}; + + let schema = Schema::from_str(r#"{"type": "array", "items": "null"}"#).unwrap(); + let value = Value::Array(vec![Value::Null, Value::Null]); + let body = encode_to_vec(&value, &schema); + + let dsr = GeneralDeserializer { + schema: schema.top_node(), + }; + let mut reader: &[u8] = &body; + let res = dsr.deserialize(&mut reader, ValueDecoder); + assert!( + res.is_ok(), + "an encoder-produced array of nulls should round-trip, but got: {res:?}" + ); + } + + #[mz_ore::test] + fn zero_width_array_elements_decode_across_blocks() { + // Zero-width arrays can be encoded as multiple blocks. The cumulative + // cap must not reject ordinary valid data below the limit. + use std::str::FromStr; + + use super::{AvroDeserializer, GeneralDeserializer}; + use crate::types::Value; + use crate::util::zig_i64; + use crate::{Schema, ValueDecoder}; + + let schema = Schema::from_str(r#"{"type": "array", "items": "null"}"#).unwrap(); + let mut body = Vec::new(); + zig_i64(4, &mut body); + zig_i64(6, &mut body); + body.push(0); + + let dsr = GeneralDeserializer { + schema: schema.top_node(), + }; + let mut reader: &[u8] = &body; + let decoded = dsr + .deserialize(&mut reader, ValueDecoder) + .expect("zero-width arrays may span multiple blocks below the cap"); + assert_eq!(decoded, Value::Array(vec![Value::Null; 10])); + } + + #[mz_ore::test] + fn zero_width_array_total_len_bounded_across_blocks() { + // A zero-width element type has no input-proportional per-block bound. + // Keep a cumulative cap so repeated legal-size blocks cannot drive an + // unbounded decode. Seed the array access at the cap to test the edge + // without walking 16M null elements first. + use std::str::FromStr; + + use super::{AvroArrayAccess, MAX_BLOCK_ELEMENTS, SimpleArrayAccess}; + use crate::util::zig_i64; + use crate::{Schema, TrivialDecoder}; + + let schema = Schema::from_str(r#""null""#).unwrap(); + let mut body = Vec::new(); + zig_i64(1, &mut body); + + let mut reader: &[u8] = &body; + let mut access = SimpleArrayAccess::new(&mut reader, schema.top_node()); + access.total = MAX_BLOCK_ELEMENTS; + + let err = access + .decode_next(TrivialDecoder) + .expect_err("a new block past the cumulative array limit must be rejected"); + assert!( + err.to_string().contains("Avro array total length"), + "unexpected error: {err}" + ); + } } /// A convenience function to build timestamp values from underlying longs. @@ -211,6 +358,17 @@ pub trait Skip: Read { } Ok(()) } + + /// An upper bound, if cheaply known, on the number of bytes still readable + /// from this source. Used to reject an array/map block that claims more + /// elements than the input could possibly contain: each element consumes at + /// least zero bytes, so a block longer than the remaining input only happens + /// when a small (or hostile) message claims a huge count, which would + /// otherwise drive an unbounded `Vec` allocation (length amplification). + /// Streaming sources that can't answer cheaply return `None`. + fn remaining_input(&self) -> Option { + None + } } impl Skip for File { @@ -226,12 +384,20 @@ impl Skip for &[u8] { *self = &self[len..]; Ok(()) } + + fn remaining_input(&self) -> Option { + Some(self.len()) + } } impl Skip for Box { fn skip(&mut self, len: usize) -> Result<(), io::Error> { self.as_mut().skip(len) } + + fn remaining_input(&self) -> Option { + self.as_ref().remaining_input() + } } impl> Skip for Cursor { @@ -239,6 +405,11 @@ impl> Skip for Cursor { self.seek(SeekFrom::Current(len as i64))?; Ok(()) } + + fn remaining_input(&self) -> Option { + let total = self.get_ref().as_ref().len(); + Some(total.saturating_sub(usize::try_from(self.position()).unwrap_or(usize::MAX))) + } } impl Skip for MultiGzDecoder {} @@ -497,6 +668,27 @@ impl<'a, R: AvroRead> AvroMapAccess for SimpleMapAccess<'a, R> { } _ => unreachable!(), }; + // See `SimpleArrayAccess::decode_next` — same `MAX_BLOCK_ELEMENTS` + // bound applies; a wire-claimed block length above the cap + // would let the decode loop OOM / overflow allocation. + if len > MAX_BLOCK_ELEMENTS { + return Err(AvroError::Decode(DecodeError::Custom(format!( + "Avro map block length {len} exceeds limit {MAX_BLOCK_ELEMENTS}" + )))); + } + // A block can't hold more entries than there are bytes left to + // decode them from; reject a count that claims otherwise rather than + // letting it drive an unbounded allocation (see `Skip::remaining_input`). + // Unlike an array item, every map entry encodes at least a one-byte + // key-length varint, so each entry has a guaranteed one-byte floor and + // a count above the remaining input is always bogus. + if let Some(remaining) = self.r.remaining_input() { + if len > remaining { + return Err(AvroError::Decode(DecodeError::Custom(format!( + "Avro map block length {len} exceeds remaining input ({remaining} bytes)" + )))); + } + } self.remaining = len; } assert!(self.remaining > 0); @@ -522,6 +714,7 @@ struct SimpleArrayAccess<'a, R: AvroRead> { r: &'a mut R, schema: SchemaNode<'a>, remaining: usize, + total: usize, done: bool, } @@ -531,6 +724,7 @@ impl<'a, R: AvroRead> SimpleArrayAccess<'a, R> { r, schema, remaining: 0, + total: 0, done: false, } } @@ -560,6 +754,110 @@ impl<'a> AvroArrayAccess for ValueArrayAccess<'a> { } } +/// Sanity cap on the element count Avro arrays/maps can claim from the wire. +/// Arrays apply this both per block and cumulatively; maps apply it per block. +/// Without it, a malicious or corrupt file can claim up to `i64::MAX` items and +/// the generic array/map decode loop will run until it eventually OOMs or hits +/// `Vec` capacity-overflow. +const MAX_BLOCK_ELEMENTS: usize = 1 << 24; + +/// A *lower* bound on the number of bytes any value of `schema` encodes to on +/// the wire. +/// +/// Used to reject an array block whose claimed element count could not possibly +/// fit in the remaining input: a block of `len` elements occupies at least +/// `len * min_encoded_len` bytes. Only an under-estimate is ever safe here — an +/// over-estimate would reject valid data — so anything whose floor we can't +/// prove (schema-resolution pieces, named-type recursion cycles) contributes +/// `0`, which simply relaxes the bound. +/// +/// Crucially this returns `0` for zero-width types — `null`, an empty record, a +/// record of only such fields — because those genuinely encode to no bytes. +/// Materialize's own writer emits a ten-element `array` as `[20, 0]`, so a +/// blanket "count must not exceed remaining bytes" rule would reject valid +/// input. For zero-width element types the caller falls back to the cumulative +/// array `MAX_BLOCK_ELEMENTS` cap. +fn min_encoded_len(schema: SchemaNode) -> usize { + let mut visited = BTreeSet::new(); + min_encoded_len_piece(schema.root, schema.inner, &mut visited) +} + +/// Resolves a (possibly named) schema reference, guarding against named-type +/// cycles, then defers to [`min_encoded_len_piece`]. +fn min_encoded_len_or_named( + root: &Schema, + node: SchemaPieceRefOrNamed, + visited: &mut BTreeSet, +) -> usize { + match node { + SchemaPieceRefOrNamed::Piece(piece) => min_encoded_len_piece(root, piece, visited), + SchemaPieceRefOrNamed::Named(idx) => { + // A named-type cycle can only close through a record field; treat + // the back-edge as zero-width so we never over-estimate. + if !visited.insert(idx) { + return 0; + } + let len = min_encoded_len_piece(root, &root.lookup(idx).piece, visited); + visited.remove(&idx); + len + } + } +} + +fn min_encoded_len_piece( + root: &Schema, + piece: &SchemaPiece, + visited: &mut BTreeSet, +) -> usize { + match piece { + // Encodes to nothing at all. + SchemaPiece::Null => 0, + // A single byte (zig-zag varint of 0 is one byte; a bool is one byte). + SchemaPiece::Boolean + | SchemaPiece::Int + | SchemaPiece::Long + | SchemaPiece::Date + | SchemaPiece::TimestampMilli + | SchemaPiece::TimestampMicro => 1, + SchemaPiece::Float => 4, + SchemaPiece::Double => 8, + // `fixed`-backed decimals are exactly their size; `bytes`-backed ones, + // like `bytes`/`string`, carry at least a one-byte length varint. + SchemaPiece::Decimal { + fixed_size: Some(size), + .. + } => *size, + SchemaPiece::Decimal { + fixed_size: None, .. + } + | SchemaPiece::Bytes + | SchemaPiece::String + | SchemaPiece::Json + | SchemaPiece::Uuid => 1, + // An empty array/map encodes as a single zero-count byte regardless of + // the element type, so don't recurse into it. + SchemaPiece::Array(_) | SchemaPiece::Map(_) => 1, + // A union always writes at least its one-byte branch index. + SchemaPiece::Union(_) => 1, + // An enum writes a one-byte symbol index. + SchemaPiece::Enum { .. } => 1, + SchemaPiece::Fixed { size } => *size, + // A record's encoding is its fields' encodings concatenated, so its + // floor is the sum of the fields' floors — which can be `0` (the empty + // record, or a record of only `null`/empty-record fields). + SchemaPiece::Record { fields, .. } => fields.iter().fold(0, |acc, field| { + acc.saturating_add(min_encoded_len_or_named( + root, + field.schema.as_ref(), + visited, + )) + }), + // Schema-resolution pieces only arise on the reader/writer-mismatch + // path; we don't try to prove a floor for them. + _ => 0, + } +} + impl<'a, R: AvroRead> AvroArrayAccess for SimpleArrayAccess<'a, R> { fn decode_next(&mut self, d: D) -> Result, AvroError> { if self.done { @@ -576,6 +874,37 @@ impl<'a, R: AvroRead> AvroArrayAccess for SimpleArrayAccess<'a, R> { } _ => unreachable!(), }; + if len > MAX_BLOCK_ELEMENTS { + return Err(AvroError::Decode(DecodeError::Custom(format!( + "Avro array block length {len} exceeds limit {MAX_BLOCK_ELEMENTS}" + )))); + } + let total = self.total.saturating_add(len); + if total > MAX_BLOCK_ELEMENTS { + return Err(AvroError::Decode(DecodeError::Custom(format!( + "Avro array total length {total} exceeds limit {MAX_BLOCK_ELEMENTS}" + )))); + } + // A block of `len` items occupies at least `len * min_elem` bytes, + // so a count needing more than the remaining input can't be honest; + // reject it rather than let it drive an unbounded allocation (see + // `Skip::remaining_input`). Unlike a map entry — which always carries + // at least a one-byte key-length varint — an array item can encode to + // zero bytes (`null`, an empty record), so this bound only applies + // when the element type has a proven positive byte floor. For + // zero-width element types (`min_elem == 0`) we rely on the + // cumulative `MAX_BLOCK_ELEMENTS` cap; otherwise a valid datum such as a + // ten-element `array` (encoded as `[20, 0]`) would be wrongly + // rejected. + if let Some(remaining) = self.r.remaining_input() { + let min_elem = min_encoded_len(self.schema); + if min_elem > 0 && len.saturating_mul(min_elem) > remaining { + return Err(AvroError::Decode(DecodeError::Custom(format!( + "Avro array block length {len} exceeds remaining input ({remaining} bytes)" + )))); + } + } + self.total = total; self.remaining = len; } assert!(self.remaining > 0); @@ -1328,8 +1657,40 @@ pub struct GeneralDeserializer<'a> { pub schema: SchemaNode<'a>, } +/// Cap on recursive `GeneralDeserializer::deserialize` calls. Avro records +/// may reference themselves (`{"name":"X","type":"record","fields":[ +/// {"name":"x","type":"X"}]}`), so a malicious file plus matching wire +/// bytes can recurse forever and overflow the stack. +const MAX_DECODE_DEPTH: usize = 128; + +thread_local! { + static DECODE_DEPTH: std::cell::Cell = const { std::cell::Cell::new(0) }; +} + +struct DecodeDepthGuard; +impl DecodeDepthGuard { + fn enter() -> Result { + DECODE_DEPTH.with(|d| { + let new = d.get() + 1; + if new > MAX_DECODE_DEPTH { + return Err(AvroError::Decode(DecodeError::Custom(format!( + "Avro decode depth exceeds limit {MAX_DECODE_DEPTH}" + )))); + } + d.set(new); + Ok(DecodeDepthGuard) + }) + } +} +impl Drop for DecodeDepthGuard { + fn drop(&mut self) { + DECODE_DEPTH.with(|d| d.set(d.get().saturating_sub(1))); + } +} + impl<'a> AvroDeserializer for GeneralDeserializer<'a> { fn deserialize(self, r: &mut R, d: D) -> Result { + let _guard = DecodeDepthGuard::enter()?; use ValueOrReader::Reader; match self.schema.inner { SchemaPiece::Null => d.scalar(Scalar::Null), diff --git a/src/avro/src/reader.rs b/src/avro/src/reader.rs index 8d68b70afe50e..0775ab57b4f8e 100644 --- a/src/avro/src/reader.rs +++ b/src/avro/src/reader.rs @@ -267,9 +267,15 @@ impl Reader { assert!(self.is_empty(), "Expected self to be empty!"); match util::read_long(&mut self.inner) { Ok(block_len) => { - self.messages_remaining = block_len as usize; - let block_bytes = util::read_long(&mut self.inner)?; - self.fill_buf(block_bytes as usize)?; + // The object count is read straight from the wire; cap it like + // every other wire-read length (and reject negatives, which wrap + // to a huge `usize`). Otherwise a crafted block with a huge count + // and a zero-byte schema (e.g. `null`) makes the reader spin + // decoding billions of empty values. Found by the reader_decode + // cargo-fuzz target. + self.messages_remaining = util::safe_len(block_len as usize)?; + let block_bytes = util::safe_len(util::read_long(&mut self.inner)? as usize)?; + self.fill_buf(block_bytes)?; let mut marker = [0u8; 16]; self.inner.read_exact(&mut marker)?; @@ -686,9 +692,13 @@ impl<'a> SchemaResolver<'a> { let (index, w_inner) = w_inner .match_ref(other, &self.reader_to_writer_names) .ok_or_else(|| { + // `other` is the reader's concrete node (the second element + // of the match), so its name must be looked up in the + // reader's schema. Using `writer.root` here indexed the + // writer's (possibly empty) `named` table out of bounds. SchemaResolutionError::new( format!("No matching schema in writer union for reader type `{}` for field `{}`", - other.get_human_name(writer.root), + other.get_human_name(reader.root), self.get_current_human_readable_path())) })?; let inner = Box::new(self.resolve(writer.step(w_inner), reader.step_ref(other))?); @@ -873,9 +883,18 @@ impl<'a> SchemaResolver<'a> { match self.resolve_named(writer.root, reader.root, w_index, r_index) { Ok(piece) => piece, Err(e) => { - // clean up the placeholder values that were added above. - self.named.pop(); - self.reader_to_resolved_names.remove(&r_index); + // Roll back to the state before this node. We must remove not + // only this node's placeholder but also any nested named nodes + // resolved while resolving it: they live at indices + // `>= resolved_idx` and are unreachable now that this resolution + // failed. A plain `pop()` removed only the last one, orphaning a + // `None` placeholder (which a later `Option::unwrap` panics on) + // whenever a nested node had been pushed. Union resolution stores + // rather than propagates this error, so the orphan would survive. + self.named.truncate(resolved_idx); + self.reader_to_resolved_names + .retain(|_, v| *v < resolved_idx); + self.indices.retain(|_, v| *v < resolved_idx); return Err(e); } }; @@ -938,6 +957,29 @@ mod tests { use super::*; + #[mz_ore::test] + fn reader_rejects_huge_block_object_count() { + // A crafted object-container block with a huge object count and a + // zero-byte (`null`) schema must be rejected, not spin decoding billions + // of empty values. Regression for the reader_decode cargo-fuzz timeout. + let bytes: &[u8] = &[ + 0x4f, 0x62, 0x6a, 0x01, 0x04, 0x16, 0x61, 0x76, 0x72, 0x6f, 0x2e, 0x73, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x22, + 0x6e, 0x75, 0x6c, 0x6c, 0x22, 0x20, 0x00, 0x00, 0x00, 0x64, 0x64, 0x64, 0x64, 0x64, + 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0xbf, 0x00, 0x26, + 0x35, 0x33, 0x39, 0x33, 0x34, 0x38, 0x33, 0xcd, 0x45, 0x38, 0x56, 0xb1, 0x00, 0x00, + 0x64, 0x64, 0x7a, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, + 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, + ]; + let reader = Reader::new(bytes).expect("OCF header parses"); + // Iteration must terminate with an error (the oversized count is + // rejected), not hang. + assert!( + reader.into_iter().any(|item| item.is_err()), + "expected a decode error from the oversized block count" + ); + } + static SCHEMA: &str = r#" { "type": "record", diff --git a/src/avro/src/schema.rs b/src/avro/src/schema.rs index 1bb203f98819b..53b525e3f1712 100644 --- a/src/avro/src/schema.rs +++ b/src/avro/src/schema.rs @@ -864,12 +864,19 @@ impl PartialEq for UnionSchema { struct SchemaParser { named: Vec>, indices: BTreeMap, + depth: usize, } +/// Cap on nested-type depth while parsing an Avro schema JSON. The parser +/// recurses through `parse_inner` for every nested `record`/`array`/`map`/ +/// `union` so an untrusted schema like `{"type":{"type":{"type":...}}}` +/// can otherwise overflow the stack. +const MAX_SCHEMA_DEPTH: usize = 128; + impl SchemaParser { fn parse(mut self, value: &Value) -> Result { let top = self.parse_inner("", value)?; - let SchemaParser { named, indices } = self; + let SchemaParser { named, indices, .. } = self; Ok(Schema { named: named.into_iter().map(|o| o.unwrap()).collect(), indices, @@ -881,6 +888,24 @@ impl SchemaParser { &mut self, default_namespace: &str, value: &Value, + ) -> Result { + self.depth += 1; + if self.depth > MAX_SCHEMA_DEPTH { + self.depth -= 1; + return Err(ParseSchemaError::new(format!( + "schema nesting depth exceeds limit {MAX_SCHEMA_DEPTH}" + )) + .into()); + } + let result = self.parse_inner_impl(default_namespace, value); + self.depth -= 1; + result + } + + fn parse_inner_impl( + &mut self, + default_namespace: &str, + value: &Value, ) -> Result { match *value { Value::String(ref t) => { @@ -1399,6 +1424,7 @@ impl Schema { let p = SchemaParser { named: named.into_iter().map(Some).collect(), indices, + depth: 0, }; p.parse(primary) } diff --git a/src/avro/tests/io.rs b/src/avro/tests/io.rs index ff09d8da966a6..3ee2a1a7aa59e 100644 --- a/src/avro/tests/io.rs +++ b/src/avro/tests/io.rs @@ -312,6 +312,43 @@ fn test_no_default_value() -> Result<(), String> { } } +#[mz_ore::test] +fn test_resolve_named_against_union_does_not_panic() { + // Regression: resolving a named type against a union in which it doesn't + // appear used to look the name up in the *other* schema's `named` table and + // index it out of bounds. Both directions must resolve to an error, not + // panic. (Found by the schema_resolve fuzz target.) + let enum_schema = + Schema::from_str(r#"{"type":"enum","name":"N1","symbols":["A","B","C"]}"#).unwrap(); + let union_schema = Schema::from_str(r#"["null",{"type":"map","values":"null"}]"#).unwrap(); + assert!(resolve_schemas(&enum_schema, &union_schema).is_err()); + assert!(resolve_schemas(&union_schema, &enum_schema).is_err()); +} + +#[mz_ore::test] +fn test_resolve_failed_nested_named_rolls_back() { + // Regression: when a record resolution fails *after* a nested named type was + // already resolved, the cleanup popped the wrong slot and orphaned a `None` + // placeholder that a later `Option::unwrap` panicked on. The reader's extra + // no-default fields make the inner resolution fail, and the surrounding union + // stores (rather than propagates) that error, so the orphan would survive. + // Must not panic. (Found by the schema_resolve fuzz target.) + let writer = Schema::from_str( + r#"["null",{"type":"record","name":"N1","fields":[ + {"name":"f0","type":{"type":"record","name":"N2","fields":[{"name":"f0","type":"double"}]}}]}]"#, + ) + .unwrap(); + let reader = Schema::from_str( + r#"["null",{"type":"record","name":"N1","fields":[ + {"name":"f0","type":{"type":"record","name":"N2","fields":[]}}, + {"name":"f1","type":"null"}, + {"name":"f2","type":"null"}]}]"#, + ) + .unwrap(); + let _ = resolve_schemas(&writer, &reader); + let _ = resolve_schemas(&reader, &writer); +} + #[mz_ore::test] fn test_union_default() { let reader_schema = Schema::from_str(