Skip to content

Commit a2af603

Browse files
committed
Better `scalar_at` for variants
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 4fc5e3d commit a2af603

2 files changed

Lines changed: 26 additions & 19 deletions

File tree

encodings/parquet-variant/src/operations.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,11 @@ use crate::ParquetVariantArrayExt;
3030
use crate::vtable::ParquetVariant;
3131

3232
impl OperationsVTable<ParquetVariant> for ParquetVariant {
33-
/// Resolves a single variant value according to the Parquet Variant shredding spec:
33+
/// Resolves one row according to the Parquet Variant shredding rules.
3434
///
35-
/// | value | typed_value | Meaning |
36-
/// |----------|-------------|------------------------------------------------------|
37-
/// | NULL | NULL | Missing (only valid for shredded object fields) |
38-
/// | non-NULL | NULL | Un-shredded: decode from metadata + value bytes |
39-
/// | NULL | non-NULL | Perfectly shredded: use typed_value directly |
40-
/// | non-NULL | non-NULL | Partially shredded object (typed_value takes priority) |
35+
/// For valid data, a row with both `value` and struct `typed_value` is a partially
36+
/// shredded object: recursively reconstruct shredded fields and merge them with the
37+
/// raw-only fields from `value`.
4138
fn scalar_at(
4239
array: ArrayView<'_, ParquetVariant>,
4340
index: usize,
@@ -76,6 +73,9 @@ fn scalar_from_variant_storage(
7673
index: usize,
7774
ctx: &mut ExecutionCtx,
7875
) -> VortexResult<Scalar> {
76+
// A valid typed row owns the logical value except for partially shredded
77+
// objects, where the raw `value` may carry object fields that were not
78+
// represented in `typed_value`.
7979
if let Some(typed_value) = typed_value
8080
&& typed_value.is_valid(index, ctx)?
8181
{
@@ -98,15 +98,15 @@ fn scalar_from_typed_value_array(
9898
index: usize,
9999
ctx: &mut ExecutionCtx,
100100
) -> VortexResult<Scalar> {
101-
let value_scalar = match value {
102-
Some(value) if value.is_valid(index, ctx)? => Some(value.execute_scalar(index, ctx)?),
101+
let typed_value_scalar = typed_value.execute_scalar(index, ctx)?;
102+
let value_scalar = match typed_value_scalar.dtype() {
103+
DType::Struct(..) => match value {
104+
Some(value) if value.is_valid(index, ctx)? => Some(value.execute_scalar(index, ctx)?),
105+
_ => None,
106+
},
103107
_ => None,
104108
};
105-
scalar_from_typed_value_scalar(
106-
metadata,
107-
value_scalar,
108-
typed_value.execute_scalar(index, ctx)?,
109-
)
109+
scalar_from_typed_value_scalar(metadata, value_scalar, typed_value_scalar)
110110
}
111111

112112
fn scalar_from_typed_value_scalar(
@@ -132,6 +132,8 @@ fn scalar_from_typed_value_scalar(
132132
Nullability::NonNullable,
133133
))
134134
}
135+
// A struct `typed_value` represents object shredding. It may be paired
136+
// with a raw object containing only fields absent from the typed struct.
135137
DType::Struct(..) => scalar_from_shredded_object_scalar(metadata, value, typed_value),
136138
_ => Ok(typed_value),
137139
}
@@ -225,6 +227,8 @@ fn scalar_from_shredded_object_scalar(
225227
.iter()
226228
.any(|typed_name| typed_name.as_ref() == name.as_ref())
227229
{
230+
// Invalid writers may duplicate a shredded field in `value`.
231+
// Keep the typed field so field reads remain consistent.
228232
continue;
229233
}
230234
let field = unshredded

vortex-array/src/arrays/variant/vtable/operations.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,19 @@ impl OperationsVTable<Variant> for Variant {
1717
index: usize,
1818
ctx: &mut ExecutionCtx,
1919
) -> VortexResult<Scalar> {
20-
let fallback = array.core_storage().execute_scalar(index, ctx)?;
21-
if fallback.is_null() {
22-
return Ok(fallback);
20+
let core_storage = array.core_storage();
21+
if !core_storage.is_valid(index, ctx)? {
22+
return Ok(Scalar::null(array.dtype().clone()));
2323
}
2424

2525
let Some(shredded) = array.shredded() else {
26-
return Ok(fallback);
26+
return core_storage.execute_scalar(index, ctx);
2727
};
2828

2929
let typed = shredded.execute_scalar(index, ctx)?;
30-
merge_typed_scalar_as_variant(typed, Some(fallback), array.dtype())
30+
let fallback = (typed.is_null() || typed.dtype().is_struct())
31+
.then(|| core_storage.execute_scalar(index, ctx))
32+
.transpose()?;
33+
merge_typed_scalar_as_variant(typed, fallback, array.dtype())
3134
}
3235
}

0 commit comments

Comments
 (0)