Skip to content

Commit 6eb2e2e

Browse files
authored
Merge pull request #499 from bootjp/feat/read-skew-fix
store: add read-set validation to ApplyMutations for SSI
2 parents c28fd40 + dd9e6eb commit 6eb2e2e

22 files changed

Lines changed: 716 additions & 65 deletions

adapter/distribution_server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -664,7 +664,7 @@ func (s *distributionCoordinatorStub) applyDispatch(
664664
startTS uint64,
665665
commitTS uint64,
666666
) error {
667-
if err := s.store.ApplyMutations(ctx, mutations, startTS, commitTS); err != nil {
667+
if err := s.store.ApplyMutations(ctx, mutations, nil, startTS, commitTS); err != nil {
668668
return err
669669
}
670670
if s.afterDispatch != nil {

adapter/redis.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1923,7 +1923,11 @@ func (t *txnContext) commit() error {
19231923
return nil
19241924
}
19251925

1926-
group := &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: elems, StartTS: t.startTS}
1926+
readKeys := make([][]byte, 0, len(t.readKeys))
1927+
for _, k := range t.readKeys {
1928+
readKeys = append(readKeys, k)
1929+
}
1930+
group := &kv.OperationGroup[kv.OP]{IsTxn: true, Elems: elems, StartTS: t.startTS, ReadKeys: readKeys}
19271931
ctx, cancel := context.WithTimeout(context.Background(), redisDispatchTimeout)
19281932
defer cancel()
19291933
if _, err := t.server.coordinator.Dispatch(ctx, group); err != nil {

adapter/s3_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1916,7 +1916,7 @@ func TestS3Server_BackwardCompatibility_NoBucketAclFieldIsPrivate(t *testing.T)
19161916
commitTS := coord.Clock().Next()
19171917
err = st.ApplyMutations(context.Background(), []*store.KVPairMutation{
19181918
{Op: store.OpTypePut, Key: s3keys.BucketMetaKey("legacy-bucket"), Value: legacyJSON},
1919-
}, commitTS-1, commitTS)
1919+
}, nil, commitTS-1, commitTS)
19201920
require.NoError(t, err)
19211921

19221922
// Create a server WITH credentials; the legacy bucket has no acl field.

distribution/catalog.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,7 @@ func (s *CatalogStore) applySaveMutations(ctx context.Context, plan savePlan, mu
627627
if err != nil {
628628
return err
629629
}
630-
if err := s.store.ApplyMutations(ctx, mutations, plan.readTS, commitTS); err != nil {
630+
if err := s.store.ApplyMutations(ctx, mutations, nil, plan.readTS, commitTS); err != nil {
631631
if errors.Is(err, store.ErrWriteConflict) {
632632
return errors.WithStack(ErrCatalogVersionMismatch)
633633
}

kv/coordinator.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,13 @@ func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64, commitTS uint
143143
return nil, errors.WithStack(ErrTxnCommitTSRequired)
144144
}
145145

146+
// Read-set validation for single-shard transactions is performed by the
147+
// adapter BEFORE Raft submission (validateReadSet). Passing readKeys
148+
// into the Raft log would cause the FSM to reject transactions after
149+
// they are already committed in the log, forcing retries at a later
150+
// timestamp and breaking realtime ordering of appends.
146151
r, err := c.transactionManager.Commit([]*pb.Request{
147-
onePhaseTxnRequest(startTS, commitTS, primary, reqs),
152+
onePhaseTxnRequest(startTS, commitTS, primary, reqs, nil),
148153
})
149154
if err != nil {
150155
return nil, errors.WithStack(err)
@@ -258,7 +263,7 @@ func (c *Coordinate) redirect(ctx context.Context, reqs *OperationGroup[OP]) (*C
258263
commitTS = 0
259264
}
260265
requests = []*pb.Request{
261-
onePhaseTxnRequest(reqs.StartTS, commitTS, primary, reqs.Elems),
266+
onePhaseTxnRequest(reqs.StartTS, commitTS, primary, reqs.Elems, nil),
262267
}
263268
} else {
264269
for _, req := range reqs.Elems {
@@ -318,7 +323,7 @@ func elemToMutation(req *Elem[OP]) *pb.Mutation {
318323
panic("unreachable")
319324
}
320325

321-
func onePhaseTxnRequest(startTS, commitTS uint64, primaryKey []byte, reqs []*Elem[OP]) *pb.Request {
326+
func onePhaseTxnRequest(startTS, commitTS uint64, primaryKey []byte, reqs []*Elem[OP], readKeys [][]byte) *pb.Request {
322327
muts := make([]*pb.Mutation, 0, len(reqs)+1)
323328
muts = append(muts, txnMetaMutation(primaryKey, 0, commitTS))
324329
for _, req := range reqs {
@@ -329,6 +334,7 @@ func onePhaseTxnRequest(startTS, commitTS uint64, primaryKey []byte, reqs []*Ele
329334
Phase: pb.Phase_NONE,
330335
Ts: startTS,
331336
Mutations: muts,
337+
ReadKeys: readKeys,
332338
}
333339
}
334340

kv/coordinator_txn_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,3 +116,6 @@ func TestCoordinateDispatchTxn_UsesProvidedCommitTS(t *testing.T) {
116116
require.NoError(t, err)
117117
require.Equal(t, commitTS, meta.CommitTS)
118118
}
119+
120+
// ReadKeys omission from single-shard Raft entries is tested in
121+
// TestShardedCoordinatorDispatchTxn_SingleShardOmitsReadKeysFromRaftEntry.

kv/fsm.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func (f *kvFSM) handleRawRequest(ctx context.Context, r *pb.Request, commitTS ui
184184
}
185185
// Raw requests always commit against the latest state; use commitTS as both
186186
// the validation snapshot and the commit timestamp.
187-
return errors.WithStack(f.store.ApplyMutations(ctx, muts, commitTS, commitTS))
187+
return errors.WithStack(f.store.ApplyMutations(ctx, muts, nil, commitTS, commitTS))
188188
}
189189

190190
// extractDelPrefix checks if the mutations contain a DEL_PREFIX operation.
@@ -313,18 +313,16 @@ func (f *kvFSM) handlePrepareRequest(ctx context.Context, r *pb.Request) error {
313313
return err
314314
}
315315

316-
if err := f.store.ApplyMutations(ctx, storeMuts, startTS, startTS); err != nil {
316+
if err := f.store.ApplyMutations(ctx, storeMuts, r.ReadKeys, startTS, startTS); err != nil {
317317
return errors.WithStack(err)
318318
}
319319
return nil
320320
}
321321

322322
// handleOnePhaseTxnRequest applies a single-shard transaction atomically.
323-
// The isolation level is Snapshot Isolation (SI): only write-write conflicts
324-
// are detected via ApplyMutations. Read-write conflicts (write skew) are NOT
325-
// prevented because the read-set is not tracked. Callers requiring
326-
// Serializable Snapshot Isolation (SSI) must implement read-set validation
327-
// at a higher layer.
323+
// Both write-write and read-write conflicts are checked: the read set carried
324+
// in r.ReadKeys is validated alongside the mutation keys inside
325+
// ApplyMutations under the store's apply lock.
328326
func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, commitTS uint64) error {
329327
meta, muts, err := extractTxnMeta(r.Mutations)
330328
if err != nil {
@@ -350,7 +348,7 @@ func (f *kvFSM) handleOnePhaseTxnRequest(ctx context.Context, r *pb.Request, com
350348
if err != nil {
351349
return err
352350
}
353-
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, startTS, commitTS))
351+
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, r.ReadKeys, startTS, commitTS))
354352
}
355353

356354
func (f *kvFSM) handleCommitRequest(ctx context.Context, r *pb.Request) error {
@@ -419,7 +417,7 @@ func (f *kvFSM) commitApplyStartTS(ctx context.Context, primaryKey []byte, start
419417
// The secondary-shard LatestCommitTS scan is intentionally deferred to the
420418
// write-conflict path so the hot (first-time) commit path pays no extra cost.
421419
func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMuts []*store.KVPairMutation, uniq []*pb.Mutation, applyStartTS, commitTS uint64) error {
422-
err := f.store.ApplyMutations(ctx, storeMuts, applyStartTS, commitTS)
420+
err := f.store.ApplyMutations(ctx, storeMuts, nil, applyStartTS, commitTS)
423421
if err == nil {
424422
return nil
425423
}
@@ -436,7 +434,7 @@ func (f *kvFSM) applyCommitWithIdempotencyFallback(ctx context.Context, storeMut
436434
return errors.WithStack(lErr)
437435
}
438436
if exists && latestTS >= commitTS {
439-
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, commitTS, commitTS))
437+
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, nil, commitTS, commitTS))
440438
}
441439
}
442440
return errors.WithStack(err)
@@ -475,7 +473,7 @@ func (f *kvFSM) handleAbortRequest(ctx context.Context, r *pb.Request, abortTS u
475473
if len(storeMuts) == 0 {
476474
return nil
477475
}
478-
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, startTS, abortTS))
476+
return errors.WithStack(f.store.ApplyMutations(ctx, storeMuts, nil, startTS, abortTS))
479477
}
480478

481479
func (f *kvFSM) buildPrepareStoreMutations(ctx context.Context, muts []*pb.Mutation, primaryKey []byte, startTS, expireAt uint64) ([]*store.KVPairMutation, error) {

kv/leader_routed_store.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,11 +232,11 @@ func (s *LeaderRoutedStore) LatestCommitTS(ctx context.Context, key []byte) (uin
232232
return s.proxyRawLatestCommitTS(ctx, key)
233233
}
234234

235-
func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, startTS, commitTS uint64) error {
235+
func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error {
236236
if s == nil || s.local == nil {
237237
return errors.WithStack(store.ErrNotSupported)
238238
}
239-
return errors.WithStack(s.local.ApplyMutations(ctx, mutations, startTS, commitTS))
239+
return errors.WithStack(s.local.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS))
240240
}
241241

242242
func (s *LeaderRoutedStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error {

kv/shard_store.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,7 +1116,7 @@ func cleanupTSWithNow(startTS, now uint64) uint64 {
11161116
//
11171117
// All mutations must belong to the same shard. Cross-shard mutation batches are
11181118
// not supported.
1119-
func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, startTS, commitTS uint64) error {
1119+
func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error {
11201120
if len(mutations) == 0 {
11211121
return nil
11221122
}
@@ -1135,7 +1135,7 @@ func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPa
11351135
return errors.WithStack(ErrCrossShardMutationBatchNotSupported)
11361136
}
11371137
}
1138-
return errors.WithStack(firstGroup.Store.ApplyMutations(ctx, mutations, startTS, commitTS))
1138+
return errors.WithStack(firstGroup.Store.ApplyMutations(ctx, mutations, readKeys, startTS, commitTS))
11391139
}
11401140

11411141
// DeletePrefixAt applies a prefix delete to every shard in the store.

kv/sharded_coordinator.go

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[
9090
}
9191

9292
if reqs.IsTxn {
93-
return c.dispatchTxn(reqs.StartTS, reqs.CommitTS, reqs.Elems)
93+
return c.dispatchTxn(ctx, reqs.StartTS, reqs.CommitTS, reqs.Elems, reqs.ReadKeys)
9494
}
9595

9696
logs, err := c.requestLogs(reqs)
@@ -193,7 +193,7 @@ func (c *ShardedCoordinator) broadcastToAllGroups(requests []*pb.Request) (*Coor
193193
return &CoordinateResponse{CommitIndex: maxIndex.Load()}, nil
194194
}
195195

196-
func (c *ShardedCoordinator) dispatchTxn(startTS uint64, commitTS uint64, elems []*Elem[OP]) (*CoordinateResponse, error) {
196+
func (c *ShardedCoordinator) dispatchTxn(ctx context.Context, startTS uint64, commitTS uint64, elems []*Elem[OP], readKeys [][]byte) (*CoordinateResponse, error) {
197197
grouped, gids, err := c.groupMutations(elems)
198198
if err != nil {
199199
return nil, err
@@ -212,7 +212,7 @@ func (c *ShardedCoordinator) dispatchTxn(startTS uint64, commitTS uint64, elems
212212
return c.dispatchSingleShardTxn(startTS, commitTS, primaryKey, gids[0], elems)
213213
}
214214

215-
prepared, err := c.prewriteTxn(startTS, commitTS, primaryKey, grouped, gids)
215+
prepared, err := c.prewriteTxn(ctx, startTS, commitTS, primaryKey, grouped, gids, readKeys)
216216
if err != nil {
217217
return nil, err
218218
}
@@ -246,8 +246,9 @@ func (c *ShardedCoordinator) dispatchSingleShardTxn(startTS, commitTS uint64, pr
246246
if err != nil {
247247
return nil, err
248248
}
249+
// Single-shard: read-set validated pre-Raft by the adapter.
249250
resp, err := g.Txn.Commit([]*pb.Request{
250-
onePhaseTxnRequest(startTS, commitTS, primaryKey, elems),
251+
onePhaseTxnRequest(startTS, commitTS, primaryKey, elems, nil),
251252
})
252253
if err != nil {
253254
return nil, errors.WithStack(err)
@@ -263,10 +264,12 @@ type preparedGroup struct {
263264
keys []*pb.Mutation
264265
}
265266

266-
func (c *ShardedCoordinator) prewriteTxn(startTS, commitTS uint64, primaryKey []byte, grouped map[uint64][]*pb.Mutation, gids []uint64) ([]preparedGroup, error) {
267+
func (c *ShardedCoordinator) prewriteTxn(ctx context.Context, startTS, commitTS uint64, primaryKey []byte, grouped map[uint64][]*pb.Mutation, gids []uint64, readKeys [][]byte) ([]preparedGroup, error) {
267268
prepareMeta := txnMetaMutation(primaryKey, defaultTxnLockTTLms, 0)
268269
prepared := make([]preparedGroup, 0, len(gids))
269270

271+
groupedReadKeys := c.groupReadKeysByShardID(readKeys)
272+
270273
for _, gid := range gids {
271274
g, err := c.txnGroupForID(gid)
272275
if err != nil {
@@ -277,6 +280,7 @@ func (c *ShardedCoordinator) prewriteTxn(startTS, commitTS uint64, primaryKey []
277280
Phase: pb.Phase_PREPARE,
278281
Ts: startTS,
279282
Mutations: append([]*pb.Mutation{prepareMeta}, grouped[gid]...),
283+
ReadKeys: groupedReadKeys[gid],
280284
}
281285
if _, err := g.Txn.Commit([]*pb.Request{req}); err != nil {
282286
c.abortPreparedTxn(startTS, primaryKey, prepared, abortTSFrom(startTS, commitTS))
@@ -285,6 +289,14 @@ func (c *ShardedCoordinator) prewriteTxn(startTS, commitTS uint64, primaryKey []
285289
prepared = append(prepared, preparedGroup{gid: gid, keys: keyMutations(grouped[gid])})
286290
}
287291

292+
// Validate read keys on read-only shards (shards that have read keys
293+
// but no mutations in this transaction). Without this, a concurrent
294+
// write to a read-only shard would go undetected.
295+
if err := c.validateReadOnlyShards(ctx, groupedReadKeys, gids, startTS); err != nil {
296+
c.abortPreparedTxn(startTS, primaryKey, prepared, abortTSFrom(startTS, commitTS))
297+
return nil, err
298+
}
299+
288300
return prepared, nil
289301
}
290302

@@ -586,6 +598,81 @@ func (c *ShardedCoordinator) engineGroupIDForKey(key []byte) uint64 {
586598
return route.GroupID
587599
}
588600

601+
func (c *ShardedCoordinator) groupReadKeysByShardID(readKeys [][]byte) map[uint64][][]byte {
602+
if len(readKeys) == 0 {
603+
return nil
604+
}
605+
grouped := make(map[uint64][][]byte)
606+
for _, key := range readKeys {
607+
gid := c.engineGroupIDForKey(key)
608+
if gid == 0 {
609+
continue
610+
}
611+
grouped[gid] = append(grouped[gid], key)
612+
}
613+
return grouped
614+
}
615+
616+
// validateReadOnlyShards checks read-write conflicts on shards that have
617+
// read keys but no mutations in this transaction. writeGIDs is the set of
618+
// shards that already received a PREPARE with their readKeys attached.
619+
//
620+
// Because these shards have no mutations, we cannot send a PREPARE request
621+
// (the FSM rejects empty mutation lists). Instead we issue a linearizable
622+
// read barrier on each read-only shard's Raft group (ensuring the local
623+
// FSM has applied all committed log entries) and then check LatestCommitTS
624+
// against the local store.
625+
//
626+
// NOTE: This check is performed outside the FSM's applyMu lock, so there
627+
// is a small TOCTOU window between the linearizable read barrier and the
628+
// LatestCommitTS check. A concurrent write that commits in this window may
629+
// go undetected. Full SSI for read-only shards in multi-shard transactions
630+
// would require a dedicated "read-validate" FSM request phase. For
631+
// single-shard transactions and write-shard read keys, validation is fully
632+
// atomic under applyMu.
633+
func (c *ShardedCoordinator) validateReadOnlyShards(ctx context.Context, groupedReadKeys map[uint64][][]byte, writeGIDs []uint64, startTS uint64) error {
634+
if len(groupedReadKeys) == 0 {
635+
return nil
636+
}
637+
writeSet := make(map[uint64]struct{}, len(writeGIDs))
638+
for _, gid := range writeGIDs {
639+
writeSet[gid] = struct{}{}
640+
}
641+
for gid, keys := range groupedReadKeys {
642+
if _, isWrite := writeSet[gid]; isWrite {
643+
continue
644+
}
645+
if err := c.validateReadKeysOnShard(ctx, gid, keys, startTS); err != nil {
646+
return err
647+
}
648+
}
649+
return nil
650+
}
651+
652+
func (c *ShardedCoordinator) validateReadKeysOnShard(ctx context.Context, gid uint64, keys [][]byte, startTS uint64) error {
653+
g, ok := c.groups[gid]
654+
if !ok {
655+
return nil
656+
}
657+
// Linearizable read barrier: wait until the shard's FSM has applied
658+
// all Raft-committed entries so LatestCommitTS reflects the latest
659+
// committed state. Without this, a concurrent write that is committed
660+
// in Raft but not yet applied locally would be invisible.
661+
if _, err := linearizableReadEngineCtx(ctx, engineForGroup(g)); err != nil {
662+
return errors.WithStack(err)
663+
}
664+
for _, key := range keys {
665+
ts, exists, err := g.Store.LatestCommitTS(ctx, key)
666+
if err != nil {
667+
return errors.WithStack(err)
668+
}
669+
if exists && ts > startTS {
670+
return errors.WithStack(store.NewWriteConflictError(key))
671+
}
672+
}
673+
return nil
674+
}
675+
589676
var _ Coordinator = (*ShardedCoordinator)(nil)
590677

591678
func validateOperationGroup(reqs *OperationGroup[OP]) error {

0 commit comments

Comments
 (0)