Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
929 changes: 634 additions & 295 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ geo = "0.31.0"
geo-traits = "0.3.0"
geo-types = "0.7.19"
geoarrow = "0.8.0"
geoarrow-cast = "0.8.0"
get_dir = "0.5.0"
glob = "0.3.2"
goldenfile = "1"
Expand Down Expand Up @@ -242,6 +243,14 @@ similar = "3.0.0"
sketches-ddsketch = "0.4.0"
smallvec = "1.15.1"
smol = "2.0.2"
spatialbench = "0.2"
spatialbench-arrow = "0.2"
# spatialbench still pins arrow 56, two majors behind the workspace arrow. Until upstream
# catches up, write its generated batches with a matching parquet instead of converting
# arrow versions at the boundary.
spatialbench-parquet = { package = "parquet", version = "56", features = [
"async",
] }
static_assertions = "1.1"
strum = "0.28"
syn = { version = "2.0.117", features = ["full"] }
Expand Down
8 changes: 5 additions & 3 deletions bench-orchestrator/bench_orchestrator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ def run_ref_auto_complete() -> list[str]:
return list(map(lambda x: x.run_id, ResultStore().list_runs(limit=None)))


def targets_from_axes(engine: str, format: str) -> tuple[list[BenchmarkTarget], list[str]]:
def targets_from_axes(
engine: str, format: str, benchmark: Benchmark | None = None
) -> tuple[list[BenchmarkTarget], list[str]]:
"""Resolve legacy engine/format axes into explicit benchmark targets."""
return resolve_axis_targets(parse_engines(engine), parse_formats(format))
return resolve_axis_targets(parse_engines(engine), parse_formats(format), benchmark)


def backends_for_engines(engines: list[Engine]) -> list[Engine]:
Expand Down Expand Up @@ -260,7 +262,7 @@ def run(
targets = parse_targets_json(targets_json)
warnings: list[str] = []
else:
targets, warnings = targets_from_axes(engine, format)
targets, warnings = targets_from_axes(engine, format, benchmark)
except ValueError as exc:
console.print(f"[red]{exc}[/red]")
raise typer.Exit(1) from exc
Expand Down
29 changes: 26 additions & 3 deletions bench-orchestrator/bench_orchestrator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class Format(Enum):
PARQUET = "parquet"
VORTEX = "vortex"
VORTEX_COMPACT = "vortex-compact"
VORTEX_NATIVE = "vortex-native"
DUCKDB = "duckdb"
LANCE = "lance"

Expand All @@ -52,6 +53,7 @@ class Benchmark(Enum):
POLARSIGNALS = "polarsignals"
PUBLIC_BI = "public-bi"
STATPOPGEN = "statpopgen"
SPATIALBENCH = "spatialbench"


# Engine to supported formats mapping.
Expand All @@ -67,11 +69,25 @@ class Benchmark(Enum):
Format.PARQUET,
Format.VORTEX,
Format.VORTEX_COMPACT,
Format.VORTEX_NATIVE,
Format.DUCKDB,
],
Engine.LANCE: [Format.LANCE],
}

# Engines each benchmark can run on. Benchmarks default to *every* engine; list one here only to
# restrict it. SpatialBench's queries use DuckDB-specific `ST_*` spatial SQL that DataFusion has no
# functions for yet.
BENCHMARK_ENGINES: dict[Benchmark, frozenset[Engine]] = {
# Only DuckDB is listed, so any other engine is treated as unsupported for this benchmark.
Benchmark.SPATIALBENCH: frozenset({Engine.DUCKDB}),
}


def engines_for_benchmark(benchmark: Benchmark) -> frozenset[Engine]:
    """Look up the engines that may run `benchmark`.

    A benchmark with no entry in `BENCHMARK_ENGINES` is unrestricted, so the full set of
    `Engine` members is returned for it.
    """
    restricted = BENCHMARK_ENGINES.get(benchmark)
    if restricted is None:
        # No restriction recorded: every engine is allowed.
        return frozenset(Engine)
    return restricted


T = TypeVar("T")


Expand Down Expand Up @@ -175,13 +191,16 @@ def parse_formats_json(value: str) -> list[Format]:


def resolve_axis_targets(
engines: Iterable[Engine], formats: Iterable[Format]
engines: Iterable[Engine], formats: Iterable[Format], benchmark: Benchmark | None = None
) -> tuple[list[BenchmarkTarget], list[str]]:
"""Expand engine/format axes into supported explicit targets."""
warnings: list[str] = []
targets: list[BenchmarkTarget] = []

for engine in engines:
if benchmark is not None and engine not in engines_for_benchmark(benchmark):
warnings.append(f"Benchmark {benchmark.value} does not support engine {engine.value}")
continue
for fmt in formats:
target = BenchmarkTarget(engine=engine, format=fmt).normalized()
if not target.is_supported():
Expand All @@ -200,14 +219,18 @@ def group_targets_by_backend(targets: Iterable[BenchmarkTarget]) -> dict[Engine,
return groups


def validate_targets(targets: Iterable[BenchmarkTarget], options: dict[str, str]) -> list[str]:
def validate_targets(
targets: Iterable[BenchmarkTarget], options: dict[str, str], benchmark: Benchmark | None = None
) -> list[str]:
"""Validate explicit targets against benchmark runner constraints."""
errors: list[str] = []

normalized_targets = [target.normalized() for target in targets]
for target in normalized_targets:
if not target.is_supported():
errors.append(f"Format {target.format.value} is not supported by engine {target.engine.value}")
if benchmark is not None and target.engine not in engines_for_benchmark(benchmark):
errors.append(f"Benchmark {benchmark.value} does not support engine {target.engine.value}")

if options.get("remote-data-dir") and any(target.format == Format.LANCE for target in normalized_targets):
errors.append("Lance format is not supported for remote storage benchmarks.")
Expand Down Expand Up @@ -242,7 +265,7 @@ def backends(self) -> list[Engine]:

def validate(self) -> list[str]:
"""Validate the configuration and return any errors."""
return validate_targets(self.targets, self.options)
return validate_targets(self.targets, self.options, self.benchmark)


@dataclass
Expand Down
60 changes: 60 additions & 0 deletions bench-orchestrator/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-FileCopyrightText: Copyright the Vortex contributors

from bench_orchestrator.config import (
Benchmark,
BenchmarkTarget,
Engine,
Format,
Expand All @@ -25,6 +26,23 @@ def test_parse_formats_json_accepts_ci_format_arrays() -> None:
assert formats == [Format.PARQUET, Format.VORTEX, Format.DUCKDB]


def test_parse_formats_json_accepts_vortex_native() -> None:
    """`vortex-native` parses from a JSON array next to the existing format names, in order."""
    expected = [Format.PARQUET, Format.VORTEX, Format.VORTEX_NATIVE]

    assert parse_formats_json('["parquet","vortex","vortex-native"]') == expected


def test_resolve_axis_targets_offers_vortex_native_on_duckdb_only() -> None:
    """vortex-native resolves to a DuckDB target only; the DataFusion pairing yields a warning."""
    engines = [Engine.DATAFUSION, Engine.DUCKDB]

    resolved, notes = resolve_axis_targets(engines, [Format.VORTEX_NATIVE])

    # Only the DuckDB lane survives; the unsupported DataFusion pairing is reported, not raised.
    assert resolved == [BenchmarkTarget(engine=Engine.DUCKDB, format=Format.VORTEX_NATIVE)]
    assert notes == ["Format vortex-native is not supported by engine datafusion"]


def test_resolve_axis_targets_filters_unsupported_combinations() -> None:
targets, warnings = resolve_axis_targets(
[Engine.DATAFUSION, Engine.DUCKDB],
Expand All @@ -39,6 +57,48 @@ def test_resolve_axis_targets_filters_unsupported_combinations() -> None:
assert warnings == ["Format arrow is not supported by engine duckdb"]


def test_resolve_axis_targets_skips_engines_a_benchmark_cannot_run() -> None:
    """SpatialBench is DuckDB-only (ST_* spatial SQL): DataFusion is skipped with one warning."""
    requested_formats = [Format.PARQUET, Format.VORTEX]

    resolved, notes = resolve_axis_targets(
        [Engine.DATAFUSION, Engine.DUCKDB],
        requested_formats,
        Benchmark.SPATIALBENCH,
    )

    # Every requested format is kept, but only for the DuckDB engine.
    duckdb_only = [BenchmarkTarget(engine=Engine.DUCKDB, format=fmt) for fmt in requested_formats]
    assert resolved == duckdb_only
    assert notes == ["Benchmark spatialbench does not support engine datafusion"]


def test_resolve_axis_targets_expands_spatialbench_three_lanes() -> None:
    """One DuckDB axis expands to the three SpatialBench lanes with no warnings.

    The lanes are parquet, WKB vortex, and native-geometry vortex.
    """
    lanes = [Format.PARQUET, Format.VORTEX, Format.VORTEX_NATIVE]

    resolved, notes = resolve_axis_targets([Engine.DUCKDB], lanes, Benchmark.SPATIALBENCH)

    assert resolved == [BenchmarkTarget(engine=Engine.DUCKDB, format=lane) for lane in lanes]
    assert notes == []


def test_validate_targets_rejects_engine_a_benchmark_cannot_run() -> None:
    """An explicit DataFusion target is reported as an error when validating SpatialBench."""
    datafusion_parquet = BenchmarkTarget(engine=Engine.DATAFUSION, format=Format.PARQUET)

    problems = validate_targets([datafusion_parquet], {}, Benchmark.SPATIALBENCH)

    assert problems == ["Benchmark spatialbench does not support engine datafusion"]


def test_validate_targets_rejects_remote_lance() -> None:
errors = validate_targets(
[BenchmarkTarget(engine=Engine.DATAFUSION, format=Format.LANCE)],
Expand Down
13 changes: 13 additions & 0 deletions bench-orchestrator/tests/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ def test_build_command_adds_duckdb_cleanup_flag() -> None:
assert "scale-factor=1.0" in cmd


def test_build_command_serializes_vortex_native_format() -> None:
    """The built command contains the three formats joined as `parquet,vortex,vortex-native`."""
    lanes = [Format.PARQUET, Format.VORTEX, Format.VORTEX_NATIVE]
    duckdb_executor = BenchmarkExecutor(Path("/tmp/duckdb-bench"), Engine.DUCKDB)

    command = duckdb_executor.build_command(
        benchmark=Benchmark.SPATIALBENCH,
        formats=lanes,
        iterations=1,
        options={"scale-factor": "1.0"},
    )

    assert "parquet,vortex,vortex-native" in command


def test_build_command_omits_formats_for_lance_backend() -> None:
executor = BenchmarkExecutor(Path("/tmp/lance-bench"), Engine.LANCE)

Expand Down
7 changes: 3 additions & 4 deletions benchmarks/datafusion-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,9 @@ pub fn format_to_df_format(format: Format) -> Arc<dyn FileFormat> {
Format::Csv => Arc::new(CsvFormat::default()) as _,
Format::Arrow => Arc::new(ArrowFormat),
Format::Parquet => Arc::new(ParquetFormat::new()),
Format::OnDiskVortex | Format::VortexCompact => Arc::new(VortexFormat::new_with_options(
SESSION.clone(),
vortex_table_options(),
)),
Format::OnDiskVortex | Format::VortexCompact | Format::VortexNative => Arc::new(
VortexFormat::new_with_options(SESSION.clone(), vortex_table_options()),
),
Format::OnDiskDuckDB | Format::Lance => {
unimplemented!("Format {format} cannot be turned into a DataFusion `FileFormat`")
}
Expand Down
36 changes: 35 additions & 1 deletion benchmarks/duckdb-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct DuckClient {
connection: Option<Connection>,
pub db_path: PathBuf,
pub threads: Option<usize>,
/// SQL replayed after every [`DuckClient::reopen`], e.g. `INSTALL spatial; LOAD spatial;` for SpatialBench.
init_sql: Vec<String>,
}

impl DuckClient {
Expand Down Expand Up @@ -67,9 +69,24 @@ impl DuckClient {
connection: Some(connection),
db_path,
threads,
init_sql: Vec::new(),
})
}

/// Run `statements` now and after every subsequent [`DuckClient::reopen`].
///
/// Each statement is executed in order on the current connection, then the geo aliases are
/// registered on the database, and only after both succeed are the statements stored in
/// `init_sql`.
///
/// # Errors
///
/// Returns the first error from executing a statement or from `register_geo_aliases`. In that
/// case `init_sql` keeps its previous value, although earlier statements have already run.
///
/// # Panics
///
/// NOTE(review): `vortex_expect` presumably panics if the database handle has already been
/// closed (`self.db` is `None`) — confirm.
pub fn set_init_sql(&mut self, statements: Vec<String>) -> Result<()> {
// Execute immediately so the current connection sees the init SQL.
for stmt in &statements {
self.connection().query(stmt)?;
}
// After `LOAD spatial`, register `vortex_dwithin` so the radius filter pushes. No-op without it.
self.db
.as_ref()
.vortex_expect("DuckClient database accessed after close")
.register_geo_aliases()?;
// Persist only after everything above succeeded.
self.init_sql = statements;
Ok(())
}

pub fn open_and_setup_database(
path: Option<PathBuf>,
threads: Option<usize>,
Expand Down Expand Up @@ -108,6 +125,19 @@ impl DuckClient {
self.db = Some(db);
self.connection = Some(connection);

// Replay init SQL (e.g. LOAD spatial).
for stmt in &self.init_sql {
self.connection
.as_ref()
.vortex_expect("connection just opened")
.query(stmt)?;
}
// Re-register `vortex_dwithin` against the fresh instance.
self.db
.as_ref()
.vortex_expect("database just opened")
.register_geo_aliases()?;

Ok(())
}

Expand All @@ -123,6 +153,7 @@ impl DuckClient {
connection: Some(connection),
db_path,
threads: None,
init_sql: Vec::new(),
})
}

Expand All @@ -148,7 +179,10 @@ impl DuckClient {
file_format: Format,
) -> Result<()> {
let object_type = match file_format {
Format::Parquet | Format::OnDiskVortex | Format::VortexCompact => "VIEW",
Format::Parquet
| Format::OnDiskVortex
| Format::VortexCompact
| Format::VortexNative => "VIEW",
Format::OnDiskDuckDB => "TABLE",
Format::Lance => {
anyhow::bail!(
Expand Down
7 changes: 5 additions & 2 deletions benchmarks/duckdb-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ fn main() -> anyhow::Result<()> {
// OnDiskDuckDB tables are created during register_tables by loading from Parquet
_ => {}
}
benchmark.prepare_format(format, &base_path).await?;
}

anyhow::Ok(())
Expand Down Expand Up @@ -171,12 +172,13 @@ fn main() -> anyhow::Result<()> {
&filtered_queries,
mode,
|format| {
let ctx = DuckClient::new(
let mut ctx = DuckClient::new(
&*benchmark,
format,
args.delete_duckdb_database,
args.threads,
)?;
ctx.set_init_sql(benchmark.engine_init_sql(Engine::DuckDB))?;
ctx.register_tables(&*benchmark, format)?;

// Duckdb doesn't support octet_length for strings but we need this
Expand All @@ -196,7 +198,8 @@ fn main() -> anyhow::Result<()> {
if !args.reuse {
ctx.reopen()?;
}
ctx.execute_query_result(query)
let query = benchmark.query_for_format(query, format);
ctx.execute_query_result(&query)
},
)?;

Expand Down
6 changes: 6 additions & 0 deletions vortex-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ vortex = { workspace = true, features = [
"zstd",
] }
vortex-tensor = { workspace = true } # TODO(connor): In the future, this might be inside vortex.
vortex-geo = { workspace = true }

anyhow = { workspace = true }
arrow-array = { workspace = true }
Expand All @@ -33,6 +34,8 @@ async-trait = { workspace = true }
bzip2 = { workspace = true }
clap = { workspace = true, features = ["derive"] }
futures = { workspace = true }
geoarrow = { workspace = true }
geoarrow-cast = { workspace = true }
get_dir = { workspace = true }
glob = { workspace = true }
humansize = { workspace = true }
Expand All @@ -48,6 +51,9 @@ regex = { workspace = true }
reqwest = { workspace = true, features = ["stream"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
spatialbench = { workspace = true }
spatialbench-arrow = { workspace = true }
spatialbench-parquet = { workspace = true }
sysinfo = { workspace = true }
tabled = { workspace = true, features = ["std"] }
target-lexicon = { workspace = true }
Expand Down
Loading
Loading