Skip to content

Commit 884ba90

Browse files
Juan Manuel Fernandez and Gonlo2
authored and committed
Fixed waiting forever if no updates take place in the table during the cut-over
1 parent f5b7fb0 commit 884ba90

File tree

2 files changed

+64
-18
lines changed

2 files changed

+64
-18
lines changed

doc/cut-over.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ Another option to support a atomic swap without the use of a lock is the use of
1010
gh-ost's philosophy is to avoid the use of triggers, but in some environments, like a Galera cluster, it isn't possible to use the
1111
lock command, and since the cut-over should take only a little time it shouldn't be a problem. The triggers cut-over works like the following:
1212

13-
- gh-ost disable the writes but it keep reading the binlog to keep a record of the changed rows.
13+
- A stop writes event is injected in the binlog and gh-ost disables the writes once it receives it.
1414
- The triggers are created to handle the modifications on the MySQL side.
1515
- A created triggers event is injected in the binlog and gh-ost waits until it receives it.
16-
- The affected rows during the change of writer could be in an inconsistent state, for this reason is necessary sanitize the modified rows removing them if exists and adding them if exists from the original table to the ghost one.
16+
- The affected rows will be in an inconsistent state during the time between the first and the second event. For this reason, these events are checked and the values of the fields that are part of the unique key used to do the online alter are saved to sanitize those rows.
1717

1818
`gh-ost` solves this by using an atomic, two-step blocking swap: while one connection holds the lock, another attempts the atomic `RENAME`. The `RENAME` is guaranteed to not be executed prematurely by positioning a sentry table which blocks the `RENAME` operation until `gh-ost` is satisfied all is in order.
1919

go/logic/migrator.go

Lines changed: 62 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type ChangelogState string
2727
const (
2828
GhostTableMigrated ChangelogState = "GhostTableMigrated"
2929
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
30+
StopWriteEvents = "StopWriteEvents"
3031
AllEventsUpToTriggersProcessed = "AllEventsUpToTriggersProcessed"
3132
)
3233

@@ -76,7 +77,7 @@ type Migrator struct {
7677
ghostTableMigrated chan bool
7778
rowCopyComplete chan error
7879
allEventsUpToLockProcessed chan string
79-
stoppedBinlogWrites chan bool
80+
stoppedWriteEvents chan string
8081
allEventsUpToTriggersProcessed chan string
8182

8283
triggerCutoverUniqueKeys [][]interface {}
@@ -100,7 +101,7 @@ func NewMigrator(context *base.MigrationContext) *Migrator {
100101
firstThrottlingCollected: make(chan bool, 3),
101102
rowCopyComplete: make(chan error),
102103
allEventsUpToLockProcessed: make(chan string),
103-
stoppedBinlogWrites: make(chan bool),
104+
stoppedWriteEvents: make(chan string),
104105
allEventsUpToTriggersProcessed: make(chan string),
105106

106107
copyRowsQueue: make(chan tableWriteFunc),
@@ -244,10 +245,26 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
244245
this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
245246
}()
246247
}
248+
case StopWriteEvents:
249+
{
250+
var applyEventFunc tableWriteFunc = func() error {
251+
atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1)
252+
this.stoppedWriteEvents <- changelogStateString
253+
return nil
254+
}
255+
// at this point we know that the triggers will be created and we don't want to write the
256+
// next events from the streamer, because the streamer works sequentially. So those
257+
// events are either already handled, or have event functions in applyEventsQueue.
258+
// So as not to create a potential deadlock, we write this func to applyEventsQueue
259+
// asynchronously, understanding it doesn't really matter.
260+
go func() {
261+
this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
262+
}()
263+
}
247264
case AllEventsUpToTriggersProcessed:
248265
{
249266
var applyEventFunc tableWriteFunc = func() error {
250-
atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 3)
267+
atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 2)
251268
this.allEventsUpToTriggersProcessed <- changelogStateString
252269
return nil
253270
}
@@ -561,13 +578,45 @@ func (this *Migrator) cutOver() (err error) {
561578
return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
562579
}
563580

564-
// waitForStopWrites Disable the writes from the binlog and keep a record of the affected rows
565-
func (this *Migrator) waitForStopWrites() (err error) {
566-
log.Infof("Stopping the writes")
567-
atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 1)
568-
<-this.stoppedBinlogWrites
581+
// waitForStopWriteEvents Inject the "StopWriteEvents" state hint,
582+
// wait for it to appear in the binary logs, make sure the queue is drained.
583+
func (this *Migrator) waitForStopWriteEvents() (err error) {
584+
timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds))
569585

570-
log.Infof("Done waiting to stop the writes")
586+
this.migrationContext.MarkPointOfInterest()
587+
stopWriteEventsStartTime := time.Now()
588+
589+
stopWriteEventsChallenge := fmt.Sprintf(
590+
"%s:%d",
591+
string(StopWriteEvents),
592+
stopWriteEventsStartTime.UnixNano())
593+
594+
log.Infof("Writing changelog state: %+v", stopWriteEventsChallenge)
595+
if _, err := this.applier.WriteChangelogState(stopWriteEventsChallenge); err != nil {
596+
return err
597+
}
598+
log.Infof("Waiting for stop writes")
599+
for found := false; !found; {
600+
select {
601+
case <-timeout.C:
602+
{
603+
return log.Errorf("Timeout while waiting for stop writes")
604+
}
605+
case state := <-this.stoppedWriteEvents:
606+
{
607+
if state == stopWriteEventsChallenge {
608+
log.Infof("Waiting for stop writes: got %s", state)
609+
found = true
610+
} else {
611+
log.Infof("Waiting for stop writes: skipping %s", state)
612+
}
613+
}
614+
}
615+
}
616+
stopWriteEventsDuration := time.Since(stopWriteEventsStartTime)
617+
618+
log.Infof("Done waiting for stop writes; duration=%+v", stopWriteEventsDuration)
619+
this.printStatus(ForcePrintStatusAndHintRule)
571620

572621
return nil
573622
}
@@ -691,7 +740,9 @@ func (this *Migrator) cutOverTrigger() (err error) {
691740
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
692741
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
693742

694-
this.waitForStopWrites()
743+
if err := this.waitForStopWriteEvents(); err != nil {
744+
return err
745+
}
695746

696747
defer this.applier.DropTriggersOldTableIfExists()
697748

@@ -1310,17 +1361,12 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
13101361
if eventStruct.dmlEvent != nil {
13111362
applyDMLEventState := atomic.LoadInt64(&this.migrationContext.ApplyDMLEventState)
13121363
if applyDMLEventState == 1 {
1313-
atomic.StoreInt64(&this.migrationContext.ApplyDMLEventState, 2)
1314-
applyDMLEventState = 2
1315-
this.stoppedBinlogWrites <- true
1316-
}
1317-
if applyDMLEventState == 2 {
13181364
this.triggerCutoverUniqueKeys = append(
13191365
this.triggerCutoverUniqueKeys,
13201366
this.applier.ObtainUniqueKeyValuesOfEvent(eventStruct.dmlEvent)...)
13211367
return nil
13221368
}
1323-
if applyDMLEventState == 3 {
1369+
if applyDMLEventState == 2 {
13241370
return nil
13251371
}
13261372

0 commit comments

Comments
 (0)