Skip to content

Commit 2ee2033

Browse files
authored
Widen VarBinBuilder offets during FSST compress (#7853)
## Summary Fixes: #7833 I don't think that a more complicated solution here makes a lot of sense. We narrow these offsets is the compressor anyways, so there is no reason to do something more complicated than this. Additionally adds the logic for comparing with arrow large binary arrays that was missing before. To be honest, I feel like it doesn't really make any sense to allow the user to specify what the offsets and sizes types should be at all, I think we can have the builder _always_ use the 64-bit types because we know that they will be compressed by the compressor anyways. ## Testing Adds some regression tests. I've ignored the big test because it allocates too much memory. You can run the ignored test with: ``` cargo t --release -p vortex-fsst varbin_compress_offsets_overflow_i32 -- --ignored --no-capture ``` Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent 92962a4 commit 2ee2033

5 files changed

Lines changed: 143 additions & 6 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

encodings/fsst/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ _test-harness = ["dep:rand", "vortex-array/_test-harness"]
3333
divan = { workspace = true }
3434
rand = { workspace = true }
3535
rstest = { workspace = true }
36+
test-with = { workspace = true }
3637
vortex-array = { workspace = true, features = ["_test-harness"] }
3738

3839
[[bench]]

encodings/fsst/src/compress.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,12 @@ where
6969
I: Iterator<Item = Option<&'a [u8]>>,
7070
{
7171
let mut buffer = Vec::with_capacity(DEFAULT_BUFFER_LEN);
72-
let mut builder = VarBinBuilder::<i32>::with_capacity(len);
72+
73+
// Offsets are widened to i64 because the cumulative compressed bytes can exceed i32::MAX for
74+
// large inputs (see issue #7833). Per-string sizes still fit in i32.
75+
let mut builder = VarBinBuilder::<i64>::with_capacity(len);
7376
let mut uncompressed_lengths: BufferMut<i32> = BufferMut::with_capacity(len);
77+
7478
for string in iter {
7579
match string {
7680
None => {

encodings/fsst/src/tests.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use rand::SeedableRng;
5+
use rand::rngs::StdRng;
6+
use rand::seq::IndexedRandom;
47
use vortex_array::ArrayRef;
58
use vortex_array::IntoArray;
69
use vortex_array::LEGACY_SESSION;
@@ -107,3 +110,61 @@ fn test_fsst_array_ops() {
107110

108111
assert_arrays_eq!(fsst_array, canonical_array);
109112
}
113+
114+
// TODO(someone): ideally CI would run this in release mode as well since debug builds make the
115+
// allocation and compression loop substantially slower.
116+
/// Regression for #7833: [`fsst_compress`] must accept inputs whose cumulative compressed
117+
/// bytes exceed [`i32::MAX`]. Before the fix, [`fsst_compress_iter`] hardcoded
118+
/// [`VarBinBuilder<i32>`] for the FSST output and panicked in
119+
/// [`VarBinBuilder::append_value`] once cumulative compressed bytes crossed the boundary.
120+
///
121+
/// The input is built with [`VarBinBuilder<i64>`] so the input itself does not panic, which
122+
/// confirms the overflow is on the FSST output side. After the fix the test must succeed
123+
/// with the row count preserved.
124+
///
125+
/// Allocates ~2.5 GiB for the input and ~2.5 GiB for the FSST output (~5 GiB total), so it
126+
/// is gated to CI runs and skipped when `VORTEX_SKIP_SLOW_TESTS` is set. To run it locally:
127+
///
128+
/// ```text
129+
/// CI=1 cargo test --release -p vortex-fsst fsst_compress_offsets
130+
/// ```
131+
///
132+
/// [`fsst_compress_iter`]: crate::compress::fsst_compress_iter
133+
#[test_with::env(CI)]
134+
#[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
135+
fn fsst_compress_offsets_overflow_i32() {
136+
// High-entropy ASCII strings sliced from a random pool. FSST is a symbol-table
137+
// compressor; pseudo-random data with no recurring byte sequences resists compression,
138+
// so the compressed output stays close to input size and crosses the i32 boundary.
139+
const STRING_LEN: usize = 64 * 1024;
140+
const TOTAL_BYTES: usize = (1usize << 31) + (512 << 20); // ~2.5 GiB
141+
const N: usize = TOTAL_BYTES / STRING_LEN;
142+
const POOL_LEN: usize = 64 * 1024 * 1024;
143+
144+
// Printable ASCII alphabet so the result is valid UTF-8.
145+
const ALPHABET: &[u8; 95] =
146+
b" !\"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~";
147+
148+
let mut rng = StdRng::seed_from_u64(0xC0DE_C011_B711);
149+
let pool: Vec<u8> = (0..POOL_LEN)
150+
.map(|_| *ALPHABET.choose(&mut rng).unwrap())
151+
.collect();
152+
153+
println!("building large VarBinArray");
154+
let mut builder = VarBinBuilder::<i64>::with_capacity(N);
155+
for i in 0..N {
156+
let off = i.wrapping_mul(31337) % (POOL_LEN - STRING_LEN);
157+
builder.append_value(&pool[off..off + STRING_LEN]);
158+
}
159+
let array = builder.finish(DType::Utf8(Nullability::NonNullable));
160+
161+
println!("training FSST compressor");
162+
let compressor = fsst_train_compressor(&array);
163+
let len = array.len();
164+
let dtype = array.dtype().clone();
165+
let mut ctx = LEGACY_SESSION.create_execution_ctx();
166+
167+
println!("compressing to FSST");
168+
let compressed = fsst_compress(array, len, &dtype, &compressor, &mut ctx);
169+
assert_eq!(compressed.len(), len);
170+
}

vortex-array/src/arrays/varbin/compute/compare.rs

Lines changed: 75 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use arrow_array::BinaryArray;
5+
use arrow_array::LargeBinaryArray;
6+
use arrow_array::LargeStringArray;
57
use arrow_array::StringArray;
68
use arrow_ord::cmp;
9+
use arrow_schema::DataType;
710
use vortex_buffer::BitBuffer;
811
use vortex_error::VortexExpect as _;
912
use vortex_error::VortexResult;
@@ -82,15 +85,26 @@ impl CompareKernel for VarBin {
8285

8386
let lhs = Datum::try_new(lhs.array(), ctx)?;
8487

85-
// Use StringViewArray/BinaryViewArray to match the Utf8View/BinaryView types
86-
// produced by Datum::try_new (which uses execute_arrow(None, ctx))
87-
let arrow_rhs: &dyn arrow_array::Datum = match rhs_const.dtype() {
88-
DType::Utf8(_) => &rhs_const
88+
// The RHS scalar must match the LHS Arrow data type. VarBin with i64 offsets is
89+
// converted to LargeBinary/LargeUtf8 (see `preferred_arrow_type`), and Arrow refuses to
90+
// compare LargeBinary with Binary (or LargeUtf8 with Utf8).
91+
let arrow_rhs: &dyn arrow_array::Datum = match (rhs_const.dtype(), lhs.data_type()) {
92+
(DType::Utf8(_), DataType::LargeUtf8) => &rhs_const
93+
.as_utf8()
94+
.value()
95+
.map(LargeStringArray::new_scalar)
96+
.unwrap_or_else(|| arrow_array::Scalar::new(LargeStringArray::new_null(1))),
97+
(DType::Utf8(_), _) => &rhs_const
8998
.as_utf8()
9099
.value()
91100
.map(StringArray::new_scalar)
92101
.unwrap_or_else(|| arrow_array::Scalar::new(StringArray::new_null(1))),
93-
DType::Binary(_) => &rhs_const
102+
(DType::Binary(_), DataType::LargeBinary) => &rhs_const
103+
.as_binary()
104+
.value()
105+
.map(LargeBinaryArray::new_scalar)
106+
.unwrap_or_else(|| arrow_array::Scalar::new(LargeBinaryArray::new_null(1))),
107+
(DType::Binary(_), _) => &rhs_const
94108
.as_binary()
95109
.value()
96110
.map(BinaryArray::new_scalar)
@@ -237,9 +251,14 @@ mod test {
237251

238252
#[cfg(test)]
239253
mod tests {
254+
use vortex_buffer::ByteBuffer;
255+
240256
use crate::IntoArray;
257+
use crate::arrays::BoolArray;
241258
use crate::arrays::ConstantArray;
242259
use crate::arrays::VarBinArray;
260+
use crate::arrays::varbin::builder::VarBinBuilder;
261+
use crate::assert_arrays_eq;
243262
use crate::builtins::ArrayBuiltins;
244263
use crate::dtype::DType;
245264
use crate::dtype::Nullability;
@@ -260,4 +279,55 @@ mod tests {
260279
&DType::Bool(Nullability::Nullable)
261280
);
262281
}
282+
283+
/// Regression: a [`VarBinArray`] built with `i64` offsets is canonicalised to
284+
/// Arrow `LargeUtf8` / `LargeBinary` by `preferred_arrow_type`. Without an explicit
285+
/// branch in [`CompareKernel`], the constant RHS is wrapped in a `StringArray` /
286+
/// `BinaryArray` and Arrow rejects the `LargeUtf8 == Utf8` mismatch. Triggering
287+
/// this only requires `i64` offsets, not large data.
288+
///
289+
/// [`CompareKernel`]: super::CompareKernel
290+
#[test]
291+
fn varbin_i64_offsets_compare_constant() {
292+
let mut builder = VarBinBuilder::<i64>::with_capacity(3);
293+
builder.append_value(b"abc");
294+
builder.append_value(b"xyz");
295+
builder.append_value(b"abc");
296+
let array = builder.finish(DType::Utf8(Nullability::NonNullable));
297+
298+
let result = array
299+
.into_array()
300+
.binary(
301+
ConstantArray::new(Scalar::utf8("abc", Nullability::NonNullable), 3).into_array(),
302+
Operator::Eq,
303+
)
304+
.unwrap();
305+
306+
let expected = BoolArray::from_iter([true, false, true]);
307+
assert_arrays_eq!(result, expected);
308+
}
309+
310+
#[test]
311+
fn varbin_i64_offsets_compare_constant_binary() {
312+
let mut builder = VarBinBuilder::<i64>::with_capacity(3);
313+
builder.append_value(b"abc");
314+
builder.append_value(b"xyz");
315+
builder.append_value(b"abc");
316+
let array = builder.finish(DType::Binary(Nullability::NonNullable));
317+
318+
let result = array
319+
.into_array()
320+
.binary(
321+
ConstantArray::new(
322+
Scalar::binary(ByteBuffer::copy_from(b"abc"), Nullability::NonNullable),
323+
3,
324+
)
325+
.into_array(),
326+
Operator::Eq,
327+
)
328+
.unwrap();
329+
330+
let expected = BoolArray::from_iter([true, false, true]);
331+
assert_arrays_eq!(result, expected);
332+
}
263333
}

0 commit comments

Comments
 (0)