diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index 894951c04a3..ad107d0e0bc 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -19,7 +19,7 @@ use futures::prelude::*; use prometheus::{Histogram, IntCounter, IntGauge}; use spacetimedb_client_api_messages::websocket::{ BsatnFormat, CallReducerFlags, Compression, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, Unsubscribe, - UnsubscribeMulti, WebsocketFormat, + UnsubscribeMulti, }; use spacetimedb_lib::identity::RequestId; use spacetimedb_lib::metrics::ExecutionMetrics; @@ -440,41 +440,40 @@ impl ClientConnection { .await } - pub fn one_off_query_json(&self, query: &str, message_id: &[u8], timer: Instant) -> Result<(), anyhow::Error> { - let response = self.one_off_query::(query, message_id, timer); - self.send_message(response)?; - Ok(()) - } - - pub fn one_off_query_bsatn(&self, query: &str, message_id: &[u8], timer: Instant) -> Result<(), anyhow::Error> { - let response = self.one_off_query::(query, message_id, timer); - self.send_message(response)?; - Ok(()) + pub async fn one_off_query_json( + &self, + query: &str, + message_id: &[u8], + timer: Instant, + ) -> Result<(), anyhow::Error> { + self.module + .one_off_query::( + self.id.identity, + query.to_owned(), + self.sender.clone(), + message_id.to_owned(), + timer, + |msg: OneOffQueryResponseMessage| msg.into(), + ) + .await } - fn one_off_query( + pub async fn one_off_query_bsatn( &self, query: &str, message_id: &[u8], timer: Instant, - ) -> OneOffQueryResponseMessage { - let result = self.module.one_off_query::(self.id.identity, query.to_owned()); - let message_id = message_id.to_owned(); - let total_host_execution_duration = timer.elapsed().into(); - match result { - Ok(results) => OneOffQueryResponseMessage { - message_id, - error: None, - results: vec![results], - total_host_execution_duration, - }, - Err(err) => OneOffQueryResponseMessage { - message_id, - error: Some(format!("{}", err)), - results: vec![], - total_host_execution_duration, - }, - } + ) -> Result<(), anyhow::Error> { + self.module + .one_off_query::( + self.id.identity, + query.to_owned(), + self.sender.clone(), + message_id.to_owned(), + timer, + |msg: OneOffQueryResponseMessage| msg.into(), + ) + .await } pub async fn disconnect(self) { diff --git a/crates/core/src/client/message_handlers.rs b/crates/core/src/client/message_handlers.rs index 6bb840588eb..3842468cad2 100644 --- a/crates/core/src/client/message_handlers.rs +++ b/crates/core/src/client/message_handlers.rs @@ -120,8 +120,8 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst message_id, }) => { let res = match client.config.protocol { - Protocol::Binary => client.one_off_query_bsatn(&query, &message_id, timer), - Protocol::Text => client.one_off_query_json(&query, &message_id, timer), + Protocol::Binary => client.one_off_query_bsatn(&query, &message_id, timer).await, + Protocol::Text => client.one_off_query_json(&query, &message_id, timer).await, }; mod_metrics .request_round_trip_sql diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 742813f3e82..7929df40454 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1,4 +1,5 @@ use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler}; +use crate::client::messages::{OneOffQueryResponseMessage, SerializableMessage}; use crate::client::{ClientActorId, ClientConnectionSender}; use crate::database_logger::{LogLevel, Record}; use crate::db::datastore::locking_tx_datastore::MutTxId; @@ -32,6 +33,7 @@ use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; use spacetimedb_execution::pipelined::PipelinedProject; use spacetimedb_lib::db::raw_def::v9::Lifecycle; use spacetimedb_lib::identity::{AuthCtx, RequestId}; +use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Timestamp; use spacetimedb_primitives::TableId; @@ -993,60 +995,106 @@ impl ModuleHost { ) } + /// Execute a one-off query and send the results to the given client. + /// This only returns an error if there is a db-level problem. + /// An error with the query itself will be sent to the client. #[tracing::instrument(level = "trace", skip_all)] - pub fn one_off_query( + pub async fn one_off_query( &self, caller_identity: Identity, query: String, - ) -> Result, anyhow::Error> { + client: Arc, + message_id: Vec, + timer: Instant, + // We take this because we only have a way to convert with the concrete types (Bsatn and Json) + into_message: impl FnOnce(OneOffQueryResponseMessage) -> SerializableMessage + Send + 'static, + ) -> Result<(), anyhow::Error> { let replica_ctx = self.replica_ctx(); - let db = &replica_ctx.relational_db; + let db = replica_ctx.relational_db.clone(); + let subscriptions = replica_ctx.subscriptions.clone(); let auth = AuthCtx::new(replica_ctx.owner_identity, caller_identity); log::debug!("One-off query: {query}"); + let metrics = asyncify(move || { + db.with_read_only(Workload::Sql, |tx| { + // We wrap the actual query in a closure so we can use ? to handle errors without making + // the entire transaction abort with an error. + let result: Result<(OneOffTable, ExecutionMetrics), anyhow::Error> = (|| { + let tx = SchemaViewer::new(tx, &auth); + + let ( + // A query may compile down to several plans. + // This happens when there are multiple RLS rules per table. + // The original query is the union of these plans. + plans, + _, + table_name, + _, + ) = compile_subscription(&query, &tx, &auth)?; + + // Optimize each fragment + let optimized = plans + .into_iter() + .map(|plan| plan.optimize()) + .collect::, _>>()?; + + check_row_limit( + &optimized, + &db, + &tx, + // Estimate the number of rows this query will scan + |plan, tx| estimate_rows_scanned(tx, plan), + &auth, + )?; + + let optimized = optimized + .into_iter() + // Convert into something we can execute + .map(PipelinedProject::from) + .collect::>(); + + // Execute the union and return the results + execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx)) + .map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics)) + .context("One-off queries are not allowed to modify the database") + })(); + + let total_host_execution_duration = timer.elapsed().into(); + let (message, metrics): (SerializableMessage, Option) = match result { + Ok((rows, metrics)) => ( + into_message(OneOffQueryResponseMessage { + message_id, + error: None, + results: vec![rows], + total_host_execution_duration, + }), + Some(metrics), + ), + Err(err) => ( + into_message(OneOffQueryResponseMessage { + message_id, + error: Some(format!("{}", err)), + results: vec![], + total_host_execution_duration, + }), + None, + ), + }; + + subscriptions.send_client_message(client, message, tx)?; + Ok::, anyhow::Error>(metrics) + }) + }) + .await?; + + if let Some(metrics) = metrics { + // Record the metrics for the one-off query + replica_ctx + .relational_db + .exec_counters_for(WorkloadType::Sql) + .record(&metrics); + } - let (rows, metrics) = db.with_read_only(Workload::Sql, |tx| { - let tx = SchemaViewer::new(tx, &auth); - - let ( - // A query may compile down to several plans. - // This happens when there are multiple RLS rules per table. - // The original query is the union of these plans. - plans, - _, - table_name, - _, - ) = compile_subscription(&query, &tx, &auth)?; - - // Optimize each fragment - let optimized = plans - .into_iter() - .map(|plan| plan.optimize()) - .collect::, _>>()?; - - check_row_limit( - &optimized, - db, - &tx, - // Estimate the number of rows this query will scan - |plan, tx| estimate_rows_scanned(tx, plan), - &auth, - )?; - - let optimized = optimized - .into_iter() - // Convert into something we can execute - .map(PipelinedProject::from) - .collect::>(); - - // Execute the union and return the results - execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx)) - .map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics)) - .context("One-off queries are not allowed to modify the database") - })?; - - db.exec_counters_for(WorkloadType::Sql).record(&metrics); - - Ok(rows) + Ok(()) } /// FIXME(jgilles): this is a temporary workaround for deleting not currently being supported diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index cbfdb8f056a..3a6ef438284 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1,13 +1,13 @@ use super::execution_unit::QueryHash; use super::module_subscription_manager::{ - spawn_send_worker, BroadcastQueue, Plan, SubscriptionGaugeStats, SubscriptionManager, + spawn_send_worker, BroadcastError, BroadcastQueue, Plan, SubscriptionGaugeStats, SubscriptionManager, }; use super::query::compile_query_with_hashes; use super::tx::DeltaTx; use super::{collect_table_update, TableUpdateType}; use crate::client::messages::{ - SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult, SubscriptionRows, - SubscriptionUpdateMessage, TransactionUpdateMessage, + SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult, + SubscriptionRows, SubscriptionUpdateMessage, TransactionUpdateMessage, }; use crate::client::{ClientActorId, ClientConnectionSender, Protocol}; use crate::db::datastore::locking_tx_datastore::tx::TxId; @@ -570,6 +570,18 @@ impl ModuleSubscriptions { Ok((plans, auth, scopeguard::ScopeGuard::into_inner(tx))) } + /// Send a message to a client connection. + /// This will eventually be sent by the send-worker. + /// This takes a `TxId`, because this should be called while still holding a lock on the database. + pub fn send_client_message( + &self, + recipient: Arc, + message: impl Into, + _tx_id: &TxId, + ) -> Result<(), BroadcastError> { + self.broadcast_queue.send_client_message(recipient, message) + } + #[tracing::instrument(level = "trace", skip_all)] pub fn add_multi_subscription( &self,