Skip to content

Commit 0c0ff85

Browse files
committed
fix(pegboard-envoy): allow sqlite requests across lifecycle states
1 parent c008cb0 commit 0c0ff85

8 files changed

Lines changed: 106 additions & 42 deletions

File tree

engine/packages/pegboard-envoy/src/actor_lifecycle.rs

Lines changed: 0 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -305,30 +305,3 @@ async fn close_actor_on_shutdown(
305305
}
306306
}
307307
}
308-
309-
pub async fn assert_sqlite_actor_active(
310-
conn: &Conn,
311-
actor_id: &str,
312-
sqlite_generation: u64,
313-
) -> Result<ActiveActor> {
314-
// Stopping is accepted in addition to Running: the actor still owns its sqlite
315-
// generation until actor_stopped runs, and may flush a final commit while draining.
316-
let active = conn
317-
.active_actors
318-
.get_async(actor_id)
319-
.await
320-
.map(|entry| entry.get().clone())
321-
.context("sqlite actor is not active on envoy connection")?;
322-
323-
let active_sqlite_generation = active
324-
.sqlite_generation
325-
.context("sqlite actor is still starting")?;
326-
ensure!(
327-
active_sqlite_generation == sqlite_generation,
328-
"sqlite request generation {} did not match active generation {}",
329-
sqlite_generation,
330-
active_sqlite_generation
331-
);
332-
333-
Ok(active)
334-
}

engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -703,8 +703,6 @@ async fn handle_sqlite_get_pages(
703703
) -> Result<protocol::SqliteGetPagesResponse> {
704704
validate_sqlite_get_pages_request(&request)?;
705705
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
706-
actor_lifecycle::assert_sqlite_actor_active(conn, &request.actor_id, request.generation)
707-
.await?;
708706

709707
match conn
710708
.sqlite_engine
@@ -749,8 +747,6 @@ async fn handle_sqlite_commit(
749747
let decode_request_start = Instant::now();
750748
validate_sqlite_dirty_pages("sqlite commit", &request.dirty_pages)?;
751749
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
752-
actor_lifecycle::assert_sqlite_actor_active(conn, &request.actor_id, request.generation)
753-
.await?;
754750
let decode_request_duration = decode_request_start.elapsed();
755751
conn.sqlite_engine.metrics().observe_commit_phase(
756752
"fast",
@@ -817,8 +813,6 @@ async fn handle_sqlite_commit_stage(
817813
request: protocol::SqliteCommitStageRequest,
818814
) -> Result<protocol::SqliteCommitStageResponse> {
819815
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
820-
actor_lifecycle::assert_sqlite_actor_active(conn, &request.actor_id, request.generation)
821-
.await?;
822816

823817
match conn
824818
.sqlite_engine
@@ -856,8 +850,6 @@ async fn handle_sqlite_commit_stage_begin(
856850
request: protocol::SqliteCommitStageBeginRequest,
857851
) -> Result<protocol::SqliteCommitStageBeginResponse> {
858852
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
859-
actor_lifecycle::assert_sqlite_actor_active(conn, &request.actor_id, request.generation)
860-
.await?;
861853

862854
match conn
863855
.sqlite_engine
@@ -892,8 +884,6 @@ async fn handle_sqlite_commit_finalize(
892884
) -> Result<protocol::SqliteCommitFinalizeResponse> {
893885
let decode_request_start = Instant::now();
894886
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
895-
actor_lifecycle::assert_sqlite_actor_active(conn, &request.actor_id, request.generation)
896-
.await?;
897887
conn.sqlite_engine.metrics().observe_commit_phase(
898888
"slow",
899889
"decode_request",

engine/sdks/rust/envoy-client/src/actor.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ async fn actor_inner(
199199
.await;
200200

201201
if let Err(error) = start_result {
202-
tracing::error!(?error, "actor start failed");
202+
let error_chain = error.chain().map(ToString::to_string).collect::<Vec<_>>();
203+
tracing::error!(?error, error_chain = ?error_chain, "actor start failed");
203204
send_event(
204205
&mut ctx,
205206
protocol::Event::EventActorStateUpdate(protocol::EventActorStateUpdate {

rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1398,7 +1398,7 @@ impl ActorTask {
13981398
let clean_exit = match outcome {
13991399
Ok(Ok(())) => true,
14001400
Ok(Err(error)) => {
1401-
tracing::error!(?error, "actor run handler failed");
1401+
log_actor_error(&error, "actor run handler failed");
14021402
false
14031403
}
14041404
Err(error) => {
@@ -1529,7 +1529,7 @@ impl ActorTask {
15291529
match (&mut run_handle).await {
15301530
Ok(Ok(())) => {}
15311531
Ok(Err(error)) => {
1532-
tracing::error!(?error, "actor run handler failed during shutdown");
1532+
log_actor_error(&error, "actor run handler failed during shutdown");
15331533
}
15341534
Err(error) => {
15351535
if !error.is_cancelled() {
@@ -2129,6 +2129,18 @@ fn clone_shutdown_result(result: &Result<()>) -> Result<()> {
21292129
}
21302130
}
21312131

2132+
fn log_actor_error(error: &anyhow::Error, log_message: &'static str) {
2133+
let structured = rivet_error::RivetError::extract(error);
2134+
tracing::error!(
2135+
?error,
2136+
group = structured.group(),
2137+
code = structured.code(),
2138+
message = %structured.message(),
2139+
metadata = ?structured.metadata(),
2140+
"{log_message}"
2141+
);
2142+
}
2143+
21322144
fn result_outcome<T>(result: &Result<T>) -> &'static str {
21332145
match result {
21342146
Ok(_) => "ok",

rivetkit-rust/packages/rivetkit-sqlite/src/vfs.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,6 +1308,15 @@ impl VfsContext {
13081308
{
13091309
Ok(outcome) => outcome,
13101310
Err(err) => {
1311+
tracing::error!(
1312+
actor_id = %self.actor_id,
1313+
generation = request.generation,
1314+
expected_head_txid = request.expected_head_txid,
1315+
new_db_size_pages = request.new_db_size_pages,
1316+
dirty_pages = request.dirty_pages.len(),
1317+
?err,
1318+
"sqlite flush commit failed"
1319+
);
13111320
mark_dead_for_non_fence_commit_error(self, &err);
13121321
return Err(err);
13131322
}
@@ -1387,6 +1396,15 @@ impl VfsContext {
13871396
{
13881397
Ok(outcome) => outcome,
13891398
Err(err) => {
1399+
tracing::error!(
1400+
actor_id = %self.actor_id,
1401+
generation = request.generation,
1402+
expected_head_txid = request.expected_head_txid,
1403+
new_db_size_pages = request.new_db_size_pages,
1404+
dirty_pages = request.dirty_pages.len(),
1405+
?err,
1406+
"sqlite atomic commit failed"
1407+
);
13901408
mark_dead_for_non_fence_commit_error(self, &err);
13911409
return Err(err);
13921410
}
@@ -1460,13 +1478,22 @@ fn assert_batch_atomic_probe(db: *mut sqlite3, vfs: &SqliteVfs) -> std::result::
14601478
";
14611479

14621480
if let Err(err) = sqlite_exec(db, probe_sql) {
1481+
tracing::error!(
1482+
%err,
1483+
last_error = ?vfs.clone_last_error(),
1484+
commit_atomic_before,
1485+
"sqlite batch atomic probe failed"
1486+
);
14631487
cleanup_batch_atomic_probe(db);
14641488
return Err(format!("batch atomic probe failed: {err}"));
14651489
}
14661490

14671491
let commit_atomic_after = vfs.commit_atomic_count();
14681492
if commit_atomic_after == commit_atomic_before {
14691493
tracing::error!(
1494+
commit_atomic_before,
1495+
commit_atomic_after,
1496+
last_error = ?vfs.clone_last_error(),
14701497
"batch atomic writes not active for sqlite, SQLITE_ENABLE_BATCH_ATOMIC_WRITE may be missing"
14711498
);
14721499
cleanup_batch_atomic_probe(db);
@@ -2215,6 +2242,12 @@ unsafe extern "C" fn io_sync(p_file: *mut sqlite3_file, _flags: c_int) -> c_int
22152242
match ctx.flush_dirty_pages() {
22162243
Ok(_) => SQLITE_OK,
22172244
Err(err) => {
2245+
tracing::error!(
2246+
actor_id = %ctx.actor_id,
2247+
last_error = ?ctx.clone_last_error(),
2248+
?err,
2249+
"sqlite sync failed"
2250+
);
22182251
mark_dead_from_fence_commit_error(ctx, &err);
22192252
SQLITE_IOERR_FSYNC
22202253
}
@@ -2280,6 +2313,12 @@ unsafe extern "C" fn io_file_control(
22802313
SQLITE_OK
22812314
}
22822315
Err(err) => {
2316+
tracing::error!(
2317+
actor_id = %ctx.actor_id,
2318+
last_error = ?ctx.clone_last_error(),
2319+
?err,
2320+
"sqlite atomic write file control failed"
2321+
);
22832322
mark_dead_from_fence_commit_error(ctx, &err);
22842323
SQLITE_IOERR
22852324
}
@@ -2532,6 +2571,10 @@ impl SqliteVfs {
25322571
unsafe { (*self.ctx_ptr).take_last_error() }
25332572
}
25342573

2574+
fn clone_last_error(&self) -> Option<String> {
2575+
unsafe { (*self.ctx_ptr).clone_last_error() }
2576+
}
2577+
25352578
fn register_with_transport(
25362579
name: &str,
25372580
transport: SqliteTransport,
@@ -2659,6 +2702,13 @@ pub fn open_database(
26592702
};
26602703
if rc != SQLITE_OK {
26612704
let message = sqlite_error_message(db);
2705+
tracing::error!(
2706+
file_name,
2707+
rc,
2708+
%message,
2709+
last_error = ?vfs.clone_last_error(),
2710+
"failed to open sqlite database with custom VFS"
2711+
);
26622712
if !db.is_null() {
26632713
unsafe {
26642714
sqlite3_close(db);
@@ -2676,6 +2726,13 @@ pub fn open_database(
26762726
"PRAGMA locking_mode = EXCLUSIVE;",
26772727
] {
26782728
if let Err(err) = sqlite_exec(db, pragma) {
2729+
tracing::error!(
2730+
file_name,
2731+
pragma,
2732+
%err,
2733+
last_error = ?vfs.clone_last_error(),
2734+
"failed to configure sqlite database"
2735+
);
26792736
unsafe {
26802737
sqlite3_close(db);
26812738
}
@@ -2684,6 +2741,12 @@ pub fn open_database(
26842741
}
26852742

26862743
if let Err(err) = assert_batch_atomic_probe(db, &vfs) {
2744+
tracing::error!(
2745+
file_name,
2746+
%err,
2747+
last_error = ?vfs.clone_last_error(),
2748+
"failed to verify sqlite batch atomic writes"
2749+
);
26872750
unsafe {
26882751
sqlite3_close(db);
26892752
}

rivetkit-typescript/packages/rivetkit-napi/src/actor_factory.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -976,9 +976,11 @@ fn parse_bridge_rivet_error(reason: &str) -> Option<anyhow::Error> {
976976
return None;
977977
}
978978
};
979-
tracing::debug!(
979+
tracing::warn!(
980980
group = %payload.group.as_str(),
981981
code = %payload.code.as_str(),
982+
message = %payload.message.as_str(),
983+
metadata = ?payload.metadata,
982984
has_metadata = payload.metadata.is_some(),
983985
public_ = ?payload.public_,
984986
status_code = ?payload.status_code,
@@ -1001,8 +1003,16 @@ fn parse_bridge_rivet_error(reason: &str) -> Option<anyhow::Error> {
10011003
}
10021004

10031005
pub(crate) fn callback_error(callback_name: &str, error: napi::Error) -> anyhow::Error {
1006+
let status = error.status;
10041007
let reason = error.reason;
10051008
if let Some(error) = parse_bridge_rivet_error(&reason) {
1009+
let error_chain = error.chain().map(ToString::to_string).collect::<Vec<_>>();
1010+
tracing::warn!(
1011+
callback = callback_name,
1012+
status = ?status,
1013+
error_chain = ?error_chain,
1014+
"napi callback failed with structured bridge error"
1015+
);
10061016
return error;
10071017
}
10081018
if error.status == napi::Status::Closing {

rivetkit-typescript/packages/rivetkit-napi/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub(crate) struct NapiInvalidState {
4343
}
4444

4545
pub(crate) fn napi_anyhow_error(error: anyhow::Error) -> napi::Error {
46+
let error_chain = error.chain().map(ToString::to_string).collect::<Vec<_>>();
4647
let bridge_context = error
4748
.chain()
4849
.find_map(|cause| cause.downcast_ref::<crate::actor_factory::BridgeRivetErrorContext>());
@@ -57,9 +58,12 @@ pub(crate) fn napi_anyhow_error(error: anyhow::Error) -> napi::Error {
5758
"public": public_,
5859
"statusCode": status_code,
5960
});
60-
tracing::debug!(
61+
tracing::error!(
6162
group = error.group(),
6263
code = error.code(),
64+
message = %error.message(),
65+
metadata = ?error.metadata(),
66+
error_chain = ?error_chain,
6367
has_metadata = error.metadata().is_some(),
6468
?public_,
6569
?status_code,

rivetkit-typescript/packages/rivetkit/src/registry/native.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,17 @@ function encodeNativeCallbackError(error: unknown): Error {
575575
bridge: "native_callback",
576576
});
577577

578+
logger().warn({
579+
msg: "native callback error encoded for bridge",
580+
group: structuredError.group,
581+
code: structuredError.code,
582+
message: structuredError.message,
583+
metadata: structuredError.metadata,
584+
originalError: stringifyError(error),
585+
stack: error instanceof Error ? error.stack : undefined,
586+
bridge: "native_callback",
587+
});
588+
578589
const bridgeError = new Error(encodeBridgeRivetError(structuredError), {
579590
cause: error instanceof Error ? error : undefined,
580591
});

0 commit comments

Comments
 (0)