Skip to content

Commit 6eef698

Browse files
Do not evaluate subscriptions that return empty result sets (#2731)
Signed-off-by: joshua-spacetime <josh@clockworklabs.io> Co-authored-by: Mazdak Farrokhzad <twingoow@gmail.com>
1 parent 70a0491 commit 6eef698

4 files changed

Lines changed: 117 additions & 62 deletions

File tree

crates/core/src/client/client_connection.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use spacetimedb_client_api_messages::websocket::{
2020
UnsubscribeMulti, WebsocketFormat,
2121
};
2222
use spacetimedb_lib::identity::RequestId;
23+
use spacetimedb_lib::metrics::ExecutionMetrics;
2324
use tokio::sync::{mpsc, oneshot, watch};
2425
use tokio::task::AbortHandle;
2526

@@ -321,7 +322,11 @@ impl ClientConnection {
321322
.unwrap() // TODO: is unwrapping right here?
322323
}
323324

324-
pub async fn subscribe_multi(&self, request: SubscribeMulti, timer: Instant) -> Result<(), DBError> {
325+
pub async fn subscribe_multi(
326+
&self,
327+
request: SubscribeMulti,
328+
timer: Instant,
329+
) -> Result<Option<ExecutionMetrics>, DBError> {
325330
let me = self.clone();
326331
tokio::task::spawn_blocking(move || {
327332
me.module
@@ -332,7 +337,11 @@ impl ClientConnection {
332337
.unwrap() // TODO: is unwrapping right here?
333338
}
334339

335-
pub async fn unsubscribe_multi(&self, request: UnsubscribeMulti, timer: Instant) -> Result<(), DBError> {
340+
pub async fn unsubscribe_multi(
341+
&self,
342+
request: UnsubscribeMulti,
343+
timer: Instant,
344+
) -> Result<Option<ExecutionMetrics>, DBError> {
336345
let me = self.clone();
337346
tokio::task::spawn_blocking(move || {
338347
me.module
@@ -343,7 +352,7 @@ impl ClientConnection {
343352
.unwrap() // TODO: is unwrapping right here?
344353
}
345354

346-
pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<(), DBError> {
355+
pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<ExecutionMetrics, DBError> {
347356
let me = self.clone();
348357
tokio::task::spawn_blocking(move || {
349358
me.module

crates/core/src/client/message_handlers.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::host::module_host::{EventStatus, ModuleEvent, ModuleFunctionCall};
66
use crate::host::{ReducerArgs, ReducerId};
77
use crate::identity::Identity;
88
use crate::messages::websocket::{CallReducer, ClientMessage, OneOffQuery};
9+
use crate::subscription::record_exec_metrics;
910
use crate::worker_metrics::WORKER_METRICS;
1011
use spacetimedb_lib::de::serde::DeserializeWrapper;
1112
use spacetimedb_lib::identity::RequestId;
@@ -86,15 +87,24 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
8687
})
8788
}
8889
ClientMessage::SubscribeMulti(subscription) => {
89-
let res = client.subscribe_multi(subscription, timer).await;
90+
let res = client.subscribe_multi(subscription, timer).await.map(|metrics| {
91+
if let Some(metrics) = metrics {
92+
record_exec_metrics(&WorkloadType::Subscribe, &database_identity, metrics)
93+
}
94+
});
95+
9096
WORKER_METRICS
9197
.request_round_trip
9298
.with_label_values(&WorkloadType::Subscribe, &database_identity, "")
9399
.observe(timer.elapsed().as_secs_f64());
94100
res.map_err(|e| (None, None, e.into()))
95101
}
96102
ClientMessage::UnsubscribeMulti(request) => {
97-
let res = client.unsubscribe_multi(request, timer).await;
103+
let res = client.unsubscribe_multi(request, timer).await.map(|metrics| {
104+
if let Some(metrics) = metrics {
105+
record_exec_metrics(&WorkloadType::Unsubscribe, &database_identity, metrics)
106+
}
107+
});
98108
WORKER_METRICS
99109
.request_round_trip
100110
.with_label_values(&WorkloadType::Unsubscribe, &database_identity, "")
@@ -118,7 +128,10 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
118128
res.map_err(|e| (None, None, e.into()))
119129
}
120130
ClientMessage::Subscribe(subscription) => {
121-
let res = client.subscribe(subscription, timer).await;
131+
let res = client
132+
.subscribe(subscription, timer)
133+
.await
134+
.map(|metrics| record_exec_metrics(&WorkloadType::Subscribe, &database_identity, metrics));
122135
WORKER_METRICS
123136
.request_round_trip
124137
.with_label_values(&WorkloadType::Subscribe, &database_identity, "")

crates/core/src/subscription/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,12 @@ where
155155
plans
156156
.par_iter()
157157
.flat_map_iter(|plan| plan.plans_fragments().map(|fragment| (plan.sql(), fragment)))
158+
.filter(|(_, plan)| {
159+
// Since subscriptions only support selects and inner joins,
160+
// we filter out any plans that read from an empty table.
161+
plan.table_ids()
162+
.all(|table_id| tx.table(table_id).is_some_and(|t| t.row_count > 0))
163+
})
158164
.map(|(sql, plan)| (sql, plan, plan.subscribed_table_id(), plan.subscribed_table_name()))
159165
.map(|(sql, plan, table_id, table_name)| {
160166
plan.physical_plan()

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 83 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,13 @@ type FullSubscriptionUpdate = FormatSwitch<ws::DatabaseUpdate<BsatnFormat>, ws::
103103

104104
/// A utility for sending an error message to a client and returning early
105105
macro_rules! return_on_err {
106-
($expr:expr, $handler:expr) => {
106+
($expr:expr, $handler:expr, $metrics:expr) => {
107107
match $expr {
108108
Ok(val) => val,
109109
Err(e) => {
110110
// TODO: Handle errors sending messages.
111111
let _ = $handler(e.to_string().into());
112-
return Ok(());
112+
return Ok($metrics);
113113
}
114114
}
115115
};
@@ -398,7 +398,7 @@ impl ModuleSubscriptions {
398398
sender: Arc<ClientConnectionSender>,
399399
request: UnsubscribeMulti,
400400
timer: Instant,
401-
) -> Result<(), DBError> {
401+
) -> Result<Option<ExecutionMetrics>, DBError> {
402402
// Send an error message to the client
403403
let send_err_msg = |message| {
404404
sender.send_message(SubscriptionMessage {
@@ -420,38 +420,23 @@ impl ModuleSubscriptions {
420420
let removed_queries = {
421421
let mut subscriptions = self.subscriptions.write();
422422

423-
match subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id) {
424-
Ok(queries) => queries,
425-
Err(error) => {
426-
// Apparently we ignore errors sending messages.
427-
let _ = send_err_msg(error.to_string().into());
428-
return Ok(());
429-
}
430-
}
431-
};
432-
433-
let auth = AuthCtx::new(self.owner_identity, sender.id.identity);
434-
let eval_result = self.evaluate_queries(
435-
sender.clone(),
436-
&removed_queries,
437-
&tx,
438-
&auth,
439-
TableUpdateType::Unsubscribe,
440-
);
441-
// If execution error, send to client
442-
let (update, metrics) = match eval_result {
443-
Ok(ok) => ok,
444-
Err(e) => {
445-
// Apparently we ignore errors sending messages.
446-
let _ = send_err_msg(e.to_string().into());
447-
return Ok(());
448-
}
423+
return_on_err!(
424+
subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id),
425+
send_err_msg,
426+
None
427+
)
449428
};
450429

451-
record_exec_metrics(
452-
&WorkloadType::Unsubscribe,
453-
&self.relational_db.database_identity(),
454-
metrics,
430+
let (update, metrics) = return_on_err!(
431+
self.evaluate_queries(
432+
sender.clone(),
433+
&removed_queries,
434+
&tx,
435+
&AuthCtx::new(self.owner_identity, sender.id.identity),
436+
TableUpdateType::Unsubscribe,
437+
),
438+
send_err_msg,
439+
None
455440
);
456441

457442
let _ = sender.send_message(SubscriptionMessage {
@@ -460,7 +445,8 @@ impl ModuleSubscriptions {
460445
timer: Some(timer),
461446
result: SubscriptionResult::UnsubscribeMulti(SubscriptionData { data: update }),
462447
});
463-
Ok(())
448+
449+
Ok(Some(metrics))
464450
}
465451

466452
/// Compiles the queries in a [Subscribe] or [SubscribeMulti] message.
@@ -538,7 +524,7 @@ impl ModuleSubscriptions {
538524
request: SubscribeMulti,
539525
timer: Instant,
540526
_assert: Option<AssertTxFn>,
541-
) -> Result<(), DBError> {
527+
) -> Result<Option<ExecutionMetrics>, DBError> {
542528
// Send an error message to the client
543529
let send_err_msg = |message| {
544530
let _ = sender.send_message(SubscriptionMessage {
@@ -555,7 +541,8 @@ impl ModuleSubscriptions {
555541
let num_queries = request.query_strings.len();
556542
let (queries, auth, tx) = return_on_err!(
557543
self.compile_queries(sender.id.identity, request.query_strings, num_queries),
558-
send_err_msg
544+
send_err_msg,
545+
None
559546
);
560547
let tx = scopeguard::guard(tx, |tx| {
561548
self.relational_db.release_tx(tx);
@@ -578,15 +565,9 @@ impl ModuleSubscriptions {
578565
let mut subscriptions = self.subscriptions.write();
579566
subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id)?;
580567
send_err_msg("Internal error evaluating queries".into());
581-
return Ok(());
568+
return Ok(None);
582569
};
583570

584-
record_exec_metrics(
585-
&WorkloadType::Subscribe,
586-
&self.relational_db.database_identity(),
587-
metrics,
588-
);
589-
590571
#[cfg(test)]
591572
if let Some(assert) = _assert {
592573
assert(&tx);
@@ -602,7 +583,8 @@ impl ModuleSubscriptions {
602583
timer: Some(timer),
603584
result: SubscriptionResult::SubscribeMulti(SubscriptionData { data: update }),
604585
});
605-
Ok(())
586+
587+
Ok(Some(metrics))
606588
}
607589

608590
/// Add a subscriber to the module. NOTE: this function is blocking.
@@ -614,7 +596,7 @@ impl ModuleSubscriptions {
614596
subscription: Subscribe,
615597
timer: Instant,
616598
_assert: Option<AssertTxFn>,
617-
) -> Result<(), DBError> {
599+
) -> Result<ExecutionMetrics, DBError> {
618600
let num_queries = subscription.query_strings.len();
619601
let (queries, auth, tx) = self.compile_queries(sender.id.identity, subscription.query_strings, num_queries)?;
620602
let tx = scopeguard::guard(tx, |tx| {
@@ -641,12 +623,6 @@ impl ModuleSubscriptions {
641623
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
642624
};
643625

644-
record_exec_metrics(
645-
&WorkloadType::Subscribe,
646-
&self.relational_db.database_identity(),
647-
metrics,
648-
);
649-
650626
// It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
651627
// This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
652628
// but that should not pose an issue.
@@ -667,7 +643,8 @@ impl ModuleSubscriptions {
667643
request_id: Some(subscription.request_id),
668644
timer: Some(timer),
669645
});
670-
Ok(())
646+
647+
Ok(metrics)
671648
}
672649

673650
pub fn remove_subscriber(&self, client_id: ClientActorId) {
@@ -764,6 +741,7 @@ mod tests {
764741
use hashbrown::HashMap;
765742
use itertools::Itertools;
766743
use parking_lot::RwLock;
744+
use pretty_assertions::assert_matches;
767745
use spacetimedb_client_api_messages::energy::EnergyQuanta;
768746
use spacetimedb_client_api_messages::websocket::{
769747
CompressableQueryUpdate, Compression, FormatSwitch, QueryId, Subscribe, SubscribeMulti, SubscribeSingle,
@@ -794,7 +772,8 @@ mod tests {
794772
query_strings: [sql.into()].into(),
795773
request_id: 0,
796774
};
797-
module_subscriptions.add_legacy_subscriber(sender, subscribe, Instant::now(), assert)
775+
module_subscriptions.add_legacy_subscriber(sender, subscribe, Instant::now(), assert)?;
776+
Ok(())
798777
}
799778

800779
/// An in-memory `RelationalDB` for testing
@@ -945,10 +924,12 @@ mod tests {
945924
queries: &[&'static str],
946925
sender: Arc<ClientConnectionSender>,
947926
counter: &mut u32,
948-
) -> anyhow::Result<()> {
927+
) -> anyhow::Result<ExecutionMetrics> {
949928
*counter += 1;
950-
subs.add_multi_subscription(sender, multi_subscribe(queries, *counter), Instant::now(), None)?;
951-
Ok(())
929+
let metrics = subs
930+
.add_multi_subscription(sender, multi_subscribe(queries, *counter), Instant::now(), None)
931+
.map(|metrics| metrics.unwrap_or_default())?;
932+
Ok(metrics)
952933
}
953934

954935
/// Unsubscribe from a single query
@@ -1237,6 +1218,8 @@ mod tests {
12371218
})
12381219
})?;
12391220

1221+
commit_tx(&db, &subs, [], [(table_id, product![0_u8])])?;
1222+
12401223
let mut query_id = 0;
12411224

12421225
// Subscribe to `t`
@@ -1858,6 +1841,50 @@ mod tests {
18581841
Ok(())
18591842
}
18601843

1844+
/// Test that we do not evaluate queries that return trivially empty results
1845+
#[tokio::test]
1846+
async fn test_query_pruning_for_empty_tables() -> anyhow::Result<()> {
1847+
// Establish a client connection
1848+
let (tx, mut rx) = client_connection(client_id_from_u8(1));
1849+
1850+
let db = relational_db()?;
1851+
let subs = module_subscriptions(db.clone());
1852+
1853+
let schema = &[("id", AlgebraicType::U64), ("a", AlgebraicType::U64)];
1854+
let indices = &[0.into()];
1855+
// Create tables `t` and `s` with `(i: u64, a: u64)`.
1856+
db.create_table_for_test("t", schema, indices)?;
1857+
let s_id = db.create_table_for_test("s", schema, indices)?;
1858+
1859+
// Insert one row into `s`, but leave `t` empty.
1860+
commit_tx(&db, &subs, [], [(s_id, product![0u64, 0u64])])?;
1861+
1862+
// Subscribe to queries that return empty results
1863+
let metrics = subscribe_multi(
1864+
&subs,
1865+
&[
1866+
"select t.* from t where a = 0",
1867+
"select t.* from t join s on t.id = s.id where s.a = 0",
1868+
"select s.* from t join s on t.id = s.id where t.a = 0",
1869+
],
1870+
tx,
1871+
&mut 0,
1872+
)?;
1873+
1874+
assert_matches!(
1875+
rx.recv().await,
1876+
Some(SerializableMessage::Subscription(SubscriptionMessage {
1877+
result: SubscriptionResult::SubscribeMulti(_),
1878+
..
1879+
}))
1880+
);
1881+
1882+
assert_eq!(metrics.rows_scanned, 0);
1883+
assert_eq!(metrics.index_seeks, 0);
1884+
1885+
Ok(())
1886+
}
1887+
18611888
/// Asserts that a subscription holds a tx handle for the entire length of its evaluation.
18621889
#[test]
18631890
fn test_tx_subscription_ordering() -> ResultTest<()> {

0 commit comments

Comments
 (0)