Skip to content

Commit 7d5920a

Browse files
minimize diff
1 parent 9687b4f commit 7d5920a

1 file changed

Lines changed: 57 additions & 69 deletions

File tree

crates/core/src/host/v8/mod.rs

Lines changed: 57 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ impl JsInstance {
333333
Ok(value)
334334
}
335335

336-
async fn run_on_thread_inner<F, R>(&self, f: F) -> Result<R, WorkerDisconnected>
336+
pub async fn run_on_thread<F, R>(&self, f: F) -> R
337337
where
338338
F: AsyncFnOnce() -> R + Send + 'static,
339339
R: Send + 'static,
@@ -352,22 +352,15 @@ impl JsInstance {
352352
.boxed_local()
353353
})))
354354
.await
355-
.map_err(|_| WorkerDisconnected)?;
355+
.unwrap_or_else(|_| panic!("worker should stay live while handling {}", type_name::<R>()));
356356

357-
Ok(match rx.await.map_err(|_| WorkerDisconnected)? {
358-
Ok(r) => r,
359-
Err(e) => std::panic::resume_unwind(e),
360-
})
361-
}
362-
363-
pub async fn run_on_thread<F, R>(&self, f: F) -> R
364-
where
365-
F: AsyncFnOnce() -> R + Send + 'static,
366-
R: Send + 'static,
367-
{
368-
self.run_on_thread_inner(f)
357+
match rx
369358
.await
370359
.unwrap_or_else(|_| panic!("worker should stay live while handling {}", type_name::<R>()))
360+
{
361+
Ok(r) => r,
362+
Err(e) => std::panic::resume_unwind(e),
363+
}
371364
}
372365

373366
pub async fn update_database(
@@ -386,13 +379,8 @@ impl JsInstance {
386379
.unwrap_or_else(|_| panic!("worker should stay live while updating the database"))
387380
}
388381

389-
async fn call_reducer_inner(&self, params: CallReducerParams) -> Result<ReducerCallResult, WorkerDisconnected> {
390-
self.send_request(|reply_tx| JsWorkerRequest::CallReducer { reply_tx, params })
391-
.await
392-
}
393-
394382
pub async fn call_reducer(&self, params: CallReducerParams) -> ReducerCallResult {
395-
self.call_reducer_inner(params)
383+
self.send_request(|reply_tx| JsWorkerRequest::CallReducer { reply_tx, params })
396384
.await
397385
.unwrap_or_else(|_| panic!("worker should stay live while calling a reducer"))
398386
}
@@ -403,62 +391,36 @@ impl JsInstance {
403391
.unwrap_or_else(|_| panic!("worker should stay live while clearing clients"))
404392
}
405393

406-
async fn call_identity_connected_inner(
394+
pub async fn call_identity_connected(
407395
&self,
408396
caller_auth: ConnectionAuthCtx,
409397
caller_connection_id: ConnectionId,
410-
) -> Result<Result<(), ClientConnectedError>, WorkerDisconnected> {
398+
) -> Result<(), ClientConnectedError> {
411399
self.send_request(|reply_tx| JsWorkerRequest::CallIdentityConnected {
412400
reply_tx,
413401
caller_auth,
414402
caller_connection_id,
415403
})
416404
.await
405+
.unwrap_or_else(|_| panic!("worker should stay live while running client_connected"))
417406
}
418407

419-
pub async fn call_identity_connected(
420-
&self,
421-
caller_auth: ConnectionAuthCtx,
422-
caller_connection_id: ConnectionId,
423-
) -> Result<(), ClientConnectedError> {
424-
self.call_identity_connected_inner(caller_auth, caller_connection_id)
425-
.await
426-
.unwrap_or_else(|_| panic!("worker should stay live while running client_connected"))
427-
}
428-
429-
async fn call_identity_disconnected_inner(
408+
pub async fn call_identity_disconnected(
430409
&self,
431410
caller_identity: Identity,
432411
caller_connection_id: ConnectionId,
433-
) -> Result<Result<(), ReducerCallError>, WorkerDisconnected> {
412+
) -> Result<(), ReducerCallError> {
434413
self.send_request(|reply_tx| JsWorkerRequest::CallIdentityDisconnected {
435414
reply_tx,
436415
caller_identity,
437416
caller_connection_id,
438417
})
439418
.await
440-
}
441-
442-
pub async fn call_identity_disconnected(
443-
&self,
444-
caller_identity: Identity,
445-
caller_connection_id: ConnectionId,
446-
) -> Result<(), ReducerCallError> {
447-
self.call_identity_disconnected_inner(caller_identity, caller_connection_id)
448-
.await
449-
.unwrap_or_else(|_| panic!("worker should stay live while running client_disconnected"))
450-
}
451-
452-
async fn disconnect_client_inner(
453-
&self,
454-
client_id: ClientActorId,
455-
) -> Result<Result<(), ReducerCallError>, WorkerDisconnected> {
456-
self.send_request(|reply_tx| JsWorkerRequest::DisconnectClient { reply_tx, client_id })
457-
.await
419+
.unwrap_or_else(|_| panic!("worker should stay live while running client_disconnected"))
458420
}
459421

460422
pub async fn disconnect_client(&self, client_id: ClientActorId) -> Result<(), ReducerCallError> {
461-
self.disconnect_client_inner(client_id)
423+
self.send_request(|reply_tx| JsWorkerRequest::DisconnectClient { reply_tx, client_id })
462424
.await
463425
.unwrap_or_else(|_| panic!("worker should stay live while disconnecting a client"))
464426
}
@@ -475,13 +437,8 @@ impl JsInstance {
475437
.unwrap_or_else(|_| panic!("worker should stay live while calling a procedure"))
476438
}
477439

478-
async fn call_view_inner(&self, cmd: ViewCommand) -> Result<ViewCommandResult, WorkerDisconnected> {
479-
self.send_request(|reply_tx| JsWorkerRequest::CallView { reply_tx, cmd })
480-
.await
481-
}
482-
483440
pub async fn call_view(&self, cmd: ViewCommand) -> ViewCommandResult {
484-
self.call_view_inner(cmd)
441+
self.send_request(|reply_tx| JsWorkerRequest::CallView { reply_tx, cmd })
485442
.await
486443
.unwrap_or_else(|_| panic!("worker should stay live while calling a view"))
487444
}
@@ -678,7 +635,25 @@ impl JsReducerLane {
678635
{
679636
let span = tracing::Span::current();
680637
self.run_nonreplayable("run_on_thread", async move |inst| {
681-
inst.run_on_thread_inner(async move || f().instrument(span).await).await
638+
let (tx, rx) = oneshot::channel();
639+
640+
inst.request_tx
641+
.send_async(JsWorkerRequest::RunFunction(Box::new(move || {
642+
async move {
643+
let result = AssertUnwindSafe(f().instrument(span)).catch_unwind().await;
644+
if let Err(Err(_panic)) = tx.send(result) {
645+
tracing::warn!("uncaught panic on `SingleCoreExecutor`")
646+
}
647+
}
648+
.boxed_local()
649+
})))
650+
.await
651+
.map_err(|_| WorkerDisconnected)?;
652+
653+
Ok(match rx.await.map_err(|_| WorkerDisconnected)? {
654+
Ok(r) => r,
655+
Err(e) => std::panic::resume_unwind(e),
656+
})
682657
})
683658
.await
684659
.map_err(|_| anyhow::anyhow!("reducer worker exited while running a non-replayable module-thread task"))
@@ -708,7 +683,8 @@ impl JsReducerLane {
708683

709684
pub async fn call_reducer(&self, params: CallReducerParams) -> ReducerCallResult {
710685
self.run_replayable(params, "call_reducer", |inst, params| async move {
711-
inst.call_reducer_inner(params).await
686+
inst.send_request(|reply_tx| JsWorkerRequest::CallReducer { reply_tx, params })
687+
.await
712688
})
713689
.await
714690
}
@@ -729,8 +705,12 @@ impl JsReducerLane {
729705
(caller_auth, caller_connection_id),
730706
"call_identity_connected",
731707
|inst, (caller_auth, caller_connection_id)| async move {
732-
inst.call_identity_connected_inner(caller_auth, caller_connection_id)
733-
.await
708+
inst.send_request(|reply_tx| JsWorkerRequest::CallIdentityConnected {
709+
reply_tx,
710+
caller_auth,
711+
caller_connection_id,
712+
})
713+
.await
734714
},
735715
)
736716
.await
@@ -745,16 +725,21 @@ impl JsReducerLane {
745725
(caller_identity, caller_connection_id),
746726
"call_identity_disconnected",
747727
|inst, (caller_identity, caller_connection_id)| async move {
748-
inst.call_identity_disconnected_inner(caller_identity, caller_connection_id)
749-
.await
728+
inst.send_request(|reply_tx| JsWorkerRequest::CallIdentityDisconnected {
729+
reply_tx,
730+
caller_identity,
731+
caller_connection_id,
732+
})
733+
.await
750734
},
751735
)
752736
.await
753737
}
754738

755739
pub async fn disconnect_client(&self, client_id: ClientActorId) -> Result<(), ReducerCallError> {
756740
self.run_replayable(client_id, "disconnect_client", |inst, client_id| async move {
757-
inst.disconnect_client_inner(client_id).await
741+
inst.send_request(|reply_tx| JsWorkerRequest::DisconnectClient { reply_tx, client_id })
742+
.await
758743
})
759744
.await
760745
}
@@ -772,9 +757,12 @@ impl JsReducerLane {
772757
// the websocket request payload types they carry are not cloneable.
773758
// If a worker dies here we surface the failure rather than silently
774759
// re-running a command that may already have produced side effects.
775-
self.run_nonreplayable("call_view", async move |inst| inst.call_view_inner(cmd).await)
776-
.await
777-
.unwrap_or_else(|_| panic!("reducer worker exited while handling a non-replayable view command"))
760+
self.run_nonreplayable("call_view", async move |inst| {
761+
inst.send_request(|reply_tx| JsWorkerRequest::CallView { reply_tx, cmd })
762+
.await
763+
})
764+
.await
765+
.unwrap_or_else(|_| panic!("reducer worker exited while handling a non-replayable view command"))
778766
}
779767

780768
pub(in crate::host) async fn call_scheduled_function(

0 commit comments

Comments
 (0)