Skip to content

Commit 0577118

Browse files
committed
fix: address golangci-lint warnings
- Remove unused readKeys parameter from Coordinate.dispatchTxn and ShardedCoordinator.dispatchSingleShardTxn (unparam) - Extract validateReadKeysOnShard to reduce validateReadOnlyShards cyclomatic complexity below cyclop threshold - Fix gci import ordering in sharded_coordinator_txn_test.go
1 parent 7197586 commit 0577118

4 files changed

Lines changed: 36 additions & 47 deletions

File tree

kv/coordinator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*C
7575
}
7676

7777
if reqs.IsTxn {
78-
return c.dispatchTxn(reqs.Elems, reqs.StartTS, reqs.CommitTS, reqs.ReadKeys)
78+
return c.dispatchTxn(reqs.Elems, reqs.StartTS, reqs.CommitTS)
7979
}
8080

8181
return c.dispatchRaw(reqs.Elems)
@@ -122,7 +122,7 @@ func (c *Coordinate) nextStartTS() uint64 {
122122
return c.clock.Next()
123123
}
124124

125-
func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64, commitTS uint64, readKeys [][]byte) (*CoordinateResponse, error) {
125+
func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64, commitTS uint64) (*CoordinateResponse, error) {
126126
primary := primaryKeyForElems(reqs)
127127
if len(primary) == 0 {
128128
return nil, errors.WithStack(ErrTxnPrimaryKeyRequired)

kv/coordinator_txn_test.go

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func TestCoordinateDispatchTxn_RejectsNonMonotonicCommitTS(t *testing.T) {
3636

3737
_, err := c.dispatchTxn([]*Elem[OP]{
3838
{Op: Put, Key: []byte("k"), Value: []byte("v")},
39-
}, startTS, 0, nil)
39+
}, startTS, 0)
4040
require.ErrorIs(t, err, ErrTxnCommitTSRequired)
4141
require.Equal(t, 0, tx.commits)
4242
}
@@ -52,7 +52,7 @@ func TestCoordinateDispatchTxn_RejectsMissingPrimaryKey(t *testing.T) {
5252

5353
_, err := c.dispatchTxn([]*Elem[OP]{
5454
{Op: Put, Key: nil, Value: []byte("v")},
55-
}, 1, 0, nil)
55+
}, 1, 0)
5656
require.ErrorIs(t, err, ErrTxnPrimaryKeyRequired)
5757
require.Equal(t, 0, tx.commits)
5858
}
@@ -70,7 +70,7 @@ func TestCoordinateDispatchTxn_UsesOnePhaseRequest(t *testing.T) {
7070
_, err := c.dispatchTxn([]*Elem[OP]{
7171
{Op: Put, Key: []byte("b"), Value: []byte("v1")},
7272
{Op: Del, Key: []byte("x")},
73-
}, startTS, 0, nil)
73+
}, startTS, 0)
7474
require.NoError(t, err)
7575
require.Equal(t, 1, tx.commits)
7676
require.Len(t, tx.reqs, 1)
@@ -107,7 +107,7 @@ func TestCoordinateDispatchTxn_UsesProvidedCommitTS(t *testing.T) {
107107
commitTS := uint64(25)
108108
_, err := c.dispatchTxn([]*Elem[OP]{
109109
{Op: Put, Key: []byte("k"), Value: []byte("v")},
110-
}, startTS, commitTS, nil)
110+
}, startTS, commitTS)
111111
require.NoError(t, err)
112112
require.Len(t, tx.reqs, 1)
113113
require.Len(t, tx.reqs[0], 1)
@@ -117,24 +117,5 @@ func TestCoordinateDispatchTxn_UsesProvidedCommitTS(t *testing.T) {
117117
require.Equal(t, commitTS, meta.CommitTS)
118118
}
119119

120-
func TestCoordinateDispatchTxn_ReadKeysNotInOnePhaseTxnRequest(t *testing.T) {
121-
t.Parallel()
122-
123-
tx := &stubTransactional{}
124-
c := &Coordinate{
125-
transactionManager: tx,
126-
clock: NewHLC(),
127-
}
128-
129-
// Single-shard transactions validate read keys pre-Raft (in the adapter),
130-
// so readKeys must NOT be included in the Raft log entry to avoid
131-
// post-commit rejections that break realtime ordering.
132-
readKeys := [][]byte{[]byte("rk1"), []byte("rk2")}
133-
_, err := c.dispatchTxn([]*Elem[OP]{
134-
{Op: Put, Key: []byte("k"), Value: []byte("v")},
135-
}, 10, 0, readKeys)
136-
require.NoError(t, err)
137-
require.Len(t, tx.reqs, 1)
138-
require.Len(t, tx.reqs[0], 1)
139-
require.Nil(t, tx.reqs[0][0].ReadKeys)
140-
}
120+
// ReadKeys omission from single-shard Raft entries is tested in
121+
// TestShardedCoordinatorDispatchTxn_SingleShardOmitsReadKeysFromRaftEntry.

kv/sharded_coordinator.go

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ func (c *ShardedCoordinator) dispatchTxn(ctx context.Context, startTS uint64, co
209209
}
210210

211211
if len(gids) == 1 {
212-
return c.dispatchSingleShardTxn(startTS, commitTS, primaryKey, gids[0], elems, readKeys)
212+
return c.dispatchSingleShardTxn(startTS, commitTS, primaryKey, gids[0], elems)
213213
}
214214

215215
prepared, err := c.prewriteTxn(ctx, startTS, commitTS, primaryKey, grouped, gids, readKeys)
@@ -241,7 +241,7 @@ func (c *ShardedCoordinator) resolveTxnCommitTS(startTS, commitTS uint64) (uint6
241241
return commitTS, nil
242242
}
243243

244-
func (c *ShardedCoordinator) dispatchSingleShardTxn(startTS, commitTS uint64, primaryKey []byte, gid uint64, elems []*Elem[OP], readKeys [][]byte) (*CoordinateResponse, error) {
244+
func (c *ShardedCoordinator) dispatchSingleShardTxn(startTS, commitTS uint64, primaryKey []byte, gid uint64, elems []*Elem[OP]) (*CoordinateResponse, error) {
245245
g, err := c.txnGroupForID(gid)
246246
if err != nil {
247247
return nil, err
@@ -642,25 +642,32 @@ func (c *ShardedCoordinator) validateReadOnlyShards(ctx context.Context, grouped
642642
if _, isWrite := writeSet[gid]; isWrite {
643643
continue
644644
}
645-
g, ok := c.groups[gid]
646-
if !ok {
647-
continue
645+
if err := c.validateReadKeysOnShard(ctx, gid, keys, startTS); err != nil {
646+
return err
648647
}
649-
// Linearizable read barrier: wait until the shard's FSM has applied
650-
// all Raft-committed entries so LatestCommitTS reflects the latest
651-
// committed state. Without this, a concurrent write that is committed
652-
// in Raft but not yet applied locally would be invisible.
653-
if _, err := linearizableReadEngineCtx(ctx, engineForGroup(g)); err != nil {
648+
}
649+
return nil
650+
}
651+
652+
func (c *ShardedCoordinator) validateReadKeysOnShard(ctx context.Context, gid uint64, keys [][]byte, startTS uint64) error {
653+
g, ok := c.groups[gid]
654+
if !ok {
655+
return nil
656+
}
657+
// Linearizable read barrier: wait until the shard's FSM has applied
658+
// all Raft-committed entries so LatestCommitTS reflects the latest
659+
// committed state. Without this, a concurrent write that is committed
660+
// in Raft but not yet applied locally would be invisible.
661+
if _, err := linearizableReadEngineCtx(ctx, engineForGroup(g)); err != nil {
662+
return errors.WithStack(err)
663+
}
664+
for _, key := range keys {
665+
ts, exists, err := g.Store.LatestCommitTS(ctx, key)
666+
if err != nil {
654667
return errors.WithStack(err)
655668
}
656-
for _, key := range keys {
657-
ts, exists, err := g.Store.LatestCommitTS(ctx, key)
658-
if err != nil {
659-
return errors.WithStack(err)
660-
}
661-
if exists && ts > startTS {
662-
return errors.WithStack(store.NewWriteConflictError(key))
663-
}
669+
if exists && ts > startTS {
670+
return errors.WithStack(store.NewWriteConflictError(key))
664671
}
665672
}
666673
return nil

kv/sharded_coordinator_txn_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@ import (
66
"sync"
77
"testing"
88

9+
"github.com/stretchr/testify/require"
10+
"google.golang.org/protobuf/proto"
11+
912
"github.com/bootjp/elastickv/distribution"
1013
"github.com/bootjp/elastickv/internal/raftengine"
1114
pb "github.com/bootjp/elastickv/proto"
1215
"github.com/bootjp/elastickv/store"
13-
"github.com/stretchr/testify/require"
14-
"google.golang.org/protobuf/proto"
1516
)
1617

1718
type recordingTransactional struct {

0 commit comments

Comments
 (0)