@@ -28,9 +28,11 @@ use vortex_array::arrays::VarBinView;
2828use vortex_array:: dtype:: FieldPath ;
2929use vortex_array:: session:: ArrayRegistry ;
3030use vortex_array:: session:: ArraySession ;
31+ use vortex_btrblocks:: BtrBlocksCompressorBuilder ;
3132use vortex_bytebool:: ByteBool ;
3233use vortex_datetime_parts:: DateTimeParts ;
3334use vortex_decimal_byte_parts:: DecimalByteParts ;
35+ use vortex_error:: vortex_panic;
3436use vortex_fastlanes:: BitPacked ;
3537use vortex_fastlanes:: Delta ;
3638use vortex_fastlanes:: FoR ;
@@ -53,13 +55,14 @@ use vortex_pco::Pco;
5355use vortex_runend:: RunEnd ;
5456use vortex_sequence:: Sequence ;
5557use vortex_sparse:: Sparse ;
58+ #[ cfg( feature = "unstable_encodings" ) ]
59+ use vortex_tensor:: encodings:: turboquant:: TurboQuant ;
5660use vortex_utils:: aliases:: hash_map:: HashMap ;
5761use vortex_zigzag:: ZigZag ;
5862
5963#[ rustfmt:: skip]
6064#[ cfg( feature = "zstd" ) ]
6165use vortex_btrblocks:: {
62- BtrBlocksCompressorBuilder ,
6366 SchemeExt ,
6467 schemes:: float,
6568 schemes:: integer,
@@ -111,6 +114,8 @@ pub static ALLOWED_ENCODINGS: LazyLock<ArrayRegistry> = LazyLock::new(|| {
111114 session. register ( RunEnd ) ;
112115 session. register ( Sequence ) ;
113116 session. register ( Sparse ) ;
117+ #[ cfg( feature = "unstable_encodings" ) ]
118+ session. register ( TurboQuant ) ;
114119 session. register ( ZigZag ) ;
115120
116121 #[ cfg( feature = "zstd" ) ]
@@ -127,23 +132,26 @@ pub static ALLOWED_ENCODINGS: LazyLock<ArrayRegistry> = LazyLock::new(|| {
127132/// repartitioning and compressing them to strike a balance between size on-disk,
128133/// bulk decoding performance, and IOPS required to perform an indexed read.
129134pub struct WriteStrategyBuilder {
130- compressor : Option < Arc < dyn CompressorPlugin > > ,
131135 row_block_size : usize ,
132136 field_writers : HashMap < FieldPath , Arc < dyn LayoutStrategy > > ,
133137 allow_encodings : Option < ArrayRegistry > ,
134138 flat_strategy : Option < Arc < dyn LayoutStrategy > > ,
139+ // builder and compressor are mutually exclusive
140+ builder : Option < BtrBlocksCompressorBuilder > ,
141+ compressor : Option < Arc < dyn CompressorPlugin > > ,
135142}
136143
137144impl Default for WriteStrategyBuilder {
138145 /// Create a new empty builder. It can be further configured,
139146 /// and then finally built yielding the [`LayoutStrategy`].
140147 fn default ( ) -> Self {
141148 Self {
142- compressor : None ,
143149 row_block_size : 8192 ,
144150 field_writers : HashMap :: new ( ) ,
145151 allow_encodings : Some ( ALLOWED_ENCODINGS . clone ( ) ) ,
146152 flat_strategy : None ,
153+ builder : None ,
154+ compressor : None ,
147155 }
148156 }
149157}
@@ -154,6 +162,9 @@ impl WriteStrategyBuilder {
154162 /// If not provided, this will use a BtrBlocks-style cascading compressor that tries to balance
155163 /// total size with decoding performance.
156164 pub fn with_compressor < C : CompressorPlugin > ( mut self , compressor : C ) -> Self {
165+ if self . builder . is_some ( ) {
166+ vortex_panic ! ( "Cannot configure both a custom compressor and custom builder schemes" ) ;
167+ }
157168 self . compressor = Some ( Arc :: new ( compressor) ) ;
158169 self
159170 }
@@ -198,7 +209,12 @@ impl WriteStrategyBuilder {
198209 /// GPU decompression. Without it, strings use interleaved Zstd compression.
199210 #[ cfg( feature = "zstd" ) ]
200211 pub fn with_cuda_compatible_encodings ( mut self ) -> Self {
201- let mut builder = BtrBlocksCompressorBuilder :: default ( ) . exclude ( [
212+ if self . compressor . is_some ( ) {
213+ vortex_panic ! (
214+ "Cannot configure both a custom compressor and CUDA compatible encodings"
215+ ) ;
216+ }
217+ let b = self . builder . take ( ) . unwrap_or_default ( ) . exclude ( [
202218 integer:: SparseScheme . id ( ) ,
203219 integer:: RLE_INTEGER_SCHEME . id ( ) ,
204220 float:: RLE_FLOAT_SCHEME . id ( ) ,
@@ -209,14 +225,13 @@ impl WriteStrategyBuilder {
209225
210226 #[ cfg( feature = "unstable_encodings" ) ]
211227 {
212- builder = builder . include ( [ string:: ZstdBuffersScheme . id ( ) ] ) ;
228+ self . builder = Some ( b . include ( [ string:: ZstdBuffersScheme . id ( ) ] ) ) ;
213229 }
214230 #[ cfg( not( feature = "unstable_encodings" ) ) ]
215231 {
216- builder = builder . include ( [ string:: ZstdScheme . id ( ) ] ) ;
232+ self . builder = Some ( b . include ( [ string:: ZstdScheme . id ( ) ] ) ) ;
217233 }
218234
219- self . compressor = Some ( Arc :: new ( builder. build ( ) ) ) ;
220235 self
221236 }
222237
@@ -227,21 +242,47 @@ impl WriteStrategyBuilder {
227242 /// especially for floating-point heavy datasets.
228243 #[ cfg( feature = "zstd" ) ]
229244 pub fn with_compact_encodings ( mut self ) -> Self {
230- let btrblocks = BtrBlocksCompressorBuilder :: default ( )
231- . include ( [
232- string:: ZstdScheme . id ( ) ,
233- integer:: PcoScheme . id ( ) ,
234- float:: PcoScheme . id ( ) ,
235- ] )
236- . build ( ) ;
237-
238- self . compressor = Some ( Arc :: new ( btrblocks) ) ;
245+ if self . compressor . is_some ( ) {
246+ vortex_panic ! ( "Cannot configure both a custom compressor and compact encodings" ) ;
247+ }
248+ self . builder = Some ( self . builder . take ( ) . unwrap_or_default ( ) . include ( [
249+ string:: ZstdScheme . id ( ) ,
250+ integer:: PcoScheme . id ( ) ,
251+ float:: PcoScheme . id ( ) ,
252+ ] ) ) ;
253+ self
254+ }
255+
256+ /// Enable TurboQuant lossy vector quantization for tensor columns.
257+ ///
258+ /// When enabled, `Vector` and `FixedShapeTensor` extension arrays are
259+ /// compressed using the TurboQuant algorithm with QJL correction for
260+ /// unbiased inner product estimation.
261+ ///
262+ /// This augments any existing compressor configuration rather than
263+ /// replacing it. If no compressor has been set, the default BtrBlocks
264+ /// compressor is used with TurboQuant added.
265+ #[ cfg( feature = "unstable_encodings" ) ]
266+ pub fn with_vector_quantization ( mut self ) -> Self {
267+ if self . compressor . is_some ( ) {
268+ vortex_panic ! ( "Cannot configure both a custom compressor and vector quantization" ) ;
269+ }
270+ use vortex_tensor:: encodings:: turboquant:: scheme:: TURBOQUANT_SCHEME ;
271+ self . builder = Some (
272+ self . builder
273+ . take ( )
274+ . unwrap_or_default ( )
275+ . with_scheme ( & TURBOQUANT_SCHEME ) ,
276+ ) ;
239277 self
240278 }
241279
242280 /// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides
243281 /// applied.
244282 pub fn build ( self ) -> Arc < dyn LayoutStrategy > {
283+ use vortex_btrblocks:: SchemeExt as _;
284+ use vortex_btrblocks:: schemes:: integer:: IntDictScheme ;
285+
245286 let flat: Arc < dyn LayoutStrategy > = if let Some ( flat) = self . flat_strategy {
246287 flat
247288 } else if let Some ( allow_encodings) = self . allow_encodings {
@@ -254,12 +295,24 @@ impl WriteStrategyBuilder {
254295 let chunked = ChunkedLayoutStrategy :: new ( flat. clone ( ) ) ;
255296 // 6. buffer chunks so they end up with closer segment ids physically
256297 let buffered = BufferedStrategy :: new ( chunked, 2 * ONE_MEG ) ; // 2MB
298+
257299 // 5. compress each chunk
258- let compressing = if let Some ( ref compressor) = self . compressor {
259- CompressingStrategy :: new_opaque ( buffered, compressor. clone ( ) )
260- } else {
261- CompressingStrategy :: new_btrblocks ( buffered, true )
262- } ;
300+ let data_compressor: Arc < dyn CompressorPlugin > =
301+ if let Some ( ref compressor) = self . compressor {
302+ assert ! (
303+ self . builder. is_none( ) ,
304+ "Cannot configure both a custom compressor and custom builder schemes"
305+ ) ;
306+ compressor. clone ( )
307+ } else {
308+ Arc :: new (
309+ self . builder
310+ . unwrap_or_default ( )
311+ . exclude ( [ IntDictScheme . id ( ) ] )
312+ . build ( ) ,
313+ )
314+ } ;
315+ let compressing = CompressingStrategy :: new ( buffered, data_compressor. clone ( ) ) ;
263316
264317 // 4. prior to compression, coalesce up to a minimum size
265318 let coalescing = RepartitionStrategy :: new (
@@ -279,11 +332,12 @@ impl WriteStrategyBuilder {
279332 ) ;
280333
281334 // 2.1. | 3.1. compress stats tables and dict values.
282- let compress_then_flat = if let Some ( ref compressor) = self . compressor {
283- CompressingStrategy :: new_opaque ( flat , compressor. clone ( ) )
335+ let stats_compressor = if let Some ( compressor) = self . compressor {
336+ compressor. clone ( )
284337 } else {
285- CompressingStrategy :: new_btrblocks ( flat , false )
338+ Arc :: new ( BtrBlocksCompressorBuilder :: default ( ) . build ( ) )
286339 } ;
340+ let compress_then_flat = CompressingStrategy :: new ( flat, stats_compressor) ;
287341
288342 // 3. apply dict encoding or fallback
289343 let dict = DictStrategy :: new (
0 commit comments