Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func TestConnectionBilateralCleanup(t *testing.T) {

To make sure tests are easy to read, we use testify assertions. Use `assert.Eventually` instead of manual `time.Sleep` calls and hand-rolled timeouts.

### Test Honesty
- A test must actually exercise the condition its name and doc claim, must fail on `main` without the fix it guards, and must not duplicate coverage that a unit test already pins down precisely. Tests that pass identically with or without the fix waste CI time and create false confidence.

## :rotating_light: Error Handling Excellence

Error handling is not an afterthought - it's core to reliable software.
Expand Down
179 changes: 172 additions & 7 deletions go/test/endtoend/vtorc/general/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"path"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -104,7 +107,8 @@ func TestErrantGTIDOnPreviousPrimary(t *testing.T) {
output, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"PlannedReparentShard",
fmt.Sprintf("%s/%s", keyspace.Name, shard0.Name),
"--new-primary", replica.Alias)
"--new-primary", replica.Alias,
)
require.NoError(t, err, "error in PlannedReparentShard output - %s", output)

// Stop replication on the previous primary to simulate it not reparenting properly.
Expand Down Expand Up @@ -319,7 +323,8 @@ func TestVTOrcRepairs(t *testing.T) {
require.NoError(t, err)

// Wait for problems to be set.
utils.WaitForDetectedProblems(t, vtOrcProcess,
utils.WaitForDetectedProblems(
t, vtOrcProcess,
string(inst.PrimaryIsReadOnly),
curPrimary.Alias,
keyspace.Name,
Expand All @@ -333,7 +338,8 @@ func TestVTOrcRepairs(t *testing.T) {
assert.Equal(t, 200, status)

// wait for detected problem to be cleared.
utils.WaitForDetectedProblems(t, vtOrcProcess,
utils.WaitForDetectedProblems(
t, vtOrcProcess,
string(inst.PrimaryIsReadOnly),
curPrimary.Alias,
keyspace.Name,
Expand Down Expand Up @@ -700,7 +706,8 @@ func TestVTOrcWithPrs(t *testing.T) {
"PlannedReparentShard",
fmt.Sprintf("%s/%s", keyspace.Name, shard0.Name),
"--wait-replicas-timeout", "31s",
"--new-primary", replica.Alias)
"--new-primary", replica.Alias,
)
require.NoError(t, err, "error in PlannedReparentShard output - %s", output)

// check that the replica gets promoted
Expand Down Expand Up @@ -761,14 +768,16 @@ func TestDrainedTablet(t *testing.T) {
require.NotNil(t, replica, "could not find any replica tablet")

output, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"ChangeTabletType", replica.Alias, "DRAINED")
"ChangeTabletType", replica.Alias, "DRAINED",
)
require.NoError(t, err, "error in changing tablet type output - %s", output)

// Make sure VTOrc sees the drained tablets and doesn't forget them.
utils.WaitForDrainedTabletInVTOrc(t, vtOrcProcess, 1)

output, err = clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"ChangeTabletType", replica.Alias, "REPLICA")
"ChangeTabletType", replica.Alias, "REPLICA",
)
require.NoError(t, err, "error in changing tablet type output - %s", output)

// We have no drained tablets anymore. Wait for VTOrc to have processed that.
Expand Down Expand Up @@ -919,7 +928,8 @@ func TestSemiSyncRecoveryOrdering(t *testing.T) {
// Change durability to semi_sync. VTOrc should detect that replicas and primary
// need semi-sync enabled, and fix them in the correct order.
out, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"SetKeyspaceDurabilityPolicy", keyspace.Name, "--durability-policy="+policy.DurabilitySemiSync)
"SetKeyspaceDurabilityPolicy", keyspace.Name, "--durability-policy="+policy.DurabilitySemiSync,
)
require.NoError(t, err, out)

// Poll the database-state API to verify recovery ordering.
Expand Down Expand Up @@ -1060,3 +1070,158 @@ func TestReplicationStoppedWithSemiSyncBlocked(t *testing.T) {
}, 30*time.Second, time.Second)
utils.CheckReplication(t, clusterInfo, primary, allNonPrimary, 30*time.Second)
}

// TestRecoveryDeadlocks exercises the `BeforeAnalyses` suppression mechanism
// added by #19925 and extended by #20015 end-to-end: when a tablet-level
// problem coexists with a shard-wide reachable-but-unhealthy primary problem,
// VTOrc must dispatch the tablet-level recovery first and must NOT dispatch
// ERS for the shard-wide problem.
//
// Pre-#19925/#20015, the shard-wide problem caused `recheckPrimaryHealth`
// to abort the tablet-level recovery mid-flight, so the
// `SuccessfulRecoveries[FixPrimary|FixReplica]` counter never incremented.
// The fixes route the tablet-level recovery first via `BeforeAnalyses`, so
// the counter ticks even while the shard-wide problem still exists.
//
// Coverage:
//
// - `PrimaryIsReadOnly × PrimarySemiSyncBlocked` (covered here, this is
// the customer-facing scenario from issue #20011).
//
// - `PrimaryIsReadOnly × PrimaryDiskStalled` and
// `ReplicationStopped × PrimaryDiskStalled` (NOT covered here). Both
// pairings need `PrimaryDiskStalled` to fire, which requires
// `!LastCheckValid && IsDiskStalled` simultaneously. Synthetic fault
// injection (e.g. `chmod 000` on a probe dir) flips `IsDiskStalled` but
// not `LastCheckValid` — the matcher does not match. Anything that
// flips `LastCheckValid` (pausing vttablet, killing mysqld) also breaks
// `fixPrimary`'s ability to run, so the assertion can't succeed either.
// The ordering logic is identical to pair 1 (same `BeforeAnalyses`
// bypass code path); coverage for these two pairings comes from unit
// tests in `analysis_dao_test.go` (`TestDeclaresBefore`,
// `TestDeclaresAfter`) and `topology_recovery_test.go`
// (`TestRecheckPrimaryHealth`).
//
// - `ReplicationStopped × PrimarySemiSyncBlocked` (#19925's pairing) is
// covered separately by `TestReplicationStoppedWithSemiSyncBlocked`.
//
// The narrow window in which both problems coexist is ~1–2 analysis cycles
// (long enough for VTOrc to detect both and dispatch recovery once); we do
// not require sustained co-occurrence.
//
// NOTE(review): the inline note in the subtest concedes that
// PrimarySemiSyncBlocked cannot be reliably observed here, so this test may
// pass without both problems ever coexisting. Confirm that it actually fails
// without the fixes it is meant to guard.
func TestRecoveryDeadlocks(t *testing.T) {
t.Run("PrimaryIsReadOnly+PrimarySemiSyncBlocked", func(t *testing.T) {
defer utils.PrintVTOrcLogsOnFailure(t, clusterInfo.ClusterInstance)
// Clear semi-sync on the shared cluster's tablets before they are set up
// again (see disableSemiSyncOnAllTablets for the reason).
disableSemiSyncOnAllTablets(t)
// Two replica-type tablets and one rdonly tablet, with the semi_sync
// durability policy.
utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 1, nil, cluster.VTOrcConfiguration{
PreventCrossCellFailover: true,
}, cluster.DefaultVtorcsByCell, policy.DurabilitySemiSync)
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
shard0 := &keyspace.Shards[0]
primary, replica, _ := waitForPrimaryAndPick(t, keyspace, shard0)
vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0]
// Wait until VTOrc reports one successful ElectNewPrimary recovery before
// the baselines below are taken.
utils.WaitForSuccessfulRecoveryCount(t, vtorc, logic.ElectNewPrimaryRecoveryName, keyspace.Name, shard0.Name, 1)

// Baseline counters; the assertions further down compare against these.
fixPrimaryBefore := utils.GetSuccessfulRecoveryCount(t, vtorc, logic.FixPrimaryRecoveryName, keyspace.Name, shard0.Name)
ersBefore := utils.GetSuccessfulRecoveryCount(t, vtorc, logic.RecoverDeadPrimaryRecoveryName, keyspace.Name, shard0.Name)

// Stop the acker's IO thread so semi-sync ACKs cannot flow.
_, err := utils.RunSQL(t, "STOP REPLICA IO_THREAD", replica, "")
require.NoError(t, err)

// Issue a write that will block on the semi-sync wait. The connection
// will return only once fixReplica restarts the acker's IO thread,
// after which an ACK flows and the write commits.
//
// Note: we cannot reliably assert PrimarySemiSyncBlocked is detected
// in this test (same caveat as TestReplicationStoppedWithSemiSyncBlocked).
// SemiSyncBlocked only flips when a write is waiting for acks, and
// VTOrc fixes the replica faster than we can sustain the blocking
// condition. The deadlock scenario is covered by unit tests in
// analysis_dao_test.go (TestDeclaresBefore) and
// topology_recovery_test.go (TestRecheckPrimaryHealth).
//
// The write's result and error are deliberately discarded: the goroutine
// exists only to create the blocked write, not to assert on its outcome.
var wg sync.WaitGroup
wg.Go(func() {
_, _ = utils.RunSQL(t, "CREATE TABLE IF NOT EXISTS test_recovery_deadlocks (id INT PRIMARY KEY)", primary, "vt_ks")
})
t.Cleanup(func() {
// Defensively unblock the goroutine if the test fails before
// the cluster recovers naturally. Both statements are best-effort
// (errors ignored); wg.Wait then blocks until the goroutine's
// RunSQL call has returned.
_, _ = utils.RunSQL(t, "SET GLOBAL super_read_only = OFF", primary, "")
_, _ = utils.RunSQL(t, "START REPLICA", replica, "")
wg.Wait()
})

// Set the primary read-only while the write is hanging.
// NOTE(review): nothing synchronizes with the goroutine above, so the
// write may not have reached the semi-sync wait (or even started) when
// this statement runs. Confirm the intended ordering holds in practice.
_, err = utils.RunSQL(t, "SET GLOBAL super_read_only = ON", primary, "")
require.NoError(t, err)

// PrimaryIsReadOnly is detected within ~1 analysis cycle.
utils.WaitForDetectedProblems(t, vtorc, string(inst.PrimaryIsReadOnly), primary.Alias, keyspace.Name, shard0.Name, 1)

// fixPrimary must complete despite the shard-wide problem also being
// present. Pre-fix this counter never increments because
// recheckPrimaryHealth aborts the recovery mid-flight.
utils.WaitForSuccessfulRecoveryCount(t, vtorc, logic.FixPrimaryRecoveryName, keyspace.Name, shard0.Name, fixPrimaryBefore+1)

// ERS must not have been dispatched.
// NOTE(review): this compares the *successful* RecoverDeadPrimary count at
// a single point in time; presumably an ERS that was dispatched but failed,
// or one that completes later, would not be caught here. Confirm.
ersAfter := utils.GetSuccessfulRecoveryCount(t, vtorc, logic.RecoverDeadPrimaryRecoveryName, keyspace.Name, shard0.Name)
assert.Equal(t, ersBefore, ersAfter, "ERS should not have been dispatched")

// Primary should no longer be read-only.
assert.True(t, utils.WaitForReadOnlyValue(t, primary, 0))
})
}

// disableSemiSyncOnAllTablets clears `rpl_semi_sync_source_enabled` and
// `rpl_semi_sync_replica_enabled` on the mysqld of every replica and rdonly
// tablet listed in clusterInfo.CellInfos.
//
// Required before SetupVttabletsAndVTOrcs when the previous test left a
// primary with semi-sync source on: vttablet's TearDown does not disable
// semi-sync, and `cleanAndStartVttablet` issues a DROP DATABASE before
// restarting vttablet, which hangs forever in the semi-sync wait because no
// acker is connected.
//
// The helper is best-effort and never fails the test:
//
//   - It uses mysql.Connect directly (not utils.RunSQL) so that tablets whose
//     mysqld is not yet running (e.g., first subtest) are silently skipped.
//   - If the connection succeeds but the SET statement fails, the error is
//     logged through t.Logf so that a later hang during setup can be traced
//     back to the tablet that kept semi-sync enabled.
func disableSemiSyncOnAllTablets(t *testing.T) {
	t.Helper()

	const disableSemiSync = "SET GLOBAL rpl_semi_sync_source_enabled = 0, GLOBAL rpl_semi_sync_replica_enabled = 0"

	for _, cellInfo := range clusterInfo.CellInfos {
		// Build a fresh slice so the cell's own tablet lists are never aliased.
		all := make([]*cluster.Vttablet, 0, len(cellInfo.ReplicaTablets)+len(cellInfo.RdonlyTablets))
		all = append(all, cellInfo.ReplicaTablets...)
		all = append(all, cellInfo.RdonlyTablets...)

		for _, tablet := range all {
			// Short timeout: an absent mysqld should be skipped quickly.
			ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
			params := mysql.ConnParams{
				Uname:      "vt_dba",
				UnixSocket: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d/mysql.sock", tablet.TabletUID)),
			}
			conn, err := mysql.Connect(ctx, &params)
			// Called right after the connection attempt, as in the original;
			// a deferred cancel would pile up across loop iterations.
			cancel()
			if err != nil {
				// mysqld not reachable: nothing to clear on this tablet.
				continue
			}
			if _, err := conn.ExecuteFetch(disableSemiSync, 1, false); err != nil {
				t.Logf("could not disable semi-sync on tablet %s: %v", tablet.Alias, err)
			}
			conn.Close()
		}
	}
}

// waitForPrimaryAndPick resolves the shard's primary through
// utils.ShardPrimaryTablet (expected to wait for VTOrc's election) and then
// classifies the remaining tablets of the shard.
//
// Every tablet whose alias differs from the primary's is assigned by type:
// "rdonly" tablets go to rdonly, anything else goes to replica (the semi-sync
// acker). When several tablets fall into the same bucket, the last one in
// shard.Vttablets wins. The test is failed immediately if no primary, no
// replica, or no rdonly tablet is found.
func waitForPrimaryAndPick(t *testing.T, keyspace *cluster.Keyspace, shard *cluster.Shard) (primary, replica, rdonly *cluster.Vttablet) {
	t.Helper()

	primary = utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard)
	require.NotNil(t, primary, "should have elected a primary")

	for _, candidate := range shard.Vttablets {
		switch {
		case candidate.Alias == primary.Alias:
			// The primary itself is never a replica/rdonly pick.
		case candidate.Type == "rdonly":
			rdonly = candidate
		default:
			replica = candidate
		}
	}

	require.NotNil(t, replica, "should have a REPLICA tablet")
	require.NotNil(t, rdonly, "should have an RDONLY tablet")
	return primary, replica, rdonly
}
66 changes: 61 additions & 5 deletions go/vt/vtorc/inst/analysis_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package inst

import (
"fmt"
"log/slog"
"slices"
"time"

Expand Down Expand Up @@ -467,9 +468,25 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi
chosenProblem := matchedProblems[0]
a.Analysis = chosenProblem.Meta.Analysis
a.Description = chosenProblem.Meta.Description
// Per-decision log keys gated by util.ClearToLog so operators
// see the first occurrence of each unique prioritization decision
// and a periodic refresh while it persists, without flooding the
// log every analysis cycle for the duration of an incident.
tabletAliasString := topoproto.TabletAliasString(tablet.Alias)
if chosenProblem.Meta.Priority == detectionAnalysisPriorityShardWideAction {
if ca.hasShardWideAction {
// Already have a shard-wide action — suppress this one.
key := fmt.Sprintf("%s.%s.%s.%s.%s", tabletAliasString, a.AnalyzedKeyspace, a.AnalyzedShard, chosenProblem.Meta.Analysis, ca.shardWideAnalysisCode)
if util.ClearToLog("analysis_dao.suppress_duplicate_shard_wide", key) {
log.Info(
"suppressing duplicate shard-wide action",
slog.String("tablet", tabletAliasString),
slog.String("keyspace", a.AnalyzedKeyspace),
slog.String("shard", a.AnalyzedShard),
slog.String("suppressed", string(chosenProblem.Meta.Analysis)),
slog.String("active_shard_wide", string(ca.shardWideAnalysisCode)),
)
Comment thread
timvaillancourt marked this conversation as resolved.
}
return nil
}
ca.hasShardWideAction = true
Expand All @@ -490,17 +507,52 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi
return declaresBefore(p, ca.shardWideAnalysisCode) ||
declaresAfter(ca.shardWideProblem, p.Meta.Analysis)
}
if !survives(chosenProblem) {
if survives(chosenProblem) {
key := fmt.Sprintf("%s.%s.%s.%s.%s", tabletAliasString, a.AnalyzedKeyspace, a.AnalyzedShard, chosenProblem.Meta.Analysis, ca.shardWideAnalysisCode)
if util.ClearToLog("analysis_dao.prioritize", key) {
log.Info(
"prioritizing tablet problem before shard-wide action",
slog.String("tablet", tabletAliasString),
slog.String("keyspace", a.AnalyzedKeyspace),
slog.String("shard", a.AnalyzedShard),
slog.String("chosen", string(chosenProblem.Meta.Analysis)),
slog.String("deferred_shard_wide", string(ca.shardWideAnalysisCode)),
)
}
} else {
found := false
for _, p := range matchedProblems[1:] {
if survives(p) {
key := fmt.Sprintf("%s.%s.%s.%s.%s.%s", tabletAliasString, a.AnalyzedKeyspace, a.AnalyzedShard, p.Meta.Analysis, chosenProblem.Meta.Analysis, ca.shardWideAnalysisCode)
if util.ClearToLog("analysis_dao.prioritize_alt", key) {
log.Info(
"prioritizing tablet problem before shard-wide action",
slog.String("tablet", tabletAliasString),
slog.String("keyspace", a.AnalyzedKeyspace),
slog.String("shard", a.AnalyzedShard),
slog.String("chosen", string(p.Meta.Analysis)),
slog.String("higher_priority_skipped", string(chosenProblem.Meta.Analysis)),
slog.String("deferred_shard_wide", string(ca.shardWideAnalysisCode)),
)
}
a.Analysis = p.Meta.Analysis
a.Description = p.Meta.Description
found = true
break
}
}
if !found {
key := fmt.Sprintf("%s.%s.%s.%s.%s", tabletAliasString, a.AnalyzedKeyspace, a.AnalyzedShard, chosenProblem.Meta.Analysis, ca.shardWideAnalysisCode)
if util.ClearToLog("analysis_dao.suppress_tablet", key) {
log.Info(
"suppressing tablet problem in favor of shard-wide action",
slog.String("tablet", tabletAliasString),
slog.String("keyspace", a.AnalyzedKeyspace),
slog.String("shard", a.AnalyzedShard),
slog.String("suppressed", string(chosenProblem.Meta.Analysis)),
slog.String("shard_wide", string(ca.shardWideAnalysisCode)),
)
}
return nil
}
}
Expand Down Expand Up @@ -637,7 +689,8 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC
// Find if the lastAnalysisHasChanged or not while updating the row if it has.
lastAnalysisChanged := false
{
sqlResult, err := db.ExecVTOrc(`UPDATE database_instance_last_analysis
sqlResult, err := db.ExecVTOrc(
`UPDATE database_instance_last_analysis
SET
analysis = ?,
analysis_timestamp = DATETIME('now')
Expand All @@ -664,7 +717,8 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC
firstInsertion := false
if !lastAnalysisChanged {
// The insert only returns more than 1 row changed if this is the first insertion.
sqlResult, err := db.ExecVTOrc(`INSERT OR IGNORE
sqlResult, err := db.ExecVTOrc(
`INSERT OR IGNORE
INTO database_instance_last_analysis (
alias,
analysis_timestamp,
Expand Down Expand Up @@ -693,7 +747,8 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC
return nil
}

_, err := db.ExecVTOrc(`INSERT
_, err := db.ExecVTOrc(
`INSERT
INTO database_instance_analysis_changelog (
alias,
analysis_timestamp,
Expand All @@ -715,7 +770,8 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC

// ExpireInstanceAnalysisChangelog removes old-enough analysis entries from the changelog
func ExpireInstanceAnalysisChangelog() error {
_, err := db.ExecVTOrc(`DELETE
_, err := db.ExecVTOrc(
`DELETE
FROM database_instance_analysis_changelog
WHERE
analysis_timestamp < DATETIME('now', PRINTF('-%d HOUR', ?))
Expand Down
Loading
Loading