Skip to content

Commit 66a67ac

Browse files
committed
fix(sqlite): tolerate startup takeover races
1 parent 36392d2 commit 66a67ac

2 files changed

Lines changed: 100 additions & 19 deletions

File tree

engine/packages/pegboard-envoy/src/sqlite_runtime.rs

Lines changed: 69 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use sqlite_storage::{
1111
commit::{CommitFinalizeRequest, CommitStageBeginRequest, CommitStageRequest},
1212
compaction::CompactionCoordinator,
1313
engine::SqliteEngine,
14+
error::SqliteStorageError,
1415
ltx::{LtxHeader, encode_ltx_v3},
1516
takeover::TakeoverConfig,
1617
types::{DirtyPage, SQLITE_PAGE_SIZE, SQLITE_VFS_V2_SCHEMA_VERSION, SqliteOrigin},
@@ -32,6 +33,7 @@ const SQLITE_V1_META_VERSION: u16 = 1;
3233
const SQLITE_V1_META_LEN: usize = 10;
3334
const SQLITE_V1_CHUNK_SIZE: usize = 4096;
3435
const SQLITE_V1_MAX_MIGRATION_BYTES: u64 = 128 * 1024 * 1024;
36+
const SQLITE_STARTUP_TAKEOVER_RETRIES: usize = 3;
3537
// A staged v1 import can reserve one txid, write at most 16 delta chunks
3638
// (128 MiB max import / 8 MiB max delta chunk), and finalize. That window should
3739
// stay well under a minute, so keep stale-owner recovery short.
@@ -594,25 +596,77 @@ pub async fn maybe_load_sqlite_startup_data(
594596
}
595597

596598
let actor_id = actor_id.to_string();
599+
let mut observed_generation = 0;
597600
if let Some(meta) = sqlite_engine.try_load_meta(&actor_id).await? {
598601
ensure!(
599602
!matches!(meta.origin, SqliteOrigin::MigratingFromV1),
600603
"sqlite v1 migration for actor {actor_id} is incomplete"
601604
);
605+
observed_generation = meta.generation;
606+
}
607+
608+
for attempt in 0..=SQLITE_STARTUP_TAKEOVER_RETRIES {
609+
match sqlite_engine
610+
.takeover(&actor_id, TakeoverConfig::new(timestamp::now()))
611+
.await
612+
{
613+
Ok(startup) => {
614+
return Ok(Some(protocol::SqliteStartupData {
615+
generation: startup.generation,
616+
meta: protocol_sqlite_meta(startup.meta),
617+
preloaded_pages: startup
618+
.preloaded_pages
619+
.into_iter()
620+
.map(protocol_sqlite_fetched_page)
621+
.collect(),
622+
}));
623+
}
624+
Err(err)
625+
if matches!(
626+
err.downcast_ref::<SqliteStorageError>(),
627+
Some(SqliteStorageError::ConcurrentTakeover)
628+
) =>
629+
{
630+
let current_meta = sqlite_engine.load_meta(&actor_id).await?;
631+
ensure!(
632+
!matches!(current_meta.origin, SqliteOrigin::MigratingFromV1),
633+
"sqlite v1 migration for actor {actor_id} is incomplete"
634+
);
635+
636+
if current_meta.generation > observed_generation {
637+
tracing::warn!(
638+
actor_id = %actor_id,
639+
previous_generation = observed_generation,
640+
current_generation = current_meta.generation,
641+
attempt,
642+
"sqlite startup takeover raced with another startup owner; reusing current generation"
643+
);
644+
return Ok(Some(protocol::SqliteStartupData {
645+
generation: current_meta.generation,
646+
meta: protocol_sqlite_meta(current_meta),
647+
preloaded_pages: Vec::new(),
648+
}));
649+
}
650+
651+
if attempt == SQLITE_STARTUP_TAKEOVER_RETRIES {
652+
return Err(err);
653+
}
654+
655+
tracing::debug!(
656+
actor_id = %actor_id,
657+
generation = current_meta.generation,
658+
attempt,
659+
"sqlite startup takeover raced with an in-flight meta update; retrying"
660+
);
661+
observed_generation = current_meta.generation;
662+
tokio::time::sleep(std::time::Duration::from_millis(10 * (attempt as u64 + 1)))
663+
.await;
664+
}
665+
Err(err) => return Err(err),
666+
}
602667
}
603-
let startup = sqlite_engine
604-
.takeover(&actor_id, TakeoverConfig::new(timestamp::now()))
605-
.await?;
606668

607-
Ok(Some(protocol::SqliteStartupData {
608-
generation: startup.generation,
609-
meta: protocol_sqlite_meta(startup.meta),
610-
preloaded_pages: startup
611-
.preloaded_pages
612-
.into_iter()
613-
.map(protocol_sqlite_fetched_page)
614-
.collect(),
615-
}))
669+
unreachable!("sqlite startup takeover loop should always return")
616670
}
617671

618672
pub fn protocol_sqlite_meta(meta: sqlite_storage::types::SqliteMeta) -> protocol::SqliteMeta {
@@ -676,9 +730,9 @@ mod tests {
676730
use universaldb::driver::RocksDbDatabaseDriver;
677731

678732
use super::{
679-
FILE_TAG_JOURNAL, FILE_TAG_MAIN, FILE_TAG_SHM, FILE_TAG_WAL,
680-
SQLITE_V1_CHUNK_SIZE, SQLITE_V1_MAX_MIGRATION_BYTES, SQLITE_V1_MIGRATION_LEASE_MS,
681-
maybe_migrate_v1_to_v2, read_v1_file, sqlite_subspace, v1_chunk_key, v1_meta_key,
733+
FILE_TAG_JOURNAL, FILE_TAG_MAIN, FILE_TAG_SHM, FILE_TAG_WAL, SQLITE_V1_CHUNK_SIZE,
734+
SQLITE_V1_MAX_MIGRATION_BYTES, SQLITE_V1_MIGRATION_LEASE_MS, maybe_migrate_v1_to_v2,
735+
read_v1_file, sqlite_subspace, v1_chunk_key, v1_meta_key,
682736
};
683737

684738
fn recipient(actor_id: Id) -> Recipient {

engine/packages/sqlite-storage/src/takeover.rs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,13 @@ impl SqliteEngine {
9696
udb::tx_get_value_serializable(&tx, &subspace, &meta_storage_key).await?
9797
{
9898
let existing_head = decode_db_head(&existing_meta)?;
99-
ensure!(
100-
matches!(existing_head.origin, SqliteOrigin::MigratingFromV1),
101-
SqliteStorageError::ConcurrentTakeover
102-
);
99+
if !matches!(existing_head.origin, SqliteOrigin::MigratingFromV1) {
100+
if require_stage_in_progress {
101+
return Ok(None);
102+
}
103+
104+
return Err(SqliteStorageError::ConcurrentTakeover.into());
105+
}
103106
let stage_in_progress =
104107
existing_head.next_txid > existing_head.head_txid.saturating_add(1);
105108
if require_stage_in_progress && !stage_in_progress {
@@ -750,6 +753,30 @@ mod tests {
750753
Ok(())
751754
}
752755

756+
#[tokio::test]
757+
async fn invalidate_v1_migration_ignores_native_v2_heads() -> Result<()> {
758+
let (db, subspace) = test_db().await?;
759+
let (engine, _compaction_rx) = SqliteEngine::new(db, subspace);
760+
let (head, encoded_head) = encode_db_head_with_usage(TEST_ACTOR, &seeded_head(), 0)?;
761+
apply_write_ops(
762+
engine.db.as_ref(),
763+
&engine.subspace,
764+
engine.op_counter.as_ref(),
765+
vec![WriteOp::put(meta_key(TEST_ACTOR), encoded_head)],
766+
)
767+
.await?;
768+
769+
assert!(!engine.invalidate_v1_migration(TEST_ACTOR, 9_999).await?);
770+
771+
let stored_meta = read_value(&engine, meta_key(TEST_ACTOR))
772+
.await?
773+
.expect("native meta should remain");
774+
let stored_head = decode_db_head(&stored_meta)?;
775+
assert_eq!(stored_head, head);
776+
777+
Ok(())
778+
}
779+
753780
#[tokio::test]
754781
async fn takeover_on_existing_meta_bumps_generation_and_preloads_page_one() -> Result<()> {
755782
let (db, subspace) = test_db().await?;

0 commit comments

Comments
 (0)