Skip to content

Commit ca03929

Browse files
committed
fixes
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent 4b8d709 commit ca03929

2 files changed

Lines changed: 13 additions & 18 deletions

File tree

vortex-layout/src/layouts/zoned/builder.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
// SPDX-FileCopyrightText: Copyright the Vortex contributors
55

66
use std::marker::PhantomData;
7+
use std::sync::Arc;
78

89
use itertools::Itertools;
910
use vortex_array::ArrayRef;
@@ -90,9 +91,10 @@ impl StatsAccumulator {
9091
Ok(())
9192
}
9293

93-
pub fn as_array(&mut self) -> VortexResult<Option<StructArray>> {
94+
pub fn as_array(&mut self) -> VortexResult<Option<(StructArray, Arc<[Stat]>)>> {
9495
let mut names = Vec::new();
9596
let mut fields = Vec::new();
97+
let mut stats = Vec::new();
9698

9799
for builder in self
98100
.builders
@@ -107,6 +109,7 @@ impl StatsAccumulator {
107109
continue;
108110
}
109111

112+
stats.push(builder.stat());
110113
names.extend(values.names);
111114
fields.extend(values.arrays);
112115
}
@@ -115,7 +118,8 @@ impl StatsAccumulator {
115118
return Ok(None);
116119
}
117120

118-
StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable).map(Some)
121+
let array = StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)?;
122+
Ok(Some((array, stats.into())))
119123
}
120124

121125
/// Returns an aggregated stats set for the table.
@@ -125,7 +129,7 @@ impl StatsAccumulator {
125129
ctx: &mut ExecutionCtx,
126130
) -> VortexResult<StatsSet> {
127131
let mut stats_set = StatsSet::default();
128-
let Some(array) = self.as_array()? else {
132+
let Some((array, _)) = self.as_array()? else {
129133
return Ok(stats_set);
130134
};
131135

@@ -415,7 +419,7 @@ mod tests {
415419
.vortex_expect("push_chunk should succeed for test data");
416420
acc.push_chunk(&builder2.finish(), &mut ctx)
417421
.vortex_expect("push_chunk should succeed for test data");
418-
let stats_table = acc.as_array().unwrap().expect("Must have stats table");
422+
let (stats_table, _) = acc.as_array().unwrap().expect("Must have stats table");
419423
assert_eq!(
420424
stats_table.names().as_ref(),
421425
&[
@@ -452,7 +456,7 @@ mod tests {
452456
let mut acc = StatsAccumulator::new(array.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
453457
acc.push_chunk(&array, &mut ctx)
454458
.vortex_expect("push_chunk should succeed for test array");
455-
let stats_table = acc.as_array().unwrap().expect("Must have stats table");
459+
let (stats_table, _) = acc.as_array().unwrap().expect("Must have stats table");
456460
assert_eq!(
457461
stats_table.names().as_ref(),
458462
&[

vortex-layout/src/layouts/zoned/writer.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use std::sync::Arc;
77

88
use async_trait::async_trait;
99
use futures::StreamExt as _;
10-
use itertools::Itertools;
1110
use parking_lot::Mutex;
1211
use vortex_array::ArrayContext;
1312
use vortex_array::IntoArray;
@@ -94,14 +93,7 @@ impl LayoutStrategy for ZonedStrategy {
9493
"ZonedStrategy requires block_size > 0 when writing"
9594
);
9695

97-
let stats: Arc<[Stat]> = self
98-
.options
99-
.stats
100-
.as_ref()
101-
.iter()
102-
.cloned()
103-
.sorted_unstable()
104-
.collect();
96+
let stats = Arc::clone(&self.options.stats);
10597
let session = session.clone();
10698
let compute_session = session.clone();
10799
let handle = session.handle();
@@ -113,13 +105,12 @@ impl LayoutStrategy for ZonedStrategy {
113105
self.options.max_variable_length_statistics_size,
114106
)));
115107

116-
let stats2 = Arc::clone(&stats);
117108
// We can compute per-chunk statistics in parallel, so we spawn tasks for each chunk
118109
let stream = SequentialStreamAdapter::new(
119110
stream.dtype().clone(),
120111
stream
121112
.map(move |chunk| {
122-
let stats = Arc::clone(&stats2);
113+
let stats = Arc::clone(&stats);
123114
let session = compute_session.clone();
124115
handle2.spawn_cpu(move || {
125116
let (sequence_id, chunk) = chunk?;
@@ -164,15 +155,15 @@ impl LayoutStrategy for ZonedStrategy {
164155
)
165156
.await?;
166157

167-
let Some(stats_table) = stats_accumulator.lock().as_array()? else {
158+
let Some((stats_array, stats)) = stats_accumulator.lock().as_array()? else {
168159
// If we have no stats (e.g. the DType doesn't support them), then we just return the
169160
// child layout.
170161
return Ok(data_layout);
171162
};
172163

173164
// We must defer creating the stats table LayoutWriter until now, because the DType of
174165
// the table depends on which stats were successfully computed.
175-
let stats_stream = stats_table
166+
let stats_stream = stats_array
176167
.into_array()
177168
.to_array_stream()
178169
.sequenced(eof.split_off());

0 commit comments

Comments
 (0)