@@ -208,17 +208,17 @@ impl CopyState {
208208 } )
209209 } )
210210 . collect :: < Result < _ , _ > > ( ) ?;
211- tables. sort_by_key ( |table| table. batch . dst . object . to_string ( ) ) ;
211+ tables. sort_by_key ( |table| table. dst . object . to_string ( ) ) ;
212212
213213 let values = tables
214214 . iter ( )
215215 . map ( |table| {
216216 (
217- cts:: entity_type. eq ( table. batch . dst . object . as_str ( ) ) ,
217+ cts:: entity_type. eq ( table. dst . object . as_str ( ) ) ,
218218 cts:: dst. eq ( dst. site . id ) ,
219- cts:: next_vid. eq ( table. batch . next_vid ( ) ) ,
220- cts:: target_vid. eq ( table. batch . target_vid ( ) ) ,
221- cts:: batch_size. eq ( table. batch . batch_size ( ) ) ,
219+ cts:: next_vid. eq ( table. batcher . next_vid ( ) ) ,
220+ cts:: target_vid. eq ( table. batcher . target_vid ( ) ) ,
221+ cts:: batch_size. eq ( table. batcher . batch_size ( ) as i64 ) ,
222222 )
223223 } )
224224 . collect :: < Vec < _ > > ( ) ;
@@ -294,51 +294,11 @@ pub(crate) fn source(
294294/// so that we can copy rows from one to the other with very little
295295/// transformation. See `CopyEntityBatchQuery` for the details of what
296296/// exactly that means
297- pub ( crate ) struct BatchCopy {
297+ struct TableState {
298298 src : Arc < Table > ,
299299 dst : Arc < Table > ,
300- batcher : VidBatcher ,
301- }
302-
303- impl BatchCopy {
304- pub fn new ( batcher : VidBatcher , src : Arc < Table > , dst : Arc < Table > ) -> Self {
305- Self { src, dst, batcher }
306- }
307-
308- /// Copy one batch of entities and update internal state so that the
309- /// next call to `run` will copy the next batch
310- pub fn run ( & mut self , conn : & mut PgConnection ) -> Result < Duration , StoreError > {
311- let ( duration, _) = self . batcher . step ( |start, end| {
312- rq:: CopyEntityBatchQuery :: new ( self . dst . as_ref ( ) , & self . src , start, end) ?
313- . execute ( conn) ?;
314- Ok ( ( ) )
315- } ) ?;
316-
317- Ok ( duration)
318- }
319-
320- pub fn finished ( & self ) -> bool {
321- self . batcher . finished ( )
322- }
323-
324- /// The first `vid` that has not been copied yet
325- pub fn next_vid ( & self ) -> i64 {
326- self . batcher . next_vid ( )
327- }
328-
329- /// The last `vid` that should be copied
330- pub fn target_vid ( & self ) -> i64 {
331- self . batcher . target_vid ( )
332- }
333-
334- pub fn batch_size ( & self ) -> i64 {
335- self . batcher . batch_size ( ) as i64
336- }
337- }
338-
339- struct TableState {
340- batch : BatchCopy ,
341300 dst_site : Arc < Site > ,
301+ batcher : VidBatcher ,
342302 duration_ms : i64 ,
343303}
344304
@@ -354,14 +314,16 @@ impl TableState {
354314 let vid_range = VidRange :: for_copy ( conn, & src, target_block) ?;
355315 let batcher = VidBatcher :: load ( conn, & src_layout. site . namespace , src. as_ref ( ) , vid_range) ?;
356316 Ok ( Self {
357- batch : BatchCopy :: new ( batcher, src, dst) ,
317+ src,
318+ dst,
358319 dst_site,
320+ batcher,
359321 duration_ms : 0 ,
360322 } )
361323 }
362324
363325 fn finished ( & self ) -> bool {
364- self . batch . finished ( )
326+ self . batcher . finished ( )
365327 }
366328
367329 fn load (
@@ -427,11 +389,12 @@ impl TableState {
427389 VidRange :: new ( current_vid, target_vid) ,
428390 ) ?
429391 . with_batch_size ( size as usize ) ;
430- let batch = BatchCopy :: new ( batcher, src, dst) ;
431392
432393 Ok ( TableState {
433- batch,
394+ src,
395+ dst,
434396 dst_site : dst_layout. site . clone ( ) ,
397+ batcher,
435398 duration_ms,
436399 } )
437400 }
@@ -460,20 +423,20 @@ impl TableState {
460423 update (
461424 cts:: table
462425 . filter ( cts:: dst. eq ( self . dst_site . id ) )
463- . filter ( cts:: entity_type. eq ( self . batch . dst . object . as_str ( ) ) )
426+ . filter ( cts:: entity_type. eq ( self . dst . object . as_str ( ) ) )
464427 . filter ( cts:: duration_ms. eq ( 0 ) ) ,
465428 )
466429 . set ( cts:: started_at. eq ( sql ( "now()" ) ) )
467430 . execute ( conn) ?;
468431 let values = (
469- cts:: next_vid. eq ( self . batch . next_vid ( ) ) ,
470- cts:: batch_size. eq ( self . batch . batch_size ( ) ) ,
432+ cts:: next_vid. eq ( self . batcher . next_vid ( ) ) ,
433+ cts:: batch_size. eq ( self . batcher . batch_size ( ) as i64 ) ,
471434 cts:: duration_ms. eq ( self . duration_ms ) ,
472435 ) ;
473436 update (
474437 cts:: table
475438 . filter ( cts:: dst. eq ( self . dst_site . id ) )
476- . filter ( cts:: entity_type. eq ( self . batch . dst . object . as_str ( ) ) ) ,
439+ . filter ( cts:: entity_type. eq ( self . dst . object . as_str ( ) ) ) ,
477440 )
478441 . set ( values)
479442 . execute ( conn) ?;
@@ -486,7 +449,7 @@ impl TableState {
486449 update (
487450 cts:: table
488451 . filter ( cts:: dst. eq ( self . dst_site . id ) )
489- . filter ( cts:: entity_type. eq ( self . batch . dst . object . as_str ( ) ) ) ,
452+ . filter ( cts:: entity_type. eq ( self . dst . object . as_str ( ) ) ) ,
490453 )
491454 . set ( cts:: finished_at. eq ( sql ( "now()" ) ) )
492455 . execute ( conn) ?;
@@ -512,7 +475,11 @@ impl TableState {
512475 }
513476
514477 fn copy_batch ( & mut self , conn : & mut PgConnection ) -> Result < Status , StoreError > {
515- let duration = self . batch . run ( conn) ?;
478+ let ( duration, _) = self . batcher . step ( |start, end| {
479+ rq:: CopyEntityBatchQuery :: new ( self . dst . as_ref ( ) , & self . src , start, end) ?
480+ . execute ( conn) ?;
481+ Ok ( ( ) )
482+ } ) ?;
516483
517484 self . record_progress ( conn, duration) ?;
518485
@@ -539,12 +506,12 @@ impl<'a> CopyProgress<'a> {
539506 let target_vid: i64 = state
540507 . tables
541508 . iter ( )
542- . map ( |table| table. batch . target_vid ( ) )
509+ . map ( |table| table. batcher . target_vid ( ) )
543510 . sum ( ) ;
544511 let current_vid = state
545512 . tables
546513 . iter ( )
547- . map ( |table| table. batch . next_vid ( ) )
514+ . map ( |table| table. batcher . next_vid ( ) )
548515 . sum ( ) ;
549516 Self {
550517 logger,
@@ -577,23 +544,23 @@ impl<'a> CopyProgress<'a> {
577544 }
578545 }
579546
580- fn update ( & mut self , batch : & BatchCopy ) {
547+ fn update ( & mut self , entity_type : & EntityType , batcher : & VidBatcher ) {
581548 if self . last_log . elapsed ( ) > LOG_INTERVAL {
582549 info ! (
583550 self . logger,
584551 "Copied {:.2}% of `{}` entities ({}/{} entity versions), {:.2}% of overall data" ,
585- Self :: progress_pct( batch . next_vid( ) , batch . target_vid( ) ) ,
586- batch . dst . object ,
587- batch . next_vid( ) ,
588- batch . target_vid( ) ,
589- Self :: progress_pct( self . current_vid + batch . next_vid( ) , self . target_vid)
552+ Self :: progress_pct( batcher . next_vid( ) , batcher . target_vid( ) ) ,
553+ entity_type ,
554+ batcher . next_vid( ) ,
555+ batcher . target_vid( ) ,
556+ Self :: progress_pct( self . current_vid + batcher . next_vid( ) , self . target_vid)
590557 ) ;
591558 self . last_log = Instant :: now ( ) ;
592559 }
593560 }
594561
595- fn table_finished ( & mut self , batch : & BatchCopy ) {
596- self . current_vid += batch . next_vid ( ) ;
562+ fn table_finished ( & mut self , batcher : & VidBatcher ) {
563+ self . current_vid += batcher . next_vid ( ) ;
597564 }
598565
599566 fn finished ( & self ) {
@@ -728,9 +695,9 @@ impl Connection {
728695 if status == Status :: Cancelled {
729696 return Ok ( status) ;
730697 }
731- progress. update ( & table. batch ) ;
698+ progress. update ( & table. dst . object , & table . batcher ) ;
732699 }
733- progress. table_finished ( & table. batch ) ;
700+ progress. table_finished ( & table. batcher ) ;
734701 }
735702
736703 // Create indexes for all the attributes that were postponed at the start of
@@ -740,8 +707,8 @@ impl Connection {
740707 for table in state. tables . iter ( ) {
741708 let arr = index_list. indexes_for_table (
742709 & self . dst . site . namespace ,
743- & table. batch . src . name . to_string ( ) ,
744- & table. batch . dst ,
710+ & table. src . name . to_string ( ) ,
711+ & table. dst ,
745712 true ,
746713 true ,
747714 ) ?;
@@ -756,18 +723,12 @@ impl Connection {
756723 // Here we need to skip those created in the first step for the old fields.
757724 for table in state. tables . iter ( ) {
758725 let orig_colums = table
759- . batch
760726 . src
761727 . columns
762728 . iter ( )
763729 . map ( |c| c. name . to_string ( ) )
764730 . collect_vec ( ) ;
765- for sql in table
766- . batch
767- . dst
768- . create_postponed_indexes ( orig_colums)
769- . into_iter ( )
770- {
731+ for sql in table. dst . create_postponed_indexes ( orig_colums) . into_iter ( ) {
771732 let query = sql_query ( sql) ;
772733 query. execute ( conn) ?;
773734 }
0 commit comments