Skip to content

Commit b8f8bd5

Browse files
authored
fix[encodings]: take in progress buffer into account on fsst builder append (#7318)
Signed-off-by: Alfonso Subiotto Marques <alfonso.subiotto@polarsignals.com>
1 parent 29a5f43 commit b8f8bd5

File tree

4 files changed

+35
-2
lines changed

4 files changed

+35
-2
lines changed

encodings/fsst/src/array.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,10 @@ impl VTable for FSST {
278278
};
279279

280280
// Decompress the whole block of data into a new buffer, and create some views
281-
// from it instead.
282-
let (buffers, views) = fsst_decode_views(array, builder.completed_block_count(), ctx)?;
281+
// from it instead. The new buffer lands after any pending in-progress
282+
// buffer that push_buffer_and_adjusted_views will flush first.
283+
let next_buffer_index = builder.completed_block_count() + u32::from(builder.in_progress());
284+
let (buffers, views) = fsst_decode_views(array, next_buffer_index, ctx)?;
283285

284286
builder.push_buffer_and_adjusted_views(&buffers, &views, array.array().validity_mask()?);
285287
Ok(())

encodings/fsst/src/canonical.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,4 +192,27 @@ mod tests {
192192
};
193193
Ok(())
194194
}
195+
196+
#[test]
197+
fn test_append_after_in_progress_buffer() -> VortexResult<()> {
198+
let dtype = DType::Binary(Nullability::NonNullable);
199+
let mut builder = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
200+
builder.append_value(b"long enough!!!");
201+
202+
let varbin = VarBinArray::from_iter(
203+
[Some(b"long enough too".to_vec().into_boxed_slice())],
204+
dtype,
205+
);
206+
let fsst_array = fsst_compress(
207+
&varbin,
208+
varbin.len(),
209+
varbin.dtype(),
210+
&fsst_train_compressor(&varbin),
211+
)
212+
.into_array();
213+
fsst_array.append_to_builder(&mut builder, &mut SESSION.create_execution_ctx())?;
214+
215+
let _result = builder.finish_into_varbinview();
216+
Ok(())
217+
}
195218
}

vortex-array/public-api.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7868,6 +7868,8 @@ pub fn vortex_array::builders::VarBinViewBuilder::completed_block_count(&self) -
78687868

78697869
pub fn vortex_array::builders::VarBinViewBuilder::finish_into_varbinview(&mut self) -> vortex_array::arrays::VarBinViewArray
78707870

7871+
pub fn vortex_array::builders::VarBinViewBuilder::in_progress(&self) -> bool
7872+
78717873
pub fn vortex_array::builders::VarBinViewBuilder::new(dtype: vortex_array::dtype::DType, capacity: usize, completed: vortex_array::builders::CompletedBuffers, growth_strategy: vortex_array::builders::BufferGrowthStrategy, compaction_threshold: f64) -> Self
78727874

78737875
pub fn vortex_array::builders::VarBinViewBuilder::push_buffer_and_adjusted_views(&mut self, buffers: &[vortex_buffer::ByteBuffer], views: &vortex_buffer::buffer::Buffer<vortex_array::arrays::varbinview::BinaryView>, validity_mask: vortex_mask::Mask)

vortex-array/src/builders/varbinview.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,12 @@ impl VarBinViewBuilder {
165165
self.completed.len()
166166
}
167167

168+
/// Returns true if a non-empty in-progress buffer is staged (and would
169+
/// become a completed buffer on the next flush), false otherwise.
170+
pub fn in_progress(&self) -> bool {
171+
!self.in_progress.is_empty()
172+
}
173+
168174
/// Pushes buffers and pre-adjusted views into the builder.
169175
///
170176
/// The provided `buffers` contain sections of data from a `VarBinViewArray`, and the

0 commit comments

Comments
 (0)