diff --git a/vortex-duckdb/cpp/include/duckdb_vx/vector.h b/vortex-duckdb/cpp/include/duckdb_vx/vector.h index 906937f1629..036c4aeb313 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/vector.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/vector.h @@ -54,6 +54,15 @@ void duckdb_vx_vector_set_all_valid(duckdb_vector ffi_vector); // Set the data pointer for the vector. This is the start of the values array in the vector. void duckdb_vx_vector_set_data_ptr(duckdb_vector ffi_vector, void *ptr); +// Set the validity pointer for the vector to external data, and store the buffer in auxiliary +// to keep it alive. The validity pointer is derived from data_ptr at the given u64 offset. +// The buffer is attached purely as a keep-alive. This enables zero-copy export of validity masks. +void duckdb_vx_vector_set_validity_data(duckdb_vector ffi_vector, + idx_t u64_offset, + idx_t capacity, + duckdb_vx_vector_buffer buffer, + void *data_ptr); + // Converts a duckdb flat vector into a Sequence vector. void duckdb_vx_sequence_vector(duckdb_vector c_vector, int64_t start, int64_t step, idx_t capacity); diff --git a/vortex-duckdb/cpp/vector.cpp b/vortex-duckdb/cpp/vector.cpp index 0328b7aff3e..e2153886a1b 100644 --- a/vortex-duckdb/cpp/vector.cpp +++ b/vortex-duckdb/cpp/vector.cpp @@ -57,6 +57,21 @@ class DataVector : public Vector { inline void SetDataPtr(data_ptr_t ptr) { data = ptr; }; + + inline ValidityMask &GetValidity() { + return validity; + }; +}; + +// Same hack for ValidityMask: access protected fields via inheritance. +class ExternalValidityMask : public ValidityMask { +public: + inline void SetExternal(idx_t u64_offset, idx_t cap, buffer_ptr keeper) { + validity_data = std::move(keeper); + // Derive validity_mask from validity_data so the two stay consistent. + validity_mask = reinterpret_cast(validity_data.get()) + u64_offset; + capacity = cap; + }; }; } // namespace vortex @@ -82,6 +97,29 @@ extern "C" void duckdb_vx_vector_set_data_ptr(duckdb_vector ffi_vector, void *pt dvector->SetDataPtr((data_ptr_t)ptr); } +extern "C" void duckdb_vx_vector_set_validity_data(duckdb_vector ffi_vector, + idx_t u64_offset, + idx_t capacity, + duckdb_vx_vector_buffer buffer, + void *data_ptr) { + auto dvector = reinterpret_cast(ffi_vector); + auto &validity = dvector->GetValidity(); + // ExternalValidityMask adds no members, so this downcast only exposes + // access to ValidityMask's protected fields. + auto ext_validity = static_cast(&validity); + + // Use the shared_ptr aliasing constructor: the control block ref-counts the + // ExternalVectorBuffer (preventing the Rust buffer from being freed), + // while the stored pointer points to the explicit data_ptr. + auto ext_buf = reinterpret_cast *>(buffer); + auto keeper = shared_ptr>( + *ext_buf, + reinterpret_cast *>(data_ptr)); + + // Set validity_data, derive validity_mask from it at u64_offset, and set capacity. + ext_validity->SetExternal(u64_offset, capacity, std::move(keeper)); +} + extern "C" duckdb_value duckdb_vx_vector_get_value(duckdb_vector ffi_vector, idx_t index) { auto vector = reinterpret_cast(ffi_vector); auto value = duckdb::make_uniq(vector->GetValue(index)); diff --git a/vortex-duckdb/src/convert/expr.rs b/vortex-duckdb/src/convert/expr.rs index 3cbfec883b6..a9f11059805 100644 --- a/vortex-duckdb/src/convert/expr.rs +++ b/vortex-duckdb/src/convert/expr.rs @@ -48,7 +48,7 @@ pub fn try_from_bound_expression( value: &duckdb::ExpressionRef, ) -> VortexResult> { let Some(value) = value.as_class() else { - tracing::debug!("no expression class id {:?}", value.as_class_id()); + debug!("no expression class id {:?}", value.as_class_id()); return Ok(None); }; Ok(Some(match value { @@ -164,7 +164,7 @@ pub fn try_from_bound_expression( Like.new_expr(LikeOptions::default(), [value, pattern]) } _ => { - tracing::debug!("bound function {}", func.scalar_function.name()); + debug!("bound function {}", func.scalar_function.name()); return Ok(None); } }, diff --git a/vortex-duckdb/src/duckdb/vector.rs b/vortex-duckdb/src/duckdb/vector.rs index 73aca417e49..ac652bed43f 100644 --- a/vortex-duckdb/src/duckdb/vector.rs +++ b/vortex-duckdb/src/duckdb/vector.rs @@ -24,9 +24,20 @@ use crate::duckdb::LogicalTypeRef; use crate::duckdb::SelectionVectorRef; use crate::duckdb::Value; use crate::duckdb::ValueRef; +use crate::duckdb::VectorBuffer; use crate::duckdb::VectorBufferRef; use crate::lifetime_wrapper; +/// External validity data for zero-copy export of validity masks to DuckDB. +/// +/// Holds a [`VectorBuffer`] as a keep-alive and a raw pointer to the validity bitmap. +pub(crate) struct ValidityData { + /// VectorBuffer that keeps the underlying memory alive via DuckDB's ref-counting. + pub(crate) shared_buffer: VectorBuffer, + /// Pointer to the raw validity bitmap data within the buffer. + pub(crate) data_ptr: *const u8, +} + /// Returns the internal vector size used by DuckDB at runtime. #[expect( clippy::cast_possible_truncation, @@ -151,6 +162,31 @@ impl VectorRef { unsafe { cpp::duckdb_vx_vector_set_data_ptr(self.as_ptr(), ptr as *mut c_void) } } + /// Sets the validity data for the vector from a [`ValidityData`]. The buffer is + /// attached purely as a keep-alive, and the data pointer is used as the validity data + /// at the given `u64_offset`. + /// + /// # Safety + /// + /// The data pointer must point to a valid `u64` array with at least + /// `u64_offset + capacity.div_ceil(64)` elements. + pub(crate) unsafe fn set_validity_data( + &self, + u64_offset: usize, + capacity: usize, + zero_copy: &ValidityData, + ) { + unsafe { + cpp::duckdb_vx_vector_set_validity_data( + self.as_ptr(), + u64_offset as idx_t, + capacity as idx_t, + zero_copy.shared_buffer.as_ptr(), + zero_copy.data_ptr as *mut c_void, + ) + } + } + /// Assigns the element at the specified index with a string value. /// FIXME(ngates): remove this. pub fn assign_string_element(&self, idx: usize, value: &CStr) { diff --git a/vortex-duckdb/src/exporter/constant.rs b/vortex-duckdb/src/exporter/constant.rs index ade5fece825..587fe9e9472 100644 --- a/vortex-duckdb/src/exporter/constant.rs +++ b/vortex-duckdb/src/exporter/constant.rs @@ -71,7 +71,6 @@ impl ColumnExporter for ConstantExporter { ) -> VortexResult<()> { match self.value.as_ref() { None => { - // TODO(ngates): would be good if DuckDB supported constant null vectors. vector.set_all_false_validity(); } Some(value) => { diff --git a/vortex-duckdb/src/exporter/list.rs b/vortex-duckdb/src/exporter/list.rs index c532242fae7..7e7d024e11c 100644 --- a/vortex-duckdb/src/exporter/list.rs +++ b/vortex-duckdb/src/exporter/list.rs @@ -19,6 +19,7 @@ use vortex::mask::Mask; use super::ConversionCache; use super::all_invalid; use super::new_array_exporter_with_flatten; +use super::validity; use crate::cpp; use crate::duckdb::LogicalType; use crate::duckdb::Vector; @@ -26,7 +27,6 @@ use crate::duckdb::VectorRef; use crate::exporter::ColumnExporter; struct ListExporter { - validity: Mask, /// We cache the child elements of our list array so that we don't have to export it every time, /// and we also share it across any other exporters who want to export this array. /// @@ -92,7 +92,6 @@ pub(crate) fn new_exporter( let boxed = match_each_integer_ptype!(offsets.ptype(), |O| { Box::new(ListExporter { - validity, duckdb_elements: shared_elements, offsets, num_elements, @@ -100,7 +99,7 @@ pub(crate) fn new_exporter( }) as Box }); - Ok(boxed) + Ok(validity::new_exporter(validity, boxed)) } impl ColumnExporter for ListExporter { @@ -111,21 +110,6 @@ impl ColumnExporter for ListExporter { vector: &mut VectorRef, _ctx: &mut ExecutionCtx, ) -> VortexResult<()> { - // Verify that offset + len doesn't exceed the validity mask length. - assert!( - offset + len <= self.validity.len(), - "Export range [{}, {}) exceeds validity mask length {}", - offset, - offset + len, - self.validity.len() - ); - - // Set validity if necessary. - if unsafe { vector.set_validity(&self.validity, offset, len) } { - // All values are null, so no point copying the data. - return Ok(()); - } - let offsets = &self.offsets.as_slice::()[offset..offset + len + 1]; debug_assert_eq!(offsets.len(), len + 1); diff --git a/vortex-duckdb/src/exporter/list_view.rs b/vortex-duckdb/src/exporter/list_view.rs index 4e88d8f4199..5b2f19c09d7 100644 --- a/vortex-duckdb/src/exporter/list_view.rs +++ b/vortex-duckdb/src/exporter/list_view.rs @@ -19,6 +19,7 @@ use vortex::mask::Mask; use super::ConversionCache; use super::all_invalid; use super::new_array_exporter_with_flatten; +use super::validity; use crate::cpp; use crate::duckdb::LogicalType; use crate::duckdb::Vector; @@ -26,7 +27,6 @@ use crate::duckdb::VectorRef; use crate::exporter::ColumnExporter; struct ListViewExporter { - validity: Mask, /// We cache the child elements of our list array so that we don't have to export it every time, /// and we also share it across any other exporters who want to export this array. /// @@ -97,7 +97,6 @@ pub(crate) fn new_exporter( let boxed = match_each_integer_ptype!(offsets.ptype(), |O| { match_each_integer_ptype!(sizes.ptype(), |S| { Box::new(ListViewExporter { - validity, duckdb_elements: shared_elements, offsets, sizes, @@ -108,7 +107,7 @@ pub(crate) fn new_exporter( }) }); - Ok(boxed) + Ok(validity::new_exporter(validity, boxed)) } impl ColumnExporter for ListViewExporter { @@ -119,21 +118,6 @@ impl ColumnExporter for ListViewExporter vector: &mut VectorRef, _ctx: &mut ExecutionCtx, ) -> VortexResult<()> { - // Verify that offset + len doesn't exceed the validity mask length. - assert!( - offset + len <= self.validity.len(), - "Export range [{}, {}) exceeds validity mask length {}", - offset, - offset + len, - self.validity.len() - ); - - // Set validity if necessary. - if unsafe { vector.set_validity(&self.validity, offset, len) } { - // All values are null, so no point copying the data. - return Ok(()); - } - let offsets = &self.offsets.as_slice::()[offset..offset + len]; let sizes = &self.sizes.as_slice::()[offset..offset + len]; debug_assert_eq!(offsets.len(), len); diff --git a/vortex-duckdb/src/exporter/primitive.rs b/vortex-duckdb/src/exporter/primitive.rs index 1bbec2d79c8..a0b08c80e4b 100644 --- a/vortex-duckdb/src/exporter/primitive.rs +++ b/vortex-duckdb/src/exporter/primitive.rs @@ -97,6 +97,78 @@ mod tests { ); } + #[test] + fn test_primitive_exporter_with_nulls() { + let arr = PrimitiveArray::from_option_iter([Some(10i32), None, Some(30), None, Some(50)]); + + let mut chunk = DataChunk::new([LogicalType::new(cpp::duckdb_type::DUCKDB_TYPE_INTEGER)]); + let mut ctx = SESSION.create_execution_ctx(); + + new_exporter(arr, &mut ctx) + .unwrap() + .export(0, 5, chunk.get_vector_mut(0), &mut ctx) + .unwrap(); + chunk.set_len(5); + + assert_eq!( + format!("{}", String::try_from(&*chunk).unwrap()), + r#"Chunk - [1 Columns] +- FLAT INTEGER: 5 = [ 10, NULL, 30, NULL, 50] +"# + ); + } + + /// Export a large nullable primitive array over many chunks to exercise the + /// zero-copy validity path. The non-zero-copy fallback currently panics, + /// so this test proves every chunk goes through the zero-copy branch. + #[test] + fn test_primitive_exporter_with_nulls_zero_copy() { + let vector_size = duckdb_vector_size(); + const NUM_CHUNKS: usize = 8; + let len = vector_size * NUM_CHUNKS; + + // Every 3rd element is null — guarantees mixed validity in every chunk. + #[expect(clippy::cast_possible_truncation, reason = "test data fits in i32")] + let arr = PrimitiveArray::from_option_iter( + (0..len).map(|i| if i % 3 == 1 { None } else { Some(i as i32) }), + ); + + let mut ctx = SESSION.create_execution_ctx(); + let exporter = new_exporter(arr, &mut ctx).unwrap(); + + for chunk_idx in 0..NUM_CHUNKS { + let mut chunk = + DataChunk::new([LogicalType::new(cpp::duckdb_type::DUCKDB_TYPE_INTEGER)]); + + // This will panic if the non-zero-copy path is hit. + exporter + .export( + chunk_idx * vector_size, + vector_size, + chunk.get_vector_mut(0), + &mut ctx, + ) + .unwrap(); + chunk.set_len(vector_size); + + let vec = chunk.get_vector(0); + for i in 0..vector_size { + let global_idx = chunk_idx * vector_size + i; + if global_idx % 3 == 1 { + assert!( + vec.row_is_null(i as u64), + "expected null at global index {global_idx}" + ); + } else { + assert!( + !vec.row_is_null(i as u64), + "expected non-null at global index {global_idx}" + ); + } + } + } + } + #[test] fn test_long_primitive_exporter() { let vector_size = duckdb_vector_size(); diff --git a/vortex-duckdb/src/exporter/validity.rs b/vortex-duckdb/src/exporter/validity.rs index 76107240212..d1fbf8123cb 100644 --- a/vortex-duckdb/src/exporter/validity.rs +++ b/vortex-duckdb/src/exporter/validity.rs @@ -5,14 +5,42 @@ use vortex::array::ExecutionCtx; use vortex::error::VortexResult; use vortex::mask::Mask; +use crate::duckdb::ValidityData; +use crate::duckdb::VectorBuffer; use crate::duckdb::VectorRef; use crate::exporter::ColumnExporter; struct ValidityExporter { mask: Mask, + /// If the mask's bit buffer is u64-aligned with no sub-byte offset, + /// we can zero-copy it into DuckDB. We hold the ValidityData to keep + /// the underlying memory alive via DuckDB's ref-counting. + zero_copy: Option, exporter: Box, } +/// Returns true if the bit buffer can be zero-copied as a DuckDB validity mask. +/// +/// Requirements: +/// - No sub-byte bit offset (offset == 0) +/// - The underlying byte buffer is u64-aligned +/// - The underlying byte buffer length is a multiple of 8 (so u64 reads are in-bounds) +fn can_zero_copy_validity(mask: &Mask) -> bool { + let Mask::Values(values) = mask else { + return false; + }; + let bit_buf = values.bit_buffer(); + if bit_buf.offset() != 0 { + return false; + } + let inner = bit_buf.inner(); + let slice = inner.as_slice(); + // DuckDB reads validity as u64 words, so the buffer must be u64-aligned and + // its length must be a multiple of 8 bytes to avoid out-of-bounds reads. + (slice.as_ptr() as usize).is_multiple_of(size_of::()) + && slice.len().is_multiple_of(size_of::()) +} + pub(crate) fn new_exporter( mask: Mask, exporter: Box, @@ -20,7 +48,22 @@ pub(crate) fn new_exporter( if mask.all_true() { exporter } else { - Box::new(ValidityExporter { mask, exporter }) + let zero_copy = can_zero_copy_validity(&mask).then(|| { + let Mask::Values(values) = &mask else { + unreachable!() + }; + let buffer = values.bit_buffer().inner().clone(); + let data_ptr = buffer.as_slice().as_ptr(); + ValidityData { + shared_buffer: VectorBuffer::new(buffer), + data_ptr, + } + }); + Box::new(ValidityExporter { + mask, + zero_copy, + exporter, + }) } } @@ -36,7 +79,9 @@ impl ColumnExporter for ValidityExporter { offset + len <= self.mask.len(), "cannot access outside of array" ); - if unsafe { vector.set_validity(&self.mask, offset, len) } { + if unsafe { + vector.set_validity_zero_copy(&self.mask, offset, len, self.zero_copy.as_ref()) + } { // All values are null, so no point copying the data. return Ok(()); } diff --git a/vortex-duckdb/src/exporter/vector.rs b/vortex-duckdb/src/exporter/vector.rs index e6ada5c7dd7..3ab0c484318 100644 --- a/vortex-duckdb/src/exporter/vector.rs +++ b/vortex-duckdb/src/exporter/vector.rs @@ -2,15 +2,30 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use vortex::mask::Mask; -use vortex::mask::MaskValues; use crate::cpp::duckdb_vx_vector_set_all_valid; +use crate::duckdb::ValidityData; use crate::duckdb::Value; use crate::duckdb::VectorRef; use crate::exporter::copy_from_slice; impl VectorRef { - pub(super) unsafe fn set_validity(&mut self, mask: &Mask, offset: usize, len: usize) -> bool { + /// Returns true if all values are null (caller can skip data export). + pub unsafe fn set_validity(&mut self, mask: &Mask, offset: usize, len: usize) -> bool { + unsafe { self.set_validity_zero_copy(mask, offset, len, None) } + } + + /// Like [`set_validity`](Self::set_validity), but attempts a zero-copy path when + /// `zero_copy` is provided and the offset is u64-aligned. + /// + /// Returns true if all values are null (caller can skip data export). + pub(super) unsafe fn set_validity_zero_copy( + &mut self, + mask: &Mask, + offset: usize, + len: usize, + zero_copy: Option<&ValidityData>, + ) -> bool { match mask { Mask::AllTrue(_) => { self.set_all_true_validity(); @@ -20,31 +35,37 @@ impl VectorRef { self.set_all_false_validity(); true } - Mask::Values(arr) => self.set_validity_with_array(arr, len, offset), - } - } + Mask::Values(arr) => { + let true_count = arr.bit_buffer().slice(offset..(offset + len)).true_count(); + if true_count == len { + self.set_all_true_validity() + } else if true_count == 0 { + self.set_all_false_validity() + } else if let Some(zc) = zero_copy.filter(|_| offset.is_multiple_of(64)) { + let u64_offset = offset / 64; + // SAFETY: the underlying buffer is u64-aligned (checked in + // can_zero_copy_validity) and the VectorBuffer keeps the data alive. + // data_ptr points into the buffer at the start of the validity bitmap. + unsafe { self.set_validity_data(u64_offset, len, zc) }; + } else { + // If zero_copy is available and offset is aligned, we should + // have taken the branch above. Assert this invariant. + assert!( + zero_copy.is_none() || !offset.is_multiple_of(64), + "zero-copy validity available and offset {offset} is aligned \ + but copy path was taken" + ); + let source = arr.bit_buffer().inner().as_slice(); + copy_from_slice( + unsafe { self.ensure_validity_slice(len) }, + source, + offset, + len, + ); + } - fn set_validity_with_array(&mut self, arr: &MaskValues, len: usize, offset: usize) -> bool { - let true_count = arr.true_count(); - if true_count == arr.len() { - self.set_all_true_validity(); - return false; - } else if true_count == 0 { - self.set_all_false_validity(); - return true; - } - - let dest = unsafe { self.ensure_validity_slice(len) }; - let source = arr.bit_buffer().inner().as_slice(); - let ones = copy_from_slice(dest, source, offset, len); - if ones == 0 { - self.set_all_false_validity(); - true - } else if ones == len { - self.set_all_true_validity(); - false - } else { - false + true_count == 0 + } } } diff --git a/vortex-io/src/read_at.rs b/vortex-io/src/read_at.rs index 28cc5bf244b..aa9a8a03abf 100644 --- a/vortex-io/src/read_at.rs +++ b/vortex-io/src/read_at.rs @@ -229,10 +229,8 @@ impl InstrumentedReadAt { } } -// We implement drop for `InnerMetrics` so this will be logged only when we eventually drop the final instance of `InstrumentedRead` -impl Drop for InnerMetrics { - #[allow(clippy::cognitive_complexity)] - fn drop(&mut self) { +impl InnerMetrics { + fn log_sizes(&self) { tracing::debug!("Reads: {}", self.sizes.count()); if !self.sizes.is_empty() { tracing::debug!( @@ -245,10 +243,10 @@ impl Drop for InnerMetrics { .vortex_expect("must not be empty"), ); } + tracing::debug!("Total read size: {}", self.total_size.value()); + } - let total_size = self.total_size.value(); - tracing::debug!("Total read size: {total_size}"); - + fn log_durations(&self) { if !self.durations.is_empty() { tracing::debug!( "Read duration: p50={}ms p95={}ms p99={}ms p999={}ms", @@ -273,6 +271,14 @@ impl Drop for InnerMetrics { } } +// We implement drop for `InnerMetrics` so this will be logged only when we eventually drop the final instance of `InstrumentedRead` +impl Drop for InnerMetrics { + fn drop(&mut self) { + self.log_sizes(); + self.log_durations(); + } +} + impl VortexReadAt for InstrumentedReadAt { fn uri(&self) -> Option<&Arc> { self.read.uri()