@@ -202,8 +202,8 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro
202202// consumes and drops any further incoming events that may be left hanging.
203203func (this * Migrator ) consumeRowCopyComplete () {
204204 if err := <- this .rowCopyComplete ; err != nil {
205- // Use helper to prevent deadlock if listenOnPanicAbort already exited
206- _ = base . SendWithContext ( this .migrationContext . GetContext (), this . migrationContext . PanicAbort , err )
205+ // Abort synchronously to ensure checkAbort() sees the error immediately
206+ this .abort ( err )
207207 // Don't mark row copy as complete if there was an error
208208 return
209209 }
@@ -212,8 +212,8 @@ func (this *Migrator) consumeRowCopyComplete() {
212212 go func () {
213213 for err := range this .rowCopyComplete {
214214 if err != nil {
215- // Use helper to prevent deadlock if listenOnPanicAbort already exited
216- _ = base . SendWithContext ( this .migrationContext . GetContext (), this . migrationContext . PanicAbort , err )
215+ // Abort synchronously to ensure the error is stored immediately
216+ this .abort ( err )
217217 return
218218 }
219219 }
@@ -285,10 +285,10 @@ func (this *Migrator) onChangelogHeartbeatEvent(dmlEntry *binlog.BinlogEntry) (e
285285 }
286286}
287287
288- // listenOnPanicAbort listens for fatal errors and initiates graceful shutdown
289- func ( this * Migrator ) listenOnPanicAbort () {
290- err := <- this . migrationContext . PanicAbort
291-
288+ // abort stores the error, cancels the context, and logs the abort.
289+ // This is the common abort logic used by both listenOnPanicAbort and
290+ // consumeRowCopyComplete to ensure consistent error handling.
291+ func ( this * Migrator ) abort ( err error ) {
292292 // Store the error for Migrate() to return
293293 this .migrationContext .SetAbortError (err )
294294
@@ -299,6 +299,12 @@ func (this *Migrator) listenOnPanicAbort() {
299299 this .migrationContext .Log .Errorf ("Migration aborted: %v" , err )
300300}
301301
302+ // listenOnPanicAbort listens for fatal errors and initiates graceful shutdown
303+ func (this * Migrator ) listenOnPanicAbort () {
304+ err := <- this .migrationContext .PanicAbort
305+ this .abort (err )
306+ }
307+
302308// validateAlterStatement validates the `alter` statement meets criteria.
303309// At this time this means:
304310// - column renames are approved
0 commit comments