Skip to content

Commit 07a2d5a

Browse files
Copilotbootjp
andauthored
kv: refactor handleCommitRequest to reduce complexity; move secondary-shard check to retry path
Agent-Logs-Url: https://github.com/bootjp/elastickv/sessions/60c27075-a235-43ab-8a68-a8d04d2e5a5d Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
1 parent ed09a24 commit 07a2d5a

2 files changed

Lines changed: 73 additions & 40 deletions

File tree

kv/fsm.go

Lines changed: 58 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -345,55 +345,77 @@ func (f *kvFSM) handleCommitRequest(ctx context.Context, r *pb.Request) error {
345345
if len(meta.PrimaryKey) == 0 {
346346
return errors.WithStack(ErrTxnPrimaryKeyRequired)
347347
}
348-
applyStartTS := startTS
349-
if recordedCommitTS, committed, err := f.txnCommitTS(ctx, meta.PrimaryKey, startTS); err != nil {
348+
applyStartTS, err := f.commitApplyStartTS(ctx, meta.PrimaryKey, startTS, commitTS)
349+
if err != nil {
350350
return err
351-
} else if committed {
352-
if recordedCommitTS != commitTS {
353-
return errors.Wrapf(
354-
ErrTxnInvalidMeta,
355-
"commit_ts mismatch for primary key %s: recordedCommitTS=%d requestedCommitTS=%d startTS=%d",
356-
string(meta.PrimaryKey), recordedCommitTS, commitTS, startTS,
357-
)
358-
}
359-
// Treat duplicate commits as idempotent so stale txn artifacts can be
360-
// cleaned up after the commit record already exists.
361-
applyStartTS = commitTS
362351
}
363-
364352
uniq, err := uniqueMutations(muts)
365353
if err != nil {
366354
return err
367355
}
368-
369-
// Secondary-shard fallback: txnCommitKey lives only on the primary shard.
370-
// If this shard doesn't hold the primary key, detect an already-committed
371-
// state by checking whether any target data key already has a committed
372-
// version at or beyond commitTS. If so, use commitTS as the conflict-check
373-
// baseline so that idempotent re-application doesn't trip on the
374-
// already-written version.
375-
if applyStartTS == startTS {
376-
for _, mut := range uniq {
377-
latestTS, exists, err := f.store.LatestCommitTS(ctx, mut.Key)
378-
if err != nil {
379-
return err
380-
}
381-
if exists && latestTS >= commitTS {
382-
applyStartTS = commitTS
383-
break
384-
}
385-
}
386-
}
387-
388356
storeMuts, err := f.buildCommitStoreMutations(ctx, uniq, meta, startTS, commitTS)
389357
if err != nil {
390358
return err
391359
}
392-
393360
if len(storeMuts) == 0 {
394361
return nil
395362
}
396-
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, applyStartTS, commitTS))
363+
return f.applyCommitWithIdempotencyFallback(ctx, storeMuts, uniq, applyStartTS, commitTS)
364+
}
365+
366+
// commitApplyStartTS resolves the startTS to use for MVCC conflict detection
367+
// during a COMMIT. If a commit record already exists for the primary key it
368+
// returns commitTS (making the apply idempotent); otherwise it returns startTS.
369+
func (f *kvFSM) commitApplyStartTS(ctx context.Context, primaryKey []byte, startTS, commitTS uint64) (uint64, error) {
370+
recordedCommitTS, committed, err := f.txnCommitTS(ctx, primaryKey, startTS)
371+
if err != nil {
372+
return 0, err
373+
}
374+
if !committed {
375+
return startTS, nil
376+
}
377+
if recordedCommitTS != commitTS {
378+
return 0, errors.Wrapf(
379+
ErrTxnInvalidMeta,
380+
"commit_ts mismatch for primary key %s: recordedCommitTS=%d requestedCommitTS=%d startTS=%d",
381+
string(primaryKey), recordedCommitTS, commitTS, startTS,
382+
)
383+
}
384+
// Commit record exists — use commitTS so stale artifacts can be cleaned up
385+
// without triggering a write-conflict.
386+
return commitTS, nil
387+
}
388+
389+
// applyCommitWithIdempotencyFallback applies storeMuts at (applyStartTS,
390+
// commitTS). If the apply fails with a write-conflict and any of the target
391+
// keys already has a committed version at or beyond commitTS, the conflict is
392+
// treated as an idempotent secondary-shard retry and the apply is retried with
393+
// commitTS as the conflict-check baseline.
394+
//
395+
// The secondary-shard LatestCommitTS scan is intentionally deferred to the
396+
// write-conflict path so the hot (first-time) commit path pays no extra cost.
397+
func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMuts []*store.KVPairMutation, uniq []*pb.Mutation, applyStartTS, commitTS uint64) error {
398+
err := f.store.ApplyMutations(ctx, storeMuts, applyStartTS, commitTS)
399+
if err == nil {
400+
return nil
401+
}
402+
if !errors.Is(err, store.ErrWriteConflict) {
403+
return errors.WithStack(err)
404+
}
405+
// Write-conflict: scan mutations one by one and return as soon as we find
406+
// a key that is already committed at or beyond commitTS — this indicates an
407+
// idempotent secondary-shard retry (txnCommitKey lives on the primary
408+
// shard, not here). Retry with commitTS as the conflict-check baseline.
409+
for _, mut := range uniq {
410+
latestTS, exists, lErr := f.store.LatestCommitTS(ctx, mut.Key)
411+
if lErr != nil {
412+
return errors.WithStack(lErr)
413+
}
414+
if exists && latestTS >= commitTS {
415+
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, commitTS, commitTS))
416+
}
417+
}
418+
return errors.WithStack(err)
397419
}
398420

399421
func (f *kvFSM) handleAbortRequest(ctx context.Context, r *pb.Request, abortTS uint64) error {

kv/fsm_txn_test.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,16 +177,27 @@ func TestCommitIsIdempotentOnSecondaryShardWhenKeyAlreadyCommitted(t *testing.T)
177177
func TestCommitPropagatesSecondaryShardLatestCommitTSError(t *testing.T) {
178178
t.Parallel()
179179

180+
ctx := context.Background()
180181
underlying := store.NewMVCCStore()
181182
userKey := []byte("k")
182-
errorStore := erroringLatestCommitStore{MVCCStore: underlying, key: userKey}
183-
fsm, ok := NewKvFSM(errorStore).(*kvFSM)
184-
require.True(t, ok)
185-
186183
startTS := uint64(16)
187184
commitTS := uint64(26)
188185
primaryKey := []byte("p") // no txnCommitKey in store → secondary-shard path
189186

187+
// Set up a PREPARED state directly in the underlying store.
188+
lockVal := encodeTxnLock(txnLock{StartTS: startTS, PrimaryKey: primaryKey})
189+
require.NoError(t, underlying.PutAt(ctx, txnLockKey(userKey), lockVal, startTS, 0))
190+
intentVal := encodeTxnIntent(txnIntent{StartTS: startTS, Op: txnIntentOpPut, Value: []byte("v")})
191+
require.NoError(t, underlying.PutAt(ctx, txnIntentKey(userKey), intentVal, startTS, 0))
192+
193+
// Write a conflicting version that will cause ApplyMutations to return a
194+
// write-conflict, triggering the LatestCommitTS fallback path.
195+
require.NoError(t, underlying.PutAt(ctx, userKey, []byte("other"), commitTS+1, 0))
196+
197+
errorStore := erroringLatestCommitStore{MVCCStore: underlying, key: userKey}
198+
fsm, ok := NewKvFSM(errorStore).(*kvFSM)
199+
require.True(t, ok)
200+
190201
commit := &pb.Request{
191202
IsTxn: true,
192203
Phase: pb.Phase_COMMIT,

0 commit comments

Comments
 (0)