Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 29 additions & 30 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures::prelude::*;
use prometheus::{Histogram, IntCounter, IntGauge};
use spacetimedb_client_api_messages::websocket::{
BsatnFormat, CallReducerFlags, Compression, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, Unsubscribe,
UnsubscribeMulti, WebsocketFormat,
UnsubscribeMulti,
};
use spacetimedb_lib::identity::RequestId;
use spacetimedb_lib::metrics::ExecutionMetrics;
Expand Down Expand Up @@ -440,41 +440,40 @@ impl ClientConnection {
.await
}

pub fn one_off_query_json(&self, query: &str, message_id: &[u8], timer: Instant) -> Result<(), anyhow::Error> {
let response = self.one_off_query::<JsonFormat>(query, message_id, timer);
self.send_message(response)?;
Ok(())
}

pub fn one_off_query_bsatn(&self, query: &str, message_id: &[u8], timer: Instant) -> Result<(), anyhow::Error> {
let response = self.one_off_query::<BsatnFormat>(query, message_id, timer);
self.send_message(response)?;
Ok(())
pub async fn one_off_query_json(
&self,
query: &str,
message_id: &[u8],
timer: Instant,
) -> Result<(), anyhow::Error> {
self.module
.one_off_query::<JsonFormat>(
self.id.identity,
query.to_owned(),
self.sender.clone(),
message_id.to_owned(),
timer,
|msg: OneOffQueryResponseMessage<JsonFormat>| msg.into(),
)
.await
}

fn one_off_query<F: WebsocketFormat>(
pub async fn one_off_query_bsatn(
&self,
query: &str,
message_id: &[u8],
timer: Instant,
) -> OneOffQueryResponseMessage<F> {
let result = self.module.one_off_query::<F>(self.id.identity, query.to_owned());
let message_id = message_id.to_owned();
let total_host_execution_duration = timer.elapsed().into();
match result {
Ok(results) => OneOffQueryResponseMessage {
message_id,
error: None,
results: vec![results],
total_host_execution_duration,
},
Err(err) => OneOffQueryResponseMessage {
message_id,
error: Some(format!("{}", err)),
results: vec![],
total_host_execution_duration,
},
}
) -> Result<(), anyhow::Error> {
self.module
.one_off_query::<BsatnFormat>(
self.id.identity,
query.to_owned(),
self.sender.clone(),
message_id.to_owned(),
timer,
|msg: OneOffQueryResponseMessage<BsatnFormat>| msg.into(),
)
.await
}

pub async fn disconnect(self) {
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
message_id,
}) => {
let res = match client.config.protocol {
Protocol::Binary => client.one_off_query_bsatn(&query, &message_id, timer),
Protocol::Text => client.one_off_query_json(&query, &message_id, timer),
Protocol::Binary => client.one_off_query_bsatn(&query, &message_id, timer).await,
Protocol::Text => client.one_off_query_json(&query, &message_id, timer).await,
};
mod_metrics
.request_round_trip_sql
Expand Down
140 changes: 94 additions & 46 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{ArgsTuple, InvalidReducerArguments, ReducerArgs, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler};
use crate::client::messages::{OneOffQueryResponseMessage, SerializableMessage};
use crate::client::{ClientActorId, ClientConnectionSender};
use crate::database_logger::{LogLevel, Record};
use crate::db::datastore::locking_tx_datastore::MutTxId;
Expand Down Expand Up @@ -32,6 +33,7 @@ use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
use spacetimedb_execution::pipelined::PipelinedProject;
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
use spacetimedb_lib::identity::{AuthCtx, RequestId};
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_lib::ConnectionId;
use spacetimedb_lib::Timestamp;
use spacetimedb_primitives::TableId;
Expand Down Expand Up @@ -993,60 +995,106 @@ impl ModuleHost {
)
}

/// Execute a one-off query and send the results to the given client.
/// This only returns an error if there is a db-level problem.
/// An error with the query itself will be sent to the client.
Comment thread
jsdt marked this conversation as resolved.
#[tracing::instrument(level = "trace", skip_all)]
pub fn one_off_query<F: WebsocketFormat>(
pub async fn one_off_query<F: WebsocketFormat>(
&self,
caller_identity: Identity,
query: String,
) -> Result<OneOffTable<F>, anyhow::Error> {
client: Arc<ClientConnectionSender>,
message_id: Vec<u8>,
timer: Instant,
// We take this because we only have a way to convert with the concrete types (Bsatn and Json)
into_message: impl FnOnce(OneOffQueryResponseMessage<F>) -> SerializableMessage + Send + 'static,
) -> Result<(), anyhow::Error> {
let replica_ctx = self.replica_ctx();
let db = &replica_ctx.relational_db;
let db = replica_ctx.relational_db.clone();
let subscriptions = replica_ctx.subscriptions.clone();
let auth = AuthCtx::new(replica_ctx.owner_identity, caller_identity);
log::debug!("One-off query: {query}");
let metrics = asyncify(move || {
db.with_read_only(Workload::Sql, |tx| {
// We wrap the actual query in a closure so we can use ? to handle errors without making
// the entire transaction abort with an error.
let result: Result<(OneOffTable<F>, ExecutionMetrics), anyhow::Error> = (|| {
let tx = SchemaViewer::new(tx, &auth);

let (
// A query may compile down to several plans.
// This happens when there are multiple RLS rules per table.
// The original query is the union of these plans.
plans,
_,
table_name,
_,
) = compile_subscription(&query, &tx, &auth)?;

// Optimize each fragment
let optimized = plans
.into_iter()
.map(|plan| plan.optimize())
.collect::<Result<Vec<_>, _>>()?;

check_row_limit(
&optimized,
&db,
&tx,
// Estimate the number of rows this query will scan
|plan, tx| estimate_rows_scanned(tx, plan),
&auth,
)?;

let optimized = optimized
.into_iter()
// Convert into something we can execute
.map(PipelinedProject::from)
.collect::<Vec<_>>();

// Execute the union and return the results
execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx))
.map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics))
.context("One-off queries are not allowed to modify the database")
})();

let total_host_execution_duration = timer.elapsed().into();
let (message, metrics): (SerializableMessage, Option<ExecutionMetrics>) = match result {
Ok((rows, metrics)) => (
into_message(OneOffQueryResponseMessage {
message_id,
error: None,
results: vec![rows],
total_host_execution_duration,
}),
Some(metrics),
),
Err(err) => (
into_message(OneOffQueryResponseMessage {
message_id,
error: Some(format!("{}", err)),
results: vec![],
total_host_execution_duration,
}),
None,
),
};

subscriptions.send_client_message(client, message, tx)?;
Ok::<Option<ExecutionMetrics>, anyhow::Error>(metrics)
})
})
.await?;

if let Some(metrics) = metrics {
// Record the metrics for the one-off query
replica_ctx
.relational_db
.exec_counters_for(WorkloadType::Sql)
.record(&metrics);
}

let (rows, metrics) = db.with_read_only(Workload::Sql, |tx| {
let tx = SchemaViewer::new(tx, &auth);

let (
// A query may compile down to several plans.
// This happens when there are multiple RLS rules per table.
// The original query is the union of these plans.
plans,
_,
table_name,
_,
) = compile_subscription(&query, &tx, &auth)?;

// Optimize each fragment
let optimized = plans
.into_iter()
.map(|plan| plan.optimize())
.collect::<Result<Vec<_>, _>>()?;

check_row_limit(
&optimized,
db,
&tx,
// Estimate the number of rows this query will scan
|plan, tx| estimate_rows_scanned(tx, plan),
&auth,
)?;

let optimized = optimized
.into_iter()
// Convert into something we can execute
.map(PipelinedProject::from)
.collect::<Vec<_>>();

// Execute the union and return the results
execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx))
.map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics))
.context("One-off queries are not allowed to modify the database")
})?;

db.exec_counters_for(WorkloadType::Sql).record(&metrics);

Ok(rows)
Ok(())
}

/// FIXME(jgilles): this is a temporary workaround for deleting not currently being supported
Expand Down
18 changes: 15 additions & 3 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use super::execution_unit::QueryHash;
use super::module_subscription_manager::{
spawn_send_worker, BroadcastQueue, Plan, SubscriptionGaugeStats, SubscriptionManager,
spawn_send_worker, BroadcastError, BroadcastQueue, Plan, SubscriptionGaugeStats, SubscriptionManager,
};
use super::query::compile_query_with_hashes;
use super::tx::DeltaTx;
use super::{collect_table_update, TableUpdateType};
use crate::client::messages::{
SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult, SubscriptionRows,
SubscriptionUpdateMessage, TransactionUpdateMessage,
SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult,
SubscriptionRows, SubscriptionUpdateMessage, TransactionUpdateMessage,
};
use crate::client::{ClientActorId, ClientConnectionSender, Protocol};
use crate::db::datastore::locking_tx_datastore::tx::TxId;
Expand Down Expand Up @@ -570,6 +570,18 @@ impl ModuleSubscriptions {
Ok((plans, auth, scopeguard::ScopeGuard::into_inner(tx)))
}

/// Send a message to a client connection.
/// This will eventually be sent by the send-worker.
/// This takes a `TxId`, because this should be called while still holding a lock on the database.
pub fn send_client_message(
&self,
recipient: Arc<ClientConnectionSender>,
message: impl Into<SerializableMessage>,
_tx_id: &TxId,
) -> Result<(), BroadcastError> {
self.broadcast_queue.send_client_message(recipient, message)
}

#[tracing::instrument(level = "trace", skip_all)]
pub fn add_multi_subscription(
&self,
Expand Down
Loading