Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion vortex-cuda/src/hybrid_dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ use crate::executor::CudaExecutionCtx;
///
/// Returns `Ok(Canonical)` on success. Returns `Err` when the array
/// cannot be handled (non-primitive output dtype, no registered kernel).
#[allow(clippy::cognitive_complexity)]
pub async fn try_gpu_dispatch(
array: &ArrayRef,
ctx: &mut CudaExecutionCtx,
Expand Down
9 changes: 9 additions & 0 deletions vortex-duckdb/cpp/include/duckdb_vx/vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ void duckdb_vx_vector_set_all_valid(duckdb_vector ffi_vector);
// Set the data pointer for the vector. This is the start of the values array in the vector.
void duckdb_vx_vector_set_data_ptr(duckdb_vector ffi_vector, void *ptr);

// Point the vector's validity mask at externally owned memory, enabling zero-copy export of
// validity masks.
//
//   u64_offset - offset, counted in 64-bit validity words, applied to data_ptr to obtain the
//                first word of the mask.
//   capacity   - capacity recorded on the validity mask.
//   buffer     - external vector buffer stored alongside the mask purely as a keep-alive for
//                the memory behind data_ptr; it is not read as validity data itself.
//   data_ptr   - base address of the validity words. It must remain valid for as long as
//                buffer is alive.
void duckdb_vx_vector_set_validity_data(duckdb_vector ffi_vector,
idx_t u64_offset,
idx_t capacity,
duckdb_vx_vector_buffer buffer,
void *data_ptr);

// Converts a duckdb flat vector into a Sequence vector.
void duckdb_vx_sequence_vector(duckdb_vector c_vector, int64_t start, int64_t step, idx_t capacity);

Expand Down
38 changes: 38 additions & 0 deletions vortex-duckdb/cpp/vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ class DataVector : public Vector {
// Overwrite the vector's data pointer with `ptr`.
inline void SetDataPtr(data_ptr_t ptr) {
    this->data = ptr;
}

// Expose the vector's validity mask by mutable reference so callers can modify it in place.
inline ValidityMask &GetValidity() {
    return this->validity;
}
};

// Same hack for ValidityMask: access protected fields via inheritance.
class ExternalValidityMask : public ValidityMask {
public:
inline void SetExternal(idx_t u64_offset, idx_t cap, buffer_ptr<ValidityBuffer> keeper) {
validity_data = std::move(keeper);
Comment thread
myrrc marked this conversation as resolved.
// Derive validity_mask from validity_data so the two stay consistent.
validity_mask = reinterpret_cast<validity_t *>(validity_data.get()) + u64_offset;
capacity = cap;
};
};

} // namespace vortex
Expand All @@ -82,6 +97,29 @@ extern "C" void duckdb_vx_vector_set_data_ptr(duckdb_vector ffi_vector, void *pt
dvector->SetDataPtr((data_ptr_t)ptr);
}

// Attach externally owned validity words to `ffi_vector` without copying them.
//
// `buffer` is a handle to a shared_ptr<ExternalVectorBuffer>; it is used only for ownership.
// `data_ptr` is the base address of the validity words, and the mask starts `u64_offset`
// words past it. `capacity` is stored on the mask as its capacity.
extern "C" void duckdb_vx_vector_set_validity_data(duckdb_vector ffi_vector,
                                                   idx_t u64_offset,
                                                   idx_t capacity,
                                                   duckdb_vx_vector_buffer buffer,
                                                   void *data_ptr) {
    using ValidityStorage = TemplatedValidityData<validity_t>;

    // Aliasing constructor: the new shared_ptr shares the control block of the
    // ExternalVectorBuffer (so the Rust-owned buffer cannot be freed while the mask
    // references it), but its stored pointer is the explicit data_ptr.
    auto *owner = reinterpret_cast<shared_ptr<vortex::ExternalVectorBuffer> *>(buffer);
    shared_ptr<ValidityStorage> keep_alive(*owner, reinterpret_cast<ValidityStorage *>(data_ptr));

    // ExternalValidityMask declares no members of its own, so the downcast merely unlocks
    // ValidityMask's protected fields.
    auto *target = reinterpret_cast<vortex::DataVector *>(ffi_vector);
    auto *mask = static_cast<vortex::ExternalValidityMask *>(&target->GetValidity());

    // Installs validity_data, derives validity_mask from it at u64_offset, and records capacity.
    mask->SetExternal(u64_offset, capacity, std::move(keep_alive));
}

extern "C" duckdb_value duckdb_vx_vector_get_value(duckdb_vector ffi_vector, idx_t index) {
auto vector = reinterpret_cast<Vector *>(ffi_vector);
auto value = duckdb::make_uniq<Value>(vector->GetValue(index));
Expand Down
4 changes: 2 additions & 2 deletions vortex-duckdb/src/convert/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub fn try_from_bound_expression(
value: &duckdb::ExpressionRef,
) -> VortexResult<Option<Expression>> {
let Some(value) = value.as_class() else {
tracing::debug!("no expression class id {:?}", value.as_class_id());
debug!("no expression class id {:?}", value.as_class_id());
return Ok(None);
};
Ok(Some(match value {
Expand Down Expand Up @@ -164,7 +164,7 @@ pub fn try_from_bound_expression(
Like.new_expr(LikeOptions::default(), [value, pattern])
}
_ => {
tracing::debug!("bound function {}", func.scalar_function.name());
debug!("bound function {}", func.scalar_function.name());
return Ok(None);
}
},
Expand Down
36 changes: 36 additions & 0 deletions vortex-duckdb/src/duckdb/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,20 @@ use crate::duckdb::LogicalTypeRef;
use crate::duckdb::SelectionVectorRef;
use crate::duckdb::Value;
use crate::duckdb::ValueRef;
use crate::duckdb::VectorBuffer;
use crate::duckdb::VectorBufferRef;
use crate::lifetime_wrapper;

/// External validity data for zero-copy export of validity masks to DuckDB.
///
/// Pairs a [`VectorBuffer`] keep-alive with a raw pointer to the validity bitmap, so the
/// bitmap can be handed to DuckDB (see `VectorRef::set_validity_data`) without copying it.
///
/// Nothing in this type checks that `data_ptr` actually points into memory kept alive by
/// `shared_buffer`; whoever constructs the value is responsible for that pairing.
pub(crate) struct ValidityData {
/// VectorBuffer that keeps the underlying memory alive via DuckDB's ref-counting.
pub(crate) shared_buffer: VectorBuffer,
/// Pointer to the raw validity bitmap data within the buffer.
///
/// This is the base address handed to DuckDB; the per-export word offset is supplied
/// separately when the validity is attached to a vector.
pub(crate) data_ptr: *const u8,
}

/// Returns the internal vector size used by DuckDB at runtime.
#[expect(
clippy::cast_possible_truncation,
Expand Down Expand Up @@ -151,6 +162,31 @@ impl VectorRef {
unsafe { cpp::duckdb_vx_vector_set_data_ptr(self.as_ptr(), ptr as *mut c_void) }
}

/// Points this vector's validity mask at the bitmap described by a [`ValidityData`],
/// without copying it.
///
/// The mask starts `u64_offset` 64-bit words past `zero_copy.data_ptr`, and `capacity` is
/// forwarded as the mask's capacity. `zero_copy.shared_buffer` is handed over only so that
/// the memory stays alive; it is not interpreted as validity data.
///
/// # Safety
///
/// The data pointer must point to a valid `u64` array with at least
/// `u64_offset + capacity.div_ceil(64)` elements.
pub(crate) unsafe fn set_validity_data(
    &self,
    u64_offset: usize,
    capacity: usize,
    zero_copy: &ValidityData,
) {
    let word_offset = u64_offset as idx_t;
    let mask_capacity = capacity as idx_t;
    let keep_alive = zero_copy.shared_buffer.as_ptr();
    let bitmap = zero_copy.data_ptr as *mut c_void;

    // SAFETY: the caller guarantees (see `# Safety`) that the bitmap covers the requested
    // word range; the vector and buffer handles come from wrappers borrowed for this call.
    unsafe {
        cpp::duckdb_vx_vector_set_validity_data(
            self.as_ptr(),
            word_offset,
            mask_capacity,
            keep_alive,
            bitmap,
        );
    }
}

/// Assigns the element at the specified index with a string value.
/// FIXME(ngates): remove this.
pub fn assign_string_element(&self, idx: usize, value: &CStr) {
Expand Down
1 change: 0 additions & 1 deletion vortex-duckdb/src/exporter/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ impl ColumnExporter for ConstantExporter {
) -> VortexResult<()> {
match self.value.as_ref() {
None => {
// TODO(ngates): would be good if DuckDB supported constant null vectors.
vector.set_all_false_validity();
Comment thread
joseph-isaacs marked this conversation as resolved.
}
Some(value) => {
Expand Down
20 changes: 2 additions & 18 deletions vortex-duckdb/src/exporter/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ use vortex::mask::Mask;
use super::ConversionCache;
use super::all_invalid;
use super::new_array_exporter_with_flatten;
use super::validity;
use crate::cpp;
use crate::duckdb::LogicalType;
use crate::duckdb::Vector;
use crate::duckdb::VectorRef;
use crate::exporter::ColumnExporter;

struct ListExporter<O> {
validity: Mask,
/// We cache the child elements of our list array so that we don't have to export it every time,
/// and we also share it across any other exporters who want to export this array.
///
Expand Down Expand Up @@ -92,15 +92,14 @@ pub(crate) fn new_exporter(

let boxed = match_each_integer_ptype!(offsets.ptype(), |O| {
Box::new(ListExporter {
validity,
duckdb_elements: shared_elements,
offsets,
num_elements,
offset_type: PhantomData::<O>,
}) as Box<dyn ColumnExporter>
});

Ok(boxed)
Ok(validity::new_exporter(validity, boxed))
}

impl<O: IntegerPType> ColumnExporter for ListExporter<O> {
Expand All @@ -111,21 +110,6 @@ impl<O: IntegerPType> ColumnExporter for ListExporter<O> {
vector: &mut VectorRef,
_ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
// Verify that offset + len doesn't exceed the validity mask length.
assert!(
offset + len <= self.validity.len(),
"Export range [{}, {}) exceeds validity mask length {}",
offset,
offset + len,
self.validity.len()
);

// Set validity if necessary.
if unsafe { vector.set_validity(&self.validity, offset, len) } {
// All values are null, so no point copying the data.
return Ok(());
}

let offsets = &self.offsets.as_slice::<O>()[offset..offset + len + 1];
debug_assert_eq!(offsets.len(), len + 1);

Expand Down
20 changes: 2 additions & 18 deletions vortex-duckdb/src/exporter/list_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ use vortex::mask::Mask;
use super::ConversionCache;
use super::all_invalid;
use super::new_array_exporter_with_flatten;
use super::validity;
use crate::cpp;
use crate::duckdb::LogicalType;
use crate::duckdb::Vector;
use crate::duckdb::VectorRef;
use crate::exporter::ColumnExporter;

struct ListViewExporter<O, S> {
validity: Mask,
/// We cache the child elements of our list array so that we don't have to export it every time,
/// and we also share it across any other exporters who want to export this array.
///
Expand Down Expand Up @@ -97,7 +97,6 @@ pub(crate) fn new_exporter(
let boxed = match_each_integer_ptype!(offsets.ptype(), |O| {
match_each_integer_ptype!(sizes.ptype(), |S| {
Box::new(ListViewExporter {
validity,
duckdb_elements: shared_elements,
offsets,
sizes,
Expand All @@ -108,7 +107,7 @@ pub(crate) fn new_exporter(
})
});

Ok(boxed)
Ok(validity::new_exporter(validity, boxed))
}

impl<O: IntegerPType, S: IntegerPType> ColumnExporter for ListViewExporter<O, S> {
Expand All @@ -119,21 +118,6 @@ impl<O: IntegerPType, S: IntegerPType> ColumnExporter for ListViewExporter<O, S>
vector: &mut VectorRef,
_ctx: &mut ExecutionCtx,
) -> VortexResult<()> {
// Verify that offset + len doesn't exceed the validity mask length.
assert!(
offset + len <= self.validity.len(),
"Export range [{}, {}) exceeds validity mask length {}",
offset,
offset + len,
self.validity.len()
);

// Set validity if necessary.
if unsafe { vector.set_validity(&self.validity, offset, len) } {
// All values are null, so no point copying the data.
return Ok(());
}

let offsets = &self.offsets.as_slice::<O>()[offset..offset + len];
let sizes = &self.sizes.as_slice::<S>()[offset..offset + len];
debug_assert_eq!(offsets.len(), len);
Expand Down
72 changes: 72 additions & 0 deletions vortex-duckdb/src/exporter/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,78 @@ mod tests {
);
}

/// Exports a five-element nullable `i32` array in a single call and checks that the rendered
/// chunk shows `NULL` at exactly the positions that were `None` in the input.
#[test]
fn test_primitive_exporter_with_nulls() {
    let values = [Some(10i32), None, Some(30), None, Some(50)];
    let arr = PrimitiveArray::from_option_iter(values);

    let mut ctx = SESSION.create_execution_ctx();
    let mut chunk = DataChunk::new([LogicalType::new(cpp::duckdb_type::DUCKDB_TYPE_INTEGER)]);

    let exporter = new_exporter(arr, &mut ctx).unwrap();
    exporter
        .export(0, 5, chunk.get_vector_mut(0), &mut ctx)
        .unwrap();
    chunk.set_len(5);

    let rendered = String::try_from(&*chunk).unwrap();
    assert_eq!(
        rendered,
        r#"Chunk - [1 Columns]
- FLAT INTEGER: 5 = [ 10, NULL, 30, NULL, 50]
"#
    );
}

/// Exports a nullable primitive array spanning several DuckDB vectors, one full vector per
/// `export` call, and verifies the null flag of every row in every chunk.
///
/// The non-zero-copy fallback currently panics, so completing without a panic shows that
/// each chunk was served by the zero-copy validity branch.
#[test]
fn test_primitive_exporter_with_nulls_zero_copy() {
    const NUM_CHUNKS: usize = 8;
    let vector_size = duckdb_vector_size();
    let len = vector_size * NUM_CHUNKS;

    // Indices with `i % 3 == 1` are null, so every chunk contains a mix of valid and null rows.
    #[expect(clippy::cast_possible_truncation, reason = "test data fits in i32")]
    let arr = PrimitiveArray::from_option_iter((0..len).map(|i| (i % 3 != 1).then_some(i as i32)));

    let mut ctx = SESSION.create_execution_ctx();
    let exporter = new_exporter(arr, &mut ctx).unwrap();

    for chunk_idx in 0..NUM_CHUNKS {
        let base = chunk_idx * vector_size;
        let mut chunk = DataChunk::new([LogicalType::new(cpp::duckdb_type::DUCKDB_TYPE_INTEGER)]);

        // Panics here if the export falls back to the non-zero-copy path.
        exporter
            .export(base, vector_size, chunk.get_vector_mut(0), &mut ctx)
            .unwrap();
        chunk.set_len(vector_size);

        let vec = chunk.get_vector(0);
        for (row, global_idx) in (base..base + vector_size).enumerate() {
            let is_null = vec.row_is_null(row as u64);
            if global_idx % 3 == 1 {
                assert!(is_null, "expected null at global index {global_idx}");
            } else {
                assert!(!is_null, "expected non-null at global index {global_idx}");
            }
        }
    }
}

#[test]
fn test_long_primitive_exporter() {
let vector_size = duckdb_vector_size();
Expand Down
Loading
Loading