diff --git a/adapter/internal.go b/adapter/internal.go index 109de4dab..aa5b4f3c0 100644 --- a/adapter/internal.go +++ b/adapter/internal.go @@ -130,10 +130,10 @@ func forwardedTxnStartTS(reqs []*pb.Request) uint64 { } func forwardedTxnMetaMutation(r *pb.Request, metaPrefix []byte) (*pb.Mutation, bool) { - if r == nil { + if r == nil || !r.IsTxn { return nil, false } - if r.Phase != pb.Phase_COMMIT && r.Phase != pb.Phase_ABORT { + if r.Phase != pb.Phase_COMMIT && r.Phase != pb.Phase_ABORT && r.Phase != pb.Phase_NONE { return nil, false } if len(r.Mutations) == 0 || r.Mutations[0] == nil { diff --git a/adapter/internal_test.go b/adapter/internal_test.go index 615e95c0a..d641d2481 100644 --- a/adapter/internal_test.go +++ b/adapter/internal_test.go @@ -103,6 +103,37 @@ func TestFillForwardedTxnCommitTS_PreservesExistingCommitTS(t *testing.T) { require.Equal(t, uint64(42), meta.CommitTS) } +func TestFillForwardedTxnCommitTS_AssignsCommitTSForOnePhaseTxn(t *testing.T) { + t.Parallel() + + i := &Internal{} + startTS := uint64(10) + reqs := []*pb.Request{ + { + IsTxn: true, + Phase: pb.Phase_NONE, + Mutations: []*pb.Mutation{ + { + Op: pb.Op_PUT, + Key: []byte(kv.TxnMetaPrefix), + Value: kv.EncodeTxnMeta(kv.TxnMeta{PrimaryKey: []byte("k"), CommitTS: 0}), + }, + { + Op: pb.Op_PUT, + Key: []byte("k"), + Value: []byte("v"), + }, + }, + }, + } + + require.NoError(t, i.fillForwardedTxnCommitTS(reqs, startTS)) + + meta, err := kv.DecodeTxnMeta(reqs[0].Mutations[0].Value) + require.NoError(t, err) + require.Equal(t, startTS+1, meta.CommitTS) +} + func TestStampTxnTimestamps_UsesSingleTxnStartTS(t *testing.T) { t.Parallel() diff --git a/kv/coordinator.go b/kv/coordinator.go index e89b6d0fa..d9ad5f7d4 100644 --- a/kv/coordinator.go +++ b/kv/coordinator.go @@ -116,9 +116,9 @@ func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64) (*CoordinateR return nil, errors.WithStack(ErrTxnCommitTSRequired) } - logs := txnRequests(startTS, commitTS, defaultTxnLockTTLms, primary, reqs) - - r, err := c.transactionManager.Commit(logs) + r, err := c.transactionManager.Commit([]*pb.Request{ + onePhaseTxnRequest(startTS, commitTS, primary, reqs), + }) if err != nil { return nil, errors.WithStack(err) } @@ -209,7 +209,9 @@ func (c *Coordinate) redirect(ctx context.Context, reqs *OperationGroup[OP]) (*C if len(primary) == 0 { return nil, errors.WithStack(ErrTxnPrimaryKeyRequired) } - requests = txnRequests(reqs.StartTS, 0, defaultTxnLockTTLms, primary, reqs.Elems) + requests = []*pb.Request{ + onePhaseTxnRequest(reqs.StartTS, 0, primary, reqs.Elems), + } } else { for _, req := range reqs.Elems { requests = append(requests, c.toRawRequest(req)) @@ -261,33 +263,17 @@ func elemToMutation(req *Elem[OP]) *pb.Mutation { panic("unreachable") } -func txnRequests(startTS, commitTS, lockTTLms uint64, primaryKey []byte, reqs []*Elem[OP]) []*pb.Request { - meta := &pb.Mutation{ - Op: pb.Op_PUT, - Key: []byte(txnMetaPrefix), - Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKey, LockTTLms: lockTTLms, CommitTS: 0}), - } - - prepareMuts := make([]*pb.Mutation, 0, len(reqs)+1) - prepareMuts = append(prepareMuts, meta) - for _, req := range reqs { - prepareMuts = append(prepareMuts, elemToMutation(req)) - } - - commitMeta := &pb.Mutation{ - Op: pb.Op_PUT, - Key: []byte(txnMetaPrefix), - Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKey, LockTTLms: 0, CommitTS: commitTS}), - } - commitMuts := make([]*pb.Mutation, 0, len(reqs)+1) - commitMuts = append(commitMuts, commitMeta) +func onePhaseTxnRequest(startTS, commitTS uint64, primaryKey []byte, reqs []*Elem[OP]) *pb.Request { + muts := make([]*pb.Mutation, 0, len(reqs)+1) + muts = append(muts, txnMetaMutation(primaryKey, 0, commitTS)) for _, req := range reqs { - commitMuts = append(commitMuts, &pb.Mutation{Op: pb.Op_PUT, Key: req.Key}) + muts = append(muts, elemToMutation(req)) } - - return []*pb.Request{ - {IsTxn: true, Phase: pb.Phase_PREPARE, Ts: startTS, Mutations: prepareMuts}, - {IsTxn: true, Phase: pb.Phase_COMMIT, Ts: startTS, Mutations: commitMuts}, + return &pb.Request{ + IsTxn: true, + Phase: pb.Phase_NONE, + Ts: startTS, + Mutations: muts, } } diff --git a/kv/coordinator_txn_test.go b/kv/coordinator_txn_test.go index 64f87bad2..9c9929353 100644 --- a/kv/coordinator_txn_test.go +++ b/kv/coordinator_txn_test.go @@ -9,10 +9,12 @@ import ( type stubTransactional struct { commits int + reqs [][]*pb.Request } -func (s *stubTransactional) Commit(_ []*pb.Request) (*TransactionResponse, error) { +func (s *stubTransactional) Commit(reqs []*pb.Request) (*TransactionResponse, error) { s.commits++ + s.reqs = append(s.reqs, reqs) return &TransactionResponse{}, nil } @@ -54,3 +56,40 @@ func TestCoordinateDispatchTxn_RejectsMissingPrimaryKey(t *testing.T) { require.ErrorIs(t, err, ErrTxnPrimaryKeyRequired) require.Equal(t, 0, tx.commits) } + +func TestCoordinateDispatchTxn_UsesOnePhaseRequest(t *testing.T) { + t.Parallel() + + tx := &stubTransactional{} + c := &Coordinate{ + transactionManager: tx, + clock: NewHLC(), + } + + startTS := uint64(10) + _, err := c.dispatchTxn([]*Elem[OP]{ + {Op: Put, Key: []byte("b"), Value: []byte("v1")}, + {Op: Del, Key: []byte("x")}, + }, startTS) + require.NoError(t, err) + require.Equal(t, 1, tx.commits) + require.Len(t, tx.reqs, 1) + require.Len(t, tx.reqs[0], 1) + + req := tx.reqs[0][0] + require.NotNil(t, req) + require.True(t, req.IsTxn) + require.Equal(t, pb.Phase_NONE, req.Phase) + require.Equal(t, startTS, req.Ts) + require.Len(t, req.Mutations, 3) + require.Equal(t, []byte(txnMetaPrefix), req.Mutations[0].Key) + require.Equal(t, []byte("b"), req.Mutations[1].Key) + require.Equal(t, []byte("v1"), req.Mutations[1].Value) + require.Equal(t, pb.Op_DEL, req.Mutations[2].Op) + require.Equal(t, []byte("x"), req.Mutations[2].Key) + + meta, err := DecodeTxnMeta(req.Mutations[0].Value) + require.NoError(t, err) + require.Equal(t, []byte("b"), meta.PrimaryKey) + require.Greater(t, meta.CommitTS, startTS) +} diff --git a/kv/fsm.go b/kv/fsm.go index 61a962fcc..c6cf4b669 100644 --- a/kv/fsm.go +++ b/kv/fsm.go @@ -87,7 +87,7 @@ func requestCommitTS(r *pb.Request) (uint64, error) { } commitTS := r.Ts - if r.IsTxn && (r.Phase == pb.Phase_COMMIT || r.Phase == pb.Phase_ABORT) { + if r.IsTxn && (r.Phase == pb.Phase_COMMIT || r.Phase == pb.Phase_ABORT || r.Phase == pb.Phase_NONE) { meta, _, err := extractTxnMeta(r.Mutations) if err != nil { return 0, errors.WithStack(err) @@ -177,8 +177,7 @@ func (f *kvFSM) handleTxnRequest(ctx context.Context, r *pb.Request, commitTS ui case pb.Phase_ABORT: return f.handleAbortRequest(ctx, r, commitTS) case pb.Phase_NONE: - // not reached - return errors.WithStack(ErrUnknownRequestType) + return f.handleOnePhaseTxnRequest(ctx, r, commitTS) default: return errors.WithStack(ErrUnknownRequestType) } @@ -266,6 +265,34 @@ func (f *kvFSM) handlePrepareRequest(ctx context.Context, r *pb.Request) error { return nil } +func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, commitTS uint64) error { + meta, muts, err := extractTxnMeta(r.Mutations) + if err != nil { + return err + } + if len(meta.PrimaryKey) == 0 { + return errors.WithStack(ErrTxnPrimaryKeyRequired) + } + if len(muts) == 0 { + return errors.WithStack(ErrInvalidRequest) + } + startTS := r.Ts + if commitTS <= startTS { + return errors.WithStack(ErrTxnCommitTSRequired) + } + + uniq, err := uniqueMutations(muts) + if err != nil { + return err + } + + storeMuts, err := f.buildOnePhaseStoreMutations(ctx, uniq) + if err != nil { + return err + } + return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, startTS, commitTS)) +} + func (f *kvFSM) handleCommitRequest(ctx context.Context, r *pb.Request) error { meta, muts, err := extractTxnMeta(r.Mutations) if err != nil { @@ -343,6 +370,22 @@ func (f *kvFSM) buildPrepareStoreMutations(ctx context.Context, muts []*pb.Mutat return storeMuts, nil } +func (f *kvFSM) buildOnePhaseStoreMutations(ctx context.Context, muts []*pb.Mutation) ([]*store.KVPairMutation, error) { + for _, mut := range muts { + if isTxnInternalKey(mut.Key) { + return nil, errors.WithStack(ErrInvalidRequest) + } + if err := f.assertNoConflictingTxnLock(ctx, mut.Key, 0); err != nil { + return nil, err + } + } + storeMuts, err := toStoreMutations(muts) + if err != nil { + return nil, errors.WithStack(err) + } + return storeMuts, nil +} + func (f *kvFSM) buildCommitStoreMutations(ctx context.Context, muts []*pb.Mutation, meta TxnMeta, startTS, commitTS uint64) ([]*store.KVPairMutation, error) { storeMuts := make([]*store.KVPairMutation, 0, len(muts)*txnCommitStoreMutationFactor+txnCommitStoreMutationSlack) diff --git a/kv/fsm_occ_test.go b/kv/fsm_occ_test.go index d58fc3c97..09be86ae9 100644 --- a/kv/fsm_occ_test.go +++ b/kv/fsm_occ_test.go @@ -71,3 +71,35 @@ func TestApplyReturnsErrorOnConflict(t *testing.T) { require.NoError(t, err) require.Equal(t, []byte("v1"), v) } + +func TestOnePhaseTxnDetectsWriteConflict(t *testing.T) { + ctx := context.Background() + st := store.NewMVCCStore() + require.NoError(t, st.PutAt(ctx, []byte("k"), []byte("v1"), 100, 0)) + + fsm, ok := NewKvFSM(st).(*kvFSM) + require.True(t, ok) + + // One-phase txn with startTS < latest commit (100) should be rejected. + req := &pb.Request{ + IsTxn: true, + Phase: pb.Phase_NONE, + Ts: 90, + Mutations: []*pb.Mutation{ + {Op: pb.Op_PUT, Key: []byte(txnMetaPrefix), Value: EncodeTxnMeta(TxnMeta{PrimaryKey: []byte("k"), CommitTS: 110})}, + {Op: pb.Op_PUT, Key: []byte("k"), Value: []byte("v2")}, + }, + } + data, err := proto.Marshal(req) + require.NoError(t, err) + + resp := fsm.Apply(&raft.Log{Type: raft.LogCommand, Data: data}) + err, ok = resp.(error) + require.True(t, ok) + require.ErrorIs(t, err, store.ErrWriteConflict) + + // Ensure the original value is unchanged. + v, err := st.GetAt(ctx, []byte("k"), ^uint64(0)) + require.NoError(t, err) + require.Equal(t, []byte("v1"), v) +} diff --git a/kv/fsm_txn_test.go b/kv/fsm_txn_test.go index fe1cd1eb5..c258ba38f 100644 --- a/kv/fsm_txn_test.go +++ b/kv/fsm_txn_test.go @@ -143,3 +143,38 @@ func TestPrepareClampsHugeLockTTL(t *testing.T) { maxDelta := maxTxnLockTTLms << hlcLogicalBits require.LessOrEqual(t, lock.TTLExpireAt-now, maxDelta) } + +func TestOnePhaseTxnCommitsWithoutTxnArtifacts(t *testing.T) { + t.Parallel() + + ctx := context.Background() + st := store.NewMVCCStore() + fsm, ok := NewKvFSM(st).(*kvFSM) + require.True(t, ok) + + startTS := uint64(15) + commitTS := uint64(25) + key := []byte("k") + req := &pb.Request{ + IsTxn: true, + Phase: pb.Phase_NONE, + Ts: startTS, + Mutations: []*pb.Mutation{ + {Op: pb.Op_PUT, Key: []byte(txnMetaPrefix), Value: EncodeTxnMeta(TxnMeta{PrimaryKey: key, CommitTS: commitTS})}, + {Op: pb.Op_PUT, Key: key, Value: []byte("v")}, + }, + } + + require.NoError(t, applyFSMRequest(t, fsm, req)) + + value, err := st.GetAt(ctx, key, ^uint64(0)) + require.NoError(t, err) + require.Equal(t, []byte("v"), value) + + _, err = st.GetAt(ctx, txnLockKey(key), ^uint64(0)) + require.ErrorIs(t, err, store.ErrKeyNotFound) + _, err = st.GetAt(ctx, txnIntentKey(key), ^uint64(0)) + require.ErrorIs(t, err, store.ErrKeyNotFound) + _, err = st.GetAt(ctx, txnCommitKey(key, startTS), ^uint64(0)) + require.ErrorIs(t, err, store.ErrKeyNotFound) +} diff --git a/kv/transaction.go b/kv/transaction.go index 6faab54f8..da964fc10 100644 --- a/kv/transaction.go +++ b/kv/transaction.go @@ -159,8 +159,10 @@ func (t *TransactionManager) commitSequential(reqs []*pb.Request) (*TransactionR if err != nil { // Only attempt transactional cleanup for transactional batches. Raw request - // batches may partially succeed across shards by design. - if len(reqs) > 0 && reqs[0] != nil && reqs[0].IsTxn { + // batches may partially succeed across shards by design. One-phase + // transactional requests do not leave intents behind, so they do not need + // abort cleanup on failure. + if needsTxnCleanup(reqs) { _, _err := t.Abort(reqs) if _err != nil { return nil, errors.WithStack(errors.CombineErrors(err, _err)) @@ -174,6 +176,15 @@ func (t *TransactionManager) commitSequential(reqs []*pb.Request) (*TransactionR }, nil } +func needsTxnCleanup(reqs []*pb.Request) bool { + for _, req := range reqs { + if req != nil && req.IsTxn && req.Phase != pb.Phase_NONE { + return true + } + } + return false +} + func (t *TransactionManager) commitRaw(reqs []*pb.Request) (*TransactionResponse, error) { item := &rawCommitItem{ reqs: reqs, @@ -315,34 +326,9 @@ func combineApplyErrors(errs []error) error { func (t *TransactionManager) Abort(reqs []*pb.Request) (*TransactionResponse, error) { var abortReqs []*pb.Request for _, req := range reqs { - if req == nil || !req.IsTxn { - continue - } - meta, muts, err := extractTxnMeta(req.Mutations) - if err != nil { - // Best-effort cleanup; skip requests we can't interpret. - continue + if abortReq := abortRequestFor(req); abortReq != nil { + abortReqs = append(abortReqs, abortReq) } - startTS := req.Ts - abortTS := abortTSFrom(startTS, startTS) - if abortTS <= startTS { - // Overflow: can't choose an abort timestamp strictly greater than startTS. - continue - } - meta.CommitTS = abortTS - - abortReqs = append(abortReqs, &pb.Request{ - IsTxn: true, - Phase: pb.Phase_ABORT, - Ts: startTS, - Mutations: append([]*pb.Mutation{ - { - Op: pb.Op_PUT, - Key: []byte(txnMetaPrefix), - Value: EncodeTxnMeta(meta), - }, - }, muts...), - }) } var commitIndex uint64 @@ -362,6 +348,37 @@ func (t *TransactionManager) Abort(reqs []*pb.Request) (*TransactionResponse, er }, nil } +func abortRequestFor(req *pb.Request) *pb.Request { + if req == nil || !req.IsTxn || req.Phase == pb.Phase_NONE { + return nil + } + meta, muts, err := extractTxnMeta(req.Mutations) + if err != nil { + // Best-effort cleanup; skip requests we can't interpret. + return nil + } + startTS := req.Ts + abortTS := abortTSFrom(startTS, startTS) + if abortTS <= startTS { + // Overflow: can't choose an abort timestamp strictly greater than startTS. + return nil + } + meta.CommitTS = abortTS + + return &pb.Request{ + IsTxn: true, + Phase: pb.Phase_ABORT, + Ts: startTS, + Mutations: append([]*pb.Mutation{ + { + Op: pb.Op_PUT, + Key: []byte(txnMetaPrefix), + Value: EncodeTxnMeta(meta), + }, + }, muts...), + } +} + func extractTxnMeta(muts []*pb.Mutation) (TxnMeta, []*pb.Mutation, error) { if len(muts) == 0 || muts[0] == nil || !isTxnMetaKey(muts[0].Key) { return TxnMeta{}, nil, errors.WithStack(ErrTxnMetaMissing)