Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions bench-orchestrator/bench_orchestrator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class Format(Enum):
PARQUET = "parquet"
VORTEX = "vortex"
VORTEX_COMPACT = "vortex-compact"
VORTEX_NATIVE = "vortex-geo-native"
DUCKDB = "duckdb"
LANCE = "lance"

Expand Down Expand Up @@ -68,6 +69,7 @@ class Benchmark(Enum):
Format.PARQUET,
Format.VORTEX,
Format.VORTEX_COMPACT,
Format.VORTEX_NATIVE,
Format.DUCKDB,
],
Engine.LANCE: [Format.LANCE],
Expand Down
34 changes: 34 additions & 0 deletions bench-orchestrator/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@ def test_parse_formats_json_accepts_ci_format_arrays() -> None:
assert formats == [Format.PARQUET, Format.VORTEX, Format.DUCKDB]


def test_parse_formats_json_accepts_vortex_native() -> None:
    """`parse_formats_json` maps the `vortex-geo-native` name to `Format.VORTEX_NATIVE`."""
    parsed = parse_formats_json('["parquet","vortex","vortex-geo-native"]')
    wanted = [Format.PARQUET, Format.VORTEX, Format.VORTEX_NATIVE]

    # Order matters: the parsed list must follow the order of the JSON array.
    assert parsed == wanted


def test_resolve_axis_targets_offers_vortex_native_on_duckdb_only() -> None:
    """Requesting the native lane on both engines keeps DuckDB and warns about DataFusion."""
    # vortex-geo-native is a DuckDB-only lane, so the DataFusion pairing is reported as
    # unsupported instead of being turned into a target.
    requested_engines = [Engine.DATAFUSION, Engine.DUCKDB]
    requested_formats = [Format.VORTEX_NATIVE]

    resolved, notes = resolve_axis_targets(requested_engines, requested_formats)

    duckdb_native = BenchmarkTarget(engine=Engine.DUCKDB, format=Format.VORTEX_NATIVE)
    assert resolved == [duckdb_native]
    assert notes == ["Format vortex-geo-native is not supported by engine datafusion"]


def test_resolve_axis_targets_filters_unsupported_combinations() -> None:
targets, warnings = resolve_axis_targets(
[Engine.DATAFUSION, Engine.DUCKDB],
Expand Down Expand Up @@ -55,6 +72,23 @@ def test_resolve_axis_targets_skips_engines_a_benchmark_cannot_run() -> None:
assert warnings == ["Benchmark spatialbench does not support engine datafusion"]


def test_resolve_axis_targets_expands_spatialbench_three_lanes() -> None:
    """SpatialBench on DuckDB resolves to one target per requested lane, with no warnings."""
    # The single-command three-lane comparison: parquet, WKB vortex, and native-geometry
    # vortex, all on DuckDB.
    lanes = [Format.PARQUET, Format.VORTEX, Format.VORTEX_NATIVE]

    resolved, notes = resolve_axis_targets([Engine.DUCKDB], lanes, Benchmark.SPATIALBENCH)

    # One DuckDB target per lane, in the order the lanes were requested.
    assert resolved == [
        BenchmarkTarget(engine=Engine.DUCKDB, format=Format.PARQUET),
        BenchmarkTarget(engine=Engine.DUCKDB, format=Format.VORTEX),
        BenchmarkTarget(engine=Engine.DUCKDB, format=Format.VORTEX_NATIVE),
    ]
    assert notes == []


def test_validate_targets_rejects_engine_a_benchmark_cannot_run() -> None:
errors = validate_targets(
[BenchmarkTarget(engine=Engine.DATAFUSION, format=Format.PARQUET)],
Expand Down
13 changes: 13 additions & 0 deletions bench-orchestrator/tests/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ def test_build_command_adds_duckdb_cleanup_flag() -> None:
assert "scale-factor=1.0" in cmd


def test_build_command_serializes_vortex_native_format() -> None:
    """The comma-joined format list, including `vortex-geo-native`, appears in the built command."""
    duckdb_executor = BenchmarkExecutor(Path("/tmp/duckdb-bench"), Engine.DUCKDB)
    requested = [Format.PARQUET, Format.VORTEX, Format.VORTEX_NATIVE]

    command = duckdb_executor.build_command(
        benchmark=Benchmark.SPATIALBENCH,
        formats=requested,
        iterations=1,
        options={"scale-factor": "1.0"},
    )

    assert "parquet,vortex,vortex-geo-native" in command


def test_build_command_omits_formats_for_lance_backend() -> None:
executor = BenchmarkExecutor(Path("/tmp/lance-bench"), Engine.LANCE)

Expand Down
7 changes: 3 additions & 4 deletions benchmarks/datafusion-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,9 @@ pub fn format_to_df_format(format: Format) -> Arc<dyn FileFormat> {
Format::Csv => Arc::new(CsvFormat::default()) as _,
Format::Arrow => Arc::new(ArrowFormat),
Format::Parquet => Arc::new(ParquetFormat::new()),
Format::OnDiskVortex | Format::VortexCompact => Arc::new(VortexFormat::new_with_options(
SESSION.clone(),
vortex_table_options(),
)),
Format::OnDiskVortex | Format::VortexCompact | Format::VortexNative => Arc::new(
VortexFormat::new_with_options(SESSION.clone(), vortex_table_options()),
),
Format::OnDiskDuckDB | Format::Lance => {
unimplemented!("Format {format} cannot be turned into a DataFusion `FileFormat`")
}
Expand Down
5 changes: 4 additions & 1 deletion benchmarks/duckdb-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,10 @@ impl DuckClient {
file_format: Format,
) -> Result<()> {
let object_type = match file_format {
Format::Parquet | Format::OnDiskVortex | Format::VortexCompact => "VIEW",
Format::Parquet
| Format::OnDiskVortex
| Format::VortexCompact
| Format::VortexNative => "VIEW",
Format::OnDiskDuckDB => "TABLE",
Format::Lance => {
anyhow::bail!(
Expand Down
1 change: 1 addition & 0 deletions benchmarks/duckdb-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ fn main() -> anyhow::Result<()> {
// OnDiskDuckDB tables are created during register_tables by loading from Parquet
_ => {}
}
benchmark.prepare_format(format, &base_path).await?;
}

anyhow::Ok(())
Expand Down
2 changes: 2 additions & 0 deletions vortex-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ async-trait = { workspace = true }
bzip2 = { workspace = true }
clap = { workspace = true, features = ["derive"] }
futures = { workspace = true }
geoarrow = { workspace = true }
geoarrow-cast = { workspace = true }
get_dir = { workspace = true }
glob = { workspace = true }
humansize = { workspace = true }
Expand Down
9 changes: 9 additions & 0 deletions vortex-bench/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

//! Core benchmark trait and types.
use std::path::Path;

use arrow_schema::Schema;
use glob::Pattern;
use url::Url;
Expand Down Expand Up @@ -47,6 +49,13 @@ pub trait Benchmark: Send + Sync {
/// call this method to ensure base data exists, then perform their own format conversion.
async fn generate_base_data(&self) -> anyhow::Result<()>;

/// Prepare benchmark- and format-specific data beyond the Parquet base that
/// [`Benchmark::generate_base_data`] produced.
///
/// Called once per requested format, after the base data exists. `_format` is the format being
/// prepared and `_base_path` is the local data directory for the benchmark.
///
/// The default implementation does nothing and returns `Ok(())`; benchmarks that need derived,
/// format-specific files override it.
async fn prepare_format(&self, _format: Format, _base_path: &Path) -> anyhow::Result<()> {
Ok(())
}

/// Get expected row counts for validation (optional)
/// If None, no validation will be performed
fn expected_row_counts(&self) -> Option<Vec<usize>> {
Expand Down
12 changes: 10 additions & 2 deletions vortex-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,11 @@ use vortex::session::VortexSession;
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

pub static SESSION: LazyLock<VortexSession> =
LazyLock::new(|| VortexSession::default().with_tokio());
/// Shared [`VortexSession`] for the benchmarks, built lazily on first access.
///
/// The session is created with `with_tokio()` and then passed to `vortex_geo::initialize`
/// before it is published (presumably registering the geo extension; confirm against
/// `vortex_geo`).
pub static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
    let geo_session = VortexSession::default().with_tokio();
    vortex_geo::initialize(&geo_session);
    geo_session
});

#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub struct Target {
Expand Down Expand Up @@ -146,6 +149,9 @@ pub enum Format {
#[clap(name = "vortex-compact")]
#[serde(rename = "vortex-compact")]
VortexCompact,
#[clap(name = "vortex-geo-native")]
#[serde(rename = "vortex-geo-native")]
VortexNative,
#[clap(name = "duckdb")]
#[serde(rename = "duckdb")]
OnDiskDuckDB,
Expand Down Expand Up @@ -185,6 +191,7 @@ impl Format {
Format::Parquet => "parquet",
Format::OnDiskVortex => "vortex-file-compressed",
Format::VortexCompact => "vortex-compact",
Format::VortexNative => "vortex-geo-native",
Format::OnDiskDuckDB => "duckdb",
Format::Lance => "lance",
}
Expand All @@ -197,6 +204,7 @@ impl Format {
Format::Parquet => "parquet",
Format::OnDiskVortex => "vortex",
Format::VortexCompact => "vortex",
Format::VortexNative => "vortex",
Format::OnDiskDuckDB => "duckdb",
Format::Lance => "lance",
}
Expand Down
57 changes: 47 additions & 10 deletions vortex-bench/src/spatialbench/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! SpatialBench benchmark implementation

use std::fs;
use std::path::Path;

use url::Url;

Expand All @@ -17,6 +18,9 @@ use crate::spatialbench::datagen::Table;
use crate::utils::file::resolve_data_url;
use crate::workspace_root;

/// Data-dir subfolder for the native-geometry Vortex files (the `vortex-geo-native` lane).
///
/// Joined onto the base path when the native files are written and onto the base URL when the
/// lane's format path is resolved, so both sides agree on the location.
// NOTE(review): this matches the string `Format::VortexNative.name()` returns; keep the two in sync.
pub const NATIVE_DIR: &str = "vortex-geo-native";

/// SpatialBench geospatial benchmark (Apache Sedona): a `trip` point table, `building` polygons, and
/// a `customer` attribute table, queried with spatial filters and joins. `zone` polygons are sourced
/// externally and registered when present. See <https://sedona.apache.org/spatialbench/>.
Expand All @@ -35,6 +39,21 @@ impl SpatialBenchBenchmark {
scale_factor,
})
}

/// The tables this benchmark materializes and registers.
///
/// Always contains `trip`, `building` and `customer`, in that order. The externally-sourced
/// `zone` table is appended only when a `zone_*.parquet` file is found under the Parquet
/// subfolder of the data directory. Both native data-gen and table registration go through
/// this method so they cover the same set.
fn base_tables(&self) -> Vec<Table> {
    // A data URL that is not a local file path cannot be probed on disk, so `zone` is assumed
    // to be present in that case.
    let include_zone = match self.data_url.to_file_path() {
        Err(()) => true,
        Ok(data_root) => {
            let parquet_dir = data_root.join(Format::Parquet.name());
            zone_parquet_present(&parquet_dir)
        }
    };

    [Table::Trip, Table::Building, Table::Customer]
        .into_iter()
        .chain(include_zone.then_some(Table::Zone))
        .collect()
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -79,10 +98,33 @@ impl Benchmark for SpatialBenchBenchmark {
Ok(())
}

/// Materializes the native-geometry Vortex files for the `vortex-geo-native` lane.
///
/// For [`Format::VortexNative`], every table from `base_tables` is passed to
/// `datagen::write_native_vortex`, reading from the Parquet subfolder of `base_path` and
/// writing into the `vortex-geo-native` subfolder, so that the lane's queries can read native
/// geometry instead of WKB. Any other format is a no-op.
///
/// Intended to be idempotent. NOTE(review): that depends on `write_native_vortex`, which is
/// not shown here.
///
/// # Errors
///
/// Returns the first error reported by `write_native_vortex`; later tables are not processed.
async fn prepare_format(&self, format: Format, base_path: &Path) -> anyhow::Result<()> {
    // Only the native lane needs derived data; every other format reuses what already exists.
    if format != Format::VortexNative {
        return Ok(());
    }

    let wkb_source = base_path.join(Format::Parquet.name());
    let native_target = base_path.join(NATIVE_DIR);
    for table in self.base_tables() {
        datagen::write_native_vortex(table, &wkb_source, &native_target).await?;
    }
    Ok(())
}

/// Returns a reference to this benchmark's `data_url` field, the base URL of its data directory.
fn data_url(&self) -> &Url {
&self.data_url
}

/// Resolves the directory URL that holds the data files for `format`.
///
/// [`Format::VortexNative`] maps to the [`NATIVE_DIR`] subfolder; every other format maps to
/// the subfolder named after `Format::name()`. The subfolder is joined onto `base_url` with a
/// trailing slash so the result denotes a directory.
///
/// # Errors
///
/// Fails if joining the subfolder onto `base_url` does not produce a valid URL.
fn format_path(&self, format: Format, base_url: &Url) -> anyhow::Result<Url> {
    let subfolder = if matches!(format, Format::VortexNative) {
        NATIVE_DIR
    } else {
        format.name()
    };
    let resolved = base_url.join(&format!("{subfolder}/"))?;
    Ok(resolved)
}

fn expected_row_counts(&self) -> Option<Vec<usize>> {
// Indexed by `query_idx` (1-based), so index 0 is a dummy and Q1's count is at index 1 (TPC-H
// convention). Only SF1.0 and SF10.0 are validated (like TPC-H); other scale factors return
Expand Down Expand Up @@ -110,16 +152,11 @@ impl Benchmark for SpatialBenchBenchmark {
format!("spatialbench(sf={})", self.scale_factor)
}

/// Both lanes register the same tables (WKB reads `parquet`/`vortex`, native reads
/// `vortex-geo-native`); `zone` is externally sourced and optional, registered only when present.
fn table_specs(&self) -> Vec<TableSpec> {
// `zone` is externally sourced and optional; register it only when present so queries that
// don't need it don't fail on the missing glob.
let zone_present = match self.data_url.to_file_path() {
Ok(base) => zone_parquet_present(&base.join(Format::Parquet.name())),
Err(()) => true,
};
Table::ALL
.into_iter()
.filter(|table| !matches!(table, Table::Zone) || zone_present)
self.base_tables()
.iter()
.map(|table| TableSpec::new(table.name(), None))
.collect()
}
Expand All @@ -146,7 +183,7 @@ impl Benchmark for SpatialBenchBenchmark {

/// Whether an externally-sourced `zone_*.parquet` exists under `parquet_dir` (generated by the
/// upstream `spatialbench-cli`; see the module docs).
fn zone_parquet_present(parquet_dir: &std::path::Path) -> bool {
fn zone_parquet_present(parquet_dir: &Path) -> bool {
glob::glob(&parquet_dir.join("zone_*.parquet").to_string_lossy())
.map(|mut paths| paths.next().is_some())
.unwrap_or(false)
Expand Down
7 changes: 5 additions & 2 deletions vortex-bench/src/spatialbench/datagen/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! SpatialBench data preparation. [`wkb`] generates the canonical WKB base tables (Parquet + Vortex);
//! the [`table`] catalog is the single source of truth for the base tables.
//! SpatialBench data preparation. [`wkb`] generates the canonical WKB base tables; [`native`] derives
//! native-geometry Vortex files from them for the `vortex-geo-native` lane. The [`table`] catalog is
//! the single source of truth for the base tables that both stages share.

pub mod native;
pub mod table;
pub mod wkb;

pub use native::write_native_vortex;
pub use table::Table;
pub use wkb::generate_tables;
Loading
Loading