Skip to content

Commit 2a618cd

Browse files
committed
fix(pegboard-envoy): allow sqlite requests across lifecycle states
1 parent c008cb0 commit 2a618cd

11 files changed

Lines changed: 141 additions & 90 deletions

File tree

engine/packages/pegboard-envoy/src/actor_lifecycle.rs

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -305,30 +305,3 @@ async fn close_actor_on_shutdown(
305305
}
306306
}
307307
}
308-
309-
pub async fn assert_sqlite_actor_active(
310-
conn: &Conn,
311-
actor_id: &str,
312-
sqlite_generation: u64,
313-
) -> Result<ActiveActor> {
314-
// Stopping is accepted in addition to Running: the actor still owns its sqlite
315-
// generation until actor_stopped runs, and may flush a final commit while draining.
316-
let active = conn
317-
.active_actors
318-
.get_async(actor_id)
319-
.await
320-
.map(|entry| entry.get().clone())
321-
.context("sqlite actor is not active on envoy connection")?;
322-
323-
let active_sqlite_generation = active
324-
.sqlite_generation
325-
.context("sqlite actor is still starting")?;
326-
ensure!(
327-
active_sqlite_generation == sqlite_generation,
328-
"sqlite request generation {} did not match active generation {}",
329-
sqlite_generation,
330-
active_sqlite_generation
331-
);
332-
333-
Ok(active)
334-
}

engine/packages/pegboard-envoy/src/sqlite_runtime.rs

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,10 @@ use std::sync::Arc;
33
use anyhow::Result;
44
use gas::prelude::StandaloneCtx;
55
use rivet_envoy_protocol as protocol;
6-
use sqlite_storage::{compaction::CompactionCoordinator, engine::SqliteEngine, open::OpenResult};
7-
use tokio::sync::OnceCell;
8-
use universaldb::Subspace;
9-
10-
static SQLITE_ENGINE: OnceCell<Arc<SqliteEngine>> = OnceCell::const_new();
6+
use sqlite_storage::{engine::SqliteEngine, open::OpenResult};
117

128
pub async fn shared_engine(ctx: &StandaloneCtx) -> Result<Arc<SqliteEngine>> {
13-
let db = (*ctx.udb()?).clone();
14-
let subspace = sqlite_subspace();
15-
16-
SQLITE_ENGINE
17-
.get_or_try_init(|| async move {
18-
tracing::info!("initializing shared sqlite dispatch runtime");
19-
20-
let (engine, compaction_rx) = SqliteEngine::new(db, subspace.clone());
21-
let engine = Arc::new(engine);
22-
tokio::spawn(CompactionCoordinator::run(
23-
compaction_rx,
24-
Arc::clone(&engine),
25-
));
26-
27-
Ok(engine)
28-
})
29-
.await
30-
.cloned()
31-
}
32-
33-
fn sqlite_subspace() -> Subspace {
34-
pegboard::keys::subspace().subspace(&("sqlite-storage",))
9+
pegboard::actor_sqlite::shared_engine(ctx).await
3510
}
3611

3712
pub fn protocol_sqlite_startup_data(startup: OpenResult) -> protocol::SqliteStartupData {

engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -703,8 +703,6 @@ async fn handle_sqlite_get_pages(
703703
) -> Result<protocol::SqliteGetPagesResponse> {
704704
validate_sqlite_get_pages_request(&request)?;
705705
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
706-
actor_lifecycle::assert_sqlite_actor_active(conn, &request.actor_id, request.generation)
707-
.await?;
708706

709707
match conn
710708
.sqlite_engine
@@ -749,8 +747,6 @@ async fn handle_sqlite_commit(
749747
let decode_request_start = Instant::now();
750748
validate_sqlite_dirty_pages("sqlite commit", &request.dirty_pages)?;
751749
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
752-
actor_lifecycle::assert_sqlite_actor_active(conn, &request.actor_id, request.generation)
753-
.await?;
754750
let decode_request_duration = decode_request_start.elapsed();
755751
conn.sqlite_engine.metrics().observe_commit_phase(
756752
"fast",
@@ -817,8 +813,6 @@ async fn handle_sqlite_commit_stage(
817813
request: protocol::SqliteCommitStageRequest,
818814
) -> Result<protocol::SqliteCommitStageResponse> {
819815
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
820-
actor_lifecycle::assert_sqlite_actor_active(conn, &request.actor_id, request.generation)
821-
.await?;
822816

823817
match conn
824818
.sqlite_engine
@@ -856,8 +850,6 @@ async fn handle_sqlite_commit_stage_begin(
856850
request: protocol::SqliteCommitStageBeginRequest,
857851
) -> Result<protocol::SqliteCommitStageBeginResponse> {
858852
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
859-
actor_lifecycle::assert_sqlite_actor_active(conn, &request.actor_id, request.generation)
860-
.await?;
861853

862854
match conn
863855
.sqlite_engine
@@ -892,8 +884,6 @@ async fn handle_sqlite_commit_finalize(
892884
) -> Result<protocol::SqliteCommitFinalizeResponse> {
893885
let decode_request_start = Instant::now();
894886
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
895-
actor_lifecycle::assert_sqlite_actor_active(conn, &request.actor_id, request.generation)
896-
.await?;
897887
conn.sqlite_engine.metrics().observe_commit_phase(
898888
"slow",
899889
"decode_request",

engine/packages/pegboard-outbound/src/lib.rs

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,14 @@ use rivet_runtime::TermSignal;
1010
use rivet_types::actor::RunnerPoolError;
1111
use rivet_types::runner_configs::RunnerConfigKind;
1212
use sqlite_storage::{
13-
compaction::CompactionCoordinator,
1413
engine::SqliteEngine,
1514
open::{OpenConfig, OpenResult},
1615
types::{FetchedPage, SqliteMeta},
1716
};
1817
use std::collections::HashMap;
1918
use std::sync::Arc;
2019
use std::time::{Duration, Instant};
21-
use tokio::{sync::OnceCell, task::JoinHandle};
20+
use tokio::task::JoinHandle;
2221
use universalpubsub::NextOutput;
2322
use vbare::OwnedVersionedData;
2423

@@ -29,25 +28,9 @@ const X_RIVET_POOL_NAME: HeaderName = HeaderName::from_static("x-rivet-pool-name
2928
const X_RIVET_TOKEN: HeaderName = HeaderName::from_static("x-rivet-token");
3029
const X_RIVET_NAMESPACE_NAME: HeaderName = HeaderName::from_static("x-rivet-namespace-name");
3130
const SHUTDOWN_PROGRESS_INTERVAL: Duration = Duration::from_secs(7);
32-
static SQLITE_ENGINE: OnceCell<Arc<SqliteEngine>> = OnceCell::const_new();
3331

3432
async fn shared_sqlite_engine(ctx: &StandaloneCtx) -> Result<Arc<SqliteEngine>> {
35-
let db = (*ctx.udb()?).clone();
36-
let subspace = pegboard::keys::subspace().subspace(&("sqlite-storage",));
37-
38-
SQLITE_ENGINE
39-
.get_or_try_init(|| async move {
40-
let (engine, compaction_rx) = SqliteEngine::new(db, subspace.clone());
41-
let engine = Arc::new(engine);
42-
tokio::spawn(CompactionCoordinator::run(
43-
compaction_rx,
44-
Arc::clone(&engine),
45-
));
46-
47-
Ok(engine)
48-
})
49-
.await
50-
.cloned()
33+
pegboard::actor_sqlite::shared_engine(ctx).await
5134
}
5235

5336
fn protocol_sqlite_startup_data(startup: OpenResult) -> protocol::SqliteStartupData {

engine/packages/pegboard/src/actor_sqlite.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
1-
use std::time::Instant;
1+
use std::{sync::Arc, time::Instant};
22

33
use anyhow::{Context, Result, ensure};
4-
use gas::prelude::{Id, util::timestamp};
4+
use gas::prelude::{Id, StandaloneCtx, util::timestamp};
55
use rivet_envoy_protocol as protocol;
66
use sqlite_storage::{
7+
compaction::CompactionCoordinator,
78
commit::{CommitFinalizeRequest, CommitStageBeginRequest, CommitStageRequest},
89
engine::SqliteEngine,
910
ltx::{LtxHeader, encode_ltx_v3},
1011
open::OpenConfig,
1112
types::{DirtyPage, SQLITE_PAGE_SIZE, SqliteOrigin},
1213
};
14+
use tokio::sync::OnceCell;
1315
use universaldb::Subspace;
1416

1517
use crate::{actor_kv::Recipient, metrics};
@@ -28,6 +30,7 @@ const FILE_TAG_JOURNAL: u8 = 0x01;
2830
const FILE_TAG_WAL: u8 = 0x02;
2931
const FILE_TAG_SHM: u8 = 0x03;
3032
const SQLITE_MAGIC: &[u8; 16] = b"SQLite format 3\0";
33+
static SQLITE_ENGINE: OnceCell<Arc<SqliteEngine>> = OnceCell::const_new();
3134

3235
pub fn sqlite_subspace() -> Subspace {
3336
crate::keys::subspace().subspace(&("sqlite-storage",))
@@ -39,6 +42,26 @@ pub fn new_engine(
3942
SqliteEngine::new(db, sqlite_subspace())
4043
}
4144

45+
pub async fn shared_engine(ctx: &StandaloneCtx) -> Result<Arc<SqliteEngine>> {
46+
let db = (*ctx.udb()?).clone();
47+
48+
SQLITE_ENGINE
49+
.get_or_try_init(|| async move {
50+
tracing::info!("initializing shared sqlite dispatch runtime");
51+
52+
let (engine, compaction_rx) = new_engine(db);
53+
let engine = Arc::new(engine);
54+
tokio::spawn(CompactionCoordinator::run(
55+
compaction_rx,
56+
Arc::clone(&engine),
57+
));
58+
59+
Ok(engine)
60+
})
61+
.await
62+
.cloned()
63+
}
64+
4265
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Hash)]
4366
pub struct MigrateV1ToV2Input {
4467
pub actor_id: Id,

engine/sdks/rust/envoy-client/src/actor.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ async fn actor_inner(
199199
.await;
200200

201201
if let Err(error) = start_result {
202-
tracing::error!(?error, "actor start failed");
202+
let error_chain = error.chain().map(ToString::to_string).collect::<Vec<_>>();
203+
tracing::error!(?error, error_chain = ?error_chain, "actor start failed");
203204
send_event(
204205
&mut ctx,
205206
protocol::Event::EventActorStateUpdate(protocol::EventActorStateUpdate {

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1398,7 +1398,7 @@ impl ActorTask {
13981398
let clean_exit = match outcome {
13991399
Ok(Ok(())) => true,
14001400
Ok(Err(error)) => {
1401-
tracing::error!(?error, "actor run handler failed");
1401+
log_actor_error(&error, "actor run handler failed");
14021402
false
14031403
}
14041404
Err(error) => {
@@ -1529,7 +1529,7 @@ impl ActorTask {
15291529
match (&mut run_handle).await {
15301530
Ok(Ok(())) => {}
15311531
Ok(Err(error)) => {
1532-
tracing::error!(?error, "actor run handler failed during shutdown");
1532+
log_actor_error(&error, "actor run handler failed during shutdown");
15331533
}
15341534
Err(error) => {
15351535
if !error.is_cancelled() {
@@ -2129,6 +2129,18 @@ fn clone_shutdown_result(result: &Result<()>) -> Result<()> {
21292129
}
21302130
}
21312131

2132+
fn log_actor_error(error: &anyhow::Error, log_message: &'static str) {
2133+
let structured = rivet_error::RivetError::extract(error);
2134+
tracing::error!(
2135+
?error,
2136+
group = structured.group(),
2137+
code = structured.code(),
2138+
message = %structured.message(),
2139+
metadata = ?structured.metadata(),
2140+
"{log_message}"
2141+
);
2142+
}
2143+
21322144
fn result_outcome<T>(result: &Result<T>) -> &'static str {
21332145
match result {
21342146
Ok(_) => "ok",

rivetkit-rust/packages/rivetkit-sqlite/src/vfs.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,6 +1308,15 @@ impl VfsContext {
13081308
{
13091309
Ok(outcome) => outcome,
13101310
Err(err) => {
1311+
tracing::error!(
1312+
actor_id = %self.actor_id,
1313+
generation = request.generation,
1314+
expected_head_txid = request.expected_head_txid,
1315+
new_db_size_pages = request.new_db_size_pages,
1316+
dirty_pages = request.dirty_pages.len(),
1317+
?err,
1318+
"sqlite flush commit failed"
1319+
);
13111320
mark_dead_for_non_fence_commit_error(self, &err);
13121321
return Err(err);
13131322
}
@@ -1387,6 +1396,15 @@ impl VfsContext {
13871396
{
13881397
Ok(outcome) => outcome,
13891398
Err(err) => {
1399+
tracing::error!(
1400+
actor_id = %self.actor_id,
1401+
generation = request.generation,
1402+
expected_head_txid = request.expected_head_txid,
1403+
new_db_size_pages = request.new_db_size_pages,
1404+
dirty_pages = request.dirty_pages.len(),
1405+
?err,
1406+
"sqlite atomic commit failed"
1407+
);
13901408
mark_dead_for_non_fence_commit_error(self, &err);
13911409
return Err(err);
13921410
}
@@ -1460,13 +1478,28 @@ fn assert_batch_atomic_probe(db: *mut sqlite3, vfs: &SqliteVfs) -> std::result::
14601478
";
14611479

14621480
if let Err(err) = sqlite_exec(db, probe_sql) {
1481+
let last_error = vfs.clone_last_error();
1482+
tracing::error!(
1483+
%err,
1484+
last_error = ?last_error,
1485+
commit_atomic_before,
1486+
"sqlite batch atomic probe failed"
1487+
);
14631488
cleanup_batch_atomic_probe(db);
1489+
if let Some(last_error) = last_error {
1490+
return Err(format!(
1491+
"batch atomic probe failed: {err}; vfs last_error: {last_error}"
1492+
));
1493+
}
14641494
return Err(format!("batch atomic probe failed: {err}"));
14651495
}
14661496

14671497
let commit_atomic_after = vfs.commit_atomic_count();
14681498
if commit_atomic_after == commit_atomic_before {
14691499
tracing::error!(
1500+
commit_atomic_before,
1501+
commit_atomic_after,
1502+
last_error = ?vfs.clone_last_error(),
14701503
"batch atomic writes not active for sqlite, SQLITE_ENABLE_BATCH_ATOMIC_WRITE may be missing"
14711504
);
14721505
cleanup_batch_atomic_probe(db);
@@ -2215,6 +2248,12 @@ unsafe extern "C" fn io_sync(p_file: *mut sqlite3_file, _flags: c_int) -> c_int
22152248
match ctx.flush_dirty_pages() {
22162249
Ok(_) => SQLITE_OK,
22172250
Err(err) => {
2251+
tracing::error!(
2252+
actor_id = %ctx.actor_id,
2253+
last_error = ?ctx.clone_last_error(),
2254+
?err,
2255+
"sqlite sync failed"
2256+
);
22182257
mark_dead_from_fence_commit_error(ctx, &err);
22192258
SQLITE_IOERR_FSYNC
22202259
}
@@ -2280,6 +2319,12 @@ unsafe extern "C" fn io_file_control(
22802319
SQLITE_OK
22812320
}
22822321
Err(err) => {
2322+
tracing::error!(
2323+
actor_id = %ctx.actor_id,
2324+
last_error = ?ctx.clone_last_error(),
2325+
?err,
2326+
"sqlite atomic write file control failed"
2327+
);
22832328
mark_dead_from_fence_commit_error(ctx, &err);
22842329
SQLITE_IOERR
22852330
}
@@ -2532,6 +2577,10 @@ impl SqliteVfs {
25322577
unsafe { (*self.ctx_ptr).take_last_error() }
25332578
}
25342579

2580+
fn clone_last_error(&self) -> Option<String> {
2581+
unsafe { (*self.ctx_ptr).clone_last_error() }
2582+
}
2583+
25352584
fn register_with_transport(
25362585
name: &str,
25372586
transport: SqliteTransport,
@@ -2659,6 +2708,13 @@ pub fn open_database(
26592708
};
26602709
if rc != SQLITE_OK {
26612710
let message = sqlite_error_message(db);
2711+
tracing::error!(
2712+
file_name,
2713+
rc,
2714+
%message,
2715+
last_error = ?vfs.clone_last_error(),
2716+
"failed to open sqlite database with custom VFS"
2717+
);
26622718
if !db.is_null() {
26632719
unsafe {
26642720
sqlite3_close(db);
@@ -2676,6 +2732,13 @@ pub fn open_database(
26762732
"PRAGMA locking_mode = EXCLUSIVE;",
26772733
] {
26782734
if let Err(err) = sqlite_exec(db, pragma) {
2735+
tracing::error!(
2736+
file_name,
2737+
pragma,
2738+
%err,
2739+
last_error = ?vfs.clone_last_error(),
2740+
"failed to configure sqlite database"
2741+
);
26792742
unsafe {
26802743
sqlite3_close(db);
26812744
}
@@ -2684,6 +2747,12 @@ pub fn open_database(
26842747
}
26852748

26862749
if let Err(err) = assert_batch_atomic_probe(db, &vfs) {
2750+
tracing::error!(
2751+
file_name,
2752+
%err,
2753+
last_error = ?vfs.clone_last_error(),
2754+
"failed to verify sqlite batch atomic writes"
2755+
);
26872756
unsafe {
26882757
sqlite3_close(db);
26892758
}

0 commit comments

Comments
 (0)