4141//! SELECT briefly delays ingest.
4242
4343use std:: fmt:: Write as _;
44+ use std:: path:: Path ;
4445use std:: path:: PathBuf ;
4546
4647use anyhow:: Context as _;
@@ -68,6 +69,8 @@ use crate::app::AppState;
6869use crate :: db;
6970use crate :: schema;
7071
72+ const ADMIN_SQL_ROW_LIMIT : usize = 10_000 ;
73+
7174/// Errors surfaced by `/api/admin/*` handlers. Auth (401) is handled by
7275/// [`require_admin_bearer`] and never reaches a handler.
7376#[ derive( Debug , Error ) ]
@@ -182,44 +185,78 @@ pub async fn snapshot(
182185 target. display( )
183186 ) ) ) ;
184187 }
185- std:: fs:: create_dir_all ( & target)
186- . with_context ( || format ! ( "creating snapshot dir {}" , target. display( ) ) ) ?;
187188
189+ let tmp = tmp_snapshot_dir ( & target, & q. ts ) ;
190+ if tmp. exists ( ) {
191+ std:: fs:: remove_dir_all ( & tmp)
192+ . with_context ( || format ! ( "removing stale temp snapshot dir {}" , tmp. display( ) ) ) ?;
193+ }
194+ std:: fs:: create_dir_all ( & tmp)
195+ . with_context ( || format ! ( "creating temp snapshot dir {}" , tmp. display( ) ) ) ?;
196+
197+ let result = write_snapshot ( & state, & tmp) . await ;
198+ if let Err ( err) = result {
199+ let _ = std:: fs:: remove_dir_all ( & tmp) ;
200+ return Err ( AdminError :: Internal ( err) ) ;
201+ }
202+ if let Err ( err) = std:: fs:: rename ( & tmp, & target) . with_context ( || {
203+ format ! (
204+ "moving snapshot dir {} to {}" ,
205+ tmp. display( ) ,
206+ target. display( )
207+ )
208+ } ) {
209+ let _ = std:: fs:: remove_dir_all ( & tmp) ;
210+ return Err ( AdminError :: Internal ( err) ) ;
211+ }
212+ Ok ( Json ( SnapshotResponse {
213+ snapshot_dir : target. display ( ) . to_string ( ) ,
214+ } ) )
215+ }
216+
217+ fn tmp_snapshot_dir ( target : & Path , ts : & str ) -> PathBuf {
218+ target. with_file_name ( format ! ( "{ts}.tmp-{}" , std:: process:: id( ) ) )
219+ }
220+
221+ async fn write_snapshot ( state : & AppState , target : & Path ) -> Result < ( ) > {
188222 // Schema is just our DDL string verbatim; restore reads this with
189223 // `duckdb -init schema.sql` (or `.read schema.sql`) before
190224 // bulk-loading the per-table vortex files.
191225 std:: fs:: write ( target. join ( "schema.sql" ) , schema:: SCHEMA_DDL )
192226 . with_context ( || format ! ( "writing schema.sql under {}" , target. display( ) ) ) ?;
193227
194- let target_for_db = target. clone ( ) ;
228+ let target_for_db = target. to_path_buf ( ) ;
195229 db:: run_blocking ( & state. db , move |conn| {
196- // Idempotent — `INSTALL` is a no-op if the extension is already
197- // present, `LOAD` is cheap once the binary is on disk. The
198- // bundled libduckdb-sys has autoload enabled, so the very first
199- // call also auto-fetches the extension from the DuckDB
200- // community repo. Subsequent calls are entirely local.
201- conn. execute_batch ( "INSTALL vortex FROM community; LOAD vortex;" )
202- . context ( "INSTALL/LOAD vortex extension" ) ?;
203- for table in schema:: TABLES {
204- // Single-quote escaping is a non-issue: `target_for_db`
205- // is composed from the operator-configured snapshot dir +
206- // a validated [A-Za-z0-9_-] timestamp, and table names
207- // come from the closed const list in schema.rs.
208- let path = target_for_db. join ( format ! ( "{table}.vortex" ) ) ;
209- let path_str = path
210- . to_str ( )
211- . ok_or_else ( || anyhow:: anyhow!( "snapshot path is not UTF-8: {}" , path. display( ) ) ) ?;
212- let sql = format ! ( "COPY (SELECT * FROM {table}) TO '{path_str}' (FORMAT vortex)" ) ;
213- conn. execute_batch ( & sql)
214- . with_context ( || format ! ( "COPY {table} TO {path_str}" ) ) ?;
215- }
216- Ok ( ( ) )
230+ export_snapshot_tables ( conn, & target_for_db)
217231 } )
218232 . await
219- . map_err ( AdminError :: Internal ) ?;
220- Ok ( Json ( SnapshotResponse {
221- snapshot_dir : target. display ( ) . to_string ( ) ,
222- } ) )
233+ }
234+
235+ fn export_snapshot_tables ( conn : & mut Connection , target : & Path ) -> Result < ( ) > {
236+ // Idempotent — `INSTALL` is a no-op if the extension is already
237+ // present, `LOAD` is cheap once the binary is on disk. The
238+ // bundled libduckdb-sys has autoload enabled, so the very first
239+ // call also auto-fetches the extension from the DuckDB
240+ // community repo. Subsequent calls are entirely local.
241+ conn. execute_batch ( "INSTALL vortex FROM community; LOAD vortex;" )
242+ . context ( "INSTALL/LOAD vortex extension" ) ?;
243+ for table in schema:: TABLES {
244+ let path = target. join ( format ! ( "{table}.vortex" ) ) ;
245+ let path_str = path
246+ . to_str ( )
247+ . ok_or_else ( || anyhow:: anyhow!( "snapshot path is not UTF-8: {}" , path. display( ) ) ) ?;
248+ let sql = format ! (
249+ "COPY (SELECT * FROM {table}) TO {} (FORMAT vortex)" ,
250+ sql_string_literal( path_str)
251+ ) ;
252+ conn. execute_batch ( & sql)
253+ . with_context ( || format ! ( "COPY {table} TO {path_str}" ) ) ?;
254+ }
255+ Ok ( ( ) )
256+ }
257+
258+ fn sql_string_literal ( value : & str ) -> String {
259+ format ! ( "'{}'" , value. replace( '\'' , "''" ) )
223260}
224261
225262fn validate_ts ( ts : & str ) -> Result < ( ) , AdminError > {
@@ -275,6 +312,7 @@ pub async fn sql(
275312 "columns" : result. columns,
276313 "rows" : result. rows,
277314 "row_count" : result. rows. len( ) ,
315+ "truncated" : result. truncated,
278316 } ) )
279317 . into_response ( ) ,
280318 SqlFormat :: Table => (
@@ -286,6 +324,7 @@ pub async fn sql(
286324}
287325
288326fn validate_read_only ( sql : & str ) -> Result < ( ) , AdminError > {
327+ ensure_single_statement ( sql) ?;
289328 let trimmed = sql. trim_start_matches ( |c : char | c. is_whitespace ( ) || c == '(' || c == ';' ) ;
290329 let first_word: String = trimmed
291330 . chars ( )
@@ -301,12 +340,96 @@ fn validate_read_only(sql: &str) -> Result<(), AdminError> {
301340 Ok ( ( ) )
302341}
303342
343+ fn ensure_single_statement ( sql : & str ) -> Result < ( ) , AdminError > {
344+ #[ derive( Clone , Copy , PartialEq , Eq ) ]
345+ enum State {
346+ Normal ,
347+ SingleQuote ,
348+ DoubleQuote ,
349+ LineComment ,
350+ BlockComment ,
351+ }
352+
353+ let mut state = State :: Normal ;
354+ let mut chars = sql. char_indices ( ) . peekable ( ) ;
355+ while let Some ( ( idx, ch) ) = chars. next ( ) {
356+ match state {
357+ State :: Normal => match ch {
358+ '\'' => state = State :: SingleQuote ,
359+ '"' => state = State :: DoubleQuote ,
360+ '-' if chars. peek ( ) . is_some_and ( |( _, next) | * next == '-' ) => {
361+ chars. next ( ) ;
362+ state = State :: LineComment ;
363+ }
364+ '/' if chars. peek ( ) . is_some_and ( |( _, next) | * next == '*' ) => {
365+ chars. next ( ) ;
366+ state = State :: BlockComment ;
367+ }
368+ ';' if !sql[ idx + ch. len_utf8 ( ) ..] . trim ( ) . is_empty ( ) => {
369+ return Err ( AdminError :: Forbidden (
370+ "admin SQL accepts a single statement only" . into ( ) ,
371+ ) ) ;
372+ }
373+ _ => { }
374+ } ,
375+ State :: SingleQuote => {
376+ if ch == '\'' {
377+ if chars. peek ( ) . is_some_and ( |( _, next) | * next == '\'' ) {
378+ chars. next ( ) ;
379+ } else {
380+ state = State :: Normal ;
381+ }
382+ }
383+ }
384+ State :: DoubleQuote => {
385+ if ch == '"' {
386+ if chars. peek ( ) . is_some_and ( |( _, next) | * next == '"' ) {
387+ chars. next ( ) ;
388+ } else {
389+ state = State :: Normal ;
390+ }
391+ }
392+ }
393+ State :: LineComment => {
394+ if ch == '\n' {
395+ state = State :: Normal ;
396+ }
397+ }
398+ State :: BlockComment => {
399+ if ch == '*' && chars. peek ( ) . is_some_and ( |( _, next) | * next == '/' ) {
400+ chars. next ( ) ;
401+ state = State :: Normal ;
402+ }
403+ }
404+ }
405+ }
406+ Ok ( ( ) )
407+ }
408+
304409struct QueryResult {
305410 columns : Vec < String > ,
306411 rows : Vec < Vec < Value > > ,
412+ truncated : bool ,
413+ }
414+
415+ fn run_select ( conn : & mut Connection , sql : & str ) -> Result < QueryResult > {
416+ conn. execute_batch ( "BEGIN TRANSACTION READ ONLY" )
417+ . context ( "begin read-only admin SQL transaction" ) ?;
418+ let result = run_select_in_transaction ( conn, sql) ;
419+ match result {
420+ Ok ( value) => {
421+ conn. execute_batch ( "COMMIT" )
422+ . context ( "commit read-only admin SQL transaction" ) ?;
423+ Ok ( value)
424+ }
425+ Err ( err) => {
426+ let _ = conn. execute_batch ( "ROLLBACK" ) ;
427+ Err ( err)
428+ }
429+ }
307430}
308431
309- fn run_select ( conn : & Connection , sql : & str ) -> Result < QueryResult > {
432+ fn run_select_in_transaction ( conn : & Connection , sql : & str ) -> Result < QueryResult > {
310433 let mut stmt = conn. prepare ( sql) . context ( "prepare SQL" ) ?;
311434 let mut rows_iter = stmt. query ( [ ] ) . context ( "execute SQL" ) ?;
312435 // duckdb-rs panics on Statement::column_names() if the statement has not
@@ -318,15 +441,24 @@ fn run_select(conn: &Connection, sql: &str) -> Result<QueryResult> {
318441 . unwrap_or_default ( ) ;
319442 let column_count = columns. len ( ) ;
320443 let mut rows: Vec < Vec < Value > > = Vec :: new ( ) ;
444+ let mut truncated = false ;
321445 while let Some ( row) = rows_iter. next ( ) . context ( "row iter" ) ? {
446+ if rows. len ( ) == ADMIN_SQL_ROW_LIMIT {
447+ truncated = true ;
448+ break ;
449+ }
322450 let mut out = Vec :: with_capacity ( column_count) ;
323451 for i in 0 ..column_count {
324452 let v = row. get_ref ( i) . context ( "get col" ) ?;
325453 out. push ( value_ref_to_json ( v) ) ;
326454 }
327455 rows. push ( out) ;
328456 }
329- Ok ( QueryResult { columns, rows } )
457+ Ok ( QueryResult {
458+ columns,
459+ rows,
460+ truncated,
461+ } )
330462}
331463
332464fn value_ref_to_json ( v : ValueRef < ' _ > ) -> Value {
@@ -378,9 +510,10 @@ fn format_table(r: &QueryResult) -> String {
378510 write_separator ( & mut out, & widths, '└' , '┴' , '┘' ) ;
379511 let _ = writeln ! (
380512 out,
381- "({} row{})" ,
513+ "({} row{}{} )" ,
382514 r. rows. len( ) ,
383- if r. rows. len( ) == 1 { "" } else { "s" }
515+ if r. rows. len( ) == 1 { "" } else { "s" } ,
516+ if r. truncated { "; truncated" } else { "" } ,
384517 ) ;
385518 out
386519}
0 commit comments