Skip to content

Commit f73922b

Browse files
authored
[Json] Support FixedSizeList in json decoder (#9715)
# Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. --> - Closes #9714. # Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> # What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> Implement decoder for FixedSizeList # Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes # Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. If there are any breaking changes to public APIs, please call them out. --> New decoding type supported
1 parent 52ff63c commit f73922b

3 files changed

Lines changed: 284 additions & 2 deletions

File tree

arrow-json/src/lib.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,4 +390,42 @@ mod tests {
390390
assert_list_view_roundtrip::<i32>();
391391
assert_list_view_roundtrip::<i64>();
392392
}
393+
394+
#[test]
395+
fn test_json_roundtrip_fixed_size_list() {
396+
let inner = Arc::new(Field::new("item", DataType::Int32, true));
397+
let schema = Arc::new(Schema::new(vec![
398+
Field::new("flat", DataType::FixedSizeList(inner.clone(), 3), true),
399+
Field::new(
400+
"nested",
401+
DataType::FixedSizeList(
402+
Arc::new(Field::new("item", DataType::FixedSizeList(inner, 2), true)),
403+
2,
404+
),
405+
true,
406+
),
407+
]));
408+
409+
let input = r#"{"flat":[1,2,3],"nested":[[1,2],[3,4]]}
410+
{"flat":[4,null,5]}
411+
{"flat":[6,7,8],"nested":[[null,5],[6,null]]}
412+
"#
413+
.as_bytes();
414+
415+
let batches: Vec<RecordBatch> = ReaderBuilder::new(schema.clone())
416+
.with_batch_size(1024)
417+
.build(Cursor::new(input))
418+
.unwrap()
419+
.collect::<Result<Vec<_>, _>>()
420+
.unwrap();
421+
422+
let mut output = Vec::new();
423+
let mut writer = WriterBuilder::new().build::<_, LineDelimited>(&mut output);
424+
for batch in &batches {
425+
writer.write(batch).unwrap();
426+
}
427+
writer.finish().unwrap();
428+
429+
assert_eq!(input, &output);
430+
}
393431
}

arrow-json/src/reader/list_array.rs

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ use std::marker::PhantomData;
1919
use std::sync::Arc;
2020

2121
use arrow_array::builder::BooleanBufferBuilder;
22-
use arrow_array::{ArrayRef, GenericListArray, GenericListViewArray, OffsetSizeTrait};
22+
use arrow_array::{
23+
ArrayRef, FixedSizeListArray, GenericListArray, GenericListViewArray, OffsetSizeTrait,
24+
};
2325
use arrow_buffer::buffer::NullBuffer;
2426
use arrow_buffer::{OffsetBuffer, ScalarBuffer};
2527
use arrow_schema::{ArrowError, DataType, FieldRef};
@@ -134,3 +136,86 @@ impl<O: OffsetSizeTrait, const IS_VIEW: bool> ArrayDecoder for ListLikeArrayDeco
134136
}
135137
}
136138
}
139+
140+
pub struct FixedSizeListArrayDecoder {
141+
field: FieldRef,
142+
size: i32,
143+
decoder: Box<dyn ArrayDecoder>,
144+
ignore_type_conflicts: bool,
145+
is_nullable: bool,
146+
}
147+
148+
impl FixedSizeListArrayDecoder {
149+
pub fn new(
150+
ctx: &DecoderContext,
151+
data_type: &DataType,
152+
is_nullable: bool,
153+
) -> Result<Self, ArrowError> {
154+
let (field, size) = match data_type {
155+
DataType::FixedSizeList(f, s) => (f, *s),
156+
_ => unreachable!(),
157+
};
158+
let decoder = ctx.make_decoder(field.data_type(), field.is_nullable())?;
159+
160+
Ok(Self {
161+
field: field.clone(),
162+
size,
163+
decoder,
164+
ignore_type_conflicts: ctx.ignore_type_conflicts(),
165+
is_nullable,
166+
})
167+
}
168+
}
169+
170+
impl ArrayDecoder for FixedSizeListArrayDecoder {
171+
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
172+
let expected = self.size as usize;
173+
let mut child_pos = Vec::with_capacity(pos.len() * expected);
174+
175+
let mut nulls = self
176+
.is_nullable
177+
.then(|| BooleanBufferBuilder::new(pos.len()));
178+
179+
for p in pos {
180+
let end_idx = match (tape.get(*p), nulls.as_mut()) {
181+
(TapeElement::StartList(end_idx), None) => end_idx,
182+
(TapeElement::StartList(end_idx), Some(nulls)) => {
183+
nulls.append(true);
184+
end_idx
185+
}
186+
(TapeElement::Null, Some(nulls)) => {
187+
nulls.append(false);
188+
child_pos.resize(child_pos.len() + expected, 0);
189+
continue;
190+
}
191+
(_, Some(nulls)) if self.ignore_type_conflicts => {
192+
nulls.append(false);
193+
child_pos.resize(child_pos.len() + expected, 0);
194+
continue;
195+
}
196+
_ => return Err(tape.error(*p, "[")),
197+
};
198+
199+
let child_start = child_pos.len();
200+
let mut cur_idx = *p + 1;
201+
while cur_idx < end_idx {
202+
child_pos.push(cur_idx);
203+
cur_idx = tape.next(cur_idx, "fixed-size list value")?;
204+
}
205+
206+
let actual = child_pos.len() - child_start;
207+
if actual != expected {
208+
return Err(ArrowError::JsonError(format!(
209+
"Incorrect number of elements for FixedSizeList, \
210+
expected {expected} but got {actual}"
211+
)));
212+
}
213+
}
214+
215+
let values = self.decoder.decode(tape, &child_pos)?;
216+
let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish()));
217+
218+
let array = FixedSizeListArray::try_new(self.field.clone(), self.size, values, nulls)?;
219+
Ok(Arc::new(array))
220+
}
221+
}

arrow-json/src/reader/mod.rs

Lines changed: 160 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,9 @@ use crate::reader::binary_array::{
151151
};
152152
use crate::reader::boolean_array::BooleanArrayDecoder;
153153
use crate::reader::decimal_array::DecimalArrayDecoder;
154-
use crate::reader::list_array::{ListArrayDecoder, ListViewArrayDecoder};
154+
use crate::reader::list_array::{
155+
FixedSizeListArrayDecoder, ListArrayDecoder, ListViewArrayDecoder,
156+
};
155157
use crate::reader::map_array::MapArrayDecoder;
156158
use crate::reader::null_array::NullArrayDecoder;
157159
use crate::reader::primitive_array::PrimitiveArrayDecoder;
@@ -835,6 +837,7 @@ fn make_decoder(
835837
DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
836838
DataType::ListView(_) => Ok(Box::new(ListViewArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
837839
DataType::LargeListView(_) => Ok(Box::new(ListViewArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
840+
DataType::FixedSizeList(_, _) => Ok(Box::new(FixedSizeListArrayDecoder::new(ctx, data_type, is_nullable)?)),
838841
DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(ctx, data_type, is_nullable)?)),
839842
DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
840843
DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
@@ -2308,6 +2311,152 @@ mod tests {
23082311
assert_read_list_view::<i64>();
23092312
}
23102313

2314+
#[test]
2315+
fn test_fixed_size_list() {
2316+
let buf = r#"
2317+
{"a": [1, 2, 3]}
2318+
{"a": [4, 5, 6]}
2319+
{"a": [7, 8, 9]}
2320+
"#;
2321+
2322+
let field = Field::new_list_field(DataType::Int32, true);
2323+
let schema = Arc::new(Schema::new(vec![Field::new(
2324+
"a",
2325+
DataType::FixedSizeList(Arc::new(field), 3),
2326+
false,
2327+
)]));
2328+
2329+
let batches = do_read(buf, 1024, false, false, schema);
2330+
assert_eq!(batches.len(), 1);
2331+
2332+
let col = batches[0].column(0).as_fixed_size_list();
2333+
assert_eq!(col.len(), 3);
2334+
assert_eq!(col.value_length(), 3);
2335+
2336+
let values = col.values().as_primitive::<Int32Type>();
2337+
assert_eq!(values.values(), &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
2338+
}
2339+
2340+
#[test]
2341+
fn test_fixed_size_list_nullable() {
2342+
let buf = r#"
2343+
{"a": [1, 2]}
2344+
{"a": null}
2345+
{"a": [3, null]}
2346+
"#;
2347+
2348+
let field = Field::new_list_field(DataType::Int32, true);
2349+
let schema = Arc::new(Schema::new(vec![Field::new(
2350+
"a",
2351+
DataType::FixedSizeList(Arc::new(field), 2),
2352+
true,
2353+
)]));
2354+
2355+
let batches = do_read(buf, 1024, false, false, schema);
2356+
assert_eq!(batches.len(), 1);
2357+
2358+
let col = batches[0].column(0).as_fixed_size_list();
2359+
assert_eq!(col.len(), 3);
2360+
assert!(col.is_valid(0));
2361+
assert!(col.is_null(1));
2362+
assert!(col.is_valid(2));
2363+
2364+
let values = col.values().as_primitive::<Int32Type>();
2365+
assert_eq!(values.value(0), 1);
2366+
assert_eq!(values.value(1), 2);
2367+
assert_eq!(values.value(4), 3);
2368+
assert!(values.is_null(5));
2369+
}
2370+
2371+
#[test]
2372+
fn test_fixed_size_list_wrong_size() {
2373+
let buf = r#"{"a": [1, 2, 3]}"#;
2374+
2375+
let field = Field::new_list_field(DataType::Int32, true);
2376+
let schema = Arc::new(Schema::new(vec![Field::new(
2377+
"a",
2378+
DataType::FixedSizeList(Arc::new(field), 2),
2379+
false,
2380+
)]));
2381+
2382+
let err = ReaderBuilder::new(schema)
2383+
.build(Cursor::new(buf.as_bytes()))
2384+
.unwrap()
2385+
.next()
2386+
.unwrap()
2387+
.unwrap_err();
2388+
2389+
assert!(err.to_string().contains("expected 2 but got 3"), "{}", err);
2390+
}
2391+
2392+
#[test]
2393+
fn test_fixed_size_list_nested() {
2394+
let buf = r#"
2395+
{"a": [[1, 2], [3, 4]]}
2396+
{"a": [[5, 6], [7, 8]]}
2397+
"#;
2398+
2399+
let inner_field = Field::new_list_field(DataType::Int32, true);
2400+
let inner_type = DataType::FixedSizeList(Arc::new(inner_field), 2);
2401+
let outer_field = Arc::new(Field::new_list_field(inner_type.clone(), true));
2402+
let schema = Arc::new(Schema::new(vec![Field::new(
2403+
"a",
2404+
DataType::FixedSizeList(outer_field, 2),
2405+
false,
2406+
)]));
2407+
2408+
let batches = do_read(buf, 1024, false, false, schema);
2409+
assert_eq!(batches.len(), 1);
2410+
2411+
let col = batches[0].column(0).as_fixed_size_list();
2412+
assert_eq!(col.len(), 2);
2413+
assert_eq!(col.value_length(), 2);
2414+
2415+
let inner = col.values().as_fixed_size_list();
2416+
assert_eq!(inner.len(), 4);
2417+
assert_eq!(inner.value_length(), 2);
2418+
2419+
let values = inner.values().as_primitive::<Int32Type>();
2420+
assert_eq!(values.values(), &[1, 2, 3, 4, 5, 6, 7, 8]);
2421+
}
2422+
2423+
#[test]
2424+
fn test_fixed_size_list_ignore_type_conflicts() {
2425+
let field = Field::new("item", DataType::Int32, true);
2426+
let schema = Arc::new(Schema::new(vec![Field::new(
2427+
"a",
2428+
DataType::FixedSizeList(Arc::new(field), 2),
2429+
true,
2430+
)]));
2431+
2432+
let json = vec![
2433+
json!({"a": [1, 2]}),
2434+
json!({"a": "not a list"}),
2435+
json!({"a": 42}),
2436+
json!({"a": [6, 7]}),
2437+
];
2438+
2439+
let mut decoder = ReaderBuilder::new(schema)
2440+
.with_ignore_type_conflicts(true)
2441+
.build_decoder()
2442+
.unwrap();
2443+
decoder.serialize(&json).unwrap();
2444+
let batch = decoder.flush().unwrap().unwrap();
2445+
2446+
let col = batch.column(0).as_fixed_size_list();
2447+
assert_eq!(col.len(), 4);
2448+
assert!(col.is_valid(0));
2449+
assert!(col.is_null(1)); // string -> null
2450+
assert!(col.is_null(2)); // number -> null
2451+
assert!(col.is_valid(3));
2452+
2453+
let values = col.values().as_primitive::<Int32Type>();
2454+
assert_eq!(values.value(0), 1);
2455+
assert_eq!(values.value(1), 2);
2456+
assert_eq!(values.value(6), 6);
2457+
assert_eq!(values.value(7), 7);
2458+
}
2459+
23112460
#[test]
23122461
fn test_skip_empty_lines() {
23132462
let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
@@ -3256,6 +3405,11 @@ mod tests {
32563405
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
32573406
false,
32583407
),
3408+
Field::new(
3409+
"fixed_size_list",
3410+
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 2),
3411+
false,
3412+
),
32593413
Field::new(
32603414
"map",
32613415
DataType::Map(
@@ -3312,6 +3466,11 @@ mod tests {
33123466
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
33133467
true,
33143468
),
3469+
Field::new(
3470+
"fixed_size_list",
3471+
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 2),
3472+
true,
3473+
),
33153474
Field::new(
33163475
"map",
33173476
DataType::Map(

0 commit comments

Comments
 (0)