Skip to content

Commit 682f8fc

Browse files
committed
Merge remote-tracking branch 'origin/master' into bfops/discord-post
2 parents 551d2b4 + 5113488 commit 682f8fc

7 files changed

Lines changed: 252 additions & 35 deletions

File tree

crates/core/src/client/client_connection.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,11 @@ impl ClientConnectionSender {
392392
confirmed_reads = self.config.confirmed_reads,
393393
"client channel capacity exceeded"
394394
);
395+
log::warn!(
396+
"Client {:?} exceeded channel capacity of {}, kicking",
397+
self.id,
398+
self.sendtx.capacity(),
399+
);
395400
self.abort_handle.abort();
396401
self.cancelled.store(true, Ordering::Relaxed);
397402
return Err(ClientSendError::Cancelled);

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use spacetimedb_lib::buffer::DecodeError;
3737
use spacetimedb_lib::db::raw_def::v9::Lifecycle;
3838
use spacetimedb_lib::de::DeserializeSeed;
3939
use spacetimedb_lib::identity::AuthCtx;
40-
use spacetimedb_lib::{bsatn, ConnectionId, RawModuleDef, Timestamp};
40+
use spacetimedb_lib::{bsatn, ConnectionId, Hash, RawModuleDef, Timestamp};
4141
use spacetimedb_primitives::{ProcedureId, TableId, ViewFnPtr, ViewId};
4242
use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError};
4343
use spacetimedb_schema::def::{ModuleDef, ViewDef};
@@ -745,7 +745,13 @@ impl InstanceCommon {
745745
self.handle_outer_error(&result.stats.energy, reducer_name)
746746
}
747747
Err(ExecutionError::User(err)) => {
748-
log_reducer_error(inst.replica_ctx(), timestamp, reducer_name, &err);
748+
log_reducer_error(
749+
inst.replica_ctx(),
750+
timestamp,
751+
reducer_name,
752+
&err,
753+
&self.info.module_hash,
754+
);
749755
EventStatus::Failed(err.into())
750756
}
751757
// We haven't actually committed yet - `commit_and_broadcast_event` will commit
@@ -763,7 +769,13 @@ impl InstanceCommon {
763769
Ok(()) => EventStatus::Committed(DatabaseUpdate::default()),
764770
Err(err) => {
765771
let err = err.to_string();
766-
log_reducer_error(inst.replica_ctx(), timestamp, reducer_name, &err);
772+
log_reducer_error(
773+
inst.replica_ctx(),
774+
timestamp,
775+
reducer_name,
776+
&err,
777+
&self.info.module_hash,
778+
);
767779
EventStatus::Failed(err)
768780
}
769781
}
@@ -1161,9 +1173,20 @@ fn maybe_log_long_running_function(reducer_name: &str, total_duration: Duration)
11611173
}
11621174

11631175
/// Logs an error `message` for `reducer` at `timestamp` into `replica_ctx`.
1164-
fn log_reducer_error(replica_ctx: &ReplicaContext, timestamp: Timestamp, reducer: &str, message: &str) {
1176+
fn log_reducer_error(
1177+
replica_ctx: &ReplicaContext,
1178+
timestamp: Timestamp,
1179+
reducer: &str,
1180+
message: &str,
1181+
module_hash: &Hash,
1182+
) {
11651183
use database_logger::Record;
11661184

1185+
WORKER_METRICS
1186+
.sender_errors
1187+
.with_label_values(&replica_ctx.database_identity, module_hash, reducer)
1188+
.inc();
1189+
11671190
log::info!("reducer returned error: {message}");
11681191

11691192
let record = Record {
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use spacetimedb_physical_plan::plan::PhysicalPlan;
2+
use spacetimedb_schema::schema::TableSchema;
3+
use std::sync::Arc;
4+
5+
/// How a subscription query's physical plan reads its input, as classified by
/// `get_query_metrics` from the node kinds present in the plan.
#[derive(Clone, Copy, Debug)]
enum ScanStrategy {
    /// The plan contains a table scan and no index scan.
    Sequential,
    /// The plan contains an index scan (and no table scan) together with a
    /// filter node, i.e. rows are filtered again after the index lookup.
    IndexedWithFilter,
    /// The plan contains an index scan and neither a table scan nor a filter node.
    FullyIndexed,
    /// The plan contains both a table scan and an index scan.
    Mixed,
    /// The plan contains neither a table scan nor an index scan.
    Unknown,
}
19+
20+
/// Metrics data for a single subscription query execution
21+
#[derive(Debug)]
22+
pub struct QueryMetrics {
23+
pub scan_type: String,
24+
pub table_name: String,
25+
pub unindexed_columns: String,
26+
pub rows_scanned: u64,
27+
pub execution_time_micros: u64,
28+
}
29+
30+
impl std::fmt::Display for ScanStrategy {
31+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32+
match self {
33+
Self::Sequential => write!(f, "sequential"),
34+
Self::IndexedWithFilter => write!(f, "indexed_with_filter"),
35+
Self::FullyIndexed => write!(f, "fully_indexed"),
36+
Self::Mixed => write!(f, "mixed"),
37+
Self::Unknown => write!(f, "unknown"),
38+
}
39+
}
40+
}
41+
42+
/// Recursively extracts column names from filter expressions
43+
fn extract_columns(
44+
expr: &spacetimedb_physical_plan::plan::PhysicalExpr,
45+
schema: Option<&Arc<TableSchema>>,
46+
columns: &mut Vec<String>,
47+
) {
48+
use spacetimedb_physical_plan::plan::PhysicalExpr;
49+
50+
match expr {
51+
PhysicalExpr::Field(tuple_field) => {
52+
let col_name = schema
53+
.and_then(|s| s.columns.get(tuple_field.field_pos))
54+
.map(|col| col.col_name.to_string())
55+
.unwrap_or_else(|| format!("col_{}", tuple_field.field_pos));
56+
columns.push(col_name);
57+
}
58+
PhysicalExpr::BinOp(_, lhs, rhs) => {
59+
extract_columns(lhs, schema, columns);
60+
extract_columns(rhs, schema, columns);
61+
}
62+
PhysicalExpr::LogOp(_, exprs) => {
63+
for expr in exprs {
64+
extract_columns(expr, schema, columns);
65+
}
66+
}
67+
PhysicalExpr::Value(_) => {}
68+
}
69+
}
70+
71+
/// Analyzes subscription scan strategy and creates QueryMetrics
72+
pub fn get_query_metrics(
73+
table_name: &str,
74+
plan: &PhysicalPlan,
75+
rows_scanned: u64,
76+
execution_time_micros: u64,
77+
) -> QueryMetrics {
78+
let has_table_scan = plan.any(&|p| matches!(p, PhysicalPlan::TableScan(..)));
79+
let has_index_scan = plan.any(&|p| matches!(p, PhysicalPlan::IxScan(..)));
80+
let has_post_filter = plan.any(&|p| matches!(p, PhysicalPlan::Filter(..)));
81+
82+
let strategy = if has_table_scan && has_index_scan {
83+
ScanStrategy::Mixed
84+
} else if has_table_scan {
85+
ScanStrategy::Sequential
86+
} else if has_index_scan && has_post_filter {
87+
ScanStrategy::IndexedWithFilter
88+
} else if has_index_scan {
89+
ScanStrategy::FullyIndexed
90+
} else {
91+
ScanStrategy::Unknown
92+
};
93+
94+
// Extract the schema from the plan
95+
let mut schema: Option<Arc<TableSchema>> = None;
96+
plan.visit(&mut |p| match p {
97+
PhysicalPlan::TableScan(scan, _) => {
98+
schema = Some(scan.schema.clone());
99+
}
100+
PhysicalPlan::IxScan(scan, _) => {
101+
schema = Some(scan.schema.clone());
102+
}
103+
_ => {}
104+
});
105+
106+
let mut columns = Vec::new();
107+
plan.visit(&mut |p| {
108+
if let PhysicalPlan::Filter(_, expr) = p {
109+
extract_columns(expr, schema.as_ref(), &mut columns);
110+
}
111+
});
112+
113+
QueryMetrics {
114+
scan_type: strategy.to_string(),
115+
table_name: table_name.to_string(),
116+
unindexed_columns: columns.join(","),
117+
rows_scanned,
118+
execution_time_micros,
119+
}
120+
}

crates/core/src/subscription/mod.rs

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilder as _};
22
use crate::{error::DBError, worker_metrics::WORKER_METRICS};
33
use anyhow::Result;
4+
use metrics::QueryMetrics;
45
use module_subscription_manager::Plan;
56
use prometheus::IntCounter;
67
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
@@ -19,6 +20,7 @@ use std::sync::Arc;
1920

2021
pub mod delta;
2122
pub mod execution_unit;
23+
pub mod metrics;
2224
pub mod module_subscription_actor;
2325
pub mod module_subscription_manager;
2426
pub mod query;
@@ -239,7 +241,7 @@ pub fn execute_plans<Tx, F>(
239241
plans: &[Arc<Plan>],
240242
tx: &Tx,
241243
update_type: TableUpdateType,
242-
) -> Result<(DatabaseUpdate<F>, ExecutionMetrics), DBError>
244+
) -> Result<(DatabaseUpdate<F>, ExecutionMetrics, Vec<QueryMetrics>), DBError>
243245
where
244246
Tx: Datastore + DeltaStore + Sync,
245247
F: BuildableWebsocketFormat,
@@ -257,21 +259,33 @@ where
257259
.map(|(sql, plan, table_id, table_name)| (sql, plan.optimize(auth), table_id, table_name))
258260
.map(|(sql, plan, table_id, table_name)| {
259261
plan.and_then(|plan| {
260-
if plan.returns_view_table() {
262+
let start_time = std::time::Instant::now();
263+
264+
let result = if plan.returns_view_table() {
261265
if let Some(schema) = plan.return_table() {
262-
let plan = PipelinedProject::from(plan);
263-
let plan = ViewProject::new(plan, schema.num_cols(), schema.num_private_cols());
264-
return collect_table_update_for_view(
265-
&[plan],
266-
table_id,
267-
(&**table_name).into(),
268-
tx,
269-
update_type,
270-
);
266+
let pipelined_plan = PipelinedProject::from(plan.clone());
267+
let view_plan = ViewProject::new(pipelined_plan, schema.num_cols(), schema.num_private_cols());
268+
collect_table_update_for_view(&[view_plan], table_id, (&**table_name).into(), tx, update_type)?
269+
} else {
270+
let pipelined_plan = PipelinedProject::from(plan.clone());
271+
collect_table_update(&[pipelined_plan], table_id, (&**table_name).into(), tx, update_type)?
271272
}
272-
}
273-
let plan = PipelinedProject::from(plan);
274-
collect_table_update(&[plan], table_id, (&**table_name).into(), tx, update_type)
273+
} else {
274+
let pipelined_plan = PipelinedProject::from(plan.clone());
275+
collect_table_update(&[pipelined_plan], table_id, (&**table_name).into(), tx, update_type)?
276+
};
277+
278+
let elapsed = start_time.elapsed();
279+
280+
let (ref _table_update, ref metrics) = result;
281+
let query_metrics = metrics::get_query_metrics(
282+
table_name,
283+
&plan,
284+
metrics.rows_scanned as u64,
285+
elapsed.as_micros() as u64,
286+
);
287+
288+
Ok((result.0, result.1, Some(query_metrics)))
275289
})
276290
.map_err(|err| DBError::WithSql {
277291
sql: sql.into(),
@@ -283,10 +297,15 @@ where
283297
let n = table_updates_with_metrics.len();
284298
let mut tables = Vec::with_capacity(n);
285299
let mut aggregated_metrics = ExecutionMetrics::default();
286-
for (update, metrics) in table_updates_with_metrics {
300+
let mut query_metrics_vec = Vec::new();
301+
302+
for (update, metrics, query_metrics) in table_updates_with_metrics {
287303
tables.push(update);
288304
aggregated_metrics.merge(metrics);
305+
if let Some(qm) = query_metrics {
306+
query_metrics_vec.push(qm);
307+
}
289308
}
290-
(DatabaseUpdate { tables }, aggregated_metrics)
309+
(DatabaseUpdate { tables }, aggregated_metrics, query_metrics_vec)
291310
})
292311
}

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use super::execution_unit::QueryHash;
2+
use super::metrics::QueryMetrics;
23
use super::module_subscription_manager::{
34
from_tx_offset, spawn_send_worker, BroadcastError, BroadcastQueue, Plan, SubscriptionGaugeStats,
45
SubscriptionManager, TransactionOffset,
@@ -132,6 +133,24 @@ impl SubscriptionMetrics {
132133
}
133134
}
134135

136+
/// Records subscription query metrics
137+
fn record_query_metrics(database_identity: &Identity, query_metrics: Vec<QueryMetrics>) {
138+
for qm in query_metrics {
139+
WORKER_METRICS
140+
.subscription_rows_examined
141+
.with_label_values(database_identity, &qm.scan_type, &qm.table_name, &qm.unindexed_columns)
142+
.observe(qm.rows_scanned as f64);
143+
WORKER_METRICS
144+
.subscription_query_execution_time_micros
145+
.with_label_values(database_identity, &qm.scan_type, &qm.table_name, &qm.unindexed_columns)
146+
.observe(qm.execution_time_micros as f64);
147+
WORKER_METRICS
148+
.subscription_queries_total
149+
.with_label_values(database_identity, &qm.scan_type, &qm.table_name, &qm.unindexed_columns)
150+
.inc();
151+
}
152+
}
153+
135154
/// Inner result type of [`ModuleSubscriptions::commit_and_broadcast_event`].
136155
pub type CommitAndBroadcastEventResult = Result<CommitAndBroadcastEventSuccess, WriteConflict>;
137156

@@ -345,17 +364,22 @@ impl ModuleSubscriptions {
345364
auth,
346365
)?;
347366

367+
let database_identity = self.relational_db.database_identity();
348368
let tx = DeltaTx::from(tx);
349-
match sender.config.protocol {
369+
let (update, metrics, query_metrics) = match sender.config.protocol {
350370
Protocol::Binary => {
351-
let (update, metrics) = execute_plans(auth, queries, &tx, update_type)?;
352-
Ok((FormatSwitch::Bsatn(update), metrics))
371+
let (update, metrics, query_metrics) = execute_plans(auth, queries, &tx, update_type)?;
372+
(FormatSwitch::Bsatn(update), metrics, query_metrics)
353373
}
354374
Protocol::Text => {
355-
let (update, metrics) = execute_plans(auth, queries, &tx, update_type)?;
356-
Ok((FormatSwitch::Json(update), metrics))
375+
let (update, metrics, query_metrics) = execute_plans(auth, queries, &tx, update_type)?;
376+
(FormatSwitch::Json(update), metrics, query_metrics)
357377
}
358-
}
378+
};
379+
380+
record_query_metrics(&database_identity, query_metrics);
381+
382+
Ok((update, metrics))
359383
}
360384

361385
/// Add a subscription to a single query.
@@ -896,13 +920,17 @@ impl ModuleSubscriptions {
896920
drop(compile_timer);
897921

898922
let tx = DeltaTx::from(&*tx);
899-
let (database_update, metrics) = match sender.config.protocol {
900-
Protocol::Binary => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe)
901-
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?,
902-
Protocol::Text => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe)
903-
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
923+
let (database_update, metrics, query_metrics) = match sender.config.protocol {
924+
Protocol::Binary => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe).map(
925+
|(table_update, metrics, query_metrics)| (FormatSwitch::Bsatn(table_update), metrics, query_metrics),
926+
)?,
927+
Protocol::Text => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe).map(
928+
|(table_update, metrics, query_metrics)| (FormatSwitch::Json(table_update), metrics, query_metrics),
929+
)?,
904930
};
905931

932+
record_query_metrics(&database_identity, query_metrics);
933+
906934
// It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
907935
// This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
908936
// but that should not pose an issue.

0 commit comments

Comments
 (0)