Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ jobs:

wasm-bindgen --version

- name: Check engine simulation build
run: cargo check -p spacetimedb-engine --no-default-features --features simulation

# Source emsdk environment to make emcc (Emscripten compiler) available in PATH.
- name: Run tests
run: |
Expand Down
43 changes: 43 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ members = [
"crates/data-structures",
"crates/datastore",
"crates/durability",
"crates/engine",
"crates/execution",
"crates/expr",
"crates/guard",
Expand Down Expand Up @@ -132,6 +133,7 @@ spacetimedb-core = { path = "crates/core", version = "=2.4.1" }
spacetimedb-data-structures = { path = "crates/data-structures", version = "=2.4.1" }
spacetimedb-datastore = { path = "crates/datastore", version = "=2.4.1" }
spacetimedb-durability = { path = "crates/durability", version = "=2.4.1" }
spacetimedb-engine = { path = "crates/engine", version = "=2.4.1" }
spacetimedb-execution = { path = "crates/execution", version = "=2.4.1" }
spacetimedb-expr = { path = "crates/expr", version = "=2.4.1" }
spacetimedb-guard = { path = "crates/guard", version = "=2.4.1" }
Expand All @@ -143,6 +145,7 @@ spacetimedb-pg = { path = "crates/pg", version = "=2.4.1" }
spacetimedb-physical-plan = { path = "crates/physical-plan", version = "=2.4.1" }
spacetimedb-primitives = { path = "crates/primitives", version = "=2.4.1" }
spacetimedb-query = { path = "crates/query", version = "=2.4.1" }
spacetimedb-runtime = { path = "crates/runtime", version = "=2.4.1" }
spacetimedb-sats = { path = "crates/sats", version = "=2.4.1" }
spacetimedb-schema = { path = "crates/schema", version = "=2.4.1" }
spacetimedb-standalone = { path = "crates/standalone", version = "=2.4.1" }
Expand All @@ -152,7 +155,6 @@ spacetimedb-fs-utils = { path = "crates/fs-utils", version = "=2.4.1" }
spacetimedb-snapshot = { path = "crates/snapshot", version = "=2.4.1" }
spacetimedb-subscription = { path = "crates/subscription", version = "=2.4.1" }
spacetimedb-query-builder = { path = "crates/query-builder", version = "=2.4.1" }
spacetimedb-runtime = { path = "crates/runtime", version = "=2.4.1" }

# Prevent `ahash` from pulling in `getrandom` by disabling default features.
# Modules use `getrandom02` and we need to prevent an incompatible version
Expand Down
5 changes: 3 additions & 2 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ spacetimedb-client-api-messages.workspace = true
spacetimedb-commitlog.workspace = true
spacetimedb-datastore.workspace = true
spacetimedb-durability.workspace = true
spacetimedb-engine.workspace = true
spacetimedb-memory-usage.workspace = true
spacetimedb-metrics.workspace = true
spacetimedb-primitives.workspace = true
spacetimedb-paths.workspace = true
spacetimedb-physical-plan.workspace = true
spacetimedb-query.workspace = true
spacetimedb-runtime = { workspace = true, features = ["tokio"] }
spacetimedb-runtime.workspace = true
spacetimedb-sats = { workspace = true, features = ["serde"] }
spacetimedb-schema.workspace = true
spacetimedb-table.workspace = true
Expand Down Expand Up @@ -144,7 +145,7 @@ allow_loopback_http_for_tests = []
# Enable timing for wasm ABI calls
spacetimedb-wasm-instance-env-times = []
# Enable test helpers and utils
test = ["spacetimedb-commitlog/test", "spacetimedb-datastore/test"]
test = ["spacetimedb-commitlog/test", "spacetimedb-datastore/test", "spacetimedb-engine/test"]
# Perfmaps for profiling modules
perfmap = []
# Enables core pinning.
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/database_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,12 @@ impl SystemLogger {
}
}

impl spacetimedb_engine::update::UpdateLogger for SystemLogger {
fn info(&self, msg: &str) {
self.info(msg);
}
}

#[cfg(test)]
mod tests {
use std::{ops::Range, sync::Arc};
Expand Down
136 changes: 17 additions & 119 deletions crates/core/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use std::sync::Arc;
pub mod persistence {
pub use spacetimedb_engine::persistence::*;
}

use enum_map::EnumMap;
use spacetimedb_schema::reducer_name::ReducerName;
use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;
pub mod relational_db {
pub use spacetimedb_engine::relational_db::*;
}

use crate::subscription::ExecutionCounters;
use spacetimedb_datastore::execution_context::WorkloadType;
use spacetimedb_datastore::{locking_tx_datastore::datastore::TxMetrics, traits::TxData};
pub mod snapshot {
pub use spacetimedb_engine::snapshot::*;
}

mod durability;
pub mod persistence;
pub mod relational_db;
pub mod snapshot;
pub mod update;
pub mod update {
pub use spacetimedb_engine::update::*;
}

/// Whether SpacetimeDB is run in memory, or persists objects and
/// a message log to disk.
Expand All @@ -35,111 +34,10 @@ pub struct Config {
pub page_pool_max_size: Option<usize>,
}

/// A message that is processed by the [`spawn_metrics_recorder`] actor.
/// We use a separate task to record metrics to avoid blocking transactions.
pub struct MetricsMessage {
/// The reducer the produced these metrics.
reducer: Option<ReducerName>,
/// Metrics from a mutable transaction.
metrics_for_writer: Option<TxMetrics>,
/// Metrics from a read-only transaction.
/// A message may have metrics for both types of transactions,
/// because metrics for a reducer and its subscription updates are recorded together.
metrics_for_reader: Option<TxMetrics>,
/// The row updates for an immutable transaction.
/// Needed for insert and delete counters.
tx_data: Option<Arc<TxData>>,
/// Cached metrics counters for each workload type.
counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
}

/// The handle used to send work to the tx metrics recorder.
#[derive(Clone)]
pub struct MetricsRecorderQueue {
tx: mpsc::UnboundedSender<MetricsMessage>,
}

impl MetricsRecorderQueue {
pub fn send_metrics(
&self,
reducer: Option<ReducerName>,
metrics_for_writer: Option<TxMetrics>,
metrics_for_reader: Option<TxMetrics>,
tx_data: Option<Arc<TxData>>,
counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
) {
if let Err(err) = self.tx.send(MetricsMessage {
reducer,
metrics_for_writer,
metrics_for_reader,
tx_data,
counters,
}) {
log::warn!("failed to send metrics: {err}");
}
}
}

fn record_metrics(
MetricsMessage {
reducer,
metrics_for_writer,
metrics_for_reader,
tx_data,
counters,
}: MetricsMessage,
) {
if let Some(tx_metrics) = metrics_for_writer {
tx_metrics.report(
// If row updates are present,
// they will always belong to the writer transaction.
tx_data.as_deref(),
reducer.as_ref(),
|wl| &counters[wl],
);
}
if let Some(tx_metrics) = metrics_for_reader {
tx_metrics.report(
// If row updates are present,
// they will never belong to the reader transaction.
// Passing row updates here will most likely panic.
None,
reducer.as_ref(),
|wl| &counters[wl],
);
}
}

/// The metrics recorder is a side channel that the main database thread forwards metrics to.
/// While we want to avoid unnecessary compute on the critical path, communicating with other
/// threads is not free, and for this case in particular waking a parked task is not free.
///
/// Previously, each tx would send its metrics to the recorder task. As soon as the recorder
/// task `recv`d a message, it would update the counters and gauges, and immediately wait for
/// the next tx's message. This meant that the tx would need to be more expensive than the
/// recording of its metrics in order for the recorder task not to be parked on `recv` when
/// the tx would `send` its metrics. This would obviously never be the case, and so each `send`
/// would incur the overhead of waking the task.
///
/// To mitigate this, we now record metrics, for potentially many transactions, periodically
/// every 5ms.
const TX_METRICS_RECORDING_INTERVAL: std::time::Duration = std::time::Duration::from_millis(5);

/// Spawns a task for recording transaction metrics.
/// Returns the handle for pushing metrics to the recorder.
pub fn spawn_tx_metrics_recorder() -> (MetricsRecorderQueue, tokio::task::AbortHandle) {
let (tx, mut rx) = mpsc::unbounded_channel();
let abort_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(TX_METRICS_RECORDING_INTERVAL);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
pub type MetricsRecorderQueue = spacetimedb_engine::MetricsRecorderQueue;

loop {
interval.tick().await;
while let Ok(metrics) = rx.try_recv() {
record_metrics(metrics);
}
}
})
.abort_handle();
(MetricsRecorderQueue { tx }, abort_handle)
pub fn spawn_tx_metrics_recorder(
handle: &spacetimedb_runtime::Handle,
) -> (MetricsRecorderQueue, spacetimedb_runtime::AbortHandle) {
spacetimedb_engine::spawn_tx_metrics_recorder(handle)
}
Loading
Loading