Skip to content

Commit 61913f6

Browse files
committed
fix: Zstd canonicalize with >2GB decompressed buffers
Fixes a panic in `reconstruct_views` when a ZstdArray's decompressed string data exceeds ~2 GiB. The function builds BinaryView structs that use u32 offsets into the decompressed buffer; when the buffer is larger than i32::MAX the offset cast panics.

This is the same class of bug fixed for VarBin/FSST in #5961, but Zstd was not addressed because its decompressed buffer interleaves u32 length prefixes with string data (unlike VarBin/FSST which have a separate lengths array).

The fix splits the decompressed buffer at value boundaries (zero-copy via `ByteBuffer::slice`) when approaching i32::MAX, using BinaryView's native `buffer_index` field to reference the correct segment. The i32::MAX limit matches the convention established in #5961 per the Arrow spec that BinaryView offsets are logically signed.

Signed-off-by: Sumedh Arani <sumedh@langchain.dev>
1 parent 91e4e3f commit 61913f6

2 files changed

Lines changed: 96 additions & 24 deletions

File tree

encodings/zstd/src/array.rs

Lines changed: 92 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use vortex_array::arrays::ConstantArray;
2525
use vortex_array::arrays::PrimitiveArray;
2626
use vortex_array::arrays::VarBinViewArray;
2727
use vortex_array::arrays::varbinview::build_views::BinaryView;
28+
use vortex_array::arrays::varbinview::build_views::MAX_BUFFER_LEN;
2829
use vortex_array::buffer::BufferHandle;
2930
use vortex_array::dtype::DType;
3031
use vortex_array::scalar::Scalar;
@@ -380,9 +381,20 @@ fn collect_valid_vbv(vbv: &VarBinViewArray) -> VortexResult<(ByteBuffer, Vec<usi
380381
/// Reconstruct BinaryView structs from length-prefixed byte data.
381382
///
382383
/// The buffer contains interleaved u32 lengths (little-endian) and string data.
383-
pub fn reconstruct_views(buffer: &ByteBuffer) -> Buffer<BinaryView> {
384-
let mut res = BufferMut::<BinaryView>::empty();
384+
/// When the cumulative data exceeds `max_buffer_len`, the buffer is split (zero-copy) into
385+
/// multiple segments so that BinaryView's u32 offsets can address all data.
386+
///
387+
/// Pass [`MAX_BUFFER_LEN`] for `max_buffer_len` in production; a smaller value can be used in
388+
/// tests to exercise the splitting path without allocating >2 GiB.
389+
pub fn reconstruct_views(
390+
buffer: &ByteBuffer,
391+
max_buffer_len: usize,
392+
) -> (Vec<ByteBuffer>, Buffer<BinaryView>) {
393+
let mut views = BufferMut::<BinaryView>::empty();
394+
let mut buffers = Vec::new();
395+
let mut segment_start: usize = 0;
385396
let mut offset = 0;
397+
386398
while offset < buffer.len() {
387399
let str_len = ViewLen::from_le_bytes(
388400
buffer
@@ -392,16 +404,28 @@ pub fn reconstruct_views(buffer: &ByteBuffer) -> Buffer<BinaryView> {
392404
.ok()
393405
.vortex_expect("must fit ViewLen size"),
394406
) as usize;
395-
offset += size_of::<ViewLen>();
396-
let value = &buffer[offset..offset + str_len];
397-
res.push(BinaryView::make_view(
398-
value,
399-
0,
400-
u32::try_from(offset).vortex_expect("offset must fit in u32"),
401-
));
402-
offset += str_len;
403-
}
404-
res.freeze()
407+
408+
let value_data_offset = offset + size_of::<ViewLen>();
409+
let local_offset = value_data_offset - segment_start;
410+
411+
if local_offset + str_len > max_buffer_len && offset > segment_start {
412+
buffers.push(buffer.slice(segment_start..offset));
413+
segment_start = offset;
414+
}
415+
416+
let local_offset = u32::try_from(value_data_offset - segment_start)
417+
.vortex_expect("local offset within segment must fit in u32");
418+
let buf_index = u32::try_from(buffers.len()).vortex_expect("buffer index must fit in u32");
419+
let value = &buffer[value_data_offset..value_data_offset + str_len];
420+
views.push(BinaryView::make_view(value, buf_index, local_offset));
421+
offset = value_data_offset + str_len;
422+
}
423+
424+
if segment_start < buffer.len() {
425+
buffers.push(buffer.slice(segment_start..buffer.len()));
426+
}
427+
428+
(buffers, views.freeze())
405429
}
406430

407431
impl ZstdArray {
@@ -821,10 +845,8 @@ impl ZstdArray {
821845
DType::Binary(_) | DType::Utf8(_) => {
822846
match slice_validity.execute_mask(slice_n_rows, ctx)?.indices() {
823847
AllOr::All => {
824-
// the decompressed buffer is a bunch of interleaved u32 lengths
825-
// and strings of those lengths, we need to reconstruct the
826-
// views into those strings by passing through the buffer.
827-
let valid_views = reconstruct_views(&decompressed).slice(
848+
let (buffers, all_views) = reconstruct_views(&decompressed, MAX_BUFFER_LEN);
849+
let valid_views = all_views.slice(
828850
slice_value_idx_start - n_skipped_values
829851
..slice_value_idx_stop - n_skipped_values,
830852
);
@@ -833,7 +855,7 @@ impl ZstdArray {
833855
Ok(unsafe {
834856
VarBinViewArray::new_unchecked(
835857
valid_views,
836-
Arc::from([decompressed]),
858+
Arc::from(buffers),
837859
self.dtype.clone(),
838860
slice_validity,
839861
)
@@ -846,10 +868,8 @@ impl ZstdArray {
846868
)
847869
.into_array()),
848870
AllOr::Some(valid_indices) => {
849-
// the decompressed buffer is a bunch of interleaved u32 lengths
850-
// and strings of those lengths, we need to reconstruct the
851-
// views into those strings by passing through the buffer.
852-
let valid_views = reconstruct_views(&decompressed).slice(
871+
let (buffers, all_views) = reconstruct_views(&decompressed, MAX_BUFFER_LEN);
872+
let valid_views = all_views.slice(
853873
slice_value_idx_start - n_skipped_values
854874
..slice_value_idx_stop - n_skipped_values,
855875
);
@@ -863,7 +883,7 @@ impl ZstdArray {
863883
Ok(unsafe {
864884
VarBinViewArray::new_unchecked(
865885
views.freeze(),
866-
Arc::from([decompressed]),
886+
Arc::from(buffers),
867887
self.dtype.clone(),
868888
slice_validity,
869889
)
@@ -946,3 +966,53 @@ impl OperationsVTable<Zstd> for Zstd {
946966
.scalar_at(0)
947967
}
948968
}
969+
970+
#[cfg(test)]
// The `as u32` cast in `make_interleaved` only ever sees short test literals, so it cannot
// truncate.
#[allow(clippy::cast_possible_truncation)]
mod tests {
    use vortex_buffer::ByteBuffer;

    use super::reconstruct_views;
    use crate::array::BinaryView;

    /// Build a Zstd-style interleaved buffer: `[u32-LE length][value bytes]` repeated once per
    /// entry of `strings`, in order.
    fn make_interleaved(strings: &[&[u8]]) -> ByteBuffer {
        let mut buf = Vec::new();
        for s in strings {
            let len = s.len() as u32;
            buf.extend_from_slice(&len.to_le_bytes());
            buf.extend_from_slice(s);
        }
        ByteBuffer::copy_from(buf.as_slice())
    }

    /// Empty input yields no views and no segments (the trailing segment is only pushed when
    /// there are bytes left after `segment_start`).
    #[test]
    fn test_reconstruct_views_empty() {
        let buf = make_interleaved(&[]);
        let (buffers, views) = reconstruct_views(&buf, 1024);

        assert!(buffers.is_empty());
        assert_eq!(views.len(), 0);
    }

    /// Everything fits under the limit: a single segment covering the whole input.
    #[test]
    fn test_reconstruct_views_no_split() {
        let strings: &[&[u8]] = &[b"hello", b"world"];
        let buf = make_interleaved(strings);
        let (buffers, views) = reconstruct_views(&buf, 1024);

        assert_eq!(buffers.len(), 1);
        // The single segment is the entire input: 2 * (4-byte prefix + 5 bytes of data).
        assert_eq!(buffers[0].len(), 18);
        assert_eq!(views.len(), 2);
        // Each entry: [u32 len (4 bytes)][data], so offsets are 4 and 4+5+4=13
        assert_eq!(views[0], BinaryView::make_view(b"hello", 0, 4));
        assert_eq!(views[1], BinaryView::make_view(b"world", 0, 13));
    }

    /// The second value would end past the limit, so it starts a new segment.
    #[test]
    fn test_reconstruct_views_split_across_segments() {
        // "aaaaaaaaaaaaa" (13 bytes) and "bbbbbbbbbbbbb" (13 bytes).
        // Each entry occupies 4 (length prefix) + 13 (data) = 17 bytes.
        // With max_buffer_len=20, the second value would span bytes 21..34 of an unsplit
        // buffer, which is past the limit, so it rolls into a second segment.
        let strings: &[&[u8]] = &[b"aaaaaaaaaaaaa", b"bbbbbbbbbbbbb"];
        let buf = make_interleaved(strings);
        let (buffers, views) = reconstruct_views(&buf, 20);

        assert_eq!(buffers.len(), 2);
        assert_eq!(views.len(), 2);
        assert_eq!(views[0], BinaryView::make_view(b"aaaaaaaaaaaaa", 0, 4));
        // Second entry starts a new segment at byte 17 (the length prefix), so local offset = 4.
        assert_eq!(views[1], BinaryView::make_view(b"bbbbbbbbbbbbb", 1, 4));

        // The split happens on an entry boundary: each segment holds exactly one
        // prefix + value pair, and the view offsets point at the value bytes within it.
        assert_eq!(buffers[0].len(), 17);
        assert_eq!(buffers[1].len(), 17);
        assert_eq!(&buffers[0][4..17], &b"aaaaaaaaaaaaa"[..]);
        assert_eq!(&buffers[1][4..17], &b"bbbbbbbbbbbbb"[..]);
    }

    /// A value that alone exceeds the limit cannot be split: when it is the first entry of a
    /// segment it stays there, and no empty leading segment is emitted.
    #[test]
    fn test_reconstruct_views_oversized_first_value() {
        let strings: &[&[u8]] = &[b"aaaaaaaaaaaaa"];
        let buf = make_interleaved(strings);
        let (buffers, views) = reconstruct_views(&buf, 8);

        assert_eq!(buffers.len(), 1);
        assert_eq!(buffers[0].len(), 17);
        assert_eq!(views.len(), 1);
        assert_eq!(views[0], BinaryView::make_view(b"aaaaaaaaaaaaa", 0, 4));
    }
}

vortex-cuda/src/kernel/encodings/zstd.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use vortex::array::ArrayRef;
1717
use vortex::array::Canonical;
1818
use vortex::array::arrays::VarBinViewArray;
1919
use vortex::array::arrays::varbinview::BinaryView;
20+
use vortex::array::arrays::varbinview::build_views::MAX_BUFFER_LEN;
2021
use vortex::array::buffer::BufferHandle;
2122
use vortex::array::buffer::DeviceBuffer;
2223
use vortex::buffer::Alignment;
@@ -325,13 +326,14 @@ async fn decode_zstd(array: ZstdArray, ctx: &mut CudaExecutionCtx) -> VortexResu
325326
.indices()
326327
{
327328
AllOr::All => {
328-
let all_views = vortex::encodings::zstd::reconstruct_views(&host_buffer);
329+
let (buffers, all_views) =
330+
vortex::encodings::zstd::reconstruct_views(&host_buffer, MAX_BUFFER_LEN);
329331
let sliced_views = all_views.slice(slice_value_idx_start..slice_value_idx_stop);
330332

331333
Ok(Canonical::VarBinView(unsafe {
332334
VarBinViewArray::new_unchecked(
333335
sliced_views,
334-
Arc::from([host_buffer]),
336+
Arc::from(buffers),
335337
dtype,
336338
sliced_validity,
337339
)

0 commit comments

Comments
 (0)