Skip to content

Commit c82e0c0

Browse files
committed
fix bugs
1 parent 6fdf06c commit c82e0c0

4 files changed

Lines changed: 84 additions & 65 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/clickhouse-bench/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ publish = false
1414
[dependencies]
1515
anyhow = { workspace = true }
1616
clap = { workspace = true, features = ["derive"] }
17-
tokio = { workspace = true, features = ["full"] }
17+
parking_lot = { workspace = true }
18+
tokio = { workspace = true }
1819
tracing = { workspace = true }
1920
vortex-bench = { workspace = true }
2021

benchmarks/clickhouse-bench/src/lib.rs

Lines changed: 80 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,20 @@
33

44
//! ClickHouse Local context for benchmarks.
55
//!
6-
//! Uses `clickhouse-local` via `std::process::Command` to execute SQL queries
7-
//! against Parquet files on disk.
6+
//! Spawns a fresh `clickhouse-local` process for each query execution. Setup SQL
7+
//! (CREATE VIEW statements) is prepended to each query so that table definitions
8+
//! are available. Reported timings are wall-clock time measured from just before the
9+
//! process is spawned until it exits, so they include spawn and setup overhead.
10+
//!
11+
//! ## Why per-query processes?
12+
//!
13+
//! `clickhouse-local` in non-interactive (piped stdin) mode reads **all** of stdin
14+
//! before executing any queries. This makes a persistent-process + delimiter protocol
15+
//! impossible — the process blocks on stdin, never producing output until EOF. Spawning
16+
//! a fresh process per query avoids this by closing stdin immediately after writing the
17+
//! full SQL batch (setup + query), which triggers execution.
18+
//!
19+
//! Process spawn overhead is negligible (~1ms) compared to query execution times.
820
//!
921
//! The ClickHouse binary is resolved at build time via `build.rs`:
1022
//! 1. `CLICKHOUSE_BINARY` env var — use the specified path.
@@ -14,7 +26,11 @@
1426
//! or download from <https://clickhouse.com/docs/en/install>).
1527
//! In CI, it is installed by the workflow before the benchmark step.
1628
29+
use std::io::BufRead;
30+
use std::io::BufReader;
31+
use std::io::Read;
1732
use std::io::Write;
33+
use std::path::Path;
1834
use std::path::PathBuf;
1935
use std::process::Command;
2036
use std::process::Stdio;
@@ -33,45 +49,42 @@ use vortex_bench::Format;
3349
/// or `"clickhouse"` (resolved from `$PATH` at runtime).
3450
const CLICKHOUSE_BINARY: &str = env!("CLICKHOUSE_BINARY");
3551

36-
/// A client that wraps `clickhouse-local` for running SQL benchmarks.
52+
/// A client that spawns `clickhouse-local` processes for running SQL benchmarks.
53+
///
54+
/// Setup SQL (CREATE VIEW) is stored at construction time and prepended to each
55+
/// query execution. Each `execute_query` call spawns a fresh process, writes the
56+
/// full SQL batch (setup + query), closes stdin, and reads the output.
3757
pub struct ClickHouseClient {
38-
/// The path to the `clickhouse` binary.
58+
/// Path to the ClickHouse binary.
3959
binary: PathBuf,
40-
/// SQL statements to run before each query (CREATE VIEW statements).
60+
/// Setup SQL statements (CREATE VIEW) to prepend to each query.
4161
setup_sql: Vec<String>,
4262
}
4363

4464
impl ClickHouseClient {
45-
/// Create a new client. Only Parquet format is supported.
65+
/// Create a new client that will use `clickhouse-local` for query execution.
4666
///
47-
/// The ClickHouse binary is resolved from (in order):
48-
/// 1. `CLICKHOUSE_BINARY` env var at build time
49-
/// 2. `"clickhouse"` on `$PATH`
67+
/// Validates that the binary is available and builds setup SQL (CREATE VIEW
68+
/// statements) from the benchmark's table specs. Only Parquet format is supported.
5069
pub fn new(benchmark: &dyn Benchmark, format: Format) -> Result<Self> {
5170
if format != Format::Parquet {
5271
anyhow::bail!("clickhouse-bench only supports Parquet format, got {format}");
5372
}
5473

5574
let binary = PathBuf::from(CLICKHOUSE_BINARY);
56-
57-
// Verify the binary is usable (either absolute path exists, or resolvable via PATH).
5875
Self::verify_binary(&binary)?;
59-
6076
tracing::info!(binary = %binary.display(), "Using clickhouse-local");
6177

62-
let mut client = Self {
63-
binary,
64-
setup_sql: Vec::new(),
65-
};
66-
client.register_tables(benchmark, format)?;
67-
Ok(client)
78+
let setup_sql = Self::build_setup_sql(benchmark, format)?;
79+
80+
Ok(Self { binary, setup_sql })
6881
}
6982

7083
/// Check that the ClickHouse binary is available.
7184
///
7285
/// For absolute paths, checks that the file exists on disk.
7386
/// For bare names (e.g., `"clickhouse"`), tries to invoke it to verify it's resolvable.
74-
fn verify_binary(binary: &PathBuf) -> Result<()> {
87+
fn verify_binary(binary: &Path) -> Result<()> {
7588
if binary.is_absolute() {
7689
anyhow::ensure!(
7790
binary.exists(),
@@ -82,7 +95,6 @@ impl ClickHouseClient {
8295
);
8396
}
8497

85-
// Verify the binary is actually usable by running `clickhouse local --version`.
8698
let output = Command::new(binary.as_os_str())
8799
.args(["local", "--version"])
88100
.output()
@@ -108,12 +120,8 @@ impl ClickHouseClient {
108120
Ok(())
109121
}
110122

111-
/// Generate `CREATE VIEW ... AS SELECT * FROM file(...)` statements.
112-
///
113-
/// We use a VIEW over the `file()` table function rather than `CREATE TABLE ... ENGINE = File()`
114-
/// because the `file()` function handles glob patterns (e.g., `*.parquet`) more reliably across
115-
/// ClickHouse versions.
116-
fn register_tables(&mut self, benchmark: &dyn Benchmark, format: Format) -> Result<()> {
123+
/// Build `CREATE VIEW ... AS SELECT * FROM file(...)` statements for all tables.
124+
fn build_setup_sql(benchmark: &dyn Benchmark, format: Format) -> Result<Vec<String>> {
117125
let data_url = benchmark.data_url();
118126
let base_dir = if data_url.scheme() == "file" {
119127
data_url
@@ -131,6 +139,7 @@ impl ClickHouseClient {
131139
);
132140
}
133141

142+
let mut stmts = Vec::new();
134143
for table_spec in benchmark.table_specs() {
135144
let name = table_spec.name;
136145
let pattern = benchmark
@@ -146,78 +155,86 @@ impl ClickHouseClient {
146155
"Registering ClickHouse table"
147156
);
148157

149-
let create_sql = format!(
158+
stmts.push(format!(
150159
"CREATE VIEW IF NOT EXISTS {name} AS \
151160
SELECT * FROM file('{data_path}', Parquet);"
152-
);
153-
self.setup_sql.push(create_sql);
161+
));
154162
}
155163

156-
Ok(())
164+
Ok(stmts)
157165
}
158166

159-
/// Execute a SQL query via `clickhouse-local`, returning `(row_count, timing)`.
167+
/// Execute a SQL query, returning `(row_count, timing)`.
160168
///
161-
/// The approach:
162-
/// 1. Prepend all CREATE VIEW statements
163-
/// 2. Append the benchmark query
164-
/// 3. Pipe the combined SQL into `clickhouse local` via stdin
165-
/// 4. Parse stdout to count result rows
166-
pub fn execute_query(&self, query: &str) -> Result<(usize, Option<Duration>)> {
169+
/// Spawns a fresh `clickhouse-local` process with the setup SQL prepended to
170+
/// the query. Timing spans the whole process run (from just before spawn until
171+
/// the process exits), so it includes the prepended setup SQL.
172+
pub fn execute_query(&mut self, query: &str) -> Result<(usize, Option<Duration>)> {
167173
trace!("execute clickhouse query: {query}");
168174

169-
// Build the full SQL: setup views + the actual query
175+
let mut query_str = query.to_string();
176+
if !query_str.trim_end().ends_with(';') {
177+
query_str.push(';');
178+
}
179+
180+
// Build the full SQL batch: setup statements + query.
170181
let mut full_sql = String::new();
171182
for stmt in &self.setup_sql {
172183
full_sql.push_str(stmt);
173184
full_sql.push('\n');
174185
}
175-
full_sql.push_str(query);
176-
// Ensure we have a trailing semicolon
177-
if !query.trim_end().ends_with(';') {
178-
full_sql.push(';');
179-
}
186+
full_sql.push_str(&query_str);
187+
full_sql.push('\n');
180188

181189
let time_instant = Instant::now();
182190

183-
// The `clickhouse` binary is a multi-tool; invoke it as `clickhouse local`.
184-
let mut child = Command::new(&self.binary)
191+
let mut child = Command::new(self.binary.as_os_str())
185192
.args(["local", "--format", "TabSeparated"])
186193
.stdin(Stdio::piped())
187194
.stdout(Stdio::piped())
188195
.stderr(Stdio::piped())
189196
.spawn()
190197
.context("Failed to spawn clickhouse-local")?;
191198

192-
// Write SQL to stdin
199+
// Write all SQL and close stdin to trigger execution.
193200
{
194-
let stdin = child
195-
.stdin
196-
.as_mut()
197-
.context("Failed to open clickhouse-local stdin")?;
201+
let mut stdin = child.stdin.take().context("Failed to open stdin")?;
198202
stdin
199203
.write_all(full_sql.as_bytes())
200-
.context("Failed to write SQL to clickhouse-local stdin")?;
204+
.context("Failed to write SQL to clickhouse-local")?;
205+
stdin.flush().context("Failed to flush stdin")?;
206+
// stdin is dropped here, closing the pipe and signaling EOF.
201207
}
202208

203-
let output = child
204-
.wait_with_output()
209+
// Read all output lines from stdout.
210+
let stdout = child.stdout.take().context("Failed to open stdout")?;
211+
let reader = BufReader::new(stdout);
212+
let mut row_count = 0usize;
213+
for line in reader.lines() {
214+
let line = line.context("Failed to read from clickhouse-local stdout")?;
215+
if !line.trim().is_empty() {
216+
row_count += 1;
217+
}
218+
}
219+
220+
let status = child
221+
.wait()
205222
.context("Failed to wait for clickhouse-local")?;
206223

207224
let query_time = time_instant.elapsed();
208225

209-
if !output.status.success() {
210-
let stderr = String::from_utf8_lossy(&output.stderr);
211-
anyhow::bail!(
212-
"clickhouse-local failed (exit {}): {stderr}",
213-
output.status.code().unwrap_or(-1)
214-
);
226+
if !status.success() {
227+
let stderr = match child.stderr.take() {
228+
Some(s) => {
229+
let mut buf = String::new();
230+
BufReader::new(s).read_to_string(&mut buf).ok();
231+
buf
232+
}
233+
None => String::new(),
234+
};
235+
anyhow::bail!("clickhouse-local exited with {status}. stderr:\n{stderr}",);
215236
}
216237

217-
// Count non-empty lines in stdout as row count
218-
let stdout = String::from_utf8_lossy(&output.stdout);
219-
let row_count = stdout.lines().filter(|line| !line.is_empty()).count();
220-
221238
Ok((row_count, Some(query_time)))
222239
}
223240
}

vortex-duckdb/cpp/replacement_scan.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ VortexScanReplacement(duckdb::ClientContext &context,
3434
table_function->alias = fs.ExtractBaseName(table_name);
3535
}
3636

37-
return table_function;
37+
return duckdb::unique_ptr<duckdb::TableRef>(table_function.release());
3838
}
3939

4040
} // namespace vortex

0 commit comments

Comments
 (0)