Skip to content

Commit b7c1e7c

Browse files
authored
Send one-off responses through the broadcast queue. (#2853)
1 parent 053fc6d commit b7c1e7c

4 files changed

Lines changed: 140 additions & 81 deletions

File tree

crates/core/src/client/client_connection.rs

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use futures::prelude::*;
1919
use prometheus::{Histogram, IntCounter, IntGauge};
2020
use spacetimedb_client_api_messages::websocket::{
2121
BsatnFormat, CallReducerFlags, Compression, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, Unsubscribe,
22-
UnsubscribeMulti, WebsocketFormat,
22+
UnsubscribeMulti,
2323
};
2424
use spacetimedb_lib::identity::RequestId;
2525
use spacetimedb_lib::metrics::ExecutionMetrics;
@@ -440,41 +440,40 @@ impl ClientConnection {
440440
.await
441441
}
442442

443-
pub fn one_off_query_json(&self, query: &str, message_id: &[u8], timer: Instant) -> Result<(), anyhow::Error> {
444-
let response = self.one_off_query::<JsonFormat>(query, message_id, timer);
445-
self.send_message(response)?;
446-
Ok(())
447-
}
448-
449-
pub fn one_off_query_bsatn(&self, query: &str, message_id: &[u8], timer: Instant) -> Result<(), anyhow::Error> {
450-
let response = self.one_off_query::<BsatnFormat>(query, message_id, timer);
451-
self.send_message(response)?;
452-
Ok(())
443+
pub async fn one_off_query_json(
444+
&self,
445+
query: &str,
446+
message_id: &[u8],
447+
timer: Instant,
448+
) -> Result<(), anyhow::Error> {
449+
self.module
450+
.one_off_query::<JsonFormat>(
451+
self.id.identity,
452+
query.to_owned(),
453+
self.sender.clone(),
454+
message_id.to_owned(),
455+
timer,
456+
|msg: OneOffQueryResponseMessage<JsonFormat>| msg.into(),
457+
)
458+
.await
453459
}
454460

455-
fn one_off_query<F: WebsocketFormat>(
461+
pub async fn one_off_query_bsatn(
456462
&self,
457463
query: &str,
458464
message_id: &[u8],
459465
timer: Instant,
460-
) -> OneOffQueryResponseMessage<F> {
461-
let result = self.module.one_off_query::<F>(self.id.identity, query.to_owned());
462-
let message_id = message_id.to_owned();
463-
let total_host_execution_duration = timer.elapsed().into();
464-
match result {
465-
Ok(results) => OneOffQueryResponseMessage {
466-
message_id,
467-
error: None,
468-
results: vec![results],
469-
total_host_execution_duration,
470-
},
471-
Err(err) => OneOffQueryResponseMessage {
472-
message_id,
473-
error: Some(format!("{}", err)),
474-
results: vec![],
475-
total_host_execution_duration,
476-
},
477-
}
466+
) -> Result<(), anyhow::Error> {
467+
self.module
468+
.one_off_query::<BsatnFormat>(
469+
self.id.identity,
470+
query.to_owned(),
471+
self.sender.clone(),
472+
message_id.to_owned(),
473+
timer,
474+
|msg: OneOffQueryResponseMessage<BsatnFormat>| msg.into(),
475+
)
476+
.await
478477
}
479478

480479
pub async fn disconnect(self) {

crates/core/src/client/message_handlers.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
120120
message_id,
121121
}) => {
122122
let res = match client.config.protocol {
123-
Protocol::Binary => client.one_off_query_bsatn(&query, &message_id, timer),
124-
Protocol::Text => client.one_off_query_json(&query, &message_id, timer),
123+
Protocol::Binary => client.one_off_query_bsatn(&query, &message_id, timer).await,
124+
Protocol::Text => client.one_off_query_json(&query, &message_id, timer).await,
125125
};
126126
mod_metrics
127127
.request_round_trip_sql

crates/core/src/host/module_host.rs

Lines changed: 94 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler};
2+
use crate::client::messages::{OneOffQueryResponseMessage, SerializableMessage};
23
use crate::client::{ClientActorId, ClientConnectionSender};
34
use crate::database_logger::{LogLevel, Record};
45
use crate::db::datastore::locking_tx_datastore::MutTxId;
@@ -32,6 +33,7 @@ use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
3233
use spacetimedb_execution::pipelined::PipelinedProject;
3334
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
3435
use spacetimedb_lib::identity::{AuthCtx, RequestId};
36+
use spacetimedb_lib::metrics::ExecutionMetrics;
3537
use spacetimedb_lib::ConnectionId;
3638
use spacetimedb_lib::Timestamp;
3739
use spacetimedb_primitives::TableId;
@@ -993,60 +995,106 @@ impl ModuleHost {
993995
)
994996
}
995997

998+
/// Execute a one-off query and send the results to the given client.
999+
/// This only returns an error if there is a db-level problem.
1000+
/// An error with the query itself will be sent to the client.
9961001
#[tracing::instrument(level = "trace", skip_all)]
997-
pub fn one_off_query<F: WebsocketFormat>(
1002+
pub async fn one_off_query<F: WebsocketFormat>(
9981003
&self,
9991004
caller_identity: Identity,
10001005
query: String,
1001-
) -> Result<OneOffTable<F>, anyhow::Error> {
1006+
client: Arc<ClientConnectionSender>,
1007+
message_id: Vec<u8>,
1008+
timer: Instant,
1009+
// We take this because we only have a way to convert with the concrete types (Bsatn and Json)
1010+
into_message: impl FnOnce(OneOffQueryResponseMessage<F>) -> SerializableMessage + Send + 'static,
1011+
) -> Result<(), anyhow::Error> {
10021012
let replica_ctx = self.replica_ctx();
1003-
let db = &replica_ctx.relational_db;
1013+
let db = replica_ctx.relational_db.clone();
1014+
let subscriptions = replica_ctx.subscriptions.clone();
10041015
let auth = AuthCtx::new(replica_ctx.owner_identity, caller_identity);
10051016
log::debug!("One-off query: {query}");
1017+
let metrics = asyncify(move || {
1018+
db.with_read_only(Workload::Sql, |tx| {
1019+
// We wrap the actual query in a closure so we can use ? to handle errors without making
1020+
// the entire transaction abort with an error.
1021+
let result: Result<(OneOffTable<F>, ExecutionMetrics), anyhow::Error> = (|| {
1022+
let tx = SchemaViewer::new(tx, &auth);
1023+
1024+
let (
1025+
// A query may compile down to several plans.
1026+
// This happens when there are multiple RLS rules per table.
1027+
// The original query is the union of these plans.
1028+
plans,
1029+
_,
1030+
table_name,
1031+
_,
1032+
) = compile_subscription(&query, &tx, &auth)?;
1033+
1034+
// Optimize each fragment
1035+
let optimized = plans
1036+
.into_iter()
1037+
.map(|plan| plan.optimize())
1038+
.collect::<Result<Vec<_>, _>>()?;
1039+
1040+
check_row_limit(
1041+
&optimized,
1042+
&db,
1043+
&tx,
1044+
// Estimate the number of rows this query will scan
1045+
|plan, tx| estimate_rows_scanned(tx, plan),
1046+
&auth,
1047+
)?;
1048+
1049+
let optimized = optimized
1050+
.into_iter()
1051+
// Convert into something we can execute
1052+
.map(PipelinedProject::from)
1053+
.collect::<Vec<_>>();
1054+
1055+
// Execute the union and return the results
1056+
execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx))
1057+
.map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics))
1058+
.context("One-off queries are not allowed to modify the database")
1059+
})();
1060+
1061+
let total_host_execution_duration = timer.elapsed().into();
1062+
let (message, metrics): (SerializableMessage, Option<ExecutionMetrics>) = match result {
1063+
Ok((rows, metrics)) => (
1064+
into_message(OneOffQueryResponseMessage {
1065+
message_id,
1066+
error: None,
1067+
results: vec![rows],
1068+
total_host_execution_duration,
1069+
}),
1070+
Some(metrics),
1071+
),
1072+
Err(err) => (
1073+
into_message(OneOffQueryResponseMessage {
1074+
message_id,
1075+
error: Some(format!("{}", err)),
1076+
results: vec![],
1077+
total_host_execution_duration,
1078+
}),
1079+
None,
1080+
),
1081+
};
1082+
1083+
subscriptions.send_client_message(client, message, tx)?;
1084+
Ok::<Option<ExecutionMetrics>, anyhow::Error>(metrics)
1085+
})
1086+
})
1087+
.await?;
1088+
1089+
if let Some(metrics) = metrics {
1090+
// Record the metrics for the one-off query
1091+
replica_ctx
1092+
.relational_db
1093+
.exec_counters_for(WorkloadType::Sql)
1094+
.record(&metrics);
1095+
}
10061096

1007-
let (rows, metrics) = db.with_read_only(Workload::Sql, |tx| {
1008-
let tx = SchemaViewer::new(tx, &auth);
1009-
1010-
let (
1011-
// A query may compile down to several plans.
1012-
// This happens when there are multiple RLS rules per table.
1013-
// The original query is the union of these plans.
1014-
plans,
1015-
_,
1016-
table_name,
1017-
_,
1018-
) = compile_subscription(&query, &tx, &auth)?;
1019-
1020-
// Optimize each fragment
1021-
let optimized = plans
1022-
.into_iter()
1023-
.map(|plan| plan.optimize())
1024-
.collect::<Result<Vec<_>, _>>()?;
1025-
1026-
check_row_limit(
1027-
&optimized,
1028-
db,
1029-
&tx,
1030-
// Estimate the number of rows this query will scan
1031-
|plan, tx| estimate_rows_scanned(tx, plan),
1032-
&auth,
1033-
)?;
1034-
1035-
let optimized = optimized
1036-
.into_iter()
1037-
// Convert into something we can execute
1038-
.map(PipelinedProject::from)
1039-
.collect::<Vec<_>>();
1040-
1041-
// Execute the union and return the results
1042-
execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx))
1043-
.map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics))
1044-
.context("One-off queries are not allowed to modify the database")
1045-
})?;
1046-
1047-
db.exec_counters_for(WorkloadType::Sql).record(&metrics);
1048-
1049-
Ok(rows)
1097+
Ok(())
10501098
}
10511099

10521100
/// FIXME(jgilles): this is a temporary workaround for deleting not currently being supported

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use super::execution_unit::QueryHash;
22
use super::module_subscription_manager::{
3-
spawn_send_worker, BroadcastQueue, Plan, SubscriptionGaugeStats, SubscriptionManager,
3+
spawn_send_worker, BroadcastError, BroadcastQueue, Plan, SubscriptionGaugeStats, SubscriptionManager,
44
};
55
use super::query::compile_query_with_hashes;
66
use super::tx::DeltaTx;
77
use super::{collect_table_update, TableUpdateType};
88
use crate::client::messages::{
9-
SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult, SubscriptionRows,
10-
SubscriptionUpdateMessage, TransactionUpdateMessage,
9+
SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult,
10+
SubscriptionRows, SubscriptionUpdateMessage, TransactionUpdateMessage,
1111
};
1212
use crate::client::{ClientActorId, ClientConnectionSender, Protocol};
1313
use crate::db::datastore::locking_tx_datastore::tx::TxId;
@@ -570,6 +570,18 @@ impl ModuleSubscriptions {
570570
Ok((plans, auth, scopeguard::ScopeGuard::into_inner(tx)))
571571
}
572572

573+
/// Send a message to a client connection.
574+
/// This will eventually be sent by the send-worker.
575+
/// This takes a `TxId`, because this should be called while still holding a lock on the database.
576+
pub fn send_client_message(
577+
&self,
578+
recipient: Arc<ClientConnectionSender>,
579+
message: impl Into<SerializableMessage>,
580+
_tx_id: &TxId,
581+
) -> Result<(), BroadcastError> {
582+
self.broadcast_queue.send_client_message(recipient, message)
583+
}
584+
573585
#[tracing::instrument(level = "trace", skip_all)]
574586
pub fn add_multi_subscription(
575587
&self,

0 commit comments

Comments
 (0)