Skip to content

Commit 4a97348

Browse files
authored
Merge branch 'master' into kim/compression-stats
2 parents 2ed3d8c + e33cefb commit 4a97348

29 files changed

Lines changed: 2050 additions & 947 deletions

File tree

crates/core/src/config.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,12 @@ impl MetadataFile {
5656
path.write(self.to_string())
5757
}
5858

59-
fn check_compatibility(previous: &Self, current: &Self) -> anyhow::Result<()> {
59+
fn check_compatibility(previous: &Self, current: &Self, metafile: &Path) -> anyhow::Result<()> {
6060
anyhow::ensure!(
6161
previous.edition == current.edition,
62-
"metadata.toml indicates that this database is from a different \
62+
"metadata.toml at {} indicates that this database is from a different \
6363
edition of SpacetimeDB (running {:?}, but this database is {:?})",
64+
metafile.display(),
6465
current.edition,
6566
previous.edition,
6667
);
@@ -110,8 +111,8 @@ impl MetadataFile {
110111
/// `self` is the metadata file read from a database, and current is
111112
/// the default metadata file that the active database version would
112113
/// right to a new database.
113-
pub fn check_compatibility_and_update(mut self, current: Self) -> anyhow::Result<Self> {
114-
Self::check_compatibility(&self, &current)?;
114+
pub fn check_compatibility_and_update(mut self, current: Self, metafile: &Path) -> anyhow::Result<Self> {
115+
Self::check_compatibility(&self, &current, metafile)?;
115116
// bump the version in the file only if it's being run in a newer database.
116117
self.version = std::cmp::max(self.version, current.version);
117118
Ok(self)
@@ -344,67 +345,67 @@ mod tests {
344345
fn check_metadata_compatibility_checking() {
345346
assert_eq!(
346347
mkmeta(1, 0, 0)
347-
.check_compatibility_and_update(mkmeta(1, 0, 1))
348+
.check_compatibility_and_update(mkmeta(1, 0, 1), Path::new("metadata.toml"))
348349
.unwrap()
349350
.version,
350351
mkver(1, 0, 1)
351352
);
352353
assert_eq!(
353354
mkmeta(1, 0, 1)
354-
.check_compatibility_and_update(mkmeta(1, 0, 0))
355+
.check_compatibility_and_update(mkmeta(1, 0, 0), Path::new("metadata.toml"))
355356
.unwrap()
356357
.version,
357358
mkver(1, 0, 1)
358359
);
359360

360361
mkmeta(1, 1, 0)
361-
.check_compatibility_and_update(mkmeta(1, 0, 5))
362+
.check_compatibility_and_update(mkmeta(1, 0, 5), Path::new("metadata.toml"))
362363
.unwrap_err();
363364
mkmeta(2, 0, 0)
364-
.check_compatibility_and_update(mkmeta(1, 3, 5))
365+
.check_compatibility_and_update(mkmeta(1, 3, 5), Path::new("metadata.toml"))
365366
.unwrap_err();
366367
assert_eq!(
367368
mkmeta(1, 12, 0)
368-
.check_compatibility_and_update(mkmeta(2, 0, 0))
369+
.check_compatibility_and_update(mkmeta(2, 0, 0), Path::new("metadata.toml"))
369370
.unwrap()
370371
.version,
371372
mkver(2, 0, 0)
372373
);
373374
mkmeta(2, 0, 0)
374-
.check_compatibility_and_update(mkmeta(3, 0, 0))
375+
.check_compatibility_and_update(mkmeta(3, 0, 0), Path::new("metadata.toml"))
375376
.unwrap_err();
376377
}
377378

378379
#[test]
379380
fn check_metadata_compatibility_prerelease() {
380381
mkmeta(1, 9, 0)
381-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"))
382+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"), Path::new("metadata.toml"))
382383
.unwrap();
383384

384385
mkmeta_pre(2, 0, 0, "rc1")
385-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"))
386+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"), Path::new("metadata.toml"))
386387
.unwrap();
387388

388389
mkmeta_pre(2, 0, 0, "rc1")
389-
.check_compatibility_and_update(mkmeta(2, 0, 1))
390+
.check_compatibility_and_update(mkmeta(2, 0, 1), Path::new("metadata.toml"))
390391
.unwrap();
391392

392393
mkmeta_pre(2, 0, 0, "rc1")
393-
.check_compatibility_and_update(mkmeta(2, 0, 0))
394+
.check_compatibility_and_update(mkmeta(2, 0, 0), Path::new("metadata.toml"))
394395
.unwrap();
395396

396397
// Now check some failures..
397398

398399
mkmeta_pre(2, 0, 0, "rc1")
399-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc2"))
400+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc2"), Path::new("metadata.toml"))
400401
.unwrap_err();
401402

402403
mkmeta_pre(2, 0, 0, "rc2")
403-
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"))
404+
.check_compatibility_and_update(mkmeta_pre(2, 0, 0, "rc1"), Path::new("metadata.toml"))
404405
.unwrap_err();
405406

406407
mkmeta(2, 0, 0)
407-
.check_compatibility_and_update(mkmeta_pre(2, 1, 0, "rc1"))
408+
.check_compatibility_and_update(mkmeta_pre(2, 1, 0, "rc1"), Path::new("metadata.toml"))
408409
.unwrap_err();
409410
}
410411

crates/core/src/db/durability.rs

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use tokio::{
1515
runtime,
1616
sync::{
1717
futures::OwnedNotified,
18-
mpsc::{channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender},
18+
mpsc::{self, channel, Receiver, Sender},
1919
oneshot, Notify,
2020
},
2121
time::timeout,
@@ -69,7 +69,7 @@ type ShutdownReply = oneshot::Sender<OwnedNotified>;
6969
/// [RelationalDB]: crate::db::relational_db::RelationalDB
7070
pub struct DurabilityWorker {
7171
database: Identity,
72-
request_tx: UnboundedSender<DurabilityRequest>,
72+
request_tx: Sender<DurabilityRequest>,
7373
shutdown: Sender<ShutdownReply>,
7474
durability: Arc<Durability>,
7575
runtime: runtime::Handle,
@@ -86,7 +86,7 @@ impl DurabilityWorker {
8686
next_tx_offset: TxOffset,
8787
reorder_window_size: NonZeroUsize,
8888
) -> Self {
89-
let (request_tx, request_rx) = unbounded_channel();
89+
let (request_tx, request_rx) = channel(4 * 4096);
9090
let (shutdown_tx, shutdown_rx) = channel(1);
9191

9292
let actor = DurabilityWorkerActor {
@@ -123,8 +123,8 @@ impl DurabilityWorker {
123123
/// this method is responsible only for reading its decision out of the `tx_data`
124124
/// and calling `durability.append_tx`.
125125
///
126-
/// This method does not block,
127-
/// and sends the work to an actor that collects data and calls `durability.append_tx`.
126+
/// This method sends the work to an actor that collects data and calls `durability.append_tx`.
127+
/// It blocks if the queue is at capacity.
128128
///
129129
/// # Panics
130130
///
@@ -135,12 +135,40 @@ impl DurabilityWorker {
135135
/// - [Self::shutdown] was called
136136
///
137137
pub fn request_durability(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
138-
self.request_tx
139-
.send(DurabilityRequest {
140-
reducer_context,
141-
tx_data: tx_data.clone(),
142-
})
143-
.unwrap_or_else(|_| panic!("durability actor vanished database={}", self.database));
138+
// We first try to send it without blocking.
139+
match self.request_tx.try_reserve() {
140+
Ok(permit) => {
141+
permit.send(DurabilityRequest {
142+
reducer_context,
143+
tx_data: tx_data.clone(),
144+
});
145+
}
146+
Err(mpsc::error::TrySendError::Closed(_)) => {
147+
panic!("durability actor vanished database={}", self.database);
148+
}
149+
Err(mpsc::error::TrySendError::Full(_)) => {
150+
// If the channel was full, we use the blocking version.
151+
let start = std::time::Instant::now();
152+
let send = || {
153+
self.request_tx.blocking_send(DurabilityRequest {
154+
reducer_context,
155+
tx_data: tx_data.clone(),
156+
})
157+
};
158+
if tokio::runtime::Handle::try_current().is_ok() {
159+
tokio::task::block_in_place(send)
160+
} else {
161+
send()
162+
}
163+
.unwrap_or_else(|_| panic!("durability actor vanished database={}", self.database));
164+
// We could cache this metric, but if we are already in the blocking code path,
165+
// the extra time of looking up the metric is probably negligible.
166+
WORKER_METRICS
167+
.durability_blocking_send_duration
168+
.with_label_values(&self.database)
169+
.observe(start.elapsed().as_secs_f64());
170+
}
171+
}
144172
}
145173

146174
/// Get the [`DurableOffset`] of this database.
@@ -281,8 +309,8 @@ impl<T> ReorderWindow<T> {
281309
}
282310
}
283311

284-
struct DurabilityWorkerActor {
285-
request_rx: UnboundedReceiver<DurabilityRequest>,
312+
pub struct DurabilityWorkerActor {
313+
request_rx: mpsc::Receiver<DurabilityRequest>,
286314
shutdown: Receiver<ShutdownReply>,
287315
durability: Arc<Durability>,
288316
reorder_window: ReorderWindow<DurabilityRequest>,
@@ -483,7 +511,7 @@ mod tests {
483511
}
484512
}
485513

486-
#[tokio::test]
514+
#[tokio::test(flavor = "multi_thread")]
487515
async fn shutdown_waits_until_durable() {
488516
let durability = Arc::new(CountingDurability::default());
489517
let worker = DurabilityWorker::new(

crates/core/src/db/relational_db.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
1717
use spacetimedb_datastore::locking_tx_datastore::state_view::{
1818
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
1919
};
20-
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
20+
use spacetimedb_datastore::locking_tx_datastore::{IndexScanPointOrRange, MutTxId, TxId};
2121
use spacetimedb_datastore::system_tables::{
2222
system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID,
2323
};
@@ -55,10 +55,11 @@ use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotError, SnapshotReposit
5555
use spacetimedb_table::indexes::RowPointer;
5656
use spacetimedb_table::page_pool::PagePool;
5757
use spacetimedb_table::table::{RowRef, TableScanIter};
58+
use spacetimedb_table::table_index::IndexKey;
5859
use std::borrow::Cow;
5960
use std::io;
6061
use std::num::NonZeroUsize;
61-
use std::ops::{Bound, RangeBounds};
62+
use std::ops::RangeBounds;
6263
use std::sync::Arc;
6364
use tokio::sync::watch;
6465

@@ -1394,32 +1395,24 @@ impl RelationalDB {
13941395
Ok(self.inner.iter_by_col_range_tx(tx, table_id.into(), cols, range)?)
13951396
}
13961397

1397-
pub fn index_scan_range<'a>(
1398+
pub fn index_scan_range<'de, 'a>(
13981399
&'a self,
13991400
tx: &'a MutTx,
14001401
index_id: IndexId,
1401-
prefix: &[u8],
1402+
prefix: &'de [u8],
14021403
prefix_elems: ColId,
1403-
rstart: &[u8],
1404-
rend: &[u8],
1405-
) -> Result<
1406-
(
1407-
TableId,
1408-
Bound<AlgebraicValue>,
1409-
Bound<AlgebraicValue>,
1410-
impl Iterator<Item = RowRef<'a>> + use<'a>,
1411-
),
1412-
DBError,
1413-
> {
1404+
rstart: &'de [u8],
1405+
rend: &'de [u8],
1406+
) -> Result<(TableId, IndexScanPointOrRange<'de, 'a>), DBError> {
14141407
Ok(tx.index_scan_range(index_id, prefix, prefix_elems, rstart, rend)?)
14151408
}
14161409

1417-
pub fn index_scan_point<'a>(
1410+
pub fn index_scan_point<'a, 'p>(
14181411
&'a self,
14191412
tx: &'a MutTx,
14201413
index_id: IndexId,
1421-
point: &[u8],
1422-
) -> Result<(TableId, AlgebraicValue, impl Iterator<Item = RowRef<'a>> + use<'a>), DBError> {
1414+
point: &'p [u8],
1415+
) -> Result<(TableId, IndexKey<'p>, impl Iterator<Item = RowRef<'a>> + use<'a>), DBError> {
14231416
Ok(tx.index_scan_point(index_id, point)?)
14241417
}
14251418

crates/core/src/host/instance_env.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use spacetimedb_client_api_messages::energy::EnergyQuanta;
1717
use spacetimedb_datastore::db_metrics::DB_METRICS;
1818
use spacetimedb_datastore::execution_context::Workload;
1919
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
20-
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId};
20+
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, IndexScanPointOrRange, MutTxId};
2121
use spacetimedb_datastore::traits::IsolationLevel;
2222
use spacetimedb_lib::{http as st_http, ConnectionId, Identity, Timestamp};
2323
use spacetimedb_primitives::{ColId, ColList, IndexId, TableId};
@@ -489,9 +489,12 @@ impl InstanceEnv {
489489
let tx = &mut *self.get_tx()?;
490490

491491
// Find all rows in the table to delete.
492-
let (table_id, _, _, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
492+
let (table_id, iter) = stdb.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
493493
// Re. `SmallVec`, `delete_by_field` only cares about 1 element, so optimize for that.
494-
let rows_to_delete = iter.map(|row_ref| row_ref.pointer()).collect::<SmallVec<[_; 1]>>();
494+
let rows_to_delete = match iter {
495+
IndexScanPointOrRange::Point(_, iter) => iter.map(|row_ref| row_ref.pointer()).collect(),
496+
IndexScanPointOrRange::Range(iter) => iter.map(|row_ref| row_ref.pointer()).collect(),
497+
};
495498

496499
Ok(Self::datastore_delete_by_index_scan(stdb, tx, table_id, rows_to_delete))
497500
}
@@ -653,19 +656,22 @@ impl InstanceEnv {
653656
let tx = &mut *self.get_tx()?;
654657

655658
// Open index iterator
656-
let (table_id, lower, upper, iter) =
659+
let (table_id, iter) =
657660
self.relational_db()
658661
.index_scan_range(tx, index_id, prefix, prefix_elems, rstart, rend)?;
659662

660663
// Scan the index and serialize rows to BSATN.
661-
let (chunks, rows_scanned, bytes_scanned) = ChunkedWriter::collect_iter(pool, iter);
664+
let (point, (chunks, rows_scanned, bytes_scanned)) = match iter {
665+
IndexScanPointOrRange::Point(point, iter) => (Some(point), ChunkedWriter::collect_iter(pool, iter)),
666+
IndexScanPointOrRange::Range(iter) => (None, ChunkedWriter::collect_iter(pool, iter)),
667+
};
662668

663669
// Record the number of rows and the number of bytes scanned by the iterator.
664670
tx.metrics.index_seeks += 1;
665671
tx.metrics.bytes_scanned += bytes_scanned;
666672
tx.metrics.rows_scanned += rows_scanned;
667673

668-
tx.record_index_scan_range(&self.func_type, table_id, index_id, lower, upper);
674+
tx.record_index_scan_range(&self.func_type, table_id, index_id, point);
669675

670676
Ok(chunks)
671677
}

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4272,7 +4272,7 @@ mod tests {
42724272
Ok(())
42734273
}
42744274

4275-
#[tokio::test]
4275+
#[tokio::test(flavor = "multi_thread")]
42764276
async fn test_confirmed_reads() -> anyhow::Result<()> {
42774277
let (db, durability) = relational_db_with_manual_durability(tokio::runtime::Handle::current())?;
42784278

crates/core/src/worker_metrics/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,12 @@ metrics_group!(
481481
#[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)]
482482
pub subscription_queries_total: IntCounterVec,
483483

484+
#[name = spacetime_durability_blocking_send_duration_sec]
485+
#[help = "Latency of blocking sends in request_durability (seconds); _count gives the number of times the channel was full"]
486+
#[labels(database_identity: Identity)]
487+
#[buckets(0.001, 0.01, 0.1, 1.0, 10.0)]
488+
pub durability_blocking_send_duration: HistogramVec,
489+
484490
#[name = spacetime_durability_worker_reorder_window_length]
485491
#[help = "The number of transactions currently being held in the reorder window"]
486492
#[labels(db: Identity)]

0 commit comments

Comments
 (0)