@@ -769,7 +769,106 @@ func (mgtr *Migrator) Revert() error {
769769 return nil
770770}
771771
772- func (mgtr * Migrator ) MoveTables () error {
772+ func (mgtr * Migrator ) MoveTables () (err error ) {
773+ mgtr .migrationContext .Log .Infof ("Moving tables %v from %s to %s (%s)" ,
774+ mgtr .migrationContext .MoveTables .TableNames ,
775+ sql .EscapeName (mgtr .migrationContext .DatabaseName ),
776+ sql .EscapeName (mgtr .migrationContext .MoveTables .TargetDatabase ), mgtr .migrationContext .MoveTables .TargetHost )
777+ mgtr .migrationContext .StartTime = time .Now ()
778+
779+ // Ensure context is cancelled on exit (cleanup)
780+ defer mgtr .migrationContext .CancelContext ()
781+
782+ if mgtr .migrationContext .Hostname , err = os .Hostname (); err != nil {
783+ return err
784+ }
785+
786+ go mgtr .listenOnPanicAbort ()
787+
788+ // Run on-startup hook:
789+ if err := mgtr .hooksExecutor .OnStartup (); err != nil {
790+ return err
791+ }
792+
793+ // After this point, we'll need to teardown anything that's been started
794+ // so we don't leave things hanging around
795+ defer mgtr .teardown ()
796+
797+ if err := mgtr .initiateInspector (); err != nil {
798+ return err
799+ }
800+ if err := mgtr .checkAbort (); err != nil {
801+ return err
802+ }
803+ if err := mgtr .initiateApplier (); err != nil {
804+ return err
805+ }
806+ if err := mgtr .checkAbort (); err != nil {
807+ return err
808+ }
809+
810+ // Validation complete! Run on-validated hook.
811+ if err := mgtr .hooksExecutor .OnValidated (); err != nil {
812+ return err
813+ }
814+
815+ if err := mgtr .initiateServer (); err != nil {
816+ return err
817+ }
818+ defer mgtr .server .RemoveSocketFile ()
819+
820+ if err := mgtr .countTableRows (); err != nil {
821+ return err
822+ }
823+ if err := mgtr .addDMLEventsListener (); err != nil {
824+ return err
825+ }
826+ if err := mgtr .applier .ReadMigrationRangeValues (); err != nil {
827+ return err
828+ }
829+
830+ mgtr .initiateThrottler ()
831+
832+ // Run on-before-row-copy hook
833+ if err := mgtr .hooksExecutor .OnBeforeRowCopy (); err != nil {
834+ return err
835+ }
836+ go func () {
837+ if err := mgtr .executeWriteFuncs (); err != nil {
838+ // Send error to PanicAbort to trigger abort
839+ _ = base .SendWithContext (mgtr .migrationContext .GetContext (), mgtr .migrationContext .PanicAbort , err )
840+ }
841+ }()
842+ go mgtr .iterateChunks ()
843+ mgtr .migrationContext .MarkRowCopyStartTime ()
844+ go mgtr .initiateStatus ()
845+
846+ mgtr .migrationContext .Log .Debugf ("Operating until row copy is complete" )
847+ mgtr .consumeRowCopyComplete ()
848+ mgtr .migrationContext .Log .Infof ("Row copy complete" )
849+ // Check if row copy was aborted due to error
850+ if err := mgtr .checkAbort (); err != nil {
851+ return err
852+ }
853+ if err := mgtr .hooksExecutor .OnRowCopyComplete (); err != nil {
854+ return err
855+ }
856+
857+ //TODO: cutover here
858+
859+ if err := mgtr .finalCleanup (); err != nil {
860+ return nil
861+ }
862+ if err := mgtr .hooksExecutor .OnSuccess (false ); err != nil {
863+ return err
864+ }
865+ mgtr .migrationContext .Log .Infof ("Done moving tables %v from %s to %s (%s)" ,
866+ mgtr .migrationContext .MoveTables .TableNames , sql .EscapeName (mgtr .migrationContext .DatabaseName ),
867+ sql .EscapeName (mgtr .migrationContext .MoveTables .TargetDatabase ), mgtr .migrationContext .MoveTables .TargetHost )
868+ // Final check for abort before declaring success
869+ if err := mgtr .checkAbort (); err != nil {
870+ return err
871+ }
773872 return nil
774873}
775874
0 commit comments