Commit cb5e711

Make subscribe durable (#3894)
# Description of Changes

With the addition of module-defined views, subscriptions are no longer read-only, as they may invoke view materialization. A subscription starts off as a mutable transaction, materializes views if necessary, and then downgrades to a read-only transaction to evaluate the subscription.

Before this patch, we were calling `commit_downgrade` directly on the `MutTxId` in order to downgrade the transaction. This would update the in-memory `CommittedState`, but it wouldn't make the transaction durable. As a result, we were incrementing the transaction offset of the in-memory `CommittedState` without writing anything to the commitlog. This in turn would invalidate snapshots, as they would be pointing further ahead into the commitlog than they should, and so when replaying from a snapshot we would potentially skip over commits that were not included in the snapshot.

This patch changes those call sites to use `RelationalDB::commit_tx_downgrade`, which both updates the in-memory state **and** makes the transaction durable.

**NOTE:** The fact that views are materialized is purely an implementation detail at this point in time. Technically, view tables are ephemeral, meaning they are not persisted to the commitlog. So the real bug here was that we were updating the tx offset of the in-memory committed state at all. This is technically fixed by #3884, and so after #3884 lands this change becomes a no-op. However, we still shouldn't be calling `commit` and `commit_downgrade` directly on a `MutTxId`, since in most cases it is wrong to bypass the durability layer. And without this change, the bug would still be present were view tables not ephemeral, which they may not be at some point in the future.

# API and ABI breaking changes

None

# Expected complexity level and risk

1. The change itself is trivial; the bug is not.

# Testing

Adding an automated test for this is not so straightforward. First, it's view-related, which means we don't have many options apart from a smoke test, but I don't believe the smoke tests have a mechanism for replaying the commitlog. If transaction offsets are supposed to be linear, without any gaps, then it would be useful to assert that on each append, in which case we could write a smoke test that would fail as soon as the offsets diverged.
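The failure mode described above can be reproduced in a toy model. The sketch below is not SpacetimeDB code: `Db`, `Commitlog`, `CommittedState::next_tx_offset`, `commit_in_memory_only`, and `commit_durable` are hypothetical names chosen for illustration, and the gap check in `append` is only an assumed form of the per-append assertion suggested under Testing.

```rust
/// Toy stand-in for the in-memory committed state (hypothetical).
#[derive(Default)]
struct CommittedState {
    /// Offset that the next committed transaction will receive.
    next_tx_offset: u64,
}

/// Toy stand-in for the commitlog (hypothetical): it records the
/// offset of every commit that was made durable.
#[derive(Default)]
struct Commitlog {
    offsets: Vec<u64>,
}

impl Commitlog {
    /// Append a commit, rejecting any offset that would leave a gap.
    /// This is the assumed "assert on each append" check.
    fn append(&mut self, offset: u64) -> Result<(), String> {
        let expected = self.offsets.len() as u64;
        if offset != expected {
            return Err(format!("offset gap: expected {}, got {}", expected, offset));
        }
        self.offsets.push(offset);
        Ok(())
    }
}

#[derive(Default)]
struct Db {
    state: CommittedState,
    log: Commitlog,
}

impl Db {
    /// Analogue of calling `commit_downgrade` directly on the tx:
    /// the in-memory offset advances, but nothing reaches the log.
    fn commit_in_memory_only(&mut self) -> u64 {
        let offset = self.state.next_tx_offset;
        self.state.next_tx_offset += 1;
        offset
    }

    /// Analogue of going through the durability layer:
    /// update the in-memory state and append to the log.
    fn commit_durable(&mut self) -> Result<u64, String> {
        let offset = self.commit_in_memory_only();
        self.log.append(offset)?;
        Ok(offset)
    }
}
```

In this model, one `commit_in_memory_only` call between two `commit_durable` calls leaves the in-memory offset ahead of the log, so the second durable commit is rejected with a gap error. That is the kind of divergence a smoke test could catch if appends asserted contiguity.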
1 parent a7c605c commit cb5e711

6 files changed

Lines changed: 28 additions & 32 deletions

crates/core/src/db/relational_db.rs

Lines changed: 3 additions & 9 deletions
@@ -827,16 +827,10 @@ impl RelationalDB {
     }
 
     #[tracing::instrument(level = "trace", skip_all)]
-    pub fn commit_tx_downgrade(
-        &self,
-        tx: MutTx,
-        workload: Workload,
-    ) -> Result<Option<(Arc<TxData>, TxMetrics, Tx)>, DBError> {
+    pub fn commit_tx_downgrade(&self, tx: MutTx, workload: Workload) -> (Arc<TxData>, TxMetrics, Tx) {
         log::trace!("COMMIT MUT TX");
 
-        let Some((tx_data, tx_metrics, tx)) = self.inner.commit_mut_tx_downgrade(tx, workload)? else {
-            return Ok(None);
-        };
+        let (tx_data, tx_metrics, tx) = self.inner.commit_mut_tx_downgrade(tx, workload);
 
         self.maybe_do_snapshot(&tx_data);
 
@@ -845,7 +839,7 @@ impl RelationalDB {
             durability.request_durability(tx.ctx.reducer_context().cloned(), &tx_data);
         }
 
-        Ok(Some((tx_data, tx_metrics, tx)))
+        (tx_data, tx_metrics, tx)
     }
 
     /// Get the [`DurableOffset`] of this database, or `None` if this is an

crates/core/src/sql/execute.rs

Lines changed: 2 additions & 8 deletions
@@ -1,4 +1,3 @@
-use std::sync::Arc;
 use std::time::Duration;
 
 use super::ast::SchemaViewer;
@@ -215,20 +214,15 @@ pub async fn run(
         None => tx,
     };
 
-    let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Sql);
+    let (tx_data, tx_metrics_mut, tx) = db.commit_tx_downgrade(tx, Workload::Sql);
 
     let (tx_offset_send, tx_offset) = oneshot::channel();
     // Release the tx on drop, so that we record metrics
     // and set the transaction offset.
     let mut tx = scopeguard::guard(tx, |tx| {
         let (offset, tx_metrics_downgrade, reducer) = db.release_tx(tx);
         let _ = tx_offset_send.send(offset);
-        db.report_tx_metrics(
-            reducer,
-            Some(Arc::new(tx_data)),
-            Some(tx_metrics_mut),
-            Some(tx_metrics_downgrade),
-        );
+        db.report_tx_metrics(reducer, Some(tx_data), Some(tx_metrics_mut), Some(tx_metrics_downgrade));
     });
 
     // Compute the header for the result set

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 5 additions & 7 deletions
@@ -1027,9 +1027,7 @@ impl ModuleSubscriptions {
         // We'll later ensure tx is released/cleaned up once out of scope.
         let (read_tx, tx_data, tx_metrics_mut) = match &mut event.status {
             EventStatus::Committed(db_update) => {
-                let Some((tx_data, tx_metrics, read_tx)) = stdb.commit_tx_downgrade(tx, Workload::Update)? else {
-                    return Ok(Err(WriteConflict));
-                };
+                let (tx_data, tx_metrics, read_tx) = stdb.commit_tx_downgrade(tx, Workload::Update);
                 *db_update = DatabaseUpdate::from_writes(&tx_data);
                 (read_tx, tx_data, tx_metrics)
             }
@@ -1102,7 +1100,7 @@ impl ModuleSubscriptions {
         sender: Identity,
     ) -> Result<(TxGuard<impl FnOnce(TxId) + '_>, TransactionOffset), DBError> {
         Self::_unsubscribe_views(&mut tx, view_collector, sender)?;
-        let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Subscribe);
+        let (tx_data, tx_metrics_mut, tx) = self.relational_db.commit_tx_downgrade(tx, Workload::Subscribe);
         let opts = GuardTxOptions::from_mut(tx_data, tx_metrics_mut);
         Ok(self.guard_tx(tx, opts))
     }
@@ -1136,7 +1134,7 @@ impl ModuleSubscriptions {
                 .materialize_views(tx, view_collector, sender, Workload::Subscribe)
                 .await?
         }
-        let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Subscribe);
+        let (tx_data, tx_metrics_mut, tx) = self.relational_db.commit_tx_downgrade(tx, Workload::Subscribe);
         let opts = GuardTxOptions::from_mut(tx_data, tx_metrics_mut);
         Ok(self.guard_tx(tx, opts))
     }
@@ -1229,10 +1227,10 @@ impl GuardTxOptions {
         }
     }
 
-    fn from_mut(tx_data: TxData, tx_metrics_mut: TxMetrics) -> Self {
+    fn from_mut(tx_data: Arc<TxData>, tx_metrics_mut: TxMetrics) -> Self {
         Self {
             extra_tx_offset_sender: None,
-            tx_data: Some(Arc::new(tx_data)),
+            tx_data: Some(tx_data),
             tx_metrics_mut: tx_metrics_mut.into(),
         }
     }

crates/core/src/subscription/query.rs

Lines changed: 1 addition & 1 deletion
@@ -1087,7 +1087,7 @@ mod tests {
             }
         }
 
-        let (data, _, tx) = tx.commit_downgrade(Workload::ForTests);
+        let (data, _, tx) = db.commit_tx_downgrade(tx, Workload::ForTests);
         let table_id = plan.subscribed_table_id();
         // This awful construction to convert `Arc<str>` into `Box<str>`.
         let table_name = (&**plan.subscribed_table_name()).into();

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 6 additions & 6 deletions
@@ -950,6 +950,8 @@ impl MutTx for Locking {
         tx.rollback()
     }
 
+    /// This method only updates the in-memory `committed_state`.
+    /// For durability, see `RelationalDB::commit_tx`.
     fn commit_mut_tx(&self, tx: Self::MutTx) -> Result<Option<(TxOffset, TxData, TxMetrics, String)>> {
         Ok(Some(tx.commit()))
     }
@@ -960,12 +962,10 @@ impl Locking {
         tx.rollback_downgrade(workload)
     }
 
-    pub fn commit_mut_tx_downgrade(
-        &self,
-        tx: MutTxId,
-        workload: Workload,
-    ) -> Result<Option<(TxData, TxMetrics, TxId)>> {
-        Ok(Some(tx.commit_downgrade(workload)))
+    /// This method only updates the in-memory `committed_state`.
+    /// For durability, see `RelationalDB::commit_tx_downgrade`.
+    pub fn commit_mut_tx_downgrade(&self, tx: MutTxId, workload: Workload) -> (TxData, TxMetrics, TxId) {
+        tx.commit_downgrade(workload)
     }
 }
 
971971

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 11 additions & 1 deletion
@@ -1902,6 +1902,11 @@ impl MutTxId {
     /// Commits this transaction in memory, applying its changes to the committed state.
     /// This doesn't handle the persistence layer at all.
     ///
+    /// IMPORTANT: This method updates the in-memory state of the database but does not make it durable.
+    /// That is, the tx will not be persisted to the commitlog.
+    /// Hence you should be careful when calling this method directly.
+    /// In most cases you'll want to use `RelationalDB::commit_tx` which makes the tx durable.
+    ///
     /// Returns:
     /// - [`TxData`], the set of inserts and deletes performed by this transaction.
     /// - [`TxMetrics`], various measurements of the work performed by this transaction.
@@ -1947,11 +1952,16 @@ impl MutTxId {
     /// The lock on the committed state is converted into a read lock,
     /// and returned as a new read-only transaction.
     ///
+    /// IMPORTANT: This method updates the in-memory state of the database but does not make it durable.
+    /// That is, the tx will not be persisted to the commitlog.
+    /// Hence you should be careful when calling this method directly.
+    /// In most cases you'll want to use `RelationalDB::commit_tx_downgrade` which makes the tx durable.
+    ///
     /// Returns:
     /// - [`TxData`], the set of inserts and deletes performed by this transaction.
     /// - [`TxMetrics`], various measurements of the work performed by this transaction.
     /// - [`TxId`], a read-only transaction with a shared lock on the committed state.
-    pub fn commit_downgrade(mut self, workload: Workload) -> (TxData, TxMetrics, TxId) {
+    pub(super) fn commit_downgrade(mut self, workload: Workload) -> (TxData, TxMetrics, TxId) {
         let tx_data = self
             .committed_state_write_lock
             .merge(self.tx_state, self.read_sets, &self.ctx);
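As a rough illustration of what narrowing `commit_downgrade` to `pub(super)` achieves, the self-contained sketch below mirrors the module layout with simplified, hypothetical types. `Vec<u32>` stands in for `TxData`, `durable_log` stands in for the durability layer, and `RelationalDb` is a toy wrapper rather than the real `RelationalDB`; none of these bodies are taken from the actual crates.

```rust
use std::sync::Arc;

mod locking_tx_datastore {
    pub mod mut_tx {
        /// Hypothetical stand-in for `MutTxId`.
        pub struct MutTxId {
            pub pending: Vec<u32>,
        }

        impl MutTxId {
            /// In-memory commit only. `pub(super)` restricts callers to
            /// `locking_tx_datastore` and its submodules.
            pub(super) fn commit_downgrade(self) -> Vec<u32> {
                self.pending
            }
        }
    }

    pub mod datastore {
        use super::mut_tx::MutTxId;

        /// Hypothetical stand-in for `Locking`.
        pub struct Locking;

        impl Locking {
            /// Still in-memory only; allowed to call `commit_downgrade`
            /// because this module shares a parent with `mut_tx`.
            pub fn commit_mut_tx_downgrade(&self, tx: MutTxId) -> Vec<u32> {
                tx.commit_downgrade()
            }
        }
    }
}

use locking_tx_datastore::{datastore::Locking, mut_tx::MutTxId};

/// Toy wrapper playing the role of the durable entry point.
struct RelationalDb {
    inner: Locking,
    /// Assumed stand-in for the durability layer.
    durable_log: Vec<Arc<Vec<u32>>>,
}

impl RelationalDb {
    /// Commit in memory, then hand the same `Arc` to the durable log.
    fn commit_tx_downgrade(&mut self, tx: MutTxId) -> Arc<Vec<u32>> {
        let tx_data = Arc::new(self.inner.commit_mut_tx_downgrade(tx));
        self.durable_log.push(Arc::clone(&tx_data));
        tx_data
        // Calling `tx.commit_downgrade()` here instead would not compile:
        // the method is private outside `locking_tx_datastore`.
    }
}
```

With this layout, code outside `locking_tx_datastore` cannot call `commit_downgrade` on the transaction directly. Note that `Locking::commit_mut_tx_downgrade` remains public in the sketch, as it does in the diff, so the restriction narrows the bypass rather than eliminating it.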
