33
44//! ClickHouse Local context for benchmarks.
55//!
6- //! Uses `clickhouse-local` via `std::process::Command` to execute SQL queries
7- //! against Parquet files on disk.
6+ //! Spawns a fresh `clickhouse-local` process for each query execution. Setup SQL
7+ //! (CREATE VIEW statements) is prepended to each query so that table definitions
8+ //! are available. Query timings are wall-clock time from process spawn to process
9+ //! exit, so they include process startup and the setup statements.
10+ //!
11+ //! ## Why per-query processes?
12+ //!
13+ //! `clickhouse-local` in non-interactive (piped stdin) mode reads **all** of stdin
14+ //! before executing any queries. This makes a persistent-process + delimiter protocol
15+ //! impossible — the process blocks on stdin, never producing output until EOF. Spawning
16+ //! a fresh process per query avoids this by closing stdin immediately after writing the
17+ //! full SQL batch (setup + query), which triggers execution.
18+ //!
19+ //! Process spawn overhead is negligible (~1ms) compared to query execution times.
820//!
921//! The ClickHouse binary is resolved at build time via `build.rs`:
1022//! 1. `CLICKHOUSE_BINARY` env var — use the specified path.
1426//! or download from <https://clickhouse.com/docs/en/install>).
1527//! In CI, it is installed by the workflow before the benchmark step.
1628
29+ use std:: io:: BufRead ;
30+ use std:: io:: BufReader ;
31+ use std:: io:: Read ;
1732use std:: io:: Write ;
33+ use std:: path:: Path ;
1834use std:: path:: PathBuf ;
1935use std:: process:: Command ;
2036use std:: process:: Stdio ;
@@ -33,45 +49,42 @@ use vortex_bench::Format;
3349/// or `"clickhouse"` (resolved from `$PATH` at runtime).
3450const CLICKHOUSE_BINARY : & str = env ! ( "CLICKHOUSE_BINARY" ) ;
3551
/// A client that spawns `clickhouse-local` processes for running SQL benchmarks.
///
/// Setup SQL (CREATE VIEW) is built once at construction time and prepended to
/// every query. Each `execute_query` call spawns a fresh process, writes the
/// full SQL batch (setup + query) to its stdin, closes stdin, and reads the
/// output.
pub struct ClickHouseClient {
    /// Path to the ClickHouse binary (absolute path or a bare name resolved via `$PATH`).
    binary: PathBuf,
    /// Setup SQL statements (CREATE VIEW) to prepend to each query.
    setup_sql: Vec<String>,
}
4363
4464impl ClickHouseClient {
45- /// Create a new client. Only Parquet format is supported .
65+ /// Create a new client that will use `clickhouse-local` for query execution .
4666 ///
47- /// The ClickHouse binary is resolved from (in order):
48- /// 1. `CLICKHOUSE_BINARY` env var at build time
49- /// 2. `"clickhouse"` on `$PATH`
67+ /// Validates that the binary is available and builds setup SQL (CREATE VIEW
68+ /// statements) from the benchmark's table specs. Only Parquet format is supported.
5069 pub fn new ( benchmark : & dyn Benchmark , format : Format ) -> Result < Self > {
5170 if format != Format :: Parquet {
5271 anyhow:: bail!( "clickhouse-bench only supports Parquet format, got {format}" ) ;
5372 }
5473
5574 let binary = PathBuf :: from ( CLICKHOUSE_BINARY ) ;
56-
57- // Verify the binary is usable (either absolute path exists, or resolvable via PATH).
5875 Self :: verify_binary ( & binary) ?;
59-
6076 tracing:: info!( binary = %binary. display( ) , "Using clickhouse-local" ) ;
6177
62- let mut client = Self {
63- binary,
64- setup_sql : Vec :: new ( ) ,
65- } ;
66- client. register_tables ( benchmark, format) ?;
67- Ok ( client)
78+ let setup_sql = Self :: build_setup_sql ( benchmark, format) ?;
79+
80+ Ok ( Self { binary, setup_sql } )
6881 }
6982
7083 /// Check that the ClickHouse binary is available.
7184 ///
7285 /// For absolute paths, checks that the file exists on disk.
7386 /// For bare names (e.g., `"clickhouse"`), tries to invoke it to verify it's resolvable.
74- fn verify_binary ( binary : & PathBuf ) -> Result < ( ) > {
87+ fn verify_binary ( binary : & Path ) -> Result < ( ) > {
7588 if binary. is_absolute ( ) {
7689 anyhow:: ensure!(
7790 binary. exists( ) ,
@@ -82,7 +95,6 @@ impl ClickHouseClient {
8295 ) ;
8396 }
8497
85- // Verify the binary is actually usable by running `clickhouse local --version`.
8698 let output = Command :: new ( binary. as_os_str ( ) )
8799 . args ( [ "local" , "--version" ] )
88100 . output ( )
@@ -108,12 +120,8 @@ impl ClickHouseClient {
108120 Ok ( ( ) )
109121 }
110122
111- /// Generate `CREATE VIEW ... AS SELECT * FROM file(...)` statements.
112- ///
113- /// We use a VIEW over the `file()` table function rather than `CREATE TABLE ... ENGINE = File()`
114- /// because the `file()` function handles glob patterns (e.g., `*.parquet`) more reliably across
115- /// ClickHouse versions.
116- fn register_tables ( & mut self , benchmark : & dyn Benchmark , format : Format ) -> Result < ( ) > {
123+ /// Build `CREATE VIEW ... AS SELECT * FROM file(...)` statements for all tables.
124+ fn build_setup_sql ( benchmark : & dyn Benchmark , format : Format ) -> Result < Vec < String > > {
117125 let data_url = benchmark. data_url ( ) ;
118126 let base_dir = if data_url. scheme ( ) == "file" {
119127 data_url
@@ -131,6 +139,7 @@ impl ClickHouseClient {
131139 ) ;
132140 }
133141
142+ let mut stmts = Vec :: new ( ) ;
134143 for table_spec in benchmark. table_specs ( ) {
135144 let name = table_spec. name ;
136145 let pattern = benchmark
@@ -146,78 +155,86 @@ impl ClickHouseClient {
146155 "Registering ClickHouse table"
147156 ) ;
148157
149- let create_sql = format ! (
158+ stmts . push ( format ! (
150159 "CREATE VIEW IF NOT EXISTS {name} AS \
151160 SELECT * FROM file('{data_path}', Parquet);"
152- ) ;
153- self . setup_sql . push ( create_sql) ;
161+ ) ) ;
154162 }
155163
156- Ok ( ( ) )
164+ Ok ( stmts )
157165 }
158166
159- /// Execute a SQL query via `clickhouse-local` , returning `(row_count, timing)`.
167+ /// Execute a SQL query, returning `(row_count, timing)`.
160168 ///
161- /// The approach:
162- /// 1. Prepend all CREATE VIEW statements
163- /// 2. Append the benchmark query
164- /// 3. Pipe the combined SQL into `clickhouse local` via stdin
165- /// 4. Parse stdout to count result rows
166- pub fn execute_query ( & self , query : & str ) -> Result < ( usize , Option < Duration > ) > {
169+ /// Spawns a fresh `clickhouse-local` process with the setup SQL prepended to
170+ /// the query. Timing covers only query execution (from process spawn through
171+ /// output collection).
172+ pub fn execute_query ( & mut self , query : & str ) -> Result < ( usize , Option < Duration > ) > {
167173 trace ! ( "execute clickhouse query: {query}" ) ;
168174
169- // Build the full SQL: setup views + the actual query
175+ let mut query_str = query. to_string ( ) ;
176+ if !query_str. trim_end ( ) . ends_with ( ';' ) {
177+ query_str. push ( ';' ) ;
178+ }
179+
180+ // Build the full SQL batch: setup statements + query.
170181 let mut full_sql = String :: new ( ) ;
171182 for stmt in & self . setup_sql {
172183 full_sql. push_str ( stmt) ;
173184 full_sql. push ( '\n' ) ;
174185 }
175- full_sql. push_str ( query) ;
176- // Ensure we have a trailing semicolon
177- if !query. trim_end ( ) . ends_with ( ';' ) {
178- full_sql. push ( ';' ) ;
179- }
186+ full_sql. push_str ( & query_str) ;
187+ full_sql. push ( '\n' ) ;
180188
181189 let time_instant = Instant :: now ( ) ;
182190
183- // The `clickhouse` binary is a multi-tool; invoke it as `clickhouse local`.
184- let mut child = Command :: new ( & self . binary )
191+ let mut child = Command :: new ( self . binary . as_os_str ( ) )
185192 . args ( [ "local" , "--format" , "TabSeparated" ] )
186193 . stdin ( Stdio :: piped ( ) )
187194 . stdout ( Stdio :: piped ( ) )
188195 . stderr ( Stdio :: piped ( ) )
189196 . spawn ( )
190197 . context ( "Failed to spawn clickhouse-local" ) ?;
191198
192- // Write SQL to stdin
199+ // Write all SQL and close stdin to trigger execution.
193200 {
194- let stdin = child
195- . stdin
196- . as_mut ( )
197- . context ( "Failed to open clickhouse-local stdin" ) ?;
201+ let mut stdin = child. stdin . take ( ) . context ( "Failed to open stdin" ) ?;
198202 stdin
199203 . write_all ( full_sql. as_bytes ( ) )
200- . context ( "Failed to write SQL to clickhouse-local stdin" ) ?;
204+ . context ( "Failed to write SQL to clickhouse-local" ) ?;
205+ stdin. flush ( ) . context ( "Failed to flush stdin" ) ?;
206+ // stdin is dropped here, closing the pipe and signaling EOF.
201207 }
202208
203- let output = child
204- . wait_with_output ( )
209+ // Read all output lines from stdout.
210+ let stdout = child. stdout . take ( ) . context ( "Failed to open stdout" ) ?;
211+ let reader = BufReader :: new ( stdout) ;
212+ let mut row_count = 0usize ;
213+ for line in reader. lines ( ) {
214+ let line = line. context ( "Failed to read from clickhouse-local stdout" ) ?;
215+ if !line. trim ( ) . is_empty ( ) {
216+ row_count += 1 ;
217+ }
218+ }
219+
220+ let status = child
221+ . wait ( )
205222 . context ( "Failed to wait for clickhouse-local" ) ?;
206223
207224 let query_time = time_instant. elapsed ( ) ;
208225
209- if !output. status . success ( ) {
210- let stderr = String :: from_utf8_lossy ( & output. stderr ) ;
211- anyhow:: bail!(
212- "clickhouse-local failed (exit {}): {stderr}" ,
213- output. status. code( ) . unwrap_or( -1 )
214- ) ;
226+ if !status. success ( ) {
227+ let stderr = match child. stderr . take ( ) {
228+ Some ( s) => {
229+ let mut buf = String :: new ( ) ;
230+ BufReader :: new ( s) . read_to_string ( & mut buf) . ok ( ) ;
231+ buf
232+ }
233+ None => String :: new ( ) ,
234+ } ;
235+ anyhow:: bail!( "clickhouse-local exited with {status}. stderr:\n {stderr}" , ) ;
215236 }
216237
217- // Count non-empty lines in stdout as row count
218- let stdout = String :: from_utf8_lossy ( & output. stdout ) ;
219- let row_count = stdout. lines ( ) . filter ( |line| !line. is_empty ( ) ) . count ( ) ;
220-
221238 Ok ( ( row_count, Some ( query_time) ) )
222239 }
223240}
0 commit comments