Skip to content

Commit 17d423c

Browse files
committed
Implement migratable database stores
Add migratable key listing for SQLite and PostgreSQL stores so benchmark fixtures can copy persisted node state between database backends. AI-assisted-by: OpenAI Codex
1 parent 21b9896 commit 17d423c

2 files changed

Lines changed: 133 additions & 2 deletions

File tree

src/io/postgres_store/mod.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use std::sync::atomic::{AtomicU64, Ordering};
1212
use std::sync::{Arc, Mutex};
1313

1414
use lightning::io;
15-
use lightning::util::persist::{KVStore, PageToken, PaginatedKVStore, PaginatedListResponse};
15+
use lightning::util::persist::{
16+
KVStore, MigratableKVStore, PageToken, PaginatedKVStore, PaginatedListResponse,
17+
};
1618
use lightning_types::string::PrintableString;
1719
use native_tls::TlsConnector;
1820
use postgres_native_tls::MakeTlsConnector;
@@ -339,6 +341,19 @@ impl PaginatedKVStore for PostgresStore {
339341
}
340342
}
341343

344+
impl MigratableKVStore for PostgresStore {
345+
fn list_all_keys(
346+
&self,
347+
) -> impl Future<Output = Result<Vec<(String, String, String)>, io::Error>> + 'static + Send {
348+
let inner = Arc::clone(&self.inner);
349+
let runtime = self.internal_runtime();
350+
async move {
351+
run_on_internal_runtime(runtime, async move { inner.list_all_keys_internal().await })
352+
.await
353+
}
354+
}
355+
}
356+
342357
struct PostgresStoreInner {
343358
pool: SmallPool,
344359
config: Config,
@@ -713,6 +728,25 @@ impl PostgresStoreInner {
713728
Ok(keys)
714729
}
715730

731+
async fn list_all_keys_internal(&self) -> io::Result<Vec<(String, String, String)>> {
732+
let sql = format!(
733+
"SELECT primary_namespace, secondary_namespace, key FROM {}",
734+
self.kv_table_name_sql
735+
);
736+
737+
let err_map = |e: PgError| {
738+
let msg = format!("Failed to retrieve queried rows: {e}");
739+
io::Error::new(io::ErrorKind::Other, msg)
740+
};
741+
742+
let mut locked = self.locked_client().await?;
743+
let rows = query_with_retry!(self, locked, err_map, locked.query(sql.as_str(), &[]))?;
744+
745+
let keys: Vec<(String, String, String)> =
746+
rows.iter().map(|row| (row.get(0), row.get(1), row.get(2))).collect();
747+
Ok(keys)
748+
}
749+
716750
async fn list_paginated_internal(
717751
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
718752
) -> io::Result<PaginatedListResponse> {
@@ -893,6 +927,29 @@ mod tests {
893927
cleanup_store(&store_1).await;
894928
}
895929

930+
#[tokio::test(flavor = "multi_thread")]
931+
async fn test_postgres_store_list_all_keys() {
932+
let store = create_test_store("test_pg_list_all_keys").await;
933+
934+
KVStore::write(&store, "ns_a", "sub_a", "key_a", vec![1u8]).await.unwrap();
935+
KVStore::write(&store, "ns_a", "sub_b", "key_b", vec![2u8]).await.unwrap();
936+
KVStore::write(&store, "ns_b", "", "key_c", vec![3u8]).await.unwrap();
937+
938+
let mut keys = MigratableKVStore::list_all_keys(&store).await.unwrap();
939+
keys.sort();
940+
941+
assert_eq!(
942+
keys,
943+
vec![
944+
("ns_a".to_string(), "sub_a".to_string(), "key_a".to_string()),
945+
("ns_a".to_string(), "sub_b".to_string(), "key_b".to_string()),
946+
("ns_b".to_string(), "".to_string(), "key_c".to_string()),
947+
]
948+
);
949+
950+
cleanup_store(&store).await;
951+
}
952+
896953
async fn kill_connection(store: &PostgresStore) {
897954
// Terminate every backend in the pool so the next op deterministically
898955
// hits a closed connection regardless of which slot `get` selects.

src/io/sqlite_store/mod.rs

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
1414
use std::sync::{Arc, Mutex};
1515

1616
use lightning::io;
17-
use lightning::util::persist::{KVStore, PageToken, PaginatedKVStore, PaginatedListResponse};
17+
use lightning::util::persist::{
18+
KVStore, MigratableKVStore, PageToken, PaginatedKVStore, PaginatedListResponse,
19+
};
1820
use lightning_types::string::PrintableString;
1921
use rusqlite::{named_params, Connection};
2022

@@ -202,6 +204,21 @@ impl PaginatedKVStore for SqliteStore {
202204
}
203205
}
204206

207+
impl MigratableKVStore for SqliteStore {
208+
fn list_all_keys(
209+
&self,
210+
) -> impl Future<Output = Result<Vec<(String, String, String)>, io::Error>> + 'static + Send {
211+
let inner = Arc::clone(&self.inner);
212+
let fut = tokio::task::spawn_blocking(move || inner.list_all_keys_internal());
213+
async move {
214+
fut.await.unwrap_or_else(|e| {
215+
let msg = format!("Failed to IO operation due join error: {}", e);
216+
Err(io::Error::new(io::ErrorKind::Other, msg))
217+
})
218+
}
219+
}
220+
}
221+
205222
struct SqliteStoreInner {
206223
connection: Arc<Mutex<Connection>>,
207224
data_dir: PathBuf,
@@ -486,6 +503,35 @@ impl SqliteStoreInner {
486503
Ok(keys)
487504
}
488505

506+
fn list_all_keys_internal(&self) -> io::Result<Vec<(String, String, String)>> {
507+
let locked_conn = self.connection.lock().expect("lock");
508+
509+
let sql = format!(
510+
"SELECT primary_namespace, secondary_namespace, key FROM {}",
511+
self.kv_table_name
512+
);
513+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
514+
let msg = format!("Failed to prepare statement: {}", e);
515+
io::Error::new(io::ErrorKind::Other, msg)
516+
})?;
517+
518+
let mut keys = Vec::new();
519+
let rows_iter =
520+
stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?))).map_err(|e| {
521+
let msg = format!("Failed to retrieve queried rows: {}", e);
522+
io::Error::new(io::ErrorKind::Other, msg)
523+
})?;
524+
525+
for key in rows_iter {
526+
keys.push(key.map_err(|e| {
527+
let msg = format!("Failed to retrieve queried rows: {}", e);
528+
io::Error::new(io::ErrorKind::Other, msg)
529+
})?);
530+
}
531+
532+
Ok(keys)
533+
}
534+
489535
fn list_paginated_internal(
490536
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
491537
) -> io::Result<PaginatedListResponse> {
@@ -679,6 +725,34 @@ mod tests {
679725
do_test_store(&store_0, &store_1)
680726
}
681727

728+
#[tokio::test]
729+
async fn test_sqlite_store_list_all_keys() {
730+
let mut temp_path = random_storage_path();
731+
temp_path.push("test_sqlite_store_list_all_keys");
732+
let store = SqliteStore::new(
733+
temp_path,
734+
Some("test_db".to_string()),
735+
Some("test_table".to_string()),
736+
)
737+
.unwrap();
738+
739+
KVStore::write(&store, "ns_a", "sub_a", "key_a", vec![1u8]).await.unwrap();
740+
KVStore::write(&store, "ns_a", "sub_b", "key_b", vec![2u8]).await.unwrap();
741+
KVStore::write(&store, "ns_b", "", "key_c", vec![3u8]).await.unwrap();
742+
743+
let mut keys = MigratableKVStore::list_all_keys(&store).await.unwrap();
744+
keys.sort();
745+
746+
assert_eq!(
747+
keys,
748+
vec![
749+
("ns_a".to_string(), "sub_a".to_string(), "key_a".to_string()),
750+
("ns_a".to_string(), "sub_b".to_string(), "key_b".to_string()),
751+
("ns_b".to_string(), "".to_string(), "key_c".to_string()),
752+
]
753+
);
754+
}
755+
682756
#[tokio::test]
683757
async fn test_sqlite_store_paginated_listing() {
684758
let mut temp_path = random_storage_path();

0 commit comments

Comments
 (0)