@@ -9,8 +9,9 @@ use std::collections::{BTreeSet, HashSet};
99use std:: time:: { Duration , Instant } ;
1010
1111use crate :: {
12- http_client, metrics ,
12+ http_client,
1313 keys:: CommittedValue ,
14+ metrics,
1415 replica:: {
1516 ballot:: { self , Ballot , BallotSelection } ,
1617 commit_kv:: { self , CommitKvOutcome } ,
@@ -101,13 +102,11 @@ impl SetProposal {
101102 key,
102103 expect_one_of,
103104 new_value : Some ( value) ,
104- } ) if expect_one_of. len ( ) == 1 && matches ! ( expect_one_of. first( ) , Some ( None ) ) => {
105- Ok ( Self {
106- key : key. clone ( ) ,
107- value : value. clone ( ) ,
108- mutable,
109- } )
110- }
105+ } ) if expect_one_of. len ( ) == 1 && matches ! ( expect_one_of. first( ) , Some ( None ) ) => Ok ( Self {
106+ key : key. clone ( ) ,
107+ value : value. clone ( ) ,
108+ mutable,
109+ } ) ,
111110 _ => bail ! (
112111 "epoxy v2 only supports single-key set-if-absent proposals with a concrete value"
113112 ) ,
@@ -314,14 +313,14 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result<Proposal
314313 replica_id,
315314 & quorum_members,
316315 proposal. key . clone ( ) ,
317- CommittedValue {
318- value : proposal. value . clone ( ) ,
319- version : 1 ,
320- mutable : proposal. mutable ,
321- } ,
322- ballot,
323- )
324- . await ?
316+ CommittedValue {
317+ value : proposal. value . clone ( ) ,
318+ version : 1 ,
319+ mutable : proposal. mutable ,
320+ } ,
321+ ballot,
322+ )
323+ . await ?
325324 {
326325 PreparePhaseOutcome :: Prepared { ballot, value } => {
327326 run_slow_path (
@@ -444,16 +443,7 @@ async fn run_accept_path(
444443 version,
445444 mutable,
446445 } = value;
447- commit_kv:: commit_kv (
448- & tx,
449- replica_id,
450- key,
451- value,
452- ballot,
453- mutable,
454- version,
455- )
456- . await
446+ commit_kv:: commit_kv ( & tx, replica_id, key, value, ballot, mutable, version) . await
457447 }
458448 } )
459449 . custom_instrument ( tracing:: info_span!( "commit_kv_tx" ) )
@@ -472,16 +462,15 @@ async fn run_accept_path(
472462 let ballot = ballot;
473463 let purge_cache = purge_cache && proposal. mutable ;
474464 async move {
475- if let Err ( err) =
476- broadcast_commits (
477- & ctx,
478- & config,
479- replica_id,
480- key. clone ( ) ,
481- chosen_value. clone ( ) ,
482- ballot,
483- )
484- . await
465+ if let Err ( err) = broadcast_commits (
466+ & ctx,
467+ & config,
468+ replica_id,
469+ key. clone ( ) ,
470+ chosen_value. clone ( ) ,
471+ ballot,
472+ )
473+ . await
485474 {
486475 tracing:: warn!( ?err, "commit broadcast failed after local commit" ) ;
487476 }
@@ -550,7 +539,8 @@ async fn run_prepare_phase(
550539 }
551540 PrepareRoundOutcome :: Retry { next_ballot } => {
552541 store_prepare_ballot ( ctx, replica_id, key. clone ( ) , next_ballot) . await ?;
553- let Some ( retry_delay) = next_prepare_retry_delay ( retry_count, & mut rand:: thread_rng ( ) )
542+ let Some ( retry_delay) =
543+ next_prepare_retry_delay ( retry_count, & mut rand:: thread_rng ( ) )
554544 else {
555545 tracing:: warn!(
556546 %replica_id,
@@ -592,31 +582,32 @@ async fn send_prepare_round(
592582 ballot : protocol:: Ballot ,
593583) -> Result < PrepareRoundOutcome > {
594584 let target = utils:: calculate_quorum ( replica_ids. len ( ) , utils:: QuorumType :: Slow ) ;
595- let mut pending = futures_util:: stream:: iter ( replica_ids. iter ( ) . copied ( ) . map ( |to_replica_id| {
596- let key = key. clone ( ) ;
597- let proposed_value = proposed_value. clone ( ) ;
598- let ballot = ballot. clone ( ) ;
599- async move {
600- (
601- to_replica_id,
602- tokio:: time:: timeout (
603- crate :: consts:: REQUEST_TIMEOUT ,
604- send_prepare_request (
605- ctx,
606- config,
607- from_replica_id,
608- to_replica_id,
609- key,
610- proposed_value,
611- ballot,
612- ) ,
585+ let mut pending =
586+ futures_util:: stream:: iter ( replica_ids. iter ( ) . copied ( ) . map ( |to_replica_id| {
587+ let key = key. clone ( ) ;
588+ let proposed_value = proposed_value. clone ( ) ;
589+ let ballot = ballot. clone ( ) ;
590+ async move {
591+ (
592+ to_replica_id,
593+ tokio:: time:: timeout (
594+ crate :: consts:: REQUEST_TIMEOUT ,
595+ send_prepare_request (
596+ ctx,
597+ config,
598+ from_replica_id,
599+ to_replica_id,
600+ key,
601+ proposed_value,
602+ ballot,
603+ ) ,
604+ )
605+ . await ,
613606 )
614- . await ,
615- )
616- }
617- } ) )
618- . collect :: < FuturesUnordered < _ > > ( )
619- . await ;
607+ }
608+ } ) )
609+ . collect :: < FuturesUnordered < _ > > ( )
610+ . await ;
620611
621612 let mut ok_responses = 0 ;
622613 let mut remaining = replica_ids. len ( ) ;
@@ -655,7 +646,9 @@ async fn send_prepare_round(
655646 }
656647 ( None , None ) => { }
657648 _ => {
658- bail ! ( "prepare response from replica {to_replica_id} returned partial accepted state" ) ;
649+ bail ! (
650+ "prepare response from replica {to_replica_id} returned partial accepted state"
651+ ) ;
659652 }
660653 }
661654
@@ -720,31 +713,32 @@ async fn send_accept_round(
720713 accept_quorum : utils:: QuorumType ,
721714) -> Result < AcceptPhaseOutcome > {
722715 let target = utils:: calculate_quorum ( replica_ids. len ( ) , accept_quorum) ;
723- let mut pending = futures_util:: stream:: iter ( replica_ids. iter ( ) . copied ( ) . map ( |to_replica_id| {
724- let key = key. clone ( ) ;
725- let value = value. clone ( ) ;
726- let ballot = ballot. clone ( ) ;
727- async move {
728- (
729- to_replica_id,
730- tokio:: time:: timeout (
731- crate :: consts:: REQUEST_TIMEOUT ,
732- send_accept_request (
733- ctx,
734- config,
735- from_replica_id,
736- to_replica_id,
737- key,
738- value,
739- ballot,
740- ) ,
716+ let mut pending =
717+ futures_util:: stream:: iter ( replica_ids. iter ( ) . copied ( ) . map ( |to_replica_id| {
718+ let key = key. clone ( ) ;
719+ let value = value. clone ( ) ;
720+ let ballot = ballot. clone ( ) ;
721+ async move {
722+ (
723+ to_replica_id,
724+ tokio:: time:: timeout (
725+ crate :: consts:: REQUEST_TIMEOUT ,
726+ send_accept_request (
727+ ctx,
728+ config,
729+ from_replica_id,
730+ to_replica_id,
731+ key,
732+ value,
733+ ballot,
734+ ) ,
735+ )
736+ . await ,
741737 )
742- . await ,
743- )
744- }
745- } ) )
746- . collect :: < FuturesUnordered < _ > > ( )
747- . await ;
738+ }
739+ } ) )
740+ . collect :: < FuturesUnordered < _ > > ( )
741+ . await ;
748742
749743 let mut state = AcceptRoundState {
750744 target,
@@ -754,9 +748,7 @@ async fn send_accept_round(
754748
755749 while let Some ( ( to_replica_id, response) ) = pending. next ( ) . await {
756750 let observation = match response {
757- Ok ( Ok ( protocol:: AcceptResponse :: AcceptResponseOk ( _) ) ) => {
758- AcceptObservation :: Ok
759- }
751+ Ok ( Ok ( protocol:: AcceptResponse :: AcceptResponseOk ( _) ) ) => AcceptObservation :: Ok ,
760752 Ok ( Ok ( protocol:: AcceptResponse :: AcceptResponseAlreadyCommitted ( committed) ) ) => {
761753 AcceptObservation :: AlreadyCommitted ( committed. value )
762754 }
0 commit comments