Skip to content

Commit a388c48

Browse files
kimCentril
andauthored
Ensure all-ephemeral transactions don't consume a tx offset (#3884)
Views are materialized in mutable transactions, but should not increment the transaction offset maintained in the committed state. This fixes storing completely empty transactions in the commitlog, and maintains that the committed state tx offset is in-sync with the commitlog's tx offset. # Expected complexity level and risk 2 # Testing Added a test. --------- Signed-off-by: Kim Altintop <kim@eagain.io> Co-authored-by: Mazdak Farrokhzad <twingoow@gmail.com>
1 parent b1676ab commit a388c48

3 files changed

Lines changed: 66 additions & 15 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -818,18 +818,13 @@ impl RelationalDB {
818818
Txdata,
819819
};
820820

821-
let is_not_ephemeral_table = |table_id: &TableId| -> bool {
822-
tx_data
823-
.ephemeral_tables()
824-
.map(|etables| !etables.contains(table_id))
825-
.unwrap_or(true)
826-
};
821+
let is_persistent_table = |table_id: &TableId| -> bool { !tx_data.is_ephemeral_table(table_id) };
827822

828823
if tx_data.tx_offset().is_some() {
829824
let inserts: Box<_> = tx_data
830825
.inserts()
831826
// Skip ephemeral tables
832-
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
827+
.filter(|(table_id, _)| is_persistent_table(table_id))
833828
.map(|(table_id, rowdata)| Ops {
834829
table_id: *table_id,
835830
rowdata: rowdata.clone(),
@@ -840,7 +835,7 @@ impl RelationalDB {
840835

841836
let deletes: Box<_> = tx_data
842837
.deletes()
843-
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
838+
.filter(|(table_id, _)| is_persistent_table(table_id))
844839
.map(|(table_id, rowdata)| Ops {
845840
table_id: *table_id,
846841
rowdata: rowdata.clone(),
@@ -849,10 +844,15 @@ impl RelationalDB {
849844
.filter(|ops| !truncates.contains(&ops.table_id))
850845
.collect();
851846

852-
let truncates = truncates.into_iter().filter(is_not_ephemeral_table).collect();
847+
let truncates: Box<_> = truncates.into_iter().filter(is_persistent_table).collect();
853848

854849
let inputs = reducer_context.map(|rcx| rcx.into());
855850

851+
debug_assert!(
852+
!(inserts.is_empty() && truncates.is_empty() && deletes.is_empty() && inputs.is_none()),
853+
"empty transaction"
854+
);
855+
856856
let txdata = Txdata {
857857
inputs,
858858
outputs: None,
@@ -2632,6 +2632,36 @@ mod tests {
26322632
Ok(())
26332633
}
26342634

2635+
#[test]
2636+
fn test_view_materialization_does_not_consume_tx_offset() -> ResultTest<()> {
2637+
let stdb = TestDB::durable_without_snapshot_repo()?;
2638+
2639+
let (tx_offset_1, view_id, table_id) = {
2640+
let module_def = view_module_def();
2641+
let view_def = module_def.view("my_view").unwrap();
2642+
2643+
let mut tx = begin_mut_tx(&stdb);
2644+
let (view_id, table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
2645+
let (tx_offset, tx_data, ..) = stdb.commit_tx(tx)?.unwrap();
2646+
assert_eq!(Some(tx_offset), tx_data.tx_offset());
2647+
2648+
(tx_offset, view_id, table_id)
2649+
};
2650+
2651+
let mut tx = begin_mut_tx(&stdb);
2652+
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?;
2653+
stdb.materialize_view(&mut tx, table_id, Identity::ONE, vec![product![10u8]])?;
2654+
let (tx_offset_2, tx_data, ..) = stdb.commit_tx(tx)?.unwrap();
2655+
2656+
// `tx_data.tx_offset()` should return `None`,
2657+
// so that it is not considered for durability.
2658+
// The tx offset reported for confirmed reads should stay the same.
2659+
assert!(tx_data.tx_offset().is_none());
2660+
assert_eq!(tx_offset_1, tx_offset_2);
2661+
2662+
Ok(())
2663+
}
2664+
26352665
#[test]
26362666
fn test_view_tables_are_ephemeral_with_snapshot() -> ResultTest<()> {
26372667
let stdb = TestDB::durable()?;

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -747,15 +747,18 @@ impl CommittedState {
747747
// so that we can pass updated set of table ids.
748748
self.merge_read_sets(read_sets);
749749

750+
// Store in `tx_data` which of the updated tables are ephemeral.
751+
// NOTE: This must be called before `tx_consumes_offset`, so that
752+
// all-ephemeral transactions do not consume a tx offset.
753+
tx_data.set_ephemeral_tables(&self.ephemeral_tables);
754+
750755
// If the TX will be logged, record its projected tx offset,
751756
// then increment the counter.
752757
if self.tx_consumes_offset(&tx_data, ctx) {
753758
tx_data.set_tx_offset(self.next_tx_offset);
754759
self.next_tx_offset += 1;
755760
}
756761

757-
tx_data.set_ephemeral_tables(&self.ephemeral_tables);
758-
759762
tx_data
760763
}
761764

crates/datastore/src/traits.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ impl TxData {
236236
/// Determines which ephemeral tables were modified in this transaction.
237237
///
238238
/// Iterates over the tables updated in this transaction and records those that
239-
/// also appear in `all_ephemeral_tables`.
239+
/// also appear in `all_ephemeral_tables`.
240240
/// `self.ephemeral_tables` remains `None` if no ephemeral tables were modified.
241241
pub fn set_ephemeral_tables(&mut self, all_ephemeral_tables: &EphemeralTables) {
242242
for tid in self.tables.keys() {
@@ -252,6 +252,16 @@ impl TxData {
252252
self.ephemeral_tables.as_ref()
253253
}
254254

255+
/// Check if `table_id` is in the set of ephemeral tables for this transaction.
256+
///
257+
/// Beware that ephemeral tables are known only after [Self::set_ephemeral_tables]
258+
/// has been called.
259+
pub fn is_ephemeral_table(&self, table_id: &TableId) -> bool {
260+
self.ephemeral_tables
261+
.as_ref()
262+
.is_some_and(|etables| etables.contains(table_id))
263+
}
264+
255265
/// Obtain an iterator over the inserted rows per table.
256266
pub fn inserts(&self) -> impl Iterator<Item = (&TableId, &Arc<[ProductValue]>)> + '_ {
257267
self.inserts.iter()
@@ -304,12 +314,20 @@ impl TxData {
304314
self.truncates.iter().copied()
305315
}
306316

307-
/// Check if this [`TxData`] contains any `inserted | deleted` rows or `connect/disconnect` operations.
317+
/// Check if this [`TxData`] contains any `inserted | deleted` rows
318+
/// or `connect/disconnect` operations.
319+
///
320+
/// Mutations of ephemeral tables are excluded, i.e. if the transaction
321+
/// modifies only ephemeral tables (and is not a connect/disconnect operation),
322+
/// the method returns `false`.
308323
///
309324
/// This is used to determine if a transaction should be written to disk.
310325
pub fn has_rows_or_connect_disconnect(&self, reducer_context: Option<&ReducerContext>) -> bool {
311-
self.inserts().any(|(_, inserted_rows)| !inserted_rows.is_empty())
312-
|| self.deletes().any(|(.., deleted_rows)| !deleted_rows.is_empty())
326+
let is_non_ephemeral_mutation =
327+
|(table_id, rows): (_, &Arc<[_]>)| !(self.is_ephemeral_table(table_id) || rows.is_empty());
328+
329+
self.inserts().any(is_non_ephemeral_mutation)
330+
|| self.deletes().any(is_non_ephemeral_mutation)
313331
|| matches!(
314332
reducer_context.map(|rcx| rcx.name.strip_prefix("__identity_")),
315333
Some(Some("connected__" | "disconnected__"))

0 commit comments

Comments
 (0)