Skip to content

Commit 3df0cfa

Browse files
committed
chore(depot-client): split sqlite vfs transports
1 parent 5425291 commit 3df0cfa

14 files changed

Lines changed: 453 additions & 486 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/depot-client/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ crate-type = ["lib"]
1212

1313
[dependencies]
1414
anyhow.workspace = true
15+
async-trait.workspace = true
1516
crossbeam-channel = "0.5"
1617
libsqlite3-sys = { version = "0.30", features = ["bundled"] }
17-
rivet-envoy-client = { workspace = true, features = ["native-transport"] }
1818
tokio.workspace = true
1919
tracing.workspace = true
2020
getrandom = "0.2"
@@ -25,7 +25,6 @@ moka = { version = "0.12", default-features = false, features = ["sync"] }
2525
parking_lot.workspace = true
2626

2727
[dev-dependencies]
28-
async-trait.workspace = true
2928
depot = { workspace = true, features = ["test-faults"] }
3029
futures-util.workspace = true
3130
gas.workspace = true

engine/packages/depot-client/src/database.rs

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
use std::sync::Arc;
22

33
use anyhow::{Result, anyhow};
4-
use rivet_envoy_client::handle::EnvoyHandle;
54
use tokio::runtime::Handle;
65

76
use crate::{
87
query::{BindParam, ExecResult, ExecuteResult, QueryResult},
8+
transport::EmbeddedDepotSqliteTransport,
99
vfs::{
10-
NativeVfsHandle, SqliteTransport, SqliteVfs, SqliteVfsMetrics, SqliteVfsMetricsSnapshot,
11-
VfsConfig, VfsPreloadHintSnapshot, fetch_initial_main_page_for_registration,
10+
NativeVfsHandle, SqliteTransportHandle, SqliteVfs, SqliteVfsMetrics,
11+
SqliteVfsMetricsSnapshot, VfsConfig, VfsPreloadHintSnapshot,
12+
fetch_initial_main_page_for_registration,
1213
},
1314
worker::SqliteWorkerHandle,
1415
};
@@ -23,16 +24,15 @@ pub fn vfs_name_for_actor_database(actor_id: &str, generation: u64) -> String {
2324
format!("envoy-sqlite-{actor_id}-g{generation}")
2425
}
2526

26-
pub async fn open_database_from_envoy(
27-
handle: EnvoyHandle,
27+
pub async fn open_database_from_transport(
28+
transport: SqliteTransportHandle,
2829
actor_id: String,
2930
generation: u64,
3031
rt_handle: Handle,
3132
metrics: Option<Arc<dyn SqliteVfsMetrics>>,
3233
) -> Result<NativeDatabaseHandle> {
3334
let vfs_name = vfs_name_for_actor_database(&actor_id, generation);
34-
let transport = SqliteTransport::from_envoy(handle);
35-
let initial_main_page = fetch_initial_main_page_for_registration(&transport, &actor_id)
35+
let initial_main_page = fetch_initial_main_page_for_registration(transport.clone(), &actor_id)
3636
.await
3737
.map_err(|e| anyhow!("failed to preload sqlite main page: {e}"))?;
3838
let vfs = Arc::new(
@@ -53,34 +53,21 @@ pub async fn open_database_from_envoy(
5353
Ok(native_db)
5454
}
5555

56-
pub async fn open_database_from_conveyer(
56+
pub async fn open_database_from_embedded_depot(
5757
db: Arc<depot::conveyer::Db>,
5858
actor_id: String,
5959
generation: u64,
6060
rt_handle: Handle,
6161
metrics: Option<Arc<dyn SqliteVfsMetrics>>,
6262
) -> Result<NativeDatabaseHandle> {
63-
let vfs_name = vfs_name_for_actor_database(&actor_id, generation);
64-
let transport = SqliteTransport::from_conveyer(db);
65-
let initial_main_page = fetch_initial_main_page_for_registration(&transport, &actor_id)
66-
.await
67-
.map_err(|e| anyhow!("failed to preload sqlite main page: {e}"))?;
68-
let vfs = Arc::new(
69-
SqliteVfs::register_with_transport_and_initial_page(
70-
&vfs_name,
71-
transport,
72-
actor_id.clone(),
73-
rt_handle,
74-
VfsConfig::default(),
75-
initial_main_page,
76-
metrics.clone(),
77-
)
78-
.map_err(|e| anyhow!("failed to register sqlite VFS: {e}"))?,
79-
);
80-
81-
let native_db = NativeDatabaseHandle::new_with_metrics(vfs, actor_id, metrics)?;
82-
native_db.initialize().await?;
83-
Ok(native_db)
63+
open_database_from_transport(
64+
Arc::new(EmbeddedDepotSqliteTransport::new(db)),
65+
actor_id,
66+
generation,
67+
rt_handle,
68+
metrics,
69+
)
70+
.await
8471
}
8572

8673
impl NativeDatabaseHandle {

engine/packages/depot-client/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ pub mod optimization_flags;
2323
/// SQLite query execution helpers.
2424
pub mod query;
2525

26+
/// SQLite transport adapters for same-process Depot usage.
27+
pub mod transport;
28+
2629
pub use depot_client_types as types;
2730

2831
/// Custom SQLite VFS for actor-side depot transport.
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
//! SQLite transport adapters.
2+
//!
3+
//! `EmbeddedDepotSqliteTransport` is for deployments where the SQLite VFS runs in the
4+
//! same process or server as the Depot backend. It calls `depot::conveyer::Db`
5+
//! directly instead of routing page operations through an actor Envoy transport.
6+
7+
use std::sync::Arc;
8+
9+
use anyhow::Result;
10+
use async_trait::async_trait;
11+
use rivet_envoy_protocol as protocol;
12+
13+
use crate::vfs::SqliteTransport;
14+
15+
pub struct EmbeddedDepotSqliteTransport {
16+
db: Arc<depot::conveyer::Db>,
17+
}
18+
19+
impl EmbeddedDepotSqliteTransport {
20+
pub fn new(db: Arc<depot::conveyer::Db>) -> Self {
21+
Self { db }
22+
}
23+
}
24+
25+
#[async_trait]
26+
impl SqliteTransport for EmbeddedDepotSqliteTransport {
27+
async fn get_pages(
28+
&self,
29+
request: protocol::SqliteGetPagesRequest,
30+
) -> Result<protocol::SqliteGetPagesResponse> {
31+
match self.db.get_pages(request.pgnos).await {
32+
Ok(pages) => Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk(
33+
protocol::SqliteGetPagesOk {
34+
pages: pages
35+
.into_iter()
36+
.map(|page| protocol::SqliteFetchedPage {
37+
pgno: page.pgno,
38+
bytes: page.bytes,
39+
})
40+
.collect(),
41+
},
42+
)),
43+
Err(err) => Ok(protocol::SqliteGetPagesResponse::SqliteErrorResponse(
44+
protocol::SqliteErrorResponse {
45+
message: sqlite_error_reason(&err),
46+
},
47+
)),
48+
}
49+
}
50+
51+
async fn commit(
52+
&self,
53+
request: protocol::SqliteCommitRequest,
54+
) -> Result<protocol::SqliteCommitResponse> {
55+
match self
56+
.db
57+
.commit(
58+
request
59+
.dirty_pages
60+
.into_iter()
61+
.map(|page| depot::types::DirtyPage {
62+
pgno: page.pgno,
63+
bytes: page.bytes,
64+
})
65+
.collect(),
66+
request.db_size_pages,
67+
request.now_ms,
68+
)
69+
.await
70+
{
71+
Ok(()) => Ok(protocol::SqliteCommitResponse::SqliteCommitOk),
72+
Err(err) => Ok(protocol::SqliteCommitResponse::SqliteErrorResponse(
73+
protocol::SqliteErrorResponse {
74+
message: sqlite_error_reason(&err),
75+
},
76+
)),
77+
}
78+
}
79+
}
80+
81+
fn sqlite_error_reason(err: &anyhow::Error) -> String {
82+
err.chain()
83+
.map(ToString::to_string)
84+
.collect::<Vec<_>>()
85+
.join(": ")
86+
}

0 commit comments

Comments
 (0)