Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions adapter/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ func forwardedTxnStartTS(reqs []*pb.Request) uint64 {
}

func forwardedTxnMetaMutation(r *pb.Request, metaPrefix []byte) (*pb.Mutation, bool) {
if r == nil {
if r == nil || !r.IsTxn {
return nil, false
}
if r.Phase != pb.Phase_COMMIT && r.Phase != pb.Phase_ABORT {
if r.Phase != pb.Phase_COMMIT && r.Phase != pb.Phase_ABORT && r.Phase != pb.Phase_NONE {
return nil, false
}
if len(r.Mutations) == 0 || r.Mutations[0] == nil {
Expand Down
31 changes: 31 additions & 0 deletions adapter/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,37 @@ func TestFillForwardedTxnCommitTS_PreservesExistingCommitTS(t *testing.T) {
require.Equal(t, uint64(42), meta.CommitTS)
}

func TestFillForwardedTxnCommitTS_AssignsCommitTSForOnePhaseTxn(t *testing.T) {
t.Parallel()

i := &Internal{}
startTS := uint64(10)
reqs := []*pb.Request{
{
IsTxn: true,
Phase: pb.Phase_NONE,
Mutations: []*pb.Mutation{
{
Op: pb.Op_PUT,
Key: []byte(kv.TxnMetaPrefix),
Value: kv.EncodeTxnMeta(kv.TxnMeta{PrimaryKey: []byte("k"), CommitTS: 0}),
},
{
Op: pb.Op_PUT,
Key: []byte("k"),
Value: []byte("v"),
},
},
},
}

require.NoError(t, i.fillForwardedTxnCommitTS(reqs, startTS))

meta, err := kv.DecodeTxnMeta(reqs[0].Mutations[0].Value)
require.NoError(t, err)
require.Equal(t, startTS+1, meta.CommitTS)
}

func TestStampTxnTimestamps_UsesSingleTxnStartTS(t *testing.T) {
t.Parallel()

Expand Down
44 changes: 15 additions & 29 deletions kv/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64) (*CoordinateR
return nil, errors.WithStack(ErrTxnCommitTSRequired)
}

logs := txnRequests(startTS, commitTS, defaultTxnLockTTLms, primary, reqs)

r, err := c.transactionManager.Commit(logs)
r, err := c.transactionManager.Commit([]*pb.Request{
onePhaseTxnRequest(startTS, commitTS, primary, reqs),
})
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down Expand Up @@ -209,7 +209,9 @@ func (c *Coordinate) redirect(ctx context.Context, reqs *OperationGroup[OP]) (*C
if len(primary) == 0 {
return nil, errors.WithStack(ErrTxnPrimaryKeyRequired)
}
requests = txnRequests(reqs.StartTS, 0, defaultTxnLockTTLms, primary, reqs.Elems)
requests = []*pb.Request{
onePhaseTxnRequest(reqs.StartTS, 0, primary, reqs.Elems),
}
} else {
for _, req := range reqs.Elems {
requests = append(requests, c.toRawRequest(req))
Expand Down Expand Up @@ -261,33 +263,17 @@ func elemToMutation(req *Elem[OP]) *pb.Mutation {
panic("unreachable")
}

func txnRequests(startTS, commitTS, lockTTLms uint64, primaryKey []byte, reqs []*Elem[OP]) []*pb.Request {
meta := &pb.Mutation{
Op: pb.Op_PUT,
Key: []byte(txnMetaPrefix),
Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKey, LockTTLms: lockTTLms, CommitTS: 0}),
}

prepareMuts := make([]*pb.Mutation, 0, len(reqs)+1)
prepareMuts = append(prepareMuts, meta)
for _, req := range reqs {
prepareMuts = append(prepareMuts, elemToMutation(req))
}

commitMeta := &pb.Mutation{
Op: pb.Op_PUT,
Key: []byte(txnMetaPrefix),
Value: EncodeTxnMeta(TxnMeta{PrimaryKey: primaryKey, LockTTLms: 0, CommitTS: commitTS}),
}
commitMuts := make([]*pb.Mutation, 0, len(reqs)+1)
commitMuts = append(commitMuts, commitMeta)
func onePhaseTxnRequest(startTS, commitTS uint64, primaryKey []byte, reqs []*Elem[OP]) *pb.Request {
muts := make([]*pb.Mutation, 0, len(reqs)+1)
muts = append(muts, txnMetaMutation(primaryKey, 0, commitTS))
Comment thread
bootjp marked this conversation as resolved.
for _, req := range reqs {
commitMuts = append(commitMuts, &pb.Mutation{Op: pb.Op_PUT, Key: req.Key})
muts = append(muts, elemToMutation(req))
}

return []*pb.Request{
{IsTxn: true, Phase: pb.Phase_PREPARE, Ts: startTS, Mutations: prepareMuts},
{IsTxn: true, Phase: pb.Phase_COMMIT, Ts: startTS, Mutations: commitMuts},
return &pb.Request{
IsTxn: true,
Phase: pb.Phase_NONE,
Ts: startTS,
Mutations: muts,
}
}

Expand Down
41 changes: 40 additions & 1 deletion kv/coordinator_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (

type stubTransactional struct {
commits int
reqs [][]*pb.Request
}

func (s *stubTransactional) Commit(_ []*pb.Request) (*TransactionResponse, error) {
func (s *stubTransactional) Commit(reqs []*pb.Request) (*TransactionResponse, error) {
s.commits++
s.reqs = append(s.reqs, reqs)
return &TransactionResponse{}, nil
}

Expand Down Expand Up @@ -54,3 +56,40 @@ func TestCoordinateDispatchTxn_RejectsMissingPrimaryKey(t *testing.T) {
require.ErrorIs(t, err, ErrTxnPrimaryKeyRequired)
require.Equal(t, 0, tx.commits)
}

func TestCoordinateDispatchTxn_UsesOnePhaseRequest(t *testing.T) {
t.Parallel()

tx := &stubTransactional{}
c := &Coordinate{
transactionManager: tx,
clock: NewHLC(),
}

startTS := uint64(10)
_, err := c.dispatchTxn([]*Elem[OP]{
{Op: Put, Key: []byte("b"), Value: []byte("v1")},
{Op: Del, Key: []byte("x")},
}, startTS)
require.NoError(t, err)
require.Equal(t, 1, tx.commits)
require.Len(t, tx.reqs, 1)
require.Len(t, tx.reqs[0], 1)

req := tx.reqs[0][0]
require.NotNil(t, req)
require.True(t, req.IsTxn)
require.Equal(t, pb.Phase_NONE, req.Phase)
require.Equal(t, startTS, req.Ts)
require.Len(t, req.Mutations, 3)
require.Equal(t, []byte(txnMetaPrefix), req.Mutations[0].Key)
require.Equal(t, []byte("b"), req.Mutations[1].Key)
require.Equal(t, []byte("v1"), req.Mutations[1].Value)
require.Equal(t, pb.Op_DEL, req.Mutations[2].Op)
require.Equal(t, []byte("x"), req.Mutations[2].Key)

meta, err := DecodeTxnMeta(req.Mutations[0].Value)
require.NoError(t, err)
require.Equal(t, []byte("b"), meta.PrimaryKey)
require.Greater(t, meta.CommitTS, startTS)
}
49 changes: 46 additions & 3 deletions kv/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func requestCommitTS(r *pb.Request) (uint64, error) {
}

commitTS := r.Ts
if r.IsTxn && (r.Phase == pb.Phase_COMMIT || r.Phase == pb.Phase_ABORT) {
if r.IsTxn && (r.Phase == pb.Phase_COMMIT || r.Phase == pb.Phase_ABORT || r.Phase == pb.Phase_NONE) {
meta, _, err := extractTxnMeta(r.Mutations)
if err != nil {
return 0, errors.WithStack(err)
Expand Down Expand Up @@ -177,8 +177,7 @@ func (f *kvFSM) handleTxnRequest(ctx context.Context, r *pb.Request, commitTS ui
case pb.Phase_ABORT:
return f.handleAbortRequest(ctx, r, commitTS)
case pb.Phase_NONE:
// not reached
return errors.WithStack(ErrUnknownRequestType)
return f.handleOnePhaseTxnRequest(ctx, r, commitTS)
default:
return errors.WithStack(ErrUnknownRequestType)
}
Expand Down Expand Up @@ -266,6 +265,34 @@ func (f *kvFSM) handlePrepareRequest(ctx context.Context, r *pb.Request) error {
return nil
}

func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, commitTS uint64) error {
meta, muts, err := extractTxnMeta(r.Mutations)
if err != nil {
return err
}
if len(meta.PrimaryKey) == 0 {
return errors.WithStack(ErrTxnPrimaryKeyRequired)
}
if len(muts) == 0 {
return errors.WithStack(ErrInvalidRequest)
}
startTS := r.Ts
if commitTS <= startTS {
return errors.WithStack(ErrTxnCommitTSRequired)
}

uniq, err := uniqueMutations(muts)
if err != nil {
return err
}

storeMuts, err := f.buildOnePhaseStoreMutations(ctx, uniq)
if err != nil {
return err
}
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, startTS, commitTS))
}

func (f *kvFSM) handleCommitRequest(ctx context.Context, r *pb.Request) error {
meta, muts, err := extractTxnMeta(r.Mutations)
if err != nil {
Expand Down Expand Up @@ -343,6 +370,22 @@ func (f *kvFSM) buildPrepareStoreMutations(ctx context.Context, muts []*pb.Mutat
return storeMuts, nil
}

func (f *kvFSM) buildOnePhaseStoreMutations(ctx context.Context, muts []*pb.Mutation) ([]*store.KVPairMutation, error) {
for _, mut := range muts {
if isTxnInternalKey(mut.Key) {
return nil, errors.WithStack(ErrInvalidRequest)
}
if err := f.assertNoConflictingTxnLock(ctx, mut.Key, 0); err != nil {
return nil, err
}
}
storeMuts, err := toStoreMutations(muts)
if err != nil {
return nil, errors.WithStack(err)
}
return storeMuts, nil
}

func (f *kvFSM) buildCommitStoreMutations(ctx context.Context, muts []*pb.Mutation, meta TxnMeta, startTS, commitTS uint64) ([]*store.KVPairMutation, error) {
storeMuts := make([]*store.KVPairMutation, 0, len(muts)*txnCommitStoreMutationFactor+txnCommitStoreMutationSlack)

Expand Down
32 changes: 32 additions & 0 deletions kv/fsm_occ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,35 @@ func TestApplyReturnsErrorOnConflict(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []byte("v1"), v)
}

func TestOnePhaseTxnDetectsWriteConflict(t *testing.T) {
ctx := context.Background()
st := store.NewMVCCStore()
require.NoError(t, st.PutAt(ctx, []byte("k"), []byte("v1"), 100, 0))

fsm, ok := NewKvFSM(st).(*kvFSM)
require.True(t, ok)

// One-phase txn with startTS < latest commit (100) should be rejected.
req := &pb.Request{
IsTxn: true,
Phase: pb.Phase_NONE,
Ts: 90,
Mutations: []*pb.Mutation{
{Op: pb.Op_PUT, Key: []byte(txnMetaPrefix), Value: EncodeTxnMeta(TxnMeta{PrimaryKey: []byte("k"), CommitTS: 110})},
{Op: pb.Op_PUT, Key: []byte("k"), Value: []byte("v2")},
},
}
data, err := proto.Marshal(req)
require.NoError(t, err)

resp := fsm.Apply(&raft.Log{Type: raft.LogCommand, Data: data})
err, ok = resp.(error)
require.True(t, ok)
require.ErrorIs(t, err, store.ErrWriteConflict)

// Ensure the original value is unchanged.
v, err := st.GetAt(ctx, []byte("k"), ^uint64(0))
require.NoError(t, err)
require.Equal(t, []byte("v1"), v)
}
35 changes: 35 additions & 0 deletions kv/fsm_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,38 @@ func TestPrepareClampsHugeLockTTL(t *testing.T) {
maxDelta := maxTxnLockTTLms << hlcLogicalBits
require.LessOrEqual(t, lock.TTLExpireAt-now, maxDelta)
}

func TestOnePhaseTxnCommitsWithoutTxnArtifacts(t *testing.T) {
t.Parallel()

ctx := context.Background()
st := store.NewMVCCStore()
fsm, ok := NewKvFSM(st).(*kvFSM)
require.True(t, ok)

startTS := uint64(15)
commitTS := uint64(25)
key := []byte("k")
req := &pb.Request{
IsTxn: true,
Phase: pb.Phase_NONE,
Ts: startTS,
Mutations: []*pb.Mutation{
{Op: pb.Op_PUT, Key: []byte(txnMetaPrefix), Value: EncodeTxnMeta(TxnMeta{PrimaryKey: key, CommitTS: commitTS})},
{Op: pb.Op_PUT, Key: key, Value: []byte("v")},
},
}

require.NoError(t, applyFSMRequest(t, fsm, req))

value, err := st.GetAt(ctx, key, ^uint64(0))
require.NoError(t, err)
require.Equal(t, []byte("v"), value)

_, err = st.GetAt(ctx, txnLockKey(key), ^uint64(0))
require.ErrorIs(t, err, store.ErrKeyNotFound)
_, err = st.GetAt(ctx, txnIntentKey(key), ^uint64(0))
require.ErrorIs(t, err, store.ErrKeyNotFound)
_, err = st.GetAt(ctx, txnCommitKey(key, startTS), ^uint64(0))
require.ErrorIs(t, err, store.ErrKeyNotFound)
}
Loading
Loading