Skip to content

Commit f47860f

Browse files
committed
Implement migratable database stores
Add migratable key listing for SQLite and PostgreSQL stores so benchmark fixtures can copy persisted node state between database backends. AI-assisted-by: OpenAI Codex
1 parent 9005b0e commit f47860f

2 files changed

Lines changed: 133 additions & 2 deletions

File tree

src/io/postgres_store/mod.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use std::sync::atomic::{AtomicU64, Ordering};
1212
use std::sync::{Arc, Mutex};
1313

1414
use lightning::io;
15-
use lightning::util::persist::{KVStore, PageToken, PaginatedKVStore, PaginatedListResponse};
15+
use lightning::util::persist::{
16+
KVStore, MigratableKVStore, PageToken, PaginatedKVStore, PaginatedListResponse,
17+
};
1618
use lightning_types::string::PrintableString;
1719
use native_tls::TlsConnector;
1820
use postgres_native_tls::MakeTlsConnector;
@@ -354,6 +356,19 @@ impl PaginatedKVStore for PostgresStore {
354356
}
355357
}
356358

359+
impl MigratableKVStore for PostgresStore {
360+
fn list_all_keys(
361+
&self,
362+
) -> impl Future<Output = Result<Vec<(String, String, String)>, io::Error>> + 'static + Send {
363+
let inner = Arc::clone(&self.inner);
364+
let runtime = self.internal_runtime();
365+
async move {
366+
run_on_internal_runtime(runtime, async move { inner.list_all_keys_internal().await })
367+
.await
368+
}
369+
}
370+
}
371+
357372
struct PostgresStoreInner {
358373
pool: SmallPool,
359374
config: Config,
@@ -728,6 +743,25 @@ impl PostgresStoreInner {
728743
Ok(keys)
729744
}
730745

746+
async fn list_all_keys_internal(&self) -> io::Result<Vec<(String, String, String)>> {
747+
let sql = format!(
748+
"SELECT primary_namespace, secondary_namespace, key FROM {}",
749+
self.kv_table_name_sql
750+
);
751+
752+
let err_map = |e: PgError| {
753+
let msg = format!("Failed to retrieve queried rows: {e}");
754+
io::Error::new(io::ErrorKind::Other, msg)
755+
};
756+
757+
let mut locked = self.locked_client().await?;
758+
let rows = query_with_retry!(self, locked, err_map, locked.query(sql.as_str(), &[]))?;
759+
760+
let keys: Vec<(String, String, String)> =
761+
rows.iter().map(|row| (row.get(0), row.get(1), row.get(2))).collect();
762+
Ok(keys)
763+
}
764+
731765
async fn list_paginated_internal(
732766
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
733767
) -> io::Result<PaginatedListResponse> {
@@ -908,6 +942,29 @@ mod tests {
908942
cleanup_store(&store_1).await;
909943
}
910944

945+
#[tokio::test(flavor = "multi_thread")]
946+
async fn test_postgres_store_list_all_keys() {
947+
let store = create_test_store("test_pg_list_all_keys").await;
948+
949+
KVStore::write(&store, "ns_a", "sub_a", "key_a", vec![1u8]).await.unwrap();
950+
KVStore::write(&store, "ns_a", "sub_b", "key_b", vec![2u8]).await.unwrap();
951+
KVStore::write(&store, "ns_b", "", "key_c", vec![3u8]).await.unwrap();
952+
953+
let mut keys = MigratableKVStore::list_all_keys(&store).await.unwrap();
954+
keys.sort();
955+
956+
assert_eq!(
957+
keys,
958+
vec![
959+
("ns_a".to_string(), "sub_a".to_string(), "key_a".to_string()),
960+
("ns_a".to_string(), "sub_b".to_string(), "key_b".to_string()),
961+
("ns_b".to_string(), "".to_string(), "key_c".to_string()),
962+
]
963+
);
964+
965+
cleanup_store(&store).await;
966+
}
967+
911968
async fn kill_connection(store: &PostgresStore) {
912969
// Terminate every backend in the pool so the next op deterministically
913970
// hits a closed connection regardless of which slot `get` selects.

src/io/sqlite_store/mod.rs

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
1414
use std::sync::{Arc, Mutex};
1515

1616
use lightning::io;
17-
use lightning::util::persist::{KVStore, PageToken, PaginatedKVStore, PaginatedListResponse};
17+
use lightning::util::persist::{
18+
KVStore, MigratableKVStore, PageToken, PaginatedKVStore, PaginatedListResponse,
19+
};
1820
use lightning_types::string::PrintableString;
1921
use rusqlite::{named_params, Connection};
2022

@@ -202,6 +204,21 @@ impl PaginatedKVStore for SqliteStore {
202204
}
203205
}
204206

207+
impl MigratableKVStore for SqliteStore {
208+
fn list_all_keys(
209+
&self,
210+
) -> impl Future<Output = Result<Vec<(String, String, String)>, io::Error>> + 'static + Send {
211+
let inner = Arc::clone(&self.inner);
212+
let fut = tokio::task::spawn_blocking(move || inner.list_all_keys_internal());
213+
async move {
214+
fut.await.unwrap_or_else(|e| {
215+
let msg = format!("Failed to IO operation due join error: {}", e);
216+
Err(io::Error::new(io::ErrorKind::Other, msg))
217+
})
218+
}
219+
}
220+
}
221+
205222
struct SqliteStoreInner {
206223
connection: Arc<Mutex<Connection>>,
207224
data_dir: PathBuf,
@@ -486,6 +503,35 @@ impl SqliteStoreInner {
486503
Ok(keys)
487504
}
488505

506+
fn list_all_keys_internal(&self) -> io::Result<Vec<(String, String, String)>> {
507+
let locked_conn = self.connection.lock().expect("lock");
508+
509+
let sql = format!(
510+
"SELECT primary_namespace, secondary_namespace, key FROM {}",
511+
self.kv_table_name
512+
);
513+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
514+
let msg = format!("Failed to prepare statement: {}", e);
515+
io::Error::new(io::ErrorKind::Other, msg)
516+
})?;
517+
518+
let mut keys = Vec::new();
519+
let rows_iter =
520+
stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?))).map_err(|e| {
521+
let msg = format!("Failed to retrieve queried rows: {}", e);
522+
io::Error::new(io::ErrorKind::Other, msg)
523+
})?;
524+
525+
for key in rows_iter {
526+
keys.push(key.map_err(|e| {
527+
let msg = format!("Failed to retrieve queried rows: {}", e);
528+
io::Error::new(io::ErrorKind::Other, msg)
529+
})?);
530+
}
531+
532+
Ok(keys)
533+
}
534+
489535
fn list_paginated_internal(
490536
&self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>,
491537
) -> io::Result<PaginatedListResponse> {
@@ -679,6 +725,34 @@ mod tests {
679725
do_test_store(&store_0, &store_1)
680726
}
681727

728+
#[tokio::test]
729+
async fn test_sqlite_store_list_all_keys() {
730+
let mut temp_path = random_storage_path();
731+
temp_path.push("test_sqlite_store_list_all_keys");
732+
let store = SqliteStore::new(
733+
temp_path,
734+
Some("test_db".to_string()),
735+
Some("test_table".to_string()),
736+
)
737+
.unwrap();
738+
739+
KVStore::write(&store, "ns_a", "sub_a", "key_a", vec![1u8]).await.unwrap();
740+
KVStore::write(&store, "ns_a", "sub_b", "key_b", vec![2u8]).await.unwrap();
741+
KVStore::write(&store, "ns_b", "", "key_c", vec![3u8]).await.unwrap();
742+
743+
let mut keys = MigratableKVStore::list_all_keys(&store).await.unwrap();
744+
keys.sort();
745+
746+
assert_eq!(
747+
keys,
748+
vec![
749+
("ns_a".to_string(), "sub_a".to_string(), "key_a".to_string()),
750+
("ns_a".to_string(), "sub_b".to_string(), "key_b".to_string()),
751+
("ns_b".to_string(), "".to_string(), "key_c".to_string()),
752+
]
753+
);
754+
}
755+
682756
#[tokio::test]
683757
async fn test_sqlite_store_paginated_listing() {
684758
let mut temp_path = random_storage_path();

0 commit comments

Comments
 (0)