-
Notifications
You must be signed in to change notification settings - Fork 201
feat(l1): add automatic DB migration framework #6519
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
6a4334a
feat(storage): add automatic DB migration framework
azteca1998 018163d
style(l1): fix formatting in store.rs
azteca1998 26a2877
fix(l1): address review feedback on migration framework
azteca1998 0507639
fix(l1): wrap metadata write error in MigrationFailed
azteca1998 f05b00c
Merge remote-tracking branch 'origin/main' into feat/automatic-db-mig…
iovoid 2f9bd1a
fix(l1): address second round of review feedback on migrations
azteca1998 d68664f
Merge branch 'main' into feat/automatic-db-migrations
azteca1998 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,134 @@ | ||
| use std::io::Write; | ||
| use std::path::Path; | ||
|
|
||
| use crate::api::StorageBackend; | ||
| use crate::error::StoreError; | ||
| use crate::{STORE_METADATA_FILENAME, STORE_SCHEMA_VERSION}; | ||
|
|
||
| use super::store::StoreMetadata; | ||
|
|
||
| /// A migration function that upgrades the database schema by exactly one version. | ||
| /// | ||
| /// Receives a reference to the storage backend so it can read/write data | ||
| /// as needed for the migration. Returning `Err` aborts the migration run: | ||
| /// `run_pending_migrations` wraps the error in `StoreError::MigrationFailed` | ||
| /// and returns before persisting the new schema version for that step. | ||
| pub type MigrationFn = fn(backend: &dyn StorageBackend) -> Result<(), StoreError>; | ||
|
|
||
| /// Migration functions ordered by source version (index = source version - 1). | ||
| /// | ||
| /// `MIGRATIONS[i]` upgrades the schema from version `(i + 1)` to `(i + 2)`. | ||
| /// For example: | ||
| /// - `MIGRATIONS[0]` upgrades v1 → v2 | ||
| /// - `MIGRATIONS[1]` upgrades v2 → v3 | ||
| /// | ||
| /// **Invariant**: `MIGRATIONS.len() == (STORE_SCHEMA_VERSION - 1) as usize` | ||
| /// (empty when `STORE_SCHEMA_VERSION == 1`, one entry when it's 2, etc.) | ||
| /// The invariant is enforced at compile time by the `const` assertion in this module. | ||
| pub const MIGRATIONS: &[MigrationFn] = &[ | ||
| // Currently empty — no migrations exist yet. | ||
| // When STORE_SCHEMA_VERSION is bumped to 2, add migrate_1_to_2 here. | ||
| ]; | ||
|
|
||
| // Compile-time check: the number of migration functions must match the number | ||
| // of version gaps (i.e. STORE_SCHEMA_VERSION - 1). Because the assertion is | ||
| // evaluated in a `const` item, bumping STORE_SCHEMA_VERSION without adding the | ||
| // matching entry to MIGRATIONS (or the reverse) fails the build. | ||
| const _: () = assert!( | ||
| MIGRATIONS.len() == (STORE_SCHEMA_VERSION - 1) as usize, | ||
| "MIGRATIONS length must equal STORE_SCHEMA_VERSION - 1" | ||
| ); | ||
|
|
||
| /// Looks up the migration step whose source schema version is `version`, | ||
| /// i.e. the function that takes the schema from `version` to `version + 1`. | ||
| /// | ||
| /// # Panics | ||
| /// | ||
| /// Panics when `version` is 0 or has no corresponding entry in `MIGRATIONS`. | ||
| fn migration_for_version(version: u64) -> MigrationFn { | ||
| // Schema versions are 1-based while `MIGRATIONS` is 0-based. | ||
| let index = (version - 1) as usize; | ||
| MIGRATIONS[index] | ||
| } | ||
|
|
||
| /// Runs all pending migrations from `current_version` up to `STORE_SCHEMA_VERSION`. | ||
| /// | ||
| /// Each migration is applied one version at a time, and the metadata file is | ||
| /// rewritten via `write_metadata_version` after each successful step, so the | ||
| /// recorded version never runs ahead of the steps that actually completed. | ||
| /// | ||
| /// Returns `Ok(())` without touching the backend or the metadata file when | ||
| /// `current_version >= STORE_SCHEMA_VERSION` (no pending steps). | ||
| /// | ||
| /// # Errors | ||
| /// | ||
| /// Returns `StoreError::MigrationFailed` when: | ||
| /// - `current_version` is 0 (schema versions start at 1), | ||
| /// - a migration function returns an error, or | ||
| /// - the metadata file cannot be written after a step. | ||
| pub fn run_pending_migrations( | ||
| backend: &dyn StorageBackend, | ||
| db_path: &Path, | ||
| current_version: u64, | ||
| ) -> Result<(), StoreError> { | ||
| // Schema versions are 1-based: `migration_for_version` computes `version - 1`, | ||
| // so v0 must be rejected here instead of underflowing / panicking there. | ||
| if current_version == 0 { | ||
| return Err(StoreError::MigrationFailed { | ||
| from: current_version, | ||
| to: STORE_SCHEMA_VERSION, | ||
| reason: "schema version 0 is invalid; versions start at 1".to_string(), | ||
| }); | ||
| } | ||
|
|
||
| for version in current_version..STORE_SCHEMA_VERSION { | ||
| let target = version + 1; | ||
|
|
||
| tracing::info!("Running migration v{version} → v{target}"); | ||
|
|
||
| // Here 1 <= version < STORE_SCHEMA_VERSION, so the lookup is in range | ||
| // thanks to the compile-time length check on `MIGRATIONS`. | ||
| migration_for_version(version)(backend).map_err(|e| StoreError::MigrationFailed { | ||
| from: version, | ||
| to: target, | ||
| reason: e.to_string(), | ||
| })?; | ||
|
|
||
| // Persist the new version to metadata.json after each migration step | ||
| write_metadata_version(db_path, target).map_err(|e| StoreError::MigrationFailed { | ||
| from: version, | ||
| to: target, | ||
| reason: format!("failed to write metadata: {e}"), | ||
| })?; | ||
|
|
||
| tracing::info!("Migration v{version} → v{target} completed"); | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// Writes the schema version to metadata.json using write-to-temp-then-rename | ||
| /// for crash safety. On POSIX filesystems `rename` is atomic, so the metadata | ||
| /// file is never left in a partial/truncated state. | ||
| /// | ||
| /// The temp file is fsynced before the rename, and on Unix the containing | ||
| /// directory is fsynced afterwards so the rename itself survives a crash. | ||
| /// If writing or renaming fails, the temp file is removed (best effort). | ||
| /// | ||
| /// # Errors | ||
| /// | ||
| /// Returns a `StoreError` if serialization or any filesystem operation fails. | ||
| // TODO: move metadata persistence into the StorageBackend abstraction so we | ||
| // don't need to pass `db_path` around. | ||
| fn write_metadata_version(db_path: &Path, version: u64) -> Result<(), StoreError> { | ||
| let metadata_path = db_path.join(STORE_METADATA_FILENAME); | ||
| let tmp_path = db_path.join(format!("{}.tmp", STORE_METADATA_FILENAME)); | ||
| let metadata = StoreMetadata::new(version); | ||
| let serialized = serde_json::to_string_pretty(&metadata)?; | ||
|
|
||
| if let Err(err) = replace_file_atomically(&tmp_path, &metadata_path, serialized.as_bytes()) { | ||
| // Best effort: don't leave a stale temp file behind. The original | ||
| // error is the one worth reporting, so a cleanup failure is ignored. | ||
| let _ = std::fs::remove_file(&tmp_path); | ||
| return Err(StoreError::from(err)); | ||
| } | ||
|
|
||
| // Flush the directory entry: without this, a crash shortly after `rename` | ||
| // can leave the old metadata.json in place even though the call returned. | ||
| // Only done on Unix, where a directory can be opened as a `File`. | ||
| #[cfg(unix)] | ||
| { | ||
| // An empty path means "current directory" for `join`, but cannot be opened. | ||
| let dir = if db_path.as_os_str().is_empty() { | ||
| Path::new(".") | ||
| } else { | ||
| db_path | ||
| }; | ||
| std::fs::File::open(dir)?.sync_all()?; | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// Writes `contents` to `tmp_path`, fsyncs the file, then renames it over `dest_path`. | ||
| fn replace_file_atomically( | ||
| tmp_path: &Path, | ||
| dest_path: &Path, | ||
| contents: &[u8], | ||
| ) -> std::io::Result<()> { | ||
| let mut file = std::fs::File::create(tmp_path)?; | ||
| file.write_all(contents)?; | ||
| file.sync_all()?; | ||
| std::fs::rename(tmp_path, dest_path) | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
|
|
||
| /// Runtime mirror of the compile-time length check on `MIGRATIONS`. | ||
| #[test] | ||
| fn migrations_length_matches_schema_version() { | ||
| let expected_len = (STORE_SCHEMA_VERSION - 1) as usize; | ||
| assert_eq!( | ||
| MIGRATIONS.len(), | ||
| expected_len, | ||
| "MIGRATIONS array length must be STORE_SCHEMA_VERSION - 1" | ||
| ); | ||
| } | ||
|
|
||
| /// A store already at `STORE_SCHEMA_VERSION` has nothing pending. | ||
| #[test] | ||
| fn run_pending_migrations_noop_when_current() { | ||
| // An in-memory backend is enough here: with no pending steps, | ||
| // no migration function is ever invoked on it. | ||
| let db = crate::backend::in_memory::InMemoryBackend::open().unwrap(); | ||
| let dir = tempfile::tempdir().unwrap(); | ||
|
|
||
| // Seed the directory with metadata at the current version. | ||
| write_metadata_version(dir.path(), STORE_SCHEMA_VERSION).unwrap(); | ||
|
|
||
| let outcome = run_pending_migrations(&db, dir.path(), STORE_SCHEMA_VERSION); | ||
| assert!(outcome.is_ok()); | ||
| } | ||
|
|
||
| /// Writing metadata into an empty directory yields a parseable file | ||
| /// carrying the requested schema version. | ||
| #[test] | ||
| fn fresh_store_creates_correct_metadata() { | ||
| let dir = tempfile::tempdir().unwrap(); | ||
|
|
||
| write_metadata_version(dir.path(), STORE_SCHEMA_VERSION).unwrap(); | ||
|
|
||
| // Round-trip: read back and parse the file that was just written. | ||
| let written_file = dir.path().join(STORE_METADATA_FILENAME); | ||
| let raw = std::fs::read_to_string(&written_file).unwrap(); | ||
| let parsed: StoreMetadata = serde_json::from_str(&raw).unwrap(); | ||
| assert_eq!(parsed.schema_version, STORE_SCHEMA_VERSION); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1453,12 +1453,51 @@ impl Store { | |
| } | ||
|
|
||
| pub fn new(path: impl AsRef<Path>, engine_type: EngineType) -> Result<Self, StoreError> { | ||
| // Ignore unused variable warning when compiling without DB features | ||
| let db_path = path.as_ref().to_path_buf(); | ||
|
|
||
| if engine_type != EngineType::InMemory { | ||
| // Check that the last used DB version matches the current version | ||
| validate_store_schema_version(&db_path)?; | ||
| let version = read_store_schema_version(&db_path)?; | ||
|
|
||
| match version { | ||
| None if db_path.exists() && !dir_is_empty(&db_path)? => { | ||
| // Pre-metadata DB — cannot migrate safely | ||
| return Err(StoreError::NotFoundDBVersion); | ||
| } | ||
| None => { | ||
| // Fresh / empty directory — write initial metadata | ||
| init_metadata_file(&db_path)?; | ||
| } | ||
| Some(v) if v < 1 => { | ||
| return Err(StoreError::MigrationFailed { | ||
| from: v, | ||
| to: STORE_SCHEMA_VERSION, | ||
| reason: format!("DB version v{v} is invalid (predates migrations)"), | ||
| }); | ||
| } | ||
| Some(v) if v > STORE_SCHEMA_VERSION => { | ||
| return Err(StoreError::MigrationFailed { | ||
| from: v, | ||
| to: STORE_SCHEMA_VERSION, | ||
| reason: format!( | ||
| "DB version v{v} is more recent than the client expects (v{STORE_SCHEMA_VERSION}). Rolling back is not supported" | ||
| ), | ||
| }); | ||
| } | ||
|
Comment on lines
+1477
to
+1485
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Similarly, the message could be something like `MigrationFailed { from: v, to: STORE_SCHEMA_VERSION, reason: "DB version is more recent than the one the client expects. Migrating would involve rolling back, which is not supported." }` |
||
| #[cfg(feature = "rocksdb")] | ||
| Some(v) if v < STORE_SCHEMA_VERSION => { | ||
| // Open backend, run migrations, then proceed with the same Arc | ||
| let backend: Arc<dyn crate::api::StorageBackend> = | ||
| Arc::new(RocksDBBackend::open(&path)?); | ||
| crate::migrations::run_pending_migrations(backend.as_ref(), &db_path, v)?; | ||
| return Self::from_backend(backend, db_path, DB_COMMIT_THRESHOLD); | ||
| } | ||
| Some(_) => { | ||
| // version == STORE_SCHEMA_VERSION, proceed normally. | ||
| // Without the `rocksdb` feature this also covers v < target, | ||
| // but that path is unreachable since InMemory is the only | ||
| // engine type and the outer guard excludes it. | ||
| } | ||
| } | ||
| } | ||
|
|
||
| match engine_type { | ||
|
|
@@ -3188,29 +3227,24 @@ impl LatestBlockHeaderCache { | |
| } | ||
|
|
||
| #[derive(Debug, Serialize, Deserialize)] | ||
| struct StoreMetadata { | ||
| schema_version: u64, | ||
| pub struct StoreMetadata { | ||
| pub schema_version: u64, | ||
| } | ||
|
|
||
| impl StoreMetadata { | ||
| fn new(schema_version: u64) -> Self { | ||
| pub fn new(schema_version: u64) -> Self { | ||
| Self { schema_version } | ||
| } | ||
| } | ||
|
|
||
| fn validate_store_schema_version(path: &Path) -> Result<(), StoreError> { | ||
| /// Reads the schema version from the metadata file, if it exists. | ||
| /// | ||
| /// Returns `Some(version)` when metadata.json is present and valid, | ||
| /// or `None` when the file does not exist. | ||
| fn read_store_schema_version(path: &Path) -> Result<Option<u64>, StoreError> { | ||
| let metadata_path = path.join(STORE_METADATA_FILENAME); | ||
| // If metadata file does not exist, try to create it | ||
| if !metadata_path.exists() { | ||
| // If datadir exists but is not empty, this is probably a DB for an | ||
| // old ethrex version and we should return an error | ||
| if path.exists() && !dir_is_empty(path)? { | ||
| return Err(StoreError::NotFoundDBVersion { | ||
| expected: STORE_SCHEMA_VERSION, | ||
| }); | ||
| } | ||
| init_metadata_file(path)?; | ||
| return Ok(()); | ||
| return Ok(None); | ||
| } | ||
| if !metadata_path.is_file() { | ||
| return Err(StoreError::Custom( | ||
|
|
@@ -3219,15 +3253,7 @@ fn validate_store_schema_version(path: &Path) -> Result<(), StoreError> { | |
| } | ||
| let file_contents = std::fs::read_to_string(metadata_path)?; | ||
| let metadata: StoreMetadata = serde_json::from_str(&file_contents)?; | ||
|
|
||
| // Check schema version matches the expected one | ||
| if metadata.schema_version != STORE_SCHEMA_VERSION { | ||
| return Err(StoreError::IncompatibleDBVersion { | ||
| found: metadata.schema_version, | ||
| expected: STORE_SCHEMA_VERSION, | ||
| }); | ||
| } | ||
| Ok(()) | ||
| Ok(Some(metadata.schema_version)) | ||
| } | ||
|
|
||
| fn init_metadata_file(parent_path: &Path) -> Result<(), StoreError> { | ||
|
|
@@ -3246,8 +3272,9 @@ fn dir_is_empty(path: &Path) -> Result<bool, StoreError> { | |
| Ok(is_empty) | ||
| } | ||
|
|
||
| /// Checks whether a valid database exists at the given path by looking for | ||
| /// a metadata.json file with a matching schema version. | ||
| /// Checks whether a valid (or migratable) database exists at the given path | ||
| /// by looking for a metadata.json file with a schema version between 1 and | ||
| /// `STORE_SCHEMA_VERSION` (inclusive). | ||
| pub fn has_valid_db(path: &Path) -> bool { | ||
| let metadata_path = path.join(STORE_METADATA_FILENAME); | ||
| if !metadata_path.is_file() { | ||
|
|
@@ -3259,7 +3286,7 @@ pub fn has_valid_db(path: &Path) -> bool { | |
| let Ok(metadata) = serde_json::from_str::<StoreMetadata>(&contents) else { | ||
| return false; | ||
| }; | ||
| metadata.schema_version == STORE_SCHEMA_VERSION | ||
| metadata.schema_version >= 1 && metadata.schema_version <= STORE_SCHEMA_VERSION | ||
| } | ||
|
|
||
| /// Reads the chain ID from an existing database without performing a full | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be part of a storage abstraction instead? Having to pass `db_path` around is a bit weird.