Skip to content
39 changes: 21 additions & 18 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,12 +517,13 @@ impl ClientConnection {
timer: Instant,
) -> Result<Option<ExecutionMetrics>, DBError> {
let me = self.clone();
asyncify(move || {
me.module
.subscriptions()
.add_single_subscription(me.sender, subscription, timer, None)
})
.await
self.module
.on_module_thread("subscribe_single", move || {
me.module
.subscriptions()
.add_single_subscription(me.sender, subscription, timer, None)
})
.await?
}

pub async fn unsubscribe(&self, request: Unsubscribe, timer: Instant) -> Result<Option<ExecutionMetrics>, DBError> {
Expand All @@ -541,12 +542,13 @@ impl ClientConnection {
timer: Instant,
) -> Result<Option<ExecutionMetrics>, DBError> {
let me = self.clone();
asyncify(move || {
me.module
.subscriptions()
.add_multi_subscription(me.sender, request, timer, None)
})
.await
self.module
.on_module_thread("subscribe_multi", move || {
me.module
.subscriptions()
.add_multi_subscription(me.sender, request, timer, None)
})
.await?
}

pub async fn unsubscribe_multi(
Expand All @@ -555,12 +557,13 @@ impl ClientConnection {
timer: Instant,
) -> Result<Option<ExecutionMetrics>, DBError> {
let me = self.clone();
asyncify(move || {
me.module
.subscriptions()
.remove_multi_subscription(me.sender, request, timer)
})
.await
self.module
.on_module_thread("unsubscribe_multi", move || {
me.module
.subscriptions()
.remove_multi_subscription(me.sender, request, timer)
})
.await?
}

pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<ExecutionMetrics, DBError> {
Expand Down
5 changes: 4 additions & 1 deletion crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@ impl HostController {
warn!("database operation panicked");
on_panic();
});
let result = asyncify(move || f(&module.replica_ctx().relational_db)).await;

let db = module.replica_ctx().relational_db.clone();
let result = module.on_module_thread("using_database", move || f(&db)).await?;
Ok(result)
}

Expand Down Expand Up @@ -639,6 +641,7 @@ async fn make_replica_ctx(
let Some(subscriptions) = downgraded.upgrade() else {
break;
};
// This should happen on the module thread, but we haven't created the module yet.
asyncify(move || subscriptions.write().remove_dropped_clients()).await
}
});
Expand Down
Loading
Loading