Skip to content

Commit 951bdf9

Browse files
fix: arrays correctly update their validity (#7307)
After #7287, the validity of some arrays was incorrectly cached. This PR fixes that. --------- Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent b0cf87b commit 951bdf9

File tree

17 files changed

+194
-209
lines changed

17 files changed

+194
-209
lines changed

encodings/fsst/public-api.lock

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub type vortex_fsst::FSST::ArrayData = vortex_fsst::FSSTData
2222

2323
pub type vortex_fsst::FSST::OperationsVTable = vortex_fsst::FSST
2424

25-
pub type vortex_fsst::FSST::ValidityVTable = vortex_array::array::vtable::validity::ValidityVTableFromChild
25+
pub type vortex_fsst::FSST::ValidityVTable = vortex_fsst::FSST
2626

2727
pub fn vortex_fsst::FSST::append_to_builder(array: vortex_array::array::view::ArrayView<'_, Self>, builder: &mut dyn vortex_array::builders::ArrayBuilder, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<()>
2828

@@ -52,9 +52,9 @@ impl vortex_array::array::vtable::operations::OperationsVTable<vortex_fsst::FSST
5252

5353
pub fn vortex_fsst::FSST::scalar_at(array: vortex_array::array::view::ArrayView<'_, vortex_fsst::FSST>, index: usize, _ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>
5454

55-
impl vortex_array::array::vtable::validity::ValidityChild<vortex_fsst::FSST> for vortex_fsst::FSST
55+
impl vortex_array::array::vtable::validity::ValidityVTable<vortex_fsst::FSST> for vortex_fsst::FSST
5656

57-
pub fn vortex_fsst::FSST::validity_child(array: vortex_array::array::view::ArrayView<'_, vortex_fsst::FSST>) -> vortex_array::array::erased::ArrayRef
57+
pub fn vortex_fsst::FSST::validity(array: vortex_array::array::view::ArrayView<'_, vortex_fsst::FSST>) -> vortex_error::VortexResult<vortex_array::validity::Validity>
5858

5959
impl vortex_array::arrays::dict::take::TakeExecute for vortex_fsst::FSST
6060

encodings/fsst/src/array.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ use vortex_array::serde::ArrayChildren;
3737
use vortex_array::validity::Validity;
3838
use vortex_array::vtable;
3939
use vortex_array::vtable::VTable;
40-
use vortex_array::vtable::ValidityChild;
41-
use vortex_array::vtable::ValidityVTableFromChild;
40+
use vortex_array::vtable::ValidityVTable;
41+
use vortex_array::vtable::child_to_validity;
4242
use vortex_array::vtable::validity_to_child;
4343
use vortex_buffer::Buffer;
4444
use vortex_buffer::ByteBuffer;
@@ -98,7 +98,7 @@ impl ArrayEq for FSSTData {
9898
impl VTable for FSST {
9999
type ArrayData = FSSTData;
100100
type OperationsVTable = Self;
101-
type ValidityVTable = ValidityVTableFromChild;
101+
type ValidityVTable = Self;
102102

103103
fn id(&self) -> ArrayId {
104104
Self::ID
@@ -319,9 +319,8 @@ pub(crate) const SLOT_NAMES: [&str; NUM_SLOTS] =
319319
pub struct FSSTData {
320320
symbols: Buffer<Symbol>,
321321
symbol_lengths: Buffer<u8>,
322+
// TODO(joe): this was broken by a previous PR. This will not be updated if a slot is replaced.
322323
codes: VarBinArray,
323-
/// NOTE(ngates): this === codes, but is stored as an ArrayRef so we can return &ArrayRef!
324-
codes_array: ArrayRef,
325324

326325
/// Memoized compressor used for push-down of compute by compressing the RHS.
327326
compressor: Arc<LazyLock<Compressor, Box<dyn Fn() -> Compressor + Send>>>,
@@ -505,13 +504,10 @@ impl FSSTData {
505504
Compressor::rebuild_from(symbols2.as_slice(), symbol_lengths2.as_slice())
506505
})
507506
as Box<dyn Fn() -> Compressor + Send>));
508-
let codes_array = codes.clone().into_array();
509-
510507
Self {
511508
symbols,
512509
symbol_lengths,
513510
codes,
514-
codes_array,
515511
compressor,
516512
}
517513
}
@@ -577,9 +573,12 @@ pub trait FSSTArrayExt: TypedArrayRef<FSST> {
577573

578574
impl<T: TypedArrayRef<FSST>> FSSTArrayExt for T {}
579575

580-
impl ValidityChild<FSST> for FSST {
581-
fn validity_child(array: ArrayView<'_, FSST>) -> ArrayRef {
582-
array.codes_array.clone()
576+
impl ValidityVTable<FSST> for FSST {
577+
fn validity(array: ArrayView<'_, FSST>) -> VortexResult<Validity> {
578+
Ok(child_to_validity(
579+
&array.slots()[CODES_VALIDITY_SLOT],
580+
array.dtype().nullability(),
581+
))
583582
}
584583
}
585584

encodings/pco/public-api.lock

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub type vortex_pco::Pco::ArrayData = vortex_pco::PcoData
2222

2323
pub type vortex_pco::Pco::OperationsVTable = vortex_pco::Pco
2424

25-
pub type vortex_pco::Pco::ValidityVTable = vortex_array::array::vtable::validity::ValidityVTableFromValiditySliceHelper
25+
pub type vortex_pco::Pco::ValidityVTable = vortex_pco::Pco
2626

2727
pub fn vortex_pco::Pco::buffer(array: vortex_array::array::view::ArrayView<'_, Self>, idx: usize) -> vortex_array::buffer::BufferHandle
2828

@@ -42,12 +42,16 @@ pub fn vortex_pco::Pco::serialize(array: vortex_array::array::view::ArrayView<'_
4242

4343
pub fn vortex_pco::Pco::slot_name(_array: vortex_array::array::view::ArrayView<'_, Self>, idx: usize) -> alloc::string::String
4444

45-
pub fn vortex_pco::Pco::validate(&self, data: &vortex_pco::PcoData, dtype: &vortex_array::dtype::DType, len: usize, _slots: &[core::option::Option<vortex_array::array::erased::ArrayRef>]) -> vortex_error::VortexResult<()>
45+
pub fn vortex_pco::Pco::validate(&self, data: &vortex_pco::PcoData, dtype: &vortex_array::dtype::DType, len: usize, slots: &[core::option::Option<vortex_array::array::erased::ArrayRef>]) -> vortex_error::VortexResult<()>
4646

4747
impl vortex_array::array::vtable::operations::OperationsVTable<vortex_pco::Pco> for vortex_pco::Pco
4848

4949
pub fn vortex_pco::Pco::scalar_at(array: vortex_array::array::view::ArrayView<'_, vortex_pco::Pco>, index: usize, _ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::scalar::Scalar>
5050

51+
impl vortex_array::array::vtable::validity::ValidityVTable<vortex_pco::Pco> for vortex_pco::Pco
52+
53+
pub fn vortex_pco::Pco::validity(array: vortex_array::array::view::ArrayView<'_, vortex_pco::Pco>) -> vortex_error::VortexResult<vortex_array::validity::Validity>
54+
5155
impl vortex_array::arrays::slice::SliceReduce for vortex_pco::Pco
5256

5357
pub fn vortex_pco::Pco::slice(array: vortex_array::array::view::ArrayView<'_, Self>, range: core::ops::range::Range<usize>) -> vortex_error::VortexResult<core::option::Option<vortex_array::array::erased::ArrayRef>>
@@ -82,7 +86,7 @@ pub struct vortex_pco::PcoData
8286

8387
impl vortex_pco::PcoData
8488

85-
pub fn vortex_pco::PcoData::decompress(&self, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::arrays::primitive::vtable::PrimitiveArray>
89+
pub fn vortex_pco::PcoData::decompress(&self, unsliced_validity: &vortex_array::validity::Validity, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::arrays::primitive::vtable::PrimitiveArray>
8690

8791
pub fn vortex_pco::PcoData::from_array(array: vortex_array::array::erased::ArrayRef, level: usize, nums_per_page: usize) -> vortex_error::VortexResult<Self>
8892

@@ -92,9 +96,9 @@ pub fn vortex_pco::PcoData::is_empty(&self) -> bool
9296

9397
pub fn vortex_pco::PcoData::len(&self) -> usize
9498

95-
pub fn vortex_pco::PcoData::new(chunk_metas: alloc::vec::Vec<vortex_buffer::ByteBuffer>, pages: alloc::vec::Vec<vortex_buffer::ByteBuffer>, ptype: vortex_array::dtype::ptype::PType, metadata: vortex_pco::PcoMetadata, len: usize, validity: vortex_array::validity::Validity) -> Self
99+
pub fn vortex_pco::PcoData::new(chunk_metas: alloc::vec::Vec<vortex_buffer::ByteBuffer>, pages: alloc::vec::Vec<vortex_buffer::ByteBuffer>, ptype: vortex_array::dtype::ptype::PType, metadata: vortex_pco::PcoMetadata, len: usize) -> Self
96100

97-
pub fn vortex_pco::PcoData::validate(&self, dtype: &vortex_array::dtype::DType, len: usize) -> vortex_error::VortexResult<()>
101+
pub fn vortex_pco::PcoData::validate(&self, dtype: &vortex_array::dtype::DType, len: usize, validity: &vortex_array::validity::Validity) -> vortex_error::VortexResult<()>
98102

99103
impl core::clone::Clone for vortex_pco::PcoData
100104

@@ -104,10 +108,6 @@ impl core::fmt::Debug for vortex_pco::PcoData
104108

105109
pub fn vortex_pco::PcoData::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
106110

107-
impl vortex_array::array::vtable::validity::ValiditySliceHelper for vortex_pco::PcoData
108-
109-
pub fn vortex_pco::PcoData::unsliced_validity_and_slice(&self) -> (&vortex_array::validity::Validity, usize, usize)
110-
111111
impl vortex_array::hash::ArrayEq for vortex_pco::PcoData
112112

113113
pub fn vortex_pco::PcoData::array_eq(&self, other: &Self, precision: vortex_array::hash::Precision) -> bool

encodings/pco/src/array.rs

Lines changed: 44 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ use vortex_array::validity::Validity;
4242
use vortex_array::vtable;
4343
use vortex_array::vtable::OperationsVTable;
4444
use vortex_array::vtable::VTable;
45-
use vortex_array::vtable::ValiditySliceHelper;
46-
use vortex_array::vtable::ValidityVTableFromValiditySliceHelper;
45+
use vortex_array::vtable::ValidityVTable;
46+
use vortex_array::vtable::child_to_validity;
4747
use vortex_array::vtable::validity_to_child;
4848
use vortex_buffer::BufferMut;
4949
use vortex_buffer::ByteBuffer;
@@ -83,7 +83,6 @@ vtable!(Pco, Pco, PcoData);
8383

8484
impl ArrayHash for PcoData {
8585
fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
86-
self.unsliced_validity.array_hash(state, precision);
8786
self.unsliced_n_rows.hash(state);
8887
self.slice_start.hash(state);
8988
self.slice_stop.hash(state);
@@ -99,10 +98,7 @@ impl ArrayHash for PcoData {
9998

10099
impl ArrayEq for PcoData {
101100
fn array_eq(&self, other: &Self, precision: Precision) -> bool {
102-
if !self
103-
.unsliced_validity
104-
.array_eq(&other.unsliced_validity, precision)
105-
|| self.unsliced_n_rows != other.unsliced_n_rows
101+
if self.unsliced_n_rows != other.unsliced_n_rows
106102
|| self.slice_start != other.slice_start
107103
|| self.slice_stop != other.slice_stop
108104
|| self.chunk_metas.len() != other.chunk_metas.len()
@@ -128,7 +124,7 @@ impl VTable for Pco {
128124
type ArrayData = PcoData;
129125

130126
type OperationsVTable = Self;
131-
type ValidityVTable = ValidityVTableFromValiditySliceHelper;
127+
type ValidityVTable = Self;
132128

133129
fn id(&self) -> ArrayId {
134130
Self::ID
@@ -139,9 +135,10 @@ impl VTable for Pco {
139135
data: &PcoData,
140136
dtype: &DType,
141137
len: usize,
142-
_slots: &[Option<ArrayRef>],
138+
slots: &[Option<ArrayRef>],
143139
) -> VortexResult<()> {
144-
data.validate(dtype, len)
140+
let validity = child_to_validity(&slots[0], dtype.nullability());
141+
data.validate(dtype, len, &validity)
145142
}
146143

147144
fn nbuffers(array: ArrayView<'_, Self>) -> usize {
@@ -206,14 +203,7 @@ impl VTable for Pco {
206203
vortex_ensure!(pages.len() == expected_n_pages);
207204

208205
let slots = vec![validity_to_child(&validity, len)];
209-
let data = PcoData::new(
210-
chunk_metas,
211-
pages,
212-
dtype.as_ptype(),
213-
metadata,
214-
len,
215-
validity,
216-
);
206+
let data = PcoData::new(chunk_metas, pages, dtype.as_ptype(), metadata, len);
217207
Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
218208
}
219209

@@ -222,7 +212,14 @@ impl VTable for Pco {
222212
}
223213

224214
fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
225-
Ok(ExecutionResult::done(array.decompress(ctx)?.into_array()))
215+
let unsliced_validity =
216+
child_to_validity(&array.as_ref().slots()[0], array.dtype().nullability());
217+
Ok(ExecutionResult::done(
218+
array
219+
.data()
220+
.decompress(&unsliced_validity, ctx)?
221+
.into_array(),
222+
))
226223
}
227224

228225
fn reduce_parent(
@@ -273,13 +270,14 @@ pub struct Pco;
273270
impl Pco {
274271
pub const ID: ArrayId = ArrayId::new_ref("vortex.pco");
275272

276-
pub(crate) fn try_new(dtype: DType, data: PcoData) -> VortexResult<PcoArray> {
273+
pub(crate) fn try_new(
274+
dtype: DType,
275+
data: PcoData,
276+
validity: Validity,
277+
) -> VortexResult<PcoArray> {
277278
let len = data.len();
278-
data.validate(&dtype, len)?;
279-
let slots = vec![validity_to_child(
280-
&data.unsliced_validity,
281-
data.unsliced_n_rows,
282-
)];
279+
data.validate(&dtype, len, &validity)?;
280+
let slots = vec![validity_to_child(&validity, data.unsliced_n_rows())];
283281
Ok(unsafe {
284282
Array::from_parts_unchecked(ArrayParts::new(Pco, dtype, len, data).with_slots(slots))
285283
})
@@ -292,8 +290,9 @@ impl Pco {
292290
values_per_page: usize,
293291
) -> VortexResult<PcoArray> {
294292
let dtype = parray.dtype().clone();
293+
let validity = parray.validity()?;
295294
let data = PcoData::from_primitive(parray, level, values_per_page)?;
296-
Self::try_new(dtype, data)
295+
Self::try_new(dtype, data, validity)
297296
}
298297
}
299298

@@ -307,14 +306,13 @@ pub struct PcoData {
307306
pub(crate) pages: Vec<ByteBuffer>,
308307
pub(crate) metadata: PcoMetadata,
309308
ptype: PType,
310-
pub(crate) unsliced_validity: Validity,
311309
unsliced_n_rows: usize,
312310
slice_start: usize,
313311
slice_stop: usize,
314312
}
315313

316314
impl PcoData {
317-
pub fn validate(&self, dtype: &DType, len: usize) -> VortexResult<()> {
315+
pub fn validate(&self, dtype: &DType, len: usize, validity: &Validity) -> VortexResult<()> {
318316
let _ = number_type_from_ptype(self.ptype);
319317
vortex_ensure!(
320318
dtype.as_ptype() == self.ptype,
@@ -323,9 +321,9 @@ impl PcoData {
323321
dtype.as_ptype()
324322
);
325323
vortex_ensure!(
326-
dtype.nullability() == self.unsliced_validity.nullability(),
324+
dtype.nullability() == validity.nullability(),
327325
"expected nullability {}, got {}",
328-
self.unsliced_validity.nullability(),
326+
validity.nullability(),
329327
dtype.nullability()
330328
);
331329
vortex_ensure!(
@@ -340,7 +338,7 @@ impl PcoData {
340338
"expected len {len}, got {}",
341339
self.slice_stop - self.slice_start
342340
);
343-
if let Some(validity_len) = self.unsliced_validity.maybe_len() {
341+
if let Some(validity_len) = validity.maybe_len() {
344342
vortex_ensure!(
345343
validity_len == self.unsliced_n_rows,
346344
"expected validity len {}, got {}",
@@ -373,14 +371,12 @@ impl PcoData {
373371
ptype: PType,
374372
metadata: PcoMetadata,
375373
len: usize,
376-
validity: Validity,
377374
) -> Self {
378375
Self {
379376
chunk_metas,
380377
pages,
381378
metadata,
382379
ptype,
383-
unsliced_validity: validity,
384380
unsliced_n_rows: len,
385381
slice_start: 0,
386382
slice_stop: len,
@@ -464,7 +460,6 @@ impl PcoData {
464460
parray.dtype().as_ptype(),
465461
metadata,
466462
parray.len(),
467-
parray.validity()?,
468463
))
469464
}
470465

@@ -478,33 +473,36 @@ impl PcoData {
478473
Self::from_primitive(&parray, level, nums_per_page)
479474
}
480475

481-
pub fn decompress(&self, ctx: &mut ExecutionCtx) -> VortexResult<PrimitiveArray> {
476+
pub fn decompress(
477+
&self,
478+
unsliced_validity: &Validity,
479+
ctx: &mut ExecutionCtx,
480+
) -> VortexResult<PrimitiveArray> {
482481
// To start, we figure out which chunks and pages we need to decompress, and with
483482
// what value offset into the first such page.
484483
let number_type = number_type_from_ptype(self.ptype);
485484
let values_byte_buffer = match_number_enum!(
486485
number_type,
487486
NumberType<T> => {
488-
self.decompress_values_typed::<T>(ctx)?
487+
self.decompress_values_typed::<T>(unsliced_validity, ctx)?
489488
}
490489
);
491490

492491
Ok(PrimitiveArray::from_values_byte_buffer(
493492
values_byte_buffer,
494493
self.ptype,
495-
self.unsliced_validity
496-
.slice(self.slice_start..self.slice_stop)?,
494+
unsliced_validity.slice(self.slice_start..self.slice_stop)?,
497495
self.slice_stop - self.slice_start,
498496
))
499497
}
500498

501499
fn decompress_values_typed<T: Number>(
502500
&self,
501+
unsliced_validity: &Validity,
503502
ctx: &mut ExecutionCtx,
504503
) -> VortexResult<ByteBuffer> {
505504
// To start, we figure out what range of values we need to decompress.
506-
let slice_value_indices = self
507-
.unsliced_validity
505+
let slice_value_indices = unsliced_validity
508506
.execute_mask(self.unsliced_n_rows, ctx)?
509507
.valid_counts_for_indices(&[self.slice_start, self.slice_stop]);
510508
let slice_value_start = slice_value_indices[0];
@@ -605,9 +603,10 @@ impl PcoData {
605603
}
606604
}
607605

608-
impl ValiditySliceHelper for PcoData {
609-
fn unsliced_validity_and_slice(&self) -> (&Validity, usize, usize) {
610-
(&self.unsliced_validity, self.slice_start, self.slice_stop)
606+
impl ValidityVTable<Pco> for Pco {
607+
fn validity(array: ArrayView<'_, Pco>) -> VortexResult<Validity> {
608+
let unsliced_validity = child_to_validity(&array.slots()[0], array.dtype().nullability());
609+
unsliced_validity.slice(array.slice_start()..array.slice_stop())
611610
}
612611
}
613612

@@ -618,9 +617,10 @@ impl OperationsVTable<Pco> for Pco {
618617
_ctx: &mut ExecutionCtx,
619618
) -> VortexResult<Scalar> {
620619
let mut ctx = LEGACY_SESSION.create_execution_ctx();
620+
let unsliced_validity = child_to_validity(&array.slots()[0], array.dtype().nullability());
621621
array
622622
._slice(index, index + 1)
623-
.decompress(&mut ctx)?
623+
.decompress(&unsliced_validity, &mut ctx)?
624624
.into_array()
625625
.scalar_at(0)
626626
}

0 commit comments

Comments
 (0)