@@ -25,6 +25,7 @@ use vortex_array::arrays::ConstantArray;
2525use vortex_array:: arrays:: PrimitiveArray ;
2626use vortex_array:: arrays:: VarBinViewArray ;
2727use vortex_array:: arrays:: varbinview:: build_views:: BinaryView ;
28+ use vortex_array:: arrays:: varbinview:: build_views:: MAX_BUFFER_LEN ;
2829use vortex_array:: buffer:: BufferHandle ;
2930use vortex_array:: dtype:: DType ;
3031use vortex_array:: scalar:: Scalar ;
@@ -380,9 +381,23 @@ fn collect_valid_vbv(vbv: &VarBinViewArray) -> VortexResult<(ByteBuffer, Vec<usi
380381/// Reconstruct BinaryView structs from length-prefixed byte data.
381382///
382383/// The buffer contains interleaved u32 lengths (little-endian) and string data.
383- pub fn reconstruct_views ( buffer : & ByteBuffer ) -> Buffer < BinaryView > {
384- let mut res = BufferMut :: < BinaryView > :: empty ( ) ;
384+ /// When the cumulative data exceeds `max_buffer_len`, the buffer is split (zero-copy) into
385+ /// multiple segments so that BinaryView's u32 offsets can address all data. This mirrors the
386+ /// approach in [`vortex_array::arrays::varbinview::build_views::build_views`] for VarBin/FSST
387+ /// (see <https://github.com/vortex-data/vortex/pull/5961>), adapted for Zstd's interleaved
388+ /// length-prefix format.
389+ ///
390+ /// Pass [`MAX_BUFFER_LEN`] for `max_buffer_len` in production; a smaller value can be used in
391+ /// tests to exercise the splitting path without allocating >2 GiB.
392+ pub fn reconstruct_views (
393+ buffer : & ByteBuffer ,
394+ max_buffer_len : usize ,
395+ ) -> ( Vec < ByteBuffer > , Buffer < BinaryView > ) {
396+ let mut views = BufferMut :: < BinaryView > :: empty ( ) ;
397+ let mut buffers = Vec :: new ( ) ;
398+ let mut segment_start: usize = 0 ;
385399 let mut offset = 0 ;
400+
386401 while offset < buffer. len ( ) {
387402 let str_len = ViewLen :: from_le_bytes (
388403 buffer
@@ -392,16 +407,28 @@ pub fn reconstruct_views(buffer: &ByteBuffer) -> Buffer<BinaryView> {
392407 . ok ( )
393408 . vortex_expect ( "must fit ViewLen size" ) ,
394409 ) as usize ;
395- offset += size_of :: < ViewLen > ( ) ;
396- let value = & buffer[ offset..offset + str_len] ;
397- res. push ( BinaryView :: make_view (
398- value,
399- 0 ,
400- u32:: try_from ( offset) . vortex_expect ( "offset must fit in u32" ) ,
401- ) ) ;
402- offset += str_len;
403- }
404- res. freeze ( )
410+
411+ let value_data_offset = offset + size_of :: < ViewLen > ( ) ;
412+ let local_offset = value_data_offset - segment_start;
413+
414+ if local_offset + str_len > max_buffer_len && offset > segment_start {
415+ buffers. push ( buffer. slice ( segment_start..offset) ) ;
416+ segment_start = offset;
417+ }
418+
419+ let local_offset = u32:: try_from ( value_data_offset - segment_start)
420+ . vortex_expect ( "local offset within segment must fit in u32" ) ;
421+ let buf_index = u32:: try_from ( buffers. len ( ) ) . vortex_expect ( "buffer index must fit in u32" ) ;
422+ let value = & buffer[ value_data_offset..value_data_offset + str_len] ;
423+ views. push ( BinaryView :: make_view ( value, buf_index, local_offset) ) ;
424+ offset = value_data_offset + str_len;
425+ }
426+
427+ if segment_start < buffer. len ( ) {
428+ buffers. push ( buffer. slice ( segment_start..buffer. len ( ) ) ) ;
429+ }
430+
431+ ( buffers, views. freeze ( ) )
405432}
406433
407434impl ZstdArray {
@@ -821,10 +848,8 @@ impl ZstdArray {
821848 DType :: Binary ( _) | DType :: Utf8 ( _) => {
822849 match slice_validity. execute_mask ( slice_n_rows, ctx) ?. indices ( ) {
823850 AllOr :: All => {
824- // the decompressed buffer is a bunch of interleaved u32 lengths
825- // and strings of those lengths, we need to reconstruct the
826- // views into those strings by passing through the buffer.
827- let valid_views = reconstruct_views ( & decompressed) . slice (
851+ let ( buffers, all_views) = reconstruct_views ( & decompressed, MAX_BUFFER_LEN ) ;
852+ let valid_views = all_views. slice (
828853 slice_value_idx_start - n_skipped_values
829854 ..slice_value_idx_stop - n_skipped_values,
830855 ) ;
@@ -833,7 +858,7 @@ impl ZstdArray {
833858 Ok ( unsafe {
834859 VarBinViewArray :: new_unchecked (
835860 valid_views,
836- Arc :: from ( [ decompressed ] ) ,
861+ Arc :: from ( buffers ) ,
837862 self . dtype . clone ( ) ,
838863 slice_validity,
839864 )
@@ -846,10 +871,8 @@ impl ZstdArray {
846871 )
847872 . into_array ( ) ) ,
848873 AllOr :: Some ( valid_indices) => {
849- // the decompressed buffer is a bunch of interleaved u32 lengths
850- // and strings of those lengths, we need to reconstruct the
851- // views into those strings by passing through the buffer.
852- let valid_views = reconstruct_views ( & decompressed) . slice (
874+ let ( buffers, all_views) = reconstruct_views ( & decompressed, MAX_BUFFER_LEN ) ;
875+ let valid_views = all_views. slice (
853876 slice_value_idx_start - n_skipped_values
854877 ..slice_value_idx_stop - n_skipped_values,
855878 ) ;
@@ -863,7 +886,7 @@ impl ZstdArray {
863886 Ok ( unsafe {
864887 VarBinViewArray :: new_unchecked (
865888 views. freeze ( ) ,
866- Arc :: from ( [ decompressed ] ) ,
889+ Arc :: from ( buffers ) ,
867890 self . dtype . clone ( ) ,
868891 slice_validity,
869892 )
@@ -946,3 +969,53 @@ impl OperationsVTable<Zstd> for Zstd {
946969 . scalar_at ( 0 )
947970 }
948971}
972+
973+ #[ cfg( test) ]
974+ #[ allow( clippy:: cast_possible_truncation) ]
975+ mod tests {
976+ use vortex_buffer:: ByteBuffer ;
977+
978+ use super :: reconstruct_views;
979+ use crate :: array:: BinaryView ;
980+
981+ /// Build a Zstd-style interleaved buffer: [u32-LE length][string bytes] repeated.
982+ fn make_interleaved ( strings : & [ & [ u8 ] ] ) -> ByteBuffer {
983+ let mut buf = Vec :: new ( ) ;
984+ for s in strings {
985+ let len = s. len ( ) as u32 ;
986+ buf. extend_from_slice ( & len. to_le_bytes ( ) ) ;
987+ buf. extend_from_slice ( s) ;
988+ }
989+ ByteBuffer :: copy_from ( buf. as_slice ( ) )
990+ }
991+
992+ #[ test]
993+ fn test_reconstruct_views_no_split ( ) {
994+ let strings: & [ & [ u8 ] ] = & [ b"hello" , b"world" ] ;
995+ let buf = make_interleaved ( strings) ;
996+ let ( buffers, views) = reconstruct_views ( & buf, 1024 ) ;
997+
998+ assert_eq ! ( buffers. len( ) , 1 ) ;
999+ assert_eq ! ( views. len( ) , 2 ) ;
1000+ // Each entry: [u32 len (4 bytes)][data], so offsets are 4 and 4+5+4=13
1001+ assert_eq ! ( views[ 0 ] , BinaryView :: make_view( b"hello" , 0 , 4 ) ) ;
1002+ assert_eq ! ( views[ 1 ] , BinaryView :: make_view( b"world" , 0 , 13 ) ) ;
1003+ }
1004+
1005+ #[ test]
1006+ fn test_reconstruct_views_split_across_segments ( ) {
1007+ // "aaaaaaaaaaaaa" (13 bytes) and "bbbbbbbbbbbbb" (13 bytes).
1008+ // Each entry occupies 4 (length prefix) + 13 (data) = 17 bytes.
1009+ // With max_buffer_len=20, the second entry's data (offset 4+13+4=21) exceeds the limit,
1010+ // so it rolls into a second segment.
1011+ let strings: & [ & [ u8 ] ] = & [ b"aaaaaaaaaaaaa" , b"bbbbbbbbbbbbb" ] ;
1012+ let buf = make_interleaved ( strings) ;
1013+ let ( buffers, views) = reconstruct_views ( & buf, 20 ) ;
1014+
1015+ assert_eq ! ( buffers. len( ) , 2 ) ;
1016+ assert_eq ! ( views. len( ) , 2 ) ;
1017+ assert_eq ! ( views[ 0 ] , BinaryView :: make_view( b"aaaaaaaaaaaaa" , 0 , 4 ) ) ;
1018+ // Second entry starts a new segment at byte 17 (the length prefix), so local offset = 4.
1019+ assert_eq ! ( views[ 1 ] , BinaryView :: make_view( b"bbbbbbbbbbbbb" , 1 , 4 ) ) ;
1020+ }
1021+ }
0 commit comments