Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions encodings/alp/src/alp/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ impl OperationsVTable<ALP> for ALP {
fn scalar_at(
array: ArrayView<'_, ALP>,
index: usize,
_ctx: &mut ExecutionCtx,
ctx: &mut ExecutionCtx,
) -> VortexResult<Scalar> {
if let Some(patches) = array.patches()
&& let Some(patch) = patches.get_patched(index)?
{
return patch.cast(array.dtype());
}

let encoded_val = array.encoded().scalar_at(index)?;
let encoded_val = array.encoded().scalar_at(index, ctx)?;

Ok(match_each_alp_float_ptype!(array.dtype().as_ptype(), |T| {
let encoded_val: <T as ALPFloat>::ALPInt =
Expand Down
8 changes: 4 additions & 4 deletions encodings/alp/src/alp_rd/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl OperationsVTable<ALPRD> for ALPRD {
fn scalar_at(
array: ArrayView<'_, ALPRD>,
index: usize,
_ctx: &mut ExecutionCtx,
ctx: &mut ExecutionCtx,
) -> VortexResult<Scalar> {
// The left value can either be a direct value, or an exception.
// The exceptions array represents exception positions with non-null values.
Expand All @@ -32,7 +32,7 @@ impl OperationsVTable<ALPRD> for ALPRD {
_ => {
let left_code: u16 = array
.left_parts()
.scalar_at(index)?
.scalar_at(index, ctx)?
.as_primitive()
.as_::<u16>()
.vortex_expect("left_code must be non-null");
Expand All @@ -44,7 +44,7 @@ impl OperationsVTable<ALPRD> for ALPRD {
Ok(if array.dtype().as_ptype() == PType::F32 {
let right: u32 = array
.right_parts()
.scalar_at(index)?
.scalar_at(index, ctx)?
.as_primitive()
.as_::<u32>()
.vortex_expect("non-null");
Expand All @@ -53,7 +53,7 @@ impl OperationsVTable<ALPRD> for ALPRD {
} else {
let right: u64 = array
.right_parts()
.scalar_at(index)?
.scalar_at(index, ctx)?
.as_primitive()
.as_::<u64>()
.vortex_expect("non-null");
Expand Down
2 changes: 1 addition & 1 deletion encodings/datetime-parts/src/compute/is_constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ impl DynAggregateKernel for DateTimePartsIsConstantKernel {
let result = is_constant(array.days(), ctx)?
&& is_constant(array.seconds(), ctx)?
&& is_constant(array.subseconds(), ctx)?;
Ok(Some(IsConstant::make_partial(batch, result)?))
Ok(Some(IsConstant::make_partial(batch, result, ctx)?))
}
}
10 changes: 5 additions & 5 deletions encodings/datetime-parts/src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl OperationsVTable<DateTimeParts> for DateTimeParts {
fn scalar_at(
array: ArrayView<'_, DateTimeParts>,
index: usize,
_ctx: &mut ExecutionCtx,
ctx: &mut ExecutionCtx,
) -> VortexResult<Scalar> {
let DType::Extension(ext) = array.dtype().clone() else {
vortex_panic!(
Expand All @@ -33,25 +33,25 @@ impl OperationsVTable<DateTimeParts> for DateTimeParts {
vortex_panic!(Compute: "must decode TemporalMetadata from extension metadata");
};

if !array.as_ref().is_valid(index)? {
if !array.as_ref().is_valid(index, ctx)? {
return Ok(Scalar::null(DType::Extension(ext)));
}

let days: i64 = array
.days()
.scalar_at(index)?
.scalar_at(index, ctx)?
.as_primitive()
.as_::<i64>()
.vortex_expect("days fits in i64");
let seconds: i64 = array
.seconds()
.scalar_at(index)?
.scalar_at(index, ctx)?
.as_primitive()
.as_::<i64>()
.vortex_expect("seconds fits in i64");
let subseconds: i64 = array
.subseconds()
.scalar_at(index)?
.scalar_at(index, ctx)?
.as_primitive()
.as_::<i64>()
.vortex_expect("subseconds fits in i64");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ impl DynAggregateKernel for DecimalBytePartsIsConstantKernel {
};

let result = is_constant(array.msp(), ctx)?;
Ok(Some(IsConstant::make_partial(batch, result)?))
Ok(Some(IsConstant::make_partial(batch, result, ctx)?))
}
}
4 changes: 2 additions & 2 deletions encodings/decimal-byte-parts/src/decimal_byte_parts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,10 @@ impl OperationsVTable<DecimalByteParts> for DecimalByteParts {
fn scalar_at(
array: ArrayView<'_, DecimalByteParts>,
index: usize,
_ctx: &mut ExecutionCtx,
ctx: &mut ExecutionCtx,
) -> VortexResult<Scalar> {
// TODO(joe): support parts len != 1
let scalar = array.msp().scalar_at(index)?;
let scalar = array.msp().scalar_at(index, ctx)?;

// Note. values in msp, can only be signed integers upto size i64.
let primitive_scalar = scalar.as_primitive();
Expand Down
4 changes: 2 additions & 2 deletions encodings/fastlanes/src/bitpacking/compute/is_constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl DynAggregateKernel for BitPackedIsConstantKernel {
&self,
aggregate_fn: &AggregateFnRef,
batch: &ArrayRef,
_ctx: &mut ExecutionCtx,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<Scalar>> {
if !aggregate_fn.is::<IsConstant>() {
return Ok(None);
Expand All @@ -48,7 +48,7 @@ impl DynAggregateKernel for BitPackedIsConstantKernel {
bitpacked_is_constant::<P, { IS_CONST_LANE_WIDTH / size_of::<P>() }>(array)?
});

Ok(Some(IsConstant::make_partial(batch, result)?))
Ok(Some(IsConstant::make_partial(batch, result, ctx)?))
}
}

Expand Down
4 changes: 2 additions & 2 deletions encodings/fastlanes/src/delta/vtable/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ impl OperationsVTable<Delta> for Delta {
fn scalar_at(
array: ArrayView<'_, Delta>,
index: usize,
_ctx: &mut ExecutionCtx,
ctx: &mut ExecutionCtx,
) -> VortexResult<Scalar> {
let decompressed = array.array().slice(index..index + 1)?.to_primitive();
decompressed.into_array().scalar_at(0)
decompressed.into_array().scalar_at(0, ctx)
}
}

Expand Down
2 changes: 1 addition & 1 deletion encodings/fastlanes/src/for/compute/is_constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ impl DynAggregateKernel for FoRIsConstantKernel {
};

let result = is_constant(array.encoded(), ctx)?;
Ok(Some(IsConstant::make_partial(batch, result)?))
Ok(Some(IsConstant::make_partial(batch, result, ctx)?))
}
}
7 changes: 6 additions & 1 deletion encodings/fastlanes/src/for/compute/is_sorted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ impl DynAggregateKernel for FoRIsSortedKernel {
is_sorted(&unsigned_array, ctx)?
};

Ok(Some(IsSorted::make_partial(batch, result, options.strict)?))
Ok(Some(IsSorted::make_partial(
batch,
result,
options.strict,
ctx,
)?))
}
}

Expand Down
4 changes: 2 additions & 2 deletions encodings/fastlanes/src/for/vtable/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ impl OperationsVTable<FoR> for FoR {
fn scalar_at(
array: ArrayView<'_, FoR>,
index: usize,
_ctx: &mut ExecutionCtx,
ctx: &mut ExecutionCtx,
) -> VortexResult<Scalar> {
let encoded_pvalue = array.encoded().scalar_at(index)?;
let encoded_pvalue = array.encoded().scalar_at(index, ctx)?;
let encoded_pvalue = encoded_pvalue.as_primitive();
let reference = array.reference_scalar();
let reference = reference.as_primitive();
Expand Down
8 changes: 6 additions & 2 deletions encodings/fastlanes/src/rle/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::fmt::Display;
use std::fmt::Formatter;

use vortex_array::ArrayRef;
use vortex_array::LEGACY_SESSION;
use vortex_array::TypedArrayRef;
use vortex_array::VortexSessionExecute as _;
use vortex_error::VortexExpect as _;
use vortex_error::VortexResult;
use vortex_error::vortex_ensure;
Expand Down Expand Up @@ -110,15 +112,17 @@ pub trait RLEArrayExt: TypedArrayRef<crate::RLE> {
reason = "expect is safe here as scalar_at returns a valid primitive"
)]
fn values_idx_offset(&self, chunk_idx: usize) -> usize {
let mut ctx = LEGACY_SESSION.create_execution_ctx();

self.values_idx_offsets()
.scalar_at(chunk_idx)
.scalar_at(chunk_idx, &mut ctx)
.expect("index must be in bounds")
.as_primitive()
.as_::<usize>()
.expect("index must be of type usize")
- self
.values_idx_offsets()
.scalar_at(0)
.scalar_at(0, &mut ctx)
.expect("index must be in bounds")
.as_primitive()
.as_::<usize>()
Expand Down
6 changes: 3 additions & 3 deletions encodings/fastlanes/src/rle/vtable/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ impl OperationsVTable<RLE> for RLE {
fn scalar_at(
array: ArrayView<'_, RLE>,
index: usize,
_ctx: &mut ExecutionCtx,
ctx: &mut ExecutionCtx,
) -> VortexResult<Scalar> {
let offset_in_chunk = array.offset();
let chunk_relative_idx = array.indices().scalar_at(offset_in_chunk + index)?;
let chunk_relative_idx = array.indices().scalar_at(offset_in_chunk + index, ctx)?;

let chunk_relative_idx = chunk_relative_idx
.as_primitive()
Expand All @@ -31,7 +31,7 @@ impl OperationsVTable<RLE> for RLE {

let scalar = array
.values()
.scalar_at(value_idx_offset + chunk_relative_idx)?;
.scalar_at(value_idx_offset + chunk_relative_idx, ctx)?;

Scalar::try_new(array.dtype().clone(), scalar.into_value())
}
Expand Down
6 changes: 5 additions & 1 deletion encodings/fsst/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::ExecutionResult;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::Precision;
use vortex_array::TypedArrayRef;
use vortex_array::VortexSessionExecute as _;
use vortex_array::arrays::VarBin;
use vortex_array::arrays::VarBinArray;
use vortex_array::arrays::varbin::VarBinArrayExt;
Expand Down Expand Up @@ -570,10 +572,12 @@ impl FSSTData {
vortex_bail!(InvalidArgument: "codes nullability must match outer dtype nullability");
}

let mut ctx = LEGACY_SESSION.create_execution_ctx();

// Validate that last offset doesn't exceed bytes length (when host-resident).
if codes_bytes.is_on_host() && codes_offsets.is_host() && !codes_offsets.is_empty() {
let last_offset: usize = (&codes_offsets
.scalar_at(codes_offsets.len() - 1)
.scalar_at(codes_offsets.len() - 1, &mut ctx)
.vortex_expect("offsets must support scalar_at"))
.try_into()
.vortex_expect("Failed to convert offset to usize");
Expand Down
4 changes: 2 additions & 2 deletions encodings/fsst/src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ impl OperationsVTable<FSST> for FSST {
fn scalar_at(
array: ArrayView<'_, FSST>,
index: usize,
_ctx: &mut ExecutionCtx,
ctx: &mut ExecutionCtx,
) -> VortexResult<Scalar> {
let compressed = array.codes().scalar_at(index)?;
let compressed = array.codes().scalar_at(index, ctx)?;
let binary_datum = compressed.as_binary().value().vortex_expect("non-null");

let decoded_buffer = ByteBuffer::from(array.decompressor().decompress(binary_datum));
Expand Down
25 changes: 17 additions & 8 deletions encodings/parquet-variant/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use parquet_variant::Variant as PqVariant;
use vortex_array::ArrayRef;
use vortex_array::ArrayView;
use vortex_array::ExecutionCtx;
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute as _;
use vortex_array::dtype::DType;
use vortex_array::dtype::DecimalDType;
use vortex_array::dtype::FieldName;
Expand Down Expand Up @@ -41,15 +43,15 @@ impl OperationsVTable<ParquetVariant> for ParquetVariant {
fn scalar_at(
array: ArrayView<'_, ParquetVariant>,
index: usize,
_ctx: &mut ExecutionCtx,
ctx: &mut ExecutionCtx,
) -> VortexResult<Scalar> {
if array.validity()?.is_null(index)? {
if array.validity()?.is_null(index, ctx)? {
return Ok(Scalar::null(DType::Variant(Nullability::Nullable)));
}

let metadata = array
.metadata_array()
.scalar_at(index)?
.scalar_at(index, ctx)?
.as_binary()
.value()
.cloned()
Expand All @@ -74,16 +76,17 @@ fn scalar_from_variant_storage(
typed_value: Option<&ArrayRef>,
index: usize,
) -> VortexResult<Scalar> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
if let Some(typed_value) = typed_value
&& typed_value.is_valid(index)?
&& typed_value.is_valid(index, &mut ctx)?
{
return scalar_from_typed_value_array(metadata, value, typed_value, index);
}

if let Some(value) = value
&& value.is_valid(index)?
&& value.is_valid(index, &mut ctx)?
{
return scalar_from_unshredded_value(metadata, &value.scalar_at(index)?);
return scalar_from_unshredded_value(metadata, &value.scalar_at(index, &mut ctx)?);
}

Ok(Scalar::null(DType::Null))
Expand All @@ -95,11 +98,17 @@ fn scalar_from_typed_value_array(
typed_value: &ArrayRef,
index: usize,
) -> VortexResult<Scalar> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();

let value_scalar = match value {
Some(value) if value.is_valid(index)? => Some(value.scalar_at(index)?),
Some(value) if value.is_valid(index, &mut ctx)? => Some(value.scalar_at(index, &mut ctx)?),
_ => None,
};
scalar_from_typed_value_scalar(metadata, value_scalar, typed_value.scalar_at(index)?)
scalar_from_typed_value_scalar(
metadata,
value_scalar,
typed_value.scalar_at(index, &mut ctx)?,
)
}

fn scalar_from_typed_value_scalar(
Expand Down
9 changes: 3 additions & 6 deletions encodings/pco/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ use vortex_array::ArrayView;
use vortex_array::ExecutionCtx;
use vortex_array::ExecutionResult;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::Precision;
use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::Primitive;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::buffer::BufferHandle;
Expand Down Expand Up @@ -629,15 +627,14 @@ impl OperationsVTable<Pco> for Pco {
fn scalar_at(
array: ArrayView<'_, Pco>,
index: usize,
_ctx: &mut ExecutionCtx,
ctx: &mut ExecutionCtx,
) -> VortexResult<Scalar> {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let unsliced_validity = child_to_validity(&array.slots()[0], array.dtype().nullability());
array
._slice(index, index + 1)
.decompress(&unsliced_validity, &mut ctx)?
.decompress(&unsliced_validity, ctx)?
.into_array()
.scalar_at(0)
.scalar_at(0, ctx)
}
}

Expand Down
Loading
Loading