Skip to content

Commit 71dd250

Browse files
NathanFlurryMasterPtato
authored andcommitted
fix(sqlite): fence remote requests by generation
1 parent f21c5de commit 71dd250

3 files changed

Lines changed: 50 additions & 5 deletions

File tree

engine/packages/depot-client/src/database.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::sync::Arc;
22

33
use anyhow::{Result, anyhow};
4+
use async_trait::async_trait;
5+
use rivet_envoy_protocol as protocol;
46
use tokio::runtime::Handle;
57

68
use crate::{
@@ -23,6 +25,30 @@ pub fn vfs_name_for_actor_database(actor_id: &str, generation: u64) -> String {
2325
format!("envoy-sqlite-{actor_id}-g{generation}")
2426
}
2527

28+
struct GenerationFencedTransport {
29+
inner: SqliteTransportHandle,
30+
generation: u64,
31+
}
32+
33+
#[async_trait]
34+
impl crate::vfs::SqliteTransport for GenerationFencedTransport {
35+
async fn get_pages(
36+
&self,
37+
mut request: protocol::SqliteGetPagesRequest,
38+
) -> Result<protocol::SqliteGetPagesResponse> {
39+
request.expected_generation.get_or_insert(self.generation);
40+
self.inner.get_pages(request).await
41+
}
42+
43+
async fn commit(
44+
&self,
45+
mut request: protocol::SqliteCommitRequest,
46+
) -> Result<protocol::SqliteCommitResponse> {
47+
request.expected_generation.get_or_insert(self.generation);
48+
self.inner.commit(request).await
49+
}
50+
}
51+
2652
pub async fn open_database_from_transport(
2753
transport: SqliteTransportHandle,
2854
actor_id: String,
@@ -32,6 +58,10 @@ pub async fn open_database_from_transport(
3258
) -> Result<NativeDatabaseHandle> {
3359
let vfs_name = vfs_name_for_actor_database(&actor_id, generation);
3460
let config = VfsConfig::default();
61+
let transport: SqliteTransportHandle = Arc::new(GenerationFencedTransport {
62+
inner: transport,
63+
generation,
64+
});
3565
let initial_pages = fetch_initial_pages_for_registration(transport.clone(), &actor_id, &config)
3666
.await
3767
.map_err(|e| anyhow!("failed to preload sqlite pages: {e}"))?;

engine/packages/depot-client/src/vfs.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1854,6 +1854,7 @@ async fn fetch_initial_pages(
18541854
fn is_initial_main_page_missing(message: &str) -> bool {
18551855
message.contains("sqlite database was not found in this bucket branch")
18561856
|| message.contains("sqlite meta missing for get_pages")
1857+
|| message == "actor does not exist"
18571858
}
18581859

18591860
fn next_temp_aux_path() -> String {

engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -698,7 +698,8 @@ async fn handle_sqlite_get_pages(
698698
conn: &Conn,
699699
request: protocol::SqliteGetPagesRequest,
700700
) -> Result<protocol::SqliteGetPagesResponse> {
701-
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
701+
validate_sqlite_actor_for_request(ctx, conn, &request.actor_id, request.expected_generation)
702+
.await?;
702703

703704
let actor_db = actor_db(ctx, conn, request.actor_id.clone()).await?;
704705
let result = actor_db
@@ -733,7 +734,8 @@ async fn handle_sqlite_commit(
733734
request: protocol::SqliteCommitRequest,
734735
) -> Result<protocol::SqliteCommitResponse> {
735736
let decode_request_start = Instant::now();
736-
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
737+
validate_sqlite_actor_for_request(ctx, conn, &request.actor_id, request.expected_generation)
738+
.await?;
737739
let decode_request_duration = decode_request_start.elapsed();
738740
crate::metrics::SQLITE_COMMIT_ENVOY_DISPATCH_DURATION
739741
.observe(decode_request_duration.as_secs_f64());
@@ -862,6 +864,19 @@ async fn validate_sqlite_actor(ctx: &StandaloneCtx, conn: &Conn, actor_id: &str)
862864
Ok(())
863865
}
864866

867+
async fn validate_sqlite_actor_for_request(
868+
ctx: &StandaloneCtx,
869+
conn: &Conn,
870+
actor_id: &str,
871+
expected_generation: Option<u64>,
872+
) -> Result<()> {
873+
if let Some(generation) = expected_generation {
874+
validate_remote_sqlite_generation(ctx, conn, actor_id, generation).await
875+
} else {
876+
validate_sqlite_actor(ctx, conn, actor_id).await
877+
}
878+
}
879+
865880
async fn validate_remote_sqlite_generation(
866881
ctx: &StandaloneCtx,
867882
conn: &Conn,
@@ -1125,11 +1140,10 @@ fn depot_error(err: &anyhow::Error) -> Option<&SqliteStorageError> {
11251140

11261141
fn is_startup_database_miss(
11271142
err: &anyhow::Error,
1128-
expected_generation: Option<u64>,
1143+
_expected_generation: Option<u64>,
11291144
expected_head_txid: Option<u64>,
11301145
) -> bool {
1131-
expected_generation.is_none()
1132-
&& expected_head_txid.is_none()
1146+
expected_head_txid.is_none()
11331147
&& matches!(depot_error(err), Some(SqliteStorageError::DatabaseNotFound))
11341148
}
11351149

0 commit comments

Comments
 (0)