Skip to content

Commit 40a7857

Browse files
authored
ReusableDict for RunEnd duckdb exporter (#8538)
Move shared logic of creating a ReusableDict or using a created one from dict.rs to a shared file. Use this logic for RunEnd duckdb exporter as well. Make Constant and Sequence exportes obey flatten=true flag, otherwise ReusableDict values exporting failed for them, reading garbage data. Signed-off-by: Mikhail Kot <mikhail@spiraldb.com>
1 parent 5a764e6 commit 40a7857

5 files changed

Lines changed: 74 additions & 63 deletions

File tree

vortex-duckdb/src/exporter/constant.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::duckdb::Value;
1313
use crate::duckdb::VectorRef;
1414
use crate::exporter::ColumnExporter;
1515
use crate::exporter::ConversionCache;
16+
use crate::exporter::canonical;
1617
use crate::exporter::new_array_exporter;
1718
use crate::exporter::validity;
1819

@@ -46,7 +47,19 @@ pub fn new_exporter_with_mask(
4647
new_exporter(array)
4748
}
4849

49-
pub(crate) fn new_exporter(array: ConstantArray) -> VortexResult<Box<dyn ColumnExporter>> {
50+
pub(crate) fn new_exporter_with_flatten(
51+
array: ConstantArray,
52+
cache: &ConversionCache,
53+
ctx: &mut ExecutionCtx,
54+
flatten: bool,
55+
) -> VortexResult<Box<dyn ColumnExporter>> {
56+
if flatten {
57+
return canonical::new_exporter(array.into_array(), cache, ctx);
58+
}
59+
new_exporter(array)
60+
}
61+
62+
fn new_exporter(array: ConstantArray) -> VortexResult<Box<dyn ColumnExporter>> {
5063
let value = if array.scalar().is_null() {
5164
// If the scalar is null and _not_ of type Null, then we cannot assign a null DuckDB value
5265
// to a constant vector since DuckDB will complain about a type-mismatch. In these cases,

vortex-duckdb/src/exporter/dict.rs

Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ use crate::duckdb::VectorRef;
2323
use crate::exporter::ColumnExporter;
2424
use crate::exporter::all_invalid;
2525
use crate::exporter::cache::ConversionCache;
26+
use crate::exporter::cached_values_dict;
2627
use crate::exporter::constant;
2728
use crate::exporter::new_array_exporter;
28-
use crate::exporter::new_array_exporter_with_flatten;
2929

3030
struct DictExporter<I: IntegerPType> {
3131
// Store the dictionary values once and export the same dictionary with each codes chunk.
@@ -70,7 +70,7 @@ pub(crate) fn new_exporter_with_flatten(
7070
let values_key = values.addr();
7171
let codes = array.codes().clone().execute::<PrimitiveArray>(ctx)?;
7272

73-
let reusable_dict = if flatten {
73+
if flatten {
7474
let canonical = cache
7575
.canonical_cache
7676
.get(&values_key)
@@ -93,33 +93,9 @@ pub(crate) fn new_exporter_with_flatten(
9393
cache,
9494
ctx,
9595
);
96-
} else {
97-
// Check if we have a cached vector and extract it if we do.
98-
let reusable_dict = cache
99-
.dict_cache
100-
.get(&values_key)
101-
.map(|entry| entry.value().1.clone());
102-
103-
match reusable_dict {
104-
Some(reusable_dict) => reusable_dict,
105-
None => {
106-
// Create a new reusable dictionary for the values.
107-
let mut reusable_dict = ReusableDict::new(values.dtype().try_into()?, values.len());
108-
new_array_exporter_with_flatten(values.clone(), cache, ctx, true)?.export(
109-
0,
110-
values.len(),
111-
reusable_dict.vector(),
112-
ctx,
113-
)?;
114-
115-
cache
116-
.dict_cache
117-
.insert(values_key, (values.clone(), reusable_dict.clone()));
96+
}
11897

119-
reusable_dict
120-
}
121-
}
122-
};
98+
let reusable_dict = cached_values_dict(values.clone(), cache, ctx)?;
12399

124100
match_each_integer_ptype!(codes.ptype(), |I| {
125101
Ok(Box::new(DictExporter {

vortex-duckdb/src/exporter/mod.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use vortex::error::vortex_bail;
4242
use vortex::error::vortex_ensure;
4343

4444
use crate::duckdb::DataChunkRef;
45+
use crate::duckdb::ReusableDict;
4546
use crate::duckdb::VectorRef;
4647
use crate::duckdb::duckdb_vector_size;
4748

@@ -195,6 +196,32 @@ fn new_array_exporter(
195196
new_array_exporter_with_flatten(array, cache, ctx, false)
196197
}
197198

199+
/// Export "values" into a ReusableDict, saving the dictionary in "cache".
200+
fn cached_values_dict(
201+
values: ArrayRef,
202+
cache: &ConversionCache,
203+
ctx: &mut ExecutionCtx,
204+
) -> VortexResult<ReusableDict> {
205+
let key = values.addr();
206+
if let Some(entry) = cache.dict_cache.get(&key) {
207+
return Ok(entry.value().1.clone());
208+
}
209+
let mut dict = ReusableDict::new(values.dtype().try_into()?, values.len());
210+
// ReusableDict's values must be flattened. When we call
211+
// vector.reuse_dictionary() with dict returned from this function, if
212+
// data is not flat, duckdb's functions like TupleDataScatter read inner
213+
// storage directly as T. If data inside was SEQUENCE or CONSTANT vectors
214+
// which don't have a T buffer, we read garbage data.
215+
new_array_exporter_with_flatten(values.clone(), cache, ctx, true)?.export(
216+
0,
217+
values.len(),
218+
dict.vector(),
219+
ctx,
220+
)?;
221+
cache.dict_cache.insert(key, (values, dict.clone()));
222+
Ok(dict)
223+
}
224+
198225
/// Create a DuckDB exporter for the given Vortex array.
199226
fn new_array_exporter_with_flatten(
200227
array: ArrayRef,
@@ -203,12 +230,12 @@ fn new_array_exporter_with_flatten(
203230
flatten: bool,
204231
) -> VortexResult<Box<dyn ColumnExporter>> {
205232
let array = match array.try_downcast::<Constant>() {
206-
Ok(array) => return constant::new_exporter(array),
233+
Ok(array) => return constant::new_exporter_with_flatten(array, cache, ctx, flatten),
207234
Err(array) => array,
208235
};
209236

210237
let array = match array.try_downcast::<Sequence>() {
211-
Ok(array) => return sequence::new_exporter(&array),
238+
Ok(array) => return sequence::new_exporter_with_flatten(&array, cache, ctx, flatten),
212239
Err(array) => array,
213240
};
214241

vortex-duckdb/src/exporter/run_end.rs

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,21 @@ use vortex::error::VortexExpect;
1717
use vortex::error::VortexResult;
1818

1919
use crate::convert::ToDuckDBScalar;
20+
use crate::duckdb::ReusableDict;
2021
use crate::duckdb::SelectionVector;
2122
use crate::duckdb::VectorRef;
2223
use crate::exporter::ColumnExporter;
2324
use crate::exporter::cache::ConversionCache;
25+
use crate::exporter::cached_values_dict;
2426
use crate::exporter::canonical;
25-
use crate::exporter::new_array_exporter_with_flatten;
2627

27-
/// We export run-end arrays to a DuckDB dictionary vector, using a selection vector to
28-
/// repeat the values in the run-end array.
28+
/// We export run-end arrays to a DuckDB dictionary vector. Values are exported
29+
/// into a ReusableDict with SelectionVector applied in export().
2930
struct RunEndExporter<E: IntegerPType> {
3031
ends: PrimitiveArray,
3132
ends_type: PhantomData<E>,
3233
values: ArrayRef,
33-
values_exporter: Box<dyn ColumnExporter>,
34+
values_dict: ReusableDict,
3435
run_end_offset: usize,
3536
}
3637

@@ -50,16 +51,14 @@ pub(crate) fn new_exporter_with_flatten(
5051
let ends = array.ends().clone();
5152
let values = array.values().clone();
5253
let ends = ends.execute::<PrimitiveArray>(ctx)?;
53-
// REE exports values in run-index space, not outer row space. Materialize the dictionary
54-
// payload so chunked physical boundaries in the values child cannot constrain row batches.
55-
let values_exporter = new_array_exporter_with_flatten(values.clone(), cache, ctx, true)?;
54+
let values_dict = cached_values_dict(values.clone(), cache, ctx)?;
5655

5756
match_each_integer_ptype!(ends.ptype(), |E| {
5857
Ok(Box::new(RunEndExporter {
5958
ends,
6059
ends_type: PhantomData::<E>,
6160
values,
62-
values_exporter,
61+
values_dict,
6362
run_end_offset: offset,
6463
}))
6564
})
@@ -88,10 +87,7 @@ impl<E: IntegerPType> ColumnExporter for RunEndExporter<E> {
8887

8988
// Find the final run in case we can short-circuit and return a constant vector.
9089
let end_run_idx = ends_slice
91-
.search_sorted(
92-
&offset.add(E::from_usize(len).vortex_expect("len out of bounds")),
93-
SearchSortedSide::Right,
94-
)?
90+
.search_sorted(&end_offset, SearchSortedSide::Right)?
9591
.to_ends_index(ends_slice.len());
9692

9793
if start_run_idx == end_run_idx {
@@ -113,29 +109,16 @@ impl<E: IntegerPType> ColumnExporter for RunEndExporter<E> {
113109
.to_usize()
114110
.vortex_expect("run_len is usize");
115111

116-
// Push the runs into the selection vector.
117-
sel_vec_slice[..run_len].fill(u32::try_from(run_idx).vortex_expect("sel_idx is u32"));
112+
let global_run_idx =
113+
u32::try_from(start_run_idx + run_idx).vortex_expect("run index exceeds u32");
114+
sel_vec_slice[..run_len].fill(global_run_idx);
118115
sel_vec_slice = &mut sel_vec_slice[run_len..];
119116

120117
offset = next_end;
121118
}
122-
assert!(
123-
sel_vec_slice.is_empty(),
124-
"Selection vector not completely filled"
125-
);
126-
127-
// The values in the selection vector are the run indices, so we can find the number of
128-
// values we referenced by looking at the last index of the selection vector.
129-
let values_len = *unsafe { sel_vec.as_slice_mut(len) }
130-
.last()
131-
.vortex_expect("non-empty")
132-
+ 1;
133-
134-
// Export the run-end values into the vector, and then turn it into a dictionary vector.
135-
self.values_exporter
136-
.export(start_run_idx, values_len as usize, vector, ctx)?;
137-
vector.dictionary(vector, values_len as usize, &sel_vec, len as _);
119+
debug_assert!(sel_vec_slice.is_empty());
138120

121+
vector.reuse_dictionary(&self.values_dict, &sel_vec);
139122
Ok(())
140123
}
141124
}

vortex-duckdb/src/exporter/sequence.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,30 @@
33

44
use bitvec::macros::internal::funty::Fundamental;
55
use vortex::array::ExecutionCtx;
6+
use vortex::array::IntoArray;
67
use vortex::encodings::sequence::SequenceArray;
78
use vortex::error::VortexExpect;
89
use vortex::error::VortexResult;
910

1011
use crate::duckdb::VectorRef;
1112
use crate::exporter::ColumnExporter;
13+
use crate::exporter::ConversionCache;
14+
use crate::exporter::canonical;
1215

1316
struct SequenceExporter {
1417
start: i64,
1518
step: i64,
1619
}
1720

18-
pub(crate) fn new_exporter(array: &SequenceArray) -> VortexResult<Box<dyn ColumnExporter>> {
21+
pub(crate) fn new_exporter_with_flatten(
22+
array: &SequenceArray,
23+
cache: &ConversionCache,
24+
ctx: &mut ExecutionCtx,
25+
flatten: bool,
26+
) -> VortexResult<Box<dyn ColumnExporter>> {
27+
if flatten {
28+
return canonical::new_exporter(array.clone().into_array(), cache, ctx);
29+
}
1930
Ok(Box::new(SequenceExporter {
2031
start: array.base().as_i64().vortex_expect("cannot have null base"),
2132
step: array
@@ -58,8 +69,9 @@ mod tests {
5869
fn test_sequence() {
5970
let arr = Sequence::try_new_typed(2, 5, Nullability::NonNullable, 100).unwrap();
6071
let mut chunk = DataChunk::new([LogicalType::new(cpp::duckdb_type::DUCKDB_TYPE_INTEGER)]);
72+
let mut ctx = SESSION.create_execution_ctx();
6173

62-
new_exporter(&arr)
74+
new_exporter_with_flatten(&arr, &ConversionCache::default(), &mut ctx, false)
6375
.unwrap()
6476
.export(
6577
0,

0 commit comments

Comments
 (0)