Skip to content

Commit aeb5436

Browse files
authored
Allow writing Variant to files and test parquet-variant IO (#7945)
## Summary

Integrate Variant better into the compressor so that it can be used with the default write strategy.

---------

Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 19a1fb3 commit aeb5436

8 files changed

Lines changed: 248 additions & 76 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -162,7 +162,7 @@ jobs:
162162
target: wasm32-unknown-unknown
163163
env:
164164
rustflags: "RUSTFLAGS='-A warnings --cfg getrandom_backend=\"unsupported\"'"
165-
args: "--target wasm32-unknown-unknown --exclude vortex --exclude vortex-cuda --exclude vortex-cub --exclude vortex-nvcomp --exclude vortex-datafusion --exclude vortex-duckdb --exclude vortex-tui --exclude vortex-zstd --exclude vortex-test-e2e-cuda --exclude vortex-sqllogictest"
165+
args: "--target wasm32-unknown-unknown --exclude vortex --exclude vortex-cuda --exclude vortex-cub --exclude vortex-nvcomp --exclude vortex-datafusion --exclude vortex-duckdb --exclude vortex-tui --exclude vortex-zstd --exclude vortex-test-e2e-cuda --exclude vortex-sqllogictest --exclude vortex-parquet-variant"
166166
steps:
167167
- uses: runs-on/action@v2
168168
if: github.repository == 'vortex-data/vortex'

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -120,7 +120,7 @@ cargo_metadata = "0.23.1"
120120
cbindgen = "0.29.0"
121121
cc = "1.2"
122122
cfg-if = "1.0.1"
123-
chrono = "0.4.42"
123+
chrono = "0.4.44"
124124
clap = "4.5"
125125
criterion = "0.8"
126126
crossterm = "0.29"
@@ -289,6 +289,7 @@ vortex-ipc = { version = "0.1.0", path = "./vortex-ipc", default-features = fals
289289
vortex-layout = { version = "0.1.0", path = "./vortex-layout", default-features = false }
290290
vortex-mask = { version = "0.1.0", path = "./vortex-mask", default-features = false }
291291
vortex-metrics = { version = "0.1.0", path = "./vortex-metrics", default-features = false }
292+
vortex-parquet-variant = { version = "0.1.0", path = "./encodings/parquet-variant" }
292293
vortex-pco = { version = "0.1.0", path = "./encodings/pco", default-features = false }
293294
vortex-proto = { version = "0.1.0", path = "./vortex-proto", default-features = false }
294295
vortex-runend = { version = "0.1.0", path = "./encodings/runend", default-features = false }

encodings/parquet-variant/src/vtable.rs

Lines changed: 117 additions & 60 deletions
Original file line number | Diff line number | Diff line change
@@ -302,13 +302,16 @@ mod tests {
302302
use arrow_schema::DataType;
303303
use arrow_schema::Field;
304304
use parquet_variant_compute::VariantArray as ArrowVariantArray;
305+
use rstest::fixture;
306+
use rstest::rstest;
305307
use vortex_array::ArrayContext;
306308
use vortex_array::ArrayEq;
307309
use vortex_array::ArrayRef;
308310
use vortex_array::Canonical;
309311
use vortex_array::IntoArray;
310312
use vortex_array::LEGACY_SESSION;
311313
use vortex_array::Precision;
314+
use vortex_array::VTable;
312315
use vortex_array::VortexSessionExecute;
313316
use vortex_array::arrays::PrimitiveArray;
314317
use vortex_array::arrays::VarBinViewArray;
@@ -332,6 +335,7 @@ mod tests {
332335
use vortex_file::OpenOptionsSessionExt;
333336
use vortex_file::WriteOptionsSessionExt;
334337
use vortex_io::session::RuntimeSession;
338+
use vortex_layout::LayoutStrategy;
335339
use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
336340
use vortex_layout::session::LayoutSession;
337341
use vortex_session::VortexSession;
@@ -340,28 +344,65 @@ mod tests {
340344
use crate::ParquetVariant;
341345
use crate::array::ParquetVariantArrayExt;
342346

343-
fn roundtrip(array: ArrayRef) -> ArrayRef {
347+
fn roundtrip(array: ArrayRef) -> VortexResult<ArrayRef> {
344348
let dtype = array.dtype().clone();
345349
let len = array.len();
346350

347351
let session = VortexSession::empty().with::<ArraySession>();
348352
session.arrays().register(ParquetVariant);
349353

350354
let ctx = ArrayContext::empty();
351-
let serialized = array
352-
.serialize(&ctx, &session, &SerializeOptions::default())
353-
.unwrap();
355+
let serialized = array.serialize(&ctx, &session, &SerializeOptions::default())?;
354356

355357
let mut concat = ByteBufferMut::empty();
356358
for buf in serialized {
357359
concat.extend_from_slice(buf.as_ref());
358360
}
359361
let concat = concat.freeze();
360362

361-
let parts = SerializedArray::try_from(concat).unwrap();
362-
parts
363-
.decode(&dtype, len, &ReadContext::new(ctx.to_ids()), &session)
364-
.unwrap()
363+
let parts = SerializedArray::try_from(concat)?;
364+
parts.decode(&dtype, len, &ReadContext::new(ctx.to_ids()), &session)
365+
}
366+
367+
#[fixture]
368+
fn typed_value_variant_array() -> VortexResult<ArrayRef> {
369+
let mut metadata = BinaryViewBuilder::new();
370+
for _ in 0..3 {
371+
metadata.append_value(b"\x01\x00");
372+
}
373+
let metadata: ArrowArrayRef = Arc::new(metadata.finish());
374+
let typed_value: ArrowArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
375+
let arrow_storage = StructArray::try_new(
376+
vec![
377+
Arc::new(Field::new("metadata", DataType::BinaryView, false)),
378+
Arc::new(Field::new("typed_value", DataType::Int32, false)),
379+
]
380+
.into(),
381+
vec![metadata, typed_value],
382+
None,
383+
)?;
384+
385+
ParquetVariant::from_arrow_variant(&ArrowVariantArray::try_new(&arrow_storage)?)
386+
}
387+
388+
#[fixture]
389+
fn parquet_variant_file_session() -> VortexSession {
390+
let session = VortexSession::empty()
391+
.with::<ArraySession>()
392+
.with::<LayoutSession>()
393+
.with::<RuntimeSession>();
394+
vortex_file::register_default_encodings(&session);
395+
session.arrays().register(ParquetVariant);
396+
session
397+
}
398+
399+
#[fixture]
400+
fn write_strategy() -> Arc<dyn LayoutStrategy> {
401+
let mut allowed = vortex_file::ALLOWED_ENCODINGS.clone();
402+
allowed.insert(ParquetVariant.id());
403+
vortex_file::WriteStrategyBuilder::default()
404+
.with_allow_encodings(allowed)
405+
.build()
365406
}
366407

367408
#[test]
@@ -404,42 +445,22 @@ mod tests {
404445
Ok(())
405446
}
406447

448+
#[rstest]
407449
#[tokio::test]
408-
async fn test_file_roundtrip_typed_value_variant() -> VortexResult<()> {
409-
let mut metadata = BinaryViewBuilder::new();
410-
for _ in 0..3 {
411-
metadata.append_value(b"\x01\x00");
412-
}
413-
let metadata: ArrowArrayRef = Arc::new(metadata.finish());
414-
let typed_value: ArrowArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
415-
let arrow_storage = StructArray::try_new(
416-
vec![
417-
Arc::new(Field::new("metadata", DataType::BinaryView, false)),
418-
Arc::new(Field::new("typed_value", DataType::Int32, false)),
419-
]
420-
.into(),
421-
vec![metadata, typed_value],
422-
None,
423-
)?;
424-
let expected =
425-
ParquetVariant::from_arrow_variant(&ArrowVariantArray::try_new(&arrow_storage)?)?;
426-
427-
let session = VortexSession::empty()
428-
.with::<ArraySession>()
429-
.with::<LayoutSession>()
430-
.with::<RuntimeSession>();
431-
vortex_file::register_default_encodings(&session);
432-
session.arrays().register(ParquetVariant);
450+
async fn test_file_roundtrip_typed_value_variant_with_statistics(
451+
#[from(typed_value_variant_array)] expected: VortexResult<ArrayRef>,
452+
parquet_variant_file_session: VortexSession,
453+
) -> VortexResult<()> {
454+
let expected = expected?;
433455

434456
let mut bytes = ByteBufferMut::empty();
435-
session
457+
parquet_variant_file_session
436458
.write_options()
437459
.with_strategy(Arc::new(FlatLayoutStrategy::default()))
438-
.with_file_statistics(Vec::new())
439460
.write(&mut bytes, expected.to_array_stream())
440461
.await?;
441462

442-
let actual = session
463+
let actual = parquet_variant_file_session
443464
.open_options()
444465
.open_buffer(bytes)?
445466
.scan()?
@@ -451,8 +472,36 @@ mod tests {
451472
Ok(())
452473
}
453474

454-
#[test]
455-
fn test_serde_roundtrip_typed_value_variant() {
475+
#[rstest]
476+
#[tokio::test]
477+
async fn test_file_roundtrip_typed_value_variant_with_zoned_strategy(
478+
#[from(typed_value_variant_array)] expected: VortexResult<ArrayRef>,
479+
parquet_variant_file_session: VortexSession,
480+
write_strategy: Arc<dyn LayoutStrategy>,
481+
) -> VortexResult<()> {
482+
let expected = expected?;
483+
484+
let mut bytes = ByteBufferMut::empty();
485+
parquet_variant_file_session
486+
.write_options()
487+
.with_strategy(write_strategy)
488+
.write(&mut bytes, expected.to_array_stream())
489+
.await?;
490+
491+
let actual = parquet_variant_file_session
492+
.open_options()
493+
.open_buffer(bytes)?
494+
.scan()?
495+
.into_array_stream()?
496+
.read_all()
497+
.await?;
498+
499+
assert_arrays_eq!(expected, actual);
500+
Ok(())
501+
}
502+
503+
#[rstest]
504+
fn test_serde_roundtrip_typed_value_variant() -> VortexResult<()> {
456505
let outer_metadata =
457506
VarBinViewArray::from_iter_bin([b"\x01\x00", b"\x01\x00", b"\x01\x00"]).into_array();
458507

@@ -464,48 +513,52 @@ mod tests {
464513
inner_metadata,
465514
Some(inner_value),
466515
None,
467-
)
468-
.unwrap();
469-
let typed_value = VariantArray::try_new(inner_pv.into_array(), None)
470-
.unwrap()
471-
.into_array();
516+
)?;
517+
let typed_value = VariantArray::try_new(inner_pv.into_array(), None)?.into_array();
472518

473519
let outer_pv = ParquetVariant::try_new(
474520
Validity::NonNullable,
475521
outer_metadata,
476522
None,
477523
Some(typed_value),
478-
)
479-
.unwrap();
524+
)?;
480525
let array = outer_pv.into_array();
481-
let decoded = roundtrip(array.clone());
526+
let decoded = roundtrip(array.clone())?;
482527

483528
assert!(array.array_eq(&decoded, Precision::Value));
484-
let decoded_pv = decoded.as_opt::<ParquetVariant>().unwrap();
485-
let typed = decoded_pv.typed_value_array().unwrap();
529+
let decoded_pv = decoded
530+
.as_opt::<ParquetVariant>()
531+
.ok_or_else(|| vortex_err!("expected parquet variant array"))?;
532+
let typed = decoded_pv
533+
.typed_value_array()
534+
.ok_or_else(|| vortex_err!("expected typed_value child"))?;
486535
assert_eq!(typed.dtype(), &DType::Variant(Nullability::NonNullable));
536+
Ok(())
487537
}
488538

489-
#[test]
490-
fn test_serde_roundtrip_with_nullable_validity() {
539+
#[rstest]
540+
fn test_serde_roundtrip_with_nullable_validity() -> VortexResult<()> {
491541
let metadata =
492542
VarBinViewArray::from_iter_bin([b"\x01\x00", b"\x01\x00", b"\x01\x00"]).into_array();
493543
let value = VarBinViewArray::from_iter_bin([b"\x10", b"\x11", b"\x12"]).into_array();
494544
let validity = Validity::from(BitBuffer::from_iter([true, false, true]));
495545

496-
let pv = ParquetVariant::try_new(validity, metadata, Some(value), None).unwrap();
546+
let pv = ParquetVariant::try_new(validity, metadata, Some(value), None)?;
497547
let array = pv.into_array();
498-
let decoded = roundtrip(array.clone());
548+
let decoded = roundtrip(array.clone())?;
499549

500550
assert!(array.array_eq(&decoded, Precision::Value));
501551
assert_eq!(decoded.dtype(), &DType::Variant(Nullability::Nullable));
502-
let decoded_pv = decoded.as_opt::<ParquetVariant>().unwrap();
552+
let decoded_pv = decoded
553+
.as_opt::<ParquetVariant>()
554+
.ok_or_else(|| vortex_err!("expected parquet variant array"))?;
503555
assert!(decoded_pv.value_array().is_some());
504556
assert!(decoded_pv.typed_value_array().is_none());
557+
Ok(())
505558
}
506559

507-
#[test]
508-
fn test_serde_roundtrip_typed_value_int32() {
560+
#[rstest]
561+
fn test_serde_roundtrip_typed_value_int32() -> VortexResult<()> {
509562
let outer_metadata =
510563
VarBinViewArray::from_iter_bin([b"\x01\x00", b"\x01\x00", b"\x01\x00"]).into_array();
511564
let typed_value = buffer![10i32, 20, 30].into_array();
@@ -515,17 +568,21 @@ mod tests {
515568
outer_metadata,
516569
None,
517570
Some(typed_value),
518-
)
519-
.unwrap();
571+
)?;
520572
let array = outer_pv.into_array();
521-
let decoded = roundtrip(array.clone());
573+
let decoded = roundtrip(array.clone())?;
522574

523575
assert!(array.array_eq(&decoded, Precision::Value));
524-
let decoded_pv = decoded.as_opt::<ParquetVariant>().unwrap();
525-
let typed = decoded_pv.typed_value_array().unwrap();
576+
let decoded_pv = decoded
577+
.as_opt::<ParquetVariant>()
578+
.ok_or_else(|| vortex_err!("expected parquet variant array"))?;
579+
let typed = decoded_pv
580+
.typed_value_array()
581+
.ok_or_else(|| vortex_err!("expected typed_value child"))?;
526582
assert_eq!(
527583
typed.dtype(),
528584
&DType::Primitive(PType::I32, Nullability::NonNullable)
529585
);
586+
Ok(())
530587
}
531588
}

0 commit comments

Comments
 (0)