Skip to content

Commit 473d052

Browse files
benthecarmanclaude
andcommitted
Implement PaginatedKVStore trait for SqliteStore and VssStore
Add paginated key listing support using PageToken for cursor-based pagination. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 86c7375 commit 473d052

5 files changed

Lines changed: 410 additions & 20 deletions

File tree

src/io/sqlite_store/mod.rs

Lines changed: 161 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ use std::sync::atomic::{AtomicU64, Ordering};
1414
use std::sync::{Arc, Mutex};
1515

1616
use lightning::io;
17-
use lightning::util::persist::{KVStore, KVStoreSync};
17+
use lightning::util::persist::{
18+
KVStore, KVStoreSync, PageToken, PaginatedKVStore, PaginatedKVStoreSync, PaginatedListResponse,
19+
};
1820
use lightning_types::string::PrintableString;
1921
use rusqlite::{named_params, Connection};
2022

@@ -34,7 +36,10 @@ pub const DEFAULT_SQLITE_DB_FILE_NAME: &str = "ldk_data.sqlite";
3436
pub const DEFAULT_KV_TABLE_NAME: &str = "ldk_data";
3537

3638
// The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration.
37-
const SCHEMA_USER_VERSION: u16 = 2;
39+
const SCHEMA_USER_VERSION: u16 = 3;
40+
41+
// The default page size for paginated list operations.
42+
const DEFAULT_PAGE_SIZE: u32 = 100;
3843

3944
/// A [`KVStoreSync`] implementation that writes to and reads from an [SQLite] database.
4045
///
@@ -222,6 +227,33 @@ impl KVStoreSync for SqliteStore {
222227
}
223228
}
224229

230+
impl PaginatedKVStoreSync for SqliteStore {
231+
fn list_paginated(
232+
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
233+
) -> io::Result<PaginatedListResponse> {
234+
self.inner.list_paginated_internal(primary_namespace, secondary_namespace, page_token)
235+
}
236+
}
237+
238+
impl PaginatedKVStore for SqliteStore {
239+
fn list_paginated(
240+
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
241+
) -> impl Future<Output = Result<PaginatedListResponse, io::Error>> + 'static + Send {
242+
let primary_namespace = primary_namespace.to_string();
243+
let secondary_namespace = secondary_namespace.to_string();
244+
let inner = Arc::clone(&self.inner);
245+
let fut = tokio::task::spawn_blocking(move || {
246+
inner.list_paginated_internal(&primary_namespace, &secondary_namespace, page_token)
247+
});
248+
async move {
249+
fut.await.unwrap_or_else(|e| {
250+
let msg = format!("Failed to IO operation due join error: {}", e);
251+
Err(io::Error::new(io::ErrorKind::Other, msg))
252+
})
253+
}
254+
}
255+
}
256+
225257
struct SqliteStoreInner {
226258
connection: Arc<Mutex<Connection>>,
227259
data_dir: PathBuf,
@@ -289,7 +321,9 @@ impl SqliteStoreInner {
289321
primary_namespace TEXT NOT NULL,
290322
secondary_namespace TEXT DEFAULT \"\" NOT NULL,
291323
key TEXT NOT NULL CHECK (key <> ''),
292-
value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key )
324+
value BLOB,
325+
created_at INTEGER DEFAULT (strftime('%s','now')),
326+
PRIMARY KEY ( primary_namespace, secondary_namespace, key )
293327
);",
294328
kv_table_name
295329
);
@@ -299,6 +333,17 @@ impl SqliteStoreInner {
299333
io::Error::new(io::ErrorKind::Other, msg)
300334
})?;
301335

336+
// Create index for efficient paginated queries ordered by creation time, then key
337+
let index_sql = format!(
338+
"CREATE INDEX IF NOT EXISTS {}_created_at_idx ON {} (primary_namespace, secondary_namespace, created_at DESC, key ASC);",
339+
kv_table_name, kv_table_name
340+
);
341+
342+
connection.execute(&index_sql, []).map_err(|e| {
343+
let msg = format!("Failed to create index on {}: {}", kv_table_name, e);
344+
io::Error::new(io::ErrorKind::Other, msg)
345+
})?;
346+
302347
let connection = Arc::new(Mutex::new(connection));
303348
let write_version_locks = Mutex::new(HashMap::new());
304349
Ok(Self { connection, data_dir, kv_table_name, write_version_locks })
@@ -472,6 +517,119 @@ impl SqliteStoreInner {
472517
Ok(keys)
473518
}
474519

520+
fn list_paginated_internal(
521+
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
522+
) -> io::Result<PaginatedListResponse> {
523+
check_namespace_key_validity(
524+
primary_namespace,
525+
secondary_namespace,
526+
None,
527+
"list_paginated",
528+
)?;
529+
530+
let locked_conn = self.connection.lock().unwrap();
531+
532+
// Extract the last_key from the page token
533+
let last_key = page_token.map(|t| t.0);
534+
535+
// If last_key is provided, we need to get the created_at value for that key to use as a cursor
536+
let last_created_at: Option<i64> = if let Some(ref key) = last_key {
537+
let sql = format!(
538+
"SELECT created_at FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key",
539+
self.kv_table_name
540+
);
541+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
542+
let msg = format!("Failed to prepare statement: {}", e);
543+
io::Error::new(io::ErrorKind::Other, msg)
544+
})?;
545+
546+
stmt.query_row(
547+
named_params! {
548+
":primary_namespace": primary_namespace,
549+
":secondary_namespace": secondary_namespace,
550+
":key": key,
551+
},
552+
|row| row.get(0),
553+
)
554+
.ok()
555+
} else {
556+
None
557+
};
558+
559+
// Query for keys, ordered by created_at DESC, key ASC (newest first, with key as tiebreaker)
560+
// For pagination with composite sort, we need: (created_at < last) OR (created_at = last AND key > last_key)
561+
//
562+
// We fetch DEFAULT_PAGE_SIZE + 1 rows as an optimization to detect if more pages exist.
563+
// If we get more than DEFAULT_PAGE_SIZE rows, we know there are additional results beyond
564+
// this page. We then discard the extra row and use the last returned key as the next page
565+
// token. This avoids needing a separate COUNT(*) query to determine pagination state.
566+
let (sql, params): (String, Vec<(&str, &dyn rusqlite::ToSql)>) = if let (
567+
Some(ref created_at),
568+
Some(ref key),
569+
) =
570+
(&last_created_at, &last_key)
571+
{
572+
(
573+
format!(
574+
"SELECT key FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace \
575+
AND (created_at < :last_created_at OR (created_at = :last_created_at AND key > :last_key)) \
576+
ORDER BY created_at DESC, key ASC LIMIT :limit",
577+
self.kv_table_name
578+
),
579+
vec![
580+
(":primary_namespace", &primary_namespace as &dyn rusqlite::ToSql),
581+
(":secondary_namespace", &secondary_namespace as &dyn rusqlite::ToSql),
582+
(":last_created_at", created_at as &dyn rusqlite::ToSql),
583+
(":last_key", key as &dyn rusqlite::ToSql),
584+
(":limit", &(DEFAULT_PAGE_SIZE + 1) as &dyn rusqlite::ToSql),
585+
],
586+
)
587+
} else {
588+
(
589+
format!(
590+
"SELECT key FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace \
591+
ORDER BY created_at DESC, key ASC LIMIT :limit",
592+
self.kv_table_name
593+
),
594+
vec![
595+
(":primary_namespace", &primary_namespace as &dyn rusqlite::ToSql),
596+
(":secondary_namespace", &secondary_namespace as &dyn rusqlite::ToSql),
597+
(":limit", &(DEFAULT_PAGE_SIZE + 1) as &dyn rusqlite::ToSql),
598+
],
599+
)
600+
};
601+
602+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
603+
let msg = format!("Failed to prepare statement: {}", e);
604+
io::Error::new(io::ErrorKind::Other, msg)
605+
})?;
606+
607+
let mut keys = Vec::with_capacity((DEFAULT_PAGE_SIZE + 1) as usize);
608+
609+
let rows_iter = stmt.query_map(params.as_slice(), |row| row.get(0)).map_err(|e| {
610+
let msg = format!("Failed to retrieve queried rows: {}", e);
611+
io::Error::new(io::ErrorKind::Other, msg)
612+
})?;
613+
614+
for k in rows_iter {
615+
keys.push(k.map_err(|e| {
616+
let msg = format!("Failed to retrieve queried rows: {}", e);
617+
io::Error::new(io::ErrorKind::Other, msg)
618+
})?);
619+
}
620+
621+
// Check if we have more pages by seeing if we got the extra "peek" row.
622+
// If so, remove it and generate a page token from the last actual result.
623+
let next_page_token = if keys.len() > DEFAULT_PAGE_SIZE as usize {
624+
keys.pop(); // Remove the extra "peek" row used to detect more pages
625+
keys.last().cloned().map(PageToken)
626+
} else {
627+
None
628+
};
629+
630+
Ok(PaginatedListResponse { keys, next_page_token })
631+
}
632+
475633
fn execute_locked_write<F: FnOnce() -> Result<(), lightning::io::Error>>(
476634
&self, inner_lock_ref: Arc<Mutex<u64>>, locking_key: String, version: u64, callback: F,
477635
) -> Result<(), lightning::io::Error> {

src/io/test_utils.rs

Lines changed: 92 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use std::collections::{hash_map, HashMap};
8+
use std::collections::HashMap;
99
use std::future::Future;
1010
use std::panic::RefUnwindSafe;
1111
use std::path::PathBuf;
12+
use std::sync::atomic::{AtomicU64, Ordering};
1213
use std::sync::Mutex;
1314

1415
use lightning::events::ClosureReason;
@@ -18,7 +19,8 @@ use lightning::ln::functional_test_utils::{
1819
create_node_chanmgrs, send_payment, TestChanMonCfg,
1920
};
2021
use lightning::util::persist::{
21-
KVStore, KVStoreSync, MonitorUpdatingPersister, KVSTORE_NAMESPACE_KEY_MAX_LEN,
22+
KVStore, KVStoreSync, MonitorUpdatingPersister, PageToken, PaginatedKVStore,
23+
PaginatedKVStoreSync, PaginatedListResponse, KVSTORE_NAMESPACE_KEY_MAX_LEN,
2224
};
2325
use lightning::util::test_utils;
2426
use lightning::{check_closed_broadcast, io};
@@ -36,14 +38,23 @@ type TestMonitorUpdatePersister<'a, K> = MonitorUpdatingPersister<
3638

3739
const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;
3840

41+
/// Entry in the in-memory store, tracking both data and creation order.
42+
#[derive(Debug, Clone)]
43+
struct StoreEntry {
44+
data: Vec<u8>,
45+
created_at: u64,
46+
}
47+
3948
pub struct InMemoryStore {
40-
persisted_bytes: Mutex<HashMap<String, HashMap<String, Vec<u8>>>>,
49+
persisted_bytes: Mutex<HashMap<String, HashMap<String, StoreEntry>>>,
50+
counter: AtomicU64,
4151
}
4252

4353
impl InMemoryStore {
4454
pub fn new() -> Self {
4555
let persisted_bytes = Mutex::new(HashMap::new());
46-
Self { persisted_bytes }
56+
let counter = AtomicU64::new(0);
57+
Self { persisted_bytes, counter }
4758
}
4859

4960
fn read_internal(
@@ -53,9 +64,8 @@ impl InMemoryStore {
5364
let prefixed = format!("{primary_namespace}/{secondary_namespace}");
5465

5566
if let Some(outer_ref) = persisted_lock.get(&prefixed) {
56-
if let Some(inner_ref) = outer_ref.get(key) {
57-
let bytes = inner_ref.clone();
58-
Ok(bytes)
67+
if let Some(entry) = outer_ref.get(key) {
68+
Ok(entry.data.clone())
5969
} else {
6070
Err(io::Error::new(io::ErrorKind::NotFound, "Key not found"))
6171
}
@@ -71,7 +81,15 @@ impl InMemoryStore {
7181

7282
let prefixed = format!("{primary_namespace}/{secondary_namespace}");
7383
let outer_e = persisted_lock.entry(prefixed).or_insert(HashMap::new());
74-
outer_e.insert(key.to_string(), buf);
84+
85+
// Only increment counter if this is a new key
86+
let created_at = if let Some(existing) = outer_e.get(key) {
87+
existing.created_at
88+
} else {
89+
self.counter.fetch_add(1, Ordering::SeqCst)
90+
};
91+
92+
outer_e.insert(key.to_string(), StoreEntry { data: buf, created_at });
7593
Ok(())
7694
}
7795

@@ -91,14 +109,54 @@ impl InMemoryStore {
91109
fn list_internal(
92110
&self, primary_namespace: &str, secondary_namespace: &str,
93111
) -> io::Result<Vec<String>> {
94-
let mut persisted_lock = self.persisted_bytes.lock().unwrap();
112+
let persisted_lock = self.persisted_bytes.lock().unwrap();
95113

96114
let prefixed = format!("{primary_namespace}/{secondary_namespace}");
97-
match persisted_lock.entry(prefixed) {
98-
hash_map::Entry::Occupied(e) => Ok(e.get().keys().cloned().collect()),
99-
hash_map::Entry::Vacant(_) => Ok(Vec::new()),
115+
match persisted_lock.get(&prefixed) {
116+
Some(entries) => Ok(entries.keys().cloned().collect()),
117+
None => Ok(Vec::new()),
100118
}
101119
}
120+
121+
fn list_paginated_internal(
122+
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
123+
) -> io::Result<PaginatedListResponse> {
124+
let persisted_lock = self.persisted_bytes.lock().unwrap();
125+
let prefixed = format!("{primary_namespace}/{secondary_namespace}");
126+
127+
const PAGE_SIZE: usize = 100;
128+
129+
let entries = match persisted_lock.get(&prefixed) {
130+
Some(e) => e,
131+
None => return Ok(PaginatedListResponse { keys: Vec::new(), next_page_token: None }),
132+
};
133+
134+
// Extract the last_key from the page token
135+
let last_key = page_token.map(|t| t.0);
136+
137+
// Collect and sort by created_at descending (newest first)
138+
let mut sorted_entries: Vec<(&String, &StoreEntry)> = entries.iter().collect();
139+
sorted_entries
140+
.sort_by(|a, b| b.1.created_at.cmp(&a.1.created_at).then_with(|| a.0.cmp(&b.0)));
141+
142+
// Find the starting index based on last_key
143+
let start_idx = if let Some(ref key) = last_key {
144+
sorted_entries.iter().position(|(k, _)| *k == key).map(|i| i + 1).unwrap_or(0)
145+
} else {
146+
0
147+
};
148+
149+
// Get the next page of keys
150+
let end_idx = std::cmp::min(start_idx + PAGE_SIZE, sorted_entries.len());
151+
let keys: Vec<String> =
152+
sorted_entries[start_idx..end_idx].iter().map(|(k, _)| (*k).clone()).collect();
153+
154+
// Determine if there are more pages
155+
let next_page_token =
156+
if end_idx < sorted_entries.len() { keys.last().cloned().map(PageToken) } else { None };
157+
158+
Ok(PaginatedListResponse { keys, next_page_token })
159+
}
102160
}
103161

104162
impl KVStore for InMemoryStore {
@@ -152,6 +210,28 @@ impl KVStoreSync for InMemoryStore {
152210
}
153211
}
154212

213+
impl PaginatedKVStoreSync for InMemoryStore {
214+
fn list_paginated(
215+
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
216+
) -> io::Result<PaginatedListResponse> {
217+
self.list_paginated_internal(primary_namespace, secondary_namespace, page_token)
218+
}
219+
}
220+
221+
impl PaginatedKVStore for InMemoryStore {
222+
fn list_paginated(
223+
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
224+
) -> impl Future<Output = Result<PaginatedListResponse, io::Error>> + 'static + Send {
225+
let res = PaginatedKVStoreSync::list_paginated(
226+
self,
227+
primary_namespace,
228+
secondary_namespace,
229+
page_token,
230+
);
231+
async move { res }
232+
}
233+
}
234+
155235
unsafe impl Sync for InMemoryStore {}
156236
unsafe impl Send for InMemoryStore {}
157237

0 commit comments

Comments
 (0)