diff --git a/sable_history/src/server/mod.rs b/sable_history/src/server/mod.rs index 0e039a8..a00f51f 100644 --- a/sable_history/src/server/mod.rs +++ b/sable_history/src/server/mod.rs @@ -1,7 +1,7 @@ use std::convert::Infallible; use std::sync::Arc; -use anyhow::Context; +use anyhow::{Context, Result}; use diesel::migration::MigrationSource; use diesel::prelude::*; use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; @@ -18,6 +18,9 @@ use sable_server::ServerType; mod sync; mod update_handler; +/// Advisory lock key for serializing database migrations across concurrent processes. +const MIGRATION_LOCK_KEY: i64 = 0x5361626c48697374; // value is "SablHist" + pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); #[derive(Debug, Clone, Deserialize)] @@ -59,28 +62,43 @@ impl ServerType for HistoryServer { // run_pending_migrations only support sync connections let mut conn = AsyncConnectionWrapper::::establish(&database) .context("Couldn't connect to database")?; - tracing::info!("Running database migrations"); - tracing::trace!( - "Required migrations: {}", - MIGRATIONS - .migrations() - .map_err(|e| anyhow::anyhow!("Couldn't get migrations: {e}"))? - .iter() - .map(diesel::migration::Migration::::name) - .join(", ") - ); - let migrations = conn - .run_pending_migrations(MIGRATIONS) - .map_err(|e| anyhow::anyhow!("Database migrations failed: {e}"))?; - if migrations.is_empty() { - tracing::info!("No database migrations to run"); - } else { - tracing::info!( - "Applied database migrations: {}", - migrations.iter().map(ToString::to_string).join(", ") - ) - } - Ok(()) + + // Prevent multiple migrations from running at the same time, or processes starting + // while migrations are still running. + diesel::sql_query(format!("SELECT pg_advisory_lock({MIGRATION_LOCK_KEY})")) + .execute(&mut conn) + .context("Couldn't acquire migration advisory lock")?; + + let res = (|| -> Result<_> { + tracing::info!("Running database migrations"); + tracing::trace!( + "Required migrations: {}", + MIGRATIONS + .migrations() + .map_err(|e| anyhow::anyhow!("Couldn't get migrations: {e}"))? + .iter() + .map(diesel::migration::Migration::::name) + .join(", ") + ); + let migrations = conn + .run_pending_migrations(MIGRATIONS) + .map_err(|e| anyhow::anyhow!("Database migrations failed: {e}"))?; + if migrations.is_empty() { + tracing::info!("No database migrations to run"); + } else { + tracing::info!( + "Applied database migrations: {}", + migrations.iter().map(ToString::to_string).join(", ") + ) + } + Ok(()) + })(); + + diesel::sql_query(format!("SELECT pg_advisory_unlock({MIGRATION_LOCK_KEY})")) + .execute(&mut conn) + .context("Couldn't release migration advisory lock")?; + + res }) .await .context("Couldn't join migration task")??;