Skip to content

Commit 7c871b6

Browse files
VTOrc: fix PrimaryIsReadOnly recovery deadlock against PrimarySemiSyncBlocked (vitessio#20015)
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
1 parent b61058b commit 7c871b6

6 files changed

Lines changed: 335 additions & 19 deletions

File tree

CLAUDE.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ To make sure tests are easy to read, we use `github.com/stretchr/testify/assert`
7777
- CI timeouts must be generous (30s+) — GitHub Actions runners can be resource-starved with multi-second pauses; sub-second timeouts cause flakiness with no recourse but retry
7878
- Do not use `t.Fatal` or `t.Error` in tests; use `require` and `assert` instead
7979

80+
### Test Honesty
81+
- A test must actually exercise the condition its name and doc claim, must fail on `main` without the fix it guards, and must not duplicate coverage that a unit test already pins down precisely. Tests that pass identically with or without the fix waste CI time and create false confidence.
82+
8083
## :rotating_light: Error Handling Excellence
8184

8285
Error handling is not an afterthought - it's core to reliable software.

go/test/endtoend/vtorc/general/vtorc_test.go

Lines changed: 172 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ import (
2020
"context"
2121
"encoding/json"
2222
"fmt"
23+
"os"
24+
"path"
2325
"strconv"
2426
"strings"
27+
"sync"
2528
"testing"
2629
"time"
2730

@@ -104,7 +107,8 @@ func TestErrantGTIDOnPreviousPrimary(t *testing.T) {
104107
output, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
105108
"PlannedReparentShard",
106109
fmt.Sprintf("%s/%s", keyspace.Name, shard0.Name),
107-
"--new-primary", replica.Alias)
110+
"--new-primary", replica.Alias,
111+
)
108112
require.NoError(t, err, "error in PlannedReparentShard output - %s", output)
109113

110114
// Stop replication on the previous primary to simulate it not reparenting properly.
@@ -319,7 +323,8 @@ func TestVTOrcRepairs(t *testing.T) {
319323
require.NoError(t, err)
320324

321325
// Wait for problems to be set.
322-
utils.WaitForDetectedProblems(t, vtOrcProcess,
326+
utils.WaitForDetectedProblems(
327+
t, vtOrcProcess,
323328
string(inst.PrimaryIsReadOnly),
324329
curPrimary.Alias,
325330
keyspace.Name,
@@ -333,7 +338,8 @@ func TestVTOrcRepairs(t *testing.T) {
333338
assert.Equal(t, 200, status)
334339

335340
// wait for detected problem to be cleared.
336-
utils.WaitForDetectedProblems(t, vtOrcProcess,
341+
utils.WaitForDetectedProblems(
342+
t, vtOrcProcess,
337343
string(inst.PrimaryIsReadOnly),
338344
curPrimary.Alias,
339345
keyspace.Name,
@@ -608,7 +614,8 @@ func TestVTOrcWithPrs(t *testing.T) {
608614
"PlannedReparentShard",
609615
fmt.Sprintf("%s/%s", keyspace.Name, shard0.Name),
610616
"--wait-replicas-timeout", "31s",
611-
"--new-primary", replica.Alias)
617+
"--new-primary", replica.Alias,
618+
)
612619
require.NoError(t, err, "error in PlannedReparentShard output - %s", output)
613620

614621
// check that the replica gets promoted
@@ -669,14 +676,16 @@ func TestDrainedTablet(t *testing.T) {
669676
require.NotNil(t, replica, "could not find any replica tablet")
670677

671678
output, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
672-
"ChangeTabletType", replica.Alias, "DRAINED")
679+
"ChangeTabletType", replica.Alias, "DRAINED",
680+
)
673681
require.NoError(t, err, "error in changing tablet type output - %s", output)
674682

675683
// Make sure VTOrc sees the drained tablets and doesn't forget them.
676684
utils.WaitForDrainedTabletInVTOrc(t, vtOrcProcess, 1)
677685

678686
output, err = clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
679-
"ChangeTabletType", replica.Alias, "REPLICA")
687+
"ChangeTabletType", replica.Alias, "REPLICA",
688+
)
680689
require.NoError(t, err, "error in changing tablet type output - %s", output)
681690

682691
// We have no drained tablets anymore. Wait for VTOrc to have processed that.
@@ -817,7 +826,8 @@ func TestSemiSyncRecoveryOrdering(t *testing.T) {
817826
// Change durability to semi_sync. VTOrc should detect that replicas and primary
818827
// need semi-sync enabled, and fix them in the correct order.
819828
out, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
820-
"SetKeyspaceDurabilityPolicy", keyspace.Name, "--durability-policy="+policy.DurabilitySemiSync)
829+
"SetKeyspaceDurabilityPolicy", keyspace.Name, "--durability-policy="+policy.DurabilitySemiSync,
830+
)
821831
require.NoError(t, err, out)
822832

823833
// Poll the database-state API to verify recovery ordering.
@@ -958,3 +968,158 @@ func TestReplicationStoppedWithSemiSyncBlocked(t *testing.T) {
958968
}, 30*time.Second, time.Second)
959969
utils.CheckReplication(t, clusterInfo, primary, allNonPrimary, 30*time.Second)
960970
}
971+
972+
// TestRecoveryDeadlocks exercises the `BeforeAnalyses` suppression mechanism
973+
// added by #19925 and extended by #20015 end-to-end: when a tablet-level
974+
// problem coexists with a shard-wide reachable-but-unhealthy primary problem,
975+
// VTOrc must dispatch the tablet-level recovery first and must NOT dispatch
976+
// ERS for the shard-wide problem.
977+
//
978+
// Pre-#19925/#20015, the shard-wide problem caused `recheckPrimaryHealth`
979+
// to abort the tablet-level recovery mid-flight, so the
980+
// `SuccessfulRecoveries[FixPrimary|FixReplica]` counter never incremented.
981+
// The fixes route the tablet-level recovery first via `BeforeAnalyses`, so
982+
// the counter ticks even while the shard-wide problem still exists.
983+
//
984+
// Coverage:
985+
//
986+
// - `PrimaryIsReadOnly × PrimarySemiSyncBlocked` (covered here, this is
987+
// the customer-facing scenario from issue #20011).
988+
//
989+
// - `PrimaryIsReadOnly × PrimaryDiskStalled` and
990+
// `ReplicationStopped × PrimaryDiskStalled` (NOT covered here). Both
991+
// pairings need `PrimaryDiskStalled` to fire, which requires
992+
// `!LastCheckValid && IsDiskStalled` simultaneously. Synthetic fault
993+
// injection (e.g. `chmod 000` on a probe dir) flips `IsDiskStalled` but
994+
// not `LastCheckValid` — the matcher does not match. Anything that
995+
// flips `LastCheckValid` (pausing vttablet, killing mysqld) also breaks
996+
// `fixPrimary`'s ability to run, so the assertion can't succeed either.
997+
// The ordering logic is identical to pair 1 (same `BeforeAnalyses`
998+
// bypass code path); coverage for these two pairings comes from unit
999+
// tests in `analysis_dao_test.go` (`TestDeclaresBefore`,
1000+
// `TestDeclaresAfter`) and `topology_recovery_test.go`
1001+
// (`TestRecheckPrimaryHealth`).
1002+
//
1003+
// - `ReplicationStopped × PrimarySemiSyncBlocked` (#19925's pairing) is
1004+
// covered separately by `TestReplicationStoppedWithSemiSyncBlocked`.
1005+
//
1006+
// The narrow window in which both problems coexist is ~1–2 analysis cycles
1007+
// (long enough for VTOrc to detect both and dispatch recovery once); we do
1008+
// not require sustained co-occurrence.
1009+
func TestRecoveryDeadlocks(t *testing.T) {
1010+
t.Run("PrimaryIsReadOnly+PrimarySemiSyncBlocked", func(t *testing.T) {
1011+
defer utils.PrintVTOrcLogsOnFailure(t, clusterInfo.ClusterInstance)
1012+
disableSemiSyncOnAllTablets(t)
1013+
utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 1, nil, cluster.VTOrcConfiguration{
1014+
PreventCrossCellFailover: true,
1015+
}, cluster.DefaultVtorcsByCell, policy.DurabilitySemiSync)
1016+
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
1017+
shard0 := &keyspace.Shards[0]
1018+
primary, replica, _ := waitForPrimaryAndPick(t, keyspace, shard0)
1019+
vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0]
1020+
utils.WaitForSuccessfulRecoveryCount(t, vtorc, logic.ElectNewPrimaryRecoveryName, keyspace.Name, shard0.Name, 1)
1021+
1022+
fixPrimaryBefore := utils.GetSuccessfulRecoveryCount(t, vtorc, logic.FixPrimaryRecoveryName, keyspace.Name, shard0.Name)
1023+
ersBefore := utils.GetSuccessfulRecoveryCount(t, vtorc, logic.RecoverDeadPrimaryRecoveryName, keyspace.Name, shard0.Name)
1024+
1025+
// Stop the acker's IO thread so semi-sync ACKs cannot flow.
1026+
_, err := utils.RunSQL(t, "STOP REPLICA IO_THREAD", replica, "")
1027+
require.NoError(t, err)
1028+
1029+
// Issue a write that will block on the semi-sync wait. The connection
1030+
// will return only once fixReplica restarts the acker's IO thread,
1031+
// after which an ACK flows and the write commits.
1032+
//
1033+
// Note: we cannot reliably assert PrimarySemiSyncBlocked is detected
1034+
// in this test (same caveat as TestReplicationStoppedWithSemiSyncBlocked).
1035+
// SemiSyncBlocked only flips when a write is waiting for acks, and
1036+
// VTOrc fixes the replica faster than we can sustain the blocking
1037+
// condition. The deadlock scenario is covered by unit tests in
1038+
// analysis_dao_test.go (TestDeclaresBefore) and
1039+
// topology_recovery_test.go (TestRecheckPrimaryHealth).
1040+
var wg sync.WaitGroup
1041+
wg.Go(func() {
1042+
_, _ = utils.RunSQL(t, "CREATE TABLE IF NOT EXISTS test_recovery_deadlocks (id INT PRIMARY KEY)", primary, "vt_ks")
1043+
})
1044+
t.Cleanup(func() {
1045+
// Defensively unblock the goroutine if the test fails before
1046+
// the cluster recovers naturally.
1047+
_, _ = utils.RunSQL(t, "SET GLOBAL super_read_only = OFF", primary, "")
1048+
_, _ = utils.RunSQL(t, "START REPLICA", replica, "")
1049+
wg.Wait()
1050+
})
1051+
1052+
// Set the primary read-only while the write is hanging.
1053+
_, err = utils.RunSQL(t, "SET GLOBAL super_read_only = ON", primary, "")
1054+
require.NoError(t, err)
1055+
1056+
// PrimaryIsReadOnly is detected within ~1 analysis cycle.
1057+
utils.WaitForDetectedProblems(t, vtorc, string(inst.PrimaryIsReadOnly), primary.Alias, keyspace.Name, shard0.Name, 1)
1058+
1059+
// fixPrimary must complete despite the shard-wide problem also being
1060+
// present. Pre-fix this counter never increments because
1061+
// recheckPrimaryHealth aborts the recovery mid-flight.
1062+
utils.WaitForSuccessfulRecoveryCount(t, vtorc, logic.FixPrimaryRecoveryName, keyspace.Name, shard0.Name, fixPrimaryBefore+1)
1063+
1064+
// ERS must not have been dispatched.
1065+
ersAfter := utils.GetSuccessfulRecoveryCount(t, vtorc, logic.RecoverDeadPrimaryRecoveryName, keyspace.Name, shard0.Name)
1066+
assert.Equal(t, ersBefore, ersAfter, "ERS should not have been dispatched")
1067+
1068+
// Primary should no longer be read-only.
1069+
assert.True(t, utils.WaitForReadOnlyValue(t, primary, 0))
1070+
})
1071+
}
1072+
1073+
// disableSemiSyncOnAllTablets clears `rpl_semi_sync_source_enabled` and
1074+
// `rpl_semi_sync_replica_enabled` on every tablet's mysqld in the shared
1075+
// cluster. Required before SetupVttabletsAndVTOrcs when the previous test
1076+
// left a primary with semi-sync source on: vttablet's TearDown does not
1077+
// disable semi-sync, and `cleanAndStartVttablet` issues a DROP DATABASE
1078+
// before restarting vttablet, which hangs forever in the semi-sync wait
1079+
// because no acker is connected.
1080+
//
1081+
// Uses mysql.Connect directly (not utils.RunSQL) so that tablets whose
1082+
// mysqld is not yet running (e.g., first subtest) are silently skipped
1083+
// rather than failing the test.
1084+
func disableSemiSyncOnAllTablets(t *testing.T) {
1085+
t.Helper()
1086+
for _, cellInfo := range clusterInfo.CellInfos {
1087+
all := append([]*cluster.Vttablet{}, cellInfo.ReplicaTablets...)
1088+
all = append(all, cellInfo.RdonlyTablets...)
1089+
for _, tablet := range all {
1090+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
1091+
params := mysql.ConnParams{
1092+
Uname: "vt_dba",
1093+
UnixSocket: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d/mysql.sock", tablet.TabletUID)),
1094+
}
1095+
conn, err := mysql.Connect(ctx, &params)
1096+
cancel()
1097+
if err != nil {
1098+
continue
1099+
}
1100+
_, _ = conn.ExecuteFetch("SET GLOBAL rpl_semi_sync_source_enabled = 0, GLOBAL rpl_semi_sync_replica_enabled = 0", 1, false)
1101+
conn.Close()
1102+
}
1103+
}
1104+
}
1105+
1106+
// waitForPrimaryAndPick blocks until VTOrc has elected a primary and returns
1107+
// it along with the surviving REPLICA (semi-sync acker) and RDONLY tablets.
1108+
func waitForPrimaryAndPick(t *testing.T, keyspace *cluster.Keyspace, shard *cluster.Shard) (primary, replica, rdonly *cluster.Vttablet) {
1109+
t.Helper()
1110+
primary = utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard)
1111+
require.NotNil(t, primary, "should have elected a primary")
1112+
for _, tablet := range shard.Vttablets {
1113+
if tablet.Alias == primary.Alias {
1114+
continue
1115+
}
1116+
if tablet.Type == "rdonly" {
1117+
rdonly = tablet
1118+
} else {
1119+
replica = tablet
1120+
}
1121+
}
1122+
require.NotNil(t, replica, "should have a REPLICA tablet")
1123+
require.NotNil(t, rdonly, "should have an RDONLY tablet")
1124+
return primary, replica, rdonly
1125+
}

go/vt/vtorc/inst/analysis_dao.go

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package inst
1818

1919
import (
2020
"fmt"
21+
"log/slog"
2122
"slices"
2223
"time"
2324

@@ -401,7 +402,8 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi
401402
a.IsDiskStalled = m.GetBool("is_disk_stalled")
402403

403404
if !a.LastCheckValid {
404-
analysisMessage := fmt.Sprintf("analysis: Alias: %+v, Keyspace: %+v, Shard: %+v, IsPrimary: %+v, PrimaryHealthUnhealthy: %+v, LastCheckValid: %+v, LastCheckPartialSuccess: %+v, CountReplicas: %+v, CountValidReplicas: %+v, CountValidReplicatingReplicas: %+v, CountLaggingReplicas: %+v, CountDelayedReplicas: %+v",
405+
analysisMessage := fmt.Sprintf(
406+
"analysis: Alias: %+v, Keyspace: %+v, Shard: %+v, IsPrimary: %+v, PrimaryHealthUnhealthy: %+v, LastCheckValid: %+v, LastCheckPartialSuccess: %+v, CountReplicas: %+v, CountValidReplicas: %+v, CountValidReplicatingReplicas: %+v, CountLaggingReplicas: %+v, CountDelayedReplicas: %+v",
405407
a.AnalyzedInstanceAlias, a.AnalyzedKeyspace, a.AnalyzedShard, a.IsPrimary, a.PrimaryHealthUnhealthy, a.LastCheckValid, a.LastCheckPartialSuccess, a.CountReplicas, a.CountValidReplicas, a.CountValidReplicatingReplicas, a.CountLaggingReplicas, a.CountDelayedReplicas,
406408
)
407409
if util.ClearToLog("analysis_dao", analysisMessage) {
@@ -468,9 +470,25 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi
468470
chosenProblem := matchedProblems[0]
469471
a.Analysis = chosenProblem.Meta.Analysis
470472
a.Description = chosenProblem.Meta.Description
473+
// Per-decision log keys gated by util.ClearToLog so operators
474+
// see the first occurrence of each unique prioritization decision
475+
// and a periodic refresh while it persists, without flooding the
476+
// log every analysis cycle for the duration of an incident.
477+
tabletAliasString := topoproto.TabletAliasString(tablet.Alias)
471478
if chosenProblem.Meta.Priority == detectionAnalysisPriorityShardWideAction {
472479
if ca.hasShardWideAction {
473480
// Already have a shard-wide action — suppress this one.
481+
key := fmt.Sprintf("%s.%s.%s.%s.%s", tabletAliasString, a.AnalyzedKeyspace, a.AnalyzedShard, chosenProblem.Meta.Analysis, ca.shardWideAnalysisCode)
482+
if util.ClearToLog("analysis_dao.suppress_duplicate_shard_wide", key) {
483+
log.Info(
484+
"suppressing duplicate shard-wide action",
485+
slog.String("tablet", tabletAliasString),
486+
slog.String("keyspace", a.AnalyzedKeyspace),
487+
slog.String("shard", a.AnalyzedShard),
488+
slog.String("suppressed", string(chosenProblem.Meta.Analysis)),
489+
slog.String("active_shard_wide", string(ca.shardWideAnalysisCode)),
490+
)
491+
}
474492
return nil
475493
}
476494
ca.hasShardWideAction = true
@@ -491,17 +509,52 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi
491509
return declaresBefore(p, ca.shardWideAnalysisCode) ||
492510
declaresAfter(ca.shardWideProblem, p.Meta.Analysis)
493511
}
494-
if !survives(chosenProblem) {
512+
if survives(chosenProblem) {
513+
key := fmt.Sprintf("%s.%s.%s.%s.%s", tabletAliasString, a.AnalyzedKeyspace, a.AnalyzedShard, chosenProblem.Meta.Analysis, ca.shardWideAnalysisCode)
514+
if util.ClearToLog("analysis_dao.prioritize", key) {
515+
log.Info(
516+
"prioritizing tablet problem before shard-wide action",
517+
slog.String("tablet", tabletAliasString),
518+
slog.String("keyspace", a.AnalyzedKeyspace),
519+
slog.String("shard", a.AnalyzedShard),
520+
slog.String("chosen", string(chosenProblem.Meta.Analysis)),
521+
slog.String("deferred_shard_wide", string(ca.shardWideAnalysisCode)),
522+
)
523+
}
524+
} else {
495525
found := false
496526
for _, p := range matchedProblems[1:] {
497527
if survives(p) {
528+
key := fmt.Sprintf("%s.%s.%s.%s.%s.%s", tabletAliasString, a.AnalyzedKeyspace, a.AnalyzedShard, p.Meta.Analysis, chosenProblem.Meta.Analysis, ca.shardWideAnalysisCode)
529+
if util.ClearToLog("analysis_dao.prioritize_alt", key) {
530+
log.Info(
531+
"prioritizing tablet problem before shard-wide action",
532+
slog.String("tablet", tabletAliasString),
533+
slog.String("keyspace", a.AnalyzedKeyspace),
534+
slog.String("shard", a.AnalyzedShard),
535+
slog.String("chosen", string(p.Meta.Analysis)),
536+
slog.String("higher_priority_skipped", string(chosenProblem.Meta.Analysis)),
537+
slog.String("deferred_shard_wide", string(ca.shardWideAnalysisCode)),
538+
)
539+
}
498540
a.Analysis = p.Meta.Analysis
499541
a.Description = p.Meta.Description
500542
found = true
501543
break
502544
}
503545
}
504546
if !found {
547+
key := fmt.Sprintf("%s.%s.%s.%s.%s", tabletAliasString, a.AnalyzedKeyspace, a.AnalyzedShard, chosenProblem.Meta.Analysis, ca.shardWideAnalysisCode)
548+
if util.ClearToLog("analysis_dao.suppress_tablet", key) {
549+
log.Info(
550+
"suppressing tablet problem in favor of shard-wide action",
551+
slog.String("tablet", tabletAliasString),
552+
slog.String("keyspace", a.AnalyzedKeyspace),
553+
slog.String("shard", a.AnalyzedShard),
554+
slog.String("suppressed", string(chosenProblem.Meta.Analysis)),
555+
slog.String("shard_wide", string(ca.shardWideAnalysisCode)),
556+
)
557+
}
505558
return nil
506559
}
507560
}
@@ -642,7 +695,8 @@ func auditInstanceAnalysisInChangelog(tabletAlias *topodatapb.TabletAlias, analy
642695
// Determine whether the last analysis has changed, updating the row if it has.
643696
lastAnalysisChanged := false
644697
{
645-
sqlResult, err := db.ExecVTOrc(`UPDATE database_instance_last_analysis
698+
sqlResult, err := db.ExecVTOrc(
699+
`UPDATE database_instance_last_analysis
646700
SET
647701
analysis = ?,
648702
analysis_timestamp = DATETIME('now')
@@ -671,7 +725,8 @@ func auditInstanceAnalysisInChangelog(tabletAlias *topodatapb.TabletAlias, analy
671725
firstInsertion := false
672726
if !lastAnalysisChanged {
673727
// The insert only reports a changed row if this is the first insertion.
674-
sqlResult, err := db.ExecVTOrc(`INSERT OR IGNORE
728+
sqlResult, err := db.ExecVTOrc(
729+
`INSERT OR IGNORE
675730
INTO database_instance_last_analysis (
676731
alias,
677732
analysis_timestamp,
@@ -701,7 +756,8 @@ func auditInstanceAnalysisInChangelog(tabletAlias *topodatapb.TabletAlias, analy
701756
return nil
702757
}
703758

704-
_, err := db.ExecVTOrc(`INSERT
759+
_, err := db.ExecVTOrc(
760+
`INSERT
705761
INTO database_instance_analysis_changelog (
706762
alias,
707763
analysis_timestamp,
@@ -724,7 +780,8 @@ func auditInstanceAnalysisInChangelog(tabletAlias *topodatapb.TabletAlias, analy
724780

725781
// ExpireInstanceAnalysisChangelog removes old-enough analysis entries from the changelog
726782
func ExpireInstanceAnalysisChangelog() error {
727-
_, err := db.ExecVTOrc(`DELETE
783+
_, err := db.ExecVTOrc(
784+
`DELETE
728785
FROM database_instance_analysis_changelog
729786
WHERE
730787
analysis_timestamp < DATETIME('now', PRINTF('-%d HOUR', ?))

0 commit comments

Comments
 (0)