Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion cmd/ethrex/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,11 +496,16 @@ pub async fn init_l1(
let store = match init_store(&datadir, genesis).await {
Ok(store) => store,
Err(err @ StoreError::IncompatibleDBVersion { .. })
| Err(err @ StoreError::NotFoundDBVersion { .. }) => {
| Err(err @ StoreError::NotFoundDBVersion) => {
return Err(eyre::eyre!(
"{err}. Please erase your DB by running `ethrex removedb` and restart node to resync. Note that this will take a while."
));
}
Err(err @ StoreError::MigrationFailed { .. }) => {
return Err(eyre::eyre!(
"{err}. The database may be in an inconsistent state. Please erase your DB by running `ethrex removedb` and restart node to resync."
));
}
Err(error) => return Err(eyre::eyre!("Failed to create Store: {error}")),
};

Expand Down
1 change: 1 addition & 0 deletions crates/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ default = []
rocksdb = ["dep:rocksdb"]

[dev-dependencies]
tempfile.workspace = true
tokio = { workspace = true, features = ["full"] }

[lib]
Expand Down
8 changes: 6 additions & 2 deletions crates/storage/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,12 @@ pub enum StoreError {
IoError(#[from] std::io::Error),
#[error("Error serializing metadata: {0}")]
DbMetadataError(#[from] serde_json::Error),
#[error("Incompatible DB Version: not found, expected v{expected}")]
NotFoundDBVersion { expected: u64 },
#[error(
"Cannot migrate the database: its version is unavailable, which means it predates versioning and migrations. A full resync (removedb) is required."
)]
NotFoundDBVersion,
#[error("Incompatible DB Version: found v{found}, expected v{expected}")]
IncompatibleDBVersion { found: u64, expected: u64 },
#[error("Migration from v{from} to v{to} failed: {reason}")]
MigrationFailed { from: u64, to: u64, reason: String },
}
6 changes: 4 additions & 2 deletions crates/storage/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub mod api;
pub mod backend;
pub mod error;
mod layering;
pub mod migrations;
pub mod rlp;
pub mod store;
pub mod trie;
Expand All @@ -81,8 +82,9 @@ pub use store::{

/// Store Schema Version, must be updated on any breaking change.
///
/// An upgrade to a newer schema version invalidates currently stored data,
/// requiring a re-sync from genesis or a snapshot.
/// When bumping this version, add a corresponding migration function to
/// `migrations::MIGRATIONS`. The migration framework will automatically
/// upgrade existing databases instead of requiring a full resync.
pub const STORE_SCHEMA_VERSION: u64 = 1;

/// Name of the file storing the metadata about the database.
Expand Down
134 changes: 134 additions & 0 deletions crates/storage/migrations.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use std::io::Write;
use std::path::Path;

use crate::api::StorageBackend;
use crate::error::StoreError;
use crate::{STORE_METADATA_FILENAME, STORE_SCHEMA_VERSION};

use super::store::StoreMetadata;

/// A migration function that upgrades the database schema by exactly one version.
///
/// Receives a reference to the storage backend so it can read/write data
/// as needed for the migration.
///
/// Returning `Err` stops the migration run: `run_pending_migrations` wraps the
/// error's text into `StoreError::MigrationFailed` and does not update the
/// metadata file for that step.
pub type MigrationFn = fn(backend: &dyn StorageBackend) -> Result<(), StoreError>;

/// Migration functions indexed by source version.
///
/// `MIGRATIONS[i]` upgrades the schema from version `(i + 1)` to `(i + 2)`.
/// For example:
/// - `MIGRATIONS[0]` upgrades v1 → v2
/// - `MIGRATIONS[1]` upgrades v2 → v3
///
/// Entries are looked up through `migration_for_version`, which maps a source
/// version `v` to index `v - 1`; the order of this slice is therefore
/// significant and entries must never be reordered or removed.
///
/// **Invariant**: `MIGRATIONS.len() == (STORE_SCHEMA_VERSION - 1) as usize`
/// (empty when `STORE_SCHEMA_VERSION == 1`, one entry when it's 2, etc.)
/// The invariant is enforced by a compile-time assertion in this module.
pub const MIGRATIONS: &[MigrationFn] = &[
// Currently empty — no migrations exist yet.
// When STORE_SCHEMA_VERSION is bumped to 2, add migrate_1_to_2 here.
];

// Compile-time check: the number of migration functions must match the number
// of version gaps (i.e. STORE_SCHEMA_VERSION - 1).
//
// Bumping STORE_SCHEMA_VERSION without registering a migration (or the other
// way around) therefore breaks the build instead of surfacing as an
// out-of-bounds index when a node starts against an older database.
// A STORE_SCHEMA_VERSION of 0 is rejected as well, because the subtraction
// below overflows during constant evaluation.
const _: () = assert!(
MIGRATIONS.len() == (STORE_SCHEMA_VERSION - 1) as usize,
"MIGRATIONS length must equal STORE_SCHEMA_VERSION - 1"
);

/// Looks up the migration step whose source schema version is `version`,
/// i.e. the function that upgrades `version` to `version + 1`.
///
/// Schema versions start at 1 while `MIGRATIONS` is zero-indexed, hence the
/// `- 1` when computing the slot.
///
/// # Panics
///
/// Panics if `version` is 0 or if `MIGRATIONS` has no entry for it.
fn migration_for_version(version: u64) -> MigrationFn {
    let slot = (version - 1) as usize;
    MIGRATIONS[slot]
}

/// Runs all pending migrations from `current_version` up to `STORE_SCHEMA_VERSION`.
///
/// Each migration is applied one version at a time, and the metadata file is
/// updated (with fsync) after each successful step for crash safety.
///
/// Returns `Ok(())` if `current_version == STORE_SCHEMA_VERSION` (no-op).
pub fn run_pending_migrations(
backend: &dyn StorageBackend,
db_path: &Path,
current_version: u64,
) -> Result<(), StoreError> {
for version in current_version..STORE_SCHEMA_VERSION {
let target = version + 1;

tracing::info!("Running migration v{version} → v{target}");

migration_for_version(version)(backend).map_err(|e| StoreError::MigrationFailed {
from: version,
to: target,
reason: e.to_string(),
})?;

// Persist the new version to metadata.json after each migration step
write_metadata_version(db_path, target).map_err(|e| StoreError::MigrationFailed {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be part of a storage abstraction instead? Having to pass `db_path` around is a bit weird.

from: version,
to: target,
reason: format!("failed to write metadata: {e}"),
})?;

tracing::info!("Migration v{version} → v{target} completed");
}

Ok(())
}

/// Persists `version` as the schema version in the metadata file inside `db_path`.
///
/// The new contents are written to a sibling `<metadata file>.tmp`, flushed
/// with `sync_all`, and then renamed over the real metadata file. On POSIX
/// filesystems `rename` is atomic, so the metadata file is never left in a
/// partial/truncated state.
///
/// On Unix the containing directory is fsynced after the rename. Flushing the
/// file alone does not guarantee that the directory entry created by the
/// rename has reached disk, so without this step a power failure could make
/// the old version reappear after a migration step has already been applied.
///
/// # Errors
///
/// Returns a `StoreError` if the metadata cannot be serialized or if any
/// filesystem operation (create, write, sync, rename, directory sync) fails.
// TODO: move metadata persistence into the StorageBackend abstraction so we
// don't need to pass `db_path` around.
fn write_metadata_version(db_path: &Path, version: u64) -> Result<(), StoreError> {
    let metadata_path = db_path.join(STORE_METADATA_FILENAME);
    let tmp_path = db_path.join(format!("{}.tmp", STORE_METADATA_FILENAME));
    let metadata = StoreMetadata::new(version);
    let serialized = serde_json::to_string_pretty(&metadata)?;

    let mut file = std::fs::File::create(&tmp_path)?;
    file.write_all(serialized.as_bytes())?;
    // Flush the file contents before the rename makes them visible under the
    // final name.
    file.sync_all()?;
    std::fs::rename(&tmp_path, &metadata_path)?;

    // Make the rename itself durable by syncing the parent directory.
    // Directories cannot be opened as regular files on every platform, so this
    // is limited to Unix.
    #[cfg(unix)]
    {
        std::fs::File::open(db_path)?.sync_all()?;
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Runtime mirror of the compile-time length assertion on `MIGRATIONS`.
    #[test]
    fn migrations_length_matches_schema_version() {
        let expected_len = (STORE_SCHEMA_VERSION - 1) as usize;
        assert_eq!(
            MIGRATIONS.len(),
            expected_len,
            "MIGRATIONS array length must be STORE_SCHEMA_VERSION - 1"
        );
    }

    /// A database already at `STORE_SCHEMA_VERSION` has nothing to migrate.
    #[test]
    fn run_pending_migrations_noop_when_current() {
        let scratch = tempfile::tempdir().unwrap();
        let db_path = scratch.path();
        // An in-memory backend is sufficient: no migration function is invoked
        // when the version range is empty.
        let backend = crate::backend::in_memory::InMemoryBackend::open().unwrap();

        // Seed the directory with metadata at the current version.
        write_metadata_version(db_path, STORE_SCHEMA_VERSION).unwrap();

        let outcome = run_pending_migrations(&backend, db_path, STORE_SCHEMA_VERSION);
        assert!(outcome.is_ok());
    }

    /// The metadata written for a fresh store round-trips through JSON with
    /// the expected schema version.
    #[test]
    fn fresh_store_creates_correct_metadata() {
        let scratch = tempfile::tempdir().unwrap();
        let db_path = scratch.path();

        write_metadata_version(db_path, STORE_SCHEMA_VERSION).unwrap();

        let raw = std::fs::read_to_string(db_path.join(STORE_METADATA_FILENAME)).unwrap();
        let parsed: StoreMetadata = serde_json::from_str(&raw).unwrap();
        assert_eq!(parsed.schema_version, STORE_SCHEMA_VERSION);
    }
}
85 changes: 56 additions & 29 deletions crates/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1453,12 +1453,51 @@ impl Store {
}

pub fn new(path: impl AsRef<Path>, engine_type: EngineType) -> Result<Self, StoreError> {
// Ignore unused variable warning when compiling without DB features
let db_path = path.as_ref().to_path_buf();

if engine_type != EngineType::InMemory {
// Check that the last used DB version matches the current version
validate_store_schema_version(&db_path)?;
let version = read_store_schema_version(&db_path)?;

match version {
None if db_path.exists() && !dir_is_empty(&db_path)? => {
// Pre-metadata DB — cannot migrate safely
return Err(StoreError::NotFoundDBVersion);
}
None => {
// Fresh / empty directory — write initial metadata
init_metadata_file(&db_path)?;
}
Some(v) if v < 1 => {
return Err(StoreError::MigrationFailed {
from: v,
to: STORE_SCHEMA_VERSION,
reason: format!("DB version v{v} is invalid (predates migrations)"),
});
}
Some(v) if v > STORE_SCHEMA_VERSION => {
return Err(StoreError::MigrationFailed {
from: v,
to: STORE_SCHEMA_VERSION,
reason: format!(
"DB version v{v} is more recent than the client expects (v{STORE_SCHEMA_VERSION}). Rolling back is not supported"
),
});
}
Comment on lines +1477 to +1485
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar, the message could be something like

MigrationFailed { from: v, to: STORE_SCHEMA_VERSION, reason: "DB version is more recent than the one the client expects. Migrating would involve rolling back, which is not supported." }

#[cfg(feature = "rocksdb")]
Some(v) if v < STORE_SCHEMA_VERSION => {
// Open backend, run migrations, then proceed with the same Arc
let backend: Arc<dyn crate::api::StorageBackend> =
Arc::new(RocksDBBackend::open(&path)?);
crate::migrations::run_pending_migrations(backend.as_ref(), &db_path, v)?;
return Self::from_backend(backend, db_path, DB_COMMIT_THRESHOLD);
}
Some(_) => {
// version == STORE_SCHEMA_VERSION, proceed normally.
// Without the `rocksdb` feature this also covers v < target,
// but that path is unreachable since InMemory is the only
// engine type and the outer guard excludes it.
}
}
}

match engine_type {
Expand Down Expand Up @@ -3188,29 +3227,24 @@ impl LatestBlockHeaderCache {
}

#[derive(Debug, Serialize, Deserialize)]
struct StoreMetadata {
schema_version: u64,
pub struct StoreMetadata {
pub schema_version: u64,
}

impl StoreMetadata {
fn new(schema_version: u64) -> Self {
pub fn new(schema_version: u64) -> Self {
Self { schema_version }
}
}

fn validate_store_schema_version(path: &Path) -> Result<(), StoreError> {
/// Reads the schema version from the metadata file, if it exists.
///
/// Returns `Some(version)` when metadata.json is present and valid,
/// or `None` when the file does not exist.
fn read_store_schema_version(path: &Path) -> Result<Option<u64>, StoreError> {
let metadata_path = path.join(STORE_METADATA_FILENAME);
// If metadata file does not exist, try to create it
if !metadata_path.exists() {
// If datadir exists but is not empty, this is probably a DB for an
// old ethrex version and we should return an error
if path.exists() && !dir_is_empty(path)? {
return Err(StoreError::NotFoundDBVersion {
expected: STORE_SCHEMA_VERSION,
});
}
init_metadata_file(path)?;
return Ok(());
return Ok(None);
}
if !metadata_path.is_file() {
return Err(StoreError::Custom(
Expand All @@ -3219,15 +3253,7 @@ fn validate_store_schema_version(path: &Path) -> Result<(), StoreError> {
}
let file_contents = std::fs::read_to_string(metadata_path)?;
let metadata: StoreMetadata = serde_json::from_str(&file_contents)?;

// Check schema version matches the expected one
if metadata.schema_version != STORE_SCHEMA_VERSION {
return Err(StoreError::IncompatibleDBVersion {
found: metadata.schema_version,
expected: STORE_SCHEMA_VERSION,
});
}
Ok(())
Ok(Some(metadata.schema_version))
}

fn init_metadata_file(parent_path: &Path) -> Result<(), StoreError> {
Expand All @@ -3246,8 +3272,9 @@ fn dir_is_empty(path: &Path) -> Result<bool, StoreError> {
Ok(is_empty)
}

/// Checks whether a valid database exists at the given path by looking for
/// a metadata.json file with a matching schema version.
/// Checks whether a valid (or migratable) database exists at the given path
/// by looking for a metadata.json file with a schema version between 1 and
/// `STORE_SCHEMA_VERSION` (inclusive).
pub fn has_valid_db(path: &Path) -> bool {
let metadata_path = path.join(STORE_METADATA_FILENAME);
if !metadata_path.is_file() {
Expand All @@ -3259,7 +3286,7 @@ pub fn has_valid_db(path: &Path) -> bool {
let Ok(metadata) = serde_json::from_str::<StoreMetadata>(&contents) else {
return false;
};
metadata.schema_version == STORE_SCHEMA_VERSION
metadata.schema_version >= 1 && metadata.schema_version <= STORE_SCHEMA_VERSION
}

/// Reads the chain ID from an existing database without performing a full
Expand Down
Loading