Skip to content

Commit 3d8c1f9

Browse files
committed
fix: arrays correctly update their validity
Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent 00b80cc commit 3d8c1f9

7 files changed

Lines changed: 42 additions & 89 deletions

File tree

encodings/fsst/src/array.rs

Lines changed: 38 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl ArrayHash for FSSTData {
7777
fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
7878
self.symbols.array_hash(state, precision);
7979
self.symbol_lengths.array_hash(state, precision);
80-
self.codes_bytes.array_hash(state, precision);
80+
self.codes.clone().into_array().array_hash(state, precision);
8181
}
8282
}
8383

@@ -87,7 +87,11 @@ impl ArrayEq for FSSTData {
8787
&& self
8888
.symbol_lengths
8989
.array_eq(&other.symbol_lengths, precision)
90-
&& self.codes_bytes.array_eq(&other.codes_bytes, precision)
90+
&& self
91+
.codes
92+
.clone()
93+
.into_array()
94+
.array_eq(&other.codes.clone().into_array(), precision)
9195
}
9296
}
9397

@@ -118,7 +122,7 @@ impl VTable for FSST {
118122
match idx {
119123
0 => BufferHandle::new_host(array.symbols().clone().into_byte_buffer()),
120124
1 => BufferHandle::new_host(array.symbol_lengths().clone().into_byte_buffer()),
121-
2 => array.codes_bytes.clone(),
125+
2 => array.codes.bytes_handle().clone(),
122126
_ => vortex_panic!("FSSTArray buffer index {idx} out of bounds"),
123127
}
124128
}
@@ -136,7 +140,7 @@ impl VTable for FSST {
136140
Ok(Some(
137141
FSSTMetadata {
138142
uncompressed_lengths_ptype: array.uncompressed_lengths().dtype().as_ptype().into(),
139-
codes_offsets_ptype: array.codes().offsets().dtype().as_ptype().into(),
143+
codes_offsets_ptype: array.codes.offsets().dtype().as_ptype().into(),
140144
}
141145
.encode_to_vec(),
142146
))
@@ -315,7 +319,7 @@ pub(crate) const SLOT_NAMES: [&str; NUM_SLOTS] =
315319
pub struct FSSTData {
316320
symbols: Buffer<Symbol>,
317321
symbol_lengths: Buffer<u8>,
318-
codes_bytes: BufferHandle,
322+
codes: VarBinArray,
319323

320324
/// Memoized compressor used for push-down of compute by compressing the RHS.
321325
compressor: Arc<LazyLock<Compressor, Box<dyn Fn() -> Compressor + Send>>>,
@@ -326,7 +330,7 @@ impl Debug for FSSTData {
326330
f.debug_struct("FSSTArray")
327331
.field("symbols", &self.symbols)
328332
.field("symbol_lengths", &self.symbol_lengths)
329-
.field("codes_bytes", &self.codes_bytes)
333+
.field("codes", &self.codes)
330334
.field("uncompressed_lengths", &"<outer slot>")
331335
.finish()
332336
}
@@ -365,8 +369,7 @@ impl FSST {
365369
codes.len(),
366370
),
367371
];
368-
let codes_bytes = codes.bytes_handle().clone();
369-
let data = FSSTData::new_from_parts(symbols, symbol_lengths, codes_bytes);
372+
let data = FSSTData::try_new(symbols, symbol_lengths, codes, &dtype)?;
370373
Ok(unsafe {
371374
Array::from_parts_unchecked(ArrayParts::new(FSST, dtype, len, data).with_slots(slots))
372375
})
@@ -390,8 +393,7 @@ impl FSST {
390393
codes.len(),
391394
),
392395
];
393-
let codes_bytes = codes.bytes_handle().clone();
394-
let data = FSSTData::new_from_parts(symbols, symbol_lengths, codes_bytes);
396+
let data = unsafe { FSSTData::new_unchecked(symbols, symbol_lengths, codes) };
395397
unsafe {
396398
Array::from_parts_unchecked(ArrayParts::new(FSST, dtype, len, data).with_slots(slots))
397399
}
@@ -426,8 +428,8 @@ impl FSSTData {
426428
codes: VarBinArray,
427429
_dtype: &DType,
428430
) -> VortexResult<Self> {
429-
let codes_bytes = codes.bytes_handle().clone();
430-
Ok(Self::new_from_parts(symbols, symbol_lengths, codes_bytes))
431+
// SAFETY: all components validated above
432+
unsafe { Ok(Self::new_unchecked(symbols, symbol_lengths, codes)) }
431433
}
432434

433435
pub fn validate(
@@ -436,11 +438,10 @@ impl FSSTData {
436438
len: usize,
437439
slots: &[Option<ArrayRef>],
438440
) -> VortexResult<()> {
439-
let codes = self.codes_from_slots(slots, dtype);
440441
Self::validate_parts(
441442
&self.symbols,
442443
&self.symbol_lengths,
443-
&codes,
444+
&self.codes,
444445
uncompressed_lengths_from_slots(slots),
445446
dtype,
446447
len,
@@ -491,26 +492,35 @@ impl FSSTData {
491492
Ok(())
492493
}
493494

494-
fn new_from_parts(
495+
pub(crate) unsafe fn new_unchecked(
495496
symbols: Buffer<Symbol>,
496497
symbol_lengths: Buffer<u8>,
497-
codes_bytes: BufferHandle,
498+
codes: VarBinArray,
498499
) -> Self {
499500
let symbols2 = symbols.clone();
500501
let symbol_lengths2 = symbol_lengths.clone();
501502
let compressor = Arc::new(LazyLock::new(Box::new(move || {
502503
Compressor::rebuild_from(symbols2.as_slice(), symbol_lengths2.as_slice())
503504
})
504505
as Box<dyn Fn() -> Compressor + Send>));
505-
506506
Self {
507507
symbols,
508508
symbol_lengths,
509-
codes_bytes,
509+
codes,
510510
compressor,
511511
}
512512
}
513513

514+
/// Returns the number of elements in the array.
515+
pub fn len(&self) -> usize {
516+
self.codes.len()
517+
}
518+
519+
/// Returns `true` if the array contains no elements.
520+
pub fn is_empty(&self) -> bool {
521+
self.codes.len() == 0
522+
}
523+
514524
/// Access the symbol table array
515525
pub fn symbols(&self) -> &Buffer<Symbol> {
516526
&self.symbols
@@ -521,9 +531,15 @@ impl FSSTData {
521531
&self.symbol_lengths
522532
}
523533

524-
/// Access the codes bytes buffer handle.
525-
pub fn codes_bytes(&self) -> &BufferHandle {
526-
&self.codes_bytes
534+
/// Access the codes array
535+
pub fn codes(&self) -> &VarBinArray {
536+
&self.codes
537+
}
538+
539+
/// Get the DType of the codes array
540+
#[inline]
541+
pub fn codes_dtype(&self) -> &DType {
542+
self.codes.dtype()
527543
}
528544

529545
/// Build a [`Decompressor`][fsst::Decompressor] that can be used to decompress values from
@@ -536,21 +552,6 @@ impl FSSTData {
536552
pub fn compressor(&self) -> &Compressor {
537553
self.compressor.as_ref()
538554
}
539-
540-
fn codes_from_slots(&self, slots: &[Option<ArrayRef>], dtype: &DType) -> VarBinArray {
541-
let offsets = slots[CODES_OFFSETS_SLOT]
542-
.clone()
543-
.vortex_expect("FSSTArray codes_offsets slot must be present");
544-
let validity = child_to_validity(&slots[CODES_VALIDITY_SLOT], dtype.nullability());
545-
unsafe {
546-
VarBinArray::new_unchecked_from_handle(
547-
offsets,
548-
self.codes_bytes.clone(),
549-
DType::Binary(dtype.nullability()),
550-
validity,
551-
)
552-
}
553-
}
554555
}
555556

556557
fn uncompressed_lengths_from_slots(slots: &[Option<ArrayRef>]) -> &ArrayRef {
@@ -567,32 +568,6 @@ pub trait FSSTArrayExt: TypedArrayRef<FSST> {
567568
fn uncompressed_lengths_dtype(&self) -> &DType {
568569
self.uncompressed_lengths().dtype()
569570
}
570-
571-
/// Reconstruct the codes VarBinArray on demand from stored bytes and slot components.
572-
fn codes(&self) -> VarBinArray {
573-
let slots = self.as_ref().slots();
574-
let offsets = slots[CODES_OFFSETS_SLOT]
575-
.clone()
576-
.vortex_expect("FSSTArray codes_offsets slot must be present");
577-
let validity = child_to_validity(
578-
&slots[CODES_VALIDITY_SLOT],
579-
self.as_ref().dtype().nullability(),
580-
);
581-
// SAFETY: components were validated at FSSTArray construction time.
582-
unsafe {
583-
VarBinArray::new_unchecked_from_handle(
584-
offsets,
585-
self.codes_bytes().clone(),
586-
DType::Binary(self.as_ref().dtype().nullability()),
587-
validity,
588-
)
589-
}
590-
}
591-
592-
/// Get the DType of the codes array.
593-
fn codes_dtype(&self) -> DType {
594-
DType::Binary(self.as_ref().dtype().nullability())
595-
}
596571
}
597572

598573
impl<T: TypedArrayRef<FSST>> FSSTArrayExt for T {}
@@ -666,7 +641,7 @@ mod test {
666641
&compressor,
667642
);
668643

669-
let compressed_codes = fsst_array.codes();
644+
let compressed_codes = fsst_array.codes().clone();
670645

671646
// There were two buffers:
672647
// 1. The 8 byte symbols

encodings/fsst/src/compute/cast.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ impl CastReduce for FSST {
2020
// Cast codes array to handle nullability
2121
let new_codes = array
2222
.codes()
23+
.clone()
2324
.into_array()
24-
.cast(array.codes_dtype().with_nullability(dtype.nullability()))?;
25+
.cast(array.codes().dtype().with_nullability(dtype.nullability()))?;
2526

2627
Ok(Some(
2728
FSST::try_new(

encodings/fsst/src/compute/compare.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ fn compare_fsst_constant(
115115

116116
let rhs = ConstantArray::new(encoded_scalar, left.len());
117117
left.codes()
118+
.clone()
118119
.into_array()
119120
.binary(rhs.into_array(), Operator::from(operator))
120121
.map(Some)

encodings/fsst/src/compute/filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ impl FilterKernel for FSST {
2121
ctx: &mut ExecutionCtx,
2222
) -> VortexResult<Option<ArrayRef>> {
2323
// Directly invoke VarBin's FilterKernel on the codes child.
24-
let codes = array.codes();
24+
let codes = array.codes().clone();
2525
let codes = codes.as_view();
2626
let filtered_codes_ref = <VarBin as FilterKernel>::filter(codes, mask, ctx)?
2727
.vortex_expect("VarBin filter kernel always returns Some");

encodings/fsst/src/compute/like.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use vortex_array::scalar_fn::fns::like::LikeOptions;
1616
use vortex_error::VortexResult;
1717

1818
use crate::FSST;
19-
use crate::array::FSSTArrayExt;
2019
use crate::dfa::FsstMatcher;
2120
use crate::dfa::dfa_scan_to_bitbuf;
2221

encodings/fsst/src/ops.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use vortex_error::VortexExpect;
1111
use vortex_error::VortexResult;
1212

1313
use crate::FSST;
14-
use crate::array::FSSTArrayExt;
1514

1615
impl OperationsVTable<FSST> for FSST {
1716
fn scalar_at(

vortex-array/src/array/vtable/validity.rs

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,6 @@ pub trait ValidityVTable<V: VTable> {
1717
fn validity(array: ArrayView<'_, V>) -> VortexResult<Validity>;
1818
}
1919

20-
/// An implementation of the [`ValidityVTable`] for arrays that hold an unsliced validity
21-
/// and a slice into it.
22-
pub struct ValidityVTableFromValiditySliceHelper;
23-
24-
pub trait ValiditySliceHelper {
25-
fn unsliced_validity_and_slice(&self) -> (&Validity, usize, usize);
26-
27-
fn sliced_validity(&self) -> VortexResult<Validity> {
28-
let (unsliced_validity, start, stop) = self.unsliced_validity_and_slice();
29-
unsliced_validity.slice(start..stop)
30-
}
31-
}
32-
33-
impl<V: VTable> ValidityVTable<V> for ValidityVTableFromValiditySliceHelper
34-
where
35-
V::ArrayData: ValiditySliceHelper,
36-
{
37-
fn validity(array: ArrayView<'_, V>) -> VortexResult<Validity> {
38-
array.data().sliced_validity()
39-
}
40-
}
41-
4220
/// An implementation of the [`ValidityVTable`] for arrays that delegate validity entirely
4321
/// to a child array.
4422
pub struct ValidityVTableFromChild;

0 commit comments

Comments
 (0)