Skip to content

Commit 5f88e4a

Browse files
feat: wire vortex-native into duckdb-bench
Signed-off-by: Nemo Yu <zyu379@wisc.edu>
1 parent a3c046b commit 5f88e4a

14 files changed

Lines changed: 445 additions & 32 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/datafusion-bench/src/lib.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,9 @@ pub fn format_to_df_format(format: Format) -> Arc<dyn FileFormat> {
111111
Format::Csv => Arc::new(CsvFormat::default()) as _,
112112
Format::Arrow => Arc::new(ArrowFormat),
113113
Format::Parquet => Arc::new(ParquetFormat::new()),
114-
Format::OnDiskVortex | Format::VortexCompact => Arc::new(VortexFormat::new_with_options(
115-
SESSION.clone(),
116-
vortex_table_options(),
117-
)),
114+
Format::OnDiskVortex | Format::VortexCompact | Format::VortexNative => Arc::new(
115+
VortexFormat::new_with_options(SESSION.clone(), vortex_table_options()),
116+
),
118117
Format::OnDiskDuckDB | Format::Lance => {
119118
unimplemented!("Format {format} cannot be turned into a DataFusion `FileFormat`")
120119
}

benchmarks/duckdb-bench/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,10 @@ impl DuckClient {
169169
file_format: Format,
170170
) -> Result<()> {
171171
let object_type = match file_format {
172-
Format::Parquet | Format::OnDiskVortex | Format::VortexCompact => "VIEW",
172+
Format::Parquet
173+
| Format::OnDiskVortex
174+
| Format::VortexCompact
175+
| Format::VortexNative => "VIEW",
173176
Format::OnDiskDuckDB => "TABLE",
174177
Format::Lance => {
175178
anyhow::bail!(

benchmarks/duckdb-bench/src/main.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ fn main() -> anyhow::Result<()> {
142142
// OnDiskDuckDB tables are created during register_tables by loading from Parquet
143143
_ => {}
144144
}
145+
benchmark.prepare_format(format, &base_path).await?;
145146
}
146147

147148
anyhow::Ok(())
@@ -197,7 +198,8 @@ fn main() -> anyhow::Result<()> {
197198
if !args.reuse {
198199
ctx.reopen()?;
199200
}
200-
ctx.execute_query_result(query)
201+
let query = benchmark.query_for_format(query, format);
202+
ctx.execute_query_result(&query)
201203
},
202204
)?;
203205

vortex-bench/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ vortex = { workspace = true, features = [
2424
"zstd",
2525
] }
2626
vortex-tensor = { workspace = true } # TODO(connor): In the future, this might be inside vortex.
27+
vortex-geo = { workspace = true }
2728

2829
anyhow = { workspace = true }
2930
arrow-array = { workspace = true }
@@ -33,6 +34,8 @@ async-trait = { workspace = true }
3334
bzip2 = { workspace = true }
3435
clap = { workspace = true, features = ["derive"] }
3536
futures = { workspace = true }
37+
geoarrow = { workspace = true }
38+
geoarrow-cast = { workspace = true }
3639
get_dir = { workspace = true }
3740
glob = { workspace = true }
3841
humansize = { workspace = true }

vortex-bench/src/benchmark.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
//! Core benchmark trait and types.
55
6+
use std::path::Path;
7+
68
use arrow_schema::Schema;
79
use glob::Pattern;
810
use url::Url;
@@ -33,6 +35,11 @@ pub trait Benchmark: Send + Sync {
3335
/// Get all available queries for this benchmark
3436
fn queries(&self) -> anyhow::Result<Vec<(usize, String)>>;
3537

38+
/// Adapt a query to a specific storage `format` before execution. Default: unchanged.
39+
fn query_for_format(&self, query: &str, _format: Format) -> String {
40+
query.to_string()
41+
}
42+
3643
/// SQL an `engine` must run before this benchmark's queries (e.g. loading engine
3744
/// extensions). Runners replay these after every (re)open. Default: none.
3845
fn engine_init_sql(&self, _engine: Engine) -> Vec<String> {
@@ -47,6 +54,13 @@ pub trait Benchmark: Send + Sync {
4754
/// call this method to ensure base data exists, then perform their own format conversion.
4855
async fn generate_base_data(&self) -> anyhow::Result<()>;
4956

57+
/// Prepare benchmark- and format-specific data beyond the Parquet base that
58+
/// [`Benchmark::generate_base_data`] produced. Called once per requested format, after the base
59+
/// data exists. Default: nothing.
60+
async fn prepare_format(&self, _format: Format, _base_path: &Path) -> anyhow::Result<()> {
61+
Ok(())
62+
}
63+
5064
/// Get expected row counts for validation (optional)
5165
/// If None, no validation will be performed
5266
fn expected_row_counts(&self) -> Option<Vec<usize>> {
@@ -80,4 +94,10 @@ pub trait Benchmark: Send + Sync {
8094
_ = format;
8195
None
8296
}
97+
98+
/// SQL projection substituted into `SELECT {..} FROM read_<fmt>(..)` when registering
99+
/// `table_name` as a DuckDB view. Defaults to `*`.
100+
fn view_projection(&self, _table_name: &str, _format: Format) -> String {
101+
"*".to_string()
102+
}
83103
}

vortex-bench/src/lib.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,11 @@ use vortex::session::VortexSession;
7676
#[global_allocator]
7777
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
7878

79-
pub static SESSION: LazyLock<VortexSession> =
80-
LazyLock::new(|| VortexSession::default().with_tokio());
79+
pub static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
80+
let session = VortexSession::default().with_tokio();
81+
vortex_geo::initialize(&session);
82+
session
83+
});
8184

8285
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)]
8386
pub struct Target {
@@ -146,6 +149,9 @@ pub enum Format {
146149
#[clap(name = "vortex-compact")]
147150
#[serde(rename = "vortex-compact")]
148151
VortexCompact,
152+
#[clap(name = "vortex-native")]
153+
#[serde(rename = "vortex-native")]
154+
VortexNative,
149155
#[clap(name = "duckdb")]
150156
#[serde(rename = "duckdb")]
151157
OnDiskDuckDB,
@@ -185,6 +191,7 @@ impl Format {
185191
Format::Parquet => "parquet",
186192
Format::OnDiskVortex => "vortex-file-compressed",
187193
Format::VortexCompact => "vortex-compact",
194+
Format::VortexNative => "vortex-native",
188195
Format::OnDiskDuckDB => "duckdb",
189196
Format::Lance => "lance",
190197
}
@@ -197,6 +204,7 @@ impl Format {
197204
Format::Parquet => "parquet",
198205
Format::OnDiskVortex => "vortex",
199206
Format::VortexCompact => "vortex",
207+
Format::VortexNative => "vortex",
200208
Format::OnDiskDuckDB => "duckdb",
201209
Format::Lance => "lance",
202210
}
@@ -451,8 +459,16 @@ where
451459
object_type.to_lowercase()
452460
);
453461

462+
let projection = benchmark.view_projection(name, load_format);
463+
// SpatialBench's native and WKB lanes both register `trip` from the same db path but with different casts —
464+
// so always replace views (cheap, metadata-only). Tables hold materialized data: keep them.
465+
let create = if object_type == "VIEW" {
466+
format!("CREATE OR REPLACE VIEW {name}")
467+
} else {
468+
format!("CREATE {object_type} IF NOT EXISTS {name}")
469+
};
454470
sql_statements.push(format!(
455-
"CREATE {object_type} IF NOT EXISTS {name} AS SELECT * FROM read_{extension}('{base_dir}/{pattern}');\n",
471+
"{create} AS SELECT {projection} FROM read_{extension}('{base_dir}/{pattern}');\n",
456472
));
457473
}
458474

vortex-bench/src/spatialbench/benchmark.rs

Lines changed: 104 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
//! SpatialBench benchmark implementation
55
66
use std::fs;
7+
use std::path::Path;
78

89
use url::Url;
910

@@ -13,9 +14,13 @@ use crate::Engine;
1314
use crate::Format;
1415
use crate::TableSpec;
1516
use crate::spatialbench::datagen;
17+
use crate::spatialbench::datagen::Table;
1618
use crate::utils::file::resolve_data_url;
1719
use crate::workspace_root;
1820

21+
/// Data-dir subfolder for the native-geometry Vortex files (the `vortex-native` lane).
22+
pub const NATIVE_DIR: &str = "vortex-native";
23+
1924
/// SpatialBench geospatial benchmark (Apache Sedona): a `trip` point table, `building` polygons, and
2025
/// a `customer` attribute table, queried with spatial filters and joins. `zone` polygons are sourced
2126
/// externally and registered when present. See <https://sedona.apache.org/spatialbench/>.
@@ -34,6 +39,21 @@ impl SpatialBenchBenchmark {
3439
scale_factor,
3540
})
3641
}
42+
43+
/// Tables to materialize and register: the in-process base tables (`trip`, `building`,
44+
/// `customer`) plus the externally-sourced `zone` when its parquet is present. Shared by native
45+
/// data-gen and table registration so both lanes cover the same set.
46+
fn base_tables(&self) -> Vec<Table> {
47+
let mut tables = vec![Table::Trip, Table::Building, Table::Customer];
48+
let zone_present = match self.data_url.to_file_path() {
49+
Ok(base) => zone_parquet_present(&base.join(Format::Parquet.name())),
50+
Err(()) => true,
51+
};
52+
if zone_present {
53+
tables.push(Table::Zone);
54+
}
55+
tables
56+
}
3757
}
3858

3959
#[async_trait::async_trait]
@@ -58,6 +78,16 @@ impl Benchmark for SpatialBenchBenchmark {
5878
.collect())
5979
}
6080

81+
/// On the `vortex-native` lane, geometry columns surface as `GEOMETRY`, so drop the
82+
/// `ST_GeomFromWKB(..)` wrappers and let DuckDB's `spatial` extension evaluate the `ST_*`
83+
/// predicates directly on the native geometry.
84+
fn query_for_format(&self, query: &str, format: Format) -> String {
85+
match format {
86+
Format::VortexNative => strip_wkb_wrappers(query),
87+
_ => query.to_string(),
88+
}
89+
}
90+
6191
async fn generate_base_data(&self) -> anyhow::Result<()> {
6292
if self.data_url.scheme() != "file" {
6393
return Ok(());
@@ -66,14 +96,37 @@ impl Benchmark for SpatialBenchBenchmark {
6696
.data_url
6797
.to_file_path()
6898
.map_err(|_| anyhow::anyhow!("Invalid file URL: {}", self.data_url.as_str()))?;
69-
datagen::generate_tables(&self.scale_factor, base_data_dir).await?;
99+
datagen::generate_tables(&self.scale_factor, base_data_dir.clone()).await?;
100+
Ok(())
101+
}
102+
103+
/// The `vortex-native` lane decodes each table's WKB geometry to native GeoArrow once, into the
104+
/// `vortex-native` dir, so its queries read DuckDB `GEOMETRY` directly. Idempotent.
105+
async fn prepare_format(&self, format: Format, base_path: &Path) -> anyhow::Result<()> {
106+
if format == Format::VortexNative {
107+
let parquet_dir = base_path.join(Format::Parquet.name());
108+
let native_dir = base_path.join(NATIVE_DIR);
109+
for table in self.base_tables() {
110+
datagen::write_native_vortex(table, &parquet_dir, &native_dir).await?;
111+
}
112+
}
70113
Ok(())
71114
}
72115

73116
fn data_url(&self) -> &Url {
74117
&self.data_url
75118
}
76119

120+
/// The `vortex-native` lane reads the native-geometry Vortex dir; every other format reads its
121+
/// own `{format}` subfolder.
122+
fn format_path(&self, format: Format, base_url: &Url) -> anyhow::Result<Url> {
123+
let dir = match format {
124+
Format::VortexNative => NATIVE_DIR,
125+
other => other.name(),
126+
};
127+
Ok(base_url.join(&format!("{dir}/"))?)
128+
}
129+
77130
fn expected_row_counts(&self) -> Option<Vec<usize>> {
78131
// Indexed by `query_idx` (1-based), so index 0 is a dummy and Q1's count is at index 1 (TPC-H
79132
// convention). Only SF1.0 and SF10.0 are validated (like TPC-H); other scale factors return
@@ -101,22 +154,32 @@ impl Benchmark for SpatialBenchBenchmark {
101154
format!("spatialbench(sf={})", self.scale_factor)
102155
}
103156

157+
/// Both lanes register the same tables (WKB reads `parquet`/`vortex`, native reads
158+
/// `vortex-native`); `zone` is externally sourced and optional, registered only when present.
104159
fn table_specs(&self) -> Vec<TableSpec> {
105-
let mut specs = vec![
106-
TableSpec::new("trip", None),
107-
TableSpec::new("building", None),
108-
TableSpec::new("customer", None),
109-
];
110-
// `zone` is externally sourced and optional; register it only when present so queries that
111-
// don't need it don't fail on the missing glob.
112-
let zone_present = match self.data_url.to_file_path() {
113-
Ok(base) => zone_parquet_present(&base.join(Format::Parquet.name())),
114-
Err(()) => true,
115-
};
116-
if zone_present {
117-
specs.push(TableSpec::new("zone", None));
160+
self.base_tables()
161+
.iter()
162+
.map(|table| TableSpec::new(table.name(), None))
163+
.collect()
164+
}
165+
166+
/// DuckDB's view star-expansion drops native `GEOMETRY` columns down to `BLOB`, so `ST_*` fail to
167+
/// bind. Re-cast every geometry column back to `GEOMETRY` in the view's projection.
168+
fn view_projection(&self, table_name: &str, format: Format) -> String {
169+
if format == Format::VortexNative
170+
&& let Some(table) = Table::from_name(table_name)
171+
{
172+
let geometry_columns = table.geometry_columns();
173+
if !geometry_columns.is_empty() {
174+
let casts = geometry_columns
175+
.iter()
176+
.map(|column| format!("{name}::GEOMETRY AS {name}", name = column.name))
177+
.collect::<Vec<_>>()
178+
.join(", ");
179+
return format!("* REPLACE ({casts})");
180+
}
118181
}
119-
specs
182+
"*".to_string()
120183
}
121184

122185
/// Scope each table to its own `{table}_*.{ext}` files; the default globs every file in the
@@ -141,8 +204,33 @@ impl Benchmark for SpatialBenchBenchmark {
141204

142205
/// Whether an externally-sourced `zone_*.parquet` exists under `parquet_dir` (generated by the
143206
/// upstream `spatialbench-cli`; see the module docs).
144-
fn zone_parquet_present(parquet_dir: &std::path::Path) -> bool {
207+
fn zone_parquet_present(parquet_dir: &Path) -> bool {
145208
glob::glob(&parquet_dir.join("zone_*.parquet").to_string_lossy())
146209
.map(|mut paths| paths.next().is_some())
147210
.unwrap_or(false)
148211
}
212+
213+
/// Strip `ST_GeomFromWKB(<inner>)` → `<inner>` so the native lane reads the already-`GEOMETRY`
214+
/// column directly. Assumes the wrapped expression contains no inner `)` (true for our column refs).
215+
fn strip_wkb_wrappers(sql: &str) -> String {
216+
const OPEN: &str = "ST_GeomFromWKB(";
217+
let mut out = String::with_capacity(sql.len());
218+
let mut rest = sql;
219+
while let Some(pos) = rest.find(OPEN) {
220+
out.push_str(&rest[..pos]);
221+
let after = &rest[pos + OPEN.len()..];
222+
match after.find(')') {
223+
Some(close) => {
224+
out.push_str(&after[..close]);
225+
rest = &after[close + 1..];
226+
}
227+
// Unbalanced wrapper: emit it verbatim and stop rewriting.
228+
None => {
229+
out.push_str(OPEN);
230+
rest = after;
231+
}
232+
}
233+
}
234+
out.push_str(rest);
235+
out
236+
}
Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
//! SpatialBench data preparation. [`wkb`] generates the canonical WKB base tables (Parquet + Vortex);
5-
//! the [`table`] catalog is the single source of truth for the base tables.
4+
//! SpatialBench data preparation. [`wkb`] generates the canonical WKB base tables; [`native`] derives
5+
//! native-geometry Vortex files from them for `points=native`. The [`table`] catalog is the single
6+
//! source of truth for the base tables both stages share.
67
8+
pub mod native;
79
pub mod table;
810
pub mod wkb;
911

12+
pub use native::write_native_vortex;
1013
pub use table::Table;
1114
pub use wkb::generate_tables;

0 commit comments

Comments
 (0)