From e000091ab324817547d11d10c5b3376fe8f1ff3a Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Mon, 23 Mar 2026 16:11:20 -0700 Subject: [PATCH 1/3] Transactional sitrep insert --- nexus/db-queries/src/db/datastore/fm.rs | 216 +++++++++--------- .../src/app/background/tasks/fm_sitrep_gc.rs | 64 +++--- 2 files changed, 142 insertions(+), 138 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/fm.rs b/nexus/db-queries/src/db/datastore/fm.rs index f3334621deb..c0f5b0f621d 100644 --- a/nexus/db-queries/src/db/datastore/fm.rs +++ b/nexus/db-queries/src/db/datastore/fm.rs @@ -478,32 +478,10 @@ impl DataStore { let sitrep_id = sitrep.id(); - // Create the sitrep metadata record. - // - // NOTE: we must insert this record before anything else, because it's - // how orphaned sitreps are found when performing garbage collection. - // Were we to first insert some other records and insert the metadata - // record *last*, we could die when we have inserted some sitrep data - // but have yet to create the metadata record. If this occurs, those - // records could not be easily found by the garbage collection task. - // Those (unused) records would then be permanently leaked without - // manual human intervention to delete them. - diesel::insert_into(sitrep_dsl::fm_sitrep) - .values(model::SitrepMetadata::from(sitrep.metadata)) - .execute_async(&*conn) - .await - .map_err(|e| { - public_error_from_diesel(e, ErrorHandler::Server) - .internal_context("failed to insert sitrep metadata record") - })?; - - // Create case records. - // - // We do this by collecting all the records for cases into big `Vec`s - // and inserting each category of case records in one big INSERT query, - // rather than doing smaller ones for each case in the sitrep. This uses - // more memory in Nexus but reduces the number of small db queries we - // perform. + // Collect case records into big `Vec`s so we can insert each category + // in one big INSERT query, rather than doing smaller ones for each case + // in the sitrep. This uses more memory in Nexus but reduces the number + // of small db queries we perform. let mut cases = Vec::with_capacity(sitrep.cases.len()); let mut alerts_requested = Vec::new(); let mut case_ereports = Vec::new(); @@ -522,65 +500,84 @@ impl DataStore { )); } - if !case_ereports.is_empty() { - diesel::insert_into(case_ereport_dsl::fm_ereport_in_case) - .values(case_ereports) - .execute_async(&*conn) - .await - .map_err(|e| { - public_error_from_diesel(e, ErrorHandler::Server) - .internal_context( - "failed to insert case ereport assignments", + // Sitrep insertion is transactional. This ensures that if Nexus + // crashes mid-insert, the transaction rolls back and no partial + // sitrep data is left behind. It also prevents the sitrep GC task + // from racing with insertion: without a transaction, GC could + // delete the metadata row while child rows are still being + // inserted, permanently leaking those children. + // + // See https://github.com/oxidecomputer/omicron/issues/10131 for + // details. + let metadata = model::SitrepMetadata::from(sitrep.metadata); + let err = OptionalError::new(); + self.transaction_retry_wrapper("fm_sitrep_insert") + .transaction(&conn, |conn| { + let err = err.clone(); + let metadata = metadata.clone(); + let case_ereports = case_ereports.clone(); + let alerts_requested = alerts_requested.clone(); + let cases = cases.clone(); + async move { + // Insert the sitrep metadata record. + diesel::insert_into(sitrep_dsl::fm_sitrep) + .values(metadata) + .execute_async(&conn) + .await?; + + // Insert case ereport assignments. + if !case_ereports.is_empty() { + diesel::insert_into( + case_ereport_dsl::fm_ereport_in_case, ) - })?; - } + .values(case_ereports) + .execute_async(&conn) + .await?; + } - if !alerts_requested.is_empty() { - diesel::insert_into(alert_req_dsl::fm_alert_request) - .values(alerts_requested) - .execute_async(&*conn) - .await - .map_err(|e| { - public_error_from_diesel(e, ErrorHandler::Server) - .internal_context("failed to insert alert requests") - })?; - } + // Insert alert requests. + if !alerts_requested.is_empty() { + diesel::insert_into(alert_req_dsl::fm_alert_request) + .values(alerts_requested) + .execute_async(&conn) + .await?; + } - if !cases.is_empty() { - diesel::insert_into(case_dsl::fm_case) - .values(cases) - .execute_async(&*conn) - .await - .map_err(|e| { - public_error_from_diesel(e, ErrorHandler::Server) - .internal_context("failed to insert case records") - })?; - } + // Insert case metadata records. + if !cases.is_empty() { + diesel::insert_into(case_dsl::fm_case) + .values(cases) + .execute_async(&conn) + .await?; + } - // Now, try to make the sitrep current. - let query = Self::insert_sitrep_version_query(sitrep_id); - query - .execute_async(&*conn) - .await - .map_err(|err| match err { - DieselError::DatabaseError( - DatabaseErrorKind::Unknown, - info, - ) if info.message() - == Self::PARENT_NOT_CURRENT_ERROR_MESSAGE => - { - InsertSitrepError::ParentNotCurrent(sitrep_id) - } - err => { - let err = - public_error_from_diesel(err, ErrorHandler::Server) - .internal_context( - "failed to insert new sitrep version", - ); - InsertSitrepError::Other(err) + // Now, try to make the sitrep current. + let query = Self::insert_sitrep_version_query(sitrep_id); + query.execute_async(&conn).await.map_err(|e| match e { + DieselError::DatabaseError( + DatabaseErrorKind::Unknown, + ref info, + ) if info.message() + == Self::PARENT_NOT_CURRENT_ERROR_MESSAGE => + { + err.bail(InsertSitrepError::ParentNotCurrent( + sitrep_id, + )) + } + other => other, + })?; + + Ok(()) } }) - .map(|_| ()) + .await + .map_err(|e| match err.take() { + Some(err) => err, + None => InsertSitrepError::Other(public_error_from_diesel( + e, + ErrorHandler::Server, + )), + }) } // Uncastable sentinel used to detect we attempt to make a sitrep current when @@ -770,6 +767,10 @@ impl DataStore { /// sitreps will never be read, since sitreps are only read when they are /// current,[^1] so they can safely be deleted at any time. /// + /// Because sitrep insertion is transactional (see [`Self::fm_sitrep_insert`]), + /// orphaned sitreps are always fully formed: all child records (cases, + /// ereports, alert requests) are present. + /// /// This query is used by the `fm_sitrep_gc` background task, which is /// responsible for deleting orphaned sitreps. /// @@ -913,7 +914,9 @@ impl DataStore { } = self // Sitrep deletion is transactional to prevent a sitrep from being // left in a partially-deleted state should the Nexus instance - // attempting the delete operation die suddenly. + // attempting the delete operation die suddenly. Sitrep insertion + // is also transactional (see `fm_sitrep_insert`), so the GC + // task cannot race with a partially-inserted sitrep. .transaction_retry_wrapper("fm_sitrep_delete_all") .transaction(&conn, |conn| { let ids = ids.clone(); @@ -1654,6 +1657,13 @@ mod tests { Ok(listed_orphans) } + /// Creates an orphaned sitrep by directly inserting a metadata row + /// into `fm_sitrep`, bypassing `fm_sitrep_insert`. + /// + /// Since `fm_sitrep_insert` is transactional, a failed CAS rolls + /// back the entire transaction (including the metadata row), so it + /// can no longer be used to create orphans in tests. Instead, we + /// insert the metadata row directly. async fn insert_orphan( datastore: &DataStore, opctx: &OpContext, @@ -1662,31 +1672,27 @@ mod tests { v: usize, i: usize, ) { - let sitrep = fm::Sitrep { - metadata: fm::SitrepMetadata { - id: SitrepUuid::new_v4(), - inv_collection_id: CollectionUuid::new_v4(), - creator_id: OmicronZoneUuid::new_v4(), - comment: format!("test sitrep v{i}; orphan {i}"), - time_created: Utc::now(), - parent_sitrep_id, - }, - cases: Default::default(), - }; - match datastore.fm_sitrep_insert(&opctx, sitrep).await { - Ok(_) => { - panic!("inserting sitrep v{v} orphan {i} should not succeed") - } - Err(InsertSitrepError::ParentNotCurrent(id)) => { - orphans.insert(id); - } - Err(InsertSitrepError::Other(e)) => { - panic!( - "expected inserting sitrep v{v} orphan {i} to fail because \ - its parent is out of date, but saw an unexpected error: {e}" - ); - } - } + use nexus_db_schema::schema::fm_sitrep::dsl; + + let id = SitrepUuid::new_v4(); + let metadata = model::SitrepMetadata::from(fm::SitrepMetadata { + id, + inv_collection_id: CollectionUuid::new_v4(), + creator_id: OmicronZoneUuid::new_v4(), + comment: format!("test sitrep v{v}; orphan {i}"), + time_created: Utc::now(), + parent_sitrep_id, + }); + let conn = datastore + .pool_connection_authorized(opctx) + .await + .expect("failed to get connection"); + diesel::insert_into(dsl::fm_sitrep) + .values(metadata) + .execute_async(&*conn) + .await + .expect("failed to insert orphan sitrep metadata"); + orphans.insert(id); } async fn make_sitrep_with_cases( diff --git a/nexus/src/app/background/tasks/fm_sitrep_gc.rs b/nexus/src/app/background/tasks/fm_sitrep_gc.rs index 1782d1b0d42..acb5090c49b 100644 --- a/nexus/src/app/background/tasks/fm_sitrep_gc.rs +++ b/nexus/src/app/background/tasks/fm_sitrep_gc.rs @@ -123,8 +123,9 @@ impl SitrepGc { #[cfg(test)] mod tests { use super::*; + use async_bb8_diesel::AsyncRunQueryDsl; use chrono::Utc; - use nexus_db_queries::db::datastore::fm::InsertSitrepError; + use nexus_db_queries::db::model; use nexus_db_queries::db::pub_test_utils::TestDatabase; use nexus_types::fm; use omicron_common::api::external::Error; @@ -250,44 +251,41 @@ mod tests { logctx.cleanup_successful(); } + /// Creates an orphaned sitrep by directly inserting a metadata row + /// into `fm_sitrep`, bypassing `fm_sitrep_insert`. + /// + /// Since `fm_sitrep_insert` is transactional, a failed CAS rolls + /// back the entire transaction (including the metadata row), so it + /// can no longer be used to create orphans in tests. Instead, we + /// insert the metadata row directly. async fn insert_orphan( datastore: &DataStore, - opctx: &OpContext, + _opctx: &OpContext, orphans: &mut BTreeSet, parent_sitrep_id: Option, v: usize, i: usize, ) { - let sitrep = fm::Sitrep { - metadata: fm::SitrepMetadata { - id: SitrepUuid::new_v4(), - inv_collection_id: CollectionUuid::new_v4(), - creator_id: OmicronZoneUuid::new_v4(), - comment: format!("test sitrep v{i}; orphan {i}"), - time_created: Utc::now(), - parent_sitrep_id, - }, - // We could populate the orphan sitreps with cases and ereports - // here, but there's a unit test - // `test_sitrep_delete_deletes_cases()` in the - // `nexus_db_queries::db::datastore::fm` module which ensures that - // deleting a sitrep removes all the other records associated with - // it, so it should be safe to trust that this works properly. - cases: Default::default(), - }; - match datastore.fm_sitrep_insert(&opctx, sitrep).await { - Ok(_) => { - panic!("inserting sitrep v{v} orphan {i} should not succeed") - } - Err(InsertSitrepError::ParentNotCurrent(id)) => { - orphans.insert(id); - } - Err(InsertSitrepError::Other(e)) => { - panic!( - "expected inserting sitrep v{v} orphan {i} to fail because \ - its parent is out of date, but saw an unexpected error: {e}" - ); - } - } + use nexus_db_schema::schema::fm_sitrep::dsl; + + let id = SitrepUuid::new_v4(); + let metadata = model::SitrepMetadata::from(fm::SitrepMetadata { + id, + inv_collection_id: CollectionUuid::new_v4(), + creator_id: OmicronZoneUuid::new_v4(), + comment: format!("test sitrep v{v}; orphan {i}"), + time_created: Utc::now(), + parent_sitrep_id, + }); + let conn = datastore + .pool_connection_for_tests() + .await + .expect("failed to get connection"); + diesel::insert_into(dsl::fm_sitrep) + .values(metadata) + .execute_async(&*conn) + .await + .expect("failed to insert orphan sitrep metadata"); + orphans.insert(id); } } From 98b6c103b7ed412a4bb5f0bdabf0ed128a34d9aa Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Tue, 24 Mar 2026 09:07:18 -0700 Subject: [PATCH 2/3] remove unused opctx --- nexus/src/app/background/tasks/fm_sitrep_gc.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/nexus/src/app/background/tasks/fm_sitrep_gc.rs b/nexus/src/app/background/tasks/fm_sitrep_gc.rs index acb5090c49b..380785ebd2d 100644 --- a/nexus/src/app/background/tasks/fm_sitrep_gc.rs +++ b/nexus/src/app/background/tasks/fm_sitrep_gc.rs @@ -163,7 +163,7 @@ mod tests { // Now, create some orphaned sitreps which also have no parent. let mut orphans = BTreeSet::new(); for i in 1..5 { - insert_orphan(&datastore, &opctx, &mut orphans, None, 1, i).await; + insert_orphan(&datastore, &mut orphans, None, 1, i).await; } // Next, create a new sitrep which descends from sitrep 1. @@ -187,7 +187,6 @@ mod tests { for i in 1..4 { insert_orphan( &datastore, - &opctx, &mut orphans, Some(sitrep1.metadata.id), 2, @@ -260,7 +259,6 @@ mod tests { /// insert the metadata row directly. async fn insert_orphan( datastore: &DataStore, - _opctx: &OpContext, orphans: &mut BTreeSet, parent_sitrep_id: Option, v: usize, From a1d650fe707307ba5c4ed7d5748ada8b2eeaf42e Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Tue, 24 Mar 2026 15:18:23 -0700 Subject: [PATCH 3/3] better error context --- nexus/db-queries/src/db/datastore/fm.rs | 62 +++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 5 deletions(-) diff --git a/nexus/db-queries/src/db/datastore/fm.rs b/nexus/db-queries/src/db/datastore/fm.rs index c0f5b0f621d..21a040d4d34 100644 --- a/nexus/db-queries/src/db/datastore/fm.rs +++ b/nexus/db-queries/src/db/datastore/fm.rs @@ -523,7 +523,18 @@ impl DataStore { diesel::insert_into(sitrep_dsl::fm_sitrep) .values(metadata) .execute_async(&conn) - .await?; + .await + .map_err(|e| { + err.bail(InsertSitrepError::Other( + public_error_from_diesel( + e, + ErrorHandler::Server, + ) + .internal_context( + "failed to insert sitrep metadata record", + ), + )) + })?; // Insert case ereport assignments. if !case_ereports.is_empty() { @@ -532,7 +543,18 @@ impl DataStore { ) .values(case_ereports) .execute_async(&conn) - .await?; + .await + .map_err(|e| { + err.bail(InsertSitrepError::Other( + public_error_from_diesel( + e, + ErrorHandler::Server, + ) + .internal_context( + "failed to insert case ereport assignments", + ), + )) + })?; } // Insert alert requests. @@ -540,7 +562,18 @@ impl DataStore { diesel::insert_into(alert_req_dsl::fm_alert_request) .values(alerts_requested) .execute_async(&conn) - .await?; + .await + .map_err(|e| { + err.bail(InsertSitrepError::Other( + public_error_from_diesel( + e, + ErrorHandler::Server, + ) + .internal_context( + "failed to insert alert requests", + ), + )) + })?; } // Insert case metadata records. @@ -548,7 +581,18 @@ impl DataStore { diesel::insert_into(case_dsl::fm_case) .values(cases) .execute_async(&conn) - .await?; + .await + .map_err(|e| { + err.bail(InsertSitrepError::Other( + public_error_from_diesel( + e, + ErrorHandler::Server, + ) + .internal_context( + "failed to insert case records", + ), + )) + })?; } // Now, try to make the sitrep current. @@ -564,7 +608,15 @@ impl DataStore { sitrep_id, )) } - other => other, + other => err.bail(InsertSitrepError::Other( + public_error_from_diesel( + other, + ErrorHandler::Server, + ) + .internal_context( + "failed to insert new sitrep version", + ), + )), })?; Ok(())