Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions vortex-duckdb/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ fn try_build_duckdb(
}
}

fn c2rust(crate_dir: &Path, duckdb_include_dir: &Path) {
fn c2rust(crate_dir: &Path, duckdb_include_dir: &Path, duckdb_fsst_include_dir: &Path) {
let bindings = bindgen::Builder::default()
.header("cpp/include/duckdb_vx.h")
.override_abi(Abi::CUnwind, ".*")
Expand All @@ -302,6 +302,7 @@ fn c2rust(crate_dir: &Path, duckdb_include_dir: &Path) {
.rustified_non_exhaustive_enum("DUCKDB_TYPE")
.size_t_is_usize(true)
.clang_arg(format!("-I{}", duckdb_include_dir.display()))
.clang_arg(format!("-I{}", duckdb_fsst_include_dir.display()))
.clang_arg(format!("-I{}", crate_dir.join("cpp/include").display()))
.generate_comments(true)
// Tell cargo to invalidate the built crate whenever any of the
Expand All @@ -328,12 +329,13 @@ fn c2rust(crate_dir: &Path, duckdb_include_dir: &Path) {
}
}

fn cpp(duckdb_include_dir: &Path) {
fn cpp(duckdb_include_dir: &Path, duckdb_fsst_include_dir: &Path) {
cc::Build::new()
.std("c++20")
.flags(["-Wall", "-Wextra", "-Wpedantic"])
.cpp(true)
.include(duckdb_include_dir)
.include(duckdb_fsst_include_dir)
.include("cpp/include")
.files(SOURCE_FILES)
.compile("vortex-duckdb-extras");
Expand Down Expand Up @@ -449,7 +451,8 @@ fn main() {
};

let duckdb_include_dir = inner_dir.join("src").join("include");
c2rust(&crate_dir, &duckdb_include_dir);
cpp(&duckdb_include_dir);
let duckdb_fsst_include_dir = inner_dir.join("third_party").join("fsst");
c2rust(&crate_dir, &duckdb_include_dir, &duckdb_fsst_include_dir);
cpp(&duckdb_include_dir, &duckdb_fsst_include_dir);
rust2c(&crate_dir);
}
13 changes: 13 additions & 0 deletions vortex-duckdb/cpp/include/duckdb_vx/vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,19 @@ void duckdb_vx_vector_set_data_ptr(duckdb_vector ffi_vector, void *ptr);
// Converts a duckdb flat vector into a Sequence vector.
void duckdb_vx_sequence_vector(duckdb_vector c_vector, int64_t start, int64_t step, idx_t capacity);

// Finalize a vector as an FSST vector using externally managed compressed string data and a
// symbol table owned by Vortex.
//
// Parameters:
//   ffi_vector         - the DuckDB vector to convert; must not be null.
//   symbols            - `symbol_count` packed symbol values, one per FSST code.
//   symbol_lengths     - `symbol_count` symbol lengths, parallel to `symbols`.
//   symbol_count       - number of entries in both tables; must be at most 255.
//   string_block_limit - forwarded to DuckDB when the decoder is registered on the vector.
//   count              - number of strings in the vector.
//   buffer             - optional (may be null) external buffer holding the compressed bytes;
//                        when present it is attached to the vector as a heap reference.
//
// The symbol tables are copied during the call, so they only need to be valid until it returns.
void duckdb_vx_fsst_vector_set(duckdb_vector ffi_vector,
const uint64_t *symbols,
const uint8_t *symbol_lengths,
idx_t symbol_count,
idx_t string_block_limit,
idx_t count,
duckdb_vx_vector_buffer buffer);

// Returns whether the vector is currently an FSST vector.
// A null `ffi_vector` is reported as not FSST (returns false).
bool duckdb_vx_vector_is_fsst(duckdb_vector ffi_vector);

void duckdb_vector_flatten(duckdb_vector vector, unsigned long len);

const char *duckdb_vector_to_string(duckdb_vector vector, unsigned long len, duckdb_vx_error *err);
Expand Down
51 changes: 51 additions & 0 deletions vortex-duckdb/cpp/vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

DUCKDB_INCLUDES_BEGIN
#include "duckdb/common/vector.hpp"
#include "duckdb/common/fsst.hpp"
#include "duckdb/common/types/value.hpp"
#include "duckdb/common/types/vector_buffer.hpp"
#include "duckdb/common/types/vector.hpp"
DUCKDB_INCLUDES_END

Expand Down Expand Up @@ -60,6 +62,8 @@ class DataVector : public Vector {

} // namespace vortex

// Placeholder symbol value written into every decoder slot that the caller does not supply.
// NOTE(review): the value appears to be the ASCII bytes of "corrupt" in little-endian order,
// mirroring the FSST library's FSST_CORRUPT marker - confirm against fsst.h.
static constexpr uint64_t VX_FSST_CORRUPT = 32774747032022883ULL;

extern "C" void duckdb_vx_string_vector_add_vector_data_buffer(duckdb_vector ffi_vector,
duckdb_vx_vector_buffer buffer) {
auto vector = reinterpret_cast<Vector *>(ffi_vector);
Expand All @@ -81,6 +85,53 @@ extern "C" void duckdb_vx_vector_set_data_ptr(duckdb_vector ffi_vector, void *pt
dvector->SetDataPtr((data_ptr_t)ptr);
}

// Turns `ffi_vector` into an FSST vector.
//
// A fresh FSST decoder is built from the caller's symbol tables (`symbols` / `symbol_lengths`,
// both `symbol_count` entries long) and registered on the vector together with
// `string_block_limit`. When `buffer` is non-null it is attached to the vector's FSST auxiliary
// buffer as a heap reference. Finally the string count and the vector type are set.
//
// The symbol tables are copied into the decoder, so the caller's arrays are not retained.
extern "C" void duckdb_vx_fsst_vector_set(duckdb_vector ffi_vector,
const uint64_t *symbols,
const uint8_t *symbol_lengths,
idx_t symbol_count,
idx_t string_block_limit,
idx_t count,
duckdb_vx_vector_buffer buffer) {
auto vector = reinterpret_cast<Vector *>(ffi_vector);
D_ASSERT(vector);
// NOTE(review): D_ASSERT is presumably compiled out of release builds; if so, a
// `symbol_count` above 255 would run the copy loop below past the 255 slots initialised
// here. Confirm, and consider a hard check at this boundary.
D_ASSERT(symbol_count <= 255);

// The decoder lives in a ref-counted buffer so that it can be handed to RegisterDecoder below.
buffer_ptr<void> decoder_buffer = make_buffer<duckdb_fsst_decoder_t>();
auto *decoder = reinterpret_cast<duckdb_fsst_decoder_t *>(decoder_buffer.get());
decoder->version = 0;
decoder->zeroTerminated = 0;
// Pre-fill every slot with an 8-byte placeholder symbol so codes that the caller did not
// provide still have a defined length and value.
for (idx_t i = 0; i < 255; i++) {
decoder->len[i] = 8;
decoder->symbol[i] = VX_FSST_CORRUPT;
}
// Overwrite the leading slots with the caller-provided symbol table.
for (idx_t i = 0; i < symbol_count; i++) {
decoder->len[i] = symbol_lengths[i];
decoder->symbol[i] = symbols[i];
}

// DuckDB can reuse vector instances across chunk exports. Replace the FSST auxiliary buffer
// on each export so heap references to prior compressed byte buffers are dropped instead of
// accumulating for the lifetime of the reused vector.
vector->SetAuxiliary(make_buffer<VectorFSSTStringBuffer>());
FSSTVector::RegisterDecoder(*vector, decoder_buffer, string_block_limit);

// `buffer` is optional: only attach a heap reference when the caller passed one.
if (buffer) {
// The opaque handle is treated as a pointer to a shared_ptr owning the external buffer.
auto data = reinterpret_cast<shared_ptr<vortex::ExternalVectorBuffer> *>(buffer);
auto aux = vector->GetAuxiliary();
D_ASSERT(aux);
D_ASSERT(aux->GetBufferType() == VectorBufferType::FSST_BUFFER);
aux->Cast<VectorFSSTStringBuffer>().AddHeapReference(*data);
}

FSSTVector::SetCount(*vector, count);
vector->SetVectorType(VectorType::FSST_VECTOR);
}

// Reports whether `ffi_vector` currently has the FSST vector type.
// A null handle is treated as "not FSST" rather than dereferenced.
extern "C" bool duckdb_vx_vector_is_fsst(duckdb_vector ffi_vector) {
    auto *vec = reinterpret_cast<Vector *>(ffi_vector);
    if (!vec) {
        return false;
    }
    return vec->GetVectorType() == VectorType::FSST_VECTOR;
}

extern "C" duckdb_value duckdb_vx_vector_get_value(duckdb_vector ffi_vector, idx_t index) {
auto vector = reinterpret_cast<Vector *>(ffi_vector);
auto value = duckdb::make_uniq<Value>(vector->GetValue(index));
Expand Down
25 changes: 25 additions & 0 deletions vortex-duckdb/src/duckdb/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,31 @@ impl VectorRef {
unsafe { cpp::duckdb_vx_sequence_vector(self.as_ptr(), start, stop, capacity) }
}

/// Turns this vector into an FSST vector backed by externally owned compressed string data.
///
/// `symbols` and `symbol_lengths` are parallel tables: entry `i` of each describes FSST
/// code `i`. Both tables are copied by the C++ side during the call, so they do not need to
/// outlive it. `string_block_limit` is forwarded to DuckDB when the decoder is registered,
/// `count` is the number of strings in the vector, and `buffer` is attached to the vector as
/// a heap reference.
///
/// # Panics
///
/// Panics if more than 255 symbols are supplied, or if `symbol_lengths` has fewer entries
/// than `symbols`. The C++ side only checks the symbol count with a debug assertion and
/// indexes both tables with `symbols.len()`, so violating either precondition would otherwise
/// read or write out of bounds from a safe function.
pub fn set_fsst(
    &mut self,
    symbols: &[u64],
    symbol_lengths: &[u8],
    string_block_limit: usize,
    count: usize,
    buffer: &VectorBufferRef,
) {
    /// Number of symbol slots the C++ side initialises in the decoder.
    const MAX_FSST_SYMBOLS: usize = 255;

    assert!(
        symbols.len() <= MAX_FSST_SYMBOLS,
        "FSST symbol table has {} entries but at most {} are supported",
        symbols.len(),
        MAX_FSST_SYMBOLS,
    );
    assert!(
        symbol_lengths.len() >= symbols.len(),
        "FSST symbol_lengths has {} entries but {} symbols were provided",
        symbol_lengths.len(),
        symbols.len(),
    );

    // SAFETY: both table pointers are valid for `symbols.len()` reads (checked above), the
    // symbol count fits the decoder's tables, and the C++ side copies the tables before
    // returning, so no borrow escapes this call.
    unsafe {
        cpp::duckdb_vx_fsst_vector_set(
            self.as_ptr(),
            symbols.as_ptr(),
            symbol_lengths.as_ptr(),
            symbols.len() as idx_t,
            string_block_limit as idx_t,
            count as idx_t,
            buffer.as_ptr(),
        )
    }
}

/// Returns `true` when the underlying DuckDB vector currently has the FSST vector type.
pub fn is_fsst(&self) -> bool {
    unsafe {
        let raw = self.as_ptr();
        cpp::duckdb_vx_vector_is_fsst(raw)
    }
}

/// Converts a vector into a flat, uncompressed vector. Vortex calls this `canonicalize`.
pub fn flatten(&self, length: u64) {
unsafe { cpp::duckdb_vector_flatten(self.as_ptr(), length) }
Expand Down
26 changes: 26 additions & 0 deletions vortex-duckdb/src/e2e_test/vortex_scan_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ use vortex::array::arrays::VarBinArray;
use vortex::array::arrays::VarBinViewArray;
use vortex::array::validity::Validity;
use vortex::buffer::buffer;
use vortex::dtype::DType;
use vortex::dtype::Nullability;
use vortex::dtype::PType;
use vortex::encodings::fsst::fsst_compress;
use vortex::encodings::fsst::fsst_train_compressor;
use vortex::file::WriteOptionsSessionExt;
use vortex::io::runtime::BlockingRuntime;
use vortex::scalar::PValue;
Expand Down Expand Up @@ -202,6 +205,29 @@ fn test_vortex_scan_strings() {
assert_eq!(result, "Hello,Hi,Hey");
}

/// Scans a file whose only column is an FSST-compressed string array and checks that the
/// aggregated strings come back unchanged.
#[test]
fn test_vortex_scan_fsst_strings() {
    let file = RUNTIME.block_on(async {
        // Start from a plain nullable UTF-8 array.
        let plain = VarBinArray::from_iter(
            [Some("Hello"), Some("Hi"), Some("Hey")],
            DType::Utf8(Nullability::Nullable),
        );

        // Train a compressor on the data, then compress that same data with it.
        let trained = fsst_train_compressor(&plain);
        let row_count = plain.len();
        let utf8 = DType::Utf8(Nullability::Nullable);
        let encoded = fsst_compress(&plain, row_count, &utf8, &trained);

        write_single_column_vortex_file("strings", encoded).await
    });

    let aggregated: String =
        scan_vortex_file_single_row(file, "SELECT string_agg(strings, ',') FROM ?", 0);

    assert_eq!(aggregated, "Hello,Hi,Hey");
}

#[test]
fn test_vortex_scan_strings_contains() {
let file = RUNTIME.block_on(async {
Expand Down
38 changes: 38 additions & 0 deletions vortex-duckdb/src/exporter/fixed_size_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,13 @@ impl ColumnExporter for FixedSizeListExporter {
mod tests {
use vortex::array::IntoArray as _;
use vortex::array::VortexSessionExecute;
use vortex::array::arrays::VarBinArray;
use vortex::array::validity::Validity;
use vortex::buffer::buffer;
use vortex::dtype::DType;
use vortex::dtype::Nullability;
use vortex::encodings::fsst::fsst_compress;
use vortex::encodings::fsst::fsst_train_compressor;
use vortex::error::VortexExpect;

use super::*;
Expand Down Expand Up @@ -196,6 +201,39 @@ mod tests {
verify_array_elements(vector, &[1, 2, 3, 4, 5, 6], 2, 3);
}

/// Exports a fixed-size list whose elements are FSST-compressed strings and checks that the
/// exported child vector is not left as an FSST vector.
#[test]
fn test_export_fsst_children_fall_back_to_flat_strings() {
    // Four strings, FSST-compressed, grouped into two lists of two elements each.
    let plain = VarBinArray::from_iter(
        [Some("alpha"), Some("beta"), Some("gamma"), Some("delta")],
        DType::Utf8(Nullability::Nullable),
    );
    let trained = fsst_train_compressor(&plain);
    let element_count = plain.len();
    let utf8 = DType::Utf8(Nullability::Nullable);
    let encoded = fsst_compress(&plain, element_count, &utf8, &trained);
    let fsl = FixedSizeListArray::new(encoded.into_array(), 2, Validity::AllValid, 2);

    // Destination chunk with a single VARCHAR[2] column.
    let array_type = LogicalType::array_type(LogicalType::varchar(), 2)
        .vortex_expect("array type should be valid");
    let mut chunk = DataChunk::new([array_type]);
    let mut ctx = SESSION.create_execution_ctx();

    new_exporter(fsl, &ConversionCache::default(), &mut ctx)
        .unwrap()
        .export(0, 2, chunk.get_vector_mut(0), &mut ctx)
        .unwrap();
    chunk.set_len(2);

    let child = chunk.get_vector(0).array_vector_get_child();
    let child_is_fsst = child.is_fsst();
    assert!(
        !child_is_fsst,
        "nested exports should flatten FSST children"
    );
}

#[test]
fn test_export_fixed_size_list_with_nulls() {
// Create a FixedSizeListArray with 4 lists of size 3, with 2nd list null.
Expand Down
Loading
Loading