Skip to content

Commit a761eeb

Browse files
authored
Merge pull request #444 from bootjp/feat/fix-abort
kv: suppress abort warnings for committed txns
2 parents 81ca2a4 + 5a26525 commit a761eeb

4 files changed

Lines changed: 47 additions & 3 deletions

File tree

kv/sharded_coordinator.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type ShardedCoordinator struct {
3636
defaultGroup uint64
3737
clock *HLC
3838
store store.MVCCStore
39+
log *slog.Logger
3940
}
4041

4142
// NewShardedCoordinator builds a coordinator for the provided shard groups.
@@ -52,6 +53,7 @@ func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*Shard
5253
defaultGroup: defaultGroup,
5354
clock: clock,
5455
store: st,
56+
log: slog.Default(),
5557
}
5658
}
5759

@@ -211,7 +213,7 @@ func (c *ShardedCoordinator) commitSecondaryTxns(startTS uint64, primaryGid uint
211213
}
212214
r, err := commitSecondaryWithRetry(g, req)
213215
if err != nil {
214-
slog.Warn("txn secondary commit failed",
216+
c.log.Warn("txn secondary commit failed",
215217
slog.Uint64("gid", gid),
216218
slog.String("primary_key", string(primaryKey)),
217219
slog.Uint64("start_ts", startTS),
@@ -266,7 +268,10 @@ func (c *ShardedCoordinator) abortPreparedTxn(startTS uint64, primaryKey []byte,
266268
Mutations: append([]*pb.Mutation{meta}, pg.keys...),
267269
}
268270
if _, err := g.Txn.Commit([]*pb.Request{req}); err != nil {
269-
slog.Warn("txn abort failed; locks may remain until TTL expiry",
271+
if errors.Is(err, ErrTxnAlreadyCommitted) {
272+
continue
273+
}
274+
c.log.Warn("txn abort failed; locks may remain until TTL expiry",
270275
slog.Uint64("gid", pg.gid),
271276
slog.String("primary_key", string(primaryKey)),
272277
slog.Uint64("start_ts", startTS),

kv/sharded_coordinator_abort_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package kv
22

33
import (
4+
"bytes"
45
"context"
6+
"log/slog"
57
"testing"
68

79
"github.com/bootjp/elastickv/distribution"
@@ -82,3 +84,22 @@ func TestShardedAbortRollback_PrepareFailOnShard2_CleansShard1Locks(t *testing.T
8284
_, err = s2.GetAt(ctx, []byte("x"), ^uint64(0))
8385
require.ErrorIs(t, err, store.ErrKeyNotFound, "user data for key 'x' should not exist on shard2")
8486
}
87+
88+
func TestAbortPreparedTxn_DoesNotWarnWhenTxnAlreadyCommitted(t *testing.T) {
89+
var buf bytes.Buffer
90+
logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelWarn}))
91+
92+
coord := &ShardedCoordinator{
93+
log: logger,
94+
groups: map[uint64]*ShardGroup{
95+
1: {Txn: &failingTransactional{err: errors.WithStack(ErrTxnAlreadyCommitted)}},
96+
},
97+
}
98+
99+
coord.abortPreparedTxn(10, []byte("pk"), []preparedGroup{{
100+
gid: 1,
101+
keys: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("pk")}},
102+
}}, 20)
103+
104+
require.NotContains(t, buf.String(), "txn abort failed; locks may remain until TTL expiry")
105+
}

kv/transaction.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,13 @@ func (t *TransactionManager) commitSequential(reqs []*pb.Request) (*TransactionR
229229

230230
func needsTxnCleanup(reqs []*pb.Request) bool {
231231
for _, req := range reqs {
232-
if req != nil && req.IsTxn && req.Phase != pb.Phase_NONE {
232+
if req == nil || !req.IsTxn {
233+
continue
234+
}
235+
// Be conservative: any transactional phase other than NONE/ABORT may have
236+
// left intents that require cleanup, including unknown enum values that
237+
// can appear during rolling upgrades.
238+
if req.Phase != pb.Phase_NONE && req.Phase != pb.Phase_ABORT {
233239
return true
234240
}
235241
}

kv/transaction_batch_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,3 +192,15 @@ func TestApplyRequestsDoesNotCountBusinessErrorAsProposalFailure(t *testing.T) {
192192
require.ErrorIs(t, results[0], ErrInvalidRequest)
193193
require.Zero(t, observer.FailureCount())
194194
}
195+
196+
func TestNeedsTxnCleanupSkipsAbortRequests(t *testing.T) {
197+
t.Parallel()
198+
199+
reqs := []*pb.Request{
200+
{IsTxn: true, Phase: pb.Phase_ABORT},
201+
}
202+
require.False(t, needsTxnCleanup(reqs))
203+
require.True(t, needsTxnCleanup([]*pb.Request{{IsTxn: true, Phase: pb.Phase_PREPARE}}))
204+
require.True(t, needsTxnCleanup([]*pb.Request{{IsTxn: true, Phase: pb.Phase_COMMIT}}))
205+
require.False(t, needsTxnCleanup([]*pb.Request{{IsTxn: true, Phase: pb.Phase_NONE}}))
206+
}

0 commit comments

Comments
 (0)