Skip to content

Commit d2505b4

Browse files
committed
Add migratable support for native node stores
Extend the native store plumbing to support key enumeration for store migration by wiring `MigratableKVStore` through `DynStoreWrapper` and implementing it for LDK-native backends, including `SqliteStore`, `VssStore`, and tiered storage, plus the in-repo test stores needed to keep the migration path exercised. This lays the plumbing for backup restoration and reconciliation by making migration a first-class capability of the stores ldk-node natively owns and configures, ensuring they can exhaustively enumerate their persisted keys.
1 parent 6f36b59 commit d2505b4

7 files changed

Lines changed: 237 additions & 13 deletions

File tree

src/builder.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ use lightning::routing::scoring::{
3737
use lightning::sign::{EntropySource, NodeSigner};
3838
use lightning::util::config::HTLCInterceptionFlags;
3939
use lightning::util::persist::{
40-
KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
41-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
40+
KVStore, MigratableKVStore, CHANNEL_MANAGER_PERSISTENCE_KEY,
41+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
4242
};
4343
use lightning::util::ser::ReadableArgs;
4444
use lightning::util::sweep::OutputSweeper;
@@ -844,7 +844,7 @@ impl NodeBuilder {
844844
///
845845
/// [`set_ephemeral_store`]: Self::set_ephemeral_store
846846
/// [`set_backup_store`]: Self::set_backup_store
847-
pub fn build_with_store<S: SyncAndAsyncKVStore + Send + Sync + 'static>(
847+
pub fn build_with_store<S: SyncAndAsyncKVStore + MigratableKVStore + Send + Sync + 'static>(
848848
&self, node_entropy: NodeEntropy, kv_store: S,
849849
) -> Result<Node, BuildError> {
850850
let primary_store: Arc<DynStore> = Arc::new(DynStoreWrapper(kv_store));
@@ -1339,7 +1339,7 @@ impl ArcedNodeBuilder {
13391339
/// Builds a [`Node`] instance according to the options previously configured.
13401340
// Note that the generics here don't actually work for Uniffi, but we don't currently expose
13411341
// this so its not needed.
1342-
pub fn build_with_store<S: SyncAndAsyncKVStore + Send + Sync + 'static>(
1342+
pub fn build_with_store<S: SyncAndAsyncKVStore + MigratableKVStore + Send + Sync + 'static>(
13431343
&self, node_entropy: Arc<NodeEntropy>, kv_store: S,
13441344
) -> Result<Arc<Node>, BuildError> {
13451345
self.inner.read().expect("lock").build_with_store(*node_entropy, kv_store).map(Arc::new)

src/io/sqlite_store/mod.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ use std::sync::{Arc, Mutex};
1515

1616
use lightning::io;
1717
use lightning::util::persist::{
18-
KVStore, KVStoreSync, PageToken, PaginatedKVStore, PaginatedKVStoreSync, PaginatedListResponse,
18+
KVStore, KVStoreSync, MigratableKVStore, PageToken, PaginatedKVStore, PaginatedKVStoreSync,
19+
PaginatedListResponse,
1920
};
2021
use lightning_types::string::PrintableString;
2122
use rusqlite::{named_params, Connection};
@@ -255,6 +256,44 @@ impl PaginatedKVStore for SqliteStore {
255256
}
256257
}
257258

259+
impl MigratableKVStore for SqliteStore {
260+
fn list_all_keys(&self) -> io::Result<Vec<(String, String, String)>> {
261+
let locked_conn = self.inner.connection.lock().unwrap();
262+
263+
let sql = format!(
264+
"SELECT DISTINCT primary_namespace, secondary_namespace, key FROM {}",
265+
self.inner.kv_table_name
266+
);
267+
268+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
269+
let msg = format!("Failed to prepare statement: {}", e);
270+
io::Error::new(io::ErrorKind::Other, msg)
271+
})?;
272+
273+
let rows = stmt
274+
.query_map([], |row| {
275+
let primary_namespace: String = row.get(0)?;
276+
let secondary_namespace: String = row.get(1)?;
277+
let key: String = row.get(2)?;
278+
Ok((primary_namespace, secondary_namespace, key))
279+
})
280+
.map_err(|e| {
281+
let msg = format!("Failed to list all keys: {}", e);
282+
io::Error::new(io::ErrorKind::Other, msg)
283+
})?;
284+
285+
let mut keys = Vec::new();
286+
for row in rows {
287+
keys.push(row.map_err(|e| {
288+
let msg = format!("Failed to decode row while listing all keys: {}", e);
289+
io::Error::new(io::ErrorKind::Other, msg)
290+
})?);
291+
}
292+
293+
Ok(keys)
294+
}
295+
}
296+
258297
struct SqliteStoreInner {
259298
connection: Arc<Mutex<Connection>>,
260299
data_dir: PathBuf,

src/io/test_utils.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use lightning::ln::functional_test_utils::{
2121
TestChanMonCfg,
2222
};
2323
use lightning::util::persist::{
24-
KVStore, KVStoreSync, MonitorUpdatingPersister, PageToken, PaginatedKVStore,
24+
KVStore, KVStoreSync, MigratableKVStore, MonitorUpdatingPersister, PageToken, PaginatedKVStore,
2525
PaginatedKVStoreSync, PaginatedListResponse, KVSTORE_NAMESPACE_KEY_MAX_LEN,
2626
};
2727
use lightning::util::test_utils;
@@ -245,6 +245,29 @@ impl PaginatedKVStore for InMemoryStore {
245245
}
246246
}
247247

248+
impl MigratableKVStore for InMemoryStore {
249+
fn list_all_keys(&self) -> io::Result<Vec<(String, String, String)>> {
250+
let persisted_lock = self.persisted_bytes.lock().unwrap();
251+
let mut keys = Vec::new();
252+
253+
for (namespace, entries) in persisted_lock.iter() {
254+
let mut parts = namespace.splitn(2, '/');
255+
let primary_namespace = parts.next().unwrap_or_default();
256+
let secondary_namespace = parts.next().unwrap_or_default();
257+
258+
for key in entries.keys() {
259+
keys.push((
260+
primary_namespace.to_string(),
261+
secondary_namespace.to_string(),
262+
key.clone(),
263+
));
264+
}
265+
}
266+
267+
Ok(keys)
268+
}
269+
}
270+
248271
unsafe impl Sync for InMemoryStore {}
249272
unsafe impl Send for InMemoryStore {}
250273

src/io/tier_store.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::types::DynStore;
1212
use bitcoin::io::Read;
1313
use lightning::ln::msgs::DecodeError;
1414
use lightning::util::persist::{
15-
KVStore, KVStoreSync, NETWORK_GRAPH_PERSISTENCE_KEY,
15+
KVStore, KVStoreSync, MigratableKVStore, NETWORK_GRAPH_PERSISTENCE_KEY,
1616
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1717
SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1818
};
@@ -1191,6 +1191,12 @@ fn ops_match(a: &PendingBackupOp, b: &PendingBackupOp) -> bool {
11911191
}
11921192
}
11931193

1194+
impl MigratableKVStore for TierStore {
1195+
fn list_all_keys(&self) -> io::Result<Vec<(String, String, String)>> {
1196+
self.inner.primary_store.list_all_keys()
1197+
}
1198+
}
1199+
11941200
#[cfg(test)]
11951201
mod tests {
11961202
use std::future::Future;
@@ -1202,7 +1208,8 @@ mod tests {
12021208
use bitcoin::io::ErrorKind;
12031209
use lightning::util::logger::Level;
12041210
use lightning::util::persist::{
1205-
CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1211+
MigratableKVStore, CHANNEL_MANAGER_PERSISTENCE_KEY,
1212+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
12061213
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
12071214
};
12081215
use lightning_persister::fs_store::v1::FilesystemStore;
@@ -1349,6 +1356,12 @@ mod tests {
13491356
}
13501357
}
13511358

1359+
impl MigratableKVStore for FailingStore {
1360+
fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, io::Error> {
1361+
Ok(Vec::new())
1362+
}
1363+
}
1364+
13521365
fn setup_tier_store(primary_store: Arc<DynStore>, logger: Arc<Logger>) -> TierStore {
13531366
TierStore::new(primary_store, logger)
13541367
}

src/io/vss_store.rs

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use bitcoin::Network;
2424
use lightning::impl_writeable_tlv_based_enum;
2525
use lightning::io::{self, Error, ErrorKind};
2626
use lightning::sign::{EntropySource as LdkEntropySource, RandomBytes};
27-
use lightning::util::persist::{KVStore, KVStoreSync};
27+
use lightning::util::persist::{KVStore, KVStoreSync, MigratableKVStore};
2828
use lightning::util::ser::{Readable, Writeable};
2929
use prost::Message;
3030
use vss_client::client::VssClient;
@@ -386,6 +386,27 @@ impl Drop for VssStore {
386386
}
387387
}
388388

389+
impl MigratableKVStore for VssStore {
390+
fn list_all_keys(&self) -> io::Result<Vec<(String, String, String)>> {
391+
let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
392+
debug_assert!(false, "Failed to access internal runtime");
393+
let msg = format!("Failed to access internal runtime");
394+
Error::new(ErrorKind::Other, msg)
395+
})?;
396+
let inner = Arc::clone(&self.inner);
397+
let fut = async move {
398+
let stored_keys = inner.list_all_stored_keys(&inner.blocking_client).await?;
399+
let mut decoded = Vec::with_capacity(stored_keys.len());
400+
for stored_key in stored_keys {
401+
if let Some(key_parts) = inner.decode_stored_key(&stored_key)? {
402+
decoded.push(key_parts);
403+
}
404+
}
405+
Ok(decoded)
406+
};
407+
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
408+
}
409+
}
389410
struct VssStoreInner {
390411
schema_version: VssSchemaVersion,
391412
blocking_client: VssClient<CustomRetryPolicy>,
@@ -507,6 +528,93 @@ impl VssStoreInner {
507528
Ok(keys)
508529
}
509530

531+
async fn list_all_stored_keys(
532+
&self, client: &VssClient<CustomRetryPolicy>,
533+
) -> io::Result<Vec<String>> {
534+
let mut page_token = None;
535+
let mut keys = Vec::new();
536+
537+
while page_token != Some("".to_string()) {
538+
let request = ListKeyVersionsRequest {
539+
store_id: self.store_id.clone(),
540+
key_prefix: None,
541+
page_token,
542+
page_size: None,
543+
};
544+
545+
let response = client.list_key_versions(&request).await.map_err(|e| {
546+
let msg = format!("Failed to list all stored keys: {}", e);
547+
Error::new(ErrorKind::Other, msg)
548+
})?;
549+
550+
for kv in response.key_versions {
551+
keys.push(kv.key);
552+
}
553+
554+
page_token = response.next_page_token;
555+
}
556+
557+
Ok(keys)
558+
}
559+
560+
fn decode_stored_key(&self, stored_key: &str) -> io::Result<Option<(String, String, String)>> {
561+
match self.schema_version {
562+
VssSchemaVersion::V0 => {
563+
if !stored_key.contains('#') {
564+
let key = self.key_obfuscator.deobfuscate(stored_key)?;
565+
if key == VSS_SCHEMA_VERSION_KEY {
566+
return Ok(None);
567+
}
568+
return Ok(Some(("".to_string(), "".to_string(), key)));
569+
}
570+
571+
let mut parts = stored_key.splitn(3, '#');
572+
let primary_namespace = parts
573+
.next()
574+
.ok_or_else(|| Error::new(ErrorKind::InvalidData, "Invalid VSS key format"))?;
575+
let secondary_namespace = parts
576+
.next()
577+
.ok_or_else(|| Error::new(ErrorKind::InvalidData, "Invalid VSS key format"))?;
578+
let obfuscated_key = parts
579+
.next()
580+
.ok_or_else(|| Error::new(ErrorKind::InvalidData, "Invalid VSS key format"))?;
581+
582+
let key = self.key_obfuscator.deobfuscate(obfuscated_key)?;
583+
if key == VSS_SCHEMA_VERSION_KEY {
584+
return Ok(None);
585+
}
586+
587+
Ok(Some((primary_namespace.to_string(), secondary_namespace.to_string(), key)))
588+
},
589+
VssSchemaVersion::V1 => {
590+
let mut parts = stored_key.splitn(2, '#');
591+
let obfuscated_prefix = parts
592+
.next()
593+
.ok_or_else(|| Error::new(ErrorKind::InvalidData, "Invalid VSS key format"))?;
594+
let obfuscated_key = parts
595+
.next()
596+
.ok_or_else(|| Error::new(ErrorKind::InvalidData, "Invalid VSS key format"))?;
597+
598+
let prefix = self.key_obfuscator.deobfuscate(obfuscated_prefix)?;
599+
let key = self.key_obfuscator.deobfuscate(obfuscated_key)?;
600+
601+
if key == VSS_SCHEMA_VERSION_KEY {
602+
return Ok(None);
603+
}
604+
605+
let mut prefix_parts = prefix.splitn(2, '#');
606+
let primary_namespace = prefix_parts
607+
.next()
608+
.ok_or_else(|| Error::new(ErrorKind::InvalidData, "Invalid VSS key prefix"))?;
609+
let secondary_namespace = prefix_parts
610+
.next()
611+
.ok_or_else(|| Error::new(ErrorKind::InvalidData, "Invalid VSS key prefix"))?;
612+
613+
Ok(Some((primary_namespace.to_string(), secondary_namespace.to_string(), key)))
614+
},
615+
}
616+
}
617+
510618
async fn read_internal(
511619
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
512620
secondary_namespace: String, key: String,

src/types.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ use lightning::routing::gossip;
3131
use lightning::routing::router::DefaultRouter;
3232
use lightning::routing::scoring::{CombinedScorer, ProbabilisticScoringFeeParameters};
3333
use lightning::sign::InMemorySigner;
34-
use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersisterAsync};
34+
use lightning::util::persist::{
35+
KVStore, KVStoreSync, MigratableKVStore, MonitorUpdatingPersisterAsync,
36+
};
3537
use lightning::util::ser::{Readable, Writeable, Writer};
3638
use lightning::util::sweep::OutputSweeper;
3739
use lightning_block_sync::gossip::GossipVerifier;
@@ -85,6 +87,16 @@ pub trait DynStoreTrait: Send + Sync {
8587
fn list(
8688
&self, primary_namespace: &str, secondary_namespace: &str,
8789
) -> Result<Vec<String>, bitcoin::io::Error>;
90+
91+
/// Returns all known keys as `(primary_namespace, secondary_namespace, key)` tuples.
92+
///
93+
/// As with [`lightning::util::persist::MigratableKVStore::list_all_keys`],
94+
/// implementations must exhaustively return all entries known to the store so
95+
/// migration and restoration do not miss data.
96+
///
97+
/// Implementations that do not support exhaustive enumeration may return an
98+
/// error with [`bitcoin::io::ErrorKind::Other`].
99+
fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, bitcoin::io::Error>;
88100
}
89101

90102
impl<'a> KVStore for dyn DynStoreTrait + 'a {
@@ -174,9 +186,13 @@ impl KVStore for DynStoreRef {
174186
}
175187
}
176188

177-
pub(crate) struct DynStoreWrapper<T: SyncAndAsyncKVStore + Send + Sync>(pub(crate) T);
189+
pub(crate) struct DynStoreWrapper<T: SyncAndAsyncKVStore + MigratableKVStore + Send + Sync>(
190+
pub(crate) T,
191+
);
178192

179-
impl<T: SyncAndAsyncKVStore + Send + Sync> DynStoreTrait for DynStoreWrapper<T> {
193+
impl<T: SyncAndAsyncKVStore + MigratableKVStore + Send + Sync> DynStoreTrait
194+
for DynStoreWrapper<T>
195+
{
180196
fn read_async(
181197
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
182198
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, bitcoin::io::Error>> + Send + 'static>> {
@@ -224,6 +240,10 @@ impl<T: SyncAndAsyncKVStore + Send + Sync> DynStoreTrait for DynStoreWrapper<T>
224240
) -> Result<Vec<String>, bitcoin::io::Error> {
225241
KVStoreSync::list(&self.0, primary_namespace, secondary_namespace)
226242
}
243+
244+
fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, bitcoin::io::Error> {
245+
MigratableKVStore::list_all_keys(&self.0)
246+
}
227247
}
228248

229249
pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync<

tests/common/mod.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use ldk_node::{
4242
use lightning::io;
4343
use lightning::ln::msgs::SocketAddress;
4444
use lightning::routing::gossip::NodeAlias;
45-
use lightning::util::persist::{KVStore, KVStoreSync};
45+
use lightning::util::persist::{KVStore, KVStoreSync, MigratableKVStore};
4646
use lightning::util::test_utils::TestStore;
4747
use lightning_invoice::{Bolt11InvoiceDescription, Description};
4848
use lightning_persister::fs_store::v1::FilesystemStore;
@@ -1616,6 +1616,12 @@ impl KVStoreSync for TestSyncStore {
16161616
}
16171617
}
16181618

1619+
impl MigratableKVStore for TestSyncStore {
1620+
fn list_all_keys(&self) -> lightning::io::Result<Vec<(String, String, String)>> {
1621+
self.inner.list_all_keys_internal()
1622+
}
1623+
}
1624+
16191625
struct TestSyncStoreInner {
16201626
serializer: RwLock<()>,
16211627
test_store: TestStore,
@@ -1789,4 +1795,19 @@ impl TestSyncStoreInner {
17891795
let _guard = self.serializer.read().unwrap();
17901796
self.do_list(primary_namespace, secondary_namespace)
17911797
}
1798+
1799+
fn list_all_keys_internal(&self) -> lightning::io::Result<Vec<(String, String, String)>> {
1800+
let _guard = self.serializer.read().unwrap();
1801+
1802+
let mut fs_keys = MigratableKVStore::list_all_keys(&self.fs_store)?;
1803+
fs_keys.sort();
1804+
1805+
let mut sqlite_keys = MigratableKVStore::list_all_keys(&self.sqlite_store)?;
1806+
sqlite_keys.sort();
1807+
assert_eq!(fs_keys, sqlite_keys);
1808+
1809+
// TODO(enigbe): Upstream MigratableKVStore implementation for TestStore
1810+
1811+
Ok(fs_keys)
1812+
}
17921813
}

0 commit comments

Comments
 (0)