Skip to content

Commit f25ae4a

Browse files
committed
cache table name on replay truncates
1 parent 431f7bb commit f25ae4a

1 file changed

Lines changed: 34 additions & 7 deletions

File tree

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use super::{
66
tx::TxId,
77
tx_state::TxState,
88
};
9-
use crate::execution_context::{Workload, WorkloadType};
109
use crate::{
1110
db_metrics::DB_METRICS,
1211
error::{DatastoreError, TableError},
@@ -23,18 +22,24 @@ use crate::{
2322
DataRow, IsolationLevel, Metadata, MutTx, MutTxDatastore, Program, RowTypeForTable, Tx, TxData, TxDatastore,
2423
},
2524
};
25+
use crate::{
26+
execution_context::{Workload, WorkloadType},
27+
system_tables::StTableRow,
28+
};
2629
use anyhow::{anyhow, Context};
2730
use core::{cell::RefCell, ops::RangeBounds};
2831
use parking_lot::{Mutex, RwLock};
2932
use spacetimedb_commitlog::payload::{txdata, Txdata};
30-
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap};
33+
use spacetimedb_data_structures::map::{HashCollectionExt, HashMap, IntMap};
3134
use spacetimedb_durability::TxOffset;
3235
use spacetimedb_lib::{db::auth::StAccess, metrics::ExecutionMetrics};
3336
use spacetimedb_lib::{ConnectionId, Identity};
3437
use spacetimedb_paths::server::SnapshotDirPath;
3538
use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId};
36-
use spacetimedb_sats::memory_usage::MemoryUsage;
37-
use spacetimedb_sats::{bsatn, buffer::BufReader, AlgebraicValue, ProductValue};
39+
use spacetimedb_sats::{
40+
algebraic_value::de::ValueDeserializer, bsatn, buffer::BufReader, AlgebraicValue, ProductValue,
41+
};
42+
use spacetimedb_sats::{memory_usage::MemoryUsage, Deserialize};
3843
use spacetimedb_schema::schema::{ColumnSchema, IndexSchema, SequenceSchema, TableSchema};
3944
use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotRepository};
4045
use spacetimedb_table::{indexes::RowPointer, page_pool::PagePool, table::RowRef};
@@ -978,6 +983,7 @@ impl<F> Replay<F> {
978983
database_identity: &self.database_identity,
979984
committed_state: &mut committed_state,
980985
progress: &mut *self.progress.borrow_mut(),
986+
truncated_tables: IntMap::default(),
981987
};
982988
f(&mut visitor)
983989
}
@@ -1083,6 +1089,10 @@ struct ReplayVisitor<'a, F> {
10831089
database_identity: &'a Identity,
10841090
committed_state: &'a mut CommittedState,
10851091
progress: &'a mut F,
1092+
// Since deletes are handled before truncation / drop, sometimes the schema
1093+
// info is gone. We save the name on the first delete of that table so metrics
1094+
// can still show a name.
1095+
truncated_tables: IntMap<TableId, Box<str>>,
10861096
}
10871097

10881098
impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVisitor<'_, F> {
@@ -1139,6 +1149,14 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
11391149
let table_name = schema.table_name.clone();
11401150
let row = ProductValue::decode(schema.get_row_type(), reader)?;
11411151

1152+
// If this is a delete from the `st_table` system table, save the name
1153+
if table_id == ST_TABLE_ID {
1154+
let ab = AlgebraicValue::Product(row.clone());
1155+
let st_table_row = StTableRow::deserialize(ValueDeserializer::from_ref(&ab)).unwrap();
1156+
self.truncated_tables
1157+
.insert(st_table_row.table_id, st_table_row.table_name);
1158+
}
1159+
11421160
self.committed_state
11431161
.replay_delete_by_rel(table_id, &row)
11441162
.with_context(|| {
@@ -1158,9 +1176,18 @@ impl<F: FnMut(u64)> spacetimedb_commitlog::payload::txdata::Visitor for ReplayVi
11581176
}
11591177

11601178
fn visit_truncate(&mut self, table_id: TableId) -> std::result::Result<(), Self::Error> {
1161-
let schema = self.committed_state.schema_for_table(table_id)?;
1162-
// TODO: avoid clone
1163-
let table_name = schema.table_name.clone();
1179+
let table_name = match self.committed_state.schema_for_table(table_id) {
1180+
// TODO: avoid clone
1181+
Ok(schema) => schema.table_name.clone(),
1182+
1183+
Err(_) => {
1184+
if let Some(name) = self.truncated_tables.remove(&table_id) {
1185+
name
1186+
} else {
1187+
return Err(anyhow!("Error looking up name for truncated table {:?}", table_id).into());
1188+
}
1189+
}
1190+
};
11641191

11651192
self.committed_state.replay_truncate(table_id).with_context(|| {
11661193
format!(

0 commit comments

Comments
 (0)