-
Notifications
You must be signed in to change notification settings - Fork 56
Expand file tree
/
Copy pathconnection.rs
More file actions
3238 lines (2992 loc) · 130 KB
/
Copy pathconnection.rs
File metadata and controls
3238 lines (2992 loc) · 130 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//! Public `Connection` / `Statement` / `Rows` / `Row` API (Phase 5a + SQLR-23).
//!
//! This is the stable surface external consumers bind against — Rust
//! callers use it directly, language SDKs (Python, Node.js, Go) bind
//! against the C FFI wrapper over these same types in Phase 5b, and
//! the WASM build in Phase 5g re-exposes them via `wasm-bindgen`.
//!
//! The shape mirrors `rusqlite` / Python's `sqlite3` so users
//! familiar with either can pick it up immediately:
//!
//! ```no_run
//! use sqlrite::Connection;
//!
//! let mut conn = Connection::open("foo.sqlrite")?;
//! conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")?;
//! conn.execute("INSERT INTO users (name) VALUES ('alice')")?;
//!
//! let mut stmt = conn.prepare("SELECT id, name FROM users")?;
//! let mut rows = stmt.query()?;
//! while let Some(row) = rows.next()? {
//! let id: i64 = row.get(0)?;
//! let name: String = row.get(1)?;
//! println!("{id}: {name}");
//! }
//! # Ok::<(), sqlrite::SQLRiteError>(())
//! ```
//!
//! **Relationship to the internal engine.** A `Connection` owns a
//! `Database` (which owns a `Pager` for file-backed connections).
//! `execute` and `query` go through the same `process_command`
//! pipeline the REPL uses, just with typed row return instead of
//! pre-rendered tables. The internal `Database` / `Pager` stay
//! accessible via `sqlrite::sql::...` for the engine's own tests
//! and for the desktop app — but those paths aren't considered
//! stable API.
//!
//! # Prepared statements & parameter binding (SQLR-23)
//!
//! `Connection::prepare` parses the SQL once and stashes the AST on
//! the returned `Statement`. Subsequent calls to `Statement::query` /
//! `Statement::run` execute against the cached AST without re-running
//! sqlparser. Bound versions ([`Statement::query_with_params`] /
//! [`Statement::execute_with_params`]) accept a `&[Value]` slice that is
//! substituted into the cached AST at execute time — including
//! `Value::Vector(...)` for HNSW-eligible KNN queries, where binding
//! the query vector skips per-iter lexing of the 4 KB bracket-array
//! literal.
//!
//! [`Connection::prepare_cached`] adds a small per-connection LRU
//! (default cap 16) so a hot SQL string is parsed exactly once across
//! every call, not once per `prepare()`. Matches the rusqlite pattern.
use std::collections::{HashMap, VecDeque};
use std::path::Path;
use std::sync::{Arc, Mutex, MutexGuard};
use crate::sql::dialect::SqlriteDialect;
use sqlparser::ast::Statement as AstStatement;
use sqlparser::parser::Parser;
use crate::error::{Result, SQLRiteError};
use crate::mvcc::{
ConcurrentTx, JournalMode, MvccCommitBatch, MvccLogRecord, RowID, RowVersion, VersionPayload,
};
use crate::sql::db::database::{Database, TxnSnapshot};
use crate::sql::db::table::{Table, Value};
use crate::sql::executor::execute_select_rows;
use crate::sql::pager::{self, AccessMode, open_database_with_mode, save_database};
use crate::sql::params::{rewrite_placeholders, substitute_params};
use crate::sql::parser::select::SelectQuery;
use crate::sql::process_ast_with_render;
/// How many compiled plans [`Connection::prepare_cached`] keeps per
/// connection before it starts evicting the least-recently-used one.
/// Same value as rusqlite's default; change it at runtime with
/// [`Connection::set_prepared_cache_capacity`].
const DEFAULT_PREP_CACHE_CAP: usize = 16;
/// A handle to a SQLRite database. Opens a file or an in-memory DB;
/// drop it to close. Every mutating statement auto-saves, except inside
/// an explicit `BEGIN`/`COMMIT` block or a `BEGIN CONCURRENT`/`COMMIT`
/// block — see [Transactions](#transactions).
///
/// ## Transactions
///
/// ```no_run
/// # use sqlrite::Connection;
/// let mut conn = Connection::open("foo.sqlrite")?;
/// conn.execute("BEGIN")?;
/// conn.execute("INSERT INTO users (name) VALUES ('alice')")?;
/// conn.execute("INSERT INTO users (name) VALUES ('bob')")?;
/// conn.execute("COMMIT")?;
/// # Ok::<(), sqlrite::SQLRiteError>(())
/// ```
///
/// `BEGIN CONCURRENT` (Phase 11.4, requires `PRAGMA journal_mode = mvcc;`)
/// opens an optimistic transaction instead. Its writes are staged on a
/// private snapshot. `COMMIT` validates them and can fail with
/// `SQLRiteError::Busy` on a write-write conflict or a concurrent schema
/// change. Retry with a fresh `BEGIN CONCURRENT` when that happens.
///
/// ## Multiple connections (Phase 10.1)
///
/// `Connection` is a thin handle over an `Arc<Mutex<Database>>`. Call
/// [`Connection::connect`] to mint a sibling handle that shares the
/// same backing `Database`, typically one per worker thread.
///
/// Every engine call still locks that single mutex for its duration,
/// and the pager keeps its exclusive flock between processes. So
/// statements issued through different handles never run inside the
/// engine at the same instant.
///
/// Sibling handles give you two things:
/// - you can address the same DB from several threads without wrapping
///   a `Connection` in a `Mutex` yourself;
/// - each handle has its own independent `BEGIN CONCURRENT` slot. A
///   handle's concurrent transaction only touches the shared tables
///   when it commits.
///
/// `Connection` is `Send + Sync`. The recommended pattern is one
/// connection per thread (clone via `connect()`). Statements still
/// borrow `&mut Connection`, so a single connection isn't suitable
/// for true concurrent statement execution.
pub struct Connection {
    /// Shared engine state. Sibling connections minted via
    /// [`Connection::connect`] clone this `Arc`, so they see the same
    /// in-memory tables and long-lived pager without copying them.
    inner: Arc<Mutex<Database>>,
    /// SQLR-23 — small SQL→cached-plan LRU, keyed by the verbatim SQL
    /// string the caller passed to `prepare_cached`. The most recently
    /// used entry lives at the back; eviction pops from the front.
    ///
    /// It is a `VecDeque` rather than a HashMap plus linked list because
    /// the expected capacity is small (default 16). A linear scan is
    /// fine, and the implementation stays dependency-free.
    ///
    /// Per-connection, not shared with sibling handles. Each handle has
    /// its own LRU, so cache mutation never crosses a handle boundary.
    prep_cache: VecDeque<(String, Arc<CachedPlan>)>,
    /// Maximum number of entries kept in `prep_cache`.
    prep_cache_cap: usize,
    /// Phase 11.4 — per-connection `BEGIN CONCURRENT` state.
    /// `None` outside a concurrent transaction; `Some` between
    /// `BEGIN CONCURRENT` and `COMMIT` / `ROLLBACK`. Multiple
    /// sibling connections can each hold their own.
    ///
    /// While `Some`, every statement on this connection runs
    /// against the cloned tables in [`ConcurrentTx::tables`]
    /// instead of the live `Database::tables`. The live database
    /// stays untouched until the commit-validation pass succeeds.
    ///
    /// **Phase 11.5 — wrapped in a `Mutex`.** [`Statement::query`]
    /// and [`Statement::query_with_params`] take `&self`, so they
    /// need interior mutability to swap the snapshot in for the
    /// read.
    ///
    /// The lock is uncontended in single-thread use: the slot is
    /// per-handle, and the Statement-borrows-Connection contract still
    /// serializes statements on a given handle. The Mutex is the
    /// cheapest way to satisfy the borrow checker without restructuring
    /// the Statement API.
    ///
    /// Lock order is always `concurrent_tx` → `inner`, to stay
    /// deadlock-free.
    concurrent_tx: Mutex<Option<ConcurrentTx>>,
}
impl Connection {
/// Opens a database file for read-write access, creating it first if
/// it isn't there yet.
///
/// A missing file is written out immediately as an empty database in
/// the current format version. Takes an exclusive advisory lock on the
/// file and its `-wal` sidecar; returns `Err` if either is already
/// locked by another process.
///
/// The database name is the file stem, or `"db"` when the stem is
/// absent or not valid UTF-8.
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
    let path = path.as_ref();
    let db_name = match path.file_stem().and_then(|stem| stem.to_str()) {
        Some(stem) => stem.to_string(),
        None => "db".to_string(),
    };

    if path.exists() {
        let existing = open_database_with_mode(path, db_name, AccessMode::ReadWrite)?;
        return Ok(Self::wrap(existing));
    }

    // Brand-new file. Recording `source_path` before `save_database`
    // makes that call take its same-path branch: it creates the pager
    // and leaves it attached to `fresh`. No reopen is needed, and a
    // reopen would collide with the lock this very save just took.
    let mut fresh = Database::new(db_name);
    fresh.source_path = Some(path.to_path_buf());
    save_database(&mut fresh, path)?;
    Ok(Self::wrap(fresh))
}
/// Opens an already-existing database file without write access.
///
/// Takes a shared advisory lock, so several read-only connections can
/// coexist on the same file while any open writer excludes them.
/// Mutating statements on the returned connection fail with
/// `cannot execute: database is opened read-only`.
pub fn open_read_only<P: AsRef<Path>>(path: P) -> Result<Self> {
    let path = path.as_ref();
    let db_name = match path.file_stem().and_then(|stem| stem.to_str()) {
        Some(stem) => stem.to_string(),
        None => "db".to_string(),
    };
    let readonly_db = open_database_with_mode(path, db_name, AccessMode::ReadOnly)?;
    Ok(Self::wrap(readonly_db))
}
/// Creates a throwaway database that lives only in memory (named
/// `memdb`). Nothing is read from or written to disk and no locks are
/// taken. Its contents vanish when the `Connection` is dropped.
pub fn open_in_memory() -> Result<Self> {
    let db = Database::new(String::from("memdb"));
    Ok(Self::wrap(db))
}
/// Builds the first handle around a freshly opened `Database`. It
/// starts with an empty plan cache at the default capacity and no open
/// concurrent transaction.
fn wrap(db: Database) -> Self {
    let inner = Arc::new(Mutex::new(db));
    Self {
        concurrent_tx: Mutex::new(None),
        prep_cache_cap: DEFAULT_PREP_CACHE_CAP,
        prep_cache: VecDeque::default(),
        inner,
    }
}
/// Phase 10.1 — mints another `Connection` sharing the same
/// backing `Database`. Hand the returned handle to a separate
/// thread to address the same in-memory tables and persistent
/// pager from there.
///
/// The new handle:
/// - starts with an empty prepared-statement cache (caches are
///   per-handle, by design);
/// - inherits this handle's `prepare_cached` capacity;
/// - starts outside any `BEGIN CONCURRENT` transaction, because every
///   handle owns its own concurrent-transaction slot.
///
/// Engine calls from all handles still serialize through the shared
/// `Database` mutex and the pager's single-writer rule. For overlapping
/// multi-writer transactions, open `BEGIN CONCURRENT` on each handle
/// (requires `PRAGMA journal_mode = mvcc;`). Conflicting commits are
/// then rejected with `SQLRiteError::Busy`.
///
/// ```no_run
/// # use sqlrite::Connection;
/// let primary = Connection::open("foo.sqlrite")?;
/// let secondary = primary.connect();
/// std::thread::spawn(move || {
///     let mut conn = secondary;
///     conn.execute("INSERT INTO t (x) VALUES (1)").unwrap();
/// })
/// .join()
/// .unwrap();
/// # Ok::<(), sqlrite::SQLRiteError>(())
/// ```
pub fn connect(&self) -> Self {
    Self {
        inner: Arc::clone(&self.inner),
        prep_cache: VecDeque::new(),
        prep_cache_cap: self.prep_cache_cap,
        // Phase 11.4: each sibling handle gets its own, initially
        // empty, concurrent-transaction slot. That is what lets
        // several threads hold `BEGIN CONCURRENT` at once.
        concurrent_tx: Mutex::new(None),
    }
}
/// Phase 10.1 — how many live `Connection` handles share this
/// database, including `self`. Every handle minted through
/// `connect()` counts, whichever handle it was minted from.
///
/// Intended for diagnostics and tests only. The value is the strong
/// count of the shared `Arc`, so it carries no semantic guarantee
/// beyond that.
pub fn handle_count(&self) -> usize {
    let shared = &self.inner;
    Arc::strong_count(shared)
}
/// Internal helper: acquires the shared `Database` mutex and hands back
/// its guard. Every public method that needs `&mut Database` calls
/// this. The lock is held until the guard drops, so callers bind it to
/// a local for as long as the engine call runs.
///
/// # Panics
///
/// If the mutex is poisoned, i.e. another thread panicked while
/// holding it. There is no engine-level recovery for a poisoned
/// `Database`, because its in-memory tables may be half-updated. So
/// this fails fast instead.
fn lock(&self) -> MutexGuard<'_, Database> {
    match self.inner.lock() {
        Ok(guard) => guard,
        Err(poisoned) => panic!("sqlrite: database mutex poisoned: {poisoned}"),
    }
}
/// Runs a single SQL statement and returns the engine's status
/// message, e.g. `"INSERT Statement executed."`.
///
/// Intended for DDL (`CREATE TABLE`, `CREATE INDEX`), DML (`INSERT`,
/// `UPDATE`, `DELETE`) and transaction control (`BEGIN`,
/// `BEGIN CONCURRENT`, `COMMIT`, `ROLLBACK`). A `SELECT` also runs
/// here, but its rows are thrown away and only the status comes back.
/// Iterate typed rows through [`Connection::prepare`] and
/// [`Statement::query`] instead.
///
/// Phase 11.4 — how the statement is routed:
/// - `BEGIN CONCURRENT` is always handled here, without reaching
///   sqlparser.
/// - `COMMIT` / `ROLLBACK` are handled here only while a concurrent
///   transaction is open on this handle. That keeps the per-connection
///   MVCC state in sync.
/// - Everything else is dispatched normally. Inside an open concurrent
///   transaction that means against the transaction's private tables,
///   so the live database is left alone until commit-validation
///   succeeds.
pub fn execute(&mut self, sql: &str) -> Result<String> {
    let wants = concurrent_tx_intent(sql);
    let tx_open = self.concurrent_tx_is_open();
    match (wants, tx_open) {
        (ConcurrentTxIntent::Begin, _) => self.begin_concurrent(),
        (ConcurrentTxIntent::Commit, true) => self.commit_concurrent(),
        (ConcurrentTxIntent::Rollback, true) => self.rollback_concurrent(),
        (
            ConcurrentTxIntent::None | ConcurrentTxIntent::Commit | ConcurrentTxIntent::Rollback,
            _,
        ) => self.execute_dispatch(sql),
    }
}
/// Phase 11.11a — like [`Connection::execute`], but hands back the full
/// [`CommandOutput`](crate::sql::CommandOutput): the status plus, for a
/// `SELECT`, the table the engine pre-rendered.
///
/// The REPL uses this to print both in one pass while still routing
/// `BEGIN CONCURRENT` / `COMMIT` / `ROLLBACK` through this handle's
/// MVCC state. Those three transaction-control statements have no
/// table to show, so their output always carries `rendered: None`.
pub fn execute_with_render(&mut self, sql: &str) -> Result<crate::sql::CommandOutput> {
    let wants = concurrent_tx_intent(sql);
    let tx_open = self.concurrent_tx_is_open();
    let status = match (wants, tx_open) {
        (ConcurrentTxIntent::Begin, _) => self.begin_concurrent(),
        (ConcurrentTxIntent::Commit, true) => self.commit_concurrent(),
        (ConcurrentTxIntent::Rollback, true) => self.rollback_concurrent(),
        (
            ConcurrentTxIntent::None | ConcurrentTxIntent::Commit | ConcurrentTxIntent::Rollback,
            _,
        ) => {
            return self.execute_dispatch_with_render(sql);
        }
    }?;
    Ok(crate::sql::CommandOutput {
        rendered: None,
        status,
    })
}
/// Phase 11.5 — reports whether this handle currently has a
/// `BEGIN CONCURRENT` transaction open.
///
/// Takes the per-handle `concurrent_tx` mutex only long enough to peek
/// at the slot. [`Connection::execute`] and [`Statement::query`] use it
/// to pick their dispatch path. It is public so the REPL's `.conns`
/// output can show per-handle transaction state (Phase 11.11a).
pub fn concurrent_tx_is_open(&self) -> bool {
    let slot = self.lock_concurrent_tx();
    slot.is_some()
}
/// Phase 11.5 — acquires this handle's `Mutex<Option<ConcurrentTx>>`
/// and returns the guard.
///
/// Keeping the poison handling in one place means each caller's lock
/// order stays visible at the call site: `concurrent_tx` is always
/// taken before `inner`.
///
/// # Panics
///
/// If the mutex has been poisoned.
fn lock_concurrent_tx(&self) -> MutexGuard<'_, Option<ConcurrentTx>> {
    match self.concurrent_tx.lock() {
        Ok(guard) => guard,
        Err(poisoned) => panic!("sqlrite: concurrent_tx mutex poisoned: {poisoned}"),
    }
}
/// Phase 11.5 — calls `f` with the `&Database` view this handle's
/// current transaction should read from.
///
/// - **No concurrent transaction open:** `f` sees the live
///   `Database::tables`, exactly as the legacy `query` path did.
/// - **Concurrent transaction open:** the transaction's private tables
///   are swapped into the database, and a dummy [`TxnSnapshot`] is
///   parked on `db.txn`, for the duration of `f`. So `f` sees the
///   BEGIN-time snapshot plus whatever this transaction has staged.
///
/// In the second case, both changes are undone by a drop guard. The
/// undo therefore also runs if `f` panics, so the shared `db.tables`
/// is never left pointing at this handle's private clone.
///
/// Takes `&self` because the `Statement::query` contract is `&self`;
/// that is why `concurrent_tx` sits behind a `Mutex`. Locks are taken
/// `concurrent_tx` first, then `inner`, like every other tx-aware path
/// on this connection.
pub(crate) fn with_snapshot_read<F, R>(&self, f: F) -> R
where
    F: FnOnce(&Database) -> R,
{
    let mut tx_slot = self.lock_concurrent_tx();
    let mut db = self.lock();
    let tx = match tx_slot.as_mut() {
        Some(tx) => tx,
        None => return f(&db),
    };

    // Point the database at the transaction's private tables, and park a
    // dummy TxnSnapshot, remembering whatever txn was there before.
    std::mem::swap(&mut db.tables, &mut tx.tables);
    let prior_txn = db.txn.replace(TxnSnapshot {
        tables: HashMap::new(),
    });

    /// Undoes the two changes above when dropped: first restores
    /// `txn`, then swaps the tables back.
    struct RestoreOnDrop<'a> {
        db: &'a mut Database,
        tx_tables: &'a mut HashMap<String, Table>,
        prior_txn: Option<TxnSnapshot>,
    }
    impl Drop for RestoreOnDrop<'_> {
        fn drop(&mut self) {
            self.db.txn = self.prior_txn.take();
            std::mem::swap(&mut self.db.tables, self.tx_tables);
        }
    }

    let restore = RestoreOnDrop {
        db: &mut db,
        tx_tables: &mut tx.tables,
        prior_txn,
    };
    let result = f(&*restore.db);
    // Normal path: undo explicitly here. On unwind, the guard's Drop
    // does the same work.
    drop(restore);
    result
}
/// Internal: sends `sql` to the engine on the appropriate path.
///
/// Without an open concurrent transaction, this locks the shared
/// database and goes straight through the legacy
/// [`crate::sql::process_command`] path. With one open, it defers to
/// [`Connection::execute_in_concurrent_tx`], which swaps the
/// transaction's private tables in so writes land on the snapshot
/// rather than the live database.
fn execute_dispatch(&mut self, sql: &str) -> Result<String> {
    if !self.concurrent_tx_is_open() {
        let mut db = self.lock();
        return crate::sql::process_command(sql, &mut db);
    }
    self.execute_in_concurrent_tx(sql)
}
/// Phase 11.11a — render-aware counterpart of
/// [`Connection::execute_dispatch`], with the same branching.
///
/// The plain path calls `process_command_with_render` under the shared
/// database lock. The concurrent path goes through
/// [`Connection::execute_in_concurrent_tx_with_render`].
fn execute_dispatch_with_render(&mut self, sql: &str) -> Result<crate::sql::CommandOutput> {
    if !self.concurrent_tx_is_open() {
        let mut db = self.lock();
        return crate::sql::process_command_with_render(sql, &mut db);
    }
    self.execute_in_concurrent_tx_with_render(sql)
}
/// Phase 11.4 — opens a `BEGIN CONCURRENT` transaction on this
/// connection.
///
/// Builds a [`ConcurrentTx`] from three things: the database's MVCC
/// clock, the MVCC store's active-transaction registry, and the live
/// tables, which become the transaction's private snapshot. The
/// transaction is then parked in this handle's `concurrent_tx` slot.
/// Returns the status string the REPL renders (`"BEGIN"`).
///
/// # Errors
///
/// Returns [`SQLRiteError::General`], checked in this order (the first
/// failing check wins), when:
/// - this handle already has a concurrent transaction open;
/// - the database isn't in `journal_mode = mvcc`;
/// - a non-concurrent (legacy `BEGIN`) transaction is open.
///   `in_transaction()` reads state on the shared `Database`, not on
///   this handle, so a legacy `BEGIN` opened through a sibling handle
///   presumably blocks this too;
/// - the database was opened read-only.
fn begin_concurrent(&mut self) -> Result<String> {
    // Lock order: concurrent_tx → inner (db). Keep this order
    // in every method that touches both — deadlock-free by
    // construction.
    let mut tx_slot = self.lock_concurrent_tx();
    if tx_slot.is_some() {
        return Err(SQLRiteError::General(
            "cannot BEGIN CONCURRENT: a concurrent transaction is already open".to_string(),
        ));
    }
    let db = self.lock();
    if db.journal_mode() != JournalMode::Mvcc {
        return Err(SQLRiteError::General(
            "BEGIN CONCURRENT requires `PRAGMA journal_mode = mvcc;` first".to_string(),
        ));
    }
    if db.in_transaction() {
        return Err(SQLRiteError::General(
            "cannot BEGIN CONCURRENT: a non-concurrent transaction is already open".to_string(),
        ));
    }
    if db.is_read_only() {
        return Err(SQLRiteError::General(
            "cannot BEGIN CONCURRENT: database is opened read-only".to_string(),
        ));
    }
    let tx = ConcurrentTx::begin(db.mvcc_clock(), db.mv_store().active_registry(), &db.tables);
    // Release the shared database before publishing the transaction
    // into this handle's slot; the slot guard is still held.
    drop(db);
    *tx_slot = Some(tx);
    Ok("BEGIN".to_string())
}
/// Phase 11.4 — commits the open concurrent transaction.
///
/// Steps (Hekaton-style optimistic validation):
///
/// 1. Abort with [`SQLRiteError::Busy`] if the live schema no longer
///    matches the transaction's BEGIN-time fingerprint. That means a
///    CREATE/DROP/ALTER ran on another connection.
/// 2. Diff the transaction's private `tables` against its own
///    BEGIN-time clone (`tables_at_begin`) to derive the write-set.
///    The diff is deliberately **not** taken against the live
///    `Database::tables`. Rows other transactions committed since our
///    BEGIN would otherwise look like our own DELETEs.
/// 3. For each row in the write-set, walk the
///    [`MvStore`](crate::mvcc::MvStore) chain. If the latest committed
///    version's `begin > tx.begin_ts`, another transaction superseded
///    the row after our snapshot, so abort with [`SQLRiteError::Busy`].
/// 4. Allocate a `commit_ts` and push every write into the `MvStore`
///    as a committed version. Then apply the writes to
///    `Database::tables`.
/// 5. For file-backed databases, append the MVCC commit batch to the
///    WAL and bump its clock high-water. Then run the legacy
///    `save_database`, whose commit-frame fsync makes the whole
///    transaction durable at once.
/// 6. Release the transaction handle and garbage-collect the chains
///    of the rows it wrote.
///
/// # Errors
///
/// - [`SQLRiteError::Busy`] on schema drift or a write-write conflict.
///   Retry with a fresh `BEGIN CONCURRENT`.
/// - [`SQLRiteError::General`] if pushing to the `MvStore`, appending
///   to the WAL, updating the clock high-water, or saving fails.
/// - Errors from the write-set diff or from applying it to the live
///   tables are propagated unchanged.
///
/// The transaction is taken out of this handle's slot before any
/// check runs. So on every error path it is dropped, which has
/// rollback semantics for the per-connection state.
///
/// NOTE(review): errors raised after step 4 has started (MvStore push,
/// live apply, WAL append, save) return without undoing the
/// `MvStore` / live-table changes already made. Confirm this is
/// acceptable or add compensation.
///
/// # Panics
///
/// If called with no concurrent transaction open; callers check first.
fn commit_concurrent(&mut self) -> Result<String> {
    let mut tx_slot = self.lock_concurrent_tx();
    let tx = tx_slot
        .take()
        .expect("commit_concurrent called without active tx (caller should check)");
    // Release the slot guard now that the tx has been moved out.
    // Holding it across `self.lock()` would be legal today, but it
    // invites a lock-order violation if a helper ever re-acquires
    // `concurrent_tx`.
    drop(tx_slot);

    let mut db = self.lock();

    // Step 1 — schema drift. DDL is rejected inside the tx, so the
    // only way the schema can differ is DDL run on another connection.
    if !tx.schema_unchanged(&db.tables) {
        return Err(SQLRiteError::Busy(
            "schema changed under BEGIN CONCURRENT (a CREATE/DROP/ALTER ran on \
             another connection); transaction rolled back"
                .to_string(),
        ));
    }

    // Step 2 — write-set, diffed against the BEGIN-time clone (see
    // the doc comment for why not against the live tables).
    let writes = diff_tables_for_writes(&tx.tables_at_begin, &tx.tables)?;

    // Step 3 — validation against the MvStore chains.
    let mv = db.mv_store().clone();
    let begin_ts = tx.begin_ts();
    for (row_id, _payload) in &writes {
        if let Some(latest_begin) = mv.latest_committed_begin(row_id) {
            if latest_begin > begin_ts {
                return Err(SQLRiteError::Busy(format!(
                    "write-write conflict on {}/{}: another transaction committed \
                     this row at ts={latest_begin} (after our begin_ts={begin_ts}); \
                     transaction rolled back, retry with a fresh BEGIN CONCURRENT",
                    row_id.table, row_id.rowid,
                )));
            }
        }
    }

    // Step 4 — validation passed: allocate commit_ts and publish.
    let commit_ts = db.mvcc_clock().tick();
    for (row_id, payload) in &writes {
        let version = RowVersion::committed(commit_ts, payload.clone());
        // `push_committed`'s monotonic-begin check holds: validation
        // ensured no version begins after our snapshot, and
        // `commit_ts` is freshly ticked.
        mv.push_committed(row_id.clone(), version)
            .map_err(|e| SQLRiteError::General(format!("MvStore push failed: {e}")))?;
    }
    // Apply through the legacy INSERT / UPDATE / DELETE shape so later
    // reads on any handle see the new values via the normal read path.
    apply_writes_to_live(&mut db, &tx.tables, &writes)?;

    // Step 5 — durability. Phase 11.9: the MVCC batch is appended to
    // the WAL *before* the legacy page-commit flush and is not fsync'd
    // on its own. The commit-frame fsync at the end of `save_database`
    // covers it, so a crash between the two appends drops both:
    // torn-write atomicity for the whole transaction.
    //
    // In-memory databases have no pager, so there is nothing to
    // append; their MVCC state lives only in the in-memory `MvStore`.
    if let Some(pager) = db.pager.as_mut() {
        let records = writes
            .iter()
            .map(|(row, payload)| MvccLogRecord {
                row: row.clone(),
                payload: payload.clone(),
            })
            .collect();
        let batch = MvccCommitBatch { commit_ts, records };
        pager.append_mvcc_batch(&batch).map_err(|append_err| {
            SQLRiteError::General(format!(
                "COMMIT failed appending MVCC log record: {append_err}"
            ))
        })?;
        // Record `commit_ts` as the WAL header's clock high-water, so
        // the next checkpoint truncates with a header covering this
        // commit. Without a checkpoint, replay still seeds the clock
        // from the frames' own `commit_ts`.
        pager
            .observe_clock_high_water(commit_ts)
            .map_err(|set_err| {
                SQLRiteError::General(format!(
                    "COMMIT failed updating WAL clock high-water: {set_err}"
                ))
            })?;
    }
    // Legacy page commit; its fsync is the transaction's single
    // durability boundary.
    if let Some(path) = db.source_path.clone() {
        pager::save_database(&mut db, &path).map_err(|save_err| {
            SQLRiteError::General(format!("COMMIT failed during save_database: {save_err}"))
        })?;
    }

    // Step 6 — Phase 11.6 per-commit GC of the write-set's chains.
    // Drop `tx` FIRST so its `begin_ts` leaves the active-tx
    // registry. Otherwise the watermark stays pinned at our own
    // `begin_ts` and reclaimable versions would survive. Broader
    // sweeps live behind `Connection::vacuum_mvcc` / `MvStore::gc_all`.
    drop(tx);
    let watermark = mv.active_watermark();
    for (row_id, _) in &writes {
        mv.gc_chain(row_id, watermark);
    }
    Ok("COMMIT".to_string())
}
/// Phase 11.4 — abandons the open concurrent transaction.
///
/// Nothing needs undoing in the live `Database::tables`, because the
/// transaction's writes only ever touched its private clone. Dropping
/// the per-connection state is the entire rollback.
///
/// # Panics
///
/// If no concurrent transaction is open; callers check first.
fn rollback_concurrent(&mut self) -> Result<String> {
    {
        let mut slot = self.lock_concurrent_tx();
        let abandoned = slot
            .take()
            .expect("rollback_concurrent called without active tx (caller should check)");
        // Dropping the transaction unregisters its TxHandle. This
        // happens while the slot guard is still held.
        drop(abandoned);
    }
    Ok("ROLLBACK".to_string())
}
/// Phase 11.4 — executes `sql` inside the open concurrent transaction
/// and returns only the status string.
///
/// A thin wrapper over
/// [`Connection::execute_in_concurrent_tx_with_render`] that drops the
/// rendered output. That method does the real work:
/// - temporarily swaps `db.tables` with the transaction's private
///   clone;
/// - parks a dummy [`TxnSnapshot`] so auto-save stays off;
/// - rejects DDL up front.
///
/// Schema changes inside `BEGIN CONCURRENT` are a v0 non-goal: new
/// tables could not be merged back into the live database without a
/// separate pass.
fn execute_in_concurrent_tx(&mut self, sql: &str) -> Result<String> {
    let output = self.execute_in_concurrent_tx_with_render(sql)?;
    Ok(output.status)
}
/// Render-aware twin of [`Connection::execute_in_concurrent_tx`].
/// Same swap-based dispatch; the only difference is the inner
/// call goes through `process_command_with_render` so the
/// caller gets the rendered SELECT table (Phase 11.11a).
///
/// Executes `sql` against the open `BEGIN CONCURRENT` transaction's
/// private copy of the tables. The copy is swapped into the shared
/// `Database`, the statement runs, and the copy is swapped back out.
///
/// # Errors
///
/// - A plain `BEGIN` / `START` (as classified by `legacy_tx_intent`)
///   is rejected, because a concurrent transaction is already open.
/// - Statements flagged by `rejects_in_concurrent_tx` (DDL) are
///   rejected before any lock is taken, so nothing is touched.
/// - Any error from `process_command_with_render` is returned after
///   the tables have been swapped back and `db.txn` restored.
///
/// # Panics
///
/// - If no concurrent transaction is open (caller invariant; see the
///   `expect` below).
/// - If the database mutex is poisoned.
fn execute_in_concurrent_tx_with_render(
    &mut self,
    sql: &str,
) -> Result<crate::sql::CommandOutput> {
    // Nested plain BEGIN is not allowed while a concurrent tx is open.
    let intent = legacy_tx_intent(sql);
    if matches!(intent, LegacyTxIntent::Begin) {
        return Err(SQLRiteError::General(
            "cannot BEGIN: a concurrent transaction is already open".to_string(),
        ));
    }
    // String-prefix DDL check. Rejecting up front means the
    // tx's snapshot never gets a half-applied schema change —
    // which would be hard to merge back at commit because the
    // live database wouldn't agree.
    if rejects_in_concurrent_tx(sql) {
        return Err(SQLRiteError::General(
            "DDL is not supported inside BEGIN CONCURRENT (v0 limitation; the \
             transaction stays open, the live schema is unchanged)"
                .to_string(),
        ));
    }
    // Lock order: concurrent_tx → inner (db). Same shape as
    // every other tx-aware path on this connection.
    let mut tx_slot = self.lock_concurrent_tx();
    let tx = tx_slot
        .as_mut()
        .expect("execute_in_concurrent_tx called without active tx");
    let mut db = self.inner.lock().unwrap_or_else(|e| {
        panic!("sqlrite: database mutex poisoned: {e}");
    });
    // Swap the snapshot in. After this, db.tables IS the tx's
    // private clone; the executor mutates it freely.
    std::mem::swap(&mut db.tables, &mut tx.tables);
    // Suppress auto-save with a dummy TxnSnapshot. The
    // executor's auto-save check looks at `db.in_transaction()`,
    // which is true while `db.txn` is `Some`. The dummy
    // snapshot is never restored from — `tx` itself owns the
    // rollback story for concurrent transactions.
    // Any real `db.txn` already present is stashed and put back below.
    let prior_txn = db.txn.take();
    db.txn = Some(TxnSnapshot {
        tables: HashMap::new(),
    });
    // NOTE(review): the unwind steps below do not run if this call
    // panics. The live tables would then be left in `tx.tables` and
    // the dummy txn in `db.txn`. The unwinding guard poisons the db
    // mutex, and the lock call above panics on poison, so later use of
    // this connection fails loudly rather than reading swapped state.
    // NOTE(review): whether a statement that returns `Err` can leave
    // partial writes in the tx snapshot depends on the executor (not
    // visible here). Confirm before relying on per-statement atomicity.
    let result = crate::sql::process_command_with_render(sql, &mut db);
    // Unwind in reverse: take the dummy txn off (don't restore
    // anything from it), swap the tables back.
    db.txn = prior_txn;
    std::mem::swap(&mut db.tables, &mut tx.tables);
    result
}
/// Compiles `sql` into a reusable [`Statement`] bound to this connection.
///
/// SQLR-23: parsing (the sqlparser walk plus placeholder rewriting)
/// happens exactly once, here. The resulting plan is stored on the
/// statement, so running it again does not re-parse.
///
/// Run it without parameters via [`Statement::query`] /
/// [`Statement::run`], or bind `?` placeholders with
/// [`Statement::query_with_params`] /
/// [`Statement::execute_with_params`].
///
/// # Errors
///
/// Returns whatever error `CachedPlan::compile` reports for `sql`.
pub fn prepare<'c>(&'c mut self, sql: &str) -> Result<Statement<'c>> {
    let compiled = CachedPlan::compile(sql)?;
    Ok(Statement {
        plan: Arc::new(compiled),
        conn: self,
    })
}
/// Like [`Connection::prepare`], but first checks a small
/// per-connection LRU of compiled plans keyed by the exact SQL text.
///
/// SQLR-23: for hot statements, such as the body of an INSERT loop or
/// a lookup that is rerun often, the sqlparser walk then happens once
/// per connection rather than once per call.
///
/// The capacity defaults to 16; change it with
/// [`Connection::set_prepared_cache_capacity`].
///
/// # Errors
///
/// On a cache miss, returns whatever error `CachedPlan::compile`
/// reports. A miss that fails to compile leaves the cache unchanged.
pub fn prepare_cached<'c>(&'c mut self, sql: &str) -> Result<Statement<'c>> {
    let hit = self.prep_cache.iter().position(|(key, _)| key == sql);
    let plan = match hit {
        Some(idx) => {
            // Move the hit to the back (most recently used) so that
            // front-eviction drops the least recently used entry.
            let entry = self.prep_cache.remove(idx).unwrap();
            let shared = Arc::clone(&entry.1);
            self.prep_cache.push_back(entry);
            shared
        }
        None => {
            let fresh = Arc::new(CachedPlan::compile(sql)?);
            self.prep_cache
                .push_back((sql.to_string(), Arc::clone(&fresh)));
            // Trim from the front (oldest) until we fit the cap again.
            let excess = self.prep_cache.len().saturating_sub(self.prep_cache_cap);
            for _ in 0..excess {
                self.prep_cache.pop_front();
            }
            fresh
        }
    };
    Ok(Statement { conn: self, plan })
}
/// SQLR-23: sets how many compiled plans `prepare_cached` may keep
/// (the default is 16).
///
/// Shrinking the capacity below the current size evicts the oldest
/// (least recently used) entries straight away. A capacity of 0
/// disables caching: `prepare_cached` still works but re-parses on
/// every call.
pub fn set_prepared_cache_capacity(&mut self, cap: usize) {
    let excess = self.prep_cache.len().saturating_sub(cap);
    for _ in 0..excess {
        self.prep_cache.pop_front();
    }
    self.prep_cache_cap = cap;
}
/// SQLR-23: returns how many compiled plans the prepared-statement
/// cache currently holds.
///
/// Intended for tests and introspection; nothing in the public API
/// depends on it.
pub fn prepared_cache_len(&self) -> usize {
    self.prep_cache.len()
}
/// Reports whether a `BEGIN … COMMIT/ROLLBACK` block is currently
/// open on this connection's database.
pub fn in_transaction(&self) -> bool {
    let db = self.lock();
    db.in_transaction()
}
/// SQLR-10: returns the auto-VACUUM threshold.
///
/// When a DDL statement that releases pages (DROP TABLE, DROP INDEX,
/// ALTER TABLE DROP COLUMN) commits, the engine compacts the file in
/// place if the freelist exceeds this fraction of `page_count`.
/// `None` means the trigger is off. New connections start at
/// `Some(0.25)`, matching SQLite. To change it, see
/// [`Connection::set_auto_vacuum_threshold`].
pub fn auto_vacuum_threshold(&self) -> Option<f32> {
    self.lock().auto_vacuum_threshold()
}
/// SQLR-10: arms (`Some(t)`, with `t` in `0.0..=1.0`) or disables
/// (`None`) the auto-VACUUM trigger.
///
/// # Errors
///
/// Values outside `0.0..=1.0`, NaN, and infinities are rejected with
/// a typed error rather than being clamped.
///
/// The setting is runtime state of the database, not of this handle.
/// It disappears when the last connection to the database closes, and
/// new connections start again at `Some(0.25)`. Setting it on an
/// in-memory or read-only database is allowed, but the trigger never
/// fires there: there is nothing to compact, and no writes reach it.
pub fn set_auto_vacuum_threshold(&mut self, threshold: Option<f32>) -> Result<()> {
    self.lock().set_auto_vacuum_threshold(threshold)
}
/// Reports whether this connection was opened read-only. On such a
/// connection, mutating statements fail with a typed error.
pub fn is_read_only(&self) -> bool {
    let db = self.lock();
    db.is_read_only()
}
/// Phase 11.3: returns the database's current journal mode.
///
/// `Wal` is the default and keeps every pre-Phase-11 caller's
/// behaviour. `Mvcc` is opt-in via `PRAGMA journal_mode = mvcc;`. The
/// mode is per database, so every [`Connection::connect`] sibling sees
/// the same value.
///
/// NOTE(review): the earlier remark that this mode "doesn't change
/// query behaviour" (pending Phase 11.4) is out of date now that the
/// Phase 11.4+ MVCC code (`BEGIN CONCURRENT`, MVCC GC) has landed.
/// Confirm which read/write paths consult the mode before documenting
/// them here.
pub fn journal_mode(&self) -> crate::mvcc::JournalMode {
    self.lock().journal_mode()
}
/// Phase 11.6: runs an explicit, whole-store MVCC garbage-collection
/// pass and returns how many row versions were reclaimed.
///
/// Every row chain in the [`MvStore`](crate::mvcc::MvStore) is
/// visited. A version is dropped when its `end` timestamp is below the
/// current watermark. The watermark is the smallest `begin_ts` among
/// in-flight transactions on this database, or `u64::MAX` when none
/// are in flight.
///
/// Each commit already sweeps the rows it touched, so most callers
/// never need this. It is the "vacuum everything" escape hatch for
/// memory-pressure workloads, and for tests that want a deterministic
/// "no versions left" baseline. Calling it in `Wal` journal mode is
/// harmless: the store is simply empty.
///
/// The database lock is held only while reading the store handle and
/// the watermark. It is released before the sweep runs.
pub fn vacuum_mvcc(&self) -> usize {
    let (store, watermark) = {
        let db = self.lock();
        let store = db.mv_store().clone();
        let watermark = store.active_watermark();
        (store, watermark)
    };
    store.gc_all(watermark)
}
/// Escape hatch for advanced callers: locks the shared `Database`
/// and returns the guard. This is not part of the stable API and may
/// move or change as the MVCC work evolves.
///
/// Bind the guard to a local before passing it to functions that take
/// `&Database`:
///
/// ```no_run
/// # use sqlrite::Connection;
/// # fn use_db(_d: &sqlrite::Database) {}
/// let conn = Connection::open_in_memory()?;
/// let db = conn.database();
/// use_db(&db);
/// # Ok::<(), sqlrite::SQLRiteError>(())
/// ```
///
/// Drop the guard before calling other `Connection` methods that lock
/// the database (for example `in_transaction`). `std::sync::Mutex`
/// is not reentrant, so re-locking from the same thread deadlocks or
/// panics.
#[doc(hidden)]
pub fn database(&self) -> MutexGuard<'_, Database> {
    self.lock()
}
/// `&mut self` flavour of [`Connection::database`]. It returns the
/// same locked guard; the exclusive receiver only serves callers that
/// already hold `&mut Connection`.
#[doc(hidden)]
pub fn database_mut(&mut self) -> MutexGuard<'_, Database> {
    self.database()
}
}
/// Diagnostic summary of a connection: transaction state, read-only
/// flag, table count, prepared-plan cache size, number of handles
/// sharing the database, and whether a `BEGIN CONCURRENT` block is
/// open.
impl std::fmt::Debug for Connection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Query the concurrent-tx flag *before* taking the database
        // lock. The execute paths lock in the order concurrent_tx →
        // inner (db), and the previous version held `inner` while
        // calling this helper, which is the reverse order. If the
        // helper ever locks `inner` itself (not visible here), calling
        // it under the guard would also self-deadlock, since
        // `std::sync::Mutex` is not reentrant.
        let concurrent_tx = self.concurrent_tx_is_open();
        let db = self.lock();
        f.debug_struct("Connection")
            .field("in_transaction", &db.in_transaction())
            .field("read_only", &db.is_read_only())
            .field("tables", &db.tables.len())
            .field("prep_cache_len", &self.prep_cache.len())
            .field("handles", &Arc::strong_count(&self.inner))
            .field("concurrent_tx", &concurrent_tx)
            .finish()
    }
}
// =====================================================================
// Phase 11.4 — concurrent-transaction helpers
//
// These live as free functions (rather than methods) so the borrow
// checker stays out of the way: callers in `Connection::execute*`
// already juggle mutable borrows of `self.concurrent_tx` and
// `self.inner.lock()` simultaneously, and threading a third `&mut self`
// through helpers would force every helper to either take owned
// arguments or split the borrow at the call site. Free functions take
// exactly the slices they need.
/// Coarse classifier for tx-control statements. Spotted by string
/// match before `sqlparser` runs, just like the PRAGMA intercept.
/// Distinguishing `BEGIN CONCURRENT` from plain `BEGIN` matters
/// because plain `BEGIN` still routes through the legacy
/// deep-clone snapshot path; only `BEGIN CONCURRENT` opens an
/// MVCC transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConcurrentTxIntent {
    /// `BEGIN CONCURRENT`: opens an MVCC transaction.
    Begin,
    /// `COMMIT`, or its synonym `END`, with optional trailing tokens
    /// such as `TRANSACTION` / `WORK` / `;`.
    Commit,
    /// `ROLLBACK`, with optional trailing tokens such as
    /// `TRANSACTION` / `WORK` / `;`. Only the leading keyword is
    /// inspected, so `ROLLBACK TO …` is classified here too.
    Rollback,
    /// Anything else: falls through to the regular dispatch.
    None,
}
/// Coarse classifier for legacy tx-control statements (used to
/// reject nested `BEGIN` inside an open `BEGIN CONCURRENT`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LegacyTxIntent {
    /// Plain `BEGIN` / `BEGIN TRANSACTION` / `BEGIN DEFERRED` etc.,
    /// plus `START [TRANSACTION]`: every transaction-opening shape
    /// that *isn't* `BEGIN CONCURRENT`.
    Begin,
    /// Anything else.
    None,
}
/// Classifies `sql` by its first one or two keywords
/// (case-insensitive).
///
/// - `BEGIN CONCURRENT …` → `Begin`
/// - `COMMIT …` / `END …` → `Commit`
/// - `ROLLBACK …` → `Rollback`. Only the first token is checked, so
///   `ROLLBACK TO …` also maps here.
/// - anything else, including plain `BEGIN` and empty input → `None`
fn concurrent_tx_intent(sql: &str) -> ConcurrentTxIntent {
    let tokens = lowercase_tokens(sql);
    let lead = tokens.first().map(String::as_str);
    let follow = tokens.get(1).map(String::as_str);
    match (lead, follow) {
        (Some("begin"), Some("concurrent")) => ConcurrentTxIntent::Begin,
        (Some("commit" | "end"), _) => ConcurrentTxIntent::Commit,
        (Some("rollback"), _) => ConcurrentTxIntent::Rollback,
        _ => ConcurrentTxIntent::None,
    }
}
/// Detects a legacy (non-MVCC) transaction opener.
///
/// Any statement whose first token is `begin` or `start`
/// (case-insensitive) is `Begin`. The exception is
/// `BEGIN CONCURRENT`, which `concurrent_tx_intent` owns and which is
/// reported as `None` here. Everything else is `None`.
fn legacy_tx_intent(sql: &str) -> LegacyTxIntent {
    let tokens = lowercase_tokens(sql);
    let mut words = tokens.iter().map(String::as_str);
    match (words.next(), words.next()) {
        // Handled by the concurrent-tx classifier, not this one.
        (Some("begin"), Some("concurrent")) => LegacyTxIntent::None,
        (Some("begin" | "start"), _) => LegacyTxIntent::Begin,
        _ => LegacyTxIntent::None,
    }
}
/// Splits `sql` on whitespace + punctuation that's not part of
/// keywords, lowercases each piece, and returns the resulting
/// token list. Coarse enough to spot `BEGIN`, `COMMIT`,
/// `ROLLBACK`, `CONCURRENT`, `TRANSACTION`, etc.; not a real
/// tokenizer.
fn lowercase_tokens(sql: &str) -> Vec<String> {
sql.split(|c: char| c.is_whitespace() || c == ';' || c == '(' || c == ')' || c == ',')
.filter(|t| !t.is_empty())
.map(|t| t.to_ascii_lowercase())
.collect()
}
/// Statement shapes that must be rejected inside a `BEGIN
/// CONCURRENT` block. v0 covers the canonical DDL — CREATE
/// TABLE, CREATE INDEX, DROP TABLE, DROP INDEX, ALTER TABLE,
/// VACUUM. Cheap string-prefix check; misses contrived
/// formattings like a leading SQL comment, but the rejection is