Skip to content

Commit 3a5dc76

Browse files
committed
Tuning and fixes and stuff
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent a29c1c3 commit 3a5dc76

9 files changed

Lines changed: 19 additions & 157 deletions

File tree

vortex-array/src/arrow/executor/list.rs

Lines changed: 19 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use std::sync::Arc;
5-
use std::sync::atomic::AtomicU64;
6-
use std::sync::atomic::Ordering;
75

86
use arrow_array::ArrayRef as ArrowArrayRef;
97
use arrow_array::GenericListArray;
@@ -19,6 +17,7 @@ use crate::ArrayRef;
1917
use crate::Canonical;
2018
use crate::DynArray;
2119
use crate::ExecutionCtx;
20+
use crate::arrays::Chunked;
2221
use crate::arrays::List;
2322
use crate::arrays::ListArray;
2423
use crate::arrays::ListView;
@@ -33,31 +32,6 @@ use crate::dtype::NativePType;
3332
use crate::dtype::Nullability;
3433
use crate::vtable::ValidityHelper;
3534

36-
static LIST_TO_LIST_COUNT: AtomicU64 = AtomicU64::new(0);
37-
static LIST_VIEW_ZCTL_COUNT: AtomicU64 = AtomicU64::new(0);
38-
static EXECUTE_LIST_VIEW_COUNT: AtomicU64 = AtomicU64::new(0);
39-
40-
#[derive(Debug, Clone, Copy, Default)]
41-
pub struct ListArrowPathCounters {
42-
pub list_to_list: u64,
43-
pub list_view_zctl: u64,
44-
pub execute_list_view: u64,
45-
}
46-
47-
pub(crate) fn reset_list_arrow_path_counters() {
48-
LIST_TO_LIST_COUNT.store(0, Ordering::Relaxed);
49-
LIST_VIEW_ZCTL_COUNT.store(0, Ordering::Relaxed);
50-
EXECUTE_LIST_VIEW_COUNT.store(0, Ordering::Relaxed);
51-
}
52-
53-
pub(crate) fn list_arrow_path_counters() -> ListArrowPathCounters {
54-
ListArrowPathCounters {
55-
list_to_list: LIST_TO_LIST_COUNT.load(Ordering::Relaxed),
56-
list_view_zctl: LIST_VIEW_ZCTL_COUNT.load(Ordering::Relaxed),
57-
execute_list_view: EXECUTE_LIST_VIEW_COUNT.load(Ordering::Relaxed),
58-
}
59-
}
60-
6135
/// Convert a Vortex array into an Arrow GenericBinaryArray.
6236
pub(super) fn to_arrow_list<O: OffsetSizeTrait + NativePType>(
6337
array: ArrayRef,
@@ -66,14 +40,12 @@ pub(super) fn to_arrow_list<O: OffsetSizeTrait + NativePType>(
6640
) -> VortexResult<ArrowArrayRef> {
6741
// If the Vortex array is already in List format, we can directly convert it.
6842
if let Some(array) = array.as_opt::<List>() {
69-
LIST_TO_LIST_COUNT.fetch_add(1, Ordering::Relaxed);
7043
return list_to_list::<O>(array, elements_field, ctx);
7144
}
7245

7346
// If the Vortex array is a ListViewArray, rebuild to ZCTL if needed and convert.
7447
let array = match array.try_into::<ListView>() {
7548
Ok(array) => {
76-
LIST_VIEW_ZCTL_COUNT.fetch_add(1, Ordering::Relaxed);
7749
let zctl = if array.is_zero_copy_to_list() {
7850
array
7951
} else {
@@ -84,18 +56,34 @@ pub(super) fn to_arrow_list<O: OffsetSizeTrait + NativePType>(
8456
Err(a) => a,
8557
};
8658

59+
// Handle ChunkedArray by converting each chunk individually.
60+
// This preserves the fast list_to_list path for inner ListArray chunks
61+
// instead of falling through to the expensive execute::<ListViewArray> path.
62+
if let Some(chunked) = array.as_opt::<Chunked>() {
63+
let mut arrow_chunks: Vec<ArrowArrayRef> = Vec::with_capacity(chunked.nchunks());
64+
for chunk in chunked.chunks() {
65+
arrow_chunks.push(to_arrow_list::<O>(chunk.clone(), elements_field, ctx)?);
66+
}
67+
if arrow_chunks.len() == 1 {
68+
return Ok(arrow_chunks
69+
.into_iter()
70+
.next()
71+
.vortex_expect("known length"));
72+
}
73+
let refs: Vec<&dyn arrow_array::Array> = arrow_chunks.iter().map(|a| a.as_ref()).collect();
74+
return Ok(arrow_select::concat::concat(&refs)?);
75+
}
76+
8777
// Otherwise, we execute the array to become a ListViewArray, then rebuild to ZCTL.
8878
// Note: arrow_cast::cast supports ListView → List (apache/arrow-rs#8735), but it
8979
// unconditionally uses take. Our rebuild uses a heuristic that picks list-by-list
9080
// for large lists, which avoids materializing a large index buffer.
91-
EXECUTE_LIST_VIEW_COUNT.fetch_add(1, Ordering::Relaxed);
9281
let list_view = array.execute::<ListViewArray>(ctx)?;
9382
let zctl = if list_view.is_zero_copy_to_list() {
9483
list_view
9584
} else {
9685
list_view.rebuild(ListViewRebuildMode::MakeZeroCopyToList)?
9786
};
98-
LIST_VIEW_ZCTL_COUNT.fetch_add(1, Ordering::Relaxed);
9987
list_view_zctl::<O>(zctl, elements_field, ctx)
10088
}
10189

vortex-array/src/arrow/executor/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,6 @@ use arrow_schema::Field;
2525
use arrow_schema::FieldRef;
2626
use arrow_schema::Schema;
2727
use itertools::Itertools;
28-
pub(crate) use list::ListArrowPathCounters;
29-
pub(crate) use list::list_arrow_path_counters;
30-
pub(crate) use list::reset_list_arrow_path_counters;
3128
use vortex_error::VortexResult;
3229
use vortex_error::vortex_bail;
3330
use vortex_error::vortex_ensure;

vortex-array/src/arrow/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@ mod null_buffer;
1515
mod record_batch;
1616

1717
pub use datum::*;
18-
pub(crate) use executor::ListArrowPathCounters;
19-
pub(crate) use executor::list_arrow_path_counters;
20-
pub(crate) use executor::reset_list_arrow_path_counters;
2118
pub use executor::*;
2219
pub use iter::*;
2320

vortex-array/src/debug.rs

Lines changed: 0 additions & 14 deletions
This file was deleted.

vortex-array/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ pub mod builtins;
4141
mod canonical;
4242
mod columnar;
4343
pub mod compute;
44-
pub mod debug;
4544
pub mod display;
4645
pub mod dtype;
4746
mod executor;

vortex-bench/src/runner.rs

Lines changed: 0 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33

44
//! Generic benchmark runner infrastructure to reduce boilerplate across engine-specific benchmarks.
55
6-
use std::env;
7-
use std::fmt;
86
use std::fs::File;
97
use std::future::Future;
108
use std::io::Write;
@@ -14,11 +12,7 @@ use std::time::Duration;
1412
use std::time::Instant;
1513

1614
use indicatif::ProgressBar;
17-
use vortex::array::debug::arrow_path_counters;
18-
use vortex::array::debug::reset_arrow_path_counters;
1915
use vortex::error::vortex_panic;
20-
use vortex::scan::debug::counters as scan_debug_counters;
21-
use vortex::scan::debug::reset_counters as reset_scan_debug_counters;
2216

2317
use crate::Benchmark;
2418
use crate::BenchmarkDataset;
@@ -51,47 +45,6 @@ use crate::measurements::QueryMeasurement;
5145
use crate::memory::BenchmarkMemoryTracker;
5246
use crate::url_scheme_to_storage;
5347

54-
#[derive(Debug, Clone, Copy, Default)]
55-
struct DebugCounterSnapshot {
56-
list_to_list: u64,
57-
list_view_zctl: u64,
58-
execute_list_view: u64,
59-
legacy_stream_fallbacks: u64,
60-
}
61-
62-
impl fmt::Display for DebugCounterSnapshot {
63-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64-
write!(
65-
f,
66-
"list_to_list={}, list_view_zctl={}, execute_list_view={}, legacy_stream_fallbacks={}",
67-
self.list_to_list,
68-
self.list_view_zctl,
69-
self.execute_list_view,
70-
self.legacy_stream_fallbacks
71-
)
72-
}
73-
}
74-
75-
fn debug_counters_enabled() -> bool {
76-
env::var_os("VORTEX_BENCH_DEBUG_COUNTERS").is_some()
77-
}
78-
79-
fn reset_debug_counters() {
80-
reset_arrow_path_counters();
81-
reset_scan_debug_counters();
82-
}
83-
84-
fn snapshot_debug_counters() -> DebugCounterSnapshot {
85-
let arrow = arrow_path_counters();
86-
let scan = scan_debug_counters();
87-
DebugCounterSnapshot {
88-
list_to_list: arrow.list_to_list,
89-
list_view_zctl: arrow.list_view_zctl,
90-
execute_list_view: arrow.execute_list_view,
91-
legacy_stream_fallbacks: scan.legacy_stream_fallbacks,
92-
}
93-
}
94-
9548
/// Results from a benchmark run.
9649
pub struct BenchmarkResults {
9750
pub query_measurements: Vec<QueryMeasurement>,
@@ -177,9 +130,6 @@ impl SqlBenchmarkRunner {
177130
let mut row_count = None;
178131

179132
for _ in 0..iterations {
180-
if debug_counters_enabled() {
181-
reset_debug_counters();
182-
}
183133
let start = Instant::now();
184134
let (timing, result) = f();
185135
let elapsed = timing.unwrap_or_else(|| start.elapsed());
@@ -188,15 +138,6 @@ impl SqlBenchmarkRunner {
188138
if row_count.is_none() {
189139
row_count = Some(result.row_count());
190140
}
191-
192-
if debug_counters_enabled() {
193-
tracing::info!(
194-
query_idx,
195-
%format,
196-
counters = %snapshot_debug_counters(),
197-
"debug counters"
198-
);
199-
}
200141
}
201142

202143
let row_count = row_count.expect("iterations must be > 0");
@@ -422,9 +363,6 @@ impl SqlBenchmarkRunner {
422363
tracing::debug!(%format, query_idx, "Running query");
423364

424365
for _ in 0..iterations {
425-
if debug_counters_enabled() {
426-
reset_debug_counters();
427-
}
428366
let start = Instant::now();
429367
let (timing, result) = execute(query_idx, &ctx, query.as_str())
430368
.await
@@ -437,15 +375,6 @@ impl SqlBenchmarkRunner {
437375
if row_count.is_none() {
438376
row_count = Some(result.row_count());
439377
}
440-
441-
if debug_counters_enabled() {
442-
tracing::info!(
443-
query_idx,
444-
%format,
445-
counters = %snapshot_debug_counters(),
446-
"debug counters"
447-
);
448-
}
449378
}
450379

451380
let row_count = row_count.expect("iterations must be > 0");

vortex-scan/src/debug.rs

Lines changed: 0 additions & 14 deletions
This file was deleted.

vortex-scan/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ const IDEAL_SPLIT_SIZE: u64 = 100_000;
1212

1313
pub mod api;
1414
pub mod arrow;
15-
pub mod debug;
1615
mod filter;
1716
pub mod row_mask;
1817
mod splits;

vortex-scan/src/repeated_scan.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ use std::collections::BTreeMap;
66
use std::collections::VecDeque;
77
use std::ops::Range;
88
use std::sync::Arc;
9-
use std::sync::atomic::AtomicU64;
10-
use std::sync::atomic::Ordering;
119
use std::task::Context;
1210
use std::task::Poll;
1311

@@ -44,22 +42,6 @@ use crate::tasks::split_exec;
4442
const ADAPTIVE_SELECTIVITY_SAMPLE_SPLITS: usize = 4;
4543
const HIGH_SURVIVOR_RATIO: f64 = 0.75;
4644
const MAX_PROJECTION_SUBSPLITS_PER_SPLIT: usize = 8;
47-
static LEGACY_STREAM_FALLBACK_COUNT: AtomicU64 = AtomicU64::new(0);
48-
49-
#[derive(Debug, Clone, Copy, Default)]
50-
pub struct ScanDebugCounters {
51-
pub legacy_stream_fallbacks: u64,
52-
}
53-
54-
pub(crate) fn reset_scan_debug_counters() {
55-
LEGACY_STREAM_FALLBACK_COUNT.store(0, Ordering::Relaxed);
56-
}
57-
58-
pub(crate) fn scan_debug_counters() -> ScanDebugCounters {
59-
ScanDebugCounters {
60-
legacy_stream_fallbacks: LEGACY_STREAM_FALLBACK_COUNT.load(Ordering::Relaxed),
61-
}
62-
}
6345

6446
fn should_prefer_immediate_projection(
6547
observed_filter_splits: usize,
@@ -314,7 +296,6 @@ impl<A: 'static + Send> RepeatedScan<A> {
314296
split_ranges_exhausted,
315297
filter_ahead,
316298
) {
317-
LEGACY_STREAM_FALLBACK_COUNT.fetch_add(1, Ordering::Relaxed);
318299
return self.legacy_stream_from_ranges(
319300
ctx,
320301
prefetched_ranges,

0 commit comments

Comments
 (0)