diff --git a/engine/packages/pegboard-envoy/src/actor_lifecycle.rs b/engine/packages/pegboard-envoy/src/actor_lifecycle.rs index 963654b29b..8d73fc990c 100644 --- a/engine/packages/pegboard-envoy/src/actor_lifecycle.rs +++ b/engine/packages/pegboard-envoy/src/actor_lifecycle.rs @@ -1,85 +1,10 @@ -use std::sync::Arc; - use anyhow::{Context, Result, ensure}; -use futures_util::{StreamExt, stream}; use gas::prelude::{Id, StandaloneCtx, util::timestamp}; use rivet_envoy_protocol as protocol; -use sqlite_storage::{engine::SqliteEngine, open::OpenConfig}; +use sqlite_storage::open::OpenConfig; use crate::{conn::Conn, sqlite_runtime}; -const SHUTDOWN_CLOSE_PARALLELISM: usize = 256; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ActiveActor { - pub actor_generation: u32, - pub sqlite_generation: Option, - pub state: ActiveActorState, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum ActiveActorState { - Starting, - Running, - Stopping, -} - -struct StartActorGuard<'a> { - sqlite_engine: Arc, - active_actors: &'a scc::HashMap, - actor_id: String, - sqlite_generation: Option, - armed: bool, -} - -impl<'a> StartActorGuard<'a> { - fn new( - sqlite_engine: Arc, - active_actors: &'a scc::HashMap, - actor_id: String, - ) -> Self { - Self { - sqlite_engine, - active_actors, - actor_id, - sqlite_generation: None, - armed: true, - } - } - - fn set_sqlite_generation(&mut self, generation: u64) { - self.sqlite_generation = Some(generation); - } - - fn disarm(&mut self) { - self.armed = false; - } -} - -impl<'a> Drop for StartActorGuard<'a> { - fn drop(&mut self) { - if !self.armed { - return; - } - - self.active_actors.remove_sync(&self.actor_id); - - if let Some(generation) = self.sqlite_generation { - let sqlite_engine = self.sqlite_engine.clone(); - let actor_id = std::mem::take(&mut self.actor_id); - tokio::spawn(async move { - if let Err(err) = sqlite_engine.close(&actor_id, generation).await { - tracing::debug!( - actor_id = %actor_id, - ?err, - "sqlite db was already taken over during start cancellation" - ); - } - }); - } - } -} - pub async fn start_actor( ctx: &StandaloneCtx, conn: &Conn, @@ -89,246 +14,39 @@ pub async fn start_actor( let actor_id = Id::parse(&checkpoint.actor_id).context("invalid start actor id")?; let actor_id_string = actor_id.to_string(); - match conn - .active_actors - .entry_async(actor_id_string.clone()) - .await - { - scc::hash_map::Entry::Occupied(_) => { - ensure!(false, "actor already active on envoy connection"); - } - scc::hash_map::Entry::Vacant(entry) => { - entry.insert_entry(ActiveActor { - actor_generation: checkpoint.generation, - sqlite_generation: None, - state: ActiveActorState::Starting, - }); - } - } - - let mut start_guard = StartActorGuard::new( - conn.sqlite_engine.clone(), - &conn.active_actors, - actor_id_string.clone(), - ); - - let result = async { - let sqlite_open = conn - .sqlite_engine - .open(&actor_id_string, OpenConfig::new(timestamp::now())) - .await?; - let sqlite_generation = sqlite_open.generation; - start_guard.set_sqlite_generation(sqlite_generation); - - let populate_res = async { - ensure!(start.sqlite_startup_data.is_none()); - ensure!(start.preloaded_kv.is_none()); - - let hibernating_requests = ctx - .op(pegboard::ops::actor::hibernating_request::list::Input { actor_id }) - .await?; - start.hibernating_requests = hibernating_requests - .into_iter() - .map(|x| protocol::HibernatingRequest { - gateway_id: x.gateway_id, - request_id: x.request_id, - }) - .collect(); - - let db = ctx.udb()?; - start.preloaded_kv = pegboard::actor_kv::preload::fetch_preloaded_kv( - &db, - ctx.config().pegboard(), - actor_id, - conn.namespace_id, - &start.config.name, - ) - .await?; - - start.sqlite_startup_data = - Some(sqlite_runtime::protocol_sqlite_startup_data(sqlite_open)); - - Ok(()) - } - .await; - - // Close SQLite if start command population fails. - if let Err(err) = populate_res { - if let Err(close_err) = conn - .sqlite_engine - .close(&actor_id_string, sqlite_generation) - .await - { - tracing::warn!( - actor_id = %actor_id_string, - ?close_err, - "failed to close sqlite db after start population failed" - ); - } - return Err(err); - } - - Ok(sqlite_generation) - } - .await; - - match result { - Ok(sqlite_generation) => { - let update_result = conn - .active_actors - .update_async(&actor_id_string, |_, active| { - active.actor_generation = checkpoint.generation; - active.sqlite_generation = Some(sqlite_generation); - active.state = ActiveActorState::Running; - }) - .await; - if update_result.is_none() { - if let Err(close_err) = conn - .sqlite_engine - .close(&actor_id_string, sqlite_generation) - .await - { - tracing::warn!( - actor_id = %actor_id_string, - ?close_err, - "failed to close sqlite db after active state disappeared" - ); - } - ensure!(false, "actor active state missing after start"); - } - start_guard.disarm(); - Ok(()) - } - Err(err) => { - conn.active_actors.remove_async(&actor_id_string).await; - start_guard.disarm(); - Err(err) - } - } -} - -pub async fn stop_actor(conn: &Conn, checkpoint: &protocol::ActorCheckpoint) -> Result<()> { - let actor_id = checkpoint.actor_id.clone(); - let update_result = conn - .active_actors - .update_async(&actor_id, |_, active| { - if active.actor_generation == checkpoint.generation { - active.state = ActiveActorState::Stopping; - Ok(()) - } else { - Err(active.actor_generation) - } + ensure!(start.sqlite_startup_data.is_none()); + ensure!(start.preloaded_kv.is_none()); + + let hibernating_requests = ctx + .op(pegboard::ops::actor::hibernating_request::list::Input { actor_id }) + .await?; + start.hibernating_requests = hibernating_requests + .into_iter() + .map(|x| protocol::HibernatingRequest { + gateway_id: x.gateway_id, + request_id: x.request_id, }) - .await - .context("actor is not active on envoy connection")?; - - if let Err(active_generation) = update_result { - ensure!( - false, - "stop actor generation {} did not match active generation {}", - checkpoint.generation, - active_generation - ); - } - Ok(()) -} - -pub async fn actor_stopped(conn: &Conn, checkpoint: &protocol::ActorCheckpoint) -> Result<()> { - let actor_id = checkpoint.actor_id.clone(); - let active = match conn - .active_actors - .get_async(&actor_id) - .await - .map(|entry| entry.get().clone()) - { - Some(active) => active, - None if conn.is_serverless => { - conn.sqlite_engine.force_close(&actor_id).await; - conn.serverless_sqlite_actors.remove_async(&actor_id).await; - return Ok(()); - } - None => { - ensure!(false, "actor stopped without active sqlite state"); - unreachable!(); - } - }; - ensure!( - active.actor_generation == checkpoint.generation, - "stopped actor generation {} did not match active generation {}", - checkpoint.generation, - active.actor_generation - ); - - let sqlite_generation = active - .sqlite_generation - .context("actor stopped before sqlite finished opening")?; - let close_res = conn + .collect(); + + let db = ctx.udb()?; + start.preloaded_kv = pegboard::actor_kv::preload::fetch_preloaded_kv( + &db, + ctx.config().pegboard(), + actor_id, + conn.namespace_id, + &start.config.name, + ) + .await?; + + // Open SQLite to produce startup data for the envoy. The open is + // fire-and-forget from the connection's perspective. The SqliteEngine's + // takeover path on next open and the lenient `ensure_local_open` cache + // catch-up handle ownership transitions. + let sqlite_open = conn .sqlite_engine - .close(&actor_id, sqlite_generation) - .await; - if let Err(err) = &close_res { - tracing::warn!( - %actor_id, - ?err, - "close failed in actor_stopped" - ); - } - // Generation-checked remove so a concurrent `start_actor` for a fresh - // generation between the `get_async` above and this point does not have - // its newly-inserted entry deleted by the stale stop. - conn.active_actors - .remove_if_async(&actor_id, |entry| { - entry.actor_generation == checkpoint.generation - }) - .await; - - close_res -} + .open(&actor_id_string, OpenConfig::new(timestamp::now())) + .await?; + start.sqlite_startup_data = Some(sqlite_runtime::protocol_sqlite_startup_data(sqlite_open)); -pub async fn shutdown_conn_actors(conn: &Conn) { - let mut active_actors = Vec::new(); - conn.active_actors.retain_sync(|actor_id, active| { - active_actors.push((actor_id.clone(), active.clone())); - false - }); - - stream::iter(active_actors.into_iter().map(|(actor_id, active)| { - let sqlite_engine = conn.sqlite_engine.clone(); - close_actor_on_shutdown(sqlite_engine, actor_id, active.sqlite_generation) - })) - .buffer_unordered(SHUTDOWN_CLOSE_PARALLELISM) - .for_each(|_| async {}) - .await; - - let mut serverless_sqlite_actors = Vec::new(); - conn.serverless_sqlite_actors - .retain_sync(|actor_id, _generation| { - serverless_sqlite_actors.push(actor_id.clone()); - false - }); - stream::iter(serverless_sqlite_actors.into_iter().map(|actor_id| { - let sqlite_engine = conn.sqlite_engine.clone(); - async move { - sqlite_engine.force_close(&actor_id).await; - } - })) - .buffer_unordered(SHUTDOWN_CLOSE_PARALLELISM) - .for_each(|_| async {}) - .await; -} - -async fn close_actor_on_shutdown( - sqlite_engine: Arc, - actor_id: String, - sqlite_generation: Option, -) { - if let Some(generation) = sqlite_generation { - if let Err(err) = sqlite_engine.close(&actor_id, generation).await { - tracing::warn!( - actor_id = %actor_id, - ?err, - "close failed during envoy shutdown" - ); - } - } + Ok(()) } diff --git a/engine/packages/pegboard-envoy/src/conn.rs b/engine/packages/pegboard-envoy/src/conn.rs index 3e527f3a1f..4ebf89ad6a 100644 --- a/engine/packages/pegboard-envoy/src/conn.rs +++ b/engine/packages/pegboard-envoy/src/conn.rs @@ -29,8 +29,6 @@ pub struct Conn { pub ws_handle: WebSocketHandle, pub authorized_tunnel_routes: HashMap<(protocol::GatewayId, protocol::RequestId), ()>, pub sqlite_engine: Arc, - pub active_actors: HashMap, - pub serverless_sqlite_actors: HashMap, pub is_serverless: bool, pub last_rtt: AtomicU32, /// Timestamp (epoch ms) of the last pong received from the envoy. @@ -306,33 +304,24 @@ pub async fn init_conn( ws_handle, authorized_tunnel_routes: HashMap::new(), sqlite_engine, - active_actors: HashMap::new(), - serverless_sqlite_actors: HashMap::new(), is_serverless, last_rtt: AtomicU32::new(0), last_ping_ts: AtomicI64::new(util::timestamp::now()), }); - // Send missed commands (must be after init packet). If any step fails - // after one or more `start_actor` calls already opened SQLite dbs, close - // every actor in `conn.active_actors` before returning so we do not leak - // process-wide `SqliteEngine.open_dbs` entries that would block re-opening - // these actors until the process restarts. + // Send missed commands after the init packet. if !missed_commands.is_empty() { let replay_result: Result<()> = async { for cmd_wrapper in &mut missed_commands { if let protocol::Command::CommandStartActor(ref mut start) = cmd_wrapper.inner { actor_lifecycle::start_actor(ctx, &conn, &cmd_wrapper.checkpoint, start) .await?; - } else if let protocol::Command::CommandStopActor(_) = cmd_wrapper.inner { - actor_lifecycle::stop_actor(&conn, &cmd_wrapper.checkpoint).await?; } } Ok(()) } .await; if let Err(err) = replay_result { - actor_lifecycle::shutdown_conn_actors(&conn).await; return Err(err); } @@ -344,7 +333,6 @@ pub async fn init_conn( .send(Message::Binary(msg_serialized.into())) .await { - actor_lifecycle::shutdown_conn_actors(&conn).await; return Err(err.into()); } } diff --git a/engine/packages/pegboard-envoy/src/lib.rs b/engine/packages/pegboard-envoy/src/lib.rs index c4d412ec82..73b6cdcfe3 100644 --- a/engine/packages/pegboard-envoy/src/lib.rs +++ b/engine/packages/pegboard-envoy/src/lib.rs @@ -258,8 +258,6 @@ impl CustomServeTrait for PegboardEnvoyWs { } } - actor_lifecycle::shutdown_conn_actors(&conn).await; - tracing::debug!(%topic, "envoy websocket closed"); metrics::CONNECTION_ACTIVE diff --git a/engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs b/engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs index cb5342d462..d64f233803 100644 --- a/engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs +++ b/engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs @@ -129,8 +129,6 @@ async fn handle_message( if let protocol::Command::CommandStartActor(start) = &mut command_wrapper.inner { actor_lifecycle::start_actor(ctx, conn, &command_wrapper.checkpoint, start) .await?; - } else if let protocol::Command::CommandStopActor(_) = &command_wrapper.inner { - actor_lifecycle::stop_actor(conn, &command_wrapper.checkpoint).await?; } } diff --git a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs index bc1b3c3b7d..f07bc5fc49 100644 --- a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs @@ -24,8 +24,7 @@ use universalpubsub::PublishOpts; use vbare::OwnedVersionedData; use crate::{ - LifecycleResult, actor_event_demuxer::ActorEventDemuxer, actor_lifecycle, conn::Conn, errors, - sqlite_runtime, + LifecycleResult, actor_event_demuxer::ActorEventDemuxer, conn::Conn, errors, sqlite_runtime, }; #[tracing::instrument(name="ws_to_tunnel_task", skip_all, fields(ray_id=?ctx.ray_id(), req_id=?ctx.req_id(), envoy_key=%conn.envoy_key, protocol_version=%conn.protocol_version))] @@ -418,25 +417,6 @@ async fn handle_message( // Forward to demuxer which forwards to actor wf protocol::ToRivet::ToRivetEvents(events) => { for event in events { - if let protocol::Event::EventActorStateUpdate(state_update) = &event.inner { - if let protocol::ActorState::ActorStateStopped(_) = &state_update.state { - // Log + continue on protocol-level disagreement instead of tearing - // down the whole WS for a single bad ActorStateStopped event. - // `actor_stopped` itself force-closes the SQLite db and removes - // the active_actors entry on failure, so the conn does not retain - // half-stopped state for this actor. - if let Err(err) = - actor_lifecycle::actor_stopped(conn, &event.checkpoint).await - { - tracing::warn!( - actor_id = %event.checkpoint.actor_id, - generation = event.checkpoint.generation, - ?err, - "actor_stopped lifecycle update failed; entry already evicted" - ); - } - } - } event_demuxer.ingest(Id::parse(&event.checkpoint.actor_id)?, event); } } @@ -650,7 +630,7 @@ async fn handle_metadata( #[tracing::instrument(skip_all)] async fn handle_tunnel_message( ctx: &StandaloneCtx, - authorized_tunnel_routes: &HashMap<(protocol::GatewayId, protocol::RequestId), ()>, + _authorized_tunnel_routes: &HashMap<(protocol::GatewayId, protocol::RequestId), ()>, msg: protocol::ToRivetTunnelMessage, ) -> Result<()> { // Extract inner data length before consuming msg @@ -703,13 +683,15 @@ async fn handle_sqlite_get_pages( ) -> Result { validate_sqlite_get_pages_request(&request)?; validate_sqlite_actor(ctx, conn, &request.actor_id).await?; - ensure_serverless_sqlite_open(conn, &request.actor_id, request.generation).await?; - match conn - .sqlite_engine - .get_pages(&request.actor_id, request.generation, request.pgnos.clone()) - .await - { + let engine_result = async { + ensure_sqlite_open(conn, &request.actor_id, request.generation).await?; + conn.sqlite_engine + .get_pages(&request.actor_id, request.generation, request.pgnos.clone()) + .await + } + .await; + match engine_result { Ok(pages) => Ok(sqlite_get_pages_ok(conn, &request.actor_id, pages).await?), Err(err) => match sqlite_storage_error(&err) { Some(SqliteStorageError::FenceMismatch { reason }) => { @@ -748,7 +730,6 @@ async fn handle_sqlite_commit( let decode_request_start = Instant::now(); validate_sqlite_dirty_pages("sqlite commit", &request.dirty_pages)?; validate_sqlite_actor(ctx, conn, &request.actor_id).await?; - ensure_serverless_sqlite_open(conn, &request.actor_id, request.generation).await?; let decode_request_duration = decode_request_start.elapsed(); conn.sqlite_engine.metrics().observe_commit_phase( "fast", @@ -758,23 +739,26 @@ async fn handle_sqlite_commit( crate::metrics::SQLITE_COMMIT_ENVOY_DISPATCH_DURATION .observe(decode_request_duration.as_secs_f64()); - let engine_result = conn - .sqlite_engine - .commit( - &request.actor_id, - sqlite_storage::commit::CommitRequest { - generation: request.generation, - head_txid: request.expected_head_txid, - db_size_pages: request.new_db_size_pages, - dirty_pages: request - .dirty_pages - .into_iter() - .map(storage_dirty_page) - .collect(), - now_ms: util::timestamp::now(), - }, - ) - .await; + let engine_result = async { + ensure_sqlite_open(conn, &request.actor_id, request.generation).await?; + conn.sqlite_engine + .commit( + &request.actor_id, + sqlite_storage::commit::CommitRequest { + generation: request.generation, + head_txid: request.expected_head_txid, + db_size_pages: request.new_db_size_pages, + dirty_pages: request + .dirty_pages + .into_iter() + .map(storage_dirty_page) + .collect(), + now_ms: util::timestamp::now(), + }, + ) + .await + } + .await; let response_build_start = Instant::now(); let response = match engine_result { Ok(result) => Ok(protocol::SqliteCommitResponse::SqliteCommitOk( @@ -815,22 +799,24 @@ async fn handle_sqlite_commit_stage( request: protocol::SqliteCommitStageRequest, ) -> Result { validate_sqlite_actor(ctx, conn, &request.actor_id).await?; - ensure_serverless_sqlite_open(conn, &request.actor_id, request.generation).await?; - - match conn - .sqlite_engine - .commit_stage( - &request.actor_id, - sqlite_storage::commit::CommitStageRequest { - generation: request.generation, - txid: request.txid, - chunk_idx: request.chunk_idx, - bytes: request.bytes, - is_last: request.is_last, - }, - ) - .await - { + + let engine_result = async { + ensure_sqlite_open(conn, &request.actor_id, request.generation).await?; + conn.sqlite_engine + .commit_stage( + &request.actor_id, + sqlite_storage::commit::CommitStageRequest { + generation: request.generation, + txid: request.txid, + chunk_idx: request.chunk_idx, + bytes: request.bytes, + is_last: request.is_last, + }, + ) + .await + } + .await; + match engine_result { Ok(result) => Ok(protocol::SqliteCommitStageResponse::SqliteCommitStageOk( protocol::SqliteCommitStageOk { chunk_idx_committed: result.chunk_idx_committed, @@ -853,18 +839,20 @@ async fn handle_sqlite_commit_stage_begin( request: protocol::SqliteCommitStageBeginRequest, ) -> Result { validate_sqlite_actor(ctx, conn, &request.actor_id).await?; - ensure_serverless_sqlite_open(conn, &request.actor_id, request.generation).await?; - - match conn - .sqlite_engine - .commit_stage_begin( - &request.actor_id, - sqlite_storage::commit::CommitStageBeginRequest { - generation: request.generation, - }, - ) - .await - { + + let engine_result = async { + ensure_sqlite_open(conn, &request.actor_id, request.generation).await?; + conn.sqlite_engine + .commit_stage_begin( + &request.actor_id, + sqlite_storage::commit::CommitStageBeginRequest { + generation: request.generation, + }, + ) + .await + } + .await; + match engine_result { Ok(result) => Ok( protocol::SqliteCommitStageBeginResponse::SqliteCommitStageBeginOk( protocol::SqliteCommitStageBeginOk { txid: result.txid }, @@ -888,27 +876,29 @@ async fn handle_sqlite_commit_finalize( ) -> Result { let decode_request_start = Instant::now(); validate_sqlite_actor(ctx, conn, &request.actor_id).await?; - ensure_serverless_sqlite_open(conn, &request.actor_id, request.generation).await?; conn.sqlite_engine.metrics().observe_commit_phase( "slow", "decode_request", decode_request_start.elapsed(), ); - let engine_result = conn - .sqlite_engine - .commit_finalize( - &request.actor_id, - sqlite_storage::commit::CommitFinalizeRequest { - generation: request.generation, - expected_head_txid: request.expected_head_txid, - txid: request.txid, - new_db_size_pages: request.new_db_size_pages, - now_ms: util::timestamp::now(), - origin_override: None, - }, - ) - .await; + let engine_result = async { + ensure_sqlite_open(conn, &request.actor_id, request.generation).await?; + conn.sqlite_engine + .commit_finalize( + &request.actor_id, + sqlite_storage::commit::CommitFinalizeRequest { + generation: request.generation, + expected_head_txid: request.expected_head_txid, + txid: request.txid, + new_db_size_pages: request.new_db_size_pages, + now_ms: util::timestamp::now(), + origin_override: None, + }, + ) + .await + } + .await; let response_build_start = Instant::now(); let response = match engine_result { Ok(result) => Ok( @@ -957,20 +947,10 @@ async fn validate_sqlite_actor(ctx: &StandaloneCtx, conn: &Conn, actor_id: &str) Ok(()) } -async fn ensure_serverless_sqlite_open(conn: &Conn, actor_id: &str, generation: u64) -> Result<()> { - if !conn.is_serverless { - return Ok(()); - } - +async fn ensure_sqlite_open(conn: &Conn, actor_id: &str, generation: u64) -> Result<()> { conn.sqlite_engine .ensure_local_open(actor_id, generation) - .await?; - - conn.serverless_sqlite_actors - .upsert_async(actor_id.to_string(), generation) - .await; - - Ok(()) + .await } async fn sqlite_fence_mismatch( diff --git a/engine/packages/pegboard-outbound/src/lib.rs b/engine/packages/pegboard-outbound/src/lib.rs index a31afe499e..87514d4518 100644 --- a/engine/packages/pegboard-outbound/src/lib.rs +++ b/engine/packages/pegboard-outbound/src/lib.rs @@ -267,83 +267,63 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()> let sqlite_open = sqlite_engine .open(&actor_id.to_string(), OpenConfig::new(timestamp::now())) .await?; - let sqlite_generation = sqlite_open.generation; - - // Run the request body inside a closure so every error path closes the - // SQLite db. Without this the `?` operators on `serialize`, `signal`, and - // the `serverless_outbound_req` call would leak the open db on the - // process-wide `SqliteEngine`, blocking re-open until the process - // restarts. - let actor_id_str = actor_id.to_string(); - let res = async { - let sqlite_startup_data = protocol_sqlite_startup_data(sqlite_open); - let payload = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyCommands(vec![ - protocol::CommandWrapper { - checkpoint, - inner: protocol::Command::CommandStartActor(protocol::CommandStartActor { - config: actor_config, - hibernating_requests: hibernating_requests - .into_iter() - .map(|x| protocol::HibernatingRequest { - gateway_id: x.gateway_id, - request_id: x.request_id, - }) - .collect(), - preloaded_kv, - sqlite_startup_data: Some(sqlite_startup_data), - }), - }, - ])) - .serialize_with_embedded_version(protocol_version)?; - - // Send ack to actor wf before starting an outbound req - ctx.signal(pegboard::workflows::actor2::Allocated { generation }) - .to_workflow::() - .tag("actor_id", &actor_id) - .send() - .await?; - - metrics::REQ_ACTIVE - .with_label_values(&[&namespace_id.to_string(), &pool_name]) - .inc(); - - let token = if let Some(auth) = &ctx.config().auth { - Some(auth.admin_token.read().as_str()) - } else { - None - }; - - let res = serverless_outbound_req( - ctx, - namespace_id, - &pool_name, - &namespace.name, - actor_id, - generation, - payload, - &url, - headers, - request_lifespan, - drain_grace_period, - token, - ) - .await; - metrics::REQ_ACTIVE - .with_label_values(&[&namespace_id.to_string(), &pool_name]) - .dec(); + let sqlite_startup_data = protocol_sqlite_startup_data(sqlite_open); + let payload = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyCommands(vec![ + protocol::CommandWrapper { + checkpoint, + inner: protocol::Command::CommandStartActor(protocol::CommandStartActor { + config: actor_config, + hibernating_requests: hibernating_requests + .into_iter() + .map(|x| protocol::HibernatingRequest { + gateway_id: x.gateway_id, + request_id: x.request_id, + }) + .collect(), + preloaded_kv, + sqlite_startup_data: Some(sqlite_startup_data), + }), + }, + ])) + .serialize_with_embedded_version(protocol_version)?; + + // Send ack to actor wf before starting an outbound req + ctx.signal(pegboard::workflows::actor2::Allocated { generation }) + .to_workflow::() + .tag("actor_id", &actor_id) + .send() + .await?; - res - } + metrics::REQ_ACTIVE + .with_label_values(&[&namespace_id.to_string(), &pool_name]) + .inc(); + + let token = if let Some(auth) = &ctx.config().auth { + Some(auth.admin_token.read().as_str()) + } else { + None + }; + + let res = serverless_outbound_req( + ctx, + namespace_id, + &pool_name, + &namespace.name, + actor_id, + generation, + payload, + &url, + headers, + request_lifespan, + drain_grace_period, + token, + ) .await; - if let Err(err) = sqlite_engine.close(&actor_id_str, sqlite_generation).await { - tracing::warn!( - ?err, - ?actor_id, - "close failed for outbound sqlite db" - ); - } + metrics::REQ_ACTIVE + .with_label_values(&[&namespace_id.to_string(), &pool_name]) + .dec(); res } diff --git a/engine/packages/sqlite-storage/src/open.rs b/engine/packages/sqlite-storage/src/open.rs index 4ea8a8b9f4..8cb33435ba 100644 --- a/engine/packages/sqlite-storage/src/open.rs +++ b/engine/packages/sqlite-storage/src/open.rs @@ -405,18 +405,27 @@ impl SqliteEngine { }, ); + // FDB head.generation is the durable fence and was already verified + // above. The in-memory open_dbs entry is a per-process cache, not the + // source of truth. Reject only when the cache shows a strictly-newer + // owner, otherwise advance the cache to match FDB so later close and + // ensure_open calls see the right generation. match self.open_dbs.entry_async(actor_id.to_string()).await { - scc::hash_map::Entry::Occupied(entry) => { + scc::hash_map::Entry::Occupied(mut entry) => { + let cached = entry.get().generation; ensure!( - entry.get().generation == generation, + cached <= generation, SqliteStorageError::FenceMismatch { reason: format!( - "ensure_local_open generation {} did not match open generation {}", - generation, - entry.get().generation + "ensure_local_open generation {} is stale; \ + open generation {} is newer (this conn was taken over)", + generation, cached ), }, ); + if cached < generation { + entry.get_mut().generation = generation; + } } scc::hash_map::Entry::Vacant(entry) => { entry.insert_entry(OpenDb { generation });