@@ -9,11 +9,15 @@ use lightning::io;
99use rusqlite:: Connection ;
1010
1111pub ( super ) fn migrate_schema (
12- connection : & mut Connection , kv_table_name : & str , from_version : u16 , to_version : u16 ,
12+ connection : & mut Connection , kv_table_name : & str , mut from_version : u16 , to_version : u16 ,
1313) -> io:: Result < ( ) > {
1414 assert ! ( from_version < to_version) ;
15- if from_version == 1 && to_version = = 2 {
15+ if from_version == 1 && to_version > = 2 {
1616 migrate_v1_to_v2 ( connection, kv_table_name) ?;
17+ from_version = 2 ;
18+ }
19+ if from_version == 2 && to_version >= 3 {
20+ migrate_v2_to_v3 ( connection, kv_table_name) ?;
1721 }
1822
1923 Ok ( ( ) )
@@ -65,11 +69,107 @@ fn migrate_v1_to_v2(connection: &mut Connection, kv_table_name: &str) -> io::Res
6569 Ok ( ( ) )
6670}
6771
72+ fn migrate_v2_to_v3 ( connection : & mut Connection , kv_table_name : & str ) -> io:: Result < ( ) > {
73+ let map_err = |e : rusqlite:: Error | -> io:: Error {
74+ let msg = format ! ( "Failed to migrate table {} from v2 to v3: {}" , kv_table_name, e) ;
75+ io:: Error :: new ( io:: ErrorKind :: Other , msg)
76+ } ;
77+
78+ // Check whether the primary key already includes secondary_namespace.
79+ // Tables migrated from v1 have PK (primary_namespace, key) only — missing
80+ // secondary_namespace. Tables created directly as v2 already have the correct
81+ // PK (primary_namespace, secondary_namespace, key).
82+ let needs_table_rewrite = {
83+ let sql = format ! ( "PRAGMA table_info({})" , kv_table_name) ;
84+ let mut stmt = connection. prepare ( & sql) . map_err ( map_err) ?;
85+ let mut pk_cols: Vec < ( i64 , String ) > = stmt
86+ . query_map ( [ ] , |row| Ok ( ( row. get :: < _ , i64 > ( 5 ) ?, row. get :: < _ , String > ( 1 ) ?) ) )
87+ . map_err ( map_err) ?
88+ . collect :: < Result < Vec < _ > , _ > > ( )
89+ . map_err ( map_err) ?
90+ . into_iter ( )
91+ . filter ( |( pk, _) | * pk > 0 )
92+ . collect ( ) ;
93+ pk_cols. sort_by_key ( |( pk, _) | * pk) ;
94+ let pk_names: Vec < & str > = pk_cols. iter ( ) . map ( |( _, name) | name. as_str ( ) ) . collect ( ) ;
95+ pk_names != vec ! [ "primary_namespace" , "secondary_namespace" , "key" ]
96+ } ;
97+
98+ let tx = connection. transaction ( ) . map_err ( |e| {
99+ let msg = format ! ( "Failed to migrate table {} from v2 to v3: {}" , kv_table_name, e) ;
100+ io:: Error :: new ( io:: ErrorKind :: Other , msg)
101+ } ) ?;
102+
103+ if needs_table_rewrite {
104+ // Full table rewrite to fix the primary key.
105+ let old_table = format ! ( "{}_v2_old" , kv_table_name) ;
106+
107+ let rename_sql = format ! ( "ALTER TABLE {} RENAME TO {}" , kv_table_name, old_table) ;
108+ tx. execute ( & rename_sql, [ ] ) . map_err ( map_err) ?;
109+
110+ let create_table_sql = format ! (
111+ "CREATE TABLE {} (
112+ primary_namespace TEXT NOT NULL,
113+ secondary_namespace TEXT DEFAULT \" \" NOT NULL,
114+ key TEXT NOT NULL CHECK (key <> ''),
115+ value BLOB,
116+ sort_order INTEGER NOT NULL DEFAULT 0,
117+ PRIMARY KEY (primary_namespace, secondary_namespace, key)
118+ )" ,
119+ kv_table_name
120+ ) ;
121+ tx. execute ( & create_table_sql, [ ] ) . map_err ( map_err) ?;
122+
123+ // Copy data and backfill sort_order from ROWID for relative ordering
124+ let copy_sql = format ! (
125+ "INSERT INTO {} (primary_namespace, secondary_namespace, key, value, sort_order)
126+ SELECT primary_namespace, secondary_namespace, key, value, ROWID FROM {}" ,
127+ kv_table_name, old_table
128+ ) ;
129+ tx. execute ( & copy_sql, [ ] ) . map_err ( map_err) ?;
130+
131+ let drop_old_sql = format ! ( "DROP TABLE {}" , old_table) ;
132+ tx. execute ( & drop_old_sql, [ ] ) . map_err ( map_err) ?;
133+ } else {
134+ // Primary key is already correct — just add the sort_order column and backfill.
135+ let add_col_sql = format ! (
136+ "ALTER TABLE {} ADD COLUMN sort_order INTEGER NOT NULL DEFAULT 0" ,
137+ kv_table_name
138+ ) ;
139+ tx. execute ( & add_col_sql, [ ] ) . map_err ( map_err) ?;
140+
141+ let backfill_sql = format ! ( "UPDATE {} SET sort_order = ROWID" , kv_table_name) ;
142+ tx. execute ( & backfill_sql, [ ] ) . map_err ( map_err) ?;
143+ }
144+
145+ // Create composite index for paginated listing
146+ let sql = format ! (
147+ "CREATE INDEX idx_{}_paginated ON {} (primary_namespace, secondary_namespace, sort_order DESC, key ASC)" ,
148+ kv_table_name, kv_table_name
149+ ) ;
150+ tx. execute ( & sql, [ ] ) . map_err ( map_err) ?;
151+
152+ // Update user_version
153+ tx. pragma ( Some ( rusqlite:: DatabaseName :: Main ) , "user_version" , 3u16 , |_| Ok ( ( ) ) ) . map_err (
154+ |e| {
155+ let msg = format ! ( "Failed to upgrade user_version from 2 to 3: {}" , e) ;
156+ io:: Error :: new ( io:: ErrorKind :: Other , msg)
157+ } ,
158+ ) ?;
159+
160+ tx. commit ( ) . map_err ( |e| {
161+ let msg = format ! ( "Failed to migrate table {} from v2 to v3: {}" , kv_table_name, e) ;
162+ io:: Error :: new ( io:: ErrorKind :: Other , msg)
163+ } ) ?;
164+
165+ Ok ( ( ) )
166+ }
167+
68168#[ cfg( test) ]
69169mod tests {
70170 use std:: fs;
71171
72- use lightning:: util:: persist:: KVStoreSync ;
172+ use lightning:: util:: persist:: { KVStoreSync , PaginatedKVStoreSync } ;
73173 use rusqlite:: { named_params, Connection } ;
74174
75175 use crate :: io:: sqlite_store:: SqliteStore ;
@@ -121,7 +221,7 @@ mod tests {
121221 let sql = format ! (
122222 "INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :value);" ,
123223 kv_table_name
124- ) ;
224+ ) ;
125225 let mut stmt = connection. prepare_cached ( & sql) . unwrap ( ) ;
126226
127227 stmt. execute ( named_params ! {
@@ -159,4 +259,90 @@ mod tests {
159259 // Check we can continue to use the store just fine.
160260 do_read_write_remove_list_persist ( & store) ;
161261 }
262+
263+ #[ test]
264+ fn rwrl_post_schema_2_migration ( ) {
265+ let old_schema_version = 2u16 ;
266+
267+ let mut temp_path = random_storage_path ( ) ;
268+ temp_path. push ( "rwrl_post_schema_2_migration" ) ;
269+
270+ let db_file_name = "test_db" . to_string ( ) ;
271+ let kv_table_name = "test_table" . to_string ( ) ;
272+
273+ let test_ns = "testspace" ;
274+ let test_sub = "testsub" ;
275+
276+ {
277+ // Create a v2 database manually
278+ fs:: create_dir_all ( temp_path. clone ( ) ) . unwrap ( ) ;
279+ let mut db_file_path = temp_path. clone ( ) ;
280+ db_file_path. push ( db_file_name. clone ( ) ) ;
281+
282+ let connection = Connection :: open ( db_file_path. clone ( ) ) . unwrap ( ) ;
283+
284+ connection
285+ . pragma (
286+ Some ( rusqlite:: DatabaseName :: Main ) ,
287+ "user_version" ,
288+ old_schema_version,
289+ |_| Ok ( ( ) ) ,
290+ )
291+ . unwrap ( ) ;
292+
293+ let sql = format ! (
294+ "CREATE TABLE IF NOT EXISTS {} (
295+ primary_namespace TEXT NOT NULL,
296+ secondary_namespace TEXT DEFAULT \" \" NOT NULL,
297+ key TEXT NOT NULL CHECK (key <> ''),
298+ value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key )
299+ );" ,
300+ kv_table_name
301+ ) ;
302+ connection. execute ( & sql, [ ] ) . unwrap ( ) ;
303+
304+ // Insert 3 rows in a known order
305+ for i in 0 ..3 {
306+ let key = format ! ( "key_{}" , i) ;
307+ let sql = format ! (
308+ "INSERT INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:ns, :sub, :key, :value);" ,
309+ kv_table_name
310+ ) ;
311+ let mut stmt = connection. prepare_cached ( & sql) . unwrap ( ) ;
312+ stmt. execute ( named_params ! {
313+ ":ns" : test_ns,
314+ ":sub" : test_sub,
315+ ":key" : key,
316+ ":value" : vec![ i as u8 ; 8 ] ,
317+ } )
318+ . unwrap ( ) ;
319+ }
320+ }
321+
322+ // Open with new code, triggering v2→v3 migration
323+ let store = SqliteStore :: new ( temp_path, Some ( db_file_name) , Some ( kv_table_name) ) . unwrap ( ) ;
324+
325+ // Verify data survived
326+ for i in 0 ..3 {
327+ let key = format ! ( "key_{}" , i) ;
328+ let data = store. read ( test_ns, test_sub, & key) . unwrap ( ) ;
329+ assert_eq ! ( data, vec![ i as u8 ; 8 ] ) ;
330+ }
331+
332+ // Verify paginated listing works and returns entries in ROWID-backfilled order (newest first)
333+ let response =
334+ PaginatedKVStoreSync :: list_paginated ( & store, test_ns, test_sub, None ) . unwrap ( ) ;
335+ assert_eq ! ( response. keys. len( ) , 3 ) ;
336+ // ROWIDs were 1, 2, 3 so sort_order was backfilled as 1, 2, 3; newest first
337+ assert_eq ! ( response. keys, vec![ "key_2" , "key_1" , "key_0" ] ) ;
338+
339+ // Verify we can write new entries and they get proper ordering
340+ KVStoreSync :: write ( & store, test_ns, test_sub, "key_new" , vec ! [ 99u8 ; 8 ] ) . unwrap ( ) ;
341+ let response =
342+ PaginatedKVStoreSync :: list_paginated ( & store, test_ns, test_sub, None ) . unwrap ( ) ;
343+ assert_eq ! ( response. keys[ 0 ] , "key_new" ) ;
344+
345+ // Check we can continue to use the store just fine.
346+ do_read_write_remove_list_persist ( & store) ;
347+ }
162348}
0 commit comments