diff --git a/encodings/fsst/src/array.rs b/encodings/fsst/src/array.rs index ee59d79a8a5..b0d0020a339 100644 --- a/encodings/fsst/src/array.rs +++ b/encodings/fsst/src/array.rs @@ -278,8 +278,10 @@ impl VTable for FSST { }; // Decompress the whole block of data into a new buffer, and create some views - // from it instead. - let (buffers, views) = fsst_decode_views(array, builder.completed_block_count(), ctx)?; + // from it instead. The new buffer lands after any pending in-progress + // buffer that push_buffer_and_adjusted_views will flush first. + let next_buffer_index = builder.completed_block_count() + u32::from(builder.in_progress()); + let (buffers, views) = fsst_decode_views(array, next_buffer_index, ctx)?; builder.push_buffer_and_adjusted_views(&buffers, &views, array.array().validity_mask()?); Ok(()) diff --git a/encodings/fsst/src/canonical.rs b/encodings/fsst/src/canonical.rs index b37a56f4613..14d06de20f4 100644 --- a/encodings/fsst/src/canonical.rs +++ b/encodings/fsst/src/canonical.rs @@ -192,4 +192,27 @@ mod tests { }; Ok(()) } + + #[test] + fn test_append_after_in_progress_buffer() -> VortexResult<()> { + let dtype = DType::Binary(Nullability::NonNullable); + let mut builder = VarBinViewBuilder::with_capacity(dtype.clone(), 2); + builder.append_value(b"long enough!!!"); + + let varbin = VarBinArray::from_iter( + [Some(b"long enough too".to_vec().into_boxed_slice())], + dtype, + ); + let fsst_array = fsst_compress( + &varbin, + varbin.len(), + varbin.dtype(), + &fsst_train_compressor(&varbin), + ) + .into_array(); + fsst_array.append_to_builder(&mut builder, &mut SESSION.create_execution_ctx())?; + + let _result = builder.finish_into_varbinview(); + Ok(()) + } } diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index 11a6116e6f9..90690e706a0 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -7868,6 +7868,8 @@ pub fn vortex_array::builders::VarBinViewBuilder::completed_block_count(&self) - pub fn vortex_array::builders::VarBinViewBuilder::finish_into_varbinview(&mut self) -> vortex_array::arrays::VarBinViewArray +pub fn vortex_array::builders::VarBinViewBuilder::in_progress(&self) -> bool + pub fn vortex_array::builders::VarBinViewBuilder::new(dtype: vortex_array::dtype::DType, capacity: usize, completed: vortex_array::builders::CompletedBuffers, growth_strategy: vortex_array::builders::BufferGrowthStrategy, compaction_threshold: f64) -> Self pub fn vortex_array::builders::VarBinViewBuilder::push_buffer_and_adjusted_views(&mut self, buffers: &[vortex_buffer::ByteBuffer], views: &vortex_buffer::buffer::Buffer, validity_mask: vortex_mask::Mask) diff --git a/vortex-array/src/builders/varbinview.rs b/vortex-array/src/builders/varbinview.rs index 572be55d1b3..294ac5cd551 100644 --- a/vortex-array/src/builders/varbinview.rs +++ b/vortex-array/src/builders/varbinview.rs @@ -165,6 +165,12 @@ impl VarBinViewBuilder { self.completed.len() } + /// Returns true if a non-empty in-progress buffer is staged (and would + /// become a completed buffer on the next flush), false otherwise. + pub fn in_progress(&self) -> bool { + !self.in_progress.is_empty() + } + /// Pushes buffers and pre-adjusted views into the builder. /// /// The provided `buffers` contain sections of data from a `VarBinViewArray`, and the