Skip to content

Commit ffd090b

Browse files
committed
fix issues
Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent 35d59ae commit ffd090b

4 files changed

Lines changed: 228 additions & 13 deletions

File tree

bench-orchestrator/bench_orchestrator/cli.py

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from contextlib import contextmanager
88
from datetime import datetime, timedelta
99
from pathlib import Path
10+
from tempfile import TemporaryDirectory
1011
from typing import Annotated
1112

1213
import pandas as pd
@@ -115,6 +116,38 @@ def open_results_output(path: Path | None):
115116
yield handle
116117

117118

119+
@contextmanager
def temporary_v3_output_dir(enabled: bool):
    """Yield a scratch directory for per-backend v3 JSONL files, or ``None`` when disabled.

    When ``enabled`` is true, a temporary directory with the ``vx-bench-v3-``
    prefix is created and yielded as a ``Path``; ``TemporaryDirectory`` removes
    it again once the ``with`` block exits.
    """
    if enabled:
        with TemporaryDirectory(prefix="vx-bench-v3-") as scratch_dir:
            yield Path(scratch_dir)
    else:
        yield None
128+
129+
130+
def backend_v3_output_path(temp_dir: Path | None, index: int, backend: Engine) -> Path | None:
    """Build the v3 JSONL path for one backend inside ``temp_dir``.

    Returns ``None`` when ``temp_dir`` is ``None``. Otherwise the file name is
    the two-digit zero-padded ``index``, a dash, ``backend.value`` and the
    ``.jsonl`` suffix.
    """
    if temp_dir is not None:
        file_name = f"{index:02d}-{backend.value}.jsonl"
        return temp_dir / file_name
    return None
135+
136+
137+
def write_combined_v3_output(output_path: Path, input_paths: list[Path]) -> None:
    """Concatenate successful per-backend v3 JSONL files into the requested output.

    Args:
        output_path: Destination file. Missing parent directories are created.
        input_paths: Per-backend JSONL files, concatenated in the given order.

    Raises:
        RuntimeError: If any input file does not exist. The check runs before the
            destination is opened, so an existing output is never truncated or
            left half-written because of a missing part.
    """
    # Validate every part up front: opening the destination with "w" truncates it,
    # so a late failure would otherwise leave a partial file behind.
    missing = [input_path for input_path in input_paths if not input_path.exists()]
    if missing:
        raise RuntimeError(f"v3 output was not written by benchmark backend: {missing[0]}")

    if output_path.parent != Path():
        output_path.parent.mkdir(parents=True, exist_ok=True)

    with output_path.open("w", encoding="utf-8") as output:
        for input_path in input_paths:
            with input_path.open("r", encoding="utf-8") as input_file:
                output.writelines(input_file)
149+
150+
118151
def write_result_line(line: str, store_writer, compatibility_file) -> None:
119152
"""Write a raw result line to the run store and optional compatibility output."""
120153
store_writer(line)
@@ -280,10 +313,16 @@ def run(
280313
soft_failures: list[str] = []
281314

282315
try:
283-
with store.create_run(config, build_config) as ctx, open_results_output(output) as compatibility_file:
284-
for backend, backend_targets in backend_groups.items():
316+
with (
317+
store.create_run(config, build_config) as ctx,
318+
open_results_output(output) as compatibility_file,
319+
temporary_v3_output_dir(gh_json_v3 is not None) as v3_temp_dir,
320+
):
321+
v3_output_parts: list[Path] = []
322+
for backend_idx, (backend, backend_targets) in enumerate(backend_groups.items()):
285323
executor = BenchmarkExecutor(binary_paths[backend], backend, verbose=verbose)
286324
backend_formats = [target.format for target in backend_targets]
325+
backend_gh_json_v3 = backend_v3_output_path(v3_temp_dir, backend_idx, backend)
287326

288327
try:
289328
results = executor.run(
@@ -298,7 +337,7 @@ def run(
298337
sample_rate=sample_rate,
299338
tracing=tracing,
300339
runner=runner,
301-
gh_json_v3=gh_json_v3,
340+
gh_json_v3=backend_gh_json_v3,
302341
on_result=lambda line, store_writer=ctx.write_raw_json, compatibility=compatibility_file: (
303342
write_result_line(
304343
line,
@@ -307,6 +346,8 @@ def run(
307346
)
308347
),
309348
)
349+
if backend_gh_json_v3 is not None:
350+
v3_output_parts.append(backend_gh_json_v3)
310351
console.print(f"[green]{backend.value}: {len(results)} results[/green]")
311352
except RuntimeError as exc:
312353
ctx.metadata.partial = True
@@ -315,6 +356,9 @@ def run(
315356
console.print(f"[red]{backend.value} failed: {exc}[/red]")
316357
soft_failures.append(str(exc))
317358

359+
if gh_json_v3 is not None:
360+
write_combined_v3_output(gh_json_v3, v3_output_parts)
361+
318362
ctx.metadata.binaries = {backend.value: str(path) for backend, path in binary_paths.items()}
319363
except RuntimeError as exc:
320364
console.print(f"[red]{exc}[/red]")

bench-orchestrator/tests/test_cli.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,47 @@ def fake_run(self, **kwargs):
105105
metadata = json.loads((run_dirs[0] / "metadata.json").read_text(encoding="utf-8"))
106106
assert metadata["targets"] == [{"engine": "datafusion", "format": "parquet"}]
107107
assert metadata["binaries"] == {"datafusion": str(binary_path)}
108+
109+
110+
def test_run_combines_gh_json_v3_output_per_backend(tmp_path, monkeypatch) -> None:
    """Each backend receives its own v3 path and the parts are concatenated in order."""
    store = ResultStore(base_dir=tmp_path / "runs")
    combined_path = tmp_path / "artifacts" / "results.v3.jsonl"

    # Empty placeholder files standing in for the backend binaries.
    fake_binaries = {
        cli_module.Engine.DATAFUSION: tmp_path / "datafusion-bench",
        cli_module.Engine.DUCKDB: tmp_path / "duckdb-bench",
    }
    for placeholder in fake_binaries.values():
        placeholder.write_text("", encoding="utf-8")

    monkeypatch.setattr(cli_module, "ResultStore", lambda: store)
    monkeypatch.setattr(
        cli_module.BenchmarkBuilder,
        "get_binary_path",
        lambda self, backend: fake_binaries[backend],
    )

    recorded_paths = []

    def fake_run(self, **kwargs):
        # The executor must be handed a per-backend file, never the final output.
        part_path = kwargs["gh_json_v3"]
        assert part_path is not None
        assert part_path != combined_path
        part_path.write_text(f"{self.backend.value}-v3\n", encoding="utf-8")
        recorded_paths.append(part_path)
        return []

    monkeypatch.setattr(BenchmarkExecutor, "run", fake_run)

    targets_json = '[{"engine":"datafusion","format":"parquet"},{"engine":"duckdb","format":"parquet"}]'
    cli_args = [
        "run",
        "tpch",
        "--targets-json",
        targets_json,
        "--no-build",
        "--gh-json-v3",
        str(combined_path),
    ]
    result = runner.invoke(cli_module.app, cli_args)

    assert result.exit_code == 0
    assert combined_path.read_text(encoding="utf-8") == "datafusion-v3\nduckdb-v3\n"
    assert len(recorded_paths) == 2
    assert recorded_paths[0] != recorded_paths[1]

benchmarks/random-access-bench/src/main.rs

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,28 @@ fn measurement_name(dataset: &str, pattern: Option<AccessPattern>, format: Forma
280280
}
281281
}
282282

283+
fn v3_random_access_dataset_name(dataset: &str, pattern: Option<AccessPattern>) -> String {
284+
match pattern {
285+
Some(pattern) => format!("{dataset}/{}", pattern.name()),
286+
None => dataset.to_string(),
287+
}
288+
}
289+
290+
fn push_v3_random_access_record(
291+
records: &mut Vec<v3::V3Record>,
292+
measurement: &TimingMeasurement,
293+
dataset: &str,
294+
pattern: Option<AccessPattern>,
295+
reopen: bool,
296+
) {
297+
if reopen {
298+
return;
299+
}
300+
301+
let dataset = v3_random_access_dataset_name(dataset, pattern);
302+
records.push(v3::random_access_record(measurement, &dataset));
303+
}
304+
283305
/// Map format to the appropriate engine for random access benchmarks.
284306
fn format_to_engine(format: Format) -> Engine {
285307
match format {
@@ -388,7 +410,13 @@ async fn run_random_access(
388410
)
389411
.await?;
390412

391-
v3_records.push(v3::random_access_record(&measurement, dataset.name()));
413+
push_v3_random_access_record(
414+
&mut v3_records,
415+
&measurement,
416+
dataset.name(),
417+
None,
418+
reopen,
419+
);
392420
targets.push(measurement.target);
393421
measurements.push(measurement);
394422
progress.inc(1);
@@ -415,7 +443,13 @@ async fn run_random_access(
415443
)
416444
.await?;
417445

418-
v3_records.push(v3::random_access_record(&measurement, dataset.name()));
446+
push_v3_random_access_record(
447+
&mut v3_records,
448+
&measurement,
449+
dataset.name(),
450+
Some(*pattern),
451+
reopen,
452+
);
419453
targets.push(measurement.target);
420454
measurements.push(measurement);
421455
progress.inc(1);
@@ -443,3 +477,58 @@ async fn run_random_access(
443477

444478
Ok(())
445479
}
480+
481+
#[cfg(test)]
mod tests {
    use super::*;

    /// Dataset labels gain a `/pattern` suffix only when a pattern is supplied.
    #[test]
    fn v3_random_access_dataset_names_match_schema_dims() {
        let cases = [
            ("taxi", None, "taxi"),
            ("taxi", Some(AccessPattern::Correlated), "taxi/correlated"),
            (
                "feature-vectors",
                Some(AccessPattern::Uniform),
                "feature-vectors/uniform",
            ),
        ];

        for (dataset, pattern, expected) in cases {
            assert_eq!(v3_random_access_dataset_name(dataset, pattern), expected);
        }
    }

    /// Only non-reopen measurements produce v3 records, in push order.
    #[test]
    fn v3_random_access_records_skip_reopen_variants() {
        let measurement = TimingMeasurement {
            name: "random-access/taxi/uniform/parquet-tokio-local-disk".to_string(),
            target: Target::new(Engine::Arrow, Format::Parquet),
            storage: STORAGE_NVME.to_string(),
            runs: vec![Duration::from_nanos(10)],
        };
        let mut records = Vec::new();

        let pushes = [
            (None, false),
            (Some(AccessPattern::Uniform), false),
            (Some(AccessPattern::Correlated), true),
        ];
        for (pattern, reopen) in pushes {
            push_v3_random_access_record(&mut records, &measurement, "taxi", pattern, reopen);
        }

        assert_eq!(records.len(), 2);
        for (entry, expected) in records.iter().zip(["taxi", "taxi/uniform"]) {
            match entry {
                v3::V3Record::RandomAccessTime(record) => assert_eq!(record.dataset, expected),
                other => panic!("expected random-access record, got {other:?}"),
            }
        }
    }
}

vortex-bench/src/v3.rs

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -201,12 +201,16 @@ pub struct VectorSearchRunRecord {
201201
/// `benchmarks-website/planning/benchmark-mapping.md`.
202202
pub fn benchmark_dataset_dims(d: &BenchmarkDataset) -> (String, Option<String>, Option<String>) {
203203
match d {
204-
BenchmarkDataset::TpcH { scale_factor } => {
205-
("tpch".to_string(), None, Some(scale_factor.clone()))
206-
}
207-
BenchmarkDataset::TpcDS { scale_factor } => {
208-
("tpcds".to_string(), None, Some(scale_factor.clone()))
209-
}
204+
BenchmarkDataset::TpcH { scale_factor } => (
205+
"tpch".to_string(),
206+
None,
207+
Some(canonical_tpc_scale_factor(scale_factor)),
208+
),
209+
BenchmarkDataset::TpcDS { scale_factor } => (
210+
"tpcds".to_string(),
211+
None,
212+
Some(canonical_tpc_scale_factor(scale_factor)),
213+
),
210214
BenchmarkDataset::ClickBench { flavor } => {
211215
let variant = match flavor {
212216
Flavor::Partitioned => "partitioned",
@@ -237,6 +241,7 @@ pub fn query_measurement_record(
237241
let (dataset, dataset_variant, scale_factor) = benchmark_dataset_dims(&qm.benchmark_dataset);
238242
let value_ns = duration_as_ns(qm.median_run());
239243
let all_runtimes_ns = qm.runs.iter().copied().map(duration_as_ns).collect();
244+
let query_idx = v3_query_idx(qm);
240245
let (peak_physical, peak_virtual, physical_delta, virtual_delta) = match memory {
241246
Some(m) => (
242247
Some(m.peak_physical_memory),
@@ -251,7 +256,7 @@ pub fn query_measurement_record(
251256
dataset,
252257
dataset_variant,
253258
scale_factor,
254-
query_idx: u32::try_from(qm.query_idx).unwrap_or(u32::MAX),
259+
query_idx,
255260
storage: qm.storage.clone(),
256261
engine: engine_label(qm.target.engine).to_string(),
257262
format: qm.target.format.name().to_string(),
@@ -383,6 +388,23 @@ fn duration_as_ns(d: std::time::Duration) -> u64 {
383388
u64::try_from(d.as_nanos()).unwrap_or(u64::MAX)
384389
}
385390

391+
/// Normalizes a TPC scale-factor string so numerically equal inputs share one spelling.
///
/// Surrounding whitespace is ignored. When the trimmed text parses as a finite `f64`
/// it is re-rendered with the default float formatting, so `"1.0"` becomes `"1"` and
/// `"0.10"` becomes `"0.1"`. Anything else (non-numeric text, `inf`, `NaN`) is
/// returned trimmed but otherwise unchanged.
fn canonical_tpc_scale_factor(scale_factor: &str) -> String {
    let trimmed = scale_factor.trim();
    match trimmed.parse::<f64>() {
        Ok(value) if value.is_finite() => value.to_string(),
        // Fall back to the trimmed text so stray whitespace never leaks into the
        // dimension, matching what the numeric branch already guarantees.
        _ => trimmed.to_string(),
    }
}
398+
399+
/// Converts a measurement's query index into the `u32` stored in v3 records.
///
/// ClickBench indices are shifted up by one (saturating); every other dataset
/// keeps its index as-is. Values that do not fit in `u32` clamp to `u32::MAX`.
// NOTE(review): the +1 presumably maps 0-based ClickBench indices to 1-based ones — confirm.
fn v3_query_idx(qm: &QueryMeasurement) -> u32 {
    let raw_idx = match &qm.benchmark_dataset {
        BenchmarkDataset::ClickBench { .. } => qm.query_idx.saturating_add(1),
        _ => qm.query_idx,
    };
    u32::try_from(raw_idx).unwrap_or(u32::MAX)
}
407+
386408
fn engine_label(engine: Engine) -> &'static str {
387409
match engine {
388410
Engine::Vortex => "vortex",
@@ -461,7 +483,7 @@ mod tests {
461483
#[test]
462484
fn snapshot_query_measurement_clickbench_no_memory() -> anyhow::Result<()> {
463485
let qm = QueryMeasurement {
464-
query_idx: 1,
486+
query_idx: 0,
465487
target: Target::new(Engine::DuckDB, Format::Parquet),
466488
benchmark_dataset: BenchmarkDataset::ClickBench {
467489
flavor: Flavor::Partitioned,
@@ -478,6 +500,22 @@ mod tests {
478500
Ok(())
479501
}
480502

503+
/// TPC-H and TPC-DS scale factors are reported without a trailing `.0`.
#[test]
fn tpc_scale_factors_are_canonicalized_for_query_dims() {
    let tpch = BenchmarkDataset::TpcH {
        scale_factor: "1.0".to_string(),
    };
    let tpcds = BenchmarkDataset::TpcDS {
        scale_factor: "10.0".to_string(),
    };

    assert_eq!(
        benchmark_dataset_dims(&tpch),
        ("tpch".to_string(), None, Some("1".to_string()))
    );
    assert_eq!(
        benchmark_dataset_dims(&tpcds),
        ("tpcds".to_string(), None, Some("10".to_string()))
    );
}
518+
481519
#[test]
482520
fn snapshot_compression_time_encode() -> anyhow::Result<()> {
483521
let timing = CompressionTimingMeasurement {

0 commit comments

Comments
 (0)