-
Notifications
You must be signed in to change notification settings - Fork 56
Expand file tree
/
Copy pathtransaction.rs
More file actions
262 lines (242 loc) · 10.4 KB
/
Copy pathtransaction.rs
File metadata and controls
262 lines (242 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
//! [`ConcurrentTx`] — per-`Connection` `BEGIN CONCURRENT`
//! transaction state (Phase 11.4).
//!
//! Per [`docs/concurrent-writes-plan.md`](../../../docs/concurrent-writes-plan.md):
//!
//! > `BEGIN CONCURRENT` doesn't acquire any locks; writes go to the
//! > version chain tagged with the transaction id; reads use
//! > snapshot-isolation visibility.
//!
//! ## How this slice does it
//!
//! Each `Connection` owns at most one [`ConcurrentTx`] at a time.
//! When the user issues `BEGIN CONCURRENT`, the connection deep-
//! clones the database's `tables` map into `ConcurrentTx::tables`
//! and stores a [`TxHandle`] (which advances the
//! [`MvccClock`] to allocate a `begin_ts`). Subsequent `INSERT` /
//! `UPDATE` / `DELETE` statements run against the cloned `tables`
//! (the executor thinks it's writing to the live database —
//! `Connection` swaps the cloned tables in just for the duration
//! of each statement). The live `Database::tables` stays
//! unchanged until commit.
//!
//! At `COMMIT`:
//!
//! 1. Diff `tx.tables_at_begin` (the immutable BEGIN-time clone)
//! vs `tx.tables` (post-write) to derive a write-set: every
//! `(RowID, payload)` the transaction changed.
//! 2. For each row in the write-set, walk the
//! [`super::MvStore`] chain. If any committed version's
//! `begin > tx.begin_ts`, ABORT with
//! [`crate::error::SQLRiteError::Busy`] — some other
//! transaction touched the row after our snapshot.
//! 3. On success, allocate a `commit_ts`, push each write into
//! the `MvStore` as a committed version (caps the previous
//! latest version's `end` at `commit_ts`), apply the writes to
//! `db.tables`, and run the legacy `save_database` so changes
//! persist via the existing WAL.
//!
//! `ROLLBACK` just drops the `ConcurrentTx` — the cloned tables
//! are released, the `TxHandle` drops (unregistering the
//! transaction from `ActiveTxRegistry`), and `db.tables` is
//! unchanged because we never touched it.
//!
//! ## What this slice doesn't do (yet)
//!
//! - **Snapshot-isolated reads inside the transaction.** Reads
//! inside `BEGIN CONCURRENT` see the cloned-at-BEGIN state of
//! the tables (because the executor is dispatched against
//! `tx.tables`), but they don't consult `MvStore` to filter by
//! `begin_ts`. Concurrent writes from outside the tx land on
//! `db.tables`, not on our snapshot — so we don't see them
//! inside the tx. That's *partial* snapshot isolation: it
//! isolates correctly under the current "lock the database
//! per statement" mutex, but doesn't survive once the engine
//! genuinely supports overlapping in-flight transactions
//! reading concurrently.
//! - **DDL inside `BEGIN CONCURRENT`.** v0 rejects with a typed
//! error before the swap, mirroring the plan's stated
//! non-goal.
//! - **`AUTOINCREMENT`.** Same — rejected with a typed error.
//! - **Persistence of the in-flight write-set across crashes.**
//! The write-set lives entirely in memory until commit. A
//! crash mid-transaction loses everything — that's correct
//! (the transaction never committed), and the legacy WAL
//! still owns durability of `Database::tables` for committed
//! data. Phase 11.5 adds the MVCC log-record frame format
//! that lets writes start landing in the WAL pre-commit.
use std::collections::HashMap;
use crate::sql::db::table::Table;
use super::{ActiveTxRegistry, MvccClock, TxHandle};
/// Per-`Connection` snapshot of `BEGIN CONCURRENT` state.
///
/// Lives on [`Connection`](crate::Connection), not on
/// [`Database`](crate::Database) — multiple sibling connections
/// each carry their own concurrent transaction without stepping
/// on each other's snapshots.
///
/// Built by [`ConcurrentTx::begin`], which fills every field from
/// the live `Database::tables` map at BEGIN time. Both table maps
/// are keyed by table name (the same keys as the live map).
#[derive(Debug)]
pub struct ConcurrentTx {
    /// RAII handle into the `ActiveTxRegistry`. It is dropped when
    /// this struct drops (commit, rollback, or `Connection` close);
    /// at that point the transaction is unregistered. Also the
    /// source of this transaction's `begin_ts`.
    pub handle: TxHandle,
    /// Mutable working copy of `Database::tables`, deep-cloned at
    /// `BEGIN CONCURRENT` via `Table::deep_clone`. Each statement's
    /// executor pass transparently swaps this in for `db.tables`,
    /// so writes land here and not on the live database.
    pub tables: HashMap<String, Table>,
    /// Immutable second deep clone of `Database::tables`, taken at
    /// `BEGIN`. Diffing `tables` against **this** at commit produces
    /// the write-set.
    ///
    /// We cannot diff against the live `Database::tables` directly.
    /// Between our `BEGIN` and our `COMMIT`, *other* concurrent
    /// transactions may have committed. Their writes show up as
    /// differences against the live state but aren't ours, and
    /// treating them as our DELETEs would silently undo someone
    /// else's commit.
    ///
    /// The doubled memory cost (two full clones per transaction) is
    /// the price for that correctness in v0. The obvious follow-up
    /// is a per-touched-row begin-state map that captures only the
    /// rows we actually read or wrote.
    pub tables_at_begin: HashMap<String, Table>,
    /// Table names of `Database::tables` at `BEGIN`, sorted
    /// ascending. Used at commit (via
    /// [`ConcurrentTx::schema_unchanged`]) to detect DDL that ran on
    /// the live database under us. v0 rejects DDL inside the tx, but
    /// nothing prevents another connection from running it outside.
    pub schema_at_begin: Vec<String>,
}
impl ConcurrentTx {
    /// Opens a new `BEGIN CONCURRENT` transaction over `live_tables`.
    ///
    /// Steps, in order:
    /// 1. Register with `registry`, which advances `clock` by one and
    ///    becomes this transaction's `begin_ts`.
    /// 2. Deep-clone every live table twice: once for the mutable
    ///    working copy and once for the frozen commit-time baseline.
    /// 3. Record the sorted table-name fingerprint.
    ///
    /// The caller must already have checked that
    /// `journal_mode == Mvcc` and that no transaction is open.
    pub fn begin(
        clock: &MvccClock,
        registry: &ActiveTxRegistry,
        live_tables: &HashMap<String, Table>,
    ) -> Self {
        // Register first so the snapshot timestamp is minted before
        // any table is copied.
        let handle = registry.register(clock);

        // Every invocation yields a fully independent deep copy.
        let snapshot = || -> HashMap<String, Table> {
            live_tables
                .iter()
                .map(|(name, table)| (name.clone(), table.deep_clone()))
                .collect()
        };
        let tables = snapshot();
        let tables_at_begin = snapshot();

        let mut schema_at_begin = Vec::with_capacity(live_tables.len());
        schema_at_begin.extend(live_tables.keys().cloned());
        schema_at_begin.sort();

        Self {
            handle,
            tables,
            tables_at_begin,
            schema_at_begin,
        }
    }

    /// Snapshot timestamp allocated when this transaction began.
    ///
    /// Commit validation compares `MvStore` versions against this
    /// value to find writes that landed after our snapshot.
    pub fn begin_ts(&self) -> u64 {
        let handle = &self.handle;
        handle.begin_ts()
    }

    /// Returns `true` when `live_tables` holds exactly the set of
    /// table names recorded at BEGIN.
    ///
    /// Commit uses this to raise a typed error instead of silently
    /// committing onto a schema that drifted underneath us.
    pub fn schema_unchanged(&self, live_tables: &HashMap<String, Table>) -> bool {
        // Cheap rejection before allocating and sorting.
        if live_tables.len() != self.schema_at_begin.len() {
            return false;
        }
        let mut names: Vec<&str> = live_tables.keys().map(String::as_str).collect();
        names.sort_unstable();
        names
            .into_iter()
            .eq(self.schema_at_begin.iter().map(String::as_str))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sql::db::table::Table;
    use crate::sql::parser::create::CreateQuery;
    use std::collections::HashMap;

    /// Builds an empty two-column table named `name`.
    ///
    /// Goes through the real CREATE pipeline (parse → `CreateQuery` →
    /// `Table::new`) rather than poking the struct directly, mirroring
    /// the existing test helpers.
    fn empty_table(name: &str) -> Table {
        use crate::sql::dialect::SqlriteDialect;
        use sqlparser::parser::Parser;
        let sql = format!(
            "CREATE TABLE {name} (id INTEGER PRIMARY KEY, v TEXT);",
            name = name,
        );
        let dialect = SqlriteDialect::new();
        let mut ast = Parser::parse_sql(&dialect, &sql).unwrap();
        let stmt = ast.pop().unwrap();
        let q = CreateQuery::new(&stmt).unwrap();
        Table::new(q)
    }

    /// A live-tables map containing a single empty table `name`.
    fn live_with_one_table(name: &str) -> HashMap<String, Table> {
        let mut m = HashMap::new();
        m.insert(name.to_string(), empty_table(name));
        m
    }

    #[test]
    fn begin_clones_tables_and_advances_clock() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let live = live_with_one_table("t");
        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        // Clock advanced by one (begin_ts).
        assert_eq!(clock.now(), 1);
        assert_eq!(tx.begin_ts(), 1);
        // Every table cloned — into both the working copy and the
        // commit-time diff baseline.
        assert!(tx.tables.contains_key("t"));
        assert!(tx.tables_at_begin.contains_key("t"));
        // Schema fingerprint matches.
        assert_eq!(tx.schema_at_begin, vec!["t".to_string()]);
        // Registered with the registry.
        assert_eq!(registry.active_count(), 1);
    }

    #[test]
    fn dropping_tx_unregisters() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let live = live_with_one_table("t");
        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        assert_eq!(registry.active_count(), 1);
        drop(tx);
        assert_eq!(registry.active_count(), 0);
    }

    /// Clones really are deep — mutating the live map after
    /// `begin` doesn't show up in `tx.tables`. The contract every
    /// COMMIT-time diff relies on.
    #[test]
    fn clone_is_independent_of_live_tables() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let mut live = live_with_one_table("t");
        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        // Add a new table to live — tx's snapshot must be unchanged.
        live.insert("u".to_string(), empty_table("u"));
        assert_eq!(tx.tables.len(), 1);
        assert!(tx.tables.contains_key("t"));
        assert!(!tx.tables.contains_key("u"));
        // schema_unchanged catches the drift.
        assert!(!tx.schema_unchanged(&live));
    }

    #[test]
    fn schema_unchanged_recognises_identical_set() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let live = live_with_one_table("t");
        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        // No drift — same single table.
        assert!(tx.schema_unchanged(&live));
    }

    /// Same table count but a different name (e.g. DROP + CREATE
    /// under us) must still count as drift — the length check alone
    /// is not enough.
    #[test]
    fn schema_unchanged_detects_same_size_rename() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let live = live_with_one_table("t");
        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        let renamed = live_with_one_table("u");
        assert!(!tx.schema_unchanged(&renamed));
    }

    /// BEGIN on a database with no tables yields empty snapshots and
    /// an empty fingerprint that still matches the (empty) live map.
    #[test]
    fn begin_on_empty_database() {
        let clock = MvccClock::new(0);
        let registry = ActiveTxRegistry::new();
        let live: HashMap<String, Table> = HashMap::new();
        let tx = ConcurrentTx::begin(&clock, &registry, &live);
        assert!(tx.tables.is_empty());
        assert!(tx.tables_at_begin.is_empty());
        assert!(tx.schema_at_begin.is_empty());
        assert!(tx.schema_unchanged(&live));
    }
}