Skip to content

Commit ed09a24

Browse files
Copilotbootjp
andauthored
store/kv: optimize compactKeepIndex with binary search; add secondary shard idempotency
Agent-Logs-Url: https://github.com/bootjp/elastickv/sessions/a06bfa98-ebb4-486c-905f-68dd201385b1 Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
1 parent 8ec3ba1 commit ed09a24

4 files changed

Lines changed: 112 additions & 9 deletions

File tree

kv/fsm.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,26 @@ func (f *kvFSM) handleCommitRequest(ctx context.Context, r *pb.Request) error {
365365
if err != nil {
366366
return err
367367
}
368+
369+
// Secondary-shard fallback: txnCommitKey lives only on the primary shard.
370+
// If this shard doesn't hold the primary key, detect an already-committed
371+
// state by checking whether any target data key already has a committed
372+
// version at or beyond commitTS. If so, use commitTS as the conflict-check
373+
// baseline so that idempotent re-application doesn't trip on the
374+
// already-written version.
375+
if applyStartTS == startTS {
376+
for _, mut := range uniq {
377+
latestTS, exists, err := f.store.LatestCommitTS(ctx, mut.Key)
378+
if err != nil {
379+
return err
380+
}
381+
if exists && latestTS >= commitTS {
382+
applyStartTS = commitTS
383+
break
384+
}
385+
}
386+
}
387+
368388
storeMuts, err := f.buildCommitStoreMutations(ctx, uniq, meta, startTS, commitTS)
369389
if err != nil {
370390
return err

kv/fsm_txn_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,89 @@ func TestCommitIsIdempotentAfterCommitRecordExists(t *testing.T) {
118118
require.Equal(t, commitTS, gotCommitTS)
119119
}
120120

121+
func TestCommitIsIdempotentOnSecondaryShardWhenKeyAlreadyCommitted(t *testing.T) {
122+
t.Parallel()
123+
124+
ctx := context.Background()
125+
st := store.NewMVCCStore()
126+
fsm, ok := NewKvFSM(st).(*kvFSM)
127+
require.True(t, ok)
128+
129+
startTS := uint64(15)
130+
commitTS := uint64(25)
131+
primaryKey := []byte("p") // lives on another shard; no txnCommitKey here
132+
userKey := []byte("k")
133+
134+
// PREPARE: write lock and intent for userKey.
135+
prepare := &pb.Request{
136+
IsTxn: true,
137+
Phase: pb.Phase_PREPARE,
138+
Ts: startTS,
139+
Mutations: []*pb.Mutation{
140+
{Op: pb.Op_PUT, Key: []byte(txnMetaPrefix), Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKey, LockTTLms: defaultTxnLockTTLms})},
141+
{Op: pb.Op_PUT, Key: userKey, Value: []byte("v")},
142+
},
143+
}
144+
require.NoError(t, applyFSMRequest(t, fsm, prepare))
145+
146+
// Simulate a partial commit: the data key is written at commitTS but the
147+
// txn lock/intent are still present (inconsistent state that the
148+
// secondary-shard idempotency check must handle).
149+
require.NoError(t, st.PutAt(ctx, userKey, []byte("v"), commitTS, 0))
150+
151+
// COMMIT on this secondary shard (no txnCommitKey for primaryKey here).
152+
// Without the secondary-shard LatestCommitTS check this would fail with a
153+
// write-conflict error because userKey@commitTS > startTS.
154+
commit := &pb.Request{
155+
IsTxn: true,
156+
Phase: pb.Phase_COMMIT,
157+
Ts: startTS,
158+
Mutations: []*pb.Mutation{
159+
{Op: pb.Op_PUT, Key: []byte(txnMetaPrefix), Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKey, CommitTS: commitTS})},
160+
{Op: pb.Op_PUT, Key: userKey},
161+
},
162+
}
163+
require.NoError(t, applyFSMRequest(t, fsm, commit))
164+
165+
// The committed value should still be readable.
166+
v, err := st.GetAt(ctx, userKey, ^uint64(0))
167+
require.NoError(t, err)
168+
require.Equal(t, []byte("v"), v)
169+
170+
// The lock and intent should be cleaned up.
171+
_, err = st.GetAt(ctx, txnLockKey(userKey), ^uint64(0))
172+
require.ErrorIs(t, err, store.ErrKeyNotFound)
173+
_, err = st.GetAt(ctx, txnIntentKey(userKey), ^uint64(0))
174+
require.ErrorIs(t, err, store.ErrKeyNotFound)
175+
}
176+
177+
func TestCommitPropagatesSecondaryShardLatestCommitTSError(t *testing.T) {
178+
t.Parallel()
179+
180+
underlying := store.NewMVCCStore()
181+
userKey := []byte("k")
182+
errorStore := erroringLatestCommitStore{MVCCStore: underlying, key: userKey}
183+
fsm, ok := NewKvFSM(errorStore).(*kvFSM)
184+
require.True(t, ok)
185+
186+
startTS := uint64(16)
187+
commitTS := uint64(26)
188+
primaryKey := []byte("p") // no txnCommitKey in store → secondary-shard path
189+
190+
commit := &pb.Request{
191+
IsTxn: true,
192+
Phase: pb.Phase_COMMIT,
193+
Ts: startTS,
194+
Mutations: []*pb.Mutation{
195+
{Op: pb.Op_PUT, Key: []byte(txnMetaPrefix), Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKey, CommitTS: commitTS})},
196+
{Op: pb.Op_PUT, Key: userKey},
197+
},
198+
}
199+
err := applyFSMRequest(t, fsm, commit)
200+
require.Error(t, err)
201+
require.ErrorIs(t, err, ErrTestLatestCommitTS)
202+
}
203+
121204
func TestCommitRejectsMissingPrimaryKey(t *testing.T) {
122205
t.Parallel()
123206

store/compaction_unit_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ func TestCompactKeepIndex_MultipleVersions(t *testing.T) {
5555
{TS: 300, Value: []byte("v300")},
5656
{TS: 400, Value: []byte("v400")},
5757
}
58-
// minTS=250: iterates from end: i=3(400>250), i=2(300>250), i=1(200<=250)
59-
// keepIdx=1, > 0 → compact versions before idx 1
58+
// minTS=250: first index with TS>250 is idx 2 (TS=300); keepIdx = 2-1 = 1.
6059
assert.Equal(t, 1, compactKeepIndex(versions, 250))
6160
}
6261

@@ -67,7 +66,7 @@ func TestCompactKeepIndex_AllBelowMinTS(t *testing.T) {
6766
{TS: 20, Value: []byte("v20")},
6867
{TS: 30, Value: []byte("v30")},
6968
}
70-
// minTS=100: iterates from end: i=2(30<=100) → keepIdx=2, > 0 → return 2
69+
// minTS=100: first index with TS>100 is len(versions)=3; keepIdx = 3-1 = 2.
7170
// compact versions before idx 2 (i.e. TS=10 and TS=20)
7271
assert.Equal(t, 2, compactKeepIndex(versions, 100))
7372
}

store/mvcc_store.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -901,12 +901,13 @@ func compactKeepIndex(versions []VersionedValue, minTS uint64) int {
901901
if len(versions) == 0 {
902902
return -1
903903
}
904-
keepIdx := -1
905-
for i, version := range versions {
906-
if version.TS <= minTS {
907-
keepIdx = i
908-
}
909-
}
904+
// versions are sorted ascending by TS; use binary search for O(log n).
905+
// Find the first index where TS > minTS; everything before it has TS <= minTS.
906+
n := sort.Search(len(versions), func(i int) bool {
907+
return versions[i].TS > minTS
908+
})
909+
// n-1 is the last index with TS <= minTS.
910+
keepIdx := n - 1
910911
if keepIdx <= 0 {
911912
return -1
912913
}

0 commit comments

Comments
 (0)