@@ -6,7 +6,7 @@ use bytes::Bytes;
66use chrono:: Utc ;
77use http:: { HeaderValue , StatusCode } ;
88use hyper:: Body ;
9- use tokio:: io:: AsyncWriteExt as _;
9+ use tokio:: { io:: AsyncWriteExt as _, task :: AbortHandle } ;
1010use uuid:: Uuid ;
1111
1212#[ cfg( test) ]
@@ -81,6 +81,14 @@ pub struct PushResult {
8181 baton : Option < String > ,
8282}
8383
84+ pub struct DropAbort ( pub AbortHandle ) ;
85+
86+ impl Drop for DropAbort {
87+ fn drop ( & mut self ) {
88+ self . 0 . abort ( ) ;
89+ }
90+ }
91+
8492pub enum PushStatus {
8593 Ok ,
8694 Conflict ,
@@ -216,7 +224,9 @@ impl SyncContext {
216224
217225 match result. status {
218226 PushStatus :: Conflict => {
219- return Err ( SyncError :: InvalidPushFrameConflict ( frame_no, result. max_frame_no ) . into ( ) ) ;
227+ return Err (
228+ SyncError :: InvalidPushFrameConflict ( frame_no, result. max_frame_no ) . into ( ) ,
229+ ) ;
220230 }
221231 _ => { }
222232 }
@@ -251,7 +261,11 @@ impl SyncContext {
251261 tracing:: debug!( ?durable_frame_num, "frame successfully pushed" ) ;
252262
253263 // Update our last known max_frame_no from the server.
254- tracing:: debug!( ?generation, ?durable_frame_num, "updating remote generation and durable_frame_num" ) ;
264+ tracing:: debug!(
265+ ?generation,
266+ ?durable_frame_num,
267+ "updating remote generation and durable_frame_num"
268+ ) ;
255269 self . durable_generation = generation;
256270 self . durable_frame_num = durable_frame_num;
257271
@@ -261,7 +275,12 @@ impl SyncContext {
261275 } )
262276 }
263277
264- async fn push_with_retry ( & self , mut uri : String , body : Bytes , max_retries : usize ) -> Result < PushResult > {
278+ async fn push_with_retry (
279+ & self ,
280+ mut uri : String ,
281+ body : Bytes ,
282+ max_retries : usize ,
283+ ) -> Result < PushResult > {
265284 let mut nr_retries = 0 ;
266285 loop {
267286 let mut req = http:: Request :: post ( uri. clone ( ) ) ;
@@ -402,7 +421,9 @@ impl SyncContext {
402421 }
403422 // BUG ALERT: The server returns a 500 error if the remote database is empty.
404423 // This is a bug and should be fixed.
405- if res. status ( ) == StatusCode :: BAD_REQUEST || res. status ( ) == StatusCode :: INTERNAL_SERVER_ERROR {
424+ if res. status ( ) == StatusCode :: BAD_REQUEST
425+ || res. status ( ) == StatusCode :: INTERNAL_SERVER_ERROR
426+ {
406427 let res_body = hyper:: body:: to_bytes ( res. into_body ( ) )
407428 . await
408429 . map_err ( SyncError :: HttpBody ) ?;
@@ -417,7 +438,9 @@ impl SyncContext {
417438 let generation = generation
418439 . as_u64 ( )
419440 . ok_or_else ( || SyncError :: JsonValue ( generation. clone ( ) ) ) ?;
420- return Ok ( PullResult :: EndOfGeneration { max_generation : generation as u32 } ) ;
441+ return Ok ( PullResult :: EndOfGeneration {
442+ max_generation : generation as u32 ,
443+ } ) ;
421444 }
422445 if res. status ( ) . is_redirection ( ) {
423446 uri = match res. headers ( ) . get ( hyper:: header:: LOCATION ) {
@@ -449,7 +472,6 @@ impl SyncContext {
449472 }
450473 }
451474
452-
453475 pub ( crate ) fn next_generation ( & mut self ) {
454476 self . durable_generation += 1 ;
455477 self . durable_frame_num = 0 ;
@@ -741,9 +763,7 @@ pub async fn bootstrap_db(sync_ctx: &mut SyncContext) -> Result<()> {
741763 // if we are lagging behind, then we will call the export API and get to the latest
742764 // generation directly.
743765 let info = sync_ctx. get_remote_info ( ) . await ?;
744- sync_ctx
745- . sync_db_if_needed ( info. current_generation )
746- . await ?;
766+ sync_ctx. sync_db_if_needed ( info. current_generation ) . await ?;
747767 // when sync_ctx is initialised, we set durable_generation to 0. however, once
748768 // sync_db is called, it should be > 0.
749769 assert ! ( sync_ctx. durable_generation > 0 , "generation should be > 0" ) ;
@@ -871,7 +891,7 @@ pub async fn try_pull(
871891 let insert_handle = conn. wal_insert_handle ( ) ?;
872892
873893 let mut err = None ;
874-
894+
875895 loop {
876896 let generation = sync_ctx. durable_generation ( ) ;
877897 let frame_no = sync_ctx. durable_frame_num ( ) + 1 ;
0 commit comments