Skip to content

Commit 2c4e2dc

Browse files
authored
Merge pull request #807 from benthecarman/paginated-sqlite
Paginated sqlite
2 parents a555133 + 9b1da60 commit 2c4e2dc

File tree

5 files changed

+910
-87
lines changed

5 files changed

+910
-87
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ proptest = "1.0.0"
9191
regex = "1.5.6"
9292
criterion = { version = "0.7.0", features = ["async_tokio"] }
9393
ldk-node-062 = { package = "ldk-node", version = "=0.6.2" }
94+
ldk-node-070 = { package = "ldk-node", version = "=0.7.0" }
9495

9596
[target.'cfg(not(no_download))'.dev-dependencies]
9697
electrsd = { version = "0.36.1", default-features = false, features = ["legacy", "esplora_a33e97e1", "corepc-node_27_2"] }

src/io/sqlite_store/migrations.rs

Lines changed: 228 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -9,74 +9,167 @@ use lightning::io;
99
use rusqlite::Connection;
1010

1111
pub(super) fn migrate_schema(
12-
connection: &mut Connection, kv_table_name: &str, from_version: u16, to_version: u16,
12+
connection: &mut Connection, kv_table_name: &str, mut from_version: u16, to_version: u16,
1313
) -> io::Result<()> {
1414
assert!(from_version < to_version);
15-
if from_version == 1 && to_version == 2 {
16-
let tx = connection.transaction().map_err(|e| {
17-
let msg = format!(
18-
"Failed to migrate table {} from user_version {} to {}: {}",
19-
kv_table_name, from_version, to_version, e
20-
);
15+
if from_version == 1 && to_version >= 2 {
16+
migrate_v1_to_v2(connection, kv_table_name)?;
17+
from_version = 2;
18+
}
19+
if from_version == 2 && to_version >= 3 {
20+
migrate_v2_to_v3(connection, kv_table_name)?;
21+
}
22+
23+
Ok(())
24+
}
25+
26+
fn migrate_v1_to_v2(connection: &mut Connection, kv_table_name: &str) -> io::Result<()> {
27+
let tx = connection.transaction().map_err(|e| {
28+
let msg = format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e);
29+
io::Error::new(io::ErrorKind::Other, msg)
30+
})?;
31+
32+
// Rename 'namespace' column to 'primary_namespace'
33+
let sql = format!(
34+
"ALTER TABLE {}
35+
RENAME COLUMN namespace TO primary_namespace;",
36+
kv_table_name
37+
);
38+
39+
tx.execute(&sql, []).map_err(|e| {
40+
let msg = format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e);
41+
io::Error::new(io::ErrorKind::Other, msg)
42+
})?;
43+
44+
// Add new 'secondary_namespace' column
45+
let sql = format!(
46+
"ALTER TABLE {}
47+
ADD secondary_namespace TEXT DEFAULT \"\" NOT NULL;",
48+
kv_table_name
49+
);
50+
51+
tx.execute(&sql, []).map_err(|e| {
52+
let msg = format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e);
53+
io::Error::new(io::ErrorKind::Other, msg)
54+
})?;
55+
56+
// Update user_version
57+
tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", 2u16, |_| Ok(())).map_err(
58+
|e| {
59+
let msg = format!("Failed to upgrade user_version from 1 to 2: {}", e);
2160
io::Error::new(io::ErrorKind::Other, msg)
22-
})?;
61+
},
62+
)?;
63+
64+
tx.commit().map_err(|e| {
65+
let msg = format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e);
66+
io::Error::new(io::ErrorKind::Other, msg)
67+
})?;
68+
69+
Ok(())
70+
}
71+
72+
fn migrate_v2_to_v3(connection: &mut Connection, kv_table_name: &str) -> io::Result<()> {
73+
let map_err = |e: rusqlite::Error| -> io::Error {
74+
let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e);
75+
io::Error::new(io::ErrorKind::Other, msg)
76+
};
77+
78+
// Check whether the primary key already includes secondary_namespace.
79+
// Tables migrated from v1 have PK (primary_namespace, key) only — missing
80+
// secondary_namespace. Tables created directly as v2 already have the correct
81+
// PK (primary_namespace, secondary_namespace, key).
82+
let needs_table_rewrite = {
83+
let sql = format!("PRAGMA table_info({})", kv_table_name);
84+
let mut stmt = connection.prepare(&sql).map_err(map_err)?;
85+
let mut pk_cols: Vec<(i64, String)> = stmt
86+
.query_map([], |row| Ok((row.get::<_, i64>(5)?, row.get::<_, String>(1)?)))
87+
.map_err(map_err)?
88+
.collect::<Result<Vec<_>, _>>()
89+
.map_err(map_err)?
90+
.into_iter()
91+
.filter(|(pk, _)| *pk > 0)
92+
.collect();
93+
pk_cols.sort_by_key(|(pk, _)| *pk);
94+
let pk_names: Vec<&str> = pk_cols.iter().map(|(_, name)| name.as_str()).collect();
95+
pk_names != vec!["primary_namespace", "secondary_namespace", "key"]
96+
};
97+
98+
let tx = connection.transaction().map_err(|e| {
99+
let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e);
100+
io::Error::new(io::ErrorKind::Other, msg)
101+
})?;
102+
103+
if needs_table_rewrite {
104+
// Full table rewrite to fix the primary key.
105+
let old_table = format!("{}_v2_old", kv_table_name);
106+
107+
let rename_sql = format!("ALTER TABLE {} RENAME TO {}", kv_table_name, old_table);
108+
tx.execute(&rename_sql, []).map_err(map_err)?;
23109

24-
// Rename 'namespace' column to 'primary_namespace'
25-
let sql = format!(
26-
"ALTER TABLE {}
27-
RENAME COLUMN namespace TO primary_namespace;",
110+
let create_table_sql = format!(
111+
"CREATE TABLE {} (
112+
primary_namespace TEXT NOT NULL,
113+
secondary_namespace TEXT DEFAULT \"\" NOT NULL,
114+
key TEXT NOT NULL CHECK (key <> ''),
115+
value BLOB,
116+
sort_order INTEGER NOT NULL DEFAULT 0,
117+
PRIMARY KEY (primary_namespace, secondary_namespace, key)
118+
)",
28119
kv_table_name
29120
);
121+
tx.execute(&create_table_sql, []).map_err(map_err)?;
30122

31-
tx.execute(&sql, []).map_err(|e| {
32-
let msg = format!(
33-
"Failed to migrate table {} from user_version {} to {}: {}",
34-
kv_table_name, from_version, to_version, e
35-
);
36-
io::Error::new(io::ErrorKind::Other, msg)
37-
})?;
123+
// Copy data and backfill sort_order from ROWID for relative ordering
124+
let copy_sql = format!(
125+
"INSERT INTO {} (primary_namespace, secondary_namespace, key, value, sort_order)
126+
SELECT primary_namespace, secondary_namespace, key, value, ROWID FROM {}",
127+
kv_table_name, old_table
128+
);
129+
tx.execute(&copy_sql, []).map_err(map_err)?;
38130

39-
// Add new 'secondary_namespace' column
40-
let sql = format!(
41-
"ALTER TABLE {}
42-
ADD secondary_namespace TEXT DEFAULT \"\" NOT NULL;",
131+
let drop_old_sql = format!("DROP TABLE {}", old_table);
132+
tx.execute(&drop_old_sql, []).map_err(map_err)?;
133+
} else {
134+
// Primary key is already correct — just add the sort_order column and backfill.
135+
let add_col_sql = format!(
136+
"ALTER TABLE {} ADD COLUMN sort_order INTEGER NOT NULL DEFAULT 0",
43137
kv_table_name
44138
);
139+
tx.execute(&add_col_sql, []).map_err(map_err)?;
45140

46-
tx.execute(&sql, []).map_err(|e| {
47-
let msg = format!(
48-
"Failed to migrate table {} from user_version {} to {}: {}",
49-
kv_table_name, from_version, to_version, e
50-
);
51-
io::Error::new(io::ErrorKind::Other, msg)
52-
})?;
53-
54-
// Update user_version
55-
tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", to_version, |_| Ok(()))
56-
.map_err(|e| {
57-
let msg = format!(
58-
"Failed to upgrade user_version from {} to {}: {}",
59-
from_version, to_version, e
60-
);
61-
io::Error::new(io::ErrorKind::Other, msg)
62-
})?;
141+
let backfill_sql = format!("UPDATE {} SET sort_order = ROWID", kv_table_name);
142+
tx.execute(&backfill_sql, []).map_err(map_err)?;
143+
}
63144

64-
tx.commit().map_err(|e| {
65-
let msg = format!(
66-
"Failed to migrate table {} from user_version {} to {}: {}",
67-
kv_table_name, from_version, to_version, e
68-
);
145+
// Create composite index for paginated listing
146+
let sql = format!(
147+
"CREATE INDEX idx_{}_paginated ON {} (primary_namespace, secondary_namespace, sort_order DESC, key ASC)",
148+
kv_table_name, kv_table_name
149+
);
150+
tx.execute(&sql, []).map_err(map_err)?;
151+
152+
// Update user_version
153+
tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", 3u16, |_| Ok(())).map_err(
154+
|e| {
155+
let msg = format!("Failed to upgrade user_version from 2 to 3: {}", e);
69156
io::Error::new(io::ErrorKind::Other, msg)
70-
})?;
71-
}
157+
},
158+
)?;
159+
160+
tx.commit().map_err(|e| {
161+
let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e);
162+
io::Error::new(io::ErrorKind::Other, msg)
163+
})?;
164+
72165
Ok(())
73166
}
74167

75168
#[cfg(test)]
76169
mod tests {
77170
use std::fs;
78171

79-
use lightning::util::persist::KVStoreSync;
172+
use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync};
80173
use rusqlite::{named_params, Connection};
81174

82175
use crate::io::sqlite_store::SqliteStore;
@@ -128,7 +221,7 @@ mod tests {
128221
let sql = format!(
129222
"INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :value);",
130223
kv_table_name
131-
);
224+
);
132225
let mut stmt = connection.prepare_cached(&sql).unwrap();
133226

134227
stmt.execute(named_params! {
@@ -166,4 +259,90 @@ mod tests {
166259
// Check we can continue to use the store just fine.
167260
do_read_write_remove_list_persist(&store);
168261
}
262+
263+
#[test]
264+
fn rwrl_post_schema_2_migration() {
265+
let old_schema_version = 2u16;
266+
267+
let mut temp_path = random_storage_path();
268+
temp_path.push("rwrl_post_schema_2_migration");
269+
270+
let db_file_name = "test_db".to_string();
271+
let kv_table_name = "test_table".to_string();
272+
273+
let test_ns = "testspace";
274+
let test_sub = "testsub";
275+
276+
{
277+
// Create a v2 database manually
278+
fs::create_dir_all(temp_path.clone()).unwrap();
279+
let mut db_file_path = temp_path.clone();
280+
db_file_path.push(db_file_name.clone());
281+
282+
let connection = Connection::open(db_file_path.clone()).unwrap();
283+
284+
connection
285+
.pragma(
286+
Some(rusqlite::DatabaseName::Main),
287+
"user_version",
288+
old_schema_version,
289+
|_| Ok(()),
290+
)
291+
.unwrap();
292+
293+
let sql = format!(
294+
"CREATE TABLE IF NOT EXISTS {} (
295+
primary_namespace TEXT NOT NULL,
296+
secondary_namespace TEXT DEFAULT \"\" NOT NULL,
297+
key TEXT NOT NULL CHECK (key <> ''),
298+
value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key )
299+
);",
300+
kv_table_name
301+
);
302+
connection.execute(&sql, []).unwrap();
303+
304+
// Insert 3 rows in a known order
305+
for i in 0..3 {
306+
let key = format!("key_{}", i);
307+
let sql = format!(
308+
"INSERT INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:ns, :sub, :key, :value);",
309+
kv_table_name
310+
);
311+
let mut stmt = connection.prepare_cached(&sql).unwrap();
312+
stmt.execute(named_params! {
313+
":ns": test_ns,
314+
":sub": test_sub,
315+
":key": key,
316+
":value": vec![i as u8; 8],
317+
})
318+
.unwrap();
319+
}
320+
}
321+
322+
// Open with new code, triggering v2→v3 migration
323+
let store = SqliteStore::new(temp_path, Some(db_file_name), Some(kv_table_name)).unwrap();
324+
325+
// Verify data survived
326+
for i in 0..3 {
327+
let key = format!("key_{}", i);
328+
let data = store.read(test_ns, test_sub, &key).unwrap();
329+
assert_eq!(data, vec![i as u8; 8]);
330+
}
331+
332+
// Verify paginated listing works and returns entries in ROWID-backfilled order (newest first)
333+
let response =
334+
PaginatedKVStoreSync::list_paginated(&store, test_ns, test_sub, None).unwrap();
335+
assert_eq!(response.keys.len(), 3);
336+
// ROWIDs were 1, 2, 3 so sort_order was backfilled as 1, 2, 3; newest first
337+
assert_eq!(response.keys, vec!["key_2", "key_1", "key_0"]);
338+
339+
// Verify we can write new entries and they get proper ordering
340+
KVStoreSync::write(&store, test_ns, test_sub, "key_new", vec![99u8; 8]).unwrap();
341+
let response =
342+
PaginatedKVStoreSync::list_paginated(&store, test_ns, test_sub, None).unwrap();
343+
assert_eq!(response.keys[0], "key_new");
344+
345+
// Check we can continue to use the store just fine.
346+
do_read_write_remove_list_persist(&store);
347+
}
169348
}

0 commit comments

Comments
 (0)