Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ cargo +nightly fmt --all
cargo clippy --all-targets --all-features
```

Do not push Rust code changes before running the applicable lint command above. If the change adds
or edits Rustdoc on public APIs, also run the CI docs command so broken intra-doc links are caught
locally:

```bash
RUSTDOCFLAGS="-D warnings" cargo doc --profile ci --no-deps
```

Notes:

- For `.github/` changes, follow `.github/AGENTS.md` and run
Expand Down Expand Up @@ -190,5 +198,8 @@ you ran and call out any checks you could not run.
All commits must be signed off by the committers in this form:

```text
Signed-off-by: "COMMITTER" <COMMITTER_EMAIL>
Signed-off-by: COMMITTER <COMMITTER_EMAIL>
```

Do not wrap the committer name in quotes; the DCO check expects the exact unquoted name/email
pair from the commit author.
80 changes: 37 additions & 43 deletions vortex-array/benches/aggregate_grouped.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ use vortex_array::aggregate_fn::EmptyOptions;
use vortex_array::aggregate_fn::GroupedAccumulator;
use vortex_array::aggregate_fn::fns::count::Count;
use vortex_array::aggregate_fn::fns::sum::Sum;
use vortex_array::arrays::ListViewArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::dtype::DType;
use vortex_array::validity::Validity;
use vortex_buffer::Buffer;

Expand All @@ -45,44 +43,42 @@ fn total_element_count(group_sizes: &[usize]) -> usize {
group_sizes.iter().sum()
}

fn contiguous_list_view(elements: ArrayRef, group_sizes: &[usize]) -> ArrayRef {
let mut offset = 0usize;
let offsets: Buffer<u32> = group_sizes
struct DenseGroupedInput {
values: ArrayRef,
group_ids: Vec<u32>,
num_groups: usize,
}

fn dense_grouped_input(values: ArrayRef, group_sizes: &[usize]) -> DenseGroupedInput {
assert_eq!(values.len(), total_element_count(group_sizes));

let group_ids = group_sizes
.iter()
.map(|&size| {
let current_offset = offset;
offset += size;
current_offset as u32
})
.enumerate()
.flat_map(|(group_id, &size)| std::iter::repeat_n(group_id as u32, size))
.collect();
let sizes: Buffer<u32> = group_sizes.iter().map(|&size| size as u32).collect();

assert_eq!(elements.len(), total_element_count(group_sizes));

ListViewArray::try_new(
elements,
offsets.into_array(),
sizes.into_array(),
Validity::NonNullable,
)
.unwrap()
.into_array()
DenseGroupedInput {
values,
group_ids,
num_groups: group_sizes.len(),
}
}

fn i32_nullable_all_valid_input() -> ArrayRef {
fn i32_nullable_all_valid_input() -> DenseGroupedInput {
let group_sizes = random_group_sizes();
let element_count = total_element_count(&group_sizes);
let values: Buffer<i32> = (0..element_count)
.map(|i| (i % 1024) as i32 - 512)
.collect();
let validity = Validity::from_iter(std::iter::repeat_n(true, element_count));
contiguous_list_view(
dense_grouped_input(
PrimitiveArray::new(values, validity).into_array(),
&group_sizes,
)
}

fn i32_clustered_nulls_input() -> ArrayRef {
fn i32_clustered_nulls_input() -> DenseGroupedInput {
let group_sizes = random_group_sizes();
let element_count = total_element_count(&group_sizes);
let values = (0..element_count).map(|i| {
Expand All @@ -92,26 +88,26 @@ fn i32_clustered_nulls_input() -> ArrayRef {
Some((i % 1024) as i32 - 512)
}
});
contiguous_list_view(
dense_grouped_input(
PrimitiveArray::from_option_iter(values).into_array(),
&group_sizes,
)
}

fn f64_all_valid_input() -> ArrayRef {
fn f64_all_valid_input() -> DenseGroupedInput {
let group_sizes = random_group_sizes();
let element_count = total_element_count(&group_sizes);
let mut rng = StdRng::seed_from_u64(GROUP_SIZE_SEED);
let values: Buffer<f64> = (0..element_count)
.map(|_| rng.random_range(-1000.0..1000.0))
.collect();
contiguous_list_view(
dense_grouped_input(
PrimitiveArray::new(values, Validity::NonNullable).into_array(),
&group_sizes,
)
}

fn f64_clustered_nulls_input() -> ArrayRef {
fn f64_clustered_nulls_input() -> DenseGroupedInput {
let group_sizes = random_group_sizes();
let element_count = total_element_count(&group_sizes);
let mut rng = StdRng::seed_from_u64(GROUP_SIZE_SEED);
Expand All @@ -122,40 +118,38 @@ fn f64_clustered_nulls_input() -> ArrayRef {
Some(rng.random_range(-1000.0f64..1000.0))
}
});
contiguous_list_view(
dense_grouped_input(
PrimitiveArray::from_option_iter(values).into_array(),
&group_sizes,
)
}

fn varbinview_input() -> ArrayRef {
fn varbinview_input() -> DenseGroupedInput {
let group_sizes = random_group_sizes();
let element_count = total_element_count(&group_sizes);
let values: Vec<String> = (0..element_count)
.map(|i| format!("value-{i:06}"))
.collect();
contiguous_list_view(
dense_grouped_input(
VarBinViewArray::from_iter_str(values.iter().map(String::as_str)).into_array(),
&group_sizes,
)
}

fn list_element_dtype(list_view: &ArrayRef) -> DType {
match list_view.dtype() {
DType::List(element_dtype, _) => element_dtype.as_ref().clone(),
dtype => unreachable!("expected List dtype, got {dtype}"),
}
}

fn grouped_accumulator<V>(list_view: &ArrayRef, vtable: V) -> ArrayRef
fn grouped_accumulator<V>(input: &DenseGroupedInput, vtable: V) -> ArrayRef
where
V: AggregateFnVTable<Options = EmptyOptions> + Clone,
{
let mut acc =
GroupedAccumulator::try_new(vtable, EmptyOptions, list_element_dtype(list_view)).unwrap();
acc.accumulate_list(list_view, &mut LEGACY_SESSION.create_execution_ctx())
.unwrap();
divan::black_box(acc.finish().unwrap())
GroupedAccumulator::try_new(vtable, EmptyOptions, input.values.dtype().clone()).unwrap();
acc.accumulate(
&input.values,
&input.group_ids,
input.num_groups,
&mut LEGACY_SESSION.create_execution_ctx(),
)
.unwrap();
divan::black_box(acc.finish(input.num_groups).unwrap())
}

#[divan::bench]
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/aggregate_fn/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl<V: AggregateFnVTable> DynAccumulator for Accumulator<V> {
}

// 3. Iteratively check the registry against each intermediate encoding, executing one
// step between checks. Mirrors the loop in `GroupedAccumulator::accumulate_list_view`.
// step between checks. Mirrors the loop in `GroupedAccumulator::accumulate`.
// Iteration 0 re-checks the initial encoding — a redundant HashMap miss, the price of
// keeping the loop body uniform. Terminates on `AnyColumnar` (Canonical or Constant)
// since the vtable's `accumulate(&Columnar)` handles both cases directly.
Expand Down
Loading
Loading