diff --git a/Cargo.lock b/Cargo.lock index 32f6758ba6f..3f5cea9f568 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12293,6 +12293,7 @@ checksum = "6a007b6936676aa9ab40207cde35daab0a04b823be8ae004368c0793b96a61e0" dependencies = [ "bit-vec 0.6.3", "bytes 1.11.1", + "chrono", "crc", "crossbeam-queue", "either", @@ -12371,6 +12372,7 @@ dependencies = [ "bitflags 2.11.1", "byteorder", "bytes 1.11.1", + "chrono", "crc", "digest 0.10.7", "dotenvy", @@ -12414,6 +12416,7 @@ dependencies = [ "bit-vec 0.6.3", "bitflags 2.11.1", "byteorder", + "chrono", "crc", "dotenvy", "etcetera", @@ -12449,6 +12452,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f85ca71d3a5b24e64e1d08dd8fe36c6c95c339a896cc33068148906784620540" dependencies = [ "atoi", + "chrono", "flume 0.11.1", "futures-channel", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index 42c5ca54df4..bc53981d6ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -357,6 +357,7 @@ snafu = "0.8" snow = { version = "0.10.0", features = ["ring-accelerated"] } sqlx = { version = "=0.8.3", features = [ "bit-vec", + "chrono", "postgres", "runtime-tokio", "tls-native-tls", diff --git a/crates/espresso/node/api/database.toml b/crates/espresso/node/api/database.toml index 1634a7a8cc1..50fff7926e5 100644 --- a/crates/espresso/node/api/database.toml +++ b/crates/espresso/node/api/database.toml @@ -1,3 +1,13 @@ +[route.get_migration_status] +PATH = ["migration-status"] +METHOD = "GET" +DOC = """ +Get the status of all deferred background migrations. + +Returns a list of migrations with their name, when they started, when they completed (if done), +and the last processed offset (useful for estimating progress). +""" + [route.get_table_sizes] PATH = ["table-sizes"] METHOD = "GET" diff --git a/crates/espresso/node/api/migrations/postgres/V1501__hash_bigint_expand.sql b/crates/espresso/node/api/migrations/postgres/V1501__hash_bigint_expand.sql new file mode 100644 index 00000000000..323705c6c7f --- /dev/null +++ b/crates/espresso/node/api/migrations/postgres/V1501__hash_bigint_expand.sql @@ -0,0 +1,47 @@ +-- Expand phase: create new BIGINT-keyed tables alongside the existing ones. +-- Old tables (hash, fee_merkle_tree, block_merkle_tree) are left untouched and +-- serve as the read fallback during the backfill window. +-- The contract phase (dropping old tables and renaming *_bigint to canonical names) +-- is a separate follow-up migration. + +-- Drop reward merkle tree tables — unused and always empty across all deployments. +DROP TABLE reward_merkle_tree; +DROP TABLE reward_merkle_tree_v2; + +-- New hash table with BIGSERIAL. +CREATE TABLE hash_bigint ( + id BIGSERIAL PRIMARY KEY, + value BYTEA NOT NULL UNIQUE +); + +-- Seed the sequence above the old max so new auto-IDs never collide +-- with rows we backfill from hash (which preserve their original INT ids). +SELECT setval( + pg_get_serial_sequence('hash_bigint', 'id'), + GREATEST(COALESCE((SELECT MAX(id) FROM hash), 1), 1) +); + +-- New Merkle-tree tables with BIGINT FK. +CREATE TABLE fee_merkle_tree_bigint ( + path JSONB NOT NULL, + created BIGINT NOT NULL, + hash_id BIGINT NOT NULL REFERENCES hash_bigint(id), + children JSONB, + children_bitvec BIT VARYING, + idx JSONB, + entry JSONB, + PRIMARY KEY (path, created) +); +CREATE INDEX fee_merkle_tree_bigint_created ON fee_merkle_tree_bigint (created); + +CREATE TABLE block_merkle_tree_bigint ( + path JSONB NOT NULL, + created BIGINT NOT NULL, + hash_id BIGINT NOT NULL REFERENCES hash_bigint(id), + children JSONB, + children_bitvec BIT VARYING, + idx JSONB, + entry JSONB, + PRIMARY KEY (path, created) +); +CREATE INDEX block_merkle_tree_bigint_created ON block_merkle_tree_bigint (created); diff --git a/crates/espresso/node/api/migrations/postgres/V1502__deferred_migrations.sql b/crates/espresso/node/api/migrations/postgres/V1502__deferred_migrations.sql new file mode 100644 index 00000000000..af5749fceec --- /dev/null +++ b/crates/espresso/node/api/migrations/postgres/V1502__deferred_migrations.sql @@ -0,0 +1,8 @@ +-- Progress-tracking table for background DataBackfill migrations. +CREATE TABLE deferred_migrations ( + name TEXT PRIMARY KEY, + started_at TIMESTAMPTZ NOT NULL, + completed_at TIMESTAMPTZ, + error TEXT, + last_offset BIGINT +); diff --git a/crates/espresso/node/api/migrations/sqlite/V1501__hash_bigint_expand.sql b/crates/espresso/node/api/migrations/sqlite/V1501__hash_bigint_expand.sql new file mode 100644 index 00000000000..8889cfbc73d --- /dev/null +++ b/crates/espresso/node/api/migrations/sqlite/V1501__hash_bigint_expand.sql @@ -0,0 +1,9 @@ +-- Rename tables to *_bigint for consistency with the postgres schema. +-- SQLite integers are already 64-bit so this is a naming-only change. +ALTER TABLE hash RENAME TO hash_bigint; +ALTER TABLE fee_merkle_tree RENAME TO fee_merkle_tree_bigint; +ALTER TABLE block_merkle_tree RENAME TO block_merkle_tree_bigint; + +-- Drop reward merkle tree tables — unused and always empty across all deployments. +DROP TABLE IF EXISTS reward_merkle_tree; +DROP TABLE IF EXISTS reward_merkle_tree_v2; diff --git a/crates/espresso/node/api/migrations/sqlite/V1502__deferred_migrations.sql b/crates/espresso/node/api/migrations/sqlite/V1502__deferred_migrations.sql new file mode 100644 index 00000000000..80557e06c06 --- /dev/null +++ b/crates/espresso/node/api/migrations/sqlite/V1502__deferred_migrations.sql @@ -0,0 +1,8 @@ +-- Progress-tracking table for background DataBackfill migrations. +CREATE TABLE deferred_migrations ( + name TEXT PRIMARY KEY, + started_at TEXT NOT NULL, + completed_at TEXT, + error TEXT, + last_offset INTEGER +); diff --git a/crates/espresso/node/api/public-env-vars.toml b/crates/espresso/node/api/public-env-vars.toml index 949f814883f..951fad672a4 100644 --- a/crates/espresso/node/api/public-env-vars.toml +++ b/crates/espresso/node/api/public-env-vars.toml @@ -53,6 +53,7 @@ variables = [ "ESPRESSO_NODE_API_PEERS", "ESPRESSO_NODE_API_PORT", "ESPRESSO_NODE_ARCHIVE", + "ESPRESSO_NODE_BACKFILL_BATCH_DELAY_MS", "ESPRESSO_NODE_BOOTSTRAP_EPOCH_CATCHUP_TIMEOUT", "ESPRESSO_NODE_CATCHUP_BACKOFF_FACTOR", "ESPRESSO_NODE_CATCHUP_BACKOFF_JITTER", diff --git a/crates/espresso/node/src/api.rs b/crates/espresso/node/src/api.rs index 74d10681a61..429c1d6baa5 100644 --- a/crates/espresso/node/src/api.rs +++ b/crates/espresso/node/src/api.rs @@ -837,6 +837,10 @@ where async fn get_table_sizes(&self) -> anyhow::Result> { self.inner().get_table_sizes().await } + + async fn get_migration_status(&self) -> anyhow::Result> { + self.inner().get_migration_status().await + } } impl, P: SequencerPersistence, D: CatchupStorage + Send + Sync> @@ -6021,7 +6025,6 @@ mod test { } #[rstest] - #[case(POS_V3)] #[case(POS_V4)] #[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_state_reconstruction(#[case] upgrade: Upgrade) -> anyhow::Result<()> { @@ -7655,7 +7658,6 @@ mod test { } #[rstest] - #[case(POS_V3)] #[case(POS_V4)] #[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_reward_proof_endpoint(#[case] upgrade: Upgrade) -> anyhow::Result<()> { diff --git a/crates/espresso/node/src/api/data_source.rs b/crates/espresso/node/src/api/data_source.rs index dbd393fc7b1..3f84b3fe551 100644 --- a/crates/espresso/node/src/api/data_source.rs +++ b/crates/espresso/node/src/api/data_source.rs @@ -388,12 +388,26 @@ pub struct TableSize { pub total_size_bytes: Option, } +/// Status of a single deferred background migration. +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct MigrationStatus { + pub name: String, + pub started_at: chrono::DateTime, + pub completed_at: Option>, + pub last_offset: Option, +} + /// Data source for database metadata and statistics. /// /// This trait is only implemented by SQL-based storage backends (PostgreSQL and SQLite). pub(crate) trait DatabaseMetadataSource { /// Get the sizes of all tables in the database. fn get_table_sizes(&self) -> impl Send + Future>>; + + /// Get the status of all deferred background migrations. + fn get_migration_status( + &self, + ) -> impl Send + Future>>; } // ============================================================================ diff --git a/crates/espresso/node/src/api/endpoints.rs b/crates/espresso/node/src/api/endpoints.rs index 1ca9c80b838..d73daa530ed 100644 --- a/crates/espresso/node/src/api/endpoints.rs +++ b/crates/espresso/node/src/api/endpoints.rs @@ -754,6 +754,17 @@ where .map_err(|err| Error::internal(format!("failed to get table sizes: {err:#}"))) } .boxed() + })? + .at("get_migration_status", |_req, state| { + async move { + state + .read(|state| state.get_migration_status().boxed()) + .await + .map_err(|err| { + Error::internal(format!("failed to get migration status: {err:#}")) + }) + } + .boxed() })?; Ok(api) diff --git a/crates/espresso/node/src/api/sql.rs b/crates/espresso/node/src/api/sql.rs index 128213f07ac..a26dfb2fec3 100644 --- a/crates/espresso/node/src/api/sql.rs +++ b/crates/espresso/node/src/api/sql.rs @@ -1167,12 +1167,54 @@ impl super::data_source::DatabaseMetadataSource for SqlStorage { Ok(table_sizes) } } + + async fn get_migration_status( + &self, + ) -> anyhow::Result> { + let mut tx = self + .read() + .await + .context("opening transaction to fetch migration status")?; + + type MigrationStatusRow = ( + String, + chrono::DateTime, + Option>, + Option, + ); + + let rows: Vec = sqlx::query_as( + "SELECT name, started_at, completed_at, last_offset FROM deferred_migrations ORDER BY \ + started_at", + ) + .fetch_all(tx.as_mut()) + .await + .context("failed to query deferred_migrations")?; + + Ok(rows + .into_iter() + .map(|(name, started_at, completed_at, last_offset)| { + super::data_source::MigrationStatus { + name, + started_at, + completed_at, + last_offset, + } + }) + .collect()) + } } impl super::data_source::DatabaseMetadataSource for DataSource { async fn get_table_sizes(&self) -> anyhow::Result> { self.as_ref().get_table_sizes().await } + + async fn get_migration_status( + &self, + ) -> anyhow::Result> { + self.as_ref().get_migration_status().await + } } impl super::data_source::PruningDataSource for DataSource { @@ -1791,39 +1833,27 @@ pub(crate) mod impl_testable_data_source { #[cfg(test)] mod tests { - use alloy::primitives::Address; - use espresso_types::{ - v0_3::RewardAmount, - v0_4::{REWARD_MERKLE_TREE_V2_HEIGHT, RewardAccountV2, RewardMerkleTreeV2}, - }; + use espresso_types::v0_4::{REWARD_MERKLE_TREE_V2_HEIGHT, RewardMerkleTreeV2}; use hotshot_query_service::{ data_source::{ Transaction, VersionedDataSource, sql::Config, storage::{ - MerklizedStateStorage, UpdateAvailabilityStorage, + UpdateAvailabilityStorage, sql::{ SqlStorage, StorageConnectionType, Transaction as SqlTransaction, Write, testing::TmpDb, }, }, }, - merklized_state::{MerklizedState, Snapshot, UpdateStateData}, - }; - use jf_merkle_tree_compat::{ - LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme, + merklized_state::MerklizedState, }; + use jf_merkle_tree_compat::MerkleTreeScheme; use light_client::testing::{leaf_chain, leaf_chain_with_upgrade}; use versions::{DRB_AND_HEADER_UPGRADE_VERSION, EPOCH_REWARD_VERSION, Upgrade}; use super::impl_testable_data_source::tmp_options; - use crate::{SeqTypes, api::RewardMerkleTreeDataSource}; - - fn make_reward_account(i: usize) -> RewardAccountV2 { - let mut addr_bytes = [0u8; 20]; - addr_bytes[16..20].copy_from_slice(&(i as u32).to_be_bytes()); - RewardAccountV2(Address::from(addr_bytes)) - } + use crate::api::RewardMerkleTreeDataSource; async fn insert_test_header( tx: &mut SqlTransaction, @@ -1862,396 +1892,6 @@ mod tests { .unwrap(); } - async fn batch_insert_proofs( - tx: &mut SqlTransaction, - reward_tree: &RewardMerkleTreeV2, - accounts: &[RewardAccountV2], - block_height: u64, - ) { - let proofs_and_paths: Vec<_> = accounts - .iter() - .map(|account| { - let proof = match reward_tree.universal_lookup(*account) { - LookupResult::Ok(_, proof) => proof, - LookupResult::NotInMemory => panic!("account not in memory"), - LookupResult::NotFound(proof) => proof, - }; - let traversal_path = >::to_traversal_path( - account, reward_tree.height() - ); - (proof, traversal_path) - }) - .collect(); - - UpdateStateData::::insert_merkle_nodes_batch( - tx, - proofs_and_paths, - block_height, - ) - .await - .expect("failed to batch insert proofs"); - } - - #[test_log::test(tokio::test(flavor = "multi_thread"))] - async fn test_reward_accounts_batch_insertion() { - // Batch insertion of 1000 accounts at height 1 - // Balance updates for some accounts at height 2 - // New accounts added at height 2 - // More balance updates at height 3 - // Querying correct balances at each height snapshot - - let db = TmpDb::init().await; - let opt = tmp_options(&db); - let cfg = Config::try_from(&opt).expect("failed to create config from options"); - let storage = SqlStorage::connect(cfg, StorageConnectionType::Query) - .await - .expect("failed to connect to storage"); - - let num_initial_accounts = 1000usize; - - let initial_accounts: Vec = - (0..num_initial_accounts).map(make_reward_account).collect(); - - tracing::info!( - "Height 1: Inserting {} initial accounts", - num_initial_accounts - ); - - let mut reward_tree_h1 = RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT); - - for (i, account) in initial_accounts.iter().enumerate() { - let reward_amount = RewardAmount::from(((i + 1) * 1000) as u64); - reward_tree_h1.update(*account, reward_amount).unwrap(); - } - - let mut tx = storage.write().await.unwrap(); - insert_test_header(&mut tx, 1, &reward_tree_h1).await; - batch_insert_proofs(&mut tx, &reward_tree_h1, &initial_accounts, 1).await; - - UpdateStateData::::set_last_state_height(&mut tx, 1) - .await - .unwrap(); - tx.commit().await.unwrap(); - - tracing::info!("Height 2: Updating balances and adding new accounts"); - - let mut reward_tree_h2 = reward_tree_h1.clone(); - - // Update balances for accounts 0-99 - let updated_accounts_h2: Vec = (0..100).map(make_reward_account).collect(); - for (i, account) in updated_accounts_h2.iter().enumerate() { - let new_reward = RewardAmount::from(((i + 1) * 2000) as u64); - reward_tree_h2.update(*account, new_reward).unwrap(); - } - - // Add 100 new accounts (1000..1099) - let new_accounts_h2: Vec = (1000..1100).map(make_reward_account).collect(); - for (i, account) in new_accounts_h2.iter().enumerate() { - let reward_amount = RewardAmount::from(((i + 1001) * 500) as u64); - reward_tree_h2.update(*account, reward_amount).unwrap(); - } - - let mut changed_accounts_h2 = updated_accounts_h2.clone(); - changed_accounts_h2.extend(new_accounts_h2.clone()); - - let mut tx = storage.write().await.unwrap(); - insert_test_header(&mut tx, 2, &reward_tree_h2).await; - batch_insert_proofs(&mut tx, &reward_tree_h2, &changed_accounts_h2, 2).await; - - UpdateStateData::::set_last_state_height(&mut tx, 2) - .await - .unwrap(); - tx.commit().await.unwrap(); - - tracing::info!("Height 3: More balance updates"); - - let mut reward_tree_h3 = reward_tree_h2.clone(); - - // Update balances for accounts 500-599 - let updated_accounts_h3: Vec = - (500..600).map(make_reward_account).collect(); - for (i, account) in updated_accounts_h3.iter().enumerate() { - let new_reward = RewardAmount::from(((500 + i + 1) * 3000) as u64); - reward_tree_h3.update(*account, new_reward).unwrap(); - } - - let mut tx = storage.write().await.unwrap(); - insert_test_header(&mut tx, 3, &reward_tree_h3).await; - batch_insert_proofs(&mut tx, &reward_tree_h3, &updated_accounts_h3, 3).await; - - UpdateStateData::::set_last_state_height(&mut tx, 3) - .await - .unwrap(); - tx.commit().await.unwrap(); - - tracing::info!("Verifying all account proofs at each height"); - - // Verify height=1 - // All 1000 initial accounts - let snapshot_h1 = - Snapshot::::Index(1); - for i in 0..num_initial_accounts { - let account = make_reward_account(i); - let proof = storage - .read() - .await - .unwrap() - .get_path(snapshot_h1, account) - .await - .unwrap_or_else(|e| panic!("failed to get path for account {i} at h1: {e}")); - - let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64); - let actual_reward = proof.elem().expect("account should exist"); - assert_eq!(*actual_reward, expected_reward,); - - assert!( - RewardMerkleTreeV2::verify(reward_tree_h1.commitment(), account, proof) - .unwrap() - .is_ok(), - ); - } - tracing::info!("Verified height=1 {num_initial_accounts} accounts with proofs",); - - // Verify accounts 1000-1099 don't exist at height 1 - for i in 1000..1100 { - let account = make_reward_account(i); - let proof = storage - .read() - .await - .unwrap() - .get_path(snapshot_h1, account) - .await - .unwrap(); - assert!(proof.elem().is_none(),); - - // Verify non-membership proof - assert!( - RewardMerkleTreeV2::non_membership_verify( - reward_tree_h1.commitment(), - account, - proof - ) - .unwrap(), - ); - } - tracing::info!("Height 1: Verified 100 non-membership proofs"); - - // Verify height 2 - let snapshot_h2 = - Snapshot::::Index(2); - - // Accounts 0-99 - for i in 0..100 { - let account = make_reward_account(i); - let proof = storage - .read() - .await - .unwrap() - .get_path(snapshot_h2, account) - .await - .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}")); - - let expected_reward = RewardAmount::from(((i + 1) * 2000) as u64); - let actual_reward = proof.elem().expect("account should exist"); - assert_eq!(*actual_reward, expected_reward,); - - assert!( - RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof) - .unwrap() - .is_ok(), - ); - } - - // Accounts 100-999: original rewards - for i in 100..1000 { - let account = make_reward_account(i); - let proof = storage - .read() - .await - .unwrap() - .get_path(snapshot_h2, account) - .await - .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}")); - - let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64); - let actual_reward = proof.elem().expect("account should exist"); - assert_eq!(*actual_reward, expected_reward,); - - assert!( - RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof) - .unwrap() - .is_ok(), - ); - } - - // Accounts 1000-1099 - // new accounts - for i in 1000..1100 { - let account = make_reward_account(i); - let proof = storage - .read() - .await - .unwrap() - .get_path(snapshot_h2, account) - .await - .unwrap_or_else(|e| panic!("failed to get path for account {i} at h2: {e}")); - - let expected_reward = RewardAmount::from(((i + 1) * 500) as u64); - let actual_reward = proof.elem().expect("account should exist"); - assert_eq!(*actual_reward, expected_reward,); - - assert!( - RewardMerkleTreeV2::verify(reward_tree_h2.commitment(), account, proof) - .unwrap() - .is_ok(), - ); - } - tracing::info!("Height 2: Verified all 1100 accounts with proofs"); - - // Verify HEIGHT 3: All accounts - let snapshot_h3 = - Snapshot::::Index(3); - - // Accounts 0-99 - for i in 0..100 { - let account = make_reward_account(i); - let proof = storage - .read() - .await - .unwrap() - .get_path(snapshot_h3, account) - .await - .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}")); - - let expected_reward = RewardAmount::from(((i + 1) * 2000) as u64); - let actual_reward = proof.elem().expect("account should exist"); - assert_eq!(*actual_reward, expected_reward,); - - assert!( - RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof) - .unwrap() - .is_ok(), - ); - } - - for i in 100..500 { - let account = make_reward_account(i); - let proof = storage - .read() - .await - .unwrap() - .get_path(snapshot_h3, account) - .await - .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}")); - - let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64); - let actual_reward = proof.elem().expect("account should exist"); - assert_eq!(*actual_reward, expected_reward,); - - assert!( - RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof) - .unwrap() - .is_ok(), - ); - } - - // Accounts 500-599 - for i in 500..600 { - let account = make_reward_account(i); - let proof = storage - .read() - .await - .unwrap() - .get_path(snapshot_h3, account) - .await - .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}")); - - let expected_reward = RewardAmount::from(((i + 1) * 3000) as u64); - let actual_reward = proof.elem().expect("account should exist"); - assert_eq!(*actual_reward, expected_reward,); - - assert!( - RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof) - .unwrap() - .is_ok(), - ); - } - - // Accounts 600-999 - for i in 600..1000 { - let account = make_reward_account(i); - let proof = storage - .read() - .await - .unwrap() - .get_path(snapshot_h3, account) - .await - .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}")); - - let expected_reward = RewardAmount::from(((i + 1) * 1000) as u64); - let actual_reward = proof.elem().expect("account should exist"); - assert_eq!(*actual_reward, expected_reward,); - - assert!( - RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof) - .unwrap() - .is_ok(), - ); - } - - // Accounts 1000-1099: new accounts (from h2) - for i in 1000..1100 { - let account = make_reward_account(i); - let proof = storage - .read() - .await - .unwrap() - .get_path(snapshot_h3, account) - .await - .unwrap_or_else(|e| panic!("failed to get path for account {i} at h3: {e}")); - - let expected_reward = RewardAmount::from(((i + 1) * 500) as u64); - let actual_reward = proof.elem().expect("account should exist"); - assert_eq!(*actual_reward, expected_reward,); - - assert!( - RewardMerkleTreeV2::verify(reward_tree_h3.commitment(), account, proof) - .unwrap() - .is_ok(), - ); - } - tracing::info!("Height 3: Verified all 1100 accounts with proofs"); - - // Verify non-membership proofs for accounts that never existed - for i in 1100..1110 { - let account = make_reward_account(i); - let proof = storage - .read() - .await - .unwrap() - .get_path(snapshot_h3, account) - .await - .unwrap(); - - assert!( - proof.elem().is_none(), - "Account {i} should not exist at height 3" - ); - - assert!( - RewardMerkleTreeV2::non_membership_verify( - reward_tree_h3.commitment(), - account, - proof - ) - .unwrap(), - ); - } - tracing::info!("Height 3: Verified 10 non-membership proofs"); - } - #[tokio::test] #[test_log::test] async fn test_merkle_proof_gc() { diff --git a/crates/espresso/node/src/persistence.rs b/crates/espresso/node/src/persistence.rs index 726874a0802..eeae9d7da7d 100644 --- a/crates/espresso/node/src/persistence.rs +++ b/crates/espresso/node/src/persistence.rs @@ -19,6 +19,7 @@ use espresso_types::{ }; pub mod fs; +pub mod migrations; pub mod no_storage; mod persistence_metrics; pub mod sql; diff --git a/crates/espresso/node/src/persistence/fs.rs b/crates/espresso/node/src/persistence/fs.rs index fbee958b2b2..3c62452e43e 100644 --- a/crates/espresso/node/src/persistence/fs.rs +++ b/crates/espresso/node/src/persistence/fs.rs @@ -674,10 +674,6 @@ impl Inner { #[async_trait] impl SequencerPersistence for Persistence { - async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()> { - Ok(()) - } - async fn load_config(&self) -> anyhow::Result> { let inner = self.inner.read().await; let path = inner.config_path(); diff --git a/crates/espresso/node/src/persistence/migrations.rs b/crates/espresso/node/src/persistence/migrations.rs new file mode 100644 index 00000000000..290d947ccca --- /dev/null +++ b/crates/espresso/node/src/persistence/migrations.rs @@ -0,0 +1,403 @@ +#![cfg(not(feature = "embedded-db"))] + +use async_trait::async_trait; +use hotshot_query_service::{ + data_source::storage::sql::{Transaction, Write}, + migration::{DataBackfill, MigrationRegistry}, +}; + +pub struct BackfillHash; + +#[async_trait] +impl DataBackfill for BackfillHash { + fn name(&self) -> &'static str { + "hash_bigint_backfill_hash" + } + + async fn run_batch( + &self, + tx: &mut Transaction, + // Reused as last-seen id (keyset cursor), not a row count. + // Initial value 0 is safe because auto-increment ids start at 1. + offset: u64, + ) -> anyhow::Result> { + let rows: Vec<(i32, Vec)> = + sqlx::query_as("SELECT id, value FROM hash WHERE id > $1 ORDER BY id LIMIT $2") + .bind(offset as i64) + .bind(self.batch_size() as i64) + .fetch_all(tx.as_mut()) + .await?; + + if rows.is_empty() { + return Ok(None); + } + let n = rows.len(); + let last_id = rows.last().expect("non-empty").0 as u64; + + let (ids, values): (Vec, Vec>) = rows + .into_iter() + .map(|(id, value)| (id as i64, value)) + .unzip(); + + sqlx::query( + "INSERT INTO hash_bigint (id, value) + SELECT * FROM UNNEST($1::bigint[], $2::bytea[]) + ON CONFLICT DO NOTHING", + ) + .bind(&ids) + .bind(&values) + .execute(tx.as_mut()) + .await?; + + if n < self.batch_size() { + Ok(None) + } else { + Ok(Some(last_id)) + } + } +} + +macro_rules! merkle_tree_backfill { + ($struct_name:ident, $migration_name:literal, $legacy_table:literal, $new_table:literal) => { + pub struct $struct_name; + + #[async_trait] + impl DataBackfill for $struct_name { + fn name(&self) -> &'static str { + $migration_name + } + + fn requires(&self) -> &'static [&'static str] { + &["hash_bigint_backfill_hash"] + } + + async fn run_batch( + &self, + tx: &mut Transaction, + // Cursor: the start of the `created` (block height) range for this batch. + // Each batch covers [offset, offset + batch_size), using the index on `created`. + offset: u64, + ) -> anyhow::Result> { + let batch_size = self.batch_size() as i64; + + // Check if any rows remain at or beyond the current offset. Checking only the + // current window would cause early termination if block heights have gaps larger + // than batch_size; checking the open-ended tail means gaps are just a few fast + // no-op iterations. + let any: Option<(i64,)> = sqlx::query_as(concat!( + "SELECT created FROM ", + $legacy_table, + " WHERE created >= $1 LIMIT 1" + )) + .bind(offset as i64) + .fetch_optional(tx.as_mut()) + .await?; + if any.is_none() { + return Ok(None); + } + + // Move rows from legacy into the new _bigint table, translating both + // `hash_id` and every element of `children` from legacy hash ids into + // hash_bigint ids by joining on the hash `value`. This removes any + // dependency on `BackfillHash` having preserved the original ids. + sqlx::query(concat!( + "INSERT INTO ", + $new_table, + " (path, created, hash_id, children, children_bitvec, idx, entry) + SELECT + legacy_merkle_node.path, + legacy_merkle_node.created, + new_hash.id, + CASE WHEN legacy_merkle_node.children IS NULL THEN NULL + ELSE COALESCE(( + SELECT jsonb_agg(child_new_hash.id ORDER BY \ + child_position) + FROM \ + jsonb_array_elements_text(legacy_merkle_node.children) + WITH ORDINALITY AS child_elem(legacy_hash_id, \ + child_position) + JOIN hash AS child_legacy_hash + ON child_legacy_hash.id = \ + child_elem.legacy_hash_id::INT + JOIN hash_bigint AS child_new_hash + ON child_new_hash.value = child_legacy_hash.value + ), '[]'::jsonb) + END, + legacy_merkle_node.children_bitvec, + legacy_merkle_node.idx, + legacy_merkle_node.entry + FROM ", + $legacy_table, + " AS legacy_merkle_node + JOIN hash AS legacy_hash ON legacy_hash.id = \ + legacy_merkle_node.hash_id + JOIN hash_bigint AS new_hash ON new_hash.value = legacy_hash.value + WHERE legacy_merkle_node.created >= $1 AND legacy_merkle_node.created \ + < $2 + ON CONFLICT (path, created) DO NOTHING" + )) + .bind(offset as i64) + .bind(offset as i64 + batch_size) + .execute(tx.as_mut()) + .await?; + + // Delete moved rows from the legacy table in the same transaction so that + // storage stays roughly flat during the migration (move, not copy). + sqlx::query(concat!( + "DELETE FROM ", + $legacy_table, + " WHERE created >= $1 AND created < $2" + )) + .bind(offset as i64) + .bind(offset as i64 + batch_size) + .execute(tx.as_mut()) + .await?; + + Ok(Some(offset + self.batch_size() as u64)) + } + } + }; +} + +merkle_tree_backfill!( + BackfillFeeMerkleTree, + "hash_bigint_backfill_fee_merkle_tree", + "fee_merkle_tree", + "fee_merkle_tree_bigint" +); +merkle_tree_backfill!( + BackfillBlockMerkleTree, + "hash_bigint_backfill_block_merkle_tree", + "block_merkle_tree", + "block_merkle_tree_bigint" +); + +pub struct CleanupLegacyHashTable; + +#[async_trait] +impl DataBackfill for CleanupLegacyHashTable { + fn name(&self) -> &'static str { + "hash_bigint_cleanup_legacy_hash" + } + + fn requires(&self) -> &'static [&'static str] { + &[ + "hash_bigint_backfill_fee_merkle_tree", + "hash_bigint_backfill_block_merkle_tree", + ] + } + + async fn run_batch( + &self, + tx: &mut Transaction, + // Keyset cursor: last deleted id (0 on first batch). + offset: u64, + ) -> anyhow::Result> { + // Both merkle tree tables are now empty so there are no FK references to hash.id. + let deleted: Vec<(i64,)> = sqlx::query_as( + "DELETE FROM hash WHERE id IN ( + SELECT id FROM hash WHERE id > $1 ORDER BY id LIMIT $2 + ) RETURNING id", + ) + .bind(offset as i64) + .bind(self.batch_size() as i64) + .fetch_all(tx.as_mut()) + .await?; + + Ok(deleted.last().map(|(id,)| *id as u64)) + } +} + +pub fn hash_bigint_migrations() -> MigrationRegistry { + MigrationRegistry::new() + .backfill(BackfillHash) + .backfill(BackfillFeeMerkleTree) + .backfill(BackfillBlockMerkleTree) + .backfill(CleanupLegacyHashTable) +} + +#[cfg(test)] +mod tests { + use alloy::primitives::Address; + use espresso_types::{FEE_MERKLE_TREE_HEIGHT, FeeAccount, FeeAmount, FeeMerkleTree, SeqTypes}; + use hotshot_query_service::{ + data_source::{ + Transaction as _, VersionedDataSource, + sql::Config, + storage::sql::{ + SqlStorage, StorageConnectionType, Transaction as SqlTransaction, Write, + testing::TmpDb, + }, + }, + merklized_state::UpdateStateData, + }; + use jf_merkle_tree_compat::{ + LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme, + }; + + use super::*; + use crate::api::sql::impl_testable_data_source::tmp_options; + + async fn run_to_completion( + backfill: &dyn DataBackfill, + storage: &SqlStorage, + ) -> anyhow::Result<()> { + let mut offset = 0u64; + loop { + let mut tx = storage.write().await?; + let next = backfill.run_batch(&mut tx, offset).await?; + tx.commit().await?; + match next { + Some(o) => offset = o, + None => return Ok(()), + } + } + } + + async fn write_fee_merkle_proofs( + tx: &mut SqlTransaction, + tree: &FeeMerkleTree, + accounts: &[FeeAccount], + block_height: u64, + ) { + let proofs: Vec<_> = accounts + .iter() + .map(|a| { + let proof = match tree.universal_lookup(a) { + LookupResult::Ok(_, p) => p, + LookupResult::NotFound(p) => p, + LookupResult::NotInMemory => panic!("account not in memory"), + }; + let path = + >::to_traversal_path( + a, + tree.height(), + ); + (proof, path) + }) + .collect(); + UpdateStateData::::insert_merkle_nodes_batch( + tx, + proofs, + block_height, + ) + .await + .expect("insert_merkle_nodes_batch"); + } + + /// Regression test for the FK race between `BackfillHash` and live writes + /// to `hash_bigint`. + /// + /// V1501 seeds the `hash_bigint(id)` sequence above `MAX(hash.id)` so new + /// auto-ids cannot collide with legacy ids — but nothing protects the + /// `value` UNIQUE constraint. Whenever a post-migration write inserts a + /// value that also lives in legacy `hash` (the common case: empty-subtree + /// hashes and unchanged branch hashes are byte-identical across blocks), + /// the live row claims a new id, and the backfill's `INSERT (old_id, value) + /// ON CONFLICT DO NOTHING` is silently dropped. The Merkle tree backfill + /// then copies the legacy `hash_id` verbatim and the FK to `hash_bigint(id)` + /// fires because that id was never inserted. + /// + /// This test exercises the real `UpdateStateData::insert_merkle_nodes_batch` + /// path so the shared-hash overlap arises from realistic Merkle proofs. + #[test_log::test(tokio::test(flavor = "multi_thread"))] + async fn backfill_preserves_fk_when_live_write_shares_hash_value() { + let db = TmpDb::init().await; + let opt = tmp_options(&db); + let cfg = Config::try_from(&opt).expect("config"); + let storage = SqlStorage::connect(cfg, StorageConnectionType::Query) + .await + .expect("connect"); + + let mut tree = FeeMerkleTree::new(FEE_MERKLE_TREE_HEIGHT); + let account = FeeAccount::from(Address::repeat_byte(0x42)); + tree.update(account, FeeAmount::from(100_u64)).unwrap(); + + // Write a real Merkle proof for `account` into the *new* _bigint tables. + let mut tx = storage.write().await.unwrap(); + write_fee_merkle_proofs(&mut tx, &tree, &[account], 1).await; + tx.commit().await.unwrap(); + + // Move every row from the _bigint tables back into the legacy tables to + // simulate a database that was populated before V1501 ran. Then reset + // the hash_bigint sequence the way V1501 itself does. + let mut tx = storage.write().await.unwrap(); + sqlx::query("INSERT INTO hash (id, value) SELECT id::INT, value FROM hash_bigint") + .execute(tx.as_mut()) + .await + .unwrap(); + sqlx::query( + "INSERT INTO fee_merkle_tree (path, created, hash_id, children, children_bitvec, idx, \ + entry) SELECT path, created, hash_id::INT, children, children_bitvec::BIT(256), idx, \ + entry FROM fee_merkle_tree_bigint", + ) + .execute(tx.as_mut()) + .await + .unwrap(); + sqlx::query("TRUNCATE fee_merkle_tree_bigint, block_merkle_tree_bigint, hash_bigint") + .execute(tx.as_mut()) + .await + .unwrap(); + sqlx::query( + "SELECT setval(pg_get_serial_sequence('hash_bigint', 'id'), GREATEST(COALESCE((SELECT \ + MAX(id) FROM hash), 1), 1))", + ) + .execute(tx.as_mut()) + .await + .unwrap(); + tx.commit().await.unwrap(); + + // Live post-V1501 write at a new block height. The proof for the same + // account shares almost every hash value with the legacy proof above, + // so the live `batch_insert_hashes` calls collide on `value` with every + // row BackfillHash is about to copy. + let mut tx = storage.write().await.unwrap(); + write_fee_merkle_proofs(&mut tx, &tree, &[account], 2).await; + tx.commit().await.unwrap(); + + // Drive backfills directly so a failure surfaces immediately rather + // than entering the registry's 5-minute retry loop. + run_to_completion(&BackfillHash, &storage) + .await + .expect("BackfillHash failed"); + run_to_completion(&BackfillFeeMerkleTree, &storage) + .await + .expect("BackfillFeeMerkleTree failed (FK violation from dropped hash row)"); + + let mut tx = storage.read().await.unwrap(); + let (n_legacy,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM fee_merkle_tree") + .fetch_one(tx.as_mut()) + .await + .unwrap(); + assert_eq!(n_legacy, 0, "legacy fee_merkle_tree rows were not migrated"); + + let (n_heights,): (i64,) = + sqlx::query_as("SELECT COUNT(DISTINCT created) FROM fee_merkle_tree_bigint") + .fetch_one(tx.as_mut()) + .await + .unwrap(); + assert_eq!(n_heights, 2, "expected rows at both heights"); + + let (n_orphans,): (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM fee_merkle_tree_bigint fmt LEFT JOIN hash_bigint hb ON hb.id = \ + fmt.hash_id WHERE hb.id IS NULL", + ) + .fetch_one(tx.as_mut()) + .await + .unwrap(); + assert_eq!(n_orphans, 0, "fee_merkle_tree_bigint has dangling hash_id"); + + let (n_orphan_children,): (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM ( SELECT child_id FROM fee_merkle_tree_bigint fmt, \ + jsonb_array_elements_text(fmt.children) AS arr(child_id) WHERE fmt.children IS NOT \ + NULL ) c LEFT JOIN hash_bigint hb ON hb.id = c.child_id::BIGINT WHERE hb.id IS NULL", + ) + .fetch_one(tx.as_mut()) + .await + .unwrap(); + assert_eq!( + n_orphan_children, 0, + "fee_merkle_tree_bigint.children has dangling hash_id" + ); + } +} diff --git a/crates/espresso/node/src/persistence/no_storage.rs b/crates/espresso/node/src/persistence/no_storage.rs index 64939187a13..0a138f96633 100644 --- a/crates/espresso/node/src/persistence/no_storage.rs +++ b/crates/espresso/node/src/persistence/no_storage.rs @@ -56,10 +56,6 @@ pub struct NoStorage; #[async_trait] impl SequencerPersistence for NoStorage { - async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()> { - Ok(()) - } - async fn load_config(&self) -> anyhow::Result> { Ok(None) } diff --git a/crates/espresso/node/src/persistence/sql.rs b/crates/espresso/node/src/persistence/sql.rs index 89ec9544bce..536d69da898 100644 --- a/crates/espresso/node/src/persistence/sql.rs +++ b/crates/espresso/node/src/persistence/sql.rs @@ -7,13 +7,12 @@ use std::{ }; use alloy::primitives::Address; -use anyhow::{Context, bail, ensure}; +use anyhow::{Context, bail}; use async_trait::async_trait; use clap::Parser; use committable::Committable; use derivative::Derivative; use derive_more::derive::{From, Into}; -use either::Either; use espresso_types::{ AuthenticatedValidatorMap, BackoffParams, BlockMerkleTree, FeeMerkleTree, Leaf, Leaf2, NetworkConfig, Payload, PubKey, Ratio, RegisteredValidatorMap, StakeTableHash, parse_duration, @@ -24,7 +23,6 @@ use espresso_types::{ AuthenticatedValidator, EventKey, IndexedStake, RegisteredValidator, RewardAmount, StakeTableEvent, }, - v0_4::{REWARD_MERKLE_TREE_V2_HEIGHT, RewardAccountV2, RewardMerkleTreeV2}, }; use futures::stream::StreamExt; use hotshot::InitializerEpochInfo; @@ -33,11 +31,10 @@ use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{ }; use hotshot_new_protocol::message::Certificate2; use hotshot_query_service::{ - availability::{BlockId, LeafQueryData}, + availability::LeafQueryData, data_source::{ Transaction as _, VersionedDataSource, storage::{ - AvailabilityStorage, pruning::PrunerCfg, sql::{ Config, Db, Read, SqlStorage, StorageConnectionType, Transaction, TransactionMode, @@ -72,12 +69,10 @@ use hotshot_types::{ }; use indexmap::IndexMap; use itertools::Itertools; -use jf_merkle_tree_compat::MerkleTreeScheme; use sqlx::{Executor, QueryBuilder, Row, query}; use crate::{ NodeType, RECENT_STAKE_TABLES_LIMIT, SeqTypes, ViewNumber, - api::RewardMerkleTreeV2Data, catchup::SqlStateCatchup, persistence::{migrate_network_config, persistence_metrics::PersistenceMetricsValue}, }; @@ -698,6 +693,14 @@ impl PersistenceOptions for Options { }; persistence.migrate_quorum_proposal_leaf_hashes().await?; self.pool = Some(persistence.db.pool()); + + #[cfg(not(feature = "embedded-db"))] + { + let registry = super::migrations::hash_bigint_migrations(); + let db = persistence.db.clone(); + tokio::spawn(registry.run_all_migrations(db)); + } + Ok(persistence) } @@ -1253,202 +1256,6 @@ async fn prune_to_view(tx: &mut Transaction, view: u64) -> anyhow::Result #[async_trait] impl SequencerPersistence for Persistence { - async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()> { - let batch_size: i64 = 1000; - - let result = { - let mut tx = self.db.read().await?; - query_as::<(bool, i64)>( - "SELECT completed, migrated_rows FROM epoch_migration WHERE table_name = \ - 'reward_merkle_tree_v2_data'", - ) - .fetch_optional(tx.as_mut()) - .await? - }; - - let (is_completed, mut offset) = result.unwrap_or((false, 0)); - - if is_completed { - tracing::info!("reward_merkle_tree_v2 migration already done"); - return Ok(()); - } - - let max_height: Option = { - let mut tx = self.db.read().await?; - query_as::<(Option,)>("SELECT MAX(created) FROM reward_merkle_tree_v2") - .fetch_one(tx.as_mut()) - .await? - .0 - }; - - let max_height = match max_height { - Some(h) => h, - None => { - tracing::info!("no reward data found in reward_merkle_tree_v2, skipping migration"); - return Ok(()); - }, - }; - - tracing::warn!( - "migrating reward_merkle_tree_v2 to reward_merkle_tree_v2_data at height \ - {max_height}..." - ); - - let mut balances: Vec<(RewardAccountV2, RewardAmount)> = Vec::new(); - - loop { - let mut tx = self.db.read().await?; - - #[cfg(not(feature = "embedded-db"))] - let rows = query_as::<(serde_json::Value, serde_json::Value)>( - "SELECT DISTINCT ON (idx) idx, entry - FROM reward_merkle_tree_v2 - WHERE idx IS NOT NULL AND entry IS NOT NULL - ORDER BY idx, created DESC - LIMIT $1 OFFSET $2", - ) - .bind(batch_size) - .bind(offset) - .fetch_all(tx.as_mut()) - .await - .context("loading reward accounts from reward_merkle_tree_v2")?; - - #[cfg(feature = "embedded-db")] - let rows = query_as::<(serde_json::Value, serde_json::Value)>( - "SELECT idx, entry FROM ( - SELECT idx, entry, ROW_NUMBER() OVER (PARTITION BY idx ORDER BY created DESC) \ - as rn - FROM reward_merkle_tree_v2 - WHERE idx IS NOT NULL AND entry IS NOT NULL - ) sub - WHERE rn = 1 ORDER BY idx - LIMIT $1 OFFSET $2", - ) - .bind(batch_size) - .bind(offset) - .fetch_all(tx.as_mut()) - .await - .context("loading reward accounts from reward_merkle_tree_v2")?; - - drop(tx); - - if rows.is_empty() { - break; - } - - let rows_count = rows.len(); - - for (idx, entry) in rows { - let account: RewardAccountV2 = - serde_json::from_value(idx).context("deserializing reward account")?; - let balance: RewardAmount = serde_json::from_value(entry).context(format!( - "deserializing reward balance for account {account}" - ))?; - balances.push((account, balance)); - } - - offset += rows_count as i64; - let mut tx = self.db.write().await?; - tx.upsert( - "epoch_migration", - ["table_name", "completed", "migrated_rows"], - ["table_name"], - [("reward_merkle_tree_v2_data".to_string(), false, offset)], - ) - .await?; - tx.commit().await?; - - tracing::info!( - "reward_merkle_tree_v2 progress: rows={} offset={}", - rows_count, - offset - ); - - if rows_count < batch_size as usize { - break; - } - } - - if balances.is_empty() { - tracing::info!("no reward accounts found, skipping tree rebuild"); - return Ok(()); - } - - tracing::info!( - "rebuilding RewardMerkleTreeV2 from {} accounts", - balances.len() - ); - - let tree = RewardMerkleTreeV2::from_kv_set(REWARD_MERKLE_TREE_V2_HEIGHT, balances) - .context("failed to rebuild RewardMerkleTreeV2 from balances")?; - - let mut tx = self.db.read().await?; - let header = tx - .get_header(BlockId::::from(max_height as usize)) - .await - .context(format!("header {max_height} not available"))?; - drop(tx); - - match header.reward_merkle_tree_root() { - Either::Right(expected_root) => { - ensure!( - tree.commitment() == expected_root, - "rebuilt RewardMerkleTreeV2 commitment {} does not match header commitment {} \ - at height {max_height}", - tree.commitment(), - expected_root, - ); - }, - Either::Left(_) => { - bail!( - "header at height {max_height} has a v1 reward merkle tree root, expected v2" - ); - }, - } - - let tree_data: RewardMerkleTreeV2Data = (&tree) - .try_into() - .context("failed to convert RewardMerkleTreeV2 to RewardMerkleTreeV2Data")?; - let serialized = - bincode::serialize(&tree_data).context("failed to serialize RewardMerkleTreeV2Data")?; - - let mut tx = self.db.write().await?; - tx.upsert( - "reward_merkle_tree_v2_data", - ["height", "balances"], - ["height"], - [(max_height, serialized)], - ) - .await?; - tx.commit().await?; - - // Mark migration as complete, and clean up old tables. - let mut tx = self.db.write().await?; - tx.upsert( - "epoch_migration", - ["table_name", "completed", "migrated_rows"], - ["table_name"], - [("reward_merkle_tree_v2_data".to_string(), true, offset)], - ) - .await?; - let truncate = if cfg!(feature = "embedded-db") { - "DELETE FROM" - } else { - "TRUNCATE" - }; - query(&format!("{truncate} reward_merkle_tree_v2")) - .execute(tx.as_mut()) - .await?; - query(&format!("{truncate} reward_merkle_tree")) - .execute(tx.as_mut()) - .await?; - tx.commit().await?; - - tracing::warn!("migrated reward_merkle_tree_v2 at height {max_height}"); - - Ok(()) - } - fn into_catchup_provider( self, backoff: BackoffParams, diff --git a/crates/espresso/node/src/state.rs b/crates/espresso/node/src/state.rs index 3dd9bae83d2..f85dd37440b 100644 --- a/crates/espresso/node/src/state.rs +++ b/crates/espresso/node/src/state.rs @@ -7,7 +7,7 @@ use either::Either; use espresso_types::{ BlockMerkleTree, EpochRewardsCalculator, FeeAccount, FeeMerkleTree, Leaf2, ValidatedState, traits::StateCatchup, - v0_3::{ChainConfig, RewardAccountV1, RewardMerkleTreeV1}, + v0_3::{ChainConfig, RewardMerkleTreeV1}, v0_4::Delta, }; use futures::{StreamExt, future::Future}; @@ -135,20 +135,16 @@ pub(crate) async fn compute_state_update( async fn store_state_update( tx: &mut impl SequencerStateUpdate, block_number: u64, - version: Version, + _version: Version, state: &ValidatedState, delta: &Delta, ) -> anyhow::Result<()> { let ValidatedState { fee_merkle_tree, block_merkle_tree, - reward_merkle_tree_v1, .. } = state; - let Delta { - fees_delta, - rewards_delta, - } = delta; + let Delta { fees_delta, .. } = delta; // Collect fee merkle tree proofs for batch insertion let fee_proofs: Vec<_> = fees_delta @@ -195,41 +191,6 @@ async fn store_state_update( .context("failed to store block merkle nodes")?; } - if version <= EPOCH_VERSION { - // Collect reward merkle tree v1 proofs for batch insertion - let reward_proofs: Vec<_> = rewards_delta - .iter() - .map(|delta| { - let key = RewardAccountV1::from(*delta); - let proof = match reward_merkle_tree_v1.universal_lookup(key) { - LookupResult::Ok(_, proof) => proof, - LookupResult::NotFound(proof) => proof, - LookupResult::NotInMemory => { - bail!("missing merkle path for reward account {delta}") - }, - }; - let path = >::to_traversal_path( - &key, reward_merkle_tree_v1.height() - ); - Ok((proof, path)) - }) - .collect::>>()?; - - tracing::debug!( - count = reward_proofs.len(), - "inserting v1 reward accounts in batch" - ); - UpdateStateData::::insert_merkle_nodes_batch( - tx, - reward_proofs, - block_number, - ) - .await - .context("failed to store reward merkle nodes")?; - } - Ok(()) } diff --git a/crates/espresso/types/src/v0/impls/state.rs b/crates/espresso/types/src/v0/impls/state.rs index 011b15ec374..5018b17517f 100644 --- a/crates/espresso/types/src/v0/impls/state.rs +++ b/crates/espresso/types/src/v0/impls/state.rs @@ -1407,7 +1407,7 @@ impl MerklizedState for BlockMerkleTree { type Digest = Sha3Digest; fn state_type() -> &'static str { - "block_merkle_tree" + "block_merkle_tree_bigint" } fn header_state_commitment_field() -> &'static str { @@ -1439,7 +1439,7 @@ impl MerklizedState for FeeMerkleTree { type Digest = Sha3Digest; fn state_type() -> &'static str { - "fee_merkle_tree" + "fee_merkle_tree_bigint" } fn header_state_commitment_field() -> &'static str { diff --git a/crates/espresso/types/src/v0/traits.rs b/crates/espresso/types/src/v0/traits.rs index 29f8d2de805..7cd573ba685 100644 --- a/crates/espresso/types/src/v0/traits.rs +++ b/crates/espresso/types/src/v0/traits.rs @@ -557,8 +557,6 @@ pub trait MembershipPersistence: Send + Sync + 'static { pub trait SequencerPersistence: Sized + Send + Sync + Clone + 'static + DhtPersistentStorage + MembershipPersistence { - async fn migrate_reward_merkle_tree_v2(&self) -> anyhow::Result<()>; - /// Use this storage as a state catchup backend, if supported. fn into_catchup_provider( self, @@ -927,9 +925,6 @@ pub trait SequencerPersistence: self.migrate_vid_shares().await?; self.migrate_quorum_proposals().await?; self.migrate_quorum_certificates().await?; - self.migrate_reward_merkle_tree_v2() - .await - .context("failed to migrate reward merkle tree v2")?; self.migrate_x25519_keys().await?; tracing::warn!("consensus storage has been migrated to new types"); diff --git a/data/genesis/staging.toml b/data/genesis/staging.toml index de59a09d549..856ef9d374f 100644 --- a/data/genesis/staging.toml +++ b/data/genesis/staging.toml @@ -23,7 +23,7 @@ timestamp = "1970-01-01T00:00:00Z" [header.chain_config] chain_id = 888888888 base_fee = "1 wei" -max_block_size = "5mb" +max_block_size = "1mb" fee_recipient = "0x0000000000000000000000000000000000000000" fee_contract = "0xa15bb66138824a1c7167f5e85b957d04dd34e468" stake_table_contract = "0x12975173b87f7595ee45dffb2ab812ece596bf84" diff --git a/hotshot-query-service/Cargo.toml b/hotshot-query-service/Cargo.toml index 61dc83eb2df..2b473d70b64 100644 --- a/hotshot-query-service/Cargo.toml +++ b/hotshot-query-service/Cargo.toml @@ -23,7 +23,12 @@ default = ["file-system-data-source", "metrics-data-source", "sql-data-source"] # Enables support for an embedded SQLite database instead of PostgreSQL. # Ideal for lightweight nodes that benefit from pruning and merklized state storage, # offering advantages over file system storage. -embedded-db = ["sqlx/sqlite"] +embedded-db = ["sqlx/sqlite", "sqlite-options"] + +# Exposes shared SQLite connection helpers (e.g. `sqlite_options`) for callers that build their +# own pool. Separate from `embedded-db` so clients can opt in to the helpers without flipping the +# workspace into SQLite mode. +sqlite-options = ["sqlx/sqlite"] # Enable the availability data source backed by the local file system. file-system-data-source = ["atomic_store"] diff --git a/hotshot-query-service/src/data_source/storage/sql.rs b/hotshot-query-service/src/data_source/storage/sql.rs index e8683a0b518..b63ea699b3a 100644 --- a/hotshot-query-service/src/data_source/storage/sql.rs +++ b/hotshot-query-service/src/data_source/storage/sql.rs @@ -247,12 +247,7 @@ impl Default for Config { #[cfg(feature = "embedded-db")] impl Default for Config { fn default() -> Self { - SqliteConnectOptions::default() - .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal) - .busy_timeout(Duration::from_secs(30)) - .auto_vacuum(sqlx::sqlite::SqliteAutoVacuum::Incremental) - .create_if_missing(true) - .into() + crate::sqlite_options::sqlite_options().into() } } @@ -1263,13 +1258,13 @@ pub mod testing { ( "BIT(8)", "BYTEA", - "SERIAL PRIMARY KEY", + "BIGSERIAL PRIMARY KEY", "(data->>'test_merkle_tree_root')", ) }; format!( - "CREATE TABLE IF NOT EXISTS hash + "CREATE TABLE IF NOT EXISTS hash_bigint ( id {hash_pk}, value {binary} NOT NULL UNIQUE @@ -1283,7 +1278,7 @@ pub mod testing { ( path JSONB NOT NULL, created BIGINT NOT NULL, - hash_id INT NOT NULL, + hash_id BIGINT NOT NULL, children JSONB, children_bitvec {bit_vec}, idx JSONB, diff --git a/hotshot-query-service/src/data_source/storage/sql/queries/state.rs b/hotshot-query-service/src/data_source/storage/sql/queries/state.rs index 8d91f068de2..90a33daf5bc 100644 --- a/hotshot-query-service/src/data_source/storage/sql/queries/state.rs +++ b/hotshot-query-service/src/data_source/storage/sql/queries/state.rs @@ -33,12 +33,13 @@ use super::{ super::transaction::{Transaction, TransactionMode, Write, query_as}, DecodeError, QueryBuilder, }; +#[cfg(feature = "embedded-db")] +use crate::data_source::storage::sql::build_where_in; use crate::{ QueryError, QueryResult, data_source::storage::{ - MerklizedStateHeightStorage, MerklizedStateStorage, - pruning::PrunedHeightStorage, - sql::{build_where_in, sqlx::Row}, + MerklizedStateHeightStorage, MerklizedStateStorage, pruning::PrunedHeightStorage, + sql::sqlx::Row, }, merklized_state::{MerklizedState, Snapshot}, }; @@ -71,15 +72,58 @@ where let nodes: Vec = rows.into_iter().map(|r| r.into()).collect(); + // On postgres, fall back to legacy tables for any traversal-path nodes not yet backfilled. + #[cfg(not(feature = "embedded-db"))] + let nodes = { + let found: HashSet = nodes.iter().map(|n| n.path.to_string()).collect(); + let missing: Vec = + traversal_path_values(&traversal_path, tree_height) + .into_iter() + .filter(|p| !found.contains(&p.to_string())) + .collect(); + + if missing.is_empty() { + nodes + } else { + let legacy_table = state_type.strip_suffix("_bigint").unwrap_or(state_type); + let mut query = QueryBuilder::default(); + let paths_param = query.bind(missing)?; + let created_param = query.bind(created)?; + let sql = format!( + "SELECT n.path, n.created, n.hash_id::BIGINT AS hash_id, n.children, \ + n.children_bitvec, n.idx, n.entry FROM unnest({paths_param}::jsonb[]) AS \ + p(path), LATERAL (SELECT * FROM {legacy_table} WHERE {legacy_table}.path = \ + p.path AND {legacy_table}.created <= {created_param} ORDER BY \ + {legacy_table}.path DESC, {legacy_table}.created DESC LIMIT 1) AS n ORDER BY \ + n.path DESC" + ); + let legacy_rows = + query + .query(&sql) + .fetch_all(self.as_mut()) + .await + .map_err(|e| QueryError::Error { + message: format!("merkle path fallback lookup failed: {e}"), + })?; + let mut nodes = nodes; + nodes.extend(legacy_rows.into_iter().map(Node::from)); + // Sort leaf-first (longer paths first). + nodes.sort_by_key(|n| { + std::cmp::Reverse(n.path.as_array().map(|v| v.len()).unwrap_or(0)) + }); + nodes + } + }; + // insert all the hash ids to a hashset which is used to query later // HashSet is used to avoid duplicates let mut hash_ids = HashSet::new(); for node in nodes.iter() { hash_ids.insert(node.hash_id); if let Some(children) = &node.children { - let children: Vec = + let children: Vec = serde_json::from_value(children.clone()).map_err(|e| QueryError::Error { - message: format!("Error deserializing 'children' into Vec: {e}"), + message: format!("Error deserializing 'children' into Vec: {e}"), })?; hash_ids.extend(children); } @@ -87,13 +131,53 @@ where // Find all the hash values and create a hashmap // Hashmap will be used to get the hash value of the nodes children and the node itself. - let hashes = if !hash_ids.is_empty() { - let (query, sql) = build_where_in("SELECT id, value FROM hash", "id", hash_ids)?; - query - .query_as(&sql) - .fetch(self.as_mut()) - .try_collect::>>() - .await? + let hashes: HashMap> = if !hash_ids.is_empty() { + #[cfg(not(feature = "embedded-db"))] + { + let hash_ids_arr: Vec = hash_ids.iter().copied().collect(); + let mut result: HashMap> = sqlx::query_as( + "SELECT id::BIGINT, value FROM hash_bigint WHERE id = ANY($1::BIGINT[])", + ) + .bind(&hash_ids_arr) + .fetch_all(self.as_mut()) + .await + .map_err(|e| QueryError::Error { + message: format!("hash lookup failed: {e}"), + })? + .into_iter() + .collect(); + + let missing: Vec = hash_ids + .iter() + .filter(|id| !result.contains_key(id)) + .copied() + .collect(); + + if !missing.is_empty() { + let rows: Vec<(i64, Vec)> = sqlx::query_as( + "SELECT id::BIGINT, value FROM hash WHERE id = ANY($1::BIGINT[])", + ) + .bind(&missing) + .fetch_all(self.as_mut()) + .await + .map_err(|e| QueryError::Error { + message: format!("hash fallback lookup failed: {e}"), + })?; + result.extend(rows); + } + result + } + + #[cfg(feature = "embedded-db")] + { + let (query, sql) = + build_where_in("SELECT id, value FROM hash_bigint", "id", hash_ids)?; + query + .query_as(&sql) + .fetch(self.as_mut()) + .try_collect::>>() + .await? + } } else { HashMap::new() }; @@ -116,11 +200,11 @@ where match (children, children_bitvec, idx, entry) { // If the row has children then its a branch (Some(children), Some(children_bitvec), None, None) => { - let children: Vec = + let children: Vec = serde_json::from_value(children.clone()).map_err(|e| { QueryError::Error { message: format!( - "Error deserializing 'children' into Vec: {e}" + "Error deserializing 'children' into Vec: {e}" ), } })?; @@ -353,7 +437,7 @@ pub(crate) fn build_hash_batch_insert( .map(|hash| Ok(format!("({})", query.bind(hash)?))) .collect::>>()?; let sql = format!( - "INSERT INTO hash(value) values {} ON CONFLICT (value) DO UPDATE SET value = \ + "INSERT INTO hash_bigint(value) values {} ON CONFLICT (value) DO UPDATE SET value = \ EXCLUDED.value returning value, id", params.join(",") ); @@ -366,16 +450,17 @@ pub(crate) fn build_hash_batch_insert( pub(crate) async fn batch_insert_hashes( hashes: Vec>, tx: &mut Transaction, -) -> QueryResult, i32>> { +) -> QueryResult, i64>> { if hashes.is_empty() { return Ok(HashMap::new()); } - // Use UNNEST-based batch insert (more efficient and avoids parameter limits) - let sql = "INSERT INTO hash(value) SELECT * FROM UNNEST($1::bytea[]) ON CONFLICT (value) DO \ - UPDATE SET value = EXCLUDED.value RETURNING value, id"; + // Use UNNEST-based batch insert (more efficient and avoids parameter limits). + // Cast id to BIGINT in RETURNING so the result maps directly to i64. + let sql = "INSERT INTO hash_bigint(value) SELECT * FROM UNNEST($1::bytea[]) ON CONFLICT \ + (value) DO UPDATE SET value = EXCLUDED.value RETURNING value, id::BIGINT"; - let result: HashMap, i32> = sqlx::query_as(sql) + let result: HashMap, i64> = sqlx::query_as(sql) .bind(&hashes) .fetch(tx.as_mut()) .try_collect() @@ -524,7 +609,7 @@ where pub(crate) struct Node { pub(crate) path: JsonValue, pub(crate) created: i64, - pub(crate) hash_id: i32, + pub(crate) hash_id: i64, pub(crate) children: Option, pub(crate) children_bitvec: Option, pub(crate) idx: Option, @@ -662,7 +747,7 @@ impl Node { let sql = format!( r#" INSERT INTO "{name}" (path, created, hash_id, children, children_bitvec, idx, entry) - SELECT * FROM UNNEST($1::jsonb[], $2::bigint[], $3::int[], $4::jsonb[], $5::bit varying[], $6::jsonb[], $7::jsonb[]) + SELECT * FROM UNNEST($1::jsonb[], $2::bigint[], $3::bigint[], $4::jsonb[], $5::bit varying[], $6::jsonb[], $7::jsonb[]) ON CONFLICT (path, created) DO UPDATE SET hash_id = EXCLUDED.hash_id, children = EXCLUDED.children, @@ -688,45 +773,61 @@ impl Node { } } +/// Compute the full set of path JSON values for a traversal path (leaf to root). +/// +/// Each element is the path prefix used as a row key in a Merkle-tree table. +fn traversal_path_values(traversal_path: &[usize], tree_height: usize) -> Vec { + let len = tree_height; + let mut result = Vec::with_capacity(len + 1); + let mut trav = traversal_path.iter().map(|x| *x as i32); + for _ in 0..=len { + let path: Vec = trav.clone().rev().collect(); + result.push(serde_json::Value::from(path)); + trav.next(); + } + result +} + fn build_get_path_query<'q>( table: &'static str, traversal_path: Vec, created: i64, ) -> QueryResult<(QueryBuilder<'q>, String)> { let mut query = QueryBuilder::default(); - let mut traversal_path = traversal_path.into_iter().map(|x| x as i32); - - // We iterate through the path vector skipping the first element after each iteration - let len = traversal_path.len(); - let mut sub_queries = Vec::new(); - - query.bind(created)?; - - for _ in 0..=len { - let path = traversal_path.clone().rev().collect::>(); - let path: serde_json::Value = path.into(); - let node_path = query.bind(path)?; - - let sub_query = format!( - "SELECT * FROM (SELECT * FROM {table} WHERE path = {node_path} AND created <= $1 \ - ORDER BY created DESC LIMIT 1) AS latest_node", - ); - - sub_queries.push(sub_query); - traversal_path.next(); - } + let paths = traversal_path_values(&traversal_path, traversal_path.len()); - let mut sql: String = sub_queries.join(" UNION "); - - sql = format!("SELECT * FROM ({sql}) as t "); + #[cfg(not(feature = "embedded-db"))] + let sql = { + let paths_param = query.bind(paths)?; + let created_param = query.bind(created)?; + // One LATERAL point-lookup per path in the array, instead of N UNION'd subqueries. + // Postgres plans the inner subquery once and reuses it, collapsing N independent + // planner decisions into one. + format!( + "SELECT n.* FROM unnest({paths_param}::jsonb[]) AS p(path), LATERAL (SELECT * FROM \ + {table} WHERE {table}.path = p.path AND {table}.created <= {created_param} ORDER BY \ + {table}.path DESC, {table}.created DESC LIMIT 1) AS n ORDER BY n.path DESC" + ) + }; - // PostgreSQL already orders JSON arrays by length, so no additional function is needed - // For SQLite, `length()` is used to sort by length. - if cfg!(feature = "embedded-db") { - sql.push_str("ORDER BY length(t.path) DESC"); - } else { - sql.push_str("ORDER BY t.path DESC"); - } + #[cfg(feature = "embedded-db")] + let sql = { + // SQLite has no native array binding, so we keep the UNION form. Each subquery + // is a point lookup by the composite (path, created) primary key. + let created_param = query.bind(created)?; + let mut sub_queries = Vec::with_capacity(paths.len()); + for path in paths { + let node_path = query.bind(path)?; + sub_queries.push(format!( + "SELECT * FROM (SELECT * FROM {table} WHERE path = {node_path} AND created <= \ + {created_param} ORDER BY path DESC, created DESC LIMIT 1) AS latest_node", + )); + } + format!( + "SELECT * FROM ({}) as t ORDER BY length(t.path) DESC", + sub_queries.join(" UNION ") + ) + }; Ok((query, sql)) } diff --git a/hotshot-query-service/src/data_source/storage/sql/transaction.rs b/hotshot-query-service/src/data_source/storage/sql/transaction.rs index 05d956eeaf8..2c627c96c14 100644 --- a/hotshot-query-service/src/data_source/storage/sql/transaction.rs +++ b/hotshot-query-service/src/data_source/storage/sql/transaction.rs @@ -896,14 +896,14 @@ impl, const ARITY: usize> let hashes: Vec> = all_hashes.into_iter().collect(); #[cfg(not(feature = "embedded-db"))] - let nodes_hash_ids: HashMap, i32> = batch_insert_hashes(hashes, self).await?; + let nodes_hash_ids: HashMap, i64> = batch_insert_hashes(hashes, self).await?; #[cfg(feature = "embedded-db")] - let nodes_hash_ids: HashMap, i32> = { - let mut hash_ids: HashMap, i32> = HashMap::with_capacity(hashes.len()); + let nodes_hash_ids: HashMap, i64> = { + let mut hash_ids: HashMap, i64> = HashMap::with_capacity(hashes.len()); for hash_chunk in hashes.chunks(20) { let (query, sql) = build_hash_batch_insert(hash_chunk)?; - let chunk_ids: HashMap, i32> = query + let chunk_ids: HashMap, i64> = query .query_as(&sql) .fetch(self.as_mut()) .try_collect() @@ -923,7 +923,7 @@ impl, const ARITY: usize> let children_hashes = children .iter() .map(|c| nodes_hash_ids.get(c).copied()) - .collect::>>() + .collect::>>() .ok_or(QueryError::Error { message: "Missing child hash".to_string(), })?; diff --git a/hotshot-query-service/src/lib.rs b/hotshot-query-service/src/lib.rs index e130390a101..608eb38f197 100644 --- a/hotshot-query-service/src/lib.rs +++ b/hotshot-query-service/src/lib.rs @@ -421,8 +421,11 @@ pub mod explorer; pub mod fetching; pub mod merklized_state; pub mod metrics; +pub mod migration; pub mod node; mod resolvable; +#[cfg(feature = "sqlite-options")] +pub mod sqlite_options; pub mod status; pub mod task; pub mod testing; diff --git a/hotshot-query-service/src/migration.rs b/hotshot-query-service/src/migration.rs new file mode 100644 index 00000000000..a0772e2df4a --- /dev/null +++ b/hotshot-query-service/src/migration.rs @@ -0,0 +1,532 @@ +use std::{env, time::Duration}; + +use async_trait::async_trait; + +use crate::data_source::{ + Transaction as _, VersionedDataSource, + storage::sql::{SqlStorage, Transaction, Write}, +}; + +const RETRY_INTERVAL: Duration = Duration::from_secs(300); + +/// A background migration that copies or transforms data in batches. +#[async_trait] +pub trait DataBackfill: Send + Sync + 'static { + fn name(&self) -> &'static str; + + /// Names of other [`DataBackfill`] migrations that must complete before this one starts. + fn requires(&self) -> &'static [&'static str] { + &[] + } + + /// Number of rows to process per batch. + fn batch_size(&self) -> usize { + 1_000 + } + + /// Number of batches between progress log lines. + fn log_frequency(&self) -> usize { + 10 + } + + /// How long to sleep between batches to avoid saturating the database. + fn batch_delay(&self) -> Duration { + Duration::from_millis(50) + } + + /// Process one batch starting at `offset`. + /// + /// Returns `Some(next_offset)` to continue, or `None` when all rows have been processed. + async fn run_batch( + &self, + tx: &mut Transaction, + offset: u64, + ) -> anyhow::Result>; +} + +/// An ordered registry of [`DataBackfill`] migrations. +pub struct MigrationRegistry { + migrations: Vec>, +} + +impl MigrationRegistry { + /// Create an empty registry. + pub fn new() -> Self { + Self { + migrations: Vec::new(), + } + } + + /// Append a migration. Migrations run in the order they are added. + pub fn backfill(mut self, m: impl DataBackfill) -> Self { + self.migrations.push(Box::new(m)); + self + } + + /// Validate the registry. + /// + /// Checks that: + /// - All migration names are unique. + /// - Every name listed in `requires()` refers to a migration that appears earlier in the list. + pub fn validate(&self) -> anyhow::Result<()> { + let mut seen: std::collections::HashSet<&'static str> = std::collections::HashSet::new(); + for m in &self.migrations { + let name = m.name(); + anyhow::ensure!(seen.insert(name), "duplicate migration name: {name}"); + for dep in m.requires() { + anyhow::ensure!( + seen.contains(dep), + "migration {name} requires {dep} which either does not exist or appears later \ + in the registry", + ); + } + } + Ok(()) + } + + /// Iterate over the registered migrations in order. + pub fn iter(&self) -> impl Iterator { + self.migrations.iter().map(|b| b.as_ref()) + } + + /// Drive all registered migrations to completion. + /// + /// Iterates over all migrations on each pass. Migrations whose prerequisites are not yet + /// complete are skipped with a warning and retried after [`RETRY_INTERVAL`]. The loop exits + /// once every migration has been marked complete. + pub async fn run_all_migrations(self, db: SqlStorage) { + if let Err(e) = self.validate() { + tracing::error!( + "deferred migration registry is invalid, skipping all backfills: {e:#}" + ); + return; + } + + loop { + let mut pending = 0usize; + + for m in &self.migrations { + let name = m.name(); + + // Skip migrations that have already been marked complete. + match Self::is_complete(&db, name).await { + Ok(true) => continue, + Ok(false) => {}, + Err(e) => { + tracing::error!(name, "failed to check migration status: {e:#}"); + pending += 1; + continue; + }, + } + + // Defer if any prerequisite is not yet complete. + let mut deps_ready = true; + for dep in m.requires() { + match Self::is_complete(&db, dep).await { + Ok(true) => {}, + Ok(false) => { + tracing::warn!( + name, + dep, + "prerequisite not yet complete, will retry after \ + {RETRY_INTERVAL:?}" + ); + deps_ready = false; + }, + Err(e) => { + tracing::error!(name, dep, "failed to check prerequisite: {e:#}"); + deps_ready = false; + }, + } + } + if !deps_ready { + pending += 1; + continue; + } + + // Run all batches for this migration. + if !Self::run_migration(&db, m.as_ref()).await { + pending += 1; + } + } + + if pending == 0 { + tracing::warn!("all deferred migrations complete"); + break; + } + + tokio::time::sleep(RETRY_INTERVAL).await; + } + } + + /// Run all batches for a single migration to completion. Returns `true` if the migration + /// finished successfully, `false` if it encountered an error. + async fn run_migration(db: &SqlStorage, m: &dyn DataBackfill) -> bool { + let name = m.name(); + + let mut offset = match Self::init_and_get_offset(db, name).await { + Ok(o) => o, + Err(e) => { + tracing::error!(name, "failed to initialize migration: {e:#}"); + return false; + }, + }; + + let delay = env::var("ESPRESSO_NODE_BACKFILL_BATCH_DELAY_MS") + .ok() + .and_then(|v| v.parse::().ok()) + .map(Duration::from_millis) + .unwrap_or_else(|| m.batch_delay()); + + tracing::warn!(name, offset, ?delay, "starting deferred migration"); + + let mut batch_count: usize = 0; + + loop { + let mut tx = match db.write().await { + Ok(tx) => tx, + Err(e) => { + tracing::error!(name, offset, "failed to open write transaction: {e:#}"); + return false; + }, + }; + + let next = match m.run_batch(&mut tx, offset).await { + Ok(next) => next, + Err(e) => { + tracing::error!(name, offset, "migration batch failed: {e:#}"); + return false; + }, + }; + + if let Err(e) = sqlx::query( + "UPDATE deferred_migrations SET last_offset = $1, completed_at = CASE WHEN $2 \ + THEN CURRENT_TIMESTAMP ELSE completed_at END WHERE name = $3", + ) + .bind(next.unwrap_or(offset) as i64) + .bind(next.is_none()) + .bind(name) + .execute(tx.as_mut()) + .await + { + tracing::error!(name, "failed to persist migration progress: {e:#}"); + return false; + } + + if let Err(e) = tx.commit().await { + tracing::error!(name, "failed to commit migration batch: {e:#}"); + return false; + } + + batch_count += 1; + + let Some(next_offset) = next else { + tracing::warn!(name, batches = batch_count, "deferred migration complete"); + return true; + }; + offset = next_offset; + + if !delay.is_zero() { + tokio::time::sleep(delay).await; + } + + if batch_count.is_multiple_of(m.log_frequency()) { + tracing::warn!( + name, + offset, + batches = batch_count, + "deferred migration progress" + ); + } + } + } + + /// Returns `true` if the named migration has been marked complete in the database. + /// Returns `false` if the row does not exist or `completed_at` is null. + async fn is_complete(db: &SqlStorage, name: &str) -> anyhow::Result { + let mut tx = db.read().await?; + let row: Option<(bool,)> = sqlx::query_as( + "SELECT completed_at IS NOT NULL FROM deferred_migrations WHERE name = $1", + ) + .bind(name) + .fetch_optional(tx.as_mut()) + .await?; + Ok(row.map(|(b,)| b).unwrap_or(false)) + } + + /// Insert a tracking row for `name` if one does not yet exist, then return the stored offset + /// to resume from. + async fn init_and_get_offset(db: &SqlStorage, name: &str) -> anyhow::Result { + let mut tx = db.write().await?; + + sqlx::query( + "INSERT INTO deferred_migrations (name, started_at, last_offset) VALUES ($1, \ + CURRENT_TIMESTAMP, 0) ON CONFLICT (name) DO NOTHING", + ) + .bind(name) + .execute(tx.as_mut()) + .await?; + + let (last_offset,): (i64,) = sqlx::query_as( + "SELECT COALESCE(last_offset, 0) FROM deferred_migrations WHERE name = $1", + ) + .bind(name) + .fetch_one(tx.as_mut()) + .await?; + + tx.commit().await?; + + Ok(last_offset as u64) + } +} + +impl Default for MigrationRegistry { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // A minimal DataBackfill that runs `total` batches (one unit of work each) then stops. + struct CountBatches { + name: &'static str, + total: u64, + deps: &'static [&'static str], + } + + impl CountBatches { + fn new(name: &'static str, total: u64) -> Self { + Self { + name, + total, + deps: &[], + } + } + + fn with_deps(name: &'static str, total: u64, deps: &'static [&'static str]) -> Self { + Self { name, total, deps } + } + } + + #[async_trait] + impl DataBackfill for CountBatches { + fn name(&self) -> &'static str { + self.name + } + + fn requires(&self) -> &'static [&'static str] { + self.deps + } + + fn batch_size(&self) -> usize { + 1 + } + + async fn run_batch( + &self, + _tx: &mut Transaction, + offset: u64, + ) -> anyhow::Result> { + Ok((offset < self.total).then_some(offset + 1)) + } + } + + // --- validate() unit tests (no database required) --- + + #[test] + fn validate_empty() { + MigrationRegistry::new().validate().unwrap(); + } + + #[test] + fn validate_single() { + MigrationRegistry::new() + .backfill(CountBatches::new("a", 1)) + .validate() + .unwrap(); + } + + #[test] + fn validate_duplicate_name() { + let err = MigrationRegistry::new() + .backfill(CountBatches::new("foo", 1)) + .backfill(CountBatches::new("foo", 1)) + .validate() + .unwrap_err(); + assert!(err.to_string().contains("duplicate")); + } + + #[test] + fn validate_unknown_dep() { + let err = MigrationRegistry::new() + .backfill(CountBatches::with_deps("bar", 1, &["nonexistent"])) + .validate() + .unwrap_err(); + assert!(err.to_string().contains("nonexistent")); + } + + #[test] + fn validate_dep_must_precede() { + // "a" depends on "b" but "b" comes after "a" in the registry. + let err = MigrationRegistry::new() + .backfill(CountBatches::with_deps("a", 1, &["b"])) + .backfill(CountBatches::new("b", 1)) + .validate() + .unwrap_err(); + assert!(err.to_string().contains("b")); + } + + #[test] + fn validate_valid_dependency_chain() { + MigrationRegistry::new() + .backfill(CountBatches::new("a", 1)) + .backfill(CountBatches::with_deps("b", 1, &["a"])) + .backfill(CountBatches::with_deps("c", 1, &["a", "b"])) + .validate() + .unwrap(); + } + + // --- Integration tests (require a real postgres database) --- + + #[cfg(not(feature = "embedded-db"))] + mod db_tests { + use super::*; + use crate::data_source::storage::sql::{ + Migration, SqlStorage, StorageConnectionType, testing::TmpDb, + }; + + // The deferred_migrations table lives in espresso-node's migrations; for tests within + // this crate we inject it as an extra migration. + const CREATE_DEFERRED_MIGRATIONS: &str = " + CREATE TABLE IF NOT EXISTS deferred_migrations ( + name TEXT PRIMARY KEY, + started_at TIMESTAMPTZ NOT NULL, + completed_at TIMESTAMPTZ, + last_offset BIGINT + ); + "; + + async fn setup() -> (TmpDb, SqlStorage) { + let db = TmpDb::init().await; + let storage = SqlStorage::connect( + db.config().migrations(vec![ + Migration::unapplied( + "V9990__deferred_migrations.sql", + CREATE_DEFERRED_MIGRATIONS, + ) + .unwrap(), + ]), + StorageConnectionType::Query, + ) + .await + .unwrap(); + (db, storage) + } + + async fn is_complete(storage: &SqlStorage, name: &str) -> bool { + let mut tx = storage.read().await.unwrap(); + let row: Option<(bool,)> = sqlx::query_as( + "SELECT completed_at IS NOT NULL FROM deferred_migrations WHERE name = $1", + ) + .bind(name) + .fetch_optional(tx.as_mut()) + .await + .unwrap(); + row.map(|(b,)| b).unwrap_or(false) + } + + async fn last_offset(storage: &SqlStorage, name: &str) -> Option { + let mut tx = storage.read().await.unwrap(); + let row: Option<(i64,)> = + sqlx::query_as("SELECT last_offset FROM deferred_migrations WHERE name = $1") + .bind(name) + .fetch_optional(tx.as_mut()) + .await + .unwrap(); + row.map(|(o,)| o) + } + + #[test_log::test(tokio::test(flavor = "multi_thread"))] + async fn migration_runs_to_completion() { + let (_db, storage) = setup().await; + + MigrationRegistry::new() + .backfill(CountBatches::new("m", 5)) + .run_all_migrations(storage.clone()) + .await; + + assert!(is_complete(&storage, "m").await); + assert_eq!(last_offset(&storage, "m").await, Some(5)); + } + + #[test_log::test(tokio::test(flavor = "multi_thread"))] + async fn migration_resumes_from_checkpoint() { + let (_db, storage) = setup().await; + + // Seed partial progress: already at offset 3 of 5. + let mut tx = storage.write().await.unwrap(); + sqlx::query( + "INSERT INTO deferred_migrations (name, started_at, last_offset) VALUES ($1, \ + CURRENT_TIMESTAMP, $2)", + ) + .bind("m") + .bind(3i64) + .execute(tx.as_mut()) + .await + .unwrap(); + tx.commit().await.unwrap(); + + MigrationRegistry::new() + .backfill(CountBatches::new("m", 5)) + .run_all_migrations(storage.clone()) + .await; + + assert!(is_complete(&storage, "m").await); + assert_eq!(last_offset(&storage, "m").await, Some(5)); + } + + #[test_log::test(tokio::test(flavor = "multi_thread"))] + async fn completed_migration_is_skipped() { + let (_db, storage) = setup().await; + + // Pre-mark the migration as complete at offset 99. + let mut tx = storage.write().await.unwrap(); + sqlx::query( + "INSERT INTO deferred_migrations (name, started_at, completed_at, last_offset) \ + VALUES ($1, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, $2)", + ) + .bind("m") + .bind(99i64) + .execute(tx.as_mut()) + .await + .unwrap(); + tx.commit().await.unwrap(); + + // Run a migration with total=0 — if it ran, it would reset the offset. + MigrationRegistry::new() + .backfill(CountBatches::new("m", 0)) + .run_all_migrations(storage.clone()) + .await; + + // Offset must not have changed. + assert_eq!(last_offset(&storage, "m").await, Some(99)); + } + + #[test_log::test(tokio::test(flavor = "multi_thread"))] + async fn dependency_ordering_respected() { + let (_db, storage) = setup().await; + + MigrationRegistry::new() + .backfill(CountBatches::new("first", 3)) + .backfill(CountBatches::with_deps("second", 3, &["first"])) + .run_all_migrations(storage.clone()) + .await; + + assert!(is_complete(&storage, "first").await); + assert!(is_complete(&storage, "second").await); + } + } +} diff --git a/hotshot-query-service/src/sqlite_options.rs b/hotshot-query-service/src/sqlite_options.rs new file mode 100644 index 00000000000..9490e1d1e6a --- /dev/null +++ b/hotshot-query-service/src/sqlite_options.rs @@ -0,0 +1,25 @@ +//! Shared SQLite connection defaults. +//! +//! Used by both the embedded-db variant of this crate and by external clients (e.g. the light +//! client) that build their own pool. Keeping the pragma choices in one place ensures every +//! SQLite database in the workspace runs with the same journaling, locking, and vacuum settings. + +use std::time::Duration; + +use sqlx::sqlite::{SqliteAutoVacuum, SqliteConnectOptions, SqliteJournalMode}; + +/// Default [`SqliteConnectOptions`] for SQLite databases in this workspace. +/// +/// WAL journaling is the load-bearing choice: under the default rollback journal, any writer +/// holds an exclusive lock that blocks readers on other connections, which produces spurious +/// `SQLITE_BUSY` ("database is locked") errors under concurrent access. WAL lets readers and +/// the single writer proceed in parallel. +/// +/// Callers add `.filename(...)` (or use `:memory:`) on top. +pub fn sqlite_options() -> SqliteConnectOptions { + SqliteConnectOptions::default() + .journal_mode(SqliteJournalMode::Wal) + .busy_timeout(Duration::from_secs(30)) + .auto_vacuum(SqliteAutoVacuum::Incremental) + .create_if_missing(true) +} diff --git a/light-client/Cargo.toml b/light-client/Cargo.toml index 68eb2f795e3..5ba6747f493 100644 --- a/light-client/Cargo.toml +++ b/light-client/Cargo.toml @@ -21,7 +21,7 @@ derive_builder = { workspace = true } derive_more = { workspace = true } espresso-types = { workspace = true } futures = { workspace = true } -hotshot-query-service = { workspace = true } +hotshot-query-service = { workspace = true, features = ["sqlite-options"] } hotshot-query-service-types = { workspace = true } hotshot-types = { workspace = true } jf-advz = { workspace = true } diff --git a/light-client/src/storage.rs b/light-client/src/storage.rs index 0614b7e1ce0..30f3aca7d2e 100644 --- a/light-client/src/storage.rs +++ b/light-client/src/storage.rs @@ -1,11 +1,13 @@ #[cfg(unix)] use std::{fs::Permissions, os::unix::fs::PermissionsExt}; -use std::{future::Future, path::PathBuf, str::FromStr, sync::Arc}; +use std::{future::Future, path::PathBuf, str::FromStr, sync::Arc, time::Duration}; use alloy::primitives::Address; use anyhow::{Context, Result}; use derive_more::{Display, From}; -use espresso_types::{PubKey, SeqTypes, StakeTableState, v0_3::RegisteredValidator}; +use espresso_types::{ + BackoffParams, PubKey, Ratio, SeqTypes, StakeTableState, v0_3::RegisteredValidator, +}; use futures::TryStreamExt; use hotshot_query_service_types::{ HeightIndexed, @@ -13,10 +15,7 @@ use hotshot_query_service_types::{ }; use hotshot_types::{data::EpochNumber, light_client::StateVerKey, x25519}; use serde_json::Value; -use sqlx::{ - QueryBuilder, SqlitePool, query, query_as, - sqlite::{SqliteConnectOptions, SqlitePoolOptions}, -}; +use sqlx::{QueryBuilder, SqlitePool, query, query_as, sqlite::SqlitePoolOptions}; use tempfile::{Builder, TempDir}; use vbs::version::Version; @@ -32,6 +31,20 @@ pub enum LeafRequest { Header(BlockId), } +/// Maximum number of retries for a failed write before propagating the error. +const WRITE_RETRY_MAX: u32 = 5; + +/// Backoff for retrying failed writes. Staggered so concurrent writers don't lock-step. +const WRITE_BACKOFF: BackoffParams = BackoffParams::new( + Duration::from_millis(50), + Duration::from_millis(1_000), + 2, + Ratio { + numerator: 5, + denominator: 10, + }, +); + /// Client-side database for a [`LightClient`]. pub trait Storage: Sized + Send + Sync + 'static { /// Create a default, empty instance of the state. @@ -188,9 +201,7 @@ impl LightClientSqliteOptions { }, }; - let opt = SqliteConnectOptions::new() - .filename(&path) - .create_if_missing(true); + let opt = hotshot_query_service::sqlite_options::sqlite_options().filename(&path); let pool = SqlitePoolOptions::default() .max_connections(self.num_connections) .connect_with(opt) @@ -302,56 +313,64 @@ impl Storage for SqliteStorage { } async fn insert_leaf(&self, leaf: LeafQueryData) -> Result<()> { - let mut tx = self.pool.begin().await?; - let height = leaf.height() as i64; let hash = leaf.hash().to_string(); let block_hash = leaf.block_hash().to_string(); let payload_hash = leaf.payload_hash().to_string(); - let data = serde_json::to_value(leaf)?; - - tracing::debug!(height, hash, "inserting leaf"); - let (id,): (i32,) = query_as( - "INSERT INTO leaf (height, hash, block_hash, payload_hash, data) VALUES ($1, $2, $3, \ - $4, $5) + let data = serde_json::to_value(&leaf)?; + + WRITE_BACKOFF + .retry_if( + WRITE_RETRY_MAX, + |_| true, + || async { + let mut tx = self.pool.begin().await?; + + tracing::debug!(height, hash, "inserting leaf"); + let (id,): (i32,) = query_as( + "INSERT INTO leaf (height, hash, block_hash, payload_hash, data) VALUES \ + ($1, $2, $3, $4, $5) ON CONFLICT (height) DO UPDATE SET id = excluded.id RETURNING id", - ) - .bind(height) - .bind(&hash) - .bind(&block_hash) - .bind(&payload_hash) - .bind(data) - .fetch_one(tx.as_mut()) - .await - .context("inserting new leaf")?; - tracing::debug!(height, hash, id, "inserted leaf"); - - // Delete the oldest leaves as necessary until the number of leaves stored does not exceed - // `num_leaves`. - let (num_leaves,): (u32,) = query_as("SELECT count(*) FROM leaf") - .fetch_one(tx.as_mut()) - .await - .context("counting leaves")?; - let to_delete = num_leaves.saturating_sub(self.num_leaves); - if to_delete > 0 { - let (id_to_delete,): (i64,) = - query_as("SELECT id FROM leaf ORDER BY id LIMIT 1 OFFSET $1") - .bind(to_delete - 1) + ) + .bind(height) + .bind(&hash) + .bind(&block_hash) + .bind(&payload_hash) + .bind(&data) .fetch_one(tx.as_mut()) .await - .context("finding timestamp for GC")?; - tracing::info!(id_to_delete, "garbage collecting {to_delete} leaves"); - let res = query("DELETE FROM leaf WHERE id <= $1") - .bind(id_to_delete) - .execute(tx.as_mut()) - .await - .context("deleting old leaves")?; - tracing::info!("deleted {} leaves", res.rows_affected()); - } - - tx.commit().await?; - Ok(()) + .context("inserting new leaf")?; + tracing::debug!(height, hash, id, "inserted leaf"); + + // Delete the oldest leaves as necessary until the number of leaves stored does + // not exceed `num_leaves`. + let (num_leaves,): (u32,) = query_as("SELECT count(*) FROM leaf") + .fetch_one(tx.as_mut()) + .await + .context("counting leaves")?; + let to_delete = num_leaves.saturating_sub(self.num_leaves); + if to_delete > 0 { + let (id_to_delete,): (i64,) = + query_as("SELECT id FROM leaf ORDER BY id LIMIT 1 OFFSET $1") + .bind(to_delete - 1) + .fetch_one(tx.as_mut()) + .await + .context("finding timestamp for GC")?; + tracing::info!(id_to_delete, "garbage collecting {to_delete} leaves"); + let res = query("DELETE FROM leaf WHERE id <= $1") + .bind(id_to_delete) + .execute(tx.as_mut()) + .await + .context("deleting old leaves")?; + tracing::info!("deleted {} leaves", res.rows_affected()); + } + + tx.commit().await?; + Ok(()) + }, + ) + .await } async fn stake_table_lower_bound( @@ -462,134 +481,148 @@ impl Storage for SqliteStorage { epoch_root_protocol_version: Version, next_epoch_root_protocol_version: Version, ) -> Result<()> { - let mut tx = self.pool.begin().await?; - - // Record that the stake table for this epoch is available, along with the versions of - // the epoch root headers in epochs `e-2` and `e-1` (snapshot points for `e` and `e+1`). let epoch = i64::try_from(*epoch).context("epoch overflow")?; let epoch_root_protocol_version_str = epoch_root_protocol_version.to_string(); let next_epoch_root_protocol_version_str = next_epoch_root_protocol_version.to_string(); - query( - "INSERT INTO stake_table_epoch (epoch, epoch_root_protocol_version, \ - next_epoch_root_protocol_version) VALUES ($1, $2, $3)", - ) - .bind(epoch) - .bind(&epoch_root_protocol_version_str) - .bind(&next_epoch_root_protocol_version_str) - .execute(tx.as_mut()) - .await - .context(format!( - "recording stake table availability for epoch {epoch}" - ))?; - - // Insert validators for the new stake table. let validators = stake_table .validators() .values() .cloned() .map(serde_json::to_value) .collect::, _>>()?; - QueryBuilder::new("INSERT INTO stake_table_validator (epoch, idx, data) ") - .push_values(validators.into_iter().enumerate(), |mut q, (i, data)| { - q.push_bind(epoch).push_bind(i as i64).push_bind(data); - }) - .build() - .execute(tx.as_mut()) - .await - .context(format!("inserting validators for epoch {epoch}"))?; - - // Insert only newly used BLS keys. - QueryBuilder::new("INSERT INTO stake_table_bls_key (epoch, key) ") - .push_values(stake_table.used_bls_keys(), |mut q, key| { - q.push_bind(epoch).push_bind(key.to_string()); - }) - // If we insert keys out of order, make sure `epoch` reflects the earliest time when - // this key was added to the state. - .push(" ON CONFLICT (key) DO UPDATE SET epoch = min(epoch, excluded.epoch)") - .build() - .execute(tx.as_mut()) - .await - .context(format!("inserting newly used BLS keys for epoch {epoch}"))?; - - // Insert only newly used Schnorr keys. - QueryBuilder::new("INSERT INTO stake_table_schnorr_key (epoch, key) ") - .push_values(stake_table.used_schnorr_keys(), |mut q, key| { - q.push_bind(epoch).push_bind(key.to_string()); - }) - // If we insert keys out of order, make sure `epoch` reflects the earliest time when - // this key was added to the state. - .push(" ON CONFLICT (key) DO UPDATE SET epoch = min(epoch, excluded.epoch)") - .build() - .execute(tx.as_mut()) - .await - .context(format!( - "inserting newly used Schnorr keys for epoch {epoch}" - ))?; - - // Insert only newly used x25519 keys. - if !stake_table.used_x25519_keys().is_empty() { - QueryBuilder::new("INSERT INTO stake_table_x25519_key (epoch, key) ") - .push_values(stake_table.used_x25519_keys(), |mut q, key| { - q.push_bind(epoch).push_bind(key.to_string()); - }) - // If we insert keys out of order, make sure `epoch` reflects the earliest time - // when this key was added to the state. - .push(" ON CONFLICT (key) DO UPDATE SET epoch = min(epoch, excluded.epoch)") - .build() - .execute(tx.as_mut()) - .await - .context(format!( - "inserting newly used x25519 keys for epoch {epoch}" - ))?; - } - - // Insert only the new validator exits. - if !stake_table.validator_exits().is_empty() { - QueryBuilder::new("INSERT INTO stake_table_exit (epoch, address) ") - .push_values(stake_table.validator_exits(), |mut q, address| { - q.push_bind(epoch).push_bind(address.to_string()); - }) - // If we insert exits out of order, make sure `epoch` reflects the earliest time - // when this exit was added to the state. - .push(" ON CONFLICT (address) DO UPDATE SET epoch = min(epoch, excluded.epoch)") - .build() - .execute(tx.as_mut()) - .await - .context(format!("inserting new validator exits for epoch {epoch}"))?; - } - // Delete the second oldest stake table if necessary to ensure the number of stake tables - // stored does not exceed `num_stake_tables`. - let (num_stake_tables,): (u32,) = query_as("SELECT count(*) FROM stake_table_epoch") - .fetch_one(tx.as_mut()) - .await - .context("counting stake tables")?; - if num_stake_tables > self.num_stake_tables { - // We always delete the _second oldest_ stake table. We want to keep the oldest around - // because it is the hardest to catch up for if we need it again (we would have to go - // all the way back to genesis). The second oldest is the least likely to be used again - // after the oldest, while still being easy to replay if we do need it (because we can - // just replay from the cached oldest). - let (epoch_to_delete,): (i64,) = - query_as("SELECT epoch FROM stake_table_epoch ORDER BY epoch LIMIT 1 OFFSET 1") - .fetch_one(tx.as_mut()) + WRITE_BACKOFF + .retry_if( + WRITE_RETRY_MAX, + |_| true, + || async { + let mut tx = self.pool.begin().await?; + + // Record that the stake table for this epoch is available, along with the + // versions of the epoch root headers in epochs `e-2` and `e-1` (snapshot points + // for `e` and `e+1`). + query( + "INSERT INTO stake_table_epoch (epoch, epoch_root_protocol_version, \ + next_epoch_root_protocol_version) VALUES ($1, $2, $3)", + ) + .bind(epoch) + .bind(&epoch_root_protocol_version_str) + .bind(&next_epoch_root_protocol_version_str) + .execute(tx.as_mut()) .await - .context("find second oldest epoch")?; - tracing::info!(epoch_to_delete, "garbage collecting stake table"); - - // Delete from the main epoch table. The corresponding rows from `stake_table_validator` - // will be deleted automatically by cascading. The corresponding rows in the BLS keys, - // Schnorr keys, and validator exits tables cannot be deleted, because those tables are - // cumulative over later epochs. - query("DELETE FROM stake_table_epoch WHERE epoch = $1") - .bind(epoch_to_delete) - .execute(tx.as_mut()) - .await - .context("garbage collecting stake table")?; - } - - tx.commit().await?; - Ok(()) + .context(format!( + "recording stake table availability for epoch {epoch}" + ))?; + + QueryBuilder::new("INSERT INTO stake_table_validator (epoch, idx, data) ") + .push_values(validators.iter().enumerate(), |mut q, (i, data)| { + q.push_bind(epoch).push_bind(i as i64).push_bind(data); + }) + .build() + .execute(tx.as_mut()) + .await + .context(format!("inserting validators for epoch {epoch}"))?; + + // Insert only newly used BLS keys. + QueryBuilder::new("INSERT INTO stake_table_bls_key (epoch, key) ") + .push_values(stake_table.used_bls_keys(), |mut q, key| { + q.push_bind(epoch).push_bind(key.to_string()); + }) + // If we insert keys out of order, make sure `epoch` reflects the earliest time + // when this key was added to the state. + .push(" ON CONFLICT (key) DO UPDATE SET epoch = min(epoch, excluded.epoch)") + .build() + .execute(tx.as_mut()) + .await + .context(format!("inserting newly used BLS keys for epoch {epoch}"))?; + + // Insert only newly used Schnorr keys. + QueryBuilder::new("INSERT INTO stake_table_schnorr_key (epoch, key) ") + .push_values(stake_table.used_schnorr_keys(), |mut q, key| { + q.push_bind(epoch).push_bind(key.to_string()); + }) + // If we insert keys out of order, make sure `epoch` reflects the earliest time + // when this key was added to the state. + .push(" ON CONFLICT (key) DO UPDATE SET epoch = min(epoch, excluded.epoch)") + .build() + .execute(tx.as_mut()) + .await + .context(format!( + "inserting newly used Schnorr keys for epoch {epoch}" + ))?; + + // Insert only newly used x25519 keys. + if !stake_table.used_x25519_keys().is_empty() { + QueryBuilder::new("INSERT INTO stake_table_x25519_key (epoch, key) ") + .push_values(stake_table.used_x25519_keys(), |mut q, key| { + q.push_bind(epoch).push_bind(key.to_string()); + }) + // If we insert keys out of order, make sure `epoch` reflects the earliest + // time when this key was added to the state. + .push(" ON CONFLICT (key) DO UPDATE SET epoch = min(epoch, excluded.epoch)") + .build() + .execute(tx.as_mut()) + .await + .context(format!( + "inserting newly used x25519 keys for epoch {epoch}" + ))?; + } + + // Insert only the new validator exits. + if !stake_table.validator_exits().is_empty() { + QueryBuilder::new("INSERT INTO stake_table_exit (epoch, address) ") + .push_values(stake_table.validator_exits(), |mut q, address| { + q.push_bind(epoch).push_bind(address.to_string()); + }) + // If we insert exits out of order, make sure `epoch` reflects the earliest + // time when this exit was added to the state. + .push( + " ON CONFLICT (address) DO UPDATE SET epoch = min(epoch, \ + excluded.epoch)", + ) + .build() + .execute(tx.as_mut()) + .await + .context(format!("inserting new validator exits for epoch {epoch}"))?; + } + + // Delete the second oldest stake table if necessary to ensure the number of stake + // tables stored does not exceed `num_stake_tables`. + let (num_stake_tables,): (u32,) = + query_as("SELECT count(*) FROM stake_table_epoch") + .fetch_one(tx.as_mut()) + .await + .context("counting stake tables")?; + if num_stake_tables > self.num_stake_tables { + // We always delete the _second oldest_ stake table. We want to keep the oldest + // around because it is the hardest to catch up for if we need it again (we + // would have to go all the way back to genesis). The second oldest is the + // least likely to be used again after the oldest, while still being easy to + // replay if we do need it (because we can just replay from the cached oldest). + let (epoch_to_delete,): (i64,) = query_as( + "SELECT epoch FROM stake_table_epoch ORDER BY epoch LIMIT 1 OFFSET 1", + ) + .fetch_one(tx.as_mut()) + .await + .context("find second oldest epoch")?; + tracing::info!(epoch_to_delete, "garbage collecting stake table"); + + // Delete from the main epoch table. The corresponding rows from + // `stake_table_validator` will be deleted automatically by cascading. The + // corresponding rows in the BLS keys, Schnorr keys, and validator exits tables + // cannot be deleted, because those tables are cumulative over later epochs. + query("DELETE FROM stake_table_epoch WHERE epoch = $1") + .bind(epoch_to_delete) + .execute(tx.as_mut()) + .await + .context("garbage collecting stake table")?; + } + + tx.commit().await?; + Ok(()) + }, + ) + .await } } @@ -599,6 +632,7 @@ mod test { use std::os::unix::fs::PermissionsExt; use pretty_assertions::assert_eq; + use sqlx::sqlite::SqliteConnectOptions; use tempfile::tempdir; use versions::{CLIQUENET_VERSION, EPOCH_VERSION}; diff --git a/slow-tests/tests/state.rs b/slow-tests/tests/state.rs index 5eb1c7a9b0a..d32fb89f69a 100644 --- a/slow-tests/tests/state.rs +++ b/slow-tests/tests/state.rs @@ -1,6 +1,6 @@ -use std::time::{Duration, Instant}; +use std::time::Duration; -use alloy::primitives::{Address, U256}; +use alloy::primitives::U256; use committable::Commitment; use espresso_node::{ SequencerApiVersion, @@ -12,28 +12,10 @@ use espresso_node::{ }, testing::{TestConfig, TestConfigBuilder}, }; -use espresso_types::{ - FeeAccount, FeeAmount, Header, SeqTypes, - v0_3::RewardAmount, - v0_4::{REWARD_MERKLE_TREE_V2_HEIGHT, RewardAccountV2, RewardMerkleTreeV2}, -}; +use espresso_types::{FeeAccount, FeeAmount, Header, SeqTypes}; use futures::{StreamExt, TryStreamExt}; -use hotshot_query_service::{ - availability::BlockQueryData, - data_source::{ - Transaction, VersionedDataSource, - sql::Config, - storage::sql::{ - SqlStorage, StorageConnectionType, Transaction as SqlTransaction, Write, testing::TmpDb, - }, - }, - merklized_state::{MerklizedState, UpdateStateData}, - types::HeightIndexed, -}; -use jf_merkle_tree_compat::{ - LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme, - prelude::{MerkleProof, Sha3Node}, -}; +use hotshot_query_service::{availability::BlockQueryData, types::HeightIndexed}; +use jf_merkle_tree_compat::prelude::{MerkleProof, Sha3Node}; use surf_disco::Client; use test_utils::reserve_tcp_port; use tide_disco::error::ServerError; @@ -115,123 +97,3 @@ async fn slow_test_merklized_state_api() { let expected = U256::MAX; assert_eq!(expected, amount.0); } - -fn make_reward_account(i: usize) -> RewardAccountV2 { - let mut addr_bytes = [0u8; 20]; - addr_bytes[16..20].copy_from_slice(&(i as u32).to_be_bytes()); - RewardAccountV2(Address::from(addr_bytes)) -} - -async fn insert_test_header( - tx: &mut SqlTransaction, - block_height: u64, - reward_tree: &RewardMerkleTreeV2, -) { - let reward_commitment = serde_json::to_value(reward_tree.commitment()).unwrap(); - let test_data = serde_json::json!({ - "block_merkle_tree_root": format!("block_root_{}", block_height), - "fee_merkle_tree_root": format!("fee_root_{}", block_height), - "fields": { - RewardMerkleTreeV2::header_state_commitment_field(): reward_commitment - } - }); - tx.upsert( - "header", - [ - "height", - "hash", - "payload_hash", - "timestamp", - "data", - "ns_table", - ], - ["height"], - [( - block_height as i64, - format!("hash_{}", block_height), - format!("payload_{}", block_height), - block_height as i64, - test_data, - "ns_table".to_string(), - )], - ) - .await - .unwrap(); -} - -async fn batch_insert_proofs( - tx: &mut SqlTransaction, - reward_tree: &RewardMerkleTreeV2, - accounts: &[RewardAccountV2], - block_height: u64, -) { - let proofs_and_paths: Vec<_> = accounts - .iter() - .map(|account| { - let proof = match reward_tree.universal_lookup(*account) { - LookupResult::Ok(_, proof) => proof, - LookupResult::NotInMemory => panic!("account not in memory"), - LookupResult::NotFound(proof) => proof, - }; - let traversal_path = >::to_traversal_path(account, reward_tree.height()); - (proof, traversal_path) - }) - .collect(); - - UpdateStateData::::insert_merkle_nodes_batch( - tx, - proofs_and_paths, - block_height, - ) - .await - .expect("failed to batch insert proofs"); -} - -#[test_log::test(tokio::test(flavor = "multi_thread"))] -async fn slow_test_batch_insertion_20k_accounts() { - let db = TmpDb::init().await; - let opt = SqlDataSource::persistence_options(&db); - let cfg = Config::try_from(&opt).expect("failed to create config from options"); - let storage = SqlStorage::connect(cfg, StorageConnectionType::Query) - .await - .expect("failed to connect to storage"); - - let num_accounts = 20_000usize; - - let accounts: Vec = (0..num_accounts).map(make_reward_account).collect(); - - tracing::info!("Starting tree update for {} accounts", num_accounts); - let tree_update_start = Instant::now(); - let mut reward_tree = RewardMerkleTreeV2::new(REWARD_MERKLE_TREE_V2_HEIGHT); - for (i, account) in accounts.iter().enumerate() { - let reward_amount = RewardAmount::from(((i + 1) * 100) as u64); - reward_tree.update(*account, reward_amount).unwrap(); - if (i + 1) % 5_000 == 0 { - tracing::info!("tree_update: {}/{}", i + 1, num_accounts); - } - } - let tree_update_duration = tree_update_start.elapsed(); - tracing::info!("tree_update complete: {:?}", tree_update_duration); - - let mut tx = storage.write().await.unwrap(); - insert_test_header(&mut tx, 1, &reward_tree).await; - - tracing::info!("Starting batch insert for {} accounts", num_accounts); - let batch_insert_start = Instant::now(); - batch_insert_proofs(&mut tx, &reward_tree, &accounts, 1).await; - let batch_insert_duration = batch_insert_start.elapsed(); - tracing::info!("batch_insert complete: {:?}", batch_insert_duration); - - UpdateStateData::::set_last_state_height(&mut tx, 1) - .await - .unwrap(); - tx.commit().await.unwrap(); - - tracing::info!( - "20k accounts: tree_update={:?}, batch_insert={:?}", - tree_update_duration, - batch_insert_duration - ); -}