Skip to content

Commit 337772d

Browse files
committed
fix(sqlite): stop actors on head fence mismatch
1 parent 68707ac commit 337772d

27 files changed

Lines changed: 2081 additions & 250 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/artifacts/errors/depot.head_fence_mismatch.json

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/depot-client-embedded/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@ async-trait.workspace = true
1616
depot.workspace = true
1717
depot-client.workspace = true
1818
rivet-envoy-protocol.workspace = true
19+
rivet-error.workspace = true
1920
tokio.workspace = true

engine/packages/depot-client-embedded/src/lib.rs

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::sync::Arc;
88

99
use anyhow::Result;
1010
use async_trait::async_trait;
11+
use depot::error::SqliteStorageError;
1112
use depot_client::{
1213
database::{NativeDatabaseHandle, open_database_from_transport},
1314
vfs::{SqliteTransport, SqliteVfsMetrics},
@@ -48,22 +49,31 @@ impl SqliteTransport for EmbeddedDepotSqliteTransport {
4849
&self,
4950
request: protocol::SqliteGetPagesRequest,
5051
) -> Result<protocol::SqliteGetPagesResponse> {
51-
match self.db.get_pages(request.pgnos).await {
52-
Ok(pages) => Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk(
52+
match self
53+
.db
54+
.get_pages_with_options(
55+
request.pgnos,
56+
depot::types::GetPagesOptions {
57+
expected_head_txid: request.expected_head_txid,
58+
},
59+
)
60+
.await
61+
{
62+
Ok(result) => Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk(
5363
protocol::SqliteGetPagesOk {
54-
pages: pages
64+
pages: result
65+
.pages
5566
.into_iter()
5667
.map(|page| protocol::SqliteFetchedPage {
5768
pgno: page.pgno,
5869
bytes: page.bytes,
5970
})
6071
.collect(),
72+
head_txid: Some(result.head_txid),
6173
},
6274
)),
6375
Err(err) => Ok(protocol::SqliteGetPagesResponse::SqliteErrorResponse(
64-
protocol::SqliteErrorResponse {
65-
message: sqlite_error_reason(&err),
66-
},
76+
sqlite_error_response(&err),
6777
)),
6878
}
6979
}
@@ -74,7 +84,7 @@ impl SqliteTransport for EmbeddedDepotSqliteTransport {
7484
) -> Result<protocol::SqliteCommitResponse> {
7585
match self
7686
.db
77-
.commit(
87+
.commit_with_options(
7888
request
7989
.dirty_pages
8090
.into_iter()
@@ -85,15 +95,20 @@ impl SqliteTransport for EmbeddedDepotSqliteTransport {
8595
.collect(),
8696
request.db_size_pages,
8797
request.now_ms,
98+
depot::types::CommitOptions {
99+
expected_head_txid: request.expected_head_txid,
100+
},
88101
)
89102
.await
90103
{
91-
Ok(()) => Ok(protocol::SqliteCommitResponse::SqliteCommitOk),
92-
Err(err) => Ok(protocol::SqliteCommitResponse::SqliteErrorResponse(
93-
protocol::SqliteErrorResponse {
94-
message: sqlite_error_reason(&err),
104+
Ok(result) => Ok(protocol::SqliteCommitResponse::SqliteCommitOk(
105+
protocol::SqliteCommitOk {
106+
head_txid: Some(result.head_txid),
95107
},
96108
)),
109+
Err(err) => Ok(protocol::SqliteCommitResponse::SqliteErrorResponse(
110+
sqlite_error_response(&err),
111+
)),
97112
}
98113
}
99114
}
@@ -104,3 +119,19 @@ fn sqlite_error_reason(err: &anyhow::Error) -> String {
104119
.collect::<Vec<_>>()
105120
.join(": ")
106121
}
122+
123+
/// Builds the `protocol::SqliteErrorResponse` sent back for a failed request.
///
/// If `depot_error` finds a `SqliteStorageError` in `err`'s chain, that error is
/// cloned, built, and handed to `rivet_error::RivetError::extract`; otherwise
/// `err` itself is handed to `extract`. The extracted value's `group()` and
/// `code()` populate the response, while `message` is always
/// `sqlite_error_reason(err)` computed from the original error.
fn sqlite_error_response(err: &anyhow::Error) -> protocol::SqliteErrorResponse {
124+
let structured = depot_error(err)
125+
.map(|err| rivet_error::RivetError::extract(&err.clone().build()))
126+
.unwrap_or_else(|| rivet_error::RivetError::extract(err));
127+
protocol::SqliteErrorResponse {
128+
group: structured.group().to_string(),
129+
code: structured.code().to_string(),
130+
message: sqlite_error_reason(err),
131+
}
132+
}
133+
134+
fn depot_error(err: &anyhow::Error) -> Option<&SqliteStorageError> {
135+
err.chain()
136+
.find_map(|source| source.downcast_ref::<SqliteStorageError>())
137+
}

engine/packages/depot-client-types/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
//! Shared SQLite execution types for local and remote depot client backends.
22
3+
/// Error group string that `is_head_fence_mismatch` compares against.
pub const HEAD_FENCE_MISMATCH_GROUP: &str = "depot";
4+
/// Error code string that `is_head_fence_mismatch` compares against.
pub const HEAD_FENCE_MISMATCH_CODE: &str = "head_fence_mismatch";
5+
6+
/// Returns `true` only when `group` equals `HEAD_FENCE_MISMATCH_GROUP` and
/// `code` equals `HEAD_FENCE_MISMATCH_CODE`; both comparisons are exact string
/// equality.
pub fn is_head_fence_mismatch(group: &str, code: &str) -> bool {
7+
group == HEAD_FENCE_MISMATCH_GROUP && code == HEAD_FENCE_MISMATCH_CODE
8+
}
9+
310
#[derive(Clone, Debug, PartialEq)]
411
pub enum BindParam {
512
Null,

engine/packages/depot-client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ depot = { workspace = true, features = ["test-faults"] }
2929
futures-util.workspace = true
3030
gas.workspace = true
3131
rivet-config.workspace = true
32+
rivet-error.workspace = true
3233
rivet-pools.workspace = true
3334
rivet-test-deps.workspace = true
3435
sha2.workspace = true

engine/packages/depot-client/src/database.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
SqliteVfsMetricsSnapshot, VfsConfig, VfsPreloadHintSnapshot,
1111
fetch_initial_pages_for_registration,
1212
},
13-
worker::SqliteWorkerHandle,
13+
worker::{SqliteWorkerFatalError, SqliteWorkerHandle},
1414
};
1515

1616
#[derive(Clone)]
@@ -70,7 +70,8 @@ impl NativeDatabaseHandle {
7070
}
7171

7272
pub async fn exec(&self, sql: String) -> Result<QueryResult> {
73-
self.worker.exec(sql).await
73+
self.check_fatal_error()?;
74+
self.map_worker_result(self.worker.exec(sql).await)
7475
}
7576

7677
pub async fn query(&self, sql: String, params: Option<Vec<BindParam>>) -> Result<QueryResult> {
@@ -91,11 +92,15 @@ impl NativeDatabaseHandle {
9192
sql: String,
9293
params: Option<Vec<BindParam>>,
9394
) -> Result<ExecuteResult> {
94-
self.worker.execute(sql, params).await
95+
self.check_fatal_error()?;
96+
self.map_worker_result(self.worker.execute(sql, params).await)
9597
}
9698

9799
pub async fn close(&self) -> Result<()> {
98-
self.worker.close().await
100+
match self.worker.close().await {
101+
Ok(()) => Ok(()),
102+
Err(error) => Err(self.fatal_error().unwrap_or(error)),
103+
}
99104
}
100105

101106
pub async fn wait_for_worker_failure(&self) -> bool {
@@ -106,6 +111,10 @@ impl NativeDatabaseHandle {
106111
self.vfs.take_last_error()
107112
}
108113

114+
/// Forwards to `self.vfs.clone_fatal_error()` and returns its result unchanged.
// NOTE(review): presumably this returns a copy of the VFS's recorded fatal
// error message without clearing it (in contrast to `take_last_error`) —
// confirm against the VFS implementation.
pub fn clone_fatal_error(&self) -> Option<String> {
115+
self.vfs.clone_fatal_error()
116+
}
117+
109118
pub fn snapshot_preload_hints(&self) -> VfsPreloadHintSnapshot {
110119
self.vfs.snapshot_preload_hints()
111120
}
@@ -130,7 +139,30 @@ impl NativeDatabaseHandle {
130139
}
131140

132141
async fn initialize(&self) -> Result<()> {
133-
self.worker.wait_ready().await
142+
self.map_worker_result(self.worker.wait_ready().await)
143+
}
144+
145+
fn check_fatal_error(&self) -> Result<()> {
146+
if let Some(error) = self.fatal_error() {
147+
return Err(error);
148+
}
149+
150+
Ok(())
151+
}
152+
153+
fn map_worker_result<T>(&self, result: Result<T>) -> Result<T> {
154+
match result {
155+
Ok(value) => {
156+
self.check_fatal_error()?;
157+
Ok(value)
158+
}
159+
Err(error) => Err(self.fatal_error().unwrap_or(error)),
160+
}
161+
}
162+
163+
fn fatal_error(&self) -> Option<anyhow::Error> {
164+
self.clone_fatal_error()
165+
.map(|message| SqliteWorkerFatalError::new(message).into())
134166
}
135167
}
136168

0 commit comments

Comments
 (0)