Skip to content

Commit 7b70d01

Browse files
committed
Merge remote-tracking branch 'origin/kim/all-ephemeral-does-not-consume-txoffset' into kim/db-shutdown
2 parents db10945 + 83c858f commit 7b70d01

4 files changed

Lines changed: 62 additions & 6 deletions

File tree

crates/core/src/db/durability.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,15 @@ impl DurabilityWorkerActor {
256256
.filter(|ops| !truncates.contains(&ops.table_id))
257257
.collect();
258258

259-
let truncates = truncates.into_iter().filter(is_not_ephemeral_table).collect();
259+
let truncates: Box<_> = truncates.into_iter().filter(is_not_ephemeral_table).collect();
260260

261261
let inputs = reducer_context.map(|rcx| rcx.into());
262262

263+
debug_assert!(
264+
!(inserts.is_empty() && truncates.is_empty() && deletes.is_empty() && inputs.is_none()),
265+
"empty transaction"
266+
);
267+
263268
let txdata = Txdata {
264269
inputs,
265270
outputs: None,

crates/core/src/db/relational_db.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2556,6 +2556,36 @@ mod tests {
25562556
Ok(())
25572557
}
25582558

2559+
#[test]
2560+
fn test_view_materialization_does_not_consume_tx_offset() -> ResultTest<()> {
2561+
let stdb = TestDB::durable_without_snapshot_repo()?;
2562+
2563+
let (tx_offset_1, view_id, table_id) = {
2564+
let module_def = view_module_def();
2565+
let view_def = module_def.view("my_view").unwrap();
2566+
2567+
let mut tx = begin_mut_tx(&stdb);
2568+
let (view_id, table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
2569+
let (tx_offset, tx_data, ..) = stdb.commit_tx(tx)?.unwrap();
2570+
assert_eq!(Some(tx_offset), tx_data.tx_offset());
2571+
2572+
(tx_offset, view_id, table_id)
2573+
};
2574+
2575+
let mut tx = begin_mut_tx(&stdb);
2576+
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?;
2577+
stdb.materialize_view(&mut tx, table_id, Identity::ONE, vec![product![10u8]])?;
2578+
let (tx_offset_2, tx_data, ..) = stdb.commit_tx(tx)?.unwrap();
2579+
2580+
// `tx_data.tx_offset()` should return `None`,
2581+
// so that it is not considered for durability.
2582+
// The tx offset reported for confirmed reads should stay the same.
2583+
assert!(tx_data.tx_offset().is_none());
2584+
assert_eq!(tx_offset_1, tx_offset_2);
2585+
2586+
Ok(())
2587+
}
2588+
25592589
#[test]
25602590
fn test_view_tables_are_ephemeral_with_snapshot() -> ResultTest<()> {
25612591
let stdb = TestDB::durable()?;

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -747,15 +747,18 @@ impl CommittedState {
747747
// so that we can pass updated set of table ids.
748748
self.merge_read_sets(read_sets);
749749

750+
// Store in `tx_data` which of the updated tables are ephemeral.
751+
// NOTE: This must be called before `tx_consumes_offset`, so that
752+
// all-ephemeral transactions do not consume a tx offset.
753+
tx_data.set_ephemeral_tables(&self.ephemeral_tables);
754+
750755
// If the TX will be logged, record its projected tx offset,
751756
// then increment the counter.
752757
if self.tx_consumes_offset(&tx_data, ctx) {
753758
tx_data.set_tx_offset(self.next_tx_offset);
754759
self.next_tx_offset += 1;
755760
}
756761

757-
tx_data.set_ephemeral_tables(&self.ephemeral_tables);
758-
759762
tx_data
760763
}
761764

crates/datastore/src/traits.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,16 @@ impl TxData {
252252
self.ephemeral_tables.as_ref()
253253
}
254254

255+
/// Check if `table_id` is in the set of ephemeral tables for this transaction.
256+
///
257+
/// Beware that ephemeral tables are known only after [Self::set_ephemeral_tables]
258+
/// has been called.
259+
pub fn is_ephemeral_table(&self, table_id: &TableId) -> bool {
260+
self.ephemeral_tables
261+
.as_ref()
262+
.is_some_and(|etables| etables.contains(table_id))
263+
}
264+
255265
/// Obtain an iterator over the inserted rows per table.
256266
pub fn inserts(&self) -> impl Iterator<Item = (&TableId, &Arc<[ProductValue]>)> + '_ {
257267
self.inserts.iter()
@@ -304,12 +314,20 @@ impl TxData {
304314
self.truncates.iter().copied()
305315
}
306316

307-
/// Check if this [`TxData`] contains any `inserted | deleted` rows or `connect/disconnect` operations.
317+
/// Check if this [`TxData`] contains any `inserted | deleted` rows
318+
/// or `connect/disconnect` operations.
319+
///
320+
/// Mutations of ephemeral tables are excluded, i.e. if the transaction
321+
/// modifies only ephemeral tables (and is not a connect/disconnect operation),
322+
/// the method returns `false`.
308323
///
309324
/// This is used to determine if a transaction should be written to disk.
310325
pub fn has_rows_or_connect_disconnect(&self, reducer_name: Option<&str>) -> bool {
311-
self.inserts().any(|(_, inserted_rows)| !inserted_rows.is_empty())
312-
|| self.deletes().any(|(.., deleted_rows)| !deleted_rows.is_empty())
326+
let is_non_ephemeral_mutation =
327+
|(table_id, rows): (_, &Arc<[_]>)| !(self.is_ephemeral_table(table_id) || rows.is_empty());
328+
329+
self.inserts().any(is_non_ephemeral_mutation)
330+
|| self.deletes().any(is_non_ephemeral_mutation)
313331
|| matches!(
314332
reducer_name.map(|rn| rn.strip_prefix("__identity_")),
315333
Some(Some("connected__" | "disconnected__"))

0 commit comments

Comments
 (0)