Skip to content

Commit 5855974

Browse files
zero capacity queue
1 parent 55b9a63 commit 5855974

2 files changed

Lines changed: 92 additions & 57 deletions

File tree

crates/core/src/host/module_host.rs

Lines changed: 29 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1239,7 +1239,7 @@ impl ModuleHost {
12391239
.await
12401240
}
12411241

1242-
async fn with_js_procedure_instance<R>(
1242+
async fn with_js_pooled_instance<R>(
12431243
&self,
12441244
label: &str,
12451245
f: impl AsyncFnOnce(&JsInstance) -> R,
@@ -1248,12 +1248,12 @@ impl ModuleHost {
12481248
let timer_guard = self.start_call_timer(label);
12491249

12501250
scopeguard::defer_on_unwind!({
1251-
log::warn!("procedure {label} panicked");
1251+
log::warn!("pooled JS instance operation {label} panicked");
12521252
(self.on_panic)();
12531253
});
12541254

12551255
Ok(match &*self.inner {
1256-
ModuleHostInner::Wasm(_) => unreachable!("WASM procedures should not use the JS procedure instance path"),
1256+
ModuleHostInner::Wasm(_) => unreachable!("WASM should not use the pooled JS instance path"),
12571257
ModuleHostInner::Js(V8ModuleHost {
12581258
procedure_instances, ..
12591259
}) => {
@@ -1268,6 +1268,24 @@ impl ModuleHost {
12681268
})
12691269
}
12701270

1271+
async fn call_view_command(&self, label: &str, cmd: ViewCommand) -> Result<ViewCommandResult, NoSuchModule> {
1272+
match &*self.inner {
1273+
ModuleHostInner::Wasm(_) => {
1274+
self.call(
1275+
label,
1276+
cmd,
1277+
async |cmd, inst| inst.call_view(cmd),
1278+
async |_cmd, _inst| unreachable!("JS view commands use the pooled JS instance path"),
1279+
)
1280+
.await
1281+
}
1282+
ModuleHostInner::Js(_) => {
1283+
self.with_js_pooled_instance(label, async move |inst| inst.call_view(cmd).await)
1284+
.await
1285+
}
1286+
}
1287+
}
1288+
12711289
pub async fn disconnect_client(&self, client_id: ClientActorId) {
12721290
log::trace!("disconnecting client {client_id}");
12731291
if let Err(e) = self
@@ -1610,12 +1628,7 @@ impl ModuleHost {
16101628
};
16111629

16121630
let res = self
1613-
.call(
1614-
"call_view_add_single_subscription",
1615-
cmd,
1616-
async |cmd, inst| inst.call_view(cmd),
1617-
async |cmd, inst| inst.call_view(cmd).await,
1618-
)
1631+
.call_view_command("call_view_add_single_subscription", cmd)
16191632
.await
16201633
//TODO: handle error better
16211634
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1643,12 +1656,7 @@ impl ModuleHost {
16431656
};
16441657

16451658
let res = self
1646-
.call(
1647-
"call_view_add_multi_subscription",
1648-
cmd,
1649-
async |cmd, inst| inst.call_view(cmd),
1650-
async |cmd, inst| inst.call_view(cmd).await,
1651-
)
1659+
.call_view_command("call_view_add_multi_subscription", cmd)
16521660
.await
16531661
//TODO: handle error better
16541662
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1676,12 +1684,7 @@ impl ModuleHost {
16761684
};
16771685

16781686
let res = self
1679-
.call(
1680-
"call_view_remove_v2_subscription",
1681-
cmd,
1682-
async |cmd, inst| inst.call_view(cmd),
1683-
async |cmd, inst| inst.call_view(cmd).await,
1684-
)
1687+
.call_view_command("call_view_remove_v2_subscription", cmd)
16851688
.await
16861689
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
16871690

@@ -1708,12 +1711,7 @@ impl ModuleHost {
17081711
};
17091712

17101713
let res = self
1711-
.call(
1712-
"call_view_add_multi_subscription",
1713-
cmd,
1714-
async |cmd, inst| inst.call_view(cmd),
1715-
async |cmd, inst| inst.call_view(cmd).await,
1716-
)
1714+
.call_view_command("call_view_add_multi_subscription", cmd)
17171715
.await
17181716
//TODO: handle error better
17191717
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1741,12 +1739,7 @@ impl ModuleHost {
17411739
};
17421740

17431741
let res = self
1744-
.call(
1745-
"call_view_add_legacy_subscription",
1746-
cmd,
1747-
async |cmd, inst| inst.call_view(cmd),
1748-
async |cmd, inst| inst.call_view(cmd).await,
1749-
)
1742+
.call_view_command("call_view_add_legacy_subscription", cmd)
17501743
.await
17511744
//TODO: handle error better
17521745
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1775,12 +1768,7 @@ impl ModuleHost {
17751768
};
17761769

17771770
let res = self
1778-
.call(
1779-
"call_view_sql",
1780-
cmd,
1781-
async |cmd, inst| inst.call_view(cmd),
1782-
async |cmd, inst| inst.call_view(cmd).await,
1783-
)
1771+
.call_view_command("call_view_sql", cmd)
17841772
.await
17851773
//TODO: handle error better
17861774
.map_err(|e| DBError::Other(anyhow::anyhow!(e)))?;
@@ -1895,7 +1883,7 @@ impl ModuleHost {
18951883
.await
18961884
}
18971885
ModuleHostInner::Js(_) => {
1898-
self.with_js_procedure_instance(name, async move |inst| inst.call_procedure(params).await)
1886+
self.with_js_pooled_instance(name, async move |inst| inst.call_procedure(params).await)
18991887
.await
19001888
}
19011889
}
@@ -1925,7 +1913,7 @@ impl ModuleHost {
19251913
}
19261914
};
19271915
if use_procedure_lane {
1928-
self.with_js_procedure_instance("unknown scheduled function", async move |inst| {
1916+
self.with_js_pooled_instance("unknown scheduled function", async move |inst| {
19291917
inst.call_scheduled_function(params).await
19301918
})
19311919
.await

crates/core/src/host/v8/mod.rs

Lines changed: 63 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,13 @@ impl JsReducerLane {
590590
*self.state.active.write() = next;
591591
}
592592

593+
/// Run a reducer-lane operation that may be retried if the active worker
594+
/// disconnects before sending any reply.
595+
///
596+
/// This is used for reducer-style requests whose arguments are cloneable and
597+
/// can therefore be resubmitted to a fresh worker after trap recovery. It is
598+
/// not used to hide a real trapped result: the trapping request still replies
599+
/// first, and only later queued work is retried on the replacement worker.
593600
async fn run_replayable<A, R, F, Fut>(&self, arg: A, label: &'static str, work: F) -> R
594601
where
595602
A: Clone,
@@ -615,6 +622,13 @@ impl JsReducerLane {
615622
}
616623
}
617624

625+
/// Run a reducer-lane operation that must not be retried automatically after
626+
/// worker loss.
627+
///
628+
/// This is currently used for `run_on_thread`, where the reducer lane only
629+
/// sees an opaque closure rather than a structured, cloneable request
630+
/// payload. We still replace the worker for future requests, but this
631+
/// request gets the disconnect error.
618632
async fn run_nonreplayable<R>(
619633
&self,
620634
label: &'static str,
@@ -635,6 +649,11 @@ impl JsReducerLane {
635649
}
636650
}
637651

652+
/// Run an arbitrary closure on the reducer worker thread without replay.
653+
///
654+
/// This is non-replayable because the closure is opaque host code, not a
655+
/// cloneable request payload, and it may have already produced host-side
656+
/// effects before a worker disconnect is observed.
638657
pub async fn run_on_thread<F, R>(&self, f: F) -> anyhow::Result<R>
639658
where
640659
F: AsyncFnOnce() -> R + Send + 'static,
@@ -666,6 +685,11 @@ impl JsReducerLane {
666685
.map_err(|_| anyhow::anyhow!("reducer worker exited while running a non-replayable module-thread task"))
667686
}
668687

688+
/// Run a database update on the reducer lane with replay-on-worker-loss.
689+
///
690+
/// This is replayable because its arguments are cloneable, and if a poisoned
691+
/// worker exits before replying we can resubmit the same update request to
692+
/// the replacement worker rather than dropping later queued work.
669693
pub async fn update_database(
670694
&self,
671695
program: Program,
@@ -688,6 +712,12 @@ impl JsReducerLane {
688712
.await
689713
}
690714

715+
/// Run a reducer on the reducer lane with replay-on-worker-loss.
716+
///
717+
/// `CallReducerParams` is cloneable, so queued reducer requests can be
718+
/// resubmitted to a fresh worker if the previous worker disappears before
719+
/// replying. A reducer that actually traps is not replayed: it replies first,
720+
/// then the poisoned worker exits and only later requests are retried.
691721
pub async fn call_reducer(&self, params: CallReducerParams) -> ReducerCallResult {
692722
self.run_replayable(params, "call_reducer", |inst, params| async move {
693723
inst.send_request(|reply_tx| JsWorkerRequest::CallReducer { reply_tx, params })
@@ -696,13 +726,22 @@ impl JsReducerLane {
696726
.await
697727
}
698728

729+
/// Clear all reducer-lane client state with replay-on-worker-loss.
730+
///
731+
/// This request carries no non-cloneable payload, and retrying it after a
732+
/// worker disconnect preserves the intended "all clients cleared" outcome.
699733
pub async fn clear_all_clients(&self) -> anyhow::Result<()> {
700734
self.run_replayable((), "clear_all_clients", |inst, _| async move {
701735
inst.send_request(JsWorkerRequest::ClearAllClients).await
702736
})
703737
.await
704738
}
705739

740+
/// Run the `client_connected` lifecycle reducer with replay-on-worker-loss.
741+
///
742+
/// This follows the same retry model as ordinary reducers: the request
743+
/// arguments are cloneable, and only requests that lose their worker before
744+
/// receiving any reply are retried on the replacement worker.
706745
pub async fn call_identity_connected(
707746
&self,
708747
caller_auth: ConnectionAuthCtx,
@@ -723,6 +762,11 @@ impl JsReducerLane {
723762
.await
724763
}
725764

765+
/// Run the `client_disconnected` lifecycle reducer with replay-on-worker-loss.
766+
///
767+
/// This is replayable for the same reason as reducer calls: its payload is
768+
/// cloneable and queued lifecycle work should survive replacement of a
769+
/// poisoned reducer worker.
726770
pub async fn call_identity_disconnected(
727771
&self,
728772
caller_identity: Identity,
@@ -743,6 +787,11 @@ impl JsReducerLane {
743787
.await
744788
}
745789

790+
/// Run disconnect cleanup on the reducer lane with replay-on-worker-loss.
791+
///
792+
/// The request payload is just a cloneable client id, so if the active worker
793+
/// disappears before replying the cleanup request can be resubmitted to the
794+
/// replacement worker.
746795
pub async fn disconnect_client(&self, client_id: ClientActorId) -> Result<(), ReducerCallError> {
747796
self.run_replayable(client_id, "disconnect_client", |inst, client_id| async move {
748797
inst.send_request(|reply_tx| JsWorkerRequest::DisconnectClient { reply_tx, client_id })
@@ -751,6 +800,10 @@ impl JsReducerLane {
751800
.await
752801
}
753802

803+
/// Run reducer-style database initialization with replay-on-worker-loss.
804+
///
805+
/// The initialization request is cloneable, so if the active reducer worker
806+
/// dies before replying we can resubmit it to the replacement worker.
754807
pub async fn init_database(&self, program: Program) -> anyhow::Result<Option<ReducerCallResult>> {
755808
self.run_replayable(program, "init_database", |inst, program| async move {
756809
inst.send_request(|reply_tx| JsWorkerRequest::InitDatabase { reply_tx, program })
@@ -759,19 +812,11 @@ impl JsReducerLane {
759812
.await
760813
}
761814

762-
pub async fn call_view(&self, cmd: ViewCommand) -> ViewCommandResult {
763-
// View/subscription commands are not cheaply replayable today because
764-
// the websocket request payload types they carry are not cloneable.
765-
// If a worker dies here we surface the failure rather than silently
766-
// re-running a command that may already have produced side effects.
767-
self.run_nonreplayable("call_view", async move |inst| {
768-
inst.send_request(|reply_tx| JsWorkerRequest::CallView { reply_tx, cmd })
769-
.await
770-
})
771-
.await
772-
.unwrap_or_else(|_| panic!("reducer worker exited while handling a non-replayable view command"))
773-
}
774-
815+
/// Run a scheduled reducer function with replay-on-worker-loss.
816+
///
817+
/// Scheduled reducer requests carry cloneable params, so they use the same
818+
/// recovery path as ordinary reducer calls. Scheduled procedures do not come
819+
/// through this lane.
775820
pub(in crate::host) async fn call_scheduled_function(
776821
&self,
777822
params: ScheduledFunctionParams,
@@ -842,10 +887,12 @@ async fn spawn_instance_worker(
842887
load_balance_guard: Arc<LoadBalanceOnDropGuard>,
843888
mut core_pinner: CorePinner,
844889
) -> anyhow::Result<(ModuleCommon, JsInstance)> {
845-
// Spawn a FIFO queue for requests to the worker.
890+
// Spawn a small FIFO queue for requests to the worker.
846891
// Multiple callers can enqueue concurrently, but the worker processes
847-
// requests strictly one at a time on its dedicated thread.
848-
let (request_tx, request_rx) = flume::unbounded();
892+
// requests strictly one at a time on its dedicated thread. Keep a small
893+
// buffer so the next request can already be waiting when the worker
894+
// finishes the current one, without allowing unbounded backlog growth.
895+
let (request_tx, request_rx) = flume::bounded(0);
849896

850897
// This one-shot channel is used for initial startup error handling within the thread.
851898
let (result_tx, result_rx) = oneshot::channel();

0 commit comments

Comments
 (0)