Skip to content

Commit ed6287f

Browse files
feat: support large_string/large_binary in lance format v2.1 (#3967)
This PR addresses #3350. LargeUtf8 and LargeBinary (64-bit offsets) are now supported in Lance format v2.1. --------- Co-authored-by: Weston Pace <weston.pace@gmail.com>
1 parent 931d1c4 commit ed6287f

11 files changed

Lines changed: 280 additions & 184 deletions

File tree

Cargo.lock

Lines changed: 9 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

deny.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ allow = [
105105
"ISC",
106106
"BSD-2-Clause",
107107
"BSD-3-Clause",
108+
"BSL-1.0",
108109
"0BSD",
109110
"OpenSSL",
110111
"Zlib",

python/Cargo.lock

Lines changed: 14 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/lance-encoding/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ rand.workspace = true
3737
snafu.workspace = true
3838
tokio.workspace = true
3939
tracing.workspace = true
40+
xxhash-rust = { version = "0.8.15", features = ["xxh3"] }
4041
zstd.workspace = true
4142
bytemuck = "1.14"
4243
arrayref = "0.3.7"

rust/lance-encoding/src/data.rs

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ use std::{
2020
};
2121

2222
use arrow::array::{ArrayData, ArrayDataBuilder, AsArray};
23-
use arrow_array::{new_empty_array, new_null_array, Array, ArrayRef, UInt64Array};
24-
use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer};
23+
use arrow_array::{new_empty_array, new_null_array, Array, ArrayRef, OffsetSizeTrait, UInt64Array};
24+
use arrow_buffer::{
25+
ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer,
26+
};
2527
use arrow_schema::DataType;
26-
use bytemuck::try_cast_slice;
2728
use lance_arrow::DataTypeExt;
2829
use snafu::location;
2930

@@ -252,43 +253,42 @@ impl FixedWidthDataBlock {
252253
}
253254

254255
#[derive(Debug)]
255-
pub struct VariableWidthDataBlockBuilder {
256-
offsets: Vec<u32>,
256+
pub struct VariableWidthDataBlockBuilder<T: OffsetSizeTrait> {
257+
offsets: Vec<T>,
257258
bytes: Vec<u8>,
258259
}
259260

260-
impl VariableWidthDataBlockBuilder {
261+
impl<T: OffsetSizeTrait> VariableWidthDataBlockBuilder<T> {
261262
fn new(estimated_size_bytes: u64) -> Self {
262263
Self {
263-
offsets: vec![0u32],
264+
offsets: vec![T::from_usize(0).unwrap()],
264265
bytes: Vec::with_capacity(estimated_size_bytes as usize),
265266
}
266267
}
267268
}
268269

269-
impl DataBlockBuilderImpl for VariableWidthDataBlockBuilder {
270+
impl<T: OffsetSizeTrait> DataBlockBuilderImpl for VariableWidthDataBlockBuilder<T> {
270271
fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
271272
let block = data_block.as_variable_width_ref().unwrap();
272-
assert!(block.bits_per_offset == 32);
273+
assert!(block.bits_per_offset == T::get_byte_width() as u8 * 8);
273274

274-
let offsets: &[u32] = try_cast_slice(&block.offsets)
275-
.expect("cast from a bits_per_offset=32 `VariableWidthDataBlock's offsets field field to &[32] should be fine.");
275+
let offsets: ScalarBuffer<T> = block.offsets.try_clone().unwrap().borrow_to_typed_slice();
276276

277277
let start_offset = offsets[selection.start as usize];
278278
let end_offset = offsets[selection.end as usize];
279279
let mut previous_len = self.bytes.len();
280280

281281
self.bytes
282-
.extend_from_slice(&block.data[start_offset as usize..end_offset as usize]);
282+
.extend_from_slice(&block.data[start_offset.as_usize()..end_offset.as_usize()]);
283283

284284
self.offsets.extend(
285285
offsets[selection.start as usize..selection.end as usize]
286286
.iter()
287287
.zip(&offsets[selection.start as usize + 1..=selection.end as usize])
288288
.map(|(&current, &next)| {
289289
let this_value_len = next - current;
290-
previous_len += this_value_len as usize;
291-
previous_len as u32
290+
previous_len += this_value_len.as_usize();
291+
T::from_usize(previous_len).unwrap()
292292
}),
293293
);
294294
}
@@ -298,7 +298,7 @@ impl DataBlockBuilderImpl for VariableWidthDataBlockBuilder {
298298
DataBlock::VariableWidth(VariableWidthBlock {
299299
data: LanceBuffer::Owned(self.bytes),
300300
offsets: LanceBuffer::reinterpret_vec(self.offsets),
301-
bits_per_offset: 32,
301+
bits_per_offset: T::get_byte_width() as u8 * 8,
302302
num_values,
303303
block_info: BlockInfo::new(),
304304
})
@@ -1085,7 +1085,13 @@ impl DataBlock {
10851085
}
10861086
Self::VariableWidth(inner) => {
10871087
if inner.bits_per_offset == 32 {
1088-
Box::new(VariableWidthDataBlockBuilder::new(estimated_size_bytes))
1088+
Box::new(VariableWidthDataBlockBuilder::<i32>::new(
1089+
estimated_size_bytes,
1090+
))
1091+
} else if inner.bits_per_offset == 64 {
1092+
Box::new(VariableWidthDataBlockBuilder::<i64>::new(
1093+
estimated_size_bytes,
1094+
))
10891095
} else {
10901096
todo!()
10911097
}

rust/lance-encoding/src/decoder.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -517,9 +517,9 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
517517
pb::array_encoding::ArrayEncoding::InlineBitpacking(description) => {
518518
Ok(Box::new(InlineBitpacking::from_description(description)))
519519
}
520-
pb::array_encoding::ArrayEncoding::Variable(_) => {
521-
Ok(Box::new(BinaryMiniBlockDecompressor::default()))
522-
}
520+
pb::array_encoding::ArrayEncoding::Variable(description) => Ok(Box::new(
521+
BinaryMiniBlockDecompressor::from_variable(description),
522+
)),
523523
pb::array_encoding::ArrayEncoding::Fsst(description) => {
524524
Ok(Box::new(FsstMiniBlockDecompressor::new(description)))
525525
}
@@ -786,6 +786,7 @@ impl CoreFieldDecoderStrategy {
786786
let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
787787
column_info.as_ref(),
788788
self.decompressor_strategy.as_ref(),
789+
32, // Use 32-bit offsets for standard primitive fields
789790
)?);
790791

791792
// advance to the next top level column
@@ -800,6 +801,7 @@ impl CoreFieldDecoderStrategy {
800801
let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
801802
column_info.as_ref(),
802803
self.decompressor_strategy.as_ref(),
804+
32, // Use 32-bit offsets for packed struct fields
803805
)?);
804806

805807
// advance to the next top level column
@@ -825,6 +827,17 @@ impl CoreFieldDecoderStrategy {
825827
let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
826828
column_info.as_ref(),
827829
self.decompressor_strategy.as_ref(),
830+
32, // Use 32-bit offsets for Binary/Utf8
831+
)?);
832+
column_infos.next_top_level();
833+
Ok(scheduler)
834+
}
835+
DataType::LargeBinary | DataType::LargeUtf8 => {
836+
let column_info = column_infos.expect_next()?;
837+
let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
838+
column_info.as_ref(),
839+
self.decompressor_strategy.as_ref(),
840+
64, // Use 64-bit offsets for LargeBinary/LargeUtf8
828841
)?);
829842
column_infos.next_top_level();
830843
Ok(scheduler)

rust/lance-encoding/src/encoder.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -851,8 +851,11 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
851851
} else {
852852
Ok(Box::new(BinaryMiniBlockEncoder::default()))
853853
}
854+
} else if variable_width_data.bits_per_offset == 64 {
855+
// TODO: Support FSSTMiniBlockEncoder
856+
Ok(Box::new(BinaryMiniBlockEncoder::default()))
854857
} else {
855-
todo!("Implement MiniBlockCompression for VariableWidth DataBlock with 64 bits offsets.")
858+
todo!("Implement MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width_data.bits_per_offset)
856859
}
857860
}
858861
DataBlock::Struct(struct_data_block) => {

rust/lance-encoding/src/encodings/logical/primitive.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1328,6 +1328,7 @@ impl MiniBlockScheduler {
13281328
.collect::<Vec<_>>();
13291329
let value_decompressor = decompressors
13301330
.create_miniblock_decompressor(layout.value_compression.as_ref().unwrap())?;
1331+
13311332
let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
13321333
let num_dictionary_items = layout.num_dictionary_items;
13331334
match dictionary_encoding.array_encoding.as_ref().unwrap() {
@@ -2780,6 +2781,7 @@ impl StructuralPrimitiveFieldScheduler {
27802781
pub fn try_new(
27812782
column_info: &ColumnInfo,
27822783
decompressors: &dyn DecompressorStrategy,
2784+
bits_per_offset: u8,
27832785
) -> Result<Self> {
27842786
let page_schedulers = column_info
27852787
.page_infos
@@ -2791,6 +2793,7 @@ impl StructuralPrimitiveFieldScheduler {
27912793
page_index,
27922794
column_info.index as usize,
27932795
decompressors,
2796+
bits_per_offset,
27942797
)
27952798
})
27962799
.collect::<Result<Vec<_>>>()?;
@@ -2805,6 +2808,7 @@ impl StructuralPrimitiveFieldScheduler {
28052808
page_index: usize,
28062809
_column_index: usize,
28072810
decompressors: &dyn DecompressorStrategy,
2811+
bits_per_offset: u8,
28082812
) -> Result<PageInfoAndScheduler> {
28092813
let scheduler: Box<dyn StructuralPageScheduler> =
28102814
match page_info.encoding.as_structural().layout.as_ref() {
@@ -2824,7 +2828,7 @@ impl StructuralPrimitiveFieldScheduler {
28242828
page_info.num_rows,
28252829
full_zip,
28262830
decompressors,
2827-
/*bits_per_offset=*/ 32,
2831+
bits_per_offset,
28282832
)?)
28292833
}
28302834
Some(pb::page_layout::Layout::AllNullLayout(all_null)) => {

0 commit comments

Comments
 (0)