@@ -20,6 +20,7 @@ const METADATA_VERSION: u32 = 0;
2020
/// Default value of `SyncContext::max_retries`, the retry budget handed to
/// the retrying request helpers (e.g. `pull_with_retry`).
const DEFAULT_MAX_RETRIES: usize = 5;
/// Default value of `SyncContext::push_batch_size`.
const DEFAULT_PUSH_BATCH_SIZE: u32 = 128;
/// Default value of `SyncContext::pull_batch_size`: the number of frames
/// requested from the server per pull, i.e. the width of the half-open
/// `[frame_no, frame_no + pull_batch_size)` range sent in the pull URI.
const DEFAULT_PULL_BATCH_SIZE: u32 = 128;
2324
2425#[ derive( thiserror:: Error , Debug ) ]
2526#[ non_exhaustive]
@@ -66,6 +67,8 @@ pub enum SyncError {
6667 InvalidLocalGeneration ( u32 , u32 ) ,
6768 #[ error( "invalid local state: {0}" ) ]
6869 InvalidLocalState ( String ) ,
70+ #[ error( "server returned invalid length of frames: {0}" ) ]
71+ InvalidPullFrameBytes ( usize ) ,
6972}
7073
7174impl SyncError {
@@ -98,8 +101,8 @@ pub enum PushStatus {
98101}
99102
100103pub enum PullResult {
101- /// A frame was successfully pulled.
102- Frame ( Bytes ) ,
104+ /// Frames were successfully pulled.
105+ Frames ( Bytes ) ,
103106 /// We've reached the end of the generation.
104107 EndOfGeneration { max_generation : u32 } ,
105108}
@@ -122,6 +125,7 @@ pub struct SyncContext {
122125 auth_token : Option < HeaderValue > ,
123126 max_retries : usize ,
124127 push_batch_size : u32 ,
128+ pull_batch_size : u32 ,
125129 /// The current durable generation.
126130 durable_generation : u32 ,
127131 /// Represents the max_frame_no from the server.
@@ -154,6 +158,7 @@ impl SyncContext {
154158 auth_token,
155159 max_retries : DEFAULT_MAX_RETRIES ,
156160 push_batch_size : DEFAULT_PUSH_BATCH_SIZE ,
161+ pull_batch_size : DEFAULT_PULL_BATCH_SIZE ,
157162 client,
158163 durable_generation : 0 ,
159164 durable_frame_num : 0 ,
@@ -175,7 +180,7 @@ impl SyncContext {
175180 }
176181
177182 #[ tracing:: instrument( skip( self ) ) ]
178- pub ( crate ) async fn pull_one_frame (
183+ pub ( crate ) async fn pull_frames (
179184 & mut self ,
180185 generation : u32 ,
181186 frame_no : u32 ,
@@ -185,9 +190,10 @@ impl SyncContext {
185190 self . sync_url,
186191 generation,
187192 frame_no,
188- frame_no + 1
193+ // the server expects the range of [start, end) frames, i.e. end is exclusive
194+ frame_no + self . pull_batch_size
189195 ) ;
190- tracing:: debug!( "pulling frame" ) ;
196+ tracing:: debug!( "pulling frame (uri={})" , uri ) ;
191197 self . pull_with_retry ( uri, self . max_retries ) . await
192198 }
193199
@@ -417,20 +423,39 @@ impl SyncContext {
417423 . map_err ( SyncError :: HttpDispatch ) ?;
418424
419425 if res. status ( ) . is_success ( ) {
420- let frame = hyper:: body:: to_bytes ( res. into_body ( ) )
426+ let frames = hyper:: body:: to_bytes ( res. into_body ( ) )
421427 . await
422428 . map_err ( SyncError :: HttpBody ) ?;
423- return Ok ( PullResult :: Frame ( frame) ) ;
429+ // a success result should always return some frames
430+ if frames. is_empty ( ) {
431+ tracing:: error!( "server returned empty frames in pull response" ) ;
432+ return Err ( SyncError :: InvalidPullFrameBytes ( 0 ) . into ( ) ) ;
433+ }
434+ // the minimum payload size cannot be less than a single frame
435+ if frames. len ( ) < FRAME_SIZE {
436+ tracing:: error!(
437+ "server returned frames with invalid length: {} < {}" ,
438+ frames. len( ) ,
439+ FRAME_SIZE
440+ ) ;
441+ return Err ( SyncError :: InvalidPullFrameBytes ( frames. len ( ) ) . into ( ) ) ;
442+ }
443+ return Ok ( PullResult :: Frames ( frames) ) ;
424444 }
425445 // BUG ALERT: The server returns a 500 error if the remote database is empty.
426446 // This is a bug and should be fixed.
427447 if res. status ( ) == StatusCode :: BAD_REQUEST
428448 || res. status ( ) == StatusCode :: INTERNAL_SERVER_ERROR
429449 {
450+ let status = res. status ( ) ;
430451 let res_body = hyper:: body:: to_bytes ( res. into_body ( ) )
431452 . await
432453 . map_err ( SyncError :: HttpBody ) ?;
433-
454+ tracing:: trace!(
455+ "server returned: {} body: {}" ,
456+ status,
457+ String :: from_utf8_lossy( & res_body[ ..] )
458+ ) ;
434459 let resp = serde_json:: from_slice :: < serde_json:: Value > ( & res_body[ ..] )
435460 . map_err ( SyncError :: JsonDecode ) ?;
436461
@@ -650,22 +675,34 @@ impl SyncContext {
650675
651676 let req = req. body ( Body :: empty ( ) ) . expect ( "valid request" ) ;
652677
653- let res = self
654- . client
655- . request ( req)
656- . await
657- . map_err ( SyncError :: HttpDispatch ) ?;
678+ let ( res, http_duration) =
679+ crate :: replication:: remote_client:: time ( self . client . request ( req) ) . await ;
680+ let res = res. map_err ( SyncError :: HttpDispatch ) ?;
658681
659682 if !res. status ( ) . is_success ( ) {
660683 let status = res. status ( ) ;
661684 let body = hyper:: body:: to_bytes ( res. into_body ( ) )
662685 . await
663686 . map_err ( SyncError :: HttpBody ) ?;
687+ tracing:: error!(
688+ "failed to pull db file from remote server, status={}, body={}, url={}, duration={:?}" ,
689+ status,
690+ String :: from_utf8_lossy( & body) ,
691+ uri,
692+ http_duration
693+ ) ;
664694 return Err (
665695 SyncError :: PullFrame ( status, String :: from_utf8_lossy ( & body) . to_string ( ) ) . into ( ) ,
666696 ) ;
667697 }
668698
699+ tracing:: debug!(
700+ "pulled db file from remote server, status={}, url={}, duration={:?}" ,
701+ res. status( ) ,
702+ uri,
703+ http_duration
704+ ) ;
705+
669706 // todo: do streaming write to the disk
670707 let bytes = hyper:: body:: to_bytes ( res. into_body ( ) )
671708 . await
@@ -887,6 +924,11 @@ async fn try_push(
887924 } )
888925}
889926
/// Page size, in bytes, used by the sync / diskless server.
const PAGE_SIZE: usize = 4096;
/// Size, in bytes, of the header that precedes the page data in each frame.
// NOTE(review): 24 bytes matches the SQLite WAL frame header; confirm the
// server really emits frames in WAL layout.
const FRAME_HEADER_SIZE: usize = 24;
/// Size, in bytes, of one complete frame (header + page).
///
/// Pull responses are validated against this value (at least one frame, and
/// a whole multiple of it) and are split into chunks of this size before
/// being inserted.
const FRAME_SIZE: usize = PAGE_SIZE + FRAME_HEADER_SIZE;
931+
890932pub async fn try_pull (
891933 sync_ctx : & mut SyncContext ,
892934 conn : & Connection ,
@@ -898,10 +940,32 @@ pub async fn try_pull(
898940 loop {
899941 let generation = sync_ctx. durable_generation ( ) ;
900942 let frame_no = sync_ctx. durable_frame_num ( ) + 1 ;
901- match sync_ctx. pull_one_frame ( generation, frame_no) . await {
902- Ok ( PullResult :: Frame ( frame) ) => {
903- insert_handle. insert ( & frame) ?;
904- sync_ctx. durable_frame_num = frame_no;
943+ match sync_ctx. pull_frames ( generation, frame_no) . await {
944+ Ok ( PullResult :: Frames ( frames) ) => {
945+ tracing:: debug!(
946+ "pull_frames: generation={}, start_frame={} (end_frame={}, batch_size={}), frames_size={}" ,
947+ generation, frame_no, frame_no + sync_ctx. pull_batch_size, sync_ctx. pull_batch_size, frames. len( ) ,
948+ ) ;
949+ if frames. len ( ) % FRAME_SIZE != 0 {
950+ tracing:: error!(
951+ "frame size {} is not a multiple of the expected size {}" ,
952+ frames. len( ) ,
953+ FRAME_SIZE ,
954+ ) ;
955+ return Err ( SyncError :: InvalidPullFrameBytes ( frames. len ( ) ) . into ( ) ) ;
956+ }
957+ for chunk in frames. chunks ( FRAME_SIZE ) {
958+ let r = insert_handle. insert ( & chunk) ;
959+ if let Err ( e) = r {
960+ tracing:: error!(
961+ "insert error (frame= {}) : {:?}" ,
962+ sync_ctx. durable_frame_num + 1 ,
963+ e
964+ ) ;
965+ return Err ( e) ;
966+ }
967+ sync_ctx. durable_frame_num += 1 ;
968+ }
905969 }
906970 Ok ( PullResult :: EndOfGeneration { max_generation } ) => {
907971 // If there are no more generations to pull, we're done.
@@ -920,7 +984,7 @@ pub async fn try_pull(
920984 insert_handle. begin ( ) ?;
921985 }
922986 Err ( e) => {
923- tracing:: debug!( "pull_one_frame error: {:?}" , e) ;
987+ tracing:: debug!( "pull_frames error: {:?}" , e) ;
924988 err. replace ( e) ;
925989 break ;
926990 }
0 commit comments