Skip to content

Commit 4bc618f

Browse files
authored
Return the query text with subscription errors (#2609)
1 parent f69113f commit 4bc618f

4 files changed

Lines changed: 87 additions & 73 deletions

File tree

crates/core/src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,12 @@ pub enum DBError {
230230
Other(#[from] anyhow::Error),
231231
#[error(transparent)]
232232
TypeError(#[from] TypingError),
233+
#[error("{error}, executing: `{sql}`")]
234+
WithSql {
235+
#[source]
236+
error: Box<DBError>,
237+
sql: Box<str>,
238+
},
233239
}
234240

235241
impl From<bflatn_to::Error> for DBError {

crates/core/src/subscription/mod.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore};
1010
use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
1111
use spacetimedb_primitives::TableId;
1212

13+
use crate::error::DBError;
1314
use crate::{db::db_metrics::DB_METRICS, execution_context::WorkloadType, worker_metrics::WORKER_METRICS};
1415

1516
pub mod delta;
@@ -133,23 +134,27 @@ pub fn execute_plans<Tx, F>(
133134
comp: Compression,
134135
tx: &Tx,
135136
update_type: TableUpdateType,
136-
) -> Result<(DatabaseUpdate<F>, ExecutionMetrics)>
137+
) -> Result<(DatabaseUpdate<F>, ExecutionMetrics), DBError>
137138
where
138139
Tx: Datastore + DeltaStore + Sync,
139140
F: WebsocketFormat,
140141
{
141142
plans
142143
.par_iter()
143-
.flat_map_iter(|plan| plan.plans_fragments())
144-
.map(|plan| (plan, plan.subscribed_table_id(), plan.subscribed_table_name()))
145-
.map(|(plan, table_id, table_name)| {
144+
.flat_map_iter(|plan| plan.plans_fragments().map(|fragment| (plan.sql(), fragment)))
145+
.map(|(sql, plan)| (sql, plan, plan.subscribed_table_id(), plan.subscribed_table_name()))
146+
.map(|(sql, plan, table_id, table_name)| {
146147
plan.physical_plan()
147148
.clone()
148149
.optimize()
149-
.map(PipelinedProject::from)
150-
.and_then(|plan| collect_table_update(&[plan], table_id, table_name.into(), comp, tx, update_type))
150+
.map(|plan| (sql, PipelinedProject::from(plan)))
151+
.and_then(|(_, plan)| collect_table_update(&[plan], table_id, table_name.into(), comp, tx, update_type))
152+
.map_err(|err| DBError::WithSql {
153+
sql: sql.into(),
154+
error: Box::new(DBError::Other(err)),
155+
})
151156
})
152-
.collect::<Result<Vec<_>>>()
157+
.collect::<Result<Vec<_>, _>>()
153158
.map(|table_updates_with_metrics| {
154159
let n = table_updates_with_metrics.len();
155160
let mut tables = Vec::with_capacity(n);

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 56 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use spacetimedb_client_api_messages::websocket::{
2828
};
2929
use spacetimedb_execution::pipelined::PipelinedProject;
3030
use spacetimedb_expr::check::parse_and_type_sub;
31+
use spacetimedb_expr::errors::TypingError;
3132
use spacetimedb_lib::identity::AuthCtx;
3233
use spacetimedb_lib::metrics::ExecutionMetrics;
3334
use spacetimedb_lib::Identity;
@@ -104,8 +105,11 @@ type FullSubscriptionUpdate = FormatSwitch<ws::DatabaseUpdate<BsatnFormat>, ws::
104105

105106
/// A utility for sending an error message to a client and returning early
106107
macro_rules! return_on_err {
107-
($expr:expr, $handler:expr) => {
108-
match $expr {
108+
($expr:expr, $sql:expr, $handler:expr) => {
109+
match $expr.map_err(|err| DBError::WithSql {
110+
sql: $sql.into(),
111+
error: Box::new(DBError::Other(err.into())),
112+
}) {
109113
Ok(val) => val,
110114
Err(e) => {
111115
// TODO: Handle errors sending messages.
@@ -117,9 +121,8 @@ macro_rules! return_on_err {
117121
}
118122

119123
/// Hash a sql query, using the caller's identity if necessary
120-
fn hash_query(sql: &str, tx: &TxId, auth: &AuthCtx) -> Result<QueryHash, DBError> {
124+
fn hash_query(sql: &str, tx: &TxId, auth: &AuthCtx) -> Result<QueryHash, TypingError> {
121125
parse_and_type_sub(sql, &SchemaViewer::new(tx, auth), auth)
122-
.map_err(DBError::from)
123126
.map(|(_, has_param)| QueryHash::from_string(sql, auth.caller, has_param))
124127
}
125128

@@ -185,10 +188,10 @@ impl ModuleSubscriptions {
185188

186189
Ok(match sender.config.protocol {
187190
Protocol::Binary => collect_table_update(&plans, table_id, table_name.into(), comp, &tx, update_type)
188-
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?,
191+
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics)),
189192
Protocol::Text => collect_table_update(&plans, table_id, table_name.into(), comp, &tx, update_type)
190-
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
191-
})
193+
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics)),
194+
}?)
192195
}
193196

194197
fn evaluate_queries(
@@ -254,7 +257,7 @@ impl ModuleSubscriptions {
254257
let query = super::query::WHITESPACE.replace_all(&request.query, " ");
255258
let sql = query.trim();
256259

257-
let hash = return_on_err!(hash_query(sql, &tx, &auth), send_err_msg);
260+
let hash = return_on_err!(hash_query(sql, &tx, &auth), sql, send_err_msg);
258261

259262
let existing_query = {
260263
let guard = self.subscriptions.read();
@@ -265,11 +268,13 @@ impl ModuleSubscriptions {
265268
existing_query
266269
.map(Ok)
267270
.unwrap_or_else(|| compile_read_only_query(&auth, &tx, sql).map(Arc::new)),
271+
sql,
268272
send_err_msg
269273
);
270274

271275
let (table_rows, metrics) = return_on_err!(
272276
self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Subscribe),
277+
query.sql(),
273278
send_err_msg
274279
);
275280

@@ -353,18 +358,11 @@ impl ModuleSubscriptions {
353358
self.relational_db.release_tx(tx);
354359
});
355360
let auth = AuthCtx::new(self.owner_identity, sender.id.identity);
356-
let eval_result =
357-
self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Unsubscribe);
358-
359-
// If execution error, send to client
360-
let (table_rows, metrics) = match eval_result {
361-
Ok(ok) => ok,
362-
Err(e) => {
363-
// Apparently we ignore errors sending messages.
364-
let _ = send_err_msg(e.to_string().into());
365-
return Ok(());
366-
}
367-
};
361+
let (table_rows, metrics) = return_on_err!(
362+
self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Unsubscribe),
363+
query.sql(),
364+
send_err_msg
365+
);
368366

369367
record_exec_metrics(
370368
&WorkloadType::Subscribe,
@@ -499,12 +497,12 @@ impl ModuleSubscriptions {
499497
continue;
500498
}
501499

502-
let hash = return_on_err!(hash_query(sql, &tx, &auth), send_err_msg);
500+
let hash = return_on_err!(hash_query(sql, &tx, &auth), sql, send_err_msg);
503501

504502
if let Some(unit) = guard.query(&hash) {
505503
queries.push(unit);
506504
} else {
507-
let compiled = return_on_err!(compile_read_only_query(&auth, &tx, sql), send_err_msg);
505+
let compiled = return_on_err!(compile_read_only_query(&auth, &tx, sql), sql, send_err_msg);
508506
queries.push(Arc::new(compiled));
509507
}
510508
}
@@ -721,7 +719,7 @@ pub struct WriteConflict;
721719
mod tests {
722720
use super::{AssertTxFn, ModuleSubscriptions};
723721
use crate::client::messages::{
724-
SerializableMessage, SubscriptionMessage, SubscriptionResult, SubscriptionUpdateMessage,
722+
SerializableMessage, SubscriptionError, SubscriptionMessage, SubscriptionResult, SubscriptionUpdateMessage,
725723
TransactionUpdateMessage,
726724
};
727725
use crate::client::{ClientActorId, ClientConfig, ClientConnectionSender, ClientName, Protocol};
@@ -1075,6 +1073,21 @@ mod tests {
10751073
Ok(())
10761074
}
10771075

1076+
fn check_subscription_err(sql: &str, result: Option<SerializableMessage>) {
1077+
if let Some(SerializableMessage::Subscription(SubscriptionMessage {
1078+
result: SubscriptionResult::Error(SubscriptionError { message, .. }),
1079+
..
1080+
})) = result
1081+
{
1082+
assert!(
1083+
message.contains(sql),
1084+
"Expected error message to contain the SQL query: {sql}, but got: {message}",
1085+
);
1086+
return;
1087+
}
1088+
panic!("Expected a subscription error message, but got: {:?}", result);
1089+
}
1090+
10781091
/// Test that clients receive error messages on subscribe
10791092
#[tokio::test]
10801093
async fn subscribe_single_error() -> anyhow::Result<()> {
@@ -1087,15 +1100,11 @@ mod tests {
10871100
db.create_table_for_test("t", &[("x", AlgebraicType::U8)], &[])?;
10881101

10891102
// Subscribe to an invalid query (r is not in scope)
1090-
subscribe_single(&subs, "select r.* from t", tx, &mut 0)?;
1103+
let sql = "select r.* from t";
1104+
subscribe_single(&subs, sql, tx, &mut 0)?;
1105+
1106+
check_subscription_err(sql, rx.recv().await);
10911107

1092-
assert!(matches!(
1093-
rx.recv().await,
1094-
Some(SerializableMessage::Subscription(SubscriptionMessage {
1095-
result: SubscriptionResult::Error(..),
1096-
..
1097-
}))
1098-
));
10991108
Ok(())
11001109
}
11011110

@@ -1111,15 +1120,11 @@ mod tests {
11111120
db.create_table_for_test("t", &[("x", AlgebraicType::U8)], &[])?;
11121121

11131122
// Subscribe to an invalid query (r is not in scope)
1114-
subscribe_multi(&subs, &["select r.* from t"], tx, &mut 0)?;
1123+
let sql = "select r.* from t";
1124+
subscribe_multi(&subs, &[sql], tx, &mut 0)?;
1125+
1126+
check_subscription_err(sql, rx.recv().await);
11151127

1116-
assert!(matches!(
1117-
rx.recv().await,
1118-
Some(SerializableMessage::Subscription(SubscriptionMessage {
1119-
result: SubscriptionResult::Error(..),
1120-
..
1121-
}))
1122-
));
11231128
Ok(())
11241129
}
11251130

@@ -1147,7 +1152,8 @@ mod tests {
11471152
let mut query_id = 0;
11481153

11491154
// Subscribe to `t`
1150-
subscribe_single(&subs, "select * from t where id = 1", tx.clone(), &mut query_id)?;
1155+
let sql = "select * from t where id = 1";
1156+
subscribe_single(&subs, sql, tx.clone(), &mut query_id)?;
11511157

11521158
// The initial subscription should succeed
11531159
assert!(matches!(
@@ -1169,13 +1175,8 @@ mod tests {
11691175
// Specifically that we do not recompile queries on unsubscribe.
11701176
// We execute the cached plan which in this case is an index scan.
11711177
// The index no longer exists, and therefore it fails.
1172-
assert!(matches!(
1173-
rx.recv().await,
1174-
Some(SerializableMessage::Subscription(SubscriptionMessage {
1175-
result: SubscriptionResult::Error(..),
1176-
..
1177-
}))
1178-
));
1178+
check_subscription_err(sql, rx.recv().await);
1179+
11791180
Ok(())
11801181
}
11811182

@@ -1203,7 +1204,8 @@ mod tests {
12031204
let mut query_id = 0;
12041205

12051206
// Subscribe to `t`
1206-
subscribe_multi(&subs, &["select * from t where id = 1"], tx.clone(), &mut query_id)?;
1207+
let sql = "select * from t where id = 1";
1208+
subscribe_multi(&subs, &[sql], tx.clone(), &mut query_id)?;
12071209

12081210
// The initial subscription should succeed
12091211
assert!(matches!(
@@ -1225,13 +1227,8 @@ mod tests {
12251227
// Specifically that we do not recompile queries on unsubscribe.
12261228
// We execute the cached plan which in this case is an index scan.
12271229
// The index no longer exists, and therefore it fails.
1228-
assert!(matches!(
1229-
rx.recv().await,
1230-
Some(SerializableMessage::Subscription(SubscriptionMessage {
1231-
result: SubscriptionResult::Error(..),
1232-
..
1233-
}))
1234-
));
1230+
check_subscription_err(sql, rx.recv().await);
1231+
12351232
Ok(())
12361233
}
12371234

@@ -1256,8 +1253,8 @@ mod tests {
12561253
.unwrap()
12571254
})
12581255
})?;
1259-
1260-
subscribe_single(&subs, "select t.* from t join s on t.id = s.id", tx, &mut 0)?;
1256+
let sql = "select t.* from t join s on t.id = s.id";
1257+
subscribe_single(&subs, sql, tx, &mut 0)?;
12611258

12621259
// The initial subscription should succeed
12631260
assert!(matches!(
@@ -1285,13 +1282,8 @@ mod tests {
12851282
// Specifically, plans are cached on the initial subscribe.
12861283
// Hence we execute a cached plan which happens to be an index join.
12871284
// We've removed the index on `s`, and therefore it fails.
1288-
assert!(matches!(
1289-
rx.recv().await,
1290-
Some(SerializableMessage::Subscription(SubscriptionMessage {
1291-
result: SubscriptionResult::Error(..),
1292-
..
1293-
}))
1294-
));
1285+
check_subscription_err(sql, rx.recv().await);
1286+
12951287
Ok(())
12961288
}
12971289

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ impl Plan {
8484
pub fn plans_fragments(&self) -> impl Iterator<Item = &SubscriptionPlan> + '_ {
8585
self.plans.iter()
8686
}
87+
88+
/// The `SQL` text of this subscription.
89+
pub fn sql(&self) -> &str {
90+
&self.sql
91+
}
8792
}
8893

8994
/// For each client, we hold a handle for sending messages, and we track the queries they are subscribed to.
@@ -634,8 +639,14 @@ impl SubscriptionManager {
634639
sql = qstate.query.sql,
635640
reason = ?err,
636641
);
637-
acc.errs
638-
.extend(clients_for_query.map(|id| (id, err.to_string().into_boxed_str())))
642+
let err = DBError::WithSql {
643+
sql: qstate.query.sql.as_str().into(),
644+
error: Box::new(err.into()),
645+
}
646+
.to_string()
647+
.into_boxed_str();
648+
649+
acc.errs.extend(clients_for_query.map(|id| (id, err.clone())))
639650
}
640651
// The query didn't return any rows to update
641652
Ok(None) => {}

0 commit comments

Comments
 (0)