Skip to content

Commit e7a7ad9

Browse files
authored
DuckDB chunk exporter (#8520)
1 parent fc5b8a8 commit e7a7ad9

6 files changed

Lines changed: 230 additions & 4 deletions

File tree

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex::array::ExecutionCtx;
5+
use vortex::array::IntoArray;
6+
use vortex::array::arrays::ChunkedArray;
7+
use vortex::array::arrays::chunked::ChunkedArrayExt;
8+
use vortex::error::VortexResult;
9+
use vortex::error::vortex_ensure;
10+
11+
use crate::duckdb::VectorRef;
12+
use crate::exporter::ColumnExporter;
13+
use crate::exporter::ConversionCache;
14+
use crate::exporter::canonical;
15+
use crate::exporter::new_array_exporter;
16+
17+
struct ChunkedExporter {
18+
chunk_offsets: Vec<usize>,
19+
chunks: Vec<Box<dyn ColumnExporter>>,
20+
}
21+
22+
pub(crate) fn new_exporter_with_flatten(
23+
array: ChunkedArray,
24+
cache: &ConversionCache,
25+
ctx: &mut ExecutionCtx,
26+
flatten: bool,
27+
) -> VortexResult<Box<dyn ColumnExporter>> {
28+
if flatten {
29+
return canonical::new_exporter(array.into_array(), cache, ctx);
30+
}
31+
32+
let chunk_offsets = array.chunk_offsets().to_vec();
33+
let chunks = array
34+
.chunks()
35+
.iter()
36+
.map(|chunk| new_array_exporter(chunk.clone(), cache, ctx))
37+
.collect::<VortexResult<Vec<_>>>()?;
38+
39+
Ok(Box::new(ChunkedExporter {
40+
chunk_offsets,
41+
chunks,
42+
}))
43+
}
44+
45+
impl ChunkedExporter {
46+
fn chunk_index(&self, offset: usize) -> usize {
47+
self.chunk_offsets
48+
.partition_point(|&chunk_offset| chunk_offset <= offset)
49+
.saturating_sub(1)
50+
}
51+
}
52+
53+
impl ColumnExporter for ChunkedExporter {
54+
fn preferred_batch_len(&self, offset: usize, max_len: usize) -> usize {
55+
if max_len == 0 || self.chunks.is_empty() {
56+
return 0;
57+
}
58+
59+
let chunk_idx = self.chunk_index(offset);
60+
let chunk_start = self.chunk_offsets[chunk_idx];
61+
let chunk_end = self.chunk_offsets[chunk_idx + 1];
62+
let len = (chunk_end - offset).min(max_len);
63+
self.chunks[chunk_idx].preferred_batch_len(offset - chunk_start, len)
64+
}
65+
66+
fn export(
67+
&self,
68+
offset: usize,
69+
len: usize,
70+
vector: &mut VectorRef,
71+
ctx: &mut ExecutionCtx,
72+
) -> VortexResult<()> {
73+
if len == 0 {
74+
return Ok(());
75+
}
76+
77+
let chunk_idx = self.chunk_index(offset);
78+
let chunk_start = self.chunk_offsets[chunk_idx];
79+
let chunk_end = self.chunk_offsets[chunk_idx + 1];
80+
let offset_in_chunk = offset - chunk_start;
81+
vortex_ensure!(
82+
offset + len <= chunk_end,
83+
"chunked DuckDB export range {offset}..{} crosses chunk boundary at {chunk_end}",
84+
offset + len
85+
);
86+
87+
self.chunks[chunk_idx].export(offset_in_chunk, len, vector, ctx)
88+
}
89+
}
90+
91+
#[cfg(test)]
92+
mod tests {
93+
use vortex::array::IntoArray;
94+
use vortex::array::VortexSessionExecute;
95+
use vortex::array::arrays::ChunkedArray;
96+
use vortex::array::arrays::DictArray;
97+
use vortex::array::arrays::StructArray;
98+
use vortex::array::arrays::VarBinViewArray;
99+
use vortex::buffer::buffer;
100+
use vortex::error::VortexResult;
101+
102+
use crate::SESSION;
103+
use crate::duckdb::DataChunk;
104+
use crate::duckdb::LogicalType;
105+
use crate::exporter::ArrayExporter;
106+
use crate::exporter::ConversionCache;
107+
108+
#[test]
109+
fn chunked_exporter_emits_chunk_aligned_vectors() -> VortexResult<()> {
110+
let values0 = VarBinViewArray::from_iter_str(["a", "b"]).into_array();
111+
let chunk0 = DictArray::try_new(buffer![0u8, 1].into_array(), values0)?.into_array();
112+
let dtype = chunk0.dtype().clone();
113+
114+
let values1 = VarBinViewArray::from_iter_str(["c", "d", "e"]).into_array();
115+
let chunk1 = DictArray::try_new(buffer![0u8, 1, 2].into_array(), values1)?.into_array();
116+
117+
let field = ChunkedArray::try_new(vec![chunk0, chunk1], dtype)?.into_array();
118+
let array = StructArray::from_fields(&[("field", field)])?;
119+
let mut exporter = ArrayExporter::try_new(
120+
&array,
121+
&ConversionCache::default(),
122+
SESSION.create_execution_ctx(),
123+
)?;
124+
let mut chunk = DataChunk::new([LogicalType::varchar()]);
125+
126+
assert!(exporter.export(&mut chunk, None, None)?);
127+
assert_eq!(
128+
format!("{}", String::try_from(&*chunk)?),
129+
r#"Chunk - [1 Columns]
130+
- DICTIONARY VARCHAR: 2 = [ a, b]
131+
"#
132+
);
133+
134+
assert!(exporter.export(&mut chunk, None, None)?);
135+
assert_eq!(
136+
format!("{}", String::try_from(&*chunk)?),
137+
r#"Chunk - [1 Columns]
138+
- DICTIONARY VARCHAR: 3 = [ c, d, e]
139+
"#
140+
);
141+
142+
assert!(!exporter.export(&mut chunk, None, None)?);
143+
Ok(())
144+
}
145+
}

vortex-duckdb/src/exporter/dict.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::exporter::all_invalid;
2525
use crate::exporter::cache::ConversionCache;
2626
use crate::exporter::constant;
2727
use crate::exporter::new_array_exporter;
28+
use crate::exporter::new_array_exporter_with_flatten;
2829

2930
struct DictExporter<I: IntegerPType> {
3031
// Store the dictionary values once and export the same dictionary with each codes chunk.
@@ -104,7 +105,7 @@ pub(crate) fn new_exporter_with_flatten(
104105
None => {
105106
// Create a new reusable dictionary for the values.
106107
let mut reusable_dict = ReusableDict::new(values.dtype().try_into()?, values.len());
107-
new_array_exporter(values.clone(), cache, ctx)?.export(
108+
new_array_exporter_with_flatten(values.clone(), cache, ctx, true)?.export(
108109
0,
109110
values.len(),
110111
reusable_dict.vector(),

vortex-duckdb/src/exporter/mod.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ mod all_invalid;
55
mod bool;
66
mod cache;
77
mod canonical;
8+
mod chunked;
89
mod constant;
910
mod decimal;
1011
mod dict;
@@ -26,6 +27,7 @@ pub use cache::ConversionCache;
2627
pub use decimal::precision_to_duckdb_storage_size;
2728
use vortex::array::ArrayRef;
2829
use vortex::array::ExecutionCtx;
30+
use vortex::array::arrays::Chunked;
2931
use vortex::array::arrays::Constant;
3032
use vortex::array::arrays::Dict;
3133
use vortex::array::arrays::List;
@@ -37,6 +39,7 @@ use vortex::encodings::sequence::Sequence;
3739
use vortex::error::VortexExpect;
3840
use vortex::error::VortexResult;
3941
use vortex::error::vortex_bail;
42+
use vortex::error::vortex_ensure;
4043

4144
use crate::duckdb::DataChunkRef;
4245
use crate::duckdb::VectorRef;
@@ -96,8 +99,18 @@ impl ArrayExporter {
9699
vortex_bail!("Expected {expected_cols} columns in output chunk, got {chunk_cols}");
97100
}
98101

99-
let chunk_len = duckdb_vector_size().min(self.remaining);
100102
let position = self.array_len - self.remaining;
103+
let mut chunk_len = duckdb_vector_size().min(self.remaining);
104+
if !zero_projection {
105+
for field in &self.fields {
106+
chunk_len = field.preferred_batch_len(position, chunk_len);
107+
}
108+
}
109+
vortex_ensure!(
110+
chunk_len > 0,
111+
"column exporter returned zero rows for non-empty export"
112+
);
113+
101114
self.remaining -= chunk_len;
102115
chunk.set_len(chunk_len);
103116

@@ -156,6 +169,14 @@ impl ArrayExporter {
156169
/// the offset, len and `WritableVector` as options. Not sure what it should return though?
157170
/// This would allow Vortex extension authors to plug into the DuckDB exporter system.
158171
pub trait ColumnExporter: 'static {
172+
/// Preferred number of rows to export next, capped by `max_len`.
173+
///
174+
/// Exporters that preserve physical structure can use this to keep a DuckDB vector inside a
175+
/// natural boundary, such as a chunked-array child boundary.
176+
fn preferred_batch_len(&self, _offset: usize, max_len: usize) -> usize {
177+
max_len
178+
}
179+
159180
/// Export the given range of data from the Vortex array to the DuckDB vector.
160181
fn export(
161182
&self,
@@ -201,6 +222,11 @@ fn new_array_exporter_with_flatten(
201222
Err(array) => array,
202223
};
203224

225+
let array = match array.try_downcast::<Chunked>() {
226+
Ok(array) => return chunked::new_exporter_with_flatten(array, cache, ctx, flatten),
227+
Err(array) => array,
228+
};
229+
204230
let array = match array.try_downcast::<List>() {
205231
Ok(array) => return list::new_exporter(array, cache, ctx),
206232
Err(array) => array,

vortex-duckdb/src/exporter/run_end.rs

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::duckdb::VectorRef;
2222
use crate::exporter::ColumnExporter;
2323
use crate::exporter::cache::ConversionCache;
2424
use crate::exporter::canonical;
25-
use crate::exporter::new_array_exporter;
25+
use crate::exporter::new_array_exporter_with_flatten;
2626

2727
/// We export run-end arrays to a DuckDB dictionary vector, using a selection vector to
2828
/// repeat the values in the run-end array.
@@ -50,7 +50,9 @@ pub(crate) fn new_exporter_with_flatten(
5050
let ends = array.ends().clone();
5151
let values = array.values().clone();
5252
let ends = ends.execute::<PrimitiveArray>(ctx)?;
53-
let values_exporter = new_array_exporter(values.clone(), cache, ctx)?;
53+
// REE exports values in run-index space, not outer row space. Materialize the dictionary
54+
// payload so chunked physical boundaries in the values child cannot constrain row batches.
55+
let values_exporter = new_array_exporter_with_flatten(values.clone(), cache, ctx, true)?;
5456

5557
match_each_integer_ptype!(ends.ptype(), |E| {
5658
Ok(Box::new(RunEndExporter {
@@ -137,3 +139,45 @@ impl<E: IntegerPType> ColumnExporter for RunEndExporter<E> {
137139
Ok(())
138140
}
139141
}
142+
143+
#[cfg(test)]
144+
mod tests {
145+
use vortex::array::IntoArray;
146+
use vortex::array::VortexSessionExecute;
147+
use vortex::array::arrays::ChunkedArray;
148+
use vortex::array::arrays::PrimitiveArray;
149+
use vortex::array::arrays::StructArray;
150+
use vortex::buffer::buffer;
151+
use vortex::encodings::runend::RunEnd;
152+
use vortex::error::VortexResult;
153+
154+
use crate::SESSION;
155+
use crate::duckdb::DataChunk;
156+
use crate::duckdb::LogicalType;
157+
use crate::exporter::ArrayExporter;
158+
use crate::exporter::ConversionCache;
159+
160+
#[test]
161+
fn run_end_with_chunked_values_exports_across_value_chunks() -> VortexResult<()> {
162+
let values0 = PrimitiveArray::from_iter([10i32]).into_array();
163+
let dtype = values0.dtype().clone();
164+
let values1 = PrimitiveArray::from_iter([20i32]).into_array();
165+
let values = ChunkedArray::try_new(vec![values0, values1], dtype)?.into_array();
166+
let mut ctx = SESSION.create_execution_ctx();
167+
let field = RunEnd::try_new(buffer![1u32, 2].into_array(), values, &mut ctx)?.into_array();
168+
let array = StructArray::from_fields(&[("field", field)])?;
169+
let mut exporter = ArrayExporter::try_new(&array, &ConversionCache::default(), ctx)?;
170+
let mut chunk = DataChunk::new([LogicalType::int32()]);
171+
172+
assert!(exporter.export(&mut chunk, None, None)?);
173+
assert_eq!(
174+
format!("{}", String::try_from(&*chunk)?),
175+
r#"Chunk - [1 Columns]
176+
- DICTIONARY INTEGER: 2 = [ 10, 20]
177+
"#
178+
);
179+
180+
assert!(!exporter.export(&mut chunk, None, None)?);
181+
Ok(())
182+
}
183+
}

vortex-duckdb/src/exporter/struct_.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ pub(crate) fn new_exporter(
6161
}
6262

6363
impl ColumnExporter for StructExporter {
64+
fn preferred_batch_len(&self, offset: usize, max_len: usize) -> usize {
65+
self.children
66+
.iter()
67+
.fold(max_len, |len, child| child.preferred_batch_len(offset, len))
68+
}
69+
6470
fn export(
6571
&self,
6672
offset: usize,

vortex-duckdb/src/exporter/validity.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ pub(crate) fn new_exporter(
7070
}
7171

7272
impl ColumnExporter for ValidityExporter {
73+
fn preferred_batch_len(&self, offset: usize, max_len: usize) -> usize {
74+
self.exporter.preferred_batch_len(offset, max_len)
75+
}
76+
7377
fn export(
7478
&self,
7579
offset: usize,

0 commit comments

Comments
 (0)