Skip to content

Commit ce80674

Browse files
committed
more
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent a46ea07 commit ce80674

3 files changed

Lines changed: 195 additions & 61 deletions

File tree

vortex-array/src/arrays/varbin/vtable/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::array::VTable;
2222
use crate::arrays::varbin::VarBinArrayExt;
2323
use crate::arrays::varbin::VarBinData;
2424
use crate::arrays::varbin::array::NUM_SLOTS;
25+
use crate::arrays::varbin::array::OFFSETS_SLOT;
2526
use crate::arrays::varbin::array::SLOT_NAMES;
2627
use crate::buffer::BufferHandle;
2728
use crate::dtype::DType;
@@ -90,7 +91,7 @@ impl VTable for VarBin {
9091
"VarBinArray expected {NUM_SLOTS} slots, found {}",
9192
slots.len()
9293
);
93-
let offsets = slots[crate::arrays::varbin::array::OFFSETS_SLOT]
94+
let offsets = slots[OFFSETS_SLOT]
9495
.as_ref()
9596
.vortex_expect("VarBinArray offsets slot");
9697
vortex_ensure!(
@@ -138,7 +139,6 @@ impl VTable for VarBin {
138139
dtype: &DType,
139140
len: usize,
140141
metadata: &[u8],
141-
142142
buffers: &[BufferHandle],
143143
children: &dyn ArrayChildren,
144144
_session: &VortexSession,

vortex-array/src/builders/dict/bytes.rs

Lines changed: 191 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use std::cell::OnceCell;
45
use std::hash::BuildHasher;
56
use std::mem;
67
use std::sync::Arc;
78

9+
use itertools::Itertools;
10+
use num_traits::AsPrimitive;
811
use vortex_array::ExecutionCtx;
912
use vortex_buffer::BitBufferMut;
1013
use vortex_buffer::BufferMut;
1114
use vortex_buffer::ByteBufferMut;
1215
use vortex_error::VortexExpect;
1316
use vortex_error::VortexResult;
1417
use vortex_error::vortex_panic;
18+
use vortex_mask::AllOr;
1519
use vortex_utils::aliases::hash_map::DefaultHashBuilder;
1620
use vortex_utils::aliases::hash_map::HashTable;
1721
use vortex_utils::aliases::hash_map::HashTableEntry;
@@ -20,21 +24,24 @@ use vortex_utils::aliases::hash_map::RandomState;
2024
use super::DictConstraints;
2125
use super::DictEncoder;
2226
use crate::ArrayRef;
27+
use crate::ArrayView;
2328
use crate::IntoArray;
24-
use crate::accessor::ArrayAccessor;
2529
use crate::arrays::PrimitiveArray;
2630
use crate::arrays::VarBin;
2731
use crate::arrays::VarBinView;
2832
use crate::arrays::VarBinViewArray;
33+
use crate::arrays::varbin::VarBinArrayExt;
2934
use crate::arrays::varbinview::build_views::BinaryView;
3035
use crate::dtype::DType;
3136
use crate::dtype::PType;
3237
use crate::dtype::UnsignedPType;
38+
use crate::match_each_integer_ptype;
3339
use crate::validity::Validity;
3440

3541
/// Dictionary encode varbin array. Specializes for primitive byte arrays to avoid double copying
36-
pub struct BytesDictBuilder<Codes> {
37-
lookup: Option<HashTable<Codes>>,
42+
pub struct BytesDictBuilder<Code> {
43+
lookup: Option<HashTable<Code>>,
44+
null_code: OnceCell<Code>,
3845
views: BufferMut<BinaryView>,
3946
values: ByteBufferMut,
4047
values_nulls: BitBufferMut,
@@ -58,6 +65,7 @@ impl<Code: UnsignedPType> BytesDictBuilder<Code> {
5865
Self {
5966
lookup: Some(HashTable::new()),
6067
views: BufferMut::<BinaryView>::empty(),
68+
null_code: OnceCell::new(),
6169
values: BufferMut::empty(),
6270
values_nulls: BitBufferMut::empty(),
6371
hasher: DefaultHashBuilder::default(),
@@ -71,18 +79,16 @@ impl<Code: UnsignedPType> BytesDictBuilder<Code> {
7179
self.views.len() * size_of::<BinaryView>() + self.values.len()
7280
}
7381

74-
fn lookup_bytes(&self, idx: usize) -> Option<&[u8]> {
75-
self.values_nulls.value(idx).then(|| {
76-
let bin_view = &self.views[idx];
77-
if bin_view.is_inlined() {
78-
bin_view.as_inlined().value()
79-
} else {
80-
&self.values[bin_view.as_view().as_range()]
81-
}
82-
})
82+
fn lookup_bytes(&self, idx: usize) -> &[u8] {
83+
let bin_view = &self.views[idx];
84+
if bin_view.is_inlined() {
85+
bin_view.as_inlined().value()
86+
} else {
87+
&self.values[bin_view.as_view().as_range()]
88+
}
8389
}
8490

85-
fn encode_value(&mut self, lookup: &mut HashTable<Code>, val: Option<&[u8]>) -> Option<Code> {
91+
fn encode_value(&mut self, lookup: &mut HashTable<Code>, val: &[u8]) -> Option<Code> {
8692
match lookup.entry(
8793
self.hasher.hash_one(val),
8894
|idx| val == self.lookup_bytes(idx.as_()),
@@ -94,36 +100,26 @@ impl<Code: UnsignedPType> BytesDictBuilder<Code> {
94100
return None;
95101
}
96102

97-
let next_code = self.views.len();
98-
match val {
99-
None => {
100-
// Null value
101-
self.views.push(BinaryView::default());
102-
self.values_nulls.append_false();
103-
}
104-
Some(val) => {
105-
let view = BinaryView::make_view(
106-
val,
107-
0,
108-
u32::try_from(self.values.len())
109-
.vortex_expect("values length must fit in u32"),
110-
);
111-
let additional_bytes = if view.is_inlined() {
112-
size_of::<BinaryView>()
113-
} else {
114-
size_of::<BinaryView>() + val.len()
115-
};
103+
let next_code = self.views.len() + self.null_code.get().is_some() as usize;
104+
let view = BinaryView::make_view(
105+
val,
106+
0,
107+
u32::try_from(self.values.len()).vortex_expect("values length must fit in u32"),
108+
);
109+
let additional_bytes = if view.is_inlined() {
110+
size_of::<BinaryView>()
111+
} else {
112+
size_of::<BinaryView>() + val.len()
113+
};
116114

117-
if self.dict_bytes() + additional_bytes > self.max_dict_bytes {
118-
return None;
119-
}
115+
if self.dict_bytes() + additional_bytes > self.max_dict_bytes {
116+
return None;
117+
}
120118

121-
self.views.push(view);
122-
self.values_nulls.append_true();
123-
if !view.is_inlined() {
124-
self.values.extend_from_slice(val);
125-
}
126-
}
119+
self.views.push(view);
120+
self.values_nulls.append_true();
121+
if !view.is_inlined() {
122+
self.values.extend_from_slice(val);
127123
}
128124

129125
let next_code = Code::from_usize(next_code).unwrap_or_else(|| {
@@ -134,23 +130,158 @@ impl<Code: UnsignedPType> BytesDictBuilder<Code> {
134130
}
135131
}
136132

137-
fn encode_bytes<A: ArrayAccessor<[u8]>>(
133+
#[expect(clippy::cognitive_complexity)]
134+
fn encode_varbin(
138135
&mut self,
139-
accessor: &A,
140-
len: usize,
136+
var_bin: ArrayView<VarBin>,
137+
ctx: &mut ExecutionCtx,
141138
) -> VortexResult<ArrayRef> {
142139
let mut local_lookup = self.lookup.take().vortex_expect("Must have a lookup dict");
143-
let mut codes: BufferMut<Code> = BufferMut::with_capacity(len);
140+
let mut codes: BufferMut<Code> = BufferMut::with_capacity(var_bin.len());
144141

145-
accessor.with_iterator(|it| {
146-
for value in it {
147-
let Some(code) = self.encode_value(&mut local_lookup, value) else {
148-
break;
149-
};
150-
// SAFETY: we reserved capacity in the buffer for `len` elements
151-
unsafe { codes.push_unchecked(code) }
142+
let offsets = var_bin.offsets().clone().execute::<PrimitiveArray>(ctx)?;
143+
let bytes = var_bin.bytes();
144+
let validity_mask = var_bin.validity()?.execute_mask(var_bin.len(), ctx)?;
145+
146+
match validity_mask.bit_buffer() {
147+
AllOr::All => {
148+
match_each_integer_ptype!(offsets.ptype(), |P| {
149+
let slice_offsets = offsets.as_slice::<P>();
150+
for w in slice_offsets.windows(2) {
151+
let start = w[0].as_();
152+
let end = w[1].as_();
153+
let Some(code) = self.encode_value(&mut local_lookup, &bytes[start..end])
154+
else {
155+
break;
156+
};
157+
// SAFETY: `codes` was allocated with capacity for `var_bin.len()` elements
158+
unsafe { codes.push_unchecked(code) }
159+
}
160+
})
152161
}
153-
});
162+
AllOr::None => {
163+
self.views.push(BinaryView::default());
164+
self.values_nulls.append_false();
165+
unsafe {
166+
codes.push_n_unchecked(
167+
Code::from_usize(0).vortex_expect("must fit 0"),
168+
var_bin.len(),
169+
)
170+
}
171+
}
172+
AllOr::Some(b) => {
173+
match_each_integer_ptype!(offsets.ptype(), |P| {
174+
let slice_offsets = offsets.as_slice::<P>();
175+
for (w, valid) in slice_offsets.windows(2).zip_eq(b.iter()) {
176+
if !valid {
177+
let code = self.null_code.get_or_init(|| {
178+
self.views.push(BinaryView::default());
179+
self.values_nulls.append_false();
180+
Code::from_usize(self.values.len()).unwrap_or_else(|| {
181+
vortex_panic!(
182+
"{} has to fit into {}",
183+
self.values.len(),
184+
Code::PTYPE
185+
)
186+
})
187+
});
188+
unsafe { codes.push_unchecked(*code) }
189+
} else {
190+
let start = w[0].as_();
191+
let end = w[1].as_();
192+
let Some(code) =
193+
self.encode_value(&mut local_lookup, &bytes[start..end])
194+
else {
195+
break;
196+
};
197+
// SAFETY: `codes` was allocated with capacity for `var_bin.len()` elements
198+
unsafe { codes.push_unchecked(code) }
199+
}
200+
}
201+
})
202+
}
203+
}
204+
205+
// Restore lookup dictionary back into the struct
206+
self.lookup = Some(local_lookup);
207+
208+
Ok(PrimitiveArray::new(codes, Validity::NonNullable).into_array())
209+
}
210+
211+
fn encode_varbinview(
212+
&mut self,
213+
var_bin_view: ArrayView<VarBinView>,
214+
ctx: &mut ExecutionCtx,
215+
) -> VortexResult<ArrayRef> {
216+
let mut local_lookup = self.lookup.take().vortex_expect("Must have a lookup dict");
217+
let mut codes: BufferMut<Code> = BufferMut::with_capacity(var_bin_view.len());
218+
219+
let views = var_bin_view.views();
220+
let buffers = var_bin_view
221+
.data_buffers()
222+
.iter()
223+
.map(|b| b.as_host())
224+
.collect::<Vec<_>>();
225+
let validity_mask = var_bin_view
226+
.validity()?
227+
.execute_mask(var_bin_view.len(), ctx)?;
228+
229+
match validity_mask.bit_buffer() {
230+
AllOr::All => {
231+
for view in views {
232+
let value = if view.is_inlined() {
233+
view.as_inlined().value()
234+
} else {
235+
&buffers[view.as_view().buffer_index as usize][view.as_view().as_range()]
236+
};
237+
let Some(code) = self.encode_value(&mut local_lookup, value) else {
238+
break;
239+
};
240+
// SAFETY: `codes` was allocated with capacity for `var_bin_view.len()` elements
241+
unsafe { codes.push_unchecked(code) }
242+
}
243+
}
244+
AllOr::None => {
245+
self.views.push(BinaryView::default());
246+
self.values_nulls.append_false();
247+
unsafe {
248+
codes.push_n_unchecked(
249+
Code::from_usize(0).vortex_expect("must fit 0"),
250+
var_bin_view.len(),
251+
)
252+
}
253+
}
254+
AllOr::Some(b) => {
255+
for (view, valid) in views.iter().zip_eq(b.iter()) {
256+
if !valid {
257+
let code = self.null_code.get_or_init(|| {
258+
self.views.push(BinaryView::default());
259+
self.values_nulls.append_false();
260+
Code::from_usize(self.values.len()).unwrap_or_else(|| {
261+
vortex_panic!(
262+
"{} has to fit into {}",
263+
self.values.len(),
264+
Code::PTYPE
265+
)
266+
})
267+
});
268+
unsafe { codes.push_unchecked(*code) }
269+
} else {
270+
let value = if view.is_inlined() {
271+
view.as_inlined().value()
272+
} else {
273+
&buffers[view.as_view().buffer_index as usize]
274+
[view.as_view().as_range()]
275+
};
276+
let Some(code) = self.encode_value(&mut local_lookup, value) else {
277+
break;
278+
};
279+
// SAFETY: `codes` was allocated with capacity for `var_bin_view.len()` elements
280+
unsafe { codes.push_unchecked(code) }
281+
}
282+
}
283+
}
284+
}
154285

155286
// Restore lookup dictionary back into the struct
156287
self.lookup = Some(local_lookup);
@@ -169,16 +300,18 @@ impl<Code: UnsignedPType> DictEncoder for BytesDictBuilder<Code> {
169300
self.dtype
170301
);
171302

172-
let len = array.len();
173303
if let Some(varbinview) = array.as_opt::<VarBinView>() {
174-
self.encode_bytes(&varbinview.into_owned(), len)
304+
self.encode_varbinview(varbinview, ctx)
175305
} else if let Some(varbin) = array.as_opt::<VarBin>() {
176-
self.encode_bytes(&varbin.into_owned(), len)
306+
self.encode_varbin(varbin, ctx)
177307
} else {
178308
// NOTE(aduffy): it is very rare that this path would be taken, only e.g.
179309
// if we're performing dictionary encoding downstream of some other compression.
180-
let varbinview = array.clone().execute::<VarBinViewArray>(ctx)?;
181-
self.encode_bytes(&varbinview, len)
310+
let vbv_array = array.clone().execute::<VarBinViewArray>(ctx)?.into_array();
311+
let varbinview = vbv_array
312+
.as_opt::<VarBinView>()
313+
.vortex_expect("Must be a VarBinView");
314+
self.encode_varbinview(varbinview, ctx)
182315
}
183316
}
184317

vortex-layout/src/layouts/dict/writer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ use futures::pin_mut;
1818
use futures::stream::BoxStream;
1919
use futures::stream::once;
2020
use futures::try_join;
21-
use vortex_array::{ArrayContext, LEGACY_SESSION};
21+
use vortex_array::ArrayContext;
2222
use vortex_array::ArrayRef;
23+
use vortex_array::LEGACY_SESSION;
2324
use vortex_array::VortexSessionExecute;
2425
use vortex_array::arrays::Dict;
2526
use vortex_array::builders::dict::DictConstraints;

0 commit comments

Comments
 (0)