Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/test/endtoend/transaction/twopc/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1434,7 +1434,7 @@ func TestSemiSyncRequiredWithTwoPC(t *testing.T) {
utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)")
_, err = utils.ExecAllowError(t, conn, "commit")
require.Error(t, err)
require.ErrorContains(t, err, "two-pc is enabled, but semi-sync is not")
require.ErrorContains(t, err, "2pc is enabled, but not currently allowed")

_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync")
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type Controller interface {
// RollbackPrepared rolls back the prepared transaction and removes the transaction log.
RollbackPrepared(ctx context.Context, target *querypb.Target, dtid string, originalID int64) error

// WaitForPreparedTwoPCTransactions waits for all prepared transactions to be resolved.
// WaitForPreparedTwoPCTransactions waits for all unresolved 2PC transactions on this tablet to be resolved.
WaitForPreparedTwoPCTransactions(ctx context.Context) error

// SetDemotePrimaryStalled sets the demote primary stalled field to the provided value in the state manager.
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
if !dte.te.IsTwoPCAllowed() {
return vterrors.VT10002("two-pc is enabled, but semi-sync is not")
return vterrors.VT10002("2pc is enabled, but not currently allowed")
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the message in the e2e test since it's a misleading message. If it's better to keep the misleading message for backwards compatibility I can flip to that direction instead.

}
defer dte.te.env.Stats().QueryTimings.Record("PREPARE", time.Now())
dte.logStats.TransactionID = transactionID
Expand Down Expand Up @@ -228,6 +228,9 @@ func (dte *DTExecutor) CreateTransaction(dtid string, participants []*querypb.Ta
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
if !dte.te.IsTwoPCAllowed() {
return vterrors.VT10002("2pc is enabled, but not currently allowed")
}
defer dte.te.env.Stats().QueryTimings.Record("CREATE_TRANSACTION", time.Now())
return dte.inTransaction(func(conn *StatefulConnection) error {
return dte.te.twoPC.CreateTransaction(dte.ctx, conn, dtid, participants)
Expand Down
28 changes: 28 additions & 0 deletions go/vt/vttablet/tabletserver/dt_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,34 @@ func TestNoTwopc(t *testing.T) {
}
}

func TestTwoPCNotAllowed(t *testing.T) {
ctx := t.Context()
txe, tsv, _, closer := newTestTxExecutor(t, ctx)
defer closer()

tsv.SetTwoPCAllowed(TwoPCAllowed_TabletControls, false)
require.False(t, tsv.te.IsTwoPCAllowed())

testcases := []struct {
desc string
fun func() error
wantErr string
}{{
desc: "Prepare",
fun: func() error { return txe.Prepare(1, "aa") },
wantErr: "VT10002: atomic distributed transaction not allowed: 2pc is enabled, but not currently allowed",
}, {
desc: "CreateTransaction",
fun: func() error { return txe.CreateTransaction("aa", nil) },
wantErr: "VT10002: atomic distributed transaction not allowed: 2pc is enabled, but not currently allowed",
}}
for _, tc := range testcases {
t.Run(tc.desc, func(t *testing.T) {
require.EqualError(t, tc.fun(), tc.wantErr)
})
}
}

func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv *TabletServer, db *fakesqldb.DB, closer func()) {
db = setUpQueryExecutorTest(t)
logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor", streamlog.NewQueryLogConfigForTest())
Expand Down
29 changes: 26 additions & 3 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,20 +771,43 @@ func (tsv *TabletServer) RollbackPrepared(ctx context.Context, target *querypb.T
)
}

// WaitForPreparedTwoPCTransactions waits for all the prepared transactions to complete.
// hasUnresolvedTwoPCTransactions checks if this tablet has any unresolved 2PC transactions.
func (tsv *TabletServer) hasUnresolvedTwoPCTransactions(ctx context.Context) (bool, error) {
if !tsv.te.preparedPool.IsEmpty() {
return true, nil
}
if !tsv.te.twopcEnabled {
return false, nil
}
count, err := tsv.te.twoPC.CountUnresolvedTransaction(ctx, time.Now().Add(365*24*time.Hour))
if err != nil {
return false, err
}
return count > 0, nil
Comment on lines +782 to +786
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should make the time as configurable.
How safe it is to return false if underlying database is not reachable?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we return true then in WaitForPreparedTwoPCTransactions the code can be simplified.

}

// WaitForPreparedTwoPCTransactions waits for all unresolved 2PC transactions on this tablet to be resolved.
func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) error {
if tsv.te.preparedPool.IsEmpty() {
hasUnresolved, lastErr := tsv.hasUnresolvedTwoPCTransactions(ctx)
if lastErr == nil && !hasUnresolved {
return nil
}
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
if lastErr != nil {
log.Error(fmt.Sprintf("Error reading unresolved transactions during wait: %v", lastErr))
}
// Return an error if we run out of time.
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Prepared transactions have not been resolved yet")
case <-ticker.C:
if tsv.te.preparedPool.IsEmpty() {
var err error
hasUnresolved, err = tsv.hasUnresolvedTwoPCTransactions(ctx)
if err != nil {
lastErr = err
} else if !hasUnresolved {
return nil
}
}
Expand Down
82 changes: 82 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2578,6 +2578,88 @@ func TestDatabaseNameReplaceByKeyspaceNameReserveBeginExecuteMethod(t *testing.T
require.NoError(t, err)
}

func TestWaitForPreparedTwoPCTransactionsDoesNotDrainCoordinatorDtState(t *testing.T) {
ctx := t.Context()
_, tsv, db, closer := newTestTxExecutor(t, ctx)
defer closer()

require.True(t, tsv.te.preparedPool.IsEmpty(),
"prepared pool should be empty for the coordinator role")

db.AddQueryPattern(
`select count\(\*\) from _vt\.dt_state where time_created.*`,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields("count(*)", "int64"),
"1",
),
)

waitCtx, cancel := context.WithTimeout(ctx, 250*time.Millisecond)
defer cancel()
err := tsv.WaitForPreparedTwoPCTransactions(waitCtx)
require.Error(t, err,
"WaitForPreparedTwoPCTransactions must not declare success while _vt.dt_state still "+
"has coordinator entries; otherwise SwitchWrites can disable the source primary "+
"with in-flight 2PC where this shard is the MM, stranding participant redo logs")
}

func TestWaitForPreparedTwoPCTransactionsTwoPCDisabled(t *testing.T) {
ctx := t.Context()
_, tsv, db := newNoTwopcExecutor(t, ctx)
defer db.Close()
defer tsv.StopService()

// 2PC disabled: WaitForPreparedTwoPCTransactions must short-circuit
// without issuing a count query against the unopened TwoPC read pool.
waitCtx, cancel := context.WithTimeout(ctx, 250*time.Millisecond)
defer cancel()
err := tsv.WaitForPreparedTwoPCTransactions(waitCtx)
require.NoError(t, err,
"WaitForPreparedTwoPCTransactions should return immediately when 2PC is disabled, "+
"without querying _vt.dt_state on an unopened pool")
}

func TestWaitForPreparedTwoPCTransactionsSQLError(t *testing.T) {
ctx := t.Context()
_, tsv, db, closer := newTestTxExecutor(t, ctx)
defer closer()

require.True(t, tsv.te.preparedPool.IsEmpty(),
"prepared pool should be empty for the coordinator role")

// Inject a failure for the dt_state count query. The wait function must
// not silently claim drained on a query error; it should keep waiting and
// time out with FAILED_PRECONDITION.
db.RejectQueryPattern(
`select count\(\*\) from _vt\.dt_state where time_created.*`,
"simulated MySQL outage during cutover",
)

waitCtx, cancel := context.WithTimeout(ctx, 250*time.Millisecond)
defer cancel()
err := tsv.WaitForPreparedTwoPCTransactions(waitCtx)
require.Error(t, err,
"WaitForPreparedTwoPCTransactions must not declare success when the count query fails")
require.ErrorContains(t, err, "Prepared transactions have not been resolved yet")
}

func TestHasUnresolvedTwoPCTransactionsSQLError(t *testing.T) {
ctx := t.Context()
_, tsv, db, closer := newTestTxExecutor(t, ctx)
defer closer()

db.RejectQueryPattern(
`select count\(\*\) from _vt\.dt_state where time_created.*`,
"simulated MySQL outage",
)

has, err := tsv.hasUnresolvedTwoPCTransactions(ctx)
require.Error(t, err)
require.False(t, has,
"on count-query failure, the predicate must return (false, err) so callers "+
"don't claim drained on inconclusive data")
}

func setupTabletServerTest(t testing.TB, ctx context.Context, keyspaceName string) (*fakesqldb.DB, *TabletServer) {
cfg := tabletenv.NewDefaultConfig()
return setupTabletServerTestCustom(t, ctx, cfg, keyspaceName, vtenv.NewTestEnv())
Expand Down