diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index cd3980b9af..af822a3d38 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -11,6 +11,20 @@ jobs: runs-on: ubuntu-latest env: TOOLCHAIN: stable + services: + postgres: + image: postgres:latest + ports: + - 5432:5432 + env: + POSTGRES_DB: postgres + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 steps: - name: Checkout source code uses: actions/checkout@v6 @@ -42,5 +56,7 @@ jobs: echo "BITCOIND_EXE=$( pwd )/bin/bitcoind-${{ runner.os }}-${{ runner.arch }}" >> "$GITHUB_ENV" echo "ELECTRS_EXE=$( pwd )/bin/electrs-${{ runner.os }}-${{ runner.arch }}" >> "$GITHUB_ENV" - name: Run benchmarks + env: + TEST_POSTGRES_URL: "host=localhost user=postgres password=postgres" run: | - RUSTFLAGS="--cfg tokio_unstable" cargo bench + RUSTFLAGS="--cfg tokio_unstable" cargo test --benches --features "bench postgres" diff --git a/.gitignore b/.gitignore index 3d24bbceb5..81fe07af9a 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,9 @@ Cargo.lock # These are backup files generated by rustfmt **/*.rs.bk +# Env file used in benches +.env + # Ignore generated swift related files .swiftpm/ LDKNodeFFI.* diff --git a/Cargo.toml b/Cargo.toml index bed984f071..a14f244926 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ panic = 'abort' # Abort on panic [features] default = [] +bench = [] postgres = ["dep:tokio-postgres", "dep:native-tls", "dep:postgres-native-tls"] [dependencies] @@ -96,6 +97,7 @@ rand = { version = "0.9.2", default-features = false, features = ["std", "thread proptest = "1.0.0" regex = "1.5.6" criterion = { version = "0.7.0", features = ["async_tokio"] } +dotenvy = "0.15" ldk-node-062 = { package = "ldk-node", version = "=0.6.2" } ldk-node-070 = { package = "ldk-node", version = "=0.7.0" } @@ -140,6 +142,15 @@ check-cfg = [ name = "payments" harness = false +[[bench]] +name = "operations" +harness = false + +[[bench]] +name = "database" +harness = false +required-features = ["bench"] + #[patch.crates-io] #lightning = { path = "../rust-lightning/lightning" } #lightning-types = { path = "../rust-lightning/lightning-types" } diff --git a/benches/database.rs b/benches/database.rs new file mode 100644 index 0000000000..11bf4e2ec7 --- /dev/null +++ b/benches/database.rs @@ -0,0 +1,397 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use ldk_node::bench::{ + configured_backends, payment_details_batch, payment_key, payment_update_batch_from_offset, + pending_payment_details_batch_from_offset, pending_payment_update_batch_from_offset, Backend, + PaginatedStoreFixture, StoreFixture, BATCH_LEN, PAGINATED_PAGE_LEN, +}; + +fn database_benchmark(c: &mut Criterion) { + dotenvy::dotenv().ok(); + let backends = configured_backends(); + + benchmark_payment_store_single_ops(c, &backends); + benchmark_payment_store_warm_sequential(c, &backends); + benchmark_payment_store_concurrent(c, &backends); + benchmark_payment_store(c, &backends); + benchmark_payment_store_paginated(c, &backends); + benchmark_payment_store_lifecycle(c, &backends); + benchmark_pending_payment_store(c, &backends); +} + +fn benchmark_payment_store(c: &mut Criterion, backends: &[Backend]) { + let mut group = c.benchmark_group("database/payment_store"); + group.throughput(Throughput::Elements(BATCH_LEN)); + let runtime = benchmark_runtime(); + for backend in backends.iter().copied() { + let insert_payments = payment_details_batch(0); + group.bench_with_input( + BenchmarkId::new("insert_100_sequential_cold", backend.name()), + &backend, + |b, backend| { + let insert_payments = insert_payments.clone(); + b.to_async(runtime.as_ref()).iter_custom(|iters| { + let backend = *backend; + let insert_payments = insert_payments.clone(); + async move { + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let fixture = StoreFixture::new(backend, "payment_insert").await; + let start = Instant::now(); + fixture.write_payment_batch(insert_payments.clone()).await; + elapsed += start.elapsed(); + } + elapsed + } + }) + }, + ); + + let update_payments = payment_details_batch(0); + let updates = payment_update_batch_from_offset(0); + group.bench_with_input( + BenchmarkId::new("update_100_sequential_cold", backend.name()), + &backend, + |b, backend| { + let update_payments = update_payments.clone(); + let updates = updates.clone(); + b.to_async(runtime.as_ref()).iter_custom(|iters| { + let backend = *backend; + let update_payments = update_payments.clone(); + let updates = updates.clone(); + async move { + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let fixture = StoreFixture::new(backend, "payment_update").await; + fixture.write_payment_batch(update_payments.clone()).await; + let start = Instant::now(); + fixture.write_payment_update_batch(updates.clone()).await; + elapsed += start.elapsed(); + } + elapsed + } + }) + }, + ); + + let reload_payments = payment_details_batch(0); + group.bench_with_input( + BenchmarkId::new("reload_100_cold", backend.name()), + &backend, + |b, backend| { + let reload_payments = reload_payments.clone(); + b.to_async(runtime.as_ref()).iter_custom(|iters| { + let backend = *backend; + let reload_payments = reload_payments.clone(); + async move { + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let fixture = StoreFixture::new(backend, "payment_reload").await; + fixture.write_payment_batch(reload_payments.clone()).await; + let start = Instant::now(); + let payments = fixture.reload_payments().await; + elapsed += start.elapsed(); + // Keep the read result observable in optimized benchmark builds. + std::hint::black_box(payments); + } + elapsed + } + }) + }, + ); + } + group.finish(); +} + +fn benchmark_payment_store_paginated(c: &mut Criterion, backends: &[Backend]) { + let mut group = c.benchmark_group("database/payment_store_paginated"); + group.throughput(Throughput::Elements(PAGINATED_PAGE_LEN)); + let runtime = benchmark_runtime(); + for backend in backends.iter().copied() { + let Some(fixture) = runtime.block_on(PaginatedStoreFixture::new( + backend, + "payment_list_page", + Arc::clone(&runtime), + )) else { + continue; + }; + let fixture = Arc::new(fixture); + + let runner = Arc::clone(&runtime); + let first_page_fixture = Arc::clone(&fixture); + group.bench_function(BenchmarkId::new("list_page_from_10k", backend.name()), |b| { + b.to_async(runner.as_ref()).iter_custom(|iters| { + let first_page_fixture = Arc::clone(&first_page_fixture); + async move { + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let start = Instant::now(); + let page_len = first_page_fixture.list_first_page().await; + elapsed += start.elapsed(); + debug_assert_eq!(page_len, PAGINATED_PAGE_LEN as usize); + // Keep the page result observable when debug assertions are compiled out. + std::hint::black_box(page_len); + } + elapsed + } + }) + }); + let runner = Arc::clone(&runtime); + let second_page_fixture = Arc::clone(&fixture); + group.bench_function(BenchmarkId::new("list_second_page_from_10k", backend.name()), |b| { + b.to_async(runner.as_ref()).iter_custom(|iters| { + let second_page_fixture = Arc::clone(&second_page_fixture); + async move { + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let start = Instant::now(); + let page_len = second_page_fixture.list_second_page().await; + elapsed += start.elapsed(); + debug_assert_eq!(page_len, PAGINATED_PAGE_LEN as usize); + // Keep the page result observable when debug assertions are compiled out. + std::hint::black_box(page_len); + } + elapsed + } + }) + }); + } + group.finish(); +} + +fn benchmark_pending_payment_store(c: &mut Criterion, backends: &[Backend]) { + let mut group = c.benchmark_group("database/pending_payment_store"); + group.throughput(Throughput::Elements(BATCH_LEN)); + let runtime = benchmark_runtime(); + for backend in backends.iter().copied() { + let insert_payments = pending_payment_details_batch_from_offset(0); + group.bench_with_input( + BenchmarkId::new("insert_100_sequential_cold", backend.name()), + &backend, + |b, backend| { + let insert_payments = insert_payments.clone(); + b.to_async(runtime.as_ref()).iter_custom(|iters| { + let backend = *backend; + let insert_payments = insert_payments.clone(); + async move { + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let fixture = + StoreFixture::new(backend, "pending_payment_insert").await; + let start = Instant::now(); + fixture.write_pending_payment_batch(insert_payments.clone()).await; + elapsed += start.elapsed(); + } + elapsed + } + }) + }, + ); + + let update_payments = pending_payment_details_batch_from_offset(0); + let updates = pending_payment_update_batch_from_offset(0); + group.bench_with_input( + BenchmarkId::new("update_100_sequential_cold", backend.name()), + &backend, + |b, backend| { + let update_payments = update_payments.clone(); + let updates = updates.clone(); + b.to_async(runtime.as_ref()).iter_custom(|iters| { + let backend = *backend; + let update_payments = update_payments.clone(); + let updates = updates.clone(); + async move { + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let fixture = + StoreFixture::new(backend, "pending_payment_update").await; + fixture.write_pending_payment_batch(update_payments.clone()).await; + let start = Instant::now(); + fixture.write_pending_payment_update_batch(updates.clone()).await; + elapsed += start.elapsed(); + } + elapsed + } + }) + }, + ); + } + group.finish(); +} + +fn benchmark_payment_store_single_ops(c: &mut Criterion, backends: &[Backend]) { + let mut group = c.benchmark_group("database/payment_store_single"); + group.throughput(Throughput::Elements(1)); + let runtime = benchmark_runtime(); + for backend in backends.iter().copied() { + group.bench_function(BenchmarkId::new("write_new_key", backend.name()), |b| { + b.to_async(runtime.as_ref()).iter_custom(|iters| async move { + let fixture = StoreFixture::new(backend, "single_write_new_key").await; + let mut elapsed = Duration::ZERO; + for idx in 0..iters { + let start = Instant::now(); + fixture.write_payment(idx).await; + elapsed += start.elapsed(); + } + elapsed + }) + }); + + group.bench_function(BenchmarkId::new("write_existing_key", backend.name()), |b| { + b.to_async(runtime.as_ref()).iter_custom(|iters| async move { + let fixture = StoreFixture::new(backend, "single_update_existing_key").await; + fixture.write_payment(0).await; + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let start = Instant::now(); + fixture.write_payment_update(0).await; + elapsed += start.elapsed(); + } + elapsed + }) + }); + + group.bench_function(BenchmarkId::new("read_existing_key", backend.name()), |b| { + b.to_async(runtime.as_ref()).iter_custom(|iters| async move { + let fixture = StoreFixture::new(backend, "single_read_existing_key").await; + fixture.write_payment(0).await; + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let start = Instant::now(); + let payment = fixture.read_payment(0).await; + elapsed += start.elapsed(); + // Keep the read result observable in optimized benchmark builds. + std::hint::black_box(payment); + } + elapsed + }) + }); + + group.bench_with_input( + BenchmarkId::new("remove_existing_key", backend.name()), + &backend, + |b, backend| { + b.to_async(runtime.as_ref()).iter_custom(|iters| { + let backend = *backend; + async move { + let mut elapsed = Duration::ZERO; + for idx in 0..iters { + let fixture = + StoreFixture::new(backend, "single_remove_existing_key").await; + fixture.write_payment(idx).await; + let key = payment_key(idx); + let start = Instant::now(); + fixture.remove_payment_key(&key).await; + elapsed += start.elapsed(); + } + elapsed + } + }) + }, + ); + } + group.finish(); +} + +fn benchmark_payment_store_warm_sequential(c: &mut Criterion, backends: &[Backend]) { + let mut group = c.benchmark_group("database/payment_store_warm"); + group.throughput(Throughput::Elements(BATCH_LEN)); + let runtime = benchmark_runtime(); + for backend in backends.iter().copied() { + group.bench_function(BenchmarkId::new("insert_100_sequential", backend.name()), |b| { + b.to_async(runtime.as_ref()).iter_custom(|iters| async move { + let fixture = + StoreFixture::new(backend, "payment_insert_100_sequential_warm").await; + let mut elapsed = Duration::ZERO; + for iter in 0..iters { + let offset = iter * BATCH_LEN; + let start = Instant::now(); + fixture.write_payment_batch_from_offset(offset).await; + elapsed += start.elapsed(); + } + elapsed + }) + }); + } + group.finish(); +} + +fn benchmark_payment_store_concurrent(c: &mut Criterion, backends: &[Backend]) { + let mut group = c.benchmark_group("database/payment_store_concurrent"); + group.throughput(Throughput::Elements(BATCH_LEN)); + let runtime = benchmark_runtime(); + for backend in backends.iter().copied() { + group.bench_function(BenchmarkId::new("insert_100_distinct_keys", backend.name()), |b| { + b.to_async(runtime.as_ref()).iter_custom(|iters| async move { + let fixture = + StoreFixture::new(backend, "payment_insert_100_concurrent_distinct").await; + let mut elapsed = Duration::ZERO; + for iter in 0..iters { + let offset = iter * BATCH_LEN; + let start = Instant::now(); + fixture.write_payment_batch_concurrent(offset, false).await; + elapsed += start.elapsed(); + } + elapsed + }) + }); + + group.bench_function(BenchmarkId::new("insert_100_same_key", backend.name()), |b| { + b.to_async(runtime.as_ref()).iter_custom(|iters| async move { + let fixture = + StoreFixture::new(backend, "payment_insert_100_concurrent_same_key").await; + let mut elapsed = Duration::ZERO; + for _ in 0..iters { + let start = Instant::now(); + fixture.write_payment_batch_concurrent(0, true).await; + elapsed += start.elapsed(); + } + elapsed + }) + }); + } + group.finish(); +} + +fn benchmark_payment_store_lifecycle(c: &mut Criterion, backends: &[Backend]) { + let mut group = c.benchmark_group("database/payment_store_lifecycle"); + group.throughput(Throughput::Elements(1)); + let runtime = benchmark_runtime(); + for backend in backends.iter().copied() { + group.bench_function(BenchmarkId::new("insert_update_read", backend.name()), |b| { + b.to_async(runtime.as_ref()).iter_custom(|iters| async move { + let fixture = StoreFixture::new(backend, "payment_lifecycle").await; + let mut elapsed = Duration::ZERO; + for idx in 0..iters { + let start = Instant::now(); + let payment = fixture.insert_update_read_payment(idx).await; + elapsed += start.elapsed(); + // Keep the read result observable in optimized benchmark builds. + std::hint::black_box(payment); + } + elapsed + }) + }); + } + group.finish(); +} + +fn benchmark_runtime() -> Arc { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder.worker_threads(2).enable_all(); + #[cfg(tokio_unstable)] + builder.enable_eager_driver_handoff(); + Arc::new(builder.build().unwrap()) +} + +criterion_group!(benches, database_benchmark); +criterion_main!(benches); diff --git a/benches/operations.rs b/benches/operations.rs new file mode 100644 index 0000000000..a683d693cd --- /dev/null +++ b/benches/operations.rs @@ -0,0 +1,704 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +#[path = "../tests/common/mod.rs"] +mod common; + +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use bitcoin::secp256k1::PublicKey; +use bitcoin::Amount; +use common::{ + expect_channel_pending_event, expect_channel_ready_event, expect_event, + generate_blocks_and_wait, premine_and_distribute_funds, random_config, random_storage_path, + setup_bitcoind_and_electrsd, setup_node, setup_two_nodes_with_store, +}; +use criterion::{criterion_group, criterion_main, Criterion}; +use electrsd::corepc_node::{Client as BitcoindClient, Node as BitcoinD}; +use ldk_node::io::sqlite_store::SqliteStore; +use ldk_node::{Event, Node}; +use lightning::ln::channelmanager::PaymentId; +use lightning::util::persist::migrate_kv_store_data_async; +use lightning_invoice::{Bolt11InvoiceDescription, Description}; +use lightning_persister::fs_store::v2::FilesystemStoreV2; + +use crate::common::{open_channel_push_amt, TestChainSource, TestConfig, TestStoreType}; + +#[cfg(feature = "postgres")] +use ldk_node::io::postgres_store::{PostgresStore, POSTGRES_TEST_URL_ENV_VAR}; + +const STARTUP_SEED_SCENARIOS: [StartupSeedScenario; 6] = [ + StartupSeedScenario { channel_count: 1, payment_count: 2 }, + StartupSeedScenario { channel_count: 1, payment_count: 100 }, + StartupSeedScenario { channel_count: 1, payment_count: 1_000 }, + StartupSeedScenario { channel_count: 10, payment_count: 2 }, + StartupSeedScenario { channel_count: 100, payment_count: 2 }, + StartupSeedScenario { channel_count: 100, payment_count: 1_000 }, +]; +const STARTUP_SEED_PAYMENT_AMOUNT_MSAT: u64 = 1_000_000; +const STARTUP_SEED_MIN_CHANNEL_FUNDING_SAT: u64 = 100_000; +const STARTUP_SEED_CHANNEL_BUFFER_SAT: u64 = 1_000_000; +const STARTUP_SEED_CHANNEL_BATCH_SIZE: u64 = 2; + +#[derive(Clone, Copy)] +struct StartupSeedScenario { + channel_count: u64, + payment_count: u64, +} + +impl StartupSeedScenario { + fn bench_name(self, store_name: &str) -> String { + format!("{}/channels_{}_payments_{}", store_name, self.channel_count, self.payment_count) + } + + fn runs_in_ci(self) -> bool { + self.channel_count == 1 && self.payment_count == 2 + } + + fn channel_funding_sat(self) -> u64 { + let payment_amount_sat = STARTUP_SEED_PAYMENT_AMOUNT_MSAT / 1_000; + let payment_funding_sat = + self.payment_count * payment_amount_sat + STARTUP_SEED_CHANNEL_BUFFER_SAT; + payment_funding_sat.max(STARTUP_SEED_MIN_CHANNEL_FUNDING_SAT) + } + + fn premine_amount_sat(self) -> u64 { + self.channel_count * self.channel_funding_sat() + STARTUP_SEED_CHANNEL_BUFFER_SAT + } +} + +#[derive(Clone, Copy)] +struct StoreBenchConfig { + name: &'static str, + store_type: TestStoreType, +} + +fn operations_benchmark(c: &mut Criterion) { + dotenvy::dotenv().ok(); + + forwarding_benchmark(c); + channel_open_benchmark(c); + startup_benchmark(c); +} + +fn forwarding_benchmark(c: &mut Criterion) { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::BitcoindRpcSync(&bitcoind); + let runtime = benchmark_runtime(); + + let mut group = c.benchmark_group("forwarding"); + group.sample_size(10); + + for store_config in store_bench_configs() { + if !should_register_bench("forwarding", store_config.name) { + continue; + } + let nodes = setup_forwarding_nodes( + &chain_source, + &bitcoind, + &electrsd, + store_config.store_type, + &runtime, + ); + let nodes = Arc::new(nodes); + + group.bench_function(store_config.name, |b| { + b.to_async(&runtime).iter_custom(|iter| { + let nodes = Arc::clone(&nodes); + + async move { + let mut total = Duration::ZERO; + for _ in 0..iter { + total += send_forwarded_payments(Arc::clone(&nodes)).await; + } + total + } + }); + }); + } +} + +fn benchmark_runtime() -> tokio::runtime::Runtime { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder.worker_threads(4).enable_all(); + #[cfg(tokio_unstable)] + builder.enable_eager_driver_handoff(); + builder.build().unwrap() +} + +fn channel_open_benchmark(c: &mut Criterion) { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::BitcoindRpcSync(&bitcoind); + let runtime = benchmark_runtime(); + + let mut group = c.benchmark_group("channel_open"); + group.sample_size(10); + + for store_config in store_bench_configs() { + if !should_register_bench("channel_open", store_config.name) { + continue; + } + let (node_a, node_b) = + setup_two_nodes_with_store(&chain_source, false, true, false, store_config.store_type); + let node_a = Arc::new(node_a); + let node_b = Arc::new(node_b); + + // connect nodes + node_a + .connect( + node_b.node_id(), + node_b.listening_addresses().unwrap().first().unwrap().clone(), + true, + ) + .unwrap(); + + runtime.block_on(async { + let address_a = node_a.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a], + Amount::from_sat(35_000_000), + ) + .await; + node_a.sync_wallets().unwrap(); + }); + + let node_a = Arc::clone(&node_a); + let node_b = Arc::clone(&node_b); + let bitcoind_client = &bitcoind.client; + let electrsd_ref = &electrsd; + + group.bench_function(store_config.name, |b| { + b.to_async(&runtime).iter_custom(|iter| { + let node_a = Arc::clone(&node_a); + let node_b = Arc::clone(&node_b); + + async move { + let mut total = Duration::ZERO; + for _ in 0..iter { + total += open_channel( + Arc::clone(&node_a), + Arc::clone(&node_b), + bitcoind_client, + electrsd_ref, + ) + .await; + } + total + } + }); + }); + } +} + +fn startup_benchmark(c: &mut Criterion) { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::BitcoindRpcSync(&bitcoind); + let runtime = benchmark_runtime(); + + let mut group = c.benchmark_group("startup"); + group.sample_size(10); + + for startup_seed_scenario in STARTUP_SEED_SCENARIOS { + // Larger seeded startup scenarios are useful locally, but take too long to run in CI. + if is_ci() && !startup_seed_scenario.runs_in_ci() { + continue; + } + + let matching_store_configs: Vec<_> = store_bench_configs() + .into_iter() + .filter(|store_config| { + let bench_name = startup_seed_scenario.bench_name(store_config.name); + should_register_bench("startup", &bench_name) + }) + .collect(); + if matching_store_configs.is_empty() { + continue; + } + + // Seed a canonical sqlite node once, then copy its store into each backend under test. This + // keeps the channel/payment history identical across stores while avoiding repeated expensive + // channel and payment setup for every store backend. + let seeded_config = setup_startup_seed_node( + &chain_source, + &bitcoind, + &electrsd, + startup_seed_scenario, + &runtime, + ); + let startup_configs = migrate_startup_seed_configs( + &seeded_config, + startup_seed_scenario, + matching_store_configs, + &runtime, + ); + + for (bench_name, config) in startup_configs { + group.bench_function(bench_name, |b| { + b.iter_custom(|iter| { + let mut total = Duration::ZERO; + for _ in 0..iter { + let start = Instant::now(); + let node = setup_node(&chain_source, config.clone()); + total += start.elapsed(); + node.stop().unwrap(); + } + total + }); + }); + } + } +} + +/// Builds a canonical sqlite node store with the requested channel and payment history. +/// +/// Startup benchmarks use this store as the source fixture for every backend so differences in +/// measured startup time come from loading equivalent persisted state, not from different setup +/// runs. +fn setup_startup_seed_node( + chain_source: &TestChainSource, bitcoind: &BitcoinD, electrsd: &electrsd::ElectrsD, + seed_scenario: StartupSeedScenario, runtime: &tokio::runtime::Runtime, +) -> TestConfig { + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + let node_a = Arc::new(setup_node(chain_source, config_a.clone())); + + let mut config_b = random_config(true); + config_b.store_type = TestStoreType::Sqlite; + let node_b = Arc::new(setup_node(chain_source, config_b)); + + runtime.block_on(async { + let address_a = node_a.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a], + Amount::from_sat(seed_scenario.premine_amount_sat()), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let funding_amount_sat = seed_scenario.channel_funding_sat(); + let mut remaining_channel_count = seed_scenario.channel_count; + while remaining_channel_count > 0 { + let channel_batch_size = remaining_channel_count.min(STARTUP_SEED_CHANNEL_BATCH_SIZE); + for _ in 0..channel_batch_size { + node_a + .open_channel( + node_b.node_id(), + node_b.listening_addresses().unwrap().first().unwrap().clone(), + funding_amount_sat, + None, + None, + ) + .unwrap(); + assert!(node_a.list_peers().iter().any(|peer| peer.node_id == node_b.node_id())); + + let funding_txo_a = expect_channel_pending_event!(node_a, node_b.node_id()); + let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id()); + assert_eq!(funding_txo_a, funding_txo_b); + node_a.sync_wallets().unwrap(); + } + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + for _ in 0..channel_batch_size { + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + wait_for_channel_ready_events(&node_a, node_b.node_id(), 1).await; + wait_for_channel_ready_events(&node_b, node_a.node_id(), 1).await; + } + remaining_channel_count -= channel_batch_size; + } + + for idx in 0..seed_scenario.payment_count { + let invoice_description = Bolt11InvoiceDescription::Direct( + Description::new(format!("startup seed {}", idx + 1)).unwrap(), + ); + let invoice = node_b + .bolt11_payment() + .receive(STARTUP_SEED_PAYMENT_AMOUNT_MSAT, &invoice_description.into(), 9217) + .unwrap(); + let payment_id = node_a.bolt11_payment().send(&invoice, None).unwrap(); + wait_for_payment_success(&node_a, payment_id).await; + } + + drain_events(&node_a); + drain_events(&node_b); + }); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); + + config_a +} + +/// Produces benchmark configs backed by copies of the canonical seeded store. +/// +/// Sqlite can reuse the source store directly. Other store backends get a fresh storage path and a +/// migrated copy of the same key-value data. +fn migrate_startup_seed_configs( + source_config: &TestConfig, seed_scenario: StartupSeedScenario, + store_configs: Vec, runtime: &tokio::runtime::Runtime, +) -> Vec<(String, TestConfig)> { + let source_store = + SqliteStore::new(source_config.node_config.storage_dir_path.clone().into(), None, None) + .unwrap(); + + store_configs + .into_iter() + .map(|store_config| { + let mut config = source_config.clone(); + config.store_type = store_config.store_type; + if !matches!(store_config.store_type, TestStoreType::Sqlite) { + config.node_config.storage_dir_path = + random_storage_path().to_str().unwrap().to_owned(); + migrate_startup_seed_store(&source_store, &config, runtime); + } + + (seed_scenario.bench_name(store_config.name), config) + }) + .collect() +} + +fn migrate_startup_seed_store( + source_store: &SqliteStore, destination_config: &TestConfig, runtime: &tokio::runtime::Runtime, +) { + runtime.block_on(async { + match destination_config.store_type { + TestStoreType::Sqlite => {}, + TestStoreType::FilesystemStore => { + let destination_store = FilesystemStoreV2::new( + destination_config.node_config.storage_dir_path.clone().into(), + ) + .unwrap(); + migrate_kv_store_data_async(source_store, &destination_store).await.unwrap(); + }, + #[cfg(feature = "postgres")] + TestStoreType::Postgres => { + let connection_string = postgres_connection_string(); + let table_name = postgres_table_name(destination_config); + let destination_store = + PostgresStore::new(connection_string, None, Some(table_name), None) + .await + .unwrap(); + migrate_kv_store_data_async(source_store, &destination_store).await.unwrap(); + }, + TestStoreType::TestSyncStore => { + unreachable!("startup benches do not use TestSyncStore") + }, + } + }); +} + +#[cfg(feature = "postgres")] +fn postgres_connection_string() -> String { + dotenvy::dotenv().ok(); + std::env::var(POSTGRES_TEST_URL_ENV_VAR) + .unwrap_or_else(|_| "host=localhost user=postgres password=postgres".to_string()) +} + +#[cfg(feature = "postgres")] +fn postgres_table_name(config: &TestConfig) -> String { + format!( + "test_{}", + config + .node_config + .storage_dir_path + .chars() + .filter(|c| c.is_ascii_alphanumeric()) + .collect::() + ) +} + +/// Returns whether the benchmark identified by `group/name` matches the CLI filters. +/// +/// Criterion applies its own filters after benchmark registration, but these benches do expensive +/// setup before registration. Pre-filtering here avoids setting up benchmark cases that cannot run. +/// Only non-flag arguments are considered filters, matching either the full target substring or the +/// group name. +fn should_register_bench(group: &str, name: &str) -> bool { + let target = format!("{}/{}", group, name); + let filters: Vec = + std::env::args().skip(1).filter(|arg| !arg.starts_with('-')).collect(); + filters.is_empty() + || filters.iter().any(|filter| { + target.contains(filter) || (filter == group && target.starts_with(&format!("{group}/"))) + }) +} + +fn is_ci() -> bool { + std::env::var("CI").is_ok_and(|value| !value.is_empty() && value != "0" && value != "false") +} + +fn setup_forwarding_nodes( + chain_source: &TestChainSource, bitcoind: &BitcoinD, electrsd: &electrsd::ElectrsD, + store_type: TestStoreType, runtime: &tokio::runtime::Runtime, +) -> Vec> { + let mut nodes = Vec::new(); + for _ in 0..3 { + let mut config = random_config(true); + config.store_type = store_type; + nodes.push(Arc::new(setup_node(chain_source, config))); + } + + runtime.block_on(async { + let addresses = + nodes.iter().map(|node| node.onchain_payment().new_address().unwrap()).collect(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + addresses, + Amount::from_sat(5_000_000), + ) + .await; + for node in &nodes { + node.sync_wallets().unwrap(); + } + + let funding_amount_sat = 1_000_000; + let push_amount_msat = None; + open_channel_push_amt( + &nodes[0], + &nodes[1], + funding_amount_sat, + push_amount_msat, + true, + electrsd, + ) + .await; + open_channel_push_amt( + &nodes[1], + &nodes[2], + funding_amount_sat, + push_amount_msat, + true, + electrsd, + ) + .await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + for node in &nodes { + node.sync_wallets().unwrap(); + } + + expect_event!(nodes[0], ChannelReady); + expect_event!(nodes[1], ChannelReady); + expect_event!(nodes[1], ChannelReady); + expect_event!(nodes[2], ChannelReady); + + tokio::time::sleep(Duration::from_secs(1)).await; + wait_for_forwarding_path(&nodes).await; + }); + + nodes +} + +async fn send_forwarded_payments(nodes: Arc>>) -> Duration { + let total_payments = 25; + let amount_msat = 5_000; + + let mut total = Duration::ZERO; + + for _ in 0..total_payments { + let invoice_description = + Bolt11InvoiceDescription::Direct(Description::new("forwarding".to_string()).unwrap()); + let invoice = nodes[2] + .bolt11_payment() + .receive(amount_msat, &invoice_description.into(), 9217) + .unwrap(); + + let start = Instant::now(); + let payment_id = nodes[0].bolt11_payment().send(&invoice, None).unwrap(); + total += wait_for_forwarded_payment(&nodes, payment_id, start).await; + } + + // return funds and clean up for next run + let invoice_description = + Bolt11InvoiceDescription::Direct(Description::new("return".to_string()).unwrap()); + let invoice = nodes[0] + .bolt11_payment() + .receive(amount_msat * total_payments, &invoice_description.into(), 9217) + .unwrap(); + if let Ok(return_payment_id) = nodes[2].bolt11_payment().send(&invoice, None) { + wait_for_payment_success(&nodes[2], return_payment_id).await + } + tokio::time::sleep(Duration::from_millis(10)).await; + for node in nodes.iter() { + drain_events(node); + } + + total +} + +async fn wait_for_forwarded_payment( + nodes: &[Arc], expected_payment_id: PaymentId, start: Instant, +) -> Duration { + let mut payment_successful = false; + let mut payment_forwarded = false; + + while !payment_successful || !payment_forwarded { + tokio::select! { + event = nodes[0].next_event_async(), if !payment_successful => { + match event { + Event::PaymentSuccessful { payment_id: Some(payment_id), .. } + if payment_id == expected_payment_id => + { + payment_successful = true; + }, + Event::PaymentFailed { payment_id, payment_hash, .. } => { + nodes[0].event_handled().unwrap(); + panic!("Forwarded payment {payment_id:?} failed with hash {payment_hash:?}"); + }, + _ => {}, + } + nodes[0].event_handled().unwrap(); + }, + event = nodes[1].next_event_async(), if !payment_forwarded => { + if matches!(event, Event::PaymentForwarded { .. }) { + payment_forwarded = true; + } + nodes[1].event_handled().unwrap(); + }, + } + } + + start.elapsed() +} + +/// Sends a payment across the benchmark path before measurements start. +/// +/// Channel readiness events alone do not guarantee that the sender can immediately find and use the +/// intended multi-hop path. Waiting for one successful payment keeps route-discovery first-use cost +/// and transient graph propagation failures out of the timed forwarding loop. +async fn wait_for_forwarding_path(nodes: &[Arc]) { + for _ in 0..30 { + let invoice_description = + Bolt11InvoiceDescription::Direct(Description::new("".to_string()).unwrap()); + let invoice = + nodes[2].bolt11_payment().receive(5_000, &invoice_description.into(), 9217).unwrap(); + if let Ok(payment_id) = nodes[0].bolt11_payment().send(&invoice, None) { + wait_for_payment_success(&nodes[0], payment_id).await; + tokio::time::sleep(Duration::from_millis(50)).await; + for node in nodes { + drain_events(node); + } + return; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + + panic!("Timed out waiting for forwarding path readiness"); +} + +async fn open_channel( + node_a: Arc, node_b: Arc, bitcoind: &BitcoindClient, electrsd: &electrsd::ElectrsD, +) -> Duration { + let start = Instant::now(); + + node_a + .open_channel( + node_b.node_id(), + node_b.listening_addresses().unwrap().first().unwrap().clone(), + 100_000, + None, + None, + ) + .unwrap(); + + let funding_txo_a = expect_channel_pending_event!(node_a, node_b.node_id()); + let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id()); + let duration = start.elapsed(); + + assert_eq!(funding_txo_a, funding_txo_b); + common::wait_for_tx(&electrsd.client, funding_txo_a.txid).await; + + generate_blocks_and_wait(bitcoind, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_b, node_a.node_id()); + expect_channel_ready_event!(node_a, node_b.node_id()); + + duration +} + +async fn wait_for_payment_success(node: &Node, expected_payment_id: PaymentId) { + loop { + match node.next_event_async().await { + Event::PaymentSuccessful { payment_id: Some(payment_id), .. } + if payment_id == expected_payment_id => + { + node.event_handled().unwrap(); + break; + }, + Event::PaymentFailed { payment_id, payment_hash, .. } => { + node.event_handled().unwrap(); + panic!("Return payment {:?} failed with hash {:?}", payment_id, payment_hash); + }, + _ => node.event_handled().unwrap(), + } + } +} + +async fn wait_for_channel_ready_events(node: &Node, counterparty_node_id: PublicKey, count: u64) { + let mut remaining_count = count; + while remaining_count > 0 { + let event = tokio::time::timeout( + Duration::from_secs(common::INTEROP_TIMEOUT_SECS), + node.next_event_async(), + ) + .await + .unwrap_or_else(|_| { + panic!("{} timed out waiting for ChannelReady event after 60s", node.node_id()) + }); + + match event { + ref e @ Event::ChannelReady { counterparty_node_id: Some(node_id), .. } + if node_id == counterparty_node_id => + { + println!("{} got event {:?}", node.node_id(), e); + remaining_count -= 1; + }, + ref e @ Event::ChannelReady { .. } => { + panic!("{} got unexpected ChannelReady event: {:?}", node.node_id(), e); + }, + _ => {}, + } + node.event_handled().unwrap(); + } +} + +fn drain_events(node: &Node) { + while node.next_event().is_some() { + node.event_handled().unwrap(); + } +} + +fn store_bench_configs() -> Vec { + #[cfg(not(feature = "postgres"))] + { + vec![ + StoreBenchConfig { name: "sqlite", store_type: TestStoreType::Sqlite }, + StoreBenchConfig { name: "filesystem", store_type: TestStoreType::FilesystemStore }, + ] + } + + #[cfg(feature = "postgres")] + { + vec![ + StoreBenchConfig { name: "sqlite", store_type: TestStoreType::Sqlite }, + StoreBenchConfig { name: "filesystem", store_type: TestStoreType::FilesystemStore }, + StoreBenchConfig { name: "postgres", store_type: TestStoreType::Postgres }, + ] + } +} + +criterion_group!(benches, operations_benchmark); +criterion_main!(benches); diff --git a/benches/payments.rs b/benches/payments.rs index 52769d7949..313a799f48 100644 --- a/benches/payments.rs +++ b/benches/payments.rs @@ -1,3 +1,10 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + #[path = "../tests/common/mod.rs"] mod common; @@ -8,16 +15,23 @@ use bitcoin::hex::DisplayHex; use bitcoin::Amount; use common::{ expect_channel_ready_event, generate_blocks_and_wait, premine_and_distribute_funds, - random_chain_source, setup_bitcoind_and_electrsd, setup_two_nodes_with_store, + setup_bitcoind_and_electrsd, setup_two_nodes_with_store, TestChainSource, }; use criterion::{criterion_group, criterion_main, Criterion}; use ldk_node::{Event, Node}; +use lightning::ln::channelmanager::PaymentId; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use rand::RngCore; use tokio::task::{self}; use crate::common::open_channel_push_amt; +#[derive(Clone, Copy)] +struct StoreBenchConfig { + name: &'static str, + store_type: common::TestStoreType, +} + fn spawn_payment(node_a: Arc, node_b: Arc, amount_msat: u64) { let mut preimage_bytes = [0u8; 32]; rand::rng().fill_bytes(&mut preimage_bytes); @@ -102,7 +116,7 @@ async fn send_payments(node_a: Arc, node_b: Arc) -> std::time::Durat // Send back the money for the next iteration. let mut preimage_bytes = [0u8; 32]; rand::rng().fill_bytes(&mut preimage_bytes); - node_b + let return_payment_id = node_b .spontaneous_payment() .send_with_preimage( amount_msat * total_payments, @@ -112,83 +126,139 @@ async fn send_payments(node_a: Arc, node_b: Arc) -> std::time::Durat ) .ok() .unwrap(); + wait_for_payment_success(&node_b, return_payment_id).await; duration } +async fn wait_for_payment_success(node: &Node, expected_payment_id: PaymentId) { + loop { + match node.next_event_async().await { + Event::PaymentSuccessful { payment_id: Some(payment_id), .. } + if payment_id == expected_payment_id => + { + node.event_handled().unwrap(); + break; + }, + Event::PaymentFailed { payment_id, payment_hash, .. } => { + node.event_handled().unwrap(); + panic!("Return payment {:?} failed with hash {:?}", payment_id, payment_hash); + }, + _ => { + node.event_handled().unwrap(); + }, + } + } +} + fn payment_benchmark(c: &mut Criterion) { + dotenvy::dotenv().ok(); + // Set up two nodes. Because this is slow, we reuse the same nodes for each sample. let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let chain_source = random_chain_source(&bitcoind, &electrsd); - - let (node_a, node_b) = setup_two_nodes_with_store( - &chain_source, - false, - true, - false, - common::TestStoreType::Sqlite, - ); - - let runtime = - tokio::runtime::Builder::new_multi_thread().worker_threads(4).enable_all().build().unwrap(); - - let node_a = Arc::new(node_a); - let node_b = Arc::new(node_b); - - // Fund the nodes and setup a channel between them. The criterion function cannot be async, so we need to execute - // the setup using a runtime. - let node_a_cloned = Arc::clone(&node_a); - let node_b_cloned = Arc::clone(&node_b); - runtime.block_on(async move { - let address_a = node_a_cloned.onchain_payment().new_address().unwrap(); - let premine_sat = 25_000_000; - premine_and_distribute_funds( - &bitcoind.client, - &electrsd.client, - vec![address_a], - Amount::from_sat(premine_sat), - ) - .await; - node_a_cloned.sync_wallets().unwrap(); - node_b_cloned.sync_wallets().unwrap(); - open_channel_push_amt( - &node_a_cloned, - &node_b_cloned, - 16_000_000, - Some(1_000_000_000), - false, - &electrsd, - ) - .await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; - node_a_cloned.sync_wallets().unwrap(); - node_b_cloned.sync_wallets().unwrap(); - expect_channel_ready_event!(node_a_cloned, node_b_cloned.node_id()); - expect_channel_ready_event!(node_b_cloned, node_a_cloned.node_id()); - }); + let chain_source = TestChainSource::BitcoindRpcSync(&bitcoind); + + let store_configs = store_bench_configs(); + + let runtime = benchmark_runtime(); let mut group = c.benchmark_group("payments"); group.sample_size(10); - group.bench_function("payments", |b| { - // Use custom timing so that sending back the money at the end of each iteration isn't included in the - // measurement. - b.to_async(&runtime).iter_custom(|iter| { + for store_config in store_configs { + let (node_a, node_b) = + setup_two_nodes_with_store(&chain_source, false, true, false, store_config.store_type); + + let node_a = Arc::new(node_a); + let node_b = Arc::new(node_b); + + // Fund the nodes and setup a channel between them. The criterion function cannot be async, + // so we need to execute the setup using a runtime. + let node_a_cloned = Arc::clone(&node_a); + let node_b_cloned = Arc::clone(&node_b); + runtime.block_on(async { + let address_a = node_a_cloned.onchain_payment().new_address().unwrap(); + let premine_sat = 25_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![address_a], + Amount::from_sat(premine_sat), + ) + .await; + node_a_cloned.sync_wallets().unwrap(); + node_b_cloned.sync_wallets().unwrap(); + open_channel_push_amt( + &node_a_cloned, + &node_b_cloned, + 16_000_000, + Some(1_000_000_000), + false, + &electrsd, + ) + .await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a_cloned.sync_wallets().unwrap(); + node_b_cloned.sync_wallets().unwrap(); + expect_channel_ready_event!(node_a_cloned, node_b_cloned.node_id()); + expect_channel_ready_event!(node_b_cloned, node_a_cloned.node_id()); + }); + + group.bench_function(store_config.name, |b| { + // Use custom timing so that sending back the money at the end of each iteration isn't + // included in the measurement. let node_a = Arc::clone(&node_a); let node_b = Arc::clone(&node_b); + b.to_async(&runtime).iter_custom(|iter| { + let node_a = Arc::clone(&node_a); + let node_b = Arc::clone(&node_b); - async move { - let mut total = Duration::ZERO; - for _i in 0..iter { - let node_a = Arc::clone(&node_a); - let node_b = Arc::clone(&node_b); + async move { + let mut total = Duration::ZERO; + for _i in 0..iter { + let node_a = Arc::clone(&node_a); + let node_b = Arc::clone(&node_b); - total += send_payments(node_a, node_b).await; + total += send_payments(node_a, node_b).await; + } + total } - total - } + }); }); - }); + } +} + +fn benchmark_runtime() -> tokio::runtime::Runtime { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder.worker_threads(4).enable_all(); + #[cfg(tokio_unstable)] + builder.enable_eager_driver_handoff(); + builder.build().unwrap() +} + +fn store_bench_configs() -> Vec { + #[cfg(not(feature = "postgres"))] + { + vec![ + StoreBenchConfig { name: "sqlite", store_type: common::TestStoreType::Sqlite }, + StoreBenchConfig { + name: "filesystem", + store_type: common::TestStoreType::FilesystemStore, + }, + ] + } + + #[cfg(feature = "postgres")] + { + vec![ + StoreBenchConfig { name: "sqlite", store_type: common::TestStoreType::Sqlite }, + StoreBenchConfig { + name: "filesystem", + store_type: common::TestStoreType::FilesystemStore, + }, + StoreBenchConfig { name: "postgres", store_type: common::TestStoreType::Postgres }, + ] + } } criterion_group!(benches, payment_benchmark); diff --git a/src/bench.rs b/src/bench.rs new file mode 100644 index 0000000000..2c0ef9c35b --- /dev/null +++ b/src/bench.rs @@ -0,0 +1,671 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::path::PathBuf; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use bitcoin::hashes::Hash; +use bitcoin::Txid; +use lightning::ln::channelmanager::PaymentId; +use lightning::util::persist::{KVStore, PaginatedKVStore}; +use lightning::util::ser::Writeable; +use lightning_persister::fs_store::v2::FilesystemStoreV2; +use lightning_types::payment::{PaymentHash, PaymentPreimage}; + +use crate::data_store::StorableObjectId; +use crate::io::sqlite_store::SqliteStore; +use crate::io::utils::read_all_objects; +use crate::io::{ + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, +}; +use crate::logger::Logger; +use crate::payment::pending_payment_store::{PendingPaymentDetails, PendingPaymentDetailsUpdate}; +use crate::payment::store::{ + PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, +}; +use crate::types::{DynStore, DynStoreRef, DynStoreWrapper}; + +pub const BATCH_LEN: u64 = 100; +pub const LARGE_PAYMENT_SET_LEN: u64 = 10_000; +pub const PAGINATED_PAGE_LEN: u64 = 50; + +static NEXT_STORE_ID: AtomicU64 = AtomicU64::new(0); + +#[derive(Clone)] +pub struct PaymentUpdateBatch(Vec); + +#[derive(Clone)] +pub struct PendingPaymentBatch(Vec); + +#[derive(Clone)] +pub struct PendingPaymentUpdateBatch(Vec); + +#[derive(Clone, Copy)] +pub enum Backend { + Filesystem, + Sqlite, + #[cfg(feature = "postgres")] + Postgres, +} + +impl Backend { + pub fn name(self) -> &'static str { + match self { + Self::Filesystem => "filesystem", + Self::Sqlite => "sqlite", + #[cfg(feature = "postgres")] + Self::Postgres => "postgres", + } + } +} + +pub struct StoreFixture { + store: Arc, + logger: Arc, + storage_path: PathBuf, + #[cfg(feature = "postgres")] + postgres_cleanup: Option, +} + +impl StoreFixture { + pub async fn new(backend: Backend, benchmark_name: &str) -> Self { + let store_id = next_store_id(); + let storage_path = bench_storage_path(backend, benchmark_name, &store_id); + let logger = Arc::new(Logger::new_log_facade()); + #[cfg(feature = "postgres")] + let mut postgres_cleanup = None; + + let store: Arc = match backend { + Backend::Filesystem => { + let store = FilesystemStoreV2::new(storage_path.clone()).unwrap(); + Arc::new(DynStoreWrapper(store)) + }, + Backend::Sqlite => { + let store = SqliteStore::new(storage_path.clone(), None, None).unwrap(); + Arc::new(DynStoreWrapper(store)) + }, + #[cfg(feature = "postgres")] + Backend::Postgres => { + let connection_string = + std::env::var(crate::io::postgres_store::POSTGRES_TEST_URL_ENV_VAR) + .unwrap_or_else(|_| { + panic!( + "{} must be set to run postgres database benchmarks", + crate::io::postgres_store::POSTGRES_TEST_URL_ENV_VAR + ) + }); + let table_name = postgres_table_name(&store_id); + let store = crate::io::postgres_store::PostgresStore::new( + connection_string.clone(), + None, + Some(table_name.clone()), + None, + ) + .await + .unwrap(); + postgres_cleanup = Some(PostgresCleanup { connection_string, table_name }); + Arc::new(DynStoreWrapper(store)) + }, + }; + + Self { + store, + logger, + storage_path, + #[cfg(feature = "postgres")] + postgres_cleanup, + } + } + + pub async fn write_payment_batch(&self, payments: Vec) { + write_payment_batch(&*self.store, payments).await; + } + + pub async fn write_payment_batch_from_offset(&self, offset: u64) { + self.write_payment_batch(payment_details_batch(offset)).await; + } + + pub async fn write_payment(&self, idx: u64) { + write_payment(&*self.store, payment_details(idx)).await; + } + + pub async fn write_payment_update(&self, idx: u64) { + write_payment_update(&*self.store, payment_update(idx)).await; + } + + pub async fn write_payment_update_batch(&self, updates: PaymentUpdateBatch) { + write_payment_update_batch(&*self.store, updates.0).await; + } + + pub async fn read_payment(&self, idx: u64) -> Vec { + read_payment(&*self.store, payment_key(idx)).await.unwrap() + } + + pub async fn remove_payment_key(&self, key: &str) { + remove_payment(&*self.store, key).await; + } + + pub async fn reload_payments(&self) -> Vec { + read_all_objects::( + &*self.store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Arc::clone(&self.logger), + ) + .await + .unwrap() + } + + pub async fn write_payment_batch_concurrent(&self, offset: u64, same_key: bool) { + write_payment_batch_concurrent(DynStoreRef(Arc::clone(&self.store)), offset, same_key) + .await; + } + + pub async fn write_pending_payment_batch(&self, payments: PendingPaymentBatch) { + write_pending_payment_batch(&*self.store, payments.0).await; + } + + pub async fn write_pending_payment_update_batch(&self, updates: PendingPaymentUpdateBatch) { + write_pending_payment_update_batch(&*self.store, updates.0).await; + } + + pub async fn insert_update_read_payment(&self, idx: u64) -> Vec { + let payment_key = payment_key(idx); + write_payment(&*self.store, payment_details(idx)).await; + write_payment_update(&*self.store, payment_update(idx)).await; + read_payment(&*self.store, payment_key).await.unwrap() + } +} + +impl Drop for StoreFixture { + fn drop(&mut self) { + #[cfg(feature = "postgres")] + if let Some(cleanup) = self.postgres_cleanup.take() { + tokio::spawn(async move { + cleanup.drop_table().await; + }); + return; + } + + let _ = std::fs::remove_dir_all(&self.storage_path); + } +} + +pub struct PaginatedStoreFixture { + inner: PaginatedStoreFixtureInner, +} + +enum PaginatedStoreFixtureInner { + Filesystem { + store: FilesystemStoreV2, + storage_path: PathBuf, + }, + Sqlite { + store: SqliteStore, + storage_path: PathBuf, + }, + #[cfg(feature = "postgres")] + Postgres { + store: crate::io::postgres_store::PostgresStore, + runtime: Arc, + cleanup: PostgresCleanup, + }, +} + +impl PaginatedStoreFixture { + pub async fn new( + backend: Backend, benchmark_name: &str, _runtime: Arc, + ) -> Option { + let store_id = next_store_id(); + let storage_path = bench_storage_path(backend, benchmark_name, &store_id); + + match backend { + Backend::Filesystem => { + let store = FilesystemStoreV2::new(storage_path.clone()).unwrap(); + populate_async_store(&store).await; + Some(Self { inner: PaginatedStoreFixtureInner::Filesystem { store, storage_path } }) + }, + Backend::Sqlite => { + let store = SqliteStore::new(storage_path.clone(), None, None).unwrap(); + populate_async_store(&store).await; + Some(Self { inner: PaginatedStoreFixtureInner::Sqlite { store, storage_path } }) + }, + #[cfg(feature = "postgres")] + Backend::Postgres => { + let connection_string = + std::env::var(crate::io::postgres_store::POSTGRES_TEST_URL_ENV_VAR) + .unwrap_or_else(|_| { + panic!( + "{} must be set to run postgres database benchmarks", + crate::io::postgres_store::POSTGRES_TEST_URL_ENV_VAR + ) + }); + let table_name = postgres_table_name(&store_id); + let store = crate::io::postgres_store::PostgresStore::new( + connection_string.clone(), + None, + Some(table_name.clone()), + None, + ) + .await + .unwrap(); + populate_async_store(&store).await; + let cleanup = PostgresCleanup { connection_string, table_name }; + Some(Self { + inner: PaginatedStoreFixtureInner::Postgres { + store, + runtime: _runtime, + cleanup, + }, + }) + }, + } + } + + pub async fn list_first_page(&self) -> usize { + match &self.inner { + PaginatedStoreFixtureInner::Filesystem { store, .. } => { + PaginatedKVStore::list_paginated( + store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + None, + ) + .await + .unwrap() + .keys + .len() + }, + PaginatedStoreFixtureInner::Sqlite { store, .. } => PaginatedKVStore::list_paginated( + store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + None, + ) + .await + .unwrap() + .keys + .len(), + #[cfg(feature = "postgres")] + PaginatedStoreFixtureInner::Postgres { store, .. } => PaginatedKVStore::list_paginated( + store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + None, + ) + .await + .unwrap() + .keys + .len(), + } + } + + pub async fn list_second_page(&self) -> usize { + match &self.inner { + PaginatedStoreFixtureInner::Filesystem { store, .. } => list_second_page(store).await, + PaginatedStoreFixtureInner::Sqlite { store, .. } => list_second_page(store).await, + #[cfg(feature = "postgres")] + PaginatedStoreFixtureInner::Postgres { store, .. } => list_second_page(store).await, + } + } +} + +impl Drop for PaginatedStoreFixture { + fn drop(&mut self) { + match &mut self.inner { + PaginatedStoreFixtureInner::Filesystem { storage_path, .. } + | PaginatedStoreFixtureInner::Sqlite { storage_path, .. } => { + let _ = std::fs::remove_dir_all(storage_path); + }, + #[cfg(feature = "postgres")] + PaginatedStoreFixtureInner::Postgres { runtime, cleanup, .. } => { + let cleanup = cleanup.clone(); + runtime.spawn(async move { + cleanup.drop_table().await; + }); + }, + } + } +} + +async fn list_second_page(store: &S) -> usize { + let first_page = PaginatedKVStore::list_paginated( + store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + None, + ) + .await + .unwrap(); + let page_token = first_page.next_page_token.expect("first page should have a next token"); + PaginatedKVStore::list_paginated( + store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + Some(page_token), + ) + .await + .unwrap() + .keys + .len() +} + +pub fn configured_backends() -> Vec { + #[cfg(feature = "postgres")] + { + let mut backends = vec![Backend::Filesystem, Backend::Sqlite]; + if std::env::var(crate::io::postgres_store::POSTGRES_TEST_URL_ENV_VAR).is_ok() { + backends.push(Backend::Postgres); + } + backends + } + + #[cfg(not(feature = "postgres"))] + { + vec![Backend::Filesystem, Backend::Sqlite] + } +} + +async fn write_payment_batch(store: &S, payments: Vec) { + for payment in payments { + write_payment(store, payment).await; + } +} + +async fn write_payment(store: &S, payment: PaymentDetails) { + let key = payment.id.encode_to_hex_str(); + KVStore::write( + store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + payment.encode(), + ) + .await + .unwrap(); +} + +async fn write_payment_update_batch( + store: &S, updates: Vec, +) { + for update in updates { + write_payment_update(store, update).await; + } +} + +async fn write_payment_update(store: &S, update: PaymentDetailsUpdate) { + write_payment(store, payment_details_from_update(update)).await; +} + +async fn read_payment(store: &S, key: String) -> bitcoin::io::Result> { + KVStore::read( + store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + ) + .await +} + +async fn remove_payment(store: &S, key: &str) { + KVStore::remove( + store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + key, + false, + ) + .await + .unwrap(); +} + +async fn write_payment_batch_concurrent(store: DynStoreRef, offset: u64, same_key: bool) { + let mut handles = Vec::with_capacity(BATCH_LEN as usize); + for idx in offset..offset + BATCH_LEN { + let store = store.clone(); + let payment = payment_details(idx); + let key = if same_key { + "shared_payment_key".to_string() + } else { + payment.id.encode_to_hex_str() + }; + handles.push(tokio::spawn(async move { + KVStore::write( + &store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + payment.encode(), + ) + .await + .unwrap(); + })); + } + for handle in handles { + handle.await.unwrap(); + } +} + +async fn write_pending_payment_batch( + store: &S, payments: Vec, +) { + for payment in payments { + let key = payment.details.id.encode_to_hex_str(); + KVStore::write( + store, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + payment.encode(), + ) + .await + .unwrap(); + } +} + +async fn write_pending_payment_update_batch( + store: &S, updates: Vec, +) { + for update in updates { + let payment = pending_payment_details_from_update(update); + let key = payment.details.id.encode_to_hex_str(); + KVStore::write( + store, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + payment.encode(), + ) + .await + .unwrap(); + } +} + +async fn populate_async_store(store: &S) { + for idx in 0..LARGE_PAYMENT_SET_LEN { + let payment = payment_details(idx); + KVStore::write( + store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &payment_id(idx).encode_to_hex_str(), + payment.encode(), + ) + .await + .unwrap(); + } +} + +fn next_store_id() -> String { + let store_id = NEXT_STORE_ID.fetch_add(1, Ordering::Relaxed); + format!("{}_{}", std::process::id(), store_id) +} + +fn bench_storage_path(backend: Backend, benchmark_name: &str, store_id: &str) -> PathBuf { + let mut path = std::env::temp_dir(); + path.push("ldk-node-db-benches"); + path.push(backend.name()); + path.push(benchmark_name); + path.push(store_id); + path +} + +#[cfg(feature = "postgres")] +#[derive(Clone)] +struct PostgresCleanup { + connection_string: String, + table_name: String, +} + +#[cfg(feature = "postgres")] +impl PostgresCleanup { + async fn drop_table(&self) { + let Ok((client, connection)) = + tokio_postgres::connect(&self.connection_string, tokio_postgres::NoTls).await + else { + return; + }; + tokio::spawn(async move { + let _ = connection.await; + }); + + let table_name = postgres_identifier(&self.table_name); + let _ = client.execute(&format!("DROP TABLE IF EXISTS {table_name}"), &[]).await; + } +} + +#[cfg(feature = "postgres")] +fn postgres_table_name(store_id: &str) -> String { + format!("ldk_node_bench_{store_id}") +} + +#[cfg(feature = "postgres")] +fn postgres_identifier(identifier: &str) -> String { + format!("\"{}\"", identifier.replace('"', "\"\"")) +} + +pub fn payment_details_batch(offset: u64) -> Vec { + (offset..offset + BATCH_LEN).map(payment_details).collect() +} + +pub fn payment_update_batch_from_offset(offset: u64) -> PaymentUpdateBatch { + PaymentUpdateBatch(payment_update_batch(offset)) +} + +pub fn pending_payment_details_batch_from_offset(offset: u64) -> PendingPaymentBatch { + PendingPaymentBatch(pending_payment_details_batch(offset)) +} + +pub fn pending_payment_update_batch_from_offset(offset: u64) -> PendingPaymentUpdateBatch { + PendingPaymentUpdateBatch(pending_payment_update_batch(offset)) +} + +fn pending_payment_details_batch(offset: u64) -> Vec { + payment_details_batch(offset) + .into_iter() + .enumerate() + .map(|(idx, payment)| { + let txid = Txid::from_byte_array(filled_bytes((idx as u64) + 1)); + PendingPaymentDetails::new(payment, vec![txid]) + }) + .collect() +} + +fn payment_update_batch(offset: u64) -> Vec { + (offset..offset + BATCH_LEN).map(payment_update).collect() +} + +fn payment_update(idx: u64) -> PaymentDetailsUpdate { + let mut update = PaymentDetailsUpdate::new(payment_id(idx)); + update.status = Some(PaymentStatus::Succeeded); + update.fee_paid_msat = Some(Some(42)); + update.preimage = Some(Some(PaymentPreimage(filled_bytes(idx + 2)))); + update +} + +fn pending_payment_update_batch(offset: u64) -> Vec { + payment_update_batch(offset) + .into_iter() + .enumerate() + .map(|(idx, payment_update)| PendingPaymentDetailsUpdate { + id: payment_update.id, + payment_update: Some(payment_update), + conflicting_txids: Some(vec![Txid::from_byte_array(filled_bytes(idx as u64 + 3))]), + }) + .collect() +} + +fn payment_details(idx: u64) -> PaymentDetails { + let preimage = PaymentPreimage(filled_bytes(idx + 1)); + let hash = PaymentHash(filled_bytes(idx + 2)); + let secret = lightning_types::payment::PaymentSecret(filled_bytes(idx + 3)); + payment_details_with_parts( + payment_id(idx), + hash, + Some(preimage), + Some(secret), + Some(10_000 + idx), + None, + PaymentStatus::Pending, + ) +} + +fn payment_details_from_update(update: PaymentDetailsUpdate) -> PaymentDetails { + let id = update.id; + let hash = update.hash.flatten().unwrap_or(PaymentHash(id.0)); + let preimage = update.preimage.unwrap_or(Some(PaymentPreimage(id.0))); + let secret = update.secret.unwrap_or(Some(lightning_types::payment::PaymentSecret(id.0))); + let amount_msat = update.amount_msat.unwrap_or(Some(10_000)); + let fee_paid_msat = update.fee_paid_msat.unwrap_or(None); + let status = update.status.unwrap_or(PaymentStatus::Pending); + payment_details_with_parts(id, hash, preimage, secret, amount_msat, fee_paid_msat, status) +} + +fn pending_payment_details_from_update( + update: PendingPaymentDetailsUpdate, +) -> PendingPaymentDetails { + let id = update.id; + let details = update + .payment_update + .map(payment_details_from_update) + .unwrap_or_else(|| payment_details_from_update(PaymentDetailsUpdate::new(id))); + let conflicting_txids = update.conflicting_txids.unwrap_or_default(); + PendingPaymentDetails::new(details, conflicting_txids) +} + +fn payment_details_with_parts( + id: PaymentId, hash: PaymentHash, preimage: Option, + secret: Option, amount_msat: Option, + fee_paid_msat: Option, status: PaymentStatus, +) -> PaymentDetails { + PaymentDetails::new( + id, + PaymentKind::Bolt11 { hash, preimage, secret, counterparty_skimmed_fee_msat: None }, + amount_msat, + fee_paid_msat, + PaymentDirection::Outbound, + status, + ) +} + +fn payment_id(idx: u64) -> PaymentId { + PaymentId(filled_bytes(idx)) +} + +pub fn payment_key(idx: u64) -> String { + payment_id(idx).encode_to_hex_str() +} + +fn filled_bytes(idx: u64) -> [u8; 32] { + let mut bytes = [0u8; 32]; + bytes[..8].copy_from_slice(&idx.to_be_bytes()); + bytes[8..16].copy_from_slice(&(idx.wrapping_mul(0x9e37_79b9)).to_be_bytes()); + bytes +} diff --git a/src/io/postgres_store/mod.rs b/src/io/postgres_store/mod.rs index c0770de5f0..fe6978f87d 100644 --- a/src/io/postgres_store/mod.rs +++ b/src/io/postgres_store/mod.rs @@ -12,7 +12,9 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use lightning::io; -use lightning::util::persist::{KVStore, PageToken, PaginatedKVStore, PaginatedListResponse}; +use lightning::util::persist::{ + KVStore, MigratableKVStore, PageToken, PaginatedKVStore, PaginatedListResponse, +}; use lightning_types::string::PrintableString; use native_tls::TlsConnector; use postgres_native_tls::MakeTlsConnector; @@ -33,6 +35,9 @@ pub const DEFAULT_DB_NAME: &str = "ldk_db"; /// The default table in which we store all data. pub const DEFAULT_KV_TABLE_NAME: &str = "ldk_data"; +/// Environment variable used by PostgreSQL tests and benchmarks. +pub const POSTGRES_TEST_URL_ENV_VAR: &str = "TEST_POSTGRES_URL"; + // The current schema version for the PostgreSQL store. const SCHEMA_VERSION: u16 = 1; @@ -351,6 +356,19 @@ impl PaginatedKVStore for PostgresStore { } } +impl MigratableKVStore for PostgresStore { + fn list_all_keys( + &self, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + let runtime = self.internal_runtime(); + async move { + run_on_internal_runtime(runtime, async move { inner.list_all_keys_internal().await }) + .await + } + } +} + struct PostgresStoreInner { pool: SmallPool, config: Config, @@ -725,6 +743,25 @@ impl PostgresStoreInner { Ok(keys) } + async fn list_all_keys_internal(&self) -> io::Result> { + let sql = format!( + "SELECT primary_namespace, secondary_namespace, key FROM {}", + self.kv_table_name_sql + ); + + let err_map = |e: PgError| { + let msg = format!("Failed to retrieve queried rows: {e}"); + io::Error::new(io::ErrorKind::Other, msg) + }; + + let mut locked = self.locked_client().await?; + let rows = query_with_retry!(self, locked, err_map, locked.query(sql.as_str(), &[]))?; + + let keys: Vec<(String, String, String)> = + rows.iter().map(|row| (row.get(0), row.get(1), row.get(2))).collect(); + Ok(keys) + } + async fn list_paginated_internal( &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, ) -> io::Result { @@ -858,7 +895,8 @@ mod tests { use crate::io::test_utils::{do_read_write_remove_list_persist, do_test_store}; fn test_connection_string() -> String { - std::env::var("TEST_POSTGRES_URL") + dotenvy::dotenv().ok(); + std::env::var(POSTGRES_TEST_URL_ENV_VAR) .unwrap_or_else(|_| "postgres://postgres:postgres@localhost/ldk_node_tests".to_string()) } @@ -904,6 +942,29 @@ mod tests { cleanup_store(&store_1).await; } + #[tokio::test(flavor = "multi_thread")] + async fn test_postgres_store_list_all_keys() { + let store = create_test_store("test_pg_list_all_keys").await; + + KVStore::write(&store, "ns_a", "sub_a", "key_a", vec![1u8]).await.unwrap(); + KVStore::write(&store, "ns_a", "sub_b", "key_b", vec![2u8]).await.unwrap(); + KVStore::write(&store, "ns_b", "", "key_c", vec![3u8]).await.unwrap(); + + let mut keys = MigratableKVStore::list_all_keys(&store).await.unwrap(); + keys.sort(); + + assert_eq!( + keys, + vec![ + ("ns_a".to_string(), "sub_a".to_string(), "key_a".to_string()), + ("ns_a".to_string(), "sub_b".to_string(), "key_b".to_string()), + ("ns_b".to_string(), "".to_string(), "key_c".to_string()), + ] + ); + + cleanup_store(&store).await; + } + async fn kill_connection(store: &PostgresStore) { // Terminate every backend in the pool so the next op deterministically // hits a closed connection regardless of which slot `get` selects. diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index 076aeef9bd..b2d492e852 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -14,7 +14,9 @@ use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use lightning::io; -use lightning::util::persist::{KVStore, PageToken, PaginatedKVStore, PaginatedListResponse}; +use lightning::util::persist::{ + KVStore, MigratableKVStore, PageToken, PaginatedKVStore, PaginatedListResponse, +}; use lightning_types::string::PrintableString; use rusqlite::{named_params, Connection}; @@ -202,6 +204,21 @@ impl PaginatedKVStore for SqliteStore { } } +impl MigratableKVStore for SqliteStore { + fn list_all_keys( + &self, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + let fut = tokio::task::spawn_blocking(move || inner.list_all_keys_internal()); + async move { + fut.await.unwrap_or_else(|e| { + let msg = format!("Failed to IO operation due join error: {}", e); + Err(io::Error::new(io::ErrorKind::Other, msg)) + }) + } + } +} + struct SqliteStoreInner { connection: Arc>, data_dir: PathBuf, @@ -486,6 +503,35 @@ impl SqliteStoreInner { Ok(keys) } + fn list_all_keys_internal(&self) -> io::Result> { + let locked_conn = self.connection.lock().expect("lock"); + + let sql = format!( + "SELECT primary_namespace, secondary_namespace, key FROM {}", + self.kv_table_name + ); + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let mut keys = Vec::new(); + let rows_iter = + stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?))).map_err(|e| { + let msg = format!("Failed to retrieve queried rows: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + for key in rows_iter { + keys.push(key.map_err(|e| { + let msg = format!("Failed to retrieve queried rows: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?); + } + + Ok(keys) + } + fn list_paginated_internal( &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option, ) -> io::Result { @@ -679,6 +725,34 @@ mod tests { do_test_store(&store_0, &store_1) } + #[tokio::test] + async fn test_sqlite_store_list_all_keys() { + let mut temp_path = random_storage_path(); + temp_path.push("test_sqlite_store_list_all_keys"); + let store = SqliteStore::new( + temp_path, + Some("test_db".to_string()), + Some("test_table".to_string()), + ) + .unwrap(); + + KVStore::write(&store, "ns_a", "sub_a", "key_a", vec![1u8]).await.unwrap(); + KVStore::write(&store, "ns_a", "sub_b", "key_b", vec![2u8]).await.unwrap(); + KVStore::write(&store, "ns_b", "", "key_c", vec![3u8]).await.unwrap(); + + let mut keys = MigratableKVStore::list_all_keys(&store).await.unwrap(); + keys.sort(); + + assert_eq!( + keys, + vec![ + ("ns_a".to_string(), "sub_a".to_string(), "key_a".to_string()), + ("ns_a".to_string(), "sub_b".to_string(), "key_b".to_string()), + ("ns_b".to_string(), "".to_string(), "key_c".to_string()), + ] + ); + } + #[tokio::test] async fn test_sqlite_store_paginated_listing() { let mut temp_path = random_storage_path(); diff --git a/src/lib.rs b/src/lib.rs index 7465dfabf5..1fd77b224b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -81,6 +81,10 @@ #![cfg_attr(docsrs, feature(doc_cfg))] mod balance; +#[allow(missing_docs)] +#[cfg(feature = "bench")] +#[doc(hidden)] +pub mod bench; mod builder; mod chain; pub mod config; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index d7775e67b3..d01978ef7b 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -420,6 +420,8 @@ pub(crate) enum TestStoreType { TestSyncStore, Sqlite, FilesystemStore, + #[cfg(feature = "postgres")] + Postgres, } impl Default for TestStoreType { @@ -599,6 +601,32 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> TestStoreType::FilesystemStore => { builder.build_with_fs_store(config.node_entropy.into()).unwrap() }, + #[cfg(feature = "postgres")] + TestStoreType::Postgres => { + use ldk_node::io::postgres_store::POSTGRES_TEST_URL_ENV_VAR; + + dotenvy::dotenv().ok(); + let table_name = format!( + "test_{}", + config + .node_config + .storage_dir_path + .chars() + .filter(|c| c.is_ascii_alphanumeric()) + .collect::() + ); + let connection_string = std::env::var(POSTGRES_TEST_URL_ENV_VAR) + .unwrap_or_else(|_| "host=localhost user=postgres password=postgres".to_string()); + builder + .build_with_postgres_store( + config.node_entropy.into(), + connection_string, + None, + Some(table_name), + None, + ) + .unwrap() + }, }; if config.recovery_mode { diff --git a/tests/integration_tests_postgres.rs b/tests/integration_tests_postgres.rs index b96b0c277c..3b41d8a9bc 100644 --- a/tests/integration_tests_postgres.rs +++ b/tests/integration_tests_postgres.rs @@ -10,11 +10,13 @@ mod common; use ldk_node::entropy::NodeEntropy; +use ldk_node::io::postgres_store::POSTGRES_TEST_URL_ENV_VAR; use ldk_node::Builder; use rand::RngCore; fn test_connection_string() -> String { - std::env::var("TEST_POSTGRES_URL") + dotenvy::dotenv().ok(); + std::env::var(POSTGRES_TEST_URL_ENV_VAR) .unwrap_or_else(|_| "host=localhost user=postgres password=postgres".to_string()) } diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 1ea6c45845..0b2158a43e 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -2568,6 +2568,8 @@ async fn build_0_7_0_node( TestStoreType::FilesystemStore => builder_old.build_with_fs_store().unwrap(), TestStoreType::Sqlite => builder_old.build().unwrap(), TestStoreType::TestSyncStore => panic!("TestSyncStore not supported in v0.7.0 builder"), + #[cfg(feature = "postgres")] + TestStoreType::Postgres => panic!("Postgres not supported in v0.7.0 builder"), }; node_old.start().unwrap();