Commit eedc17c

fix(sqs): fail-closed for unresolved txn read keys
Round 5 introduced fail-closed semantics in ShardRouter.ResolveGroup (recognised-but-unresolved partitioned keys return ok=false instead of falling through to the engine), but the read-key path inside ShardedCoordinator was not audited for the new contract.

engineGroupIDForKey discards the resolver's ok flag and returns 0 for any failure. groupReadKeysByShardID then skips every such key via `if gid == 0 { continue }`, silently dropping unrouted read keys from the prewrite payload.

With the new fail-closed resolver, a partitioned read key whose queue has drifted out of --sqsFifoPartitionMap (partial rollout / config drift) gets dropped from OCC validation. The FSM never sees that key in ReadKeys, a concurrent write to the same key commits without conflict, and SSI is broken.

Codex round-2 P1 on PR #715 caught this; it is addressed here.

Fix

groupReadKeysByShardID now returns (map, error). Any read key that fails to route surfaces as ErrInvalidRequest, dispatchTxn propagates the error, and the transaction aborts before prewrite. The function calls c.router.ResolveGroup directly (rather than via engineGroupIDForKey) so the (gid, ok) signal is preserved through the boundary.

Tests

- TestGroupReadKeysByShardID_FailsClosedOnUnroutable replaces TestGroupReadKeysByShardID_SkipsUnroutableKeys (which had been pinning the buggy skip-silently behaviour). It asserts the new fail-closed contract: unroutable keys → error, no partial map.
- TestShardedCoordinator_TxnFailsClosedForUnresolvedReadKey is the coordinator-level regression: a transaction with a recognised-but-unresolved partitioned read key MUST abort before any prewrite. It asserts that no group received an RPC.
- The three existing TestGroupReadKeysByShardID_* tests are updated for the new (map, error) signature.
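To make the failure mode in the commit message concrete, here is a minimal, self-contained sketch of read-set validation. It is not the project's FSM: `toyStore`, `validate`, and the key name are hypothetical, and the only point is that a key missing from the read set is never checked, so a conflicting write goes unnoticed.

```go
package main

import "fmt"

// toyStore is a hypothetical single-shard store that remembers the commit
// timestamp of the latest write to each key. It stands in for the FSM only
// for the purpose of this illustration.
type toyStore struct {
	lastCommitTS map[string]uint64
}

// validate reports a read-write conflict if any key in readKeys was written
// after startTS. Keys that are absent from readKeys are never inspected.
func (s *toyStore) validate(startTS uint64, readKeys []string) error {
	for _, k := range readKeys {
		if ts, ok := s.lastCommitTS[k]; ok && ts > startTS {
			return fmt.Errorf("read-write conflict on %q: written at %d, read at %d", k, ts, startTS)
		}
	}
	return nil
}

func main() {
	// A concurrent writer committed "queue/a" at ts=150, after our read at ts=100.
	s := &toyStore{lastCommitTS: map[string]uint64{"queue/a": 150}}

	// Complete read set: the conflict is detected.
	fmt.Println(s.validate(100, []string{"queue/a"}) != nil) // true

	// Read key silently dropped before validation: the conflict is missed.
	fmt.Println(s.validate(100, nil) == nil) // true
}
```

The second call is the pre-fix situation: validation succeeds only because the key never reached it.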
1 parent 87561fd commit eedc17c

3 files changed

Lines changed: 139 additions & 16 deletions

kv/sharded_coordinator.go

Lines changed: 32 additions & 7 deletions
@@ -395,7 +395,13 @@ func (c *ShardedCoordinator) dispatchTxn(ctx context.Context, startTS uint64, co
 
 	// Multi-shard path: group read keys by shard now. The result is passed
 	// directly to prewriteTxn to avoid a second iteration inside that function.
-	groupedReadKeys := c.groupReadKeysByShardID(readKeys)
+	// A routing failure here aborts the transaction before any prewrite —
+	// silently dropping unresolvable read keys would let OCC validation run
+	// with an incomplete read set and break SSI.
+	groupedReadKeys, err := c.groupReadKeysByShardID(readKeys)
+	if err != nil {
+		return nil, err
+	}
 	prepared, err := c.prewriteTxn(ctx, startTS, commitTS, primaryKey, grouped, gids, groupedReadKeys)
 	if err != nil {
 		return nil, err
@@ -891,19 +897,38 @@ func (c *ShardedCoordinator) engineGroupIDForKey(key []byte) uint64 {
 	return gid
 }
 
-func (c *ShardedCoordinator) groupReadKeysByShardID(readKeys [][]byte) map[uint64][][]byte {
+// groupReadKeysByShardID groups txn read keys by their owning Raft
+// group. Returns an error when ANY read key cannot be routed —
+// silently skipping unresolvable keys would let a transaction
+// commit with an incomplete OCC read-set, which breaks SSI under
+// the §5 ShardRouter resolver-first dispatch (codex round-2 P1 on
+// PR #715).
+//
+// The fail-closed semantic the resolver gained in PR 4-B-2 makes
+// this path matter: a partitioned-shape read key whose queue is
+// missing from --sqsFifoPartitionMap (drift / partial rollout)
+// returns gid=0 from c.router.ResolveGroup. If we silently dropped
+// those keys, the prewrite Raft entry would carry an empty
+// ReadKeys slice for that key, the FSM's read-write conflict
+// validation would never see it, and a concurrent write could
+// commit alongside a stale read. Surface the routing failure as
+// an error so the transaction aborts before any FSM apply.
+func (c *ShardedCoordinator) groupReadKeysByShardID(readKeys [][]byte) (map[uint64][][]byte, error) {
 	if len(readKeys) == 0 {
-		return nil
+		return nil, nil
 	}
 	grouped := make(map[uint64][][]byte)
 	for _, key := range readKeys {
-		gid := c.engineGroupIDForKey(key)
-		if gid == 0 {
-			continue
+		gid, ok := c.router.ResolveGroup(key)
+		if !ok || gid == 0 {
+			return nil, errors.Wrapf(ErrInvalidRequest,
+				"no route for txn read key %q — recognised-but-"+
+					"unresolved partition keys must fail closed to "+
+					"preserve OCC read-set integrity", key)
 		}
 		grouped[gid] = append(grouped[gid], key)
 	}
-	return grouped
+	return grouped, nil
 }
 
 // validateReadOnlyShards checks read-write conflicts on shards that have

kv/sharded_coordinator_partition_test.go

Lines changed: 79 additions & 0 deletions
@@ -195,6 +195,85 @@ func TestShardedCoordinator_DispatchSplitsMutationsByResolverGroup(t *testing.T)
 		"mutations across groups via c.router.ResolveGroup.")
 }
 
+// TestShardedCoordinator_TxnFailsClosedForUnresolvedReadKey pins
+// the codex round-2 P1 fix on PR #715: a transaction whose read
+// set contains a partitioned-shape key the resolver recognises
+// but cannot route (queue missing from --sqsFifoPartitionMap
+// during drift / partial rollout) MUST abort before prewrite.
+//
+// Pre-fix path: groupReadKeysByShardID silently skipped read keys
+// where engineGroupIDForKey returned 0 (which is what the
+// fail-closed resolver returns for recognised-but-unresolved
+// keys). The resulting prewrite Raft entry carried an empty
+// ReadKeys slice for that key, the FSM's read-write conflict
+// validation never saw it, and a concurrent write could commit
+// alongside the stale read — SSI violated.
+//
+// Post-fix path: groupReadKeysByShardID returns an error when
+// any read key cannot be routed; dispatchTxn propagates the
+// error and the transaction aborts.
+func TestShardedCoordinator_TxnFailsClosedForUnresolvedReadKey(t *testing.T) {
+	t.Parallel()
+	engine := distribution.NewEngine()
+	engine.UpdateRoute([]byte(""), nil, 1)
+
+	g1 := &recordingTransactional{
+		responses: []*TransactionResponse{{CommitIndex: 1}},
+	}
+	g42 := &recordingTransactional{
+		responses: []*TransactionResponse{{CommitIndex: 1}},
+	}
+
+	coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{
+		1:  {Txn: g1, Store: store.NewMVCCStore()},
+		42: {Txn: g42, Store: store.NewMVCCStore()},
+	}, 1, NewHLC(), nil)
+
+	// Resolver claims the WRITE key but RECOGNISES the read key as
+	// partitioned-shape and returns ok=false (simulating the
+	// queue-missing-from-map drift scenario). The router pairs
+	// Recognised=true with ok=false to fail closed.
+	writeKey := []byte("!sqs|msg|data|p|claimed-write-key")
+	readKey := []byte("!sqs|msg|data|p|drift-unresolved-read")
+	coord.WithPartitionResolver(&stubResolver{
+		claim:            map[string]uint64{string(writeKey): 42},
+		recognisedPrefix: []byte("!sqs|msg|data|p|"),
+	})
+
+	// Issue a transaction that reads the unresolvable partitioned
+	// key and writes the claimed key. The transaction MUST abort —
+	// pre-fix it would silently drop the read key and proceed.
+	resp, err := coord.Dispatch(context.Background(), &OperationGroup[OP]{
+		IsTxn:    true,
+		StartTS:  100,
+		CommitTS: 200,
+		Elems: []*Elem[OP]{
+			{Op: Put, Key: writeKey, Value: []byte("v")},
+		},
+		ReadKeys: [][]byte{readKey},
+	})
+	require.Error(t, err,
+		"transaction with unresolvable partitioned read key MUST "+
+			"abort — silently dropping the key would let OCC "+
+			"validation run with an incomplete read set and "+
+			"break SSI")
+	require.Nil(t, resp)
+
+	// Neither group should have received a prepare — the routing
+	// failure surfaces before any prewrite.
+	g1.mu.Lock()
+	g42.mu.Lock()
+	defer g1.mu.Unlock()
+	defer g42.mu.Unlock()
+	require.Zero(t, len(g1.requests),
+		"engine default group must not see a prewrite — fail-closed "+
+			"happens before any RPC")
+	require.Zero(t, len(g42.requests),
+		"resolver-claimed group must not see a prewrite either — "+
+			"fail-closed aborts the txn before prewrite even though "+
+			"the write key is routable")
+}
+
 // TestShardedCoordinator_DispatchFallsThroughForUnclaimedKeys pins
 // the inverse: a key the resolver does NOT claim must continue to
 // route via the byte-range engine. Without this guard the resolver-
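
The "no group received an RPC" assertion in the regression test above relies on a recording stub. A minimal sketch of that pattern follows, assuming nothing about the real recordingTransactional beyond its mutex-guarded request list: `recorder`, `dispatch`, and the key names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// recorder is a hypothetical per-group transport stub: it only remembers
// which requests it was asked to send.
type recorder struct {
	mu       sync.Mutex
	requests []string
}

func (r *recorder) send(req string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.requests = append(r.requests, req)
}

func (r *recorder) count() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.requests)
}

// dispatch routes every read key first and only then sends a prewrite to the
// write group, so a routing failure cannot leave a partial RPC behind.
func dispatch(groups map[uint64]*recorder, writeGID uint64, readKeys []string, route func(string) (uint64, bool)) error {
	for _, k := range readKeys {
		if _, ok := route(k); !ok {
			return errors.New("no route for read key " + k)
		}
	}
	g, ok := groups[writeGID]
	if !ok {
		return fmt.Errorf("unknown group %d", writeGID)
	}
	g.send("prewrite")
	return nil
}

func main() {
	g1, g42 := &recorder{}, &recorder{}
	groups := map[uint64]*recorder{1: g1, 42: g42}
	noRoute := func(string) (uint64, bool) { return 0, false }

	// The write group (42) is known, but the read key cannot be routed.
	err := dispatch(groups, 42, []string{"drifted-read-key"}, noRoute)
	fmt.Println(err != nil, g1.count(), g42.count()) // true 0 0
}
```

Checking both recorders, including the one that owns the routable write key, is what distinguishes "aborted before prewrite" from "aborted after a partial prewrite".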

kv/sharded_coordinator_txn_test.go

Lines changed: 28 additions & 9 deletions
@@ -309,15 +309,19 @@ func TestGroupReadKeysByShardID_NilReturnsNil(t *testing.T) {
 	engine := distribution.NewEngine()
 	engine.UpdateRoute([]byte(""), nil, 1)
 	coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}}, 1, NewHLC(), nil)
-	require.Nil(t, coord.groupReadKeysByShardID(nil))
+	grouped, err := coord.groupReadKeysByShardID(nil)
+	require.NoError(t, err)
+	require.Nil(t, grouped)
 }
 
 func TestGroupReadKeysByShardID_EmptyReturnsNil(t *testing.T) {
 	t.Parallel()
 	engine := distribution.NewEngine()
 	engine.UpdateRoute([]byte(""), nil, 1)
 	coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}}, 1, NewHLC(), nil)
-	require.Nil(t, coord.groupReadKeysByShardID([][]byte{}))
+	grouped, err := coord.groupReadKeysByShardID([][]byte{})
+	require.NoError(t, err)
+	require.Nil(t, grouped)
 }
 
 func TestGroupReadKeysByShardID_GroupsByShardID(t *testing.T) {
@@ -327,11 +331,12 @@ func TestGroupReadKeysByShardID_GroupsByShardID(t *testing.T) {
 	engine.UpdateRoute([]byte("m"), nil, 2)
 	coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}, 2: {}}, 1, NewHLC(), nil)
 
-	grouped := coord.groupReadKeysByShardID([][]byte{
+	grouped, err := coord.groupReadKeysByShardID([][]byte{
 		[]byte("b"), // shard 1
 		[]byte("c"), // shard 1
 		[]byte("x"), // shard 2
 	})
+	require.NoError(t, err)
 	require.Len(t, grouped, 2)
 	require.Len(t, grouped[1], 2)
 	require.Equal(t, []byte("b"), grouped[1][0])
@@ -340,20 +345,34 @@ func TestGroupReadKeysByShardID_GroupsByShardID(t *testing.T) {
 	require.Equal(t, []byte("x"), grouped[2][0])
 }
 
-func TestGroupReadKeysByShardID_SkipsUnroutableKeys(t *testing.T) {
+// TestGroupReadKeysByShardID_FailsClosedOnUnroutable pins the
+// codex round-2 P1 fix on PR #715: a read key the resolver cannot
+// route (recognised-but-unresolved partition key during drift, or
+// any key outside the engine's range cover) MUST surface as an
+// error so the transaction aborts before any prewrite. Silently
+// skipping unroutable keys would let OCC validation run with an
+// incomplete read set and break SSI — a concurrent write to that
+// key could commit alongside a stale read.
+//
+// This test was previously TestGroupReadKeysByShardID_SkipsUnroutableKeys
+// and pinned the BUGGY skip-silently behaviour. Renamed and rewritten
+// to pin the new fail-closed contract.
+func TestGroupReadKeysByShardID_FailsClosedOnUnroutable(t *testing.T) {
 	t.Parallel()
 	// Only route "a"-"m" to shard 1. Keys outside this range are unroutable.
 	engine := distribution.NewEngine()
 	engine.UpdateRoute([]byte("a"), []byte("m"), 1)
 	coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}}, 1, NewHLC(), nil)
 
-	grouped := coord.groupReadKeysByShardID([][]byte{
+	grouped, err := coord.groupReadKeysByShardID([][]byte{
 		[]byte("b"), // routable → shard 1
-		[]byte("zzz"), // unroutable → skipped
+		[]byte("zzz"), // unroutable → MUST surface as error
 	})
-	require.Len(t, grouped, 1)
-	require.Len(t, grouped[1], 1)
-	require.Equal(t, []byte("b"), grouped[1][0])
+	require.Error(t, err,
+		"unroutable read key MUST fail closed — silently skipping "+
+			"would drop the key from OCC validation and break SSI")
+	require.Nil(t, grouped)
+	require.ErrorIs(t, err, ErrInvalidRequest)
 }
 
 // ---------------------------------------------------------------------------
