Skip to content

Commit fd17d85

Browse files
committed
flatten db
1 parent 60ea93f commit fd17d85

14 files changed

Lines changed: 145 additions & 158 deletions

File tree

crates/core/src/database_logger.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -674,7 +674,7 @@ impl SystemLogger {
674674
}
675675
}
676676

677-
impl spacetimedb_engine::db::update::UpdateLogger for SystemLogger {
677+
impl spacetimedb_engine::update::UpdateLogger for SystemLogger {
678678
fn info(&self, msg: &str) {
679679
self.info(msg);
680680
}

crates/core/src/db.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
pub use spacetimedb_engine::relational_db::{spawn_tx_metrics_recorder, Config, MetricsRecorderQueue, Storage};
2+
3+
pub mod persistence {
4+
pub use spacetimedb_engine::persistence::*;
5+
}
6+
7+
pub mod relational_db {
8+
pub use spacetimedb_engine::relational_db::*;
9+
}
10+
11+
pub mod snapshot {
12+
pub use spacetimedb_engine::snapshot::*;
13+
}
14+
15+
pub mod update {
16+
pub use spacetimedb_engine::update::*;
17+
}

crates/core/src/host/module_host.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,7 @@ fn init_database_inner(
601601
table_defs.sort_by_key(|x| &x.name);
602602
for def in table_defs {
603603
logger.info(&format!("Creating table `{}`", &def.name));
604-
spacetimedb_engine::db::update::create_table_from_def(stdb, tx, module_def, def)?;
604+
spacetimedb_engine::update::create_table_from_def(stdb, tx, module_def, def)?;
605605
}
606606

607607
// Create all in-memory views defined by the module.

crates/core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ pub mod energy;
44
pub mod sql;
55

66
pub mod auth;
7-
pub use spacetimedb_engine::db;
7+
pub mod db;
88
pub use spacetimedb_engine::metrics;
99
pub mod messages;
1010
pub use spacetimedb_lib::Identity;

crates/engine/src/db/mod.rs

Lines changed: 0 additions & 144 deletions
This file was deleted.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use spacetimedb_durability::Transaction;
1010
use spacetimedb_lib::Identity;
1111
use spacetimedb_sats::ProductValue;
1212

13-
use crate::db::persistence::Durability;
13+
use crate::persistence::Durability;
1414
use spacetimedb_runtime::Handle;
1515

1616
pub(super) fn request_durability(

crates/engine/src/lib.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1-
pub mod db;
21
pub mod error;
32
pub mod metrics;
43
pub mod rls;
5-
mod sql;
4+
mod ast;
5+
pub(crate) mod durability;
6+
pub mod persistence;
7+
pub mod relational_db;
8+
pub mod snapshot;
9+
pub mod update;
610
pub mod util;
711

812
pub use spacetimedb_lib::identity;
Lines changed: 116 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,119 @@
1-
use crate::db::durability::{request_durability, spawn_close as spawn_durability_close};
2-
use crate::db::MetricsRecorderQueue;
1+
use crate::durability::{request_durability, spawn_close as spawn_durability_close};
32
use crate::error::{DBError, RestoreSnapshotError};
43
use crate::metrics::ExecutionCounters;
54
use crate::metrics::ENGINE_METRICS;
5+
6+
/// Whether SpacetimeDB is run in memory, or persists objects and
7+
/// a message log to disk.
8+
#[derive(Clone, Copy)]
9+
pub enum Storage {
10+
/// The object store is in memory, and no message log is kept.
11+
Memory,
12+
13+
/// The object store is persisted to disk, and a message log is kept.
14+
Disk,
15+
}
16+
17+
/// Internal database config parameters
18+
#[derive(Clone, Copy)]
19+
pub struct Config {
20+
/// Specifies the object storage model.
21+
pub storage: Storage,
22+
/// Specifies the page pool max size in bytes.
23+
pub page_pool_max_size: Option<usize>,
24+
}
25+
26+
/// A message that is processed by the [`spawn_metrics_recorder`] actor.
27+
/// We use a separate task to record metrics to avoid blocking transactions.
28+
pub struct MetricsMessage {
29+
/// The reducer the produced these metrics.
30+
reducer: Option<ReducerName>,
31+
/// Metrics from a mutable transaction.
32+
metrics_for_writer: Option<TxMetrics>,
33+
/// Metrics from a read-only transaction.
34+
/// A message may have metrics for both types of transactions,
35+
/// because metrics for a reducer and its subscription updates are recorded together.
36+
metrics_for_reader: Option<TxMetrics>,
37+
/// The row updates for an immutable transaction.
38+
/// Needed for insert and delete counters.
39+
tx_data: Option<Arc<TxData>>,
40+
/// Cached metrics counters for each workload type.
41+
counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
42+
}
43+
44+
/// The handle used to send work to the tx metrics recorder.
45+
#[derive(Clone)]
46+
pub struct MetricsRecorderQueue {
47+
tx: spacetimedb_runtime::sync::mpsc::UnboundedSender<MetricsMessage>,
48+
}
49+
50+
impl MetricsRecorderQueue {
51+
pub fn send_metrics(
52+
&self,
53+
reducer: Option<ReducerName>,
54+
metrics_for_writer: Option<TxMetrics>,
55+
metrics_for_reader: Option<TxMetrics>,
56+
tx_data: Option<Arc<TxData>>,
57+
counters: Arc<EnumMap<WorkloadType, ExecutionCounters>>,
58+
) {
59+
if let Err(err) = self.tx.send(MetricsMessage {
60+
reducer,
61+
metrics_for_writer,
62+
metrics_for_reader,
63+
tx_data,
64+
counters,
65+
}) {
66+
log::warn!("failed to send metrics: {err}");
67+
}
68+
}
69+
}
70+
71+
fn record_metrics(
72+
MetricsMessage {
73+
reducer,
74+
metrics_for_writer,
75+
metrics_for_reader,
76+
tx_data,
77+
counters,
78+
}: MetricsMessage,
79+
) {
80+
if let Some(tx_metrics) = metrics_for_writer {
81+
tx_metrics.report(
82+
tx_data.as_deref(),
83+
reducer.as_ref(),
84+
|wl| &counters[wl],
85+
);
86+
}
87+
if let Some(tx_metrics) = metrics_for_reader {
88+
tx_metrics.report(
89+
None,
90+
reducer.as_ref(),
91+
|wl| &counters[wl],
92+
);
93+
}
94+
}
95+
96+
const TX_METRICS_RECORDING_INTERVAL: std::time::Duration = std::time::Duration::from_millis(5);
97+
98+
/// Spawns a task for recording transaction metrics.
99+
/// Returns the handle for pushing metrics to the recorder.
100+
pub fn spawn_tx_metrics_recorder(
101+
handle: &spacetimedb_runtime::Handle,
102+
) -> (MetricsRecorderQueue, spacetimedb_runtime::AbortHandle) {
103+
let handle_clone = handle.clone();
104+
let (tx, mut rx) = spacetimedb_runtime::sync::mpsc::unbounded_channel();
105+
let abort_handle = handle
106+
.spawn(async move {
107+
loop {
108+
handle_clone.sleep(TX_METRICS_RECORDING_INTERVAL).await;
109+
while let Ok(metrics) = rx.try_recv() {
110+
record_metrics(metrics);
111+
}
112+
}
113+
})
114+
.abort_handle();
115+
(MetricsRecorderQueue { tx }, abort_handle)
116+
}
6117
use crate::util::asyncify;
7118
use anyhow::{anyhow, Context};
8119
use enum_map::EnumMap;
@@ -1830,8 +1941,8 @@ fn default_row_count_fn(db: Identity) -> RowCountFn {
18301941

18311942
#[cfg(any(test, feature = "test"))]
18321943
pub mod tests_utils {
1833-
use crate::db::snapshot;
1834-
use crate::db::snapshot::SnapshotWorker;
1944+
use crate::snapshot;
1945+
use crate::snapshot::SnapshotWorker;
18351946

18361947
use super::*;
18371948
use core::ops::Deref;
@@ -2331,7 +2442,7 @@ mod tests {
23312442

23322443
use super::tests_utils::begin_mut_tx;
23332444
use super::*;
2334-
use crate::db::relational_db::tests_utils::{begin_tx, create_view_for_test, insert, make_snapshot, TestDB};
2445+
use crate::relational_db::tests_utils::{begin_tx, create_view_for_test, insert, make_snapshot, TestDB};
23352446
use anyhow::bail;
23362447
use bytes::Bytes;
23372448
use commitlog::payload::txdata;

0 commit comments

Comments
 (0)