Skip to content

Commit 43d5c26

Browse files
committed
less
Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent 17d1922 commit 43d5c26

6 files changed

Lines changed: 70 additions & 112 deletions

File tree

vortex-layout/src/layouts/file_stats.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,14 +130,8 @@ impl FileStatsAccumulator {
130130
.lock()
131131
.iter_mut()
132132
.map(|acc| {
133-
acc.as_stats_table()
133+
acc.as_stats_set(&self.stats, &mut ctx)
134134
.vortex_expect("as_stats_table should not fail")
135-
.map(|table| {
136-
table
137-
.to_stats_set(&self.stats, &mut ctx)
138-
.vortex_expect("shouldn't fail to convert table we just created")
139-
})
140-
.unwrap_or_default()
141135
})
142136
.collect()
143137
}

vortex-layout/src/layouts/zoned/builder.rs

Lines changed: 51 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,30 +11,32 @@ use vortex_array::ExecutionCtx;
1111
use vortex_array::IntoArray;
1212
use vortex_array::LEGACY_SESSION;
1313
use vortex_array::VortexSessionExecute;
14+
use vortex_array::aggregate_fn::fns::sum::sum;
1415
use vortex_array::arrays::ConstantArray;
1516
use vortex_array::arrays::StructArray;
17+
use vortex_array::arrays::struct_::StructArrayExt;
1618
use vortex_array::builders::ArrayBuilder;
1719
use vortex_array::builders::BoolBuilder;
1820
use vortex_array::builders::builder_with_capacity;
1921
use vortex_array::dtype::DType;
2022
use vortex_array::dtype::FieldName;
2123
use vortex_array::dtype::Nullability;
24+
use vortex_array::dtype::PType;
2225
use vortex_array::expr::stats::Precision;
2326
use vortex_array::expr::stats::Stat;
2427
use vortex_array::expr::stats::StatsProvider;
2528
use vortex_array::scalar::Scalar;
2629
use vortex_array::scalar::ScalarTruncation;
2730
use vortex_array::scalar::lower_bound;
2831
use vortex_array::scalar::upper_bound;
32+
use vortex_array::stats::StatsSet;
2933
use vortex_array::validity::Validity;
3034
use vortex_buffer::BufferString;
3135
use vortex_buffer::ByteBuffer;
32-
use vortex_error::VortexExpect;
3336
use vortex_error::VortexResult;
3437

3538
use crate::layouts::zoned::schema::MAX_IS_TRUNCATED;
3639
use crate::layouts::zoned::schema::MIN_IS_TRUNCATED;
37-
use crate::layouts::zoned::zone_map::ZoneMap;
3840

3941
/// Accumulates write-time statistics for each logical zone.
4042
pub struct StatsAccumulator {
@@ -88,14 +90,9 @@ impl StatsAccumulator {
8890
Ok(())
8991
}
9092

91-
/// Finishes the accumulator into a [`ZoneMap`].
92-
///
93-
/// Returns `None` if none of the requested statistics can be computed, for example they are
94-
/// not applicable to the column's data type.
95-
pub fn as_stats_table(&mut self) -> VortexResult<Option<ZoneMap>> {
93+
pub fn as_array(&mut self) -> VortexResult<Option<StructArray>> {
9694
let mut names = Vec::new();
9795
let mut fields = Vec::new();
98-
let mut stats = Vec::new();
9996

10097
for builder in self
10198
.builders
@@ -110,7 +107,6 @@ impl StatsAccumulator {
110107
continue;
111108
}
112109

113-
stats.push(builder.stat());
114110
names.extend(values.names);
115111
fields.extend(values.arrays);
116112
}
@@ -119,15 +115,49 @@ impl StatsAccumulator {
119115
return Ok(None);
120116
}
121117

122-
let array = StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
123-
.vortex_expect("Failed to create zone map");
124-
let stats = stats.into();
118+
StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable).map(Some)
119+
}
125120

126-
// SAFETY: `StatsAccumulator` builds the struct fields from `stats_builder_with_capacity`
127-
// using the same field-ordering and truncation-column rules as `stats_table_dtype`.
128-
// The `stats` list is collected in that same sorted order, so the resulting struct array
129-
// matches the expected zoned stats-table dtype by construction.
130-
Ok(Some(unsafe { ZoneMap::new_unchecked(array, stats) }))
121+
/// Returns an aggregated stats set for the table.
122+
pub fn as_stats_set(
123+
&mut self,
124+
stats: &[Stat],
125+
ctx: &mut ExecutionCtx,
126+
) -> VortexResult<StatsSet> {
127+
let mut stats_set = StatsSet::default();
128+
let Some(array) = self.as_array()? else {
129+
return Ok(stats_set);
130+
};
131+
132+
for &stat in stats {
133+
let Some(array) = array.unmasked_field_by_name_opt(stat.name()) else {
134+
continue;
135+
};
136+
137+
// Different stats need different aggregations
138+
match stat {
139+
// For stats that are associative, we can just compute them over the stat column
140+
Stat::Min | Stat::Max | Stat::Sum => {
141+
if let Some(s) = array.statistics().compute_stat(stat, ctx)?
142+
&& let Some(v) = s.into_value()
143+
{
144+
stats_set.set(stat, Precision::exact(v))
145+
}
146+
}
147+
// These stats sum up
148+
Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
149+
if let Some(sum_value) = sum(array, ctx)?
150+
.cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
151+
.into_value()
152+
{
153+
stats_set.set(stat, Precision::exact(sum_value));
154+
}
155+
}
156+
// We could implement these aggregations in the future, but for now they're unused
157+
Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
158+
}
159+
}
160+
Ok(stats_set)
131161
}
132162
}
133163

@@ -385,12 +415,9 @@ mod tests {
385415
.vortex_expect("push_chunk should succeed for test data");
386416
acc.push_chunk(&builder2.finish(), &mut ctx)
387417
.vortex_expect("push_chunk should succeed for test data");
388-
let stats_table = acc
389-
.as_stats_table()
390-
.unwrap()
391-
.expect("Must have stats table");
418+
let stats_table = acc.as_array().unwrap().expect("Must have stats table");
392419
assert_eq!(
393-
stats_table.array().names().as_ref(),
420+
stats_table.names().as_ref(),
394421
&[
395422
Stat::Max.name(),
396423
MAX_IS_TRUNCATED,
@@ -399,7 +426,6 @@ mod tests {
399426
]
400427
);
401428
let field1_bool = stats_table
402-
.array()
403429
.unmasked_field(1)
404430
.clone()
405431
.execute::<BoolArray>(&mut ctx)
@@ -409,7 +435,6 @@ mod tests {
409435
BitBuffer::from(vec![false, true])
410436
);
411437
let field3_bool = stats_table
412-
.array()
413438
.unmasked_field(3)
414439
.clone()
415440
.execute::<BoolArray>(&mut ctx)
@@ -427,12 +452,9 @@ mod tests {
427452
let mut acc = StatsAccumulator::new(array.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
428453
acc.push_chunk(&array, &mut ctx)
429454
.vortex_expect("push_chunk should succeed for test array");
430-
let stats_table = acc
431-
.as_stats_table()
432-
.unwrap()
433-
.expect("Must have stats table");
455+
let stats_table = acc.as_array().unwrap().expect("Must have stats table");
434456
assert_eq!(
435-
stats_table.array().names().as_ref(),
457+
stats_table.names().as_ref(),
436458
&[
437459
Stat::Max.name(),
438460
MAX_IS_TRUNCATED,
@@ -442,14 +464,12 @@ mod tests {
442464
]
443465
);
444466
let field1_bool = stats_table
445-
.array()
446467
.unmasked_field(1)
447468
.clone()
448469
.execute::<BoolArray>(&mut ctx)
449470
.unwrap();
450471
assert_eq!(field1_bool.to_bit_buffer(), BitBuffer::from(vec![false]));
451472
let field3_bool = stats_table
452-
.array()
453473
.unmasked_field(3)
454474
.clone()
455475
.execute::<BoolArray>(&mut ctx)

vortex-layout/src/layouts/zoned/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ impl ZonedLayout {
201201
usize::try_from(self.children.child_row_count(1)).vortex_expect("Invalid number of zones")
202202
}
203203

204+
pub fn zone_len(&self) -> usize {
205+
self.zone_len
206+
}
207+
204208
/// Returns an array of stats that exist in the layout's data, must be sorted.
205209
pub fn present_stats(&self) -> &Arc<[Stat]> {
206210
&self.present_stats

vortex-layout/src/layouts/zoned/pruning.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ type PredicateCache = Arc<OnceLock<Option<Expression>>>;
4040

4141
pub(super) struct PruningState {
4242
zone_count: usize,
43+
row_count: u64,
44+
zone_len: u64,
4345
present_stats: Arc<[Stat]>,
4446
lazy_children: Arc<LazyReaderChildren>,
4547
session: VortexSession,
@@ -57,6 +59,8 @@ impl PruningState {
5759
) -> Self {
5860
Self {
5961
zone_count: layout.nzones(),
62+
row_count: layout.row_count(),
63+
zone_len: layout.zone_len() as u64,
6064
present_stats: Arc::clone(layout.present_stats()),
6165
lazy_children,
6266
session,
@@ -131,7 +135,6 @@ impl PruningState {
131135
self.zone_map
132136
.get_or_init(move || {
133137
let zone_count = self.zone_count;
134-
let present_stats = Arc::clone(&self.present_stats);
135138
let zones_eval = self
136139
.lazy_children
137140
.get(1)
@@ -143,13 +146,15 @@ impl PruningState {
143146
)
144147
.vortex_expect("Failed construct zone map evaluation");
145148
let session = self.session.clone();
149+
let zone_len = self.zone_len;
150+
let row_count = self.row_count;
146151

147152
async move {
148153
let mut ctx = session.create_execution_ctx();
149154
let zones_array = zones_eval.await?.execute::<StructArray>(&mut ctx)?;
150155
// SAFETY: zoned layout validation ensures the zones child matches the expected
151156
// stats-table schema for `present_stats`.
152-
Ok(unsafe { ZoneMap::new_unchecked(zones_array, present_stats) })
157+
Ok(unsafe { ZoneMap::new_unchecked(zones_array, zone_len, row_count) })
153158
}
154159
.map_err(Arc::new)
155160
.boxed()

vortex-layout/src/layouts/zoned/writer.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,19 +105,20 @@ impl LayoutStrategy for ZonedStrategy {
105105
self.options.max_variable_length_statistics_size,
106106
)));
107107

108+
let stats2 = Arc::clone(&stats);
108109
// We can compute per-chunk statistics in parallel, so we spawn tasks for each chunk
109110
let stream = SequentialStreamAdapter::new(
110111
stream.dtype().clone(),
111112
stream
112113
.map(move |chunk| {
113-
let stats = Arc::clone(&stats);
114+
let stats = Arc::clone(&stats2);
114115
let session = compute_session.clone();
115116
handle2.spawn_cpu(move || {
116117
let (sequence_id, chunk) = chunk?;
117118
chunk
118119
.statistics()
119120
.compute_all(&stats, &mut session.create_execution_ctx())?;
120-
VortexResult::Ok((sequence_id, chunk))
121+
Ok((sequence_id, chunk))
121122
})
122123
})
123124
.buffered(self.options.concurrency),
@@ -155,7 +156,7 @@ impl LayoutStrategy for ZonedStrategy {
155156
)
156157
.await?;
157158

158-
let Some((stats_table, stats)) = stats_accumulator.lock().as_stats_table()? else {
159+
let Some(stats_table) = stats_accumulator.lock().as_array()? else {
159160
// If we have no stats (e.g. the DType doesn't support them), then we just return the
160161
// child layout.
161162
return Ok(data_layout);
@@ -164,8 +165,6 @@ impl LayoutStrategy for ZonedStrategy {
164165
// We must defer creating the stats table LayoutWriter until now, because the DType of
165166
// the table depends on which stats were successfully computed.
166167
let stats_stream = stats_table
167-
.array()
168-
.clone()
169168
.into_array()
170169
.to_array_stream()
171170
.sequenced(eof.split_off());

0 commit comments

Comments
 (0)