diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index 979b272d917..198e495be8c 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -1434,7 +1434,7 @@ func TestSemiSyncRequiredWithTwoPC(t *testing.T) { utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)") _, err = utils.ExecAllowError(t, conn, "commit") require.Error(t, err) - require.ErrorContains(t, err, "two-pc is enabled, but semi-sync is not") + require.ErrorContains(t, err, "2pc is enabled, but not currently allowed") _, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync") require.NoError(t, err) diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index 94bffd7d84d..120d042155c 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -117,7 +117,7 @@ type Controller interface { // RollbackPrepared rolls back the prepared transaction and removes the transaction log. RollbackPrepared(ctx context.Context, target *querypb.Target, dtid string, originalID int64) error - // WaitForPreparedTwoPCTransactions waits for all prepared transactions to be resolved. + // WaitForPreparedTwoPCTransactions waits for all unresolved 2PC transactions on this tablet to be resolved. WaitForPreparedTwoPCTransactions(ctx context.Context) error // SetDemotePrimaryStalled sets the demote primary stalled field to the provided value in the state manager. diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index dfc6f5dafb6..ef750e43b07 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -62,7 +62,7 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } if !dte.te.IsTwoPCAllowed() { - return vterrors.VT10002("two-pc is enabled, but semi-sync is not") + return vterrors.VT10002("2pc is enabled, but not currently allowed") } defer dte.te.env.Stats().QueryTimings.Record("PREPARE", time.Now()) dte.logStats.TransactionID = transactionID @@ -228,6 +228,9 @@ func (dte *DTExecutor) CreateTransaction(dtid string, participants []*querypb.Ta if !dte.te.twopcEnabled { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } + if !dte.te.IsTwoPCAllowed() { + return vterrors.VT10002("2pc is enabled, but not currently allowed") + } defer dte.te.env.Stats().QueryTimings.Record("CREATE_TRANSACTION", time.Now()) return dte.inTransaction(func(conn *StatefulConnection) error { return dte.te.twoPC.CreateTransaction(dte.ctx, conn, dtid, participants) diff --git a/go/vt/vttablet/tabletserver/dt_executor_test.go b/go/vt/vttablet/tabletserver/dt_executor_test.go index 67babc6a23e..43db757a34d 100644 --- a/go/vt/vttablet/tabletserver/dt_executor_test.go +++ b/go/vt/vttablet/tabletserver/dt_executor_test.go @@ -675,6 +675,34 @@ func TestNoTwopc(t *testing.T) { } } +func TestTwoPCNotAllowed(t *testing.T) { + ctx := t.Context() + txe, tsv, _, closer := newTestTxExecutor(t, ctx) + defer closer() + + tsv.SetTwoPCAllowed(TwoPCAllowed_TabletControls, false) + require.False(t, tsv.te.IsTwoPCAllowed()) + + testcases := []struct { + desc string + fun func() error + wantErr string + }{{ + desc: "Prepare", + fun: func() error { return txe.Prepare(1, "aa") }, + wantErr: "VT10002: atomic distributed transaction not allowed: 2pc is enabled, but not currently allowed", + }, { + desc: "CreateTransaction", + fun: func() error { return txe.CreateTransaction("aa", nil) }, + wantErr: "VT10002: atomic distributed transaction not allowed: 2pc is enabled, but not currently allowed", + }} + for _, tc := range testcases { + t.Run(tc.desc, func(t *testing.T) { + require.EqualError(t, tc.fun(), tc.wantErr) + }) + } +} + func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv *TabletServer, db *fakesqldb.DB, closer func()) { db = setUpQueryExecutorTest(t) logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor", streamlog.NewQueryLogConfigForTest()) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index f9adac72f60..47e09f0de93 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -771,9 +771,25 @@ func (tsv *TabletServer) RollbackPrepared(ctx context.Context, target *querypb.T ) } -// WaitForPreparedTwoPCTransactions waits for all the prepared transactions to complete. +// hasUnresolvedTwoPCTransactions checks if this tablet has any unresolved 2PC transactions. +func (tsv *TabletServer) hasUnresolvedTwoPCTransactions(ctx context.Context) (bool, error) { + if !tsv.te.preparedPool.IsEmpty() { + return true, nil + } + if !tsv.te.twopcEnabled { + return false, nil + } + count, err := tsv.te.twoPC.CountUnresolvedTransaction(ctx, time.Now().Add(365*24*time.Hour)) + if err != nil { + return false, err + } + return count > 0, nil +} + +// WaitForPreparedTwoPCTransactions waits for all unresolved 2PC transactions on this tablet to be resolved. func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) error { - if tsv.te.preparedPool.IsEmpty() { + hasUnresolved, lastErr := tsv.hasUnresolvedTwoPCTransactions(ctx) + if lastErr == nil && !hasUnresolved { return nil } ticker := time.NewTicker(100 * time.Millisecond) @@ -781,10 +797,17 @@ func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) e for { select { case <-ctx.Done(): + if lastErr != nil { + log.Error(fmt.Sprintf("Error reading unresolved transactions during wait: %v", lastErr)) + } // Return an error if we run out of time. return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Prepared transactions have not been resolved yet") case <-ticker.C: - if tsv.te.preparedPool.IsEmpty() { + var err error + hasUnresolved, err = tsv.hasUnresolvedTwoPCTransactions(ctx) + if err != nil { + lastErr = err + } else if !hasUnresolved { return nil } } diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 38a44f6bee2..3b7d8324ea9 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -2578,6 +2578,88 @@ func TestDatabaseNameReplaceByKeyspaceNameReserveBeginExecuteMethod(t *testing.T require.NoError(t, err) } +func TestWaitForPreparedTwoPCTransactionsDoesNotDrainCoordinatorDtState(t *testing.T) { + ctx := t.Context() + _, tsv, db, closer := newTestTxExecutor(t, ctx) + defer closer() + + require.True(t, tsv.te.preparedPool.IsEmpty(), + "prepared pool should be empty for the coordinator role") + + db.AddQueryPattern( + `select count\(\*\) from _vt\.dt_state where time_created.*`, + sqltypes.MakeTestResult( + sqltypes.MakeTestFields("count(*)", "int64"), + "1", + ), + ) + + waitCtx, cancel := context.WithTimeout(ctx, 250*time.Millisecond) + defer cancel() + err := tsv.WaitForPreparedTwoPCTransactions(waitCtx) + require.Error(t, err, + "WaitForPreparedTwoPCTransactions must not declare success while _vt.dt_state still "+ + "has coordinator entries; otherwise SwitchWrites can disable the source primary "+ + "with in-flight 2PC where this shard is the MM, stranding participant redo logs") +} + +func TestWaitForPreparedTwoPCTransactionsTwoPCDisabled(t *testing.T) { + ctx := t.Context() + _, tsv, db := newNoTwopcExecutor(t, ctx) + defer db.Close() + defer tsv.StopService() + + // 2PC disabled: WaitForPreparedTwoPCTransactions must short-circuit + // without issuing a count query against the unopened TwoPC read pool. + waitCtx, cancel := context.WithTimeout(ctx, 250*time.Millisecond) + defer cancel() + err := tsv.WaitForPreparedTwoPCTransactions(waitCtx) + require.NoError(t, err, + "WaitForPreparedTwoPCTransactions should return immediately when 2PC is disabled, "+ + "without querying _vt.dt_state on an unopened pool") +} + +func TestWaitForPreparedTwoPCTransactionsSQLError(t *testing.T) { + ctx := t.Context() + _, tsv, db, closer := newTestTxExecutor(t, ctx) + defer closer() + + require.True(t, tsv.te.preparedPool.IsEmpty(), + "prepared pool should be empty for the coordinator role") + + // Inject a failure for the dt_state count query. The wait function must + // not silently claim drained on a query error; it should keep waiting and + // time out with FAILED_PRECONDITION. + db.RejectQueryPattern( + `select count\(\*\) from _vt\.dt_state where time_created.*`, + "simulated MySQL outage during cutover", + ) + + waitCtx, cancel := context.WithTimeout(ctx, 250*time.Millisecond) + defer cancel() + err := tsv.WaitForPreparedTwoPCTransactions(waitCtx) + require.Error(t, err, + "WaitForPreparedTwoPCTransactions must not declare success when the count query fails") + require.ErrorContains(t, err, "Prepared transactions have not been resolved yet") +} + +func TestHasUnresolvedTwoPCTransactionsSQLError(t *testing.T) { + ctx := t.Context() + _, tsv, db, closer := newTestTxExecutor(t, ctx) + defer closer() + + db.RejectQueryPattern( + `select count\(\*\) from _vt\.dt_state where time_created.*`, + "simulated MySQL outage", + ) + + has, err := tsv.hasUnresolvedTwoPCTransactions(ctx) + require.Error(t, err) + require.False(t, has, + "on count-query failure, the predicate must return (false, err) so callers "+ + "don't claim drained on inconclusive data") +} + func setupTabletServerTest(t testing.TB, ctx context.Context, keyspaceName string) (*fakesqldb.DB, *TabletServer) { cfg := tabletenv.NewDefaultConfig() return setupTabletServerTestCustom(t, ctx, cfg, keyspaceName, vtenv.NewTestEnv())