@@ -391,29 +391,15 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
391391 updates : Vec < ( S , Diff ) > ,
392392 commit_ts : Timestamp ,
393393 ) -> Result < Timestamp , CompareAndAppendError > {
394- assert_eq ! ( self . mode, Mode :: Writable ) ;
395- assert ! (
396- commit_ts >= self . upper,
397- "expected commit ts, {}, to be greater than or equal to upper, {}" ,
398- commit_ts,
399- self . upper
400- ) ;
401-
402- // This awkward code allows us to perform an expensive soft assert that requires cloning
403- // `updates` twice, after `updates` has been consumed.
394+ // The fencing check is expensive, so run it only with soft assertions enabled.
404395 let contains_fence = if mz_ore:: assert:: soft_assertions_enabled ( ) {
405- let updates: Vec < _ > = updates. clone ( ) ;
406396 let parsed_updates: Vec < _ > = updates
407397 . clone ( )
408398 . into_iter ( )
409- . map ( |( update, diff) | {
410- let update: StateUpdateKindJson = update. into ( ) ;
411- ( update, diff)
412- } )
413399 . filter_map ( |( update, diff) | {
414- < StateUpdateKindJson as TryIntoStateUpdateKind > :: try_into ( update)
415- . ok ( )
416- . map ( |update| ( update, diff) )
400+ let update : StateUpdateKindJson = update. into ( ) ;
401+ let update = TryIntoStateUpdateKind :: try_into ( update ) . ok ( ) ? ;
402+ Some ( ( update, diff) )
417403 } )
418404 . collect ( ) ;
419405 let contains_retraction = parsed_updates. iter ( ) . any ( |( update, diff) | {
@@ -422,10 +408,9 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
422408 let contains_addition = parsed_updates. iter ( ) . any ( |( update, diff) | {
423409 matches ! ( update, StateUpdateKind :: FenceToken ( ..) ) && * diff == Diff :: ONE
424410 } ) ;
425- let contains_fence = contains_retraction && contains_addition;
426- Some ( ( contains_fence, updates) )
411+ contains_retraction && contains_addition
427412 } else {
428- None
413+ false
429414 } ;
430415
431416 let updates = updates. into_iter ( ) . map ( |( kind, diff) | {
@@ -437,6 +422,44 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
437422 )
438423 } ) ;
439424 let next_upper = commit_ts. step_forward ( ) ;
425+ self . compare_and_append_inner ( updates, next_upper)
426+ . await
427+ . inspect_err ( |e| {
428+ // A compare-and-append failure means someone else must have written to the
429+ // catalog. We expect to have been fenced out, since writing to the catalog without
430+ // fencing other catalogs should be impossible. The one exception is if we are
431+ // trying to fence other catalogs with this write.
432+ soft_assert_or_log ! (
433+ matches!( e, CompareAndAppendError :: Fence ( _) ) || contains_fence,
434+ "encountered an upper mismatch on a non-fencing write"
435+ ) ;
436+ } ) ?;
437+
438+ self . sync ( next_upper) . await ?;
439+ Ok ( next_upper)
440+ }
441+
442+ /// Compare-and-append `updates` to the catalog shard, advancing the upper to `next_upper`.
443+ ///
444+ /// On success, updating `self.upper` is left to the caller. The caller can thus decide whether
445+ /// or not it needs to sync the catalog.
446+ ///
447+ /// # Panics
448+ ///
449+ /// Panics if not in `Writable` mode.
450+ /// Panics if `next_upper` is not greater than `self.upper`.
451+ async fn compare_and_append_inner (
452+ & mut self ,
453+ updates : impl IntoIterator < Item = ( ( SourceData , ( ) ) , Timestamp , StorageDiff ) > ,
454+ next_upper : Timestamp ,
455+ ) -> Result < ( ) , CompareAndAppendError > {
456+ assert_eq ! ( self . mode, Mode :: Writable ) ;
457+ assert ! (
458+ next_upper > self . upper,
459+ "next_upper ({next_upper}) not greater than current upper ({})" ,
460+ self . upper,
461+ ) ;
462+
440463 let res = self
441464 . write_handle
442465 . compare_and_append (
@@ -447,18 +470,10 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
447470 . await
448471 . expect ( "invalid usage" ) ;
449472
450- // There was an upper mismatch which means something else must have written to the catalog.
451- // Syncing to the current upper should result in a fence error since writing to the catalog
452- // without fencing other catalogs should be impossible. The one exception is if we are
453- // trying to fence other catalogs with this write, in which case we won't see a fence error.
454473 if let Err ( e @ UpperMismatch { .. } ) = res {
474+ // Most likely we were fenced out.
475+ // Sync to the current upper to detect that.
455476 self . sync_to_current_upper ( ) . await ?;
456- if let Some ( ( contains_fence, updates) ) = contains_fence {
457- assert ! (
458- contains_fence,
459- "updates were neither fenced nor fencing and encountered an upper mismatch: {updates:#?}"
460- )
461- }
462477 return Err ( e. into ( ) ) ;
463478 }
464479
@@ -483,8 +498,8 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
483498 "updated bound should match expected"
484499 ) ,
485500 }
486- self . sync ( next_upper ) . await ? ;
487- Ok ( next_upper )
501+
502+ Ok ( ( ) )
488503 }
489504
490505 /// Generates an iterator of [`StateUpdate`] that contain all unconsolidated updates to the
@@ -1791,12 +1806,39 @@ impl DurableCatalogState for PersistCatalogState {
17911806 }
17921807
17931808 #[ mz_ore:: instrument( level = "debug" ) ]
1794- async fn confirm_leadership ( & mut self ) -> Result < ( ) , CatalogError > {
1795- // Read only catalog does not care about leadership.
1796- if self . is_read_only ( ) {
1809+ async fn advance_upper ( & mut self , new_upper : Timestamp ) -> Result < ( ) , CatalogError > {
1810+ if self . upper >= new_upper {
1811+ // We don't expect a no-op advancement, but if we are wrong we'd crash the process.
1812+ // Seems safer to only soft-assert and return gracefully in production. If we get here
1813+ // that means we tried to make the catalog shard readable at a time it was already
1814+ // readable, which likely means we are violating linearizability. That's not great, but
1815+ // crashing (or even crash-looping) is worse.
1816+ //
1817+ // TODO: Consider removing this once we have built some confidence.
1818+ soft_panic_or_log ! (
1819+ "new_upper ({new_upper}) not greater than current upper ({})" ,
1820+ self . upper
1821+ ) ;
17971822 return Ok ( ( ) ) ;
17981823 }
1799- self . sync_to_current_upper ( ) . await ?;
1824+
1825+ match self . mode {
1826+ Mode :: Writable => self
1827+ . compare_and_append_inner ( [ ] , new_upper)
1828+ . await
1829+ . map_err ( |e| e. unwrap_fence_error ( ) ) ?,
1830+ Mode :: Savepoint => ( ) ,
1831+ Mode :: Readonly => {
1832+ return Err ( DurableCatalogError :: NotWritable (
1833+ "cannot advance upper of a read-only catalog" . into ( ) ,
1834+ )
1835+ . into ( ) ) ;
1836+ }
1837+ }
1838+
1839+ self . upper = new_upper;
1840+ // No sync needed since no data was written.
1841+
18001842 Ok ( ( ) )
18011843 }
18021844
0 commit comments