-
Notifications
You must be signed in to change notification settings - Fork 56
Expand file tree
/
Copy pathconcurrent_writers.rs
More file actions
85 lines (78 loc) · 3.31 KB
/
Copy pathconcurrent_writers.rs
File metadata and controls
85 lines (78 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
//! End-to-end `BEGIN CONCURRENT` demo with two sibling handles.
//!
//! Run with: `cargo run --example concurrent_writers`
//!
//! Phase 11 (SQLR-22) opt-in MVCC. The example:
//!
//! 1. Opens a connection, opts the database into `journal_mode = mvcc`.
//! 2. Mints a sibling handle via `Connection::connect` so two writers
//!    share the same backing database.
//! 3. Runs two concurrent transactions:
//!    - A and B touch *disjoint* rows → both commit.
//!    - A and B touch the *same* row → the second commit fails
//!      with `SQLRiteError::Busy`; the retry takes a fresh
//!      `begin_ts`, observes the post-commit state, and lands.
//!
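//! The retry shape shown here — on a retryable `COMMIT` error, open a
//! fresh `BEGIN CONCURRENT` and replay the statements — is the canonical
//! one every SDK reuses (this demo retries once rather than looping); see
//! [`docs/concurrent-writes.md`](../../docs/concurrent-writes.md).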
use sqlrite::{Connection, Result};
/// Walks through both `BEGIN CONCURRENT` scenarios on an in-memory database.
///
/// Two handles onto one database first commit updates to different rows,
/// then race on the same row. When the second `COMMIT` reports a retryable
/// error, its update is replayed once in a fresh transaction. Balances are
/// printed after each scenario.
///
/// # Errors
///
/// Propagates every error from the connection calls except a retryable
/// failure of B's contested `COMMIT`, which triggers the single retry.
fn main() -> Result<()> {
    // SQL that is issued more than once gets a name; one-off statements
    // stay inline next to the call that runs them.
    let begin = "BEGIN CONCURRENT";
    let commit = "COMMIT";
    let contested_update = "UPDATE accounts SET balance = balance + 50 WHERE id = 1";

    let mut writer_a = Connection::open_in_memory()?;
    writer_a.execute("PRAGMA journal_mode = mvcc")?;
    // The continuation lines of this literal are flush-left on purpose:
    // the SQL text is kept exactly as it was.
    writer_a.execute(
        "CREATE TABLE accounts (
id INTEGER PRIMARY KEY,
holder TEXT NOT NULL,
balance INTEGER NOT NULL
)",
    )?;
    for seed_row in [
        "INSERT INTO accounts (id, holder, balance) VALUES (1, 'alice', 100)",
        "INSERT INTO accounts (id, holder, balance) VALUES (2, 'bob', 100)",
    ] {
        writer_a.execute(seed_row)?;
    }

    // Second handle onto the same Arc<Mutex<Database>>. A real
    // application would move it to a worker thread; the demo stays
    // single-threaded so the interleaving is easy to follow.
    let mut writer_b = writer_a.connect();

    println!("=== Disjoint-row commits both succeed ===");
    writer_a.execute(begin)?;
    writer_b.execute(begin)?;
    writer_a.execute("UPDATE accounts SET balance = balance + 10 WHERE id = 1")?;
    writer_b.execute("UPDATE accounts SET balance = balance + 20 WHERE id = 2")?;
    writer_a.execute(commit)?;
    // The two write-sets share no row, so this commit has nothing to
    // conflict with.
    writer_b.execute(commit)?;
    print_balances(&mut writer_a)?;

    println!("\n=== Same-row commits: A wins, B retries ===");
    // Both transactions begin before either one writes, so
    // A.begin_ts < B.begin_ts and each starts from the same
    // pre-update balance.
    writer_a.execute(begin)?;
    writer_b.execute(begin)?;
    writer_a.execute("UPDATE accounts SET balance = balance + 5 WHERE id = 1")?;
    writer_b.execute(contested_update)?;
    writer_a.execute(commit)?;

    // B now finds a row version newer than its own `begin_ts` → Busy.
    // The failed COMMIT has already discarded B's transaction, so no
    // ROLLBACK is issued; B just opens a fresh BEGIN CONCURRENT and
    // replays its update.
    let b_commit = writer_b.execute(commit);
    match b_commit {
        Err(e) if e.is_retryable() => {
            eprintln!(" B lost the race: {e}");
            writer_b.execute(begin)?;
            writer_b.execute(contested_update)?;
            writer_b.execute(commit)?;
        }
        // Success falls through; any non-retryable error is propagated.
        outcome => {
            outcome?;
        }
    }
    print_balances(&mut writer_a)?;

    Ok(())
}
/// Prints one line per row of `accounts`, ordered by `id`.
///
/// Each line has the form ` account <id> (<holder>): <balance>`.
///
/// # Errors
///
/// Returns the first error raised while preparing the query, running it,
/// stepping to the next row, or reading one of the three columns.
fn print_balances(conn: &mut Connection) -> Result<()> {
    let listing = conn.prepare("SELECT id, holder, balance FROM accounts ORDER BY id")?;
    let mut cursor = listing.query()?;

    // `next()` is fallible, so the cursor is stepped by hand rather than
    // driven through a `for` loop.
    while let Some(account) = cursor.next()? {
        // Columns are read by name, in the same order as before.
        let (id, holder, balance): (i64, String, i64) = (
            account.get_by_name("id")?,
            account.get_by_name("holder")?,
            account.get_by_name("balance")?,
        );
        println!(" account {id} ({holder}): {balance}");
    }

    Ok(())
}