Skip to content

Commit f2a28ab

Browse files
committed
fix(depot-client): only fatal sqlite vfs on fence mismatch
1 parent 011798b commit f2a28ab

3 files changed

Lines changed: 190 additions & 43 deletions

File tree

engine/packages/depot-client/src/vfs.rs

Lines changed: 28 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -260,7 +260,6 @@ pub struct BufferedCommitOutcome {
260260
#[derive(Debug, Clone, PartialEq, Eq)]
261261
pub enum CommitBufferError {
262262
FenceMismatch(String),
263-
StageNotFound(u64),
264263
Other(String),
265264
}
266265

@@ -338,6 +337,7 @@ pub struct VfsContext {
338337
state: RwLock<VfsState>,
339338
aux_files: RwLock<BTreeMap<String, Arc<AuxFileState>>>,
340339
last_error: Mutex<Option<String>>,
340+
transient_commit_error: Mutex<Option<String>>,
341341
fatal_error: RwLock<Option<String>>,
342342
#[cfg(test)]
343343
fail_next_aux_open: Mutex<Option<String>>,
@@ -432,7 +432,7 @@ struct RecentPageAccess {
432432

433433
#[derive(Debug)]
434434
enum GetPagesError {
435-
Fatal(String),
435+
FenceMismatch(String),
436436
Other(String),
437437
}
438438

@@ -980,6 +980,7 @@ impl VfsContext {
980980
state: RwLock::new(state),
981981
aux_files: RwLock::new(BTreeMap::new()),
982982
last_error: Mutex::new(None),
983+
transient_commit_error: Mutex::new(None),
983984
fatal_error: RwLock::new(None),
984985
#[cfg(test)]
985986
fail_next_aux_open: Mutex::new(None),
@@ -1018,6 +1019,14 @@ impl VfsContext {
10181019
self.fatal_error.read().clone()
10191020
}
10201021

1022+
fn defer_transient_commit_error(&self, message: String) {
1023+
*self.transient_commit_error.lock() = Some(message);
1024+
}
1025+
1026+
fn take_transient_commit_error(&self) -> Option<String> {
1027+
self.transient_commit_error.lock().take()
1028+
}
1029+
10211030
pub(crate) fn take_last_error(&self) -> Option<String> {
10221031
self.last_error.lock().take()
10231032
}
@@ -1143,13 +1152,9 @@ impl VfsContext {
11431152
self.state.read().dead
11441153
}
11451154

1146-
fn mark_dead(&self, message: String) {
1147-
self.set_last_error(message);
1148-
self.state.write().dead = true;
1149-
}
1150-
11511155
fn mark_fatal(&self, message: String) {
1152-
self.mark_dead(message.clone());
1156+
self.set_last_error(message.clone());
1157+
self.state.write().dead = true;
11531158
let mut fatal_error = self.fatal_error.write();
11541159
if fatal_error.is_none() {
11551160
*fatal_error = Some(message);
@@ -1455,7 +1460,7 @@ impl VfsContext {
14551460
return Ok(resolved);
14561461
}
14571462
if is_head_fence_mismatch_response(&error) {
1458-
return Err(GetPagesError::Fatal(error.message));
1463+
return Err(GetPagesError::FenceMismatch(error.message));
14591464
}
14601465
Err(GetPagesError::Other(error.message))
14611466
}
@@ -1755,12 +1760,7 @@ fn assert_batch_atomic_probe(db: *mut sqlite3, vfs: &SqliteVfs) -> std::result::
17551760
fn handle_non_finalize_commit_error(ctx: &VfsContext, err: &CommitBufferError) {
17561761
match err {
17571762
CommitBufferError::FenceMismatch(message) => ctx.mark_fatal(message.clone()),
1758-
CommitBufferError::StageNotFound(stage_id) => {
1759-
ctx.mark_dead(format!(
1760-
"sqlite stage {stage_id} missing during commit finalize"
1761-
));
1762-
}
1763-
CommitBufferError::Other(message) => ctx.mark_dead(message.clone()),
1763+
CommitBufferError::Other(message) => ctx.set_last_error(message.clone()),
17641764
}
17651765
}
17661766

@@ -2238,7 +2238,7 @@ unsafe extern "C" fn io_read(
22382238

22392239
let resolved = match ctx.resolve_pages(&requested_pages, true) {
22402240
Ok(pages) => pages,
2241-
Err(GetPagesError::Fatal(message)) => {
2241+
Err(GetPagesError::FenceMismatch(message)) => {
22422242
tracing::error!(
22432243
actor_id = %ctx.actor_id,
22442244
requested_pages = ?requested_pages,
@@ -2255,7 +2255,7 @@ unsafe extern "C" fn io_read(
22552255
error = %message,
22562256
"sqlite xRead failed to resolve pages"
22572257
);
2258-
ctx.mark_dead(message);
2258+
ctx.set_last_error(message);
22592259
return SQLITE_IOERR_READ;
22602260
}
22612261
};
@@ -2380,12 +2380,12 @@ unsafe extern "C" fn io_write(
23802380
} else {
23812381
match ctx.resolve_pages(&pages_to_resolve, false) {
23822382
Ok(pages) => pages,
2383-
Err(GetPagesError::Fatal(message)) => {
2383+
Err(GetPagesError::FenceMismatch(message)) => {
23842384
ctx.mark_fatal(message);
23852385
return SQLITE_IOERR_WRITE;
23862386
}
23872387
Err(GetPagesError::Other(message)) => {
2388-
ctx.mark_dead(message);
2388+
ctx.set_last_error(message);
23892389
return SQLITE_IOERR_WRITE;
23902390
}
23912391
}
@@ -2489,6 +2489,10 @@ unsafe extern "C" fn io_sync(p_file: *mut sqlite3_file, _flags: c_int) -> c_int
24892489
return SQLITE_OK;
24902490
}
24912491
let ctx = &*file.ctx;
2492+
if let Some(message) = ctx.take_transient_commit_error() {
2493+
ctx.set_last_error(message);
2494+
return SQLITE_IOERR_FSYNC;
2495+
}
24922496
match ctx.flush_dirty_pages() {
24932497
Ok(_) => SQLITE_OK,
24942498
Err(err) => {
@@ -2575,6 +2579,9 @@ unsafe extern "C" fn io_file_control(
25752579
?err,
25762580
"sqlite atomic write file control failed"
25772581
);
2582+
if let CommitBufferError::Other(message) = &err {
2583+
ctx.defer_transient_commit_error(message.clone());
2584+
}
25782585
handle_finalize_fence_error(ctx, &err);
25792586
SQLITE_IOERR
25802587
}
@@ -3029,6 +3036,8 @@ impl Drop for NativeDatabase {
30293036
Err(err) => {
30303037
handle_non_finalize_commit_error(ctx, &err);
30313038
tracing::warn!(?err, "failed to flush sqlite database before close");
3039+
self.db = ptr::null_mut();
3040+
return;
30323041
}
30333042
}
30343043
}

engine/packages/depot-client/tests/inline/vfs.rs

Lines changed: 143 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -2399,29 +2399,36 @@ fn direct_engine_marks_vfs_dead_after_transport_errors() {
23992399
)
24002400
.expect("v2 vfs should register");
24012401
let db = open_database(vfs, &harness.actor_id).expect("sqlite database should open");
2402+
let ctx = direct_vfs_ctx(&db);
2403+
2404+
{
2405+
let mut state = ctx.state.write();
2406+
state.write_buffer.dirty.insert(1, empty_db_page());
2407+
state.db_size_pages = 1;
2408+
}
24022409

24032410
hooks.fail_next_commit("InjectedTransportError: commit transport dropped");
2404-
let err = sqlite_exec(
2405-
db.as_ptr(),
2406-
"CREATE TABLE broken (id INTEGER PRIMARY KEY, value TEXT NOT NULL);",
2407-
)
2408-
.expect_err("failing transport commit should surface as an IO error");
2411+
let err = ctx
2412+
.flush_dirty_pages()
2413+
.expect_err("failing transport commit should surface as a VFS error");
24092414
assert!(
2410-
err.contains("I/O") || err.contains("disk I/O"),
2411-
"sqlite should surface transport failure as an IO error: {err}",
2415+
matches!(err, CommitBufferError::Other(ref message) if message.contains("InjectedTransportError")),
2416+
"VFS should surface transport failure as an IO error: {err:?}",
24122417
);
24132418
assert!(
2414-
direct_vfs_ctx(&db).is_dead(),
2415-
"transport error should kill the v2 VFS"
2419+
!ctx.is_dead(),
2420+
"transport error should not poison the v2 VFS"
2421+
);
2422+
assert!(
2423+
ctx.clone_fatal_error().is_none(),
2424+
"transport error should not be stored as fatal"
24162425
);
24172426
assert_eq!(
24182427
db.take_last_kv_error().as_deref(),
24192428
Some("InjectedTransportError: commit transport dropped"),
24202429
);
2421-
assert!(
2422-
sqlite_query_i64(db.as_ptr(), "PRAGMA page_count;").is_err(),
2423-
"subsequent reads should fail once the VFS is dead",
2424-
);
2430+
ctx.flush_dirty_pages()
2431+
.expect("retry after unapplied transport failure should succeed");
24252432
}
24262433

24272434
#[test]
@@ -2459,13 +2466,23 @@ fn flush_dirty_pages_marks_vfs_dead_after_transport_error() {
24592466
"flush failure should surface as a transport error: {err:?}",
24602467
);
24612468
assert!(
2462-
ctx.is_dead(),
2463-
"flush transport failure should poison the VFS"
2469+
!ctx.is_dead(),
2470+
"flush transport failure should not poison the VFS"
2471+
);
2472+
assert!(
2473+
ctx.clone_fatal_error().is_none(),
2474+
"flush transport failure should not be stored as fatal"
24642475
);
24652476
assert_eq!(
24662477
db.take_last_kv_error().as_deref(),
24672478
Some("InjectedTransportError: flush transport dropped"),
24682479
);
2480+
ctx.flush_dirty_pages()
2481+
.expect("retry after unapplied transport failure should succeed");
2482+
assert!(
2483+
ctx.state.read().write_buffer.dirty.is_empty(),
2484+
"successful retry should clear dirty pages",
2485+
);
24692486
}
24702487

24712488
#[test]
@@ -2505,13 +2522,120 @@ fn commit_atomic_write_marks_vfs_dead_after_transport_error() {
25052522
"atomic-write failure should surface as a transport error: {err:?}",
25062523
);
25072524
assert!(
2508-
ctx.is_dead(),
2509-
"commit_atomic_write transport failure should poison the VFS",
2525+
!ctx.is_dead(),
2526+
"commit_atomic_write transport failure should not poison the VFS",
2527+
);
2528+
assert!(
2529+
ctx.clone_fatal_error().is_none(),
2530+
"commit_atomic_write transport failure should not be stored as fatal",
25102531
);
25112532
assert_eq!(
25122533
db.take_last_kv_error().as_deref(),
25132534
Some("InjectedTransportError: atomic transport dropped"),
25142535
);
2536+
ctx.commit_atomic_write()
2537+
.expect("retry after unapplied atomic transport failure should succeed");
2538+
assert!(
2539+
!ctx.state.read().write_buffer.in_atomic_write,
2540+
"successful retry should leave atomic-write mode",
2541+
);
2542+
}
2543+
2544+
#[test]
2545+
fn lost_commit_response_fails_later_on_head_fence_mismatch() {
2546+
let runtime = direct_runtime();
2547+
let harness = DirectEngineHarness::new();
2548+
let engine = runtime.block_on(harness.open_engine());
2549+
let transport = Arc::new(DirectDepotTransport::new(engine));
2550+
let hooks = transport.direct_hooks();
2551+
let vfs = SqliteVfs::register_with_transport(
2552+
&next_test_name("sqlite-direct-vfs"),
2553+
transport,
2554+
harness.actor_id.clone(),
2555+
runtime.handle().clone(),
2556+
VfsConfig::default(),
2557+
None,
2558+
)
2559+
.expect("v2 vfs should register");
2560+
let db = open_database(vfs, &harness.actor_id).expect("sqlite database should open");
2561+
let ctx = direct_vfs_ctx(&db);
2562+
2563+
{
2564+
let mut state = ctx.state.write();
2565+
state.write_buffer.dirty.insert(1, empty_db_page());
2566+
state.db_size_pages = 1;
2567+
}
2568+
2569+
hooks.fail_next_commit_after_apply("InjectedTransportError: commit response lost");
2570+
let err = ctx
2571+
.flush_dirty_pages()
2572+
.expect_err("lost commit response should surface as a transport error");
2573+
assert!(
2574+
matches!(err, CommitBufferError::Other(ref message) if message.contains("commit response lost")),
2575+
"lost response should be ambiguous before the next fence check: {err:?}",
2576+
);
2577+
assert!(
2578+
!ctx.is_dead(),
2579+
"ambiguous lost response should not immediately poison the VFS"
2580+
);
2581+
assert!(ctx.clone_fatal_error().is_none());
2582+
2583+
let err = ctx
2584+
.flush_dirty_pages()
2585+
.expect_err("retry after applied lost response should hit the stale head fence");
2586+
assert!(
2587+
matches!(err, CommitBufferError::FenceMismatch(ref message) if message.contains("head fence mismatch")),
2588+
"retry should confirm the stale fence: {err:?}",
2589+
);
2590+
assert!(ctx.is_dead());
2591+
assert!(
2592+
ctx.clone_fatal_error()
2593+
.is_some_and(|message| message.contains("head fence mismatch")),
2594+
"confirmed fence mismatch should be stored as fatal"
2595+
);
2596+
}
2597+
2598+
#[test]
2599+
fn unapplied_commit_transport_failure_can_retry_from_same_head() {
2600+
let runtime = direct_runtime();
2601+
let harness = DirectEngineHarness::new();
2602+
let engine = runtime.block_on(harness.open_engine());
2603+
let transport = Arc::new(DirectDepotTransport::new(engine));
2604+
let hooks = transport.direct_hooks();
2605+
let vfs = SqliteVfs::register_with_transport(
2606+
&next_test_name("sqlite-direct-vfs"),
2607+
transport,
2608+
harness.actor_id.clone(),
2609+
runtime.handle().clone(),
2610+
VfsConfig::default(),
2611+
None,
2612+
)
2613+
.expect("v2 vfs should register");
2614+
let db = open_database(vfs, &harness.actor_id).expect("sqlite database should open");
2615+
let ctx = direct_vfs_ctx(&db);
2616+
2617+
{
2618+
let mut state = ctx.state.write();
2619+
state.write_buffer.dirty.insert(1, empty_db_page());
2620+
state.db_size_pages = 1;
2621+
}
2622+
2623+
hooks.fail_next_commit("InjectedTransportError: commit dropped before apply");
2624+
let err = ctx
2625+
.flush_dirty_pages()
2626+
.expect_err("pre-apply transport error should surface");
2627+
assert!(
2628+
matches!(err, CommitBufferError::Other(ref message) if message.contains("before apply")),
2629+
"pre-apply failure should be generic transport error: {err:?}",
2630+
);
2631+
assert!(!ctx.is_dead());
2632+
assert!(ctx.clone_fatal_error().is_none());
2633+
2634+
ctx.flush_dirty_pages()
2635+
.expect("retry from same head should succeed");
2636+
let state = ctx.state.read();
2637+
assert!(state.write_buffer.dirty.is_empty());
2638+
assert!(state.head_txid.is_some());
25152639
}
25162640

25172641
#[test]
@@ -3038,8 +3162,8 @@ fn direct_engine_fresh_reopen_recovers_after_poisoned_handle() {
30383162
"sqlite should surface transport failure as an IO error: {err}",
30393163
);
30403164
assert!(
3041-
direct_vfs_ctx(&db).is_dead(),
3042-
"transport error should kill the live VFS",
3165+
!direct_vfs_ctx(&db).is_dead(),
3166+
"transport error should not poison the live VFS",
30433167
);
30443168

30453169
drop(db);

0 commit comments

Comments
 (0)