Commit 1f0e127

Pipeline js module operations (#4962)
# Description of Changes

The core motivation for this change is simple: avoid cross-thread handoffs and synchronization on the main execution path.

Before this change, the ingress task for each websocket connection would wait for a completion response on each request before submitting the next request to the database. This was mainly used to guarantee that we delivered message responses in receive-order per connection. However it also meant that for every request, we notified a waiting Tokio task, which potentially incurred kernel-assisted wakeup and scheduler overhead.

Note this design existed mainly for historical reasons. Before the database had a dedicated job thread, requests were not serialized through a single queue. The module instance was gated behind a semaphore which guaranteed mutual exclusion, but it did not guarantee FIFO ordering. Awaiting the completion of each request in `ws_recv_task` was therefore the mechanism that enforced per-connection receive-order semantics. However it now serves primarily as a source of overhead.

Procedures are the important exception. They are not serialized through the main worker queue. Instead they use their own instance pool so as to be able to run concurrently with other requests. However procedures may be composed of multiple transactions and they may effectively yield between transactions. This means that before this change, if a procedure were to yield, it would effectively block all subsequent requests from that client until it returned which is quite undesirable.

So with this change, procedures may execute out of order with other operations received on the same WebSocket. Hence if this is not a desirable property, clients must enforce ordering themselves by waiting for a response before submitting the next request.

## What changed?

### 1. Different instance managers for procedures and everything else

Procedures use a bounded instance pool where each instance is backed by an isolate running in a thread. Reducers and all other operations are serialized through an mpsc queue that feeds a single isolate running in a thread.

Trapped isolates are replaced inline. Only a fatal error within one of the instance threads results in the `ModuleHost` and all its connections being dropped. The host controller will recreate a new `ModuleHost` lazily on the next request.

### 2. New enqueue-only `ModuleHost` interface

`ClientConnection` now calls enqueue-only methods on `ModuleHost` which return immediately after enqueuing on the main instance lane or in the case of a procedure, checking out an available instance and starting the operation.

### 3. Separate `ModuleHost` interfaces for scheduled reducers and scheduled procedures

Scheduled reducers now target the main js instance/worker, while scheduled procedures go through the pool. The scheduler now distinguishes between reducers and procedures and calls the appropriate method.

Note, the scheduler does not pipeline its operations. It waits for each one to complete before scheduling the next operation. This means that a long running procedure will block all other operations from being scheduled. This will need to be fixed at some point, but this patch doesn't change the current behavior.

### 4. Misc

This patch also names the main js worker thread for better diagnostics. It also disables core pinning by default and makes it an explicit opt-in.

This last one is pretty important. The current architecture reduces thread and context switching significantly such that naive core pinning may perform worse than just deferring to the OS scheduler on certain platforms. As it stands, the main motivation which led us to our original core pinning strategy no longer exists, so we should probably just defer to the OS until we've designed a proper scheduler that suits our needs.

# API and ABI breaking changes

As mentioned above, with this change, procedures may execute out of order with other operations received on the same WebSocket. Hence if this is not a desirable property, clients must enforce ordering themselves by waiting for a response before submitting the next request.

# Expected complexity level and risk

4

# Testing

This is mainly a performance oriented refactor, so no additional correctness tests were added. However this patch does touch a lot of code that could probably use more coverage in general. Benchmarks were run to verify expected performance characteristics.

---------

Signed-off-by: joshua-spacetime <josh@clockworklabs.io>
Co-authored-by: Noa <coolreader18@gmail.com>
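
To illustrate why the ingress task no longer has to await each response, here is a minimal sketch of an enqueue-only main lane, using only the Rust standard library. It is not the actual `ModuleHost` code: `Request`, `spawn_main_lane`, `pipeline`, and the thread name are hypothetical, and a string stands in for running a reducer on the isolate. Because a single worker drains a single FIFO queue, responses come back in submission order even though the submitter never waits between requests.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical request type; not part of the real `ModuleHost` interface.
struct Request {
    id: u32,
    /// Channel on which the worker reports the result of this request.
    reply: mpsc::Sender<(u32, String)>,
}

/// Spawns one named worker thread that drains the queue in FIFO order.
fn spawn_main_lane() -> (mpsc::Sender<Request>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<Request>();
    let handle = thread::Builder::new()
        .name("main-lane-worker".into()) // illustrative name only
        .spawn(move || {
            for req in rx {
                // Stand-in for executing a reducer on the single isolate.
                let _ = req.reply.send((req.id, format!("done {}", req.id)));
            }
        })
        .expect("failed to spawn worker thread");
    (tx, handle)
}

/// Submits every request without waiting for a response in between,
/// then returns the ids in the order their responses arrived.
fn pipeline(ids: &[u32]) -> Vec<u32> {
    let (lane, worker) = spawn_main_lane();
    let (reply_tx, reply_rx) = mpsc::channel();
    for &id in ids {
        // Enqueue-only: `send` returns as soon as the request is queued.
        lane.send(Request { id, reply: reply_tx.clone() })
            .expect("worker thread is gone");
    }
    drop(lane); // close the queue so the worker loop ends once drained
    drop(reply_tx); // the remaining senders live inside queued requests
    worker.join().expect("worker thread panicked");
    reply_rx.iter().map(|(id, _)| id).collect()
}
```

Procedures are deliberately left out of this sketch. Since they run on a separate pool, a client that needs a procedure ordered relative to other requests would have to wait for its response before sending the next request, as the description notes.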
1 parent eb0645c commit 1f0e127

27 files changed

Lines changed: 2838 additions & 1800 deletions

Cargo.lock

Lines changed: 0 additions & 15 deletions

Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -194,7 +194,6 @@ enum-map = "2.6.3"
 env_logger = "0.10"
 ethnum = { version = "1.5.0", features = ["serde"] }
 flate2 = "1.0.24"
-flume = { version = "0.11", default-features = false, features = ["async"] }
 foldhash = "0.2.0"
 fs-err = "2.9.0"
 fs_extra = "1.3.0"

crates/client-api/src/lib.rs

Lines changed: 39 additions & 46 deletions
@@ -137,7 +137,7 @@ impl Host {
     pub async fn exec_sql(
         &self,
         auth: AuthCtx,
-        database: Database,
+        _database: Database,
         confirmed_read: bool,
         body: String,
     ) -> axum::response::Result<Vec<SqlStmtResult<ProductValue>>> {
@@ -146,51 +146,44 @@ impl Host {
             .await
             .map_err(|_| (StatusCode::NOT_FOUND, "module not found".to_string()))?;

-        let (tx_offset, durable_offset, json) = self
-            .host_controller
-            .using_database(database, self.replica_id, move |db| async move {
-                tracing::info!(sql = body);
-                let mut header = vec![];
-                let sql_start = std::time::Instant::now();
-                let sql_span = tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,);
-                let _guard = sql_span.enter();
-
-                let result = sql::execute::run(
-                    db.clone(),
-                    body,
-                    auth,
-                    Some(module_host.info.subscriptions.clone()),
-                    Some(module_host),
-                    &mut header,
-                )
-                .await
-                .map_err(|e| {
-                    log::warn!("{e}");
-                    (StatusCode::BAD_REQUEST, e.to_string())
-                })?;
-
-                let total_duration = sql_start.elapsed();
-                drop(_guard);
-                sql_span.record("total_duration", tracing::field::debug(total_duration));
-
-                let schema = header
-                    .into_iter()
-                    .map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
-                    .collect();
-
-                Ok::<_, (StatusCode, String)>((
-                    result.tx_offset,
-                    db.durable_tx_offset(),
-                    vec![SqlStmtResult {
-                        schema,
-                        rows: result.rows,
-                        total_duration_micros: total_duration.as_micros() as u64,
-                        stats: SqlStmtStats::from_metrics(&result.metrics),
-                    }],
-                ))
-            })
-            .await
-            .map_err(log_and_500)??;
+        tracing::info!(sql = body);
+        let mut header = vec![];
+        let sql_start = std::time::Instant::now();
+        let sql_span = tracing::trace_span!("execute_sql", total_duration = tracing::field::Empty,);
+        let _guard = sql_span.enter();
+        let db = module_host.relational_db().clone();
+        let durable_offset = db.durable_tx_offset();
+
+        let result = sql::execute::run(
+            db,
+            body,
+            auth,
+            Some(module_host.info.subscriptions.clone()),
+            Some(module_host),
+            &mut header,
+        )
+        .await
+        .map_err(|e| {
+            log::warn!("{e}");
+            (StatusCode::BAD_REQUEST, e.to_string())
+        })?;
+
+        let total_duration = sql_start.elapsed();
+        drop(_guard);
+        sql_span.record("total_duration", tracing::field::debug(total_duration));
+
+        let schema = header
+            .into_iter()
+            .map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
+            .collect();
+
+        let tx_offset = result.tx_offset;
+        let json = vec![SqlStmtResult {
+            schema,
+            rows: result.rows,
+            total_duration_micros: total_duration.as_micros() as u64,
+            stats: SqlStmtStats::from_metrics(&result.metrics),
+        }];

         if confirmed_read && let Some(mut durable_offset) = durable_offset {
             let tx_offset = tx_offset.await.map_err(|_| log_and_500("transaction aborted"))?;

crates/core/Cargo.toml

Lines changed: 2 additions & 4 deletions
@@ -56,7 +56,6 @@ dirs.workspace = true
 enum-as-inner.workspace = true
 enum-map.workspace = true
 flate2.workspace = true
-flume.workspace = true
 fs2.workspace = true
 futures.workspace = true
 futures-util.workspace = true
@@ -147,9 +146,8 @@ spacetimedb-wasm-instance-env-times = []
 test = ["spacetimedb-commitlog/test", "spacetimedb-datastore/test"]
 # Perfmaps for profiling modules
 perfmap = []
-# Disables core pinning
-no-core-pinning = []
-no-job-core-pinning = []
+# Enables core pinning.
+core-pinning = []

 [dev-dependencies]
 spacetimedb-lib = { path = "../lib", features = ["proptest", "test"] }
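
The feature change above inverts the default: pinning used to be on unless `no-core-pinning` or `no-job-core-pinning` was set, and is now off unless `core-pinning` is enabled. A rough sketch of how code might branch on an opt-in feature like this follows. Both helper functions are hypothetical and are not taken from the patch; only the feature name `core-pinning` comes from the diff.

```rust
/// True only when the crate is compiled with `--features core-pinning`.
/// (Hypothetical helper; the patch may gate pinning differently.)
fn core_pinning_enabled() -> bool {
    cfg!(feature = "core-pinning")
}

/// Hypothetical helper describing where a worker thread would run.
fn placement() -> &'static str {
    if core_pinning_enabled() {
        "pinned to a dedicated core"
    } else {
        "left to the OS scheduler"
    }
}
```

Built without the feature, `placement()` takes the OS-scheduler branch, which matches the new default described in the commit message.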

crates/core/src/client.rs

Lines changed: 2 additions & 1 deletion
@@ -12,7 +12,8 @@ pub mod messages;

 pub use client_connection::{
     ClientConfig, ClientConnection, ClientConnectionReceiver, ClientConnectionSender, ClientSendError, DataMessage,
-    MeteredDeque, MeteredReceiver, MeteredSender, Protocol, WsVersion,
+    MeteredDeque, MeteredReceiver, MeteredSender, MeteredUnboundedReceiver, MeteredUnboundedSender, Protocol,
+    WsVersion,
 };
 pub use client_connection_index::ClientActorIndex;
 pub use message_handlers::MessageHandleError;
