From 1f48eeb15db76d667a9fdbf3d379fd12c7ffb957 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 18:08:08 +0900 Subject: [PATCH 1/8] feat(sqs): partition resolver for HT-FIFO routing (Phase 3.D PR 4-B-2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the routing-layer half of PR 4-B: a PartitionResolver that ShardRouter consults BEFORE falling through to the byte-range engine. SQS HT-FIFO needs partition-aware dispatch — partition K of queue Q lives on a different Raft group than partition K+1 — but the engine's non-overlapping-cover model cannot express overlay routes without breaking legacy keys (a partition route would leave a lexicographic gap that legacy keys fall into). The resolver-first dispatch sidesteps this: the resolver answers only for partitioned-keyspace keys, returns (0, false) otherwise, and the engine handles everything else exactly as today. What changes - kv.PartitionResolver interface: ResolveGroup([]byte) (uint64, bool). - kv.ShardRouter.WithPartitionResolver: option, nil-safe, idempotent. - kv.ShardRouter.resolveGroup: tries the resolver first, falls through to engine.GetRoute. groupRequests and Get both call this unified path so reads and writes share the same dispatch logic. - kv.ShardedCoordinator.WithPartitionResolver: delegates to the router so main.go can install the resolver via the existing fluent-construction style. - adapter.SQSPartitionResolver: parses (queue, partition) from the partitioned key shape, looks up the operator-chosen group from the runtime config map. Defensive copy at construction; nil-safe ResolveGroup; returns (0, false) for legacy / non-matching keys. - main.go: builds the resolver from runtimeConfig.sqsFifoPartitionMap (canonicalized to numeric uint64 group IDs by parseSQSFifoGroupList) and installs it via WithPartitionResolver. The resolver is nil for a non-partitioned cluster — request hot path stays engine-only. What does NOT change yet - htfifoCapabilityAdvertised stays false. PR 4-B-3 wires the §8 leadership-refusal hook + catalog-polling helper for the CreateQueue capability gate, then flips the constant to true. - Send / Receive partition fanout is still PR 5. PR 5 lifts the PartitionCount > 1 dormancy gate from PR 2 in the same commit that wires the data-plane fanout. Self-review (per CLAUDE.md) 1. Data loss — routing layer only; no FSM/Pebble/retention path. No issue. 2. Concurrency — partitionResolver field is set once at startup before any request lands. ResolveGroup reads from a constructor- time defensive copy, so a hot-reload of --sqsFifoPartitionMap (a future capability) cannot perturb in-flight requests. No issue. 3. Performance — one map lookup + a 4-byte BigEndian decode per resolver hit, which only fires on partitioned-prefix matches. Engine-only path (the common case for non-partitioned clusters) adds a single `if s.partitionResolver != nil` branch — negligible. No issue. 4. Data consistency — the resolver's output strictly OVERRIDES the engine. A partitioned-FIFO key always reaches the operator-chosen partition group; legacy keys still flow through the engine. The resolver's "queue not found" / "partition out of range" branches return (0, false) so the router surfaces an error rather than silently mis-routing. No issue. 5. 
Test coverage — adapter/sqs_partition_resolver_test.go: 9 top-level tests (nil-on-empty, defensive-copy, partition dispatch across all 5 families, queue-name prefix isolation, legacy fall- through, unknown queue, out-of-range partition, nil receiver, prefix alignment). kv/shard_router_partition_test.go: 4 tests (resolver wins, engine fallthrough for unknown keys, nil resolver no-op, Get path also uses resolver). Existing TestShardRouter* tests unchanged. --- adapter/sqs_partition_resolver.go | 134 +++++++++++++++ adapter/sqs_partition_resolver_test.go | 223 +++++++++++++++++++++++++ kv/shard_router.go | 67 +++++++- kv/shard_router_partition_test.go | 179 ++++++++++++++++++++ kv/sharded_coordinator.go | 15 ++ main.go | 42 ++++- 6 files changed, 651 insertions(+), 9 deletions(-) create mode 100644 adapter/sqs_partition_resolver.go create mode 100644 adapter/sqs_partition_resolver_test.go create mode 100644 kv/shard_router_partition_test.go diff --git a/adapter/sqs_partition_resolver.go b/adapter/sqs_partition_resolver.go new file mode 100644 index 000000000..8d9c9df84 --- /dev/null +++ b/adapter/sqs_partition_resolver.go @@ -0,0 +1,134 @@ +package adapter + +import ( + "bytes" + "encoding/binary" +) + +// SQSPartitionResolver maps a partitioned-SQS key to the operator- +// chosen Raft group for the (queue, partition) tuple. Implements +// kv.PartitionResolver via duck typing — see the integration in +// main.go where the resolver is installed on ShardedCoordinator. +// +// The byte-range engine cannot route partitioned queues because +// adding per-partition routes would break its non-overlapping-cover +// invariant (a partition route for partition K of one queue would +// leave a gap for legacy keys that fall lexicographically between +// partitions K and K+1). The resolver-first dispatch path avoids +// this — it answers only for keys that match a partitioned family +// prefix and otherwise lets the engine handle dispatch. +type SQSPartitionResolver struct { + routes map[string][]uint64 +} + +// NewSQSPartitionResolver builds a resolver from the operator- +// supplied partition map. routes[queue][k] is the Raft group ID +// that owns partition k of queue, with len(routes[queue]) equal to +// the queue's PartitionCount. +// +// Returns nil when routes is empty so callers can keep the resolver +// out of the request path entirely on a non-partitioned cluster +// (kv.ShardRouter.WithPartitionResolver(nil) is a documented no-op). +// +// The constructor takes a defensive copy so a later caller mutation +// to the input map does not leak into the resolver's view at +// runtime. +func NewSQSPartitionResolver(routes map[string][]uint64) *SQSPartitionResolver { + if len(routes) == 0 { + return nil + } + cp := make(map[string][]uint64, len(routes)) + for queue, groups := range routes { + ids := make([]uint64, len(groups)) + copy(ids, groups) + cp[queue] = ids + } + return &SQSPartitionResolver{routes: cp} +} + +// sqsResolverFamilyPrefixes is the set of partitioned-SQS family +// prefixes ResolveGroup recognises. Kept package-internal so any +// future renamed prefix touches both this list and the constant +// declaration in sqs_keys.go — TestSQSPartitionResolver_PrefixesAlign +// pins the alignment. 
+var sqsResolverFamilyPrefixes = []string{ + SqsPartitionedMsgDataPrefix, + SqsPartitionedMsgVisPrefix, + SqsPartitionedMsgDedupPrefix, + SqsPartitionedMsgGroupPrefix, + SqsPartitionedMsgByAgePrefix, +} + +// ResolveGroup decodes the (queue, partition) embedded in a +// partitioned-SQS key and returns the operator-chosen Raft group. +// +// Returns (0, false) for any key that does not match a partitioned +// family prefix (legacy SQS, KV, S3, DynamoDB, queue-meta records, +// …) so kv.ShardRouter falls through to its byte-range engine for +// default routing. +func (r *SQSPartitionResolver) ResolveGroup(key []byte) (uint64, bool) { + if r == nil || len(key) == 0 { + return 0, false + } + queue, partition, ok := parsePartitionedSQSKey(key) + if !ok { + return 0, false + } + groups, found := r.routes[queue] + if !found { + return 0, false + } + // Defensive: a partition value outside the slice is a config / + // upstream-bug signal, not a routable key. Returning false + // surfaces it as "no route" at the router boundary, which is + // the correct fail-closed behaviour. + if uint64(partition) >= uint64(len(groups)) { + return 0, false + } + return groups[partition], true +} + +// parsePartitionedSQSKey extracts the (queue, partition) pair from +// a partitioned-SQS key. Returns ok=false for any key that does not +// match a partitioned family prefix or that has a malformed queue / +// partition segment. Exposed at package-internal scope so the +// adapter's reaper / fanout reader can share the same parser +// (Phase 3.D PR 5). +func parsePartitionedSQSKey(key []byte) (string, uint32, bool) { + rest, matched := stripPartitionedFamilyPrefix(key) + if !matched { + return "", 0, false + } + // After the family prefix, the variable-length encoded queue + // segment is terminated by '|' (sqsPartitionedQueueTerminator). + // base64.RawURLEncoding never emits '|', so the first '|' in + // rest is unambiguously the queue terminator. + pipeIdx := bytes.IndexByte(rest, sqsPartitionedQueueTerminator) + if pipeIdx <= 0 { + return "", 0, false + } + encQueue := rest[:pipeIdx] + rest = rest[pipeIdx+1:] + const partitionLen = 4 + if len(rest) < partitionLen { + return "", 0, false + } + partition := binary.BigEndian.Uint32(rest[:partitionLen]) + queue, err := decodeSQSSegment(string(encQueue)) + if err != nil { + return "", 0, false + } + return queue, partition, true +} + +// stripPartitionedFamilyPrefix returns the bytes after the matched +// family prefix. matched=false if key has none of the known +// partitioned family prefixes. +func stripPartitionedFamilyPrefix(key []byte) ([]byte, bool) { + for _, prefix := range sqsResolverFamilyPrefixes { + if bytes.HasPrefix(key, []byte(prefix)) { + return key[len(prefix):], true + } + } + return nil, false +} diff --git a/adapter/sqs_partition_resolver_test.go b/adapter/sqs_partition_resolver_test.go new file mode 100644 index 000000000..ebfd252d6 --- /dev/null +++ b/adapter/sqs_partition_resolver_test.go @@ -0,0 +1,223 @@ +package adapter + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestNewSQSPartitionResolver_NilOnEmpty pins the constructor's +// nil-on-empty contract — kv.ShardRouter.WithPartitionResolver(nil) +// is documented as a no-op, so a non-partitioned cluster MUST end +// up with no resolver in the request hot path. 
+func TestNewSQSPartitionResolver_NilOnEmpty(t *testing.T) { + t.Parallel() + require.Nil(t, NewSQSPartitionResolver(nil)) + require.Nil(t, NewSQSPartitionResolver(map[string][]uint64{})) +} + +// TestNewSQSPartitionResolver_DefensiveCopy pins that mutating the +// caller's input map after construction does NOT change the +// resolver's view. Without this, a hot-reload of --sqsFifoPartitionMap +// would partially alter the resolver mid-request and produce +// transient mis-routes. +func TestNewSQSPartitionResolver_DefensiveCopy(t *testing.T) { + t.Parallel() + input := map[string][]uint64{"q.fifo": {10, 11}} + r := NewSQSPartitionResolver(input) + // Mutate after construction. + input["q.fifo"][0] = 999 + delete(input, "q.fifo") + + key := sqsPartitionedMsgDataKey("q.fifo", 0, 1, "msg") + gid, ok := r.ResolveGroup(key) + require.True(t, ok) + require.Equal(t, uint64(10), gid, + "resolver must keep its constructor-time view; "+ + "caller mutation cannot leak in") +} + +// TestSQSPartitionResolver_ResolveByPartition pins the core dispatch +// contract: a key produced by sqsPartitionedMsg*Key for (queue, +// partition) resolves to the operator-chosen group for that exact +// partition. +func TestSQSPartitionResolver_ResolveByPartition(t *testing.T) { + t.Parallel() + r := NewSQSPartitionResolver(map[string][]uint64{ + "orders.fifo": {10, 11, 12, 13}, + "events.fifo": {20, 21}, + }) + cases := []struct { + name string + queue string + partition uint32 + wantGroup uint64 + }{ + {"orders p0", "orders.fifo", 0, 10}, + {"orders p1", "orders.fifo", 1, 11}, + {"orders p3", "orders.fifo", 3, 13}, + {"events p0", "events.fifo", 0, 20}, + {"events p1", "events.fifo", 1, 21}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + // Every family must resolve to the same group for the + // same (queue, partition) — the reaper / fanout reader + // / send path all depend on this invariant. + for familyName, key := range map[string][]byte{ + "data": sqsPartitionedMsgDataKey(tc.queue, tc.partition, 1, "msg-id"), + "vis": sqsPartitionedMsgVisKey(tc.queue, tc.partition, 1, 1700000000000, "msg-id"), + "dedup": sqsPartitionedMsgDedupKey(tc.queue, tc.partition, 1, "dedup-id"), + "group": sqsPartitionedMsgGroupKey(tc.queue, tc.partition, 1, "group-id"), + "byage": sqsPartitionedMsgByAgeKey(tc.queue, tc.partition, 1, 1700000000000, "msg-id"), + } { + gid, ok := r.ResolveGroup(key) + require.True(t, ok, "family %s key for (%s, p%d) must resolve", + familyName, tc.queue, tc.partition) + require.Equal(t, tc.wantGroup, gid, + "family %s key for (%s, p%d) resolved to wrong group", + familyName, tc.queue, tc.partition) + } + }) + } +} + +// TestSQSPartitionResolver_QueueIsolation pins the queue-name +// terminator contract from the routing layer's perspective. Two +// queues with overlapping encoded prefixes (base64("queue") is a +// strict prefix of base64("queue1")) MUST resolve to their own +// groups, not each other's. The '|' terminator after the queue +// segment is what makes this safe — same invariant +// TestSqsPartitionedMsgKeys_QueueNamePrefixIsolation pins for the +// keyspace itself. 
+func TestSQSPartitionResolver_QueueIsolation(t *testing.T) { + t.Parallel() + r := NewSQSPartitionResolver(map[string][]uint64{ + "queue": {500}, + "queue1": {600}, + }) + queueKey := sqsPartitionedMsgDataKey("queue", 0, 1, "id") + queue1Key := sqsPartitionedMsgDataKey("queue1", 0, 1, "id") + + queueGID, ok := r.ResolveGroup(queueKey) + require.True(t, ok) + require.Equal(t, uint64(500), queueGID) + + queue1GID, ok := r.ResolveGroup(queue1Key) + require.True(t, ok) + require.Equal(t, uint64(600), queue1GID, + "queue1 keys must NOT resolve to queue's group; "+ + "the '|' terminator after the queue segment is what makes this safe") +} + +// TestSQSPartitionResolver_LegacyKeyFallsThrough pins that a non- +// partitioned (legacy) SQS key returns (0, false) — the +// kv.ShardRouter caller then falls through to the byte-range +// engine for default routing. A regression here would route every +// legacy queue's traffic to whichever group the resolver guessed +// from its partition map. +func TestSQSPartitionResolver_LegacyKeyFallsThrough(t *testing.T) { + t.Parallel() + r := NewSQSPartitionResolver(map[string][]uint64{ + "orders.fifo": {10, 11}, + }) + cases := []struct { + name string + key []byte + }{ + {"legacy data", sqsMsgDataKey("orders.fifo", 1, "id")}, + {"legacy vis", sqsMsgVisKey("orders.fifo", 1, 1700000000000, "id")}, + {"legacy dedup", sqsMsgDedupKey("orders.fifo", 1, "dedup")}, + {"legacy group", sqsMsgGroupKey("orders.fifo", 1, "group")}, + {"legacy byage", sqsMsgByAgeKey("orders.fifo", 1, 1700000000000, "id")}, + {"queue meta", sqsQueueMetaKey("orders.fifo")}, + {"non-sqs key", []byte("/some/other/key")}, + {"empty", []byte{}}, + {"nil", nil}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + gid, ok := r.ResolveGroup(tc.key) + require.False(t, ok, "non-partitioned key must NOT resolve") + require.Equal(t, uint64(0), gid) + }) + } +} + +// TestSQSPartitionResolver_UnknownQueueFallsThrough pins that a +// well-formed partitioned key for a queue that is NOT in the +// partition map returns (0, false). This shape can only happen if +// the cluster lost partition-map entries between writer and reader +// (an operational misconfiguration); routing through the byte-range +// engine is the correct fail-closed behaviour. +func TestSQSPartitionResolver_UnknownQueueFallsThrough(t *testing.T) { + t.Parallel() + r := NewSQSPartitionResolver(map[string][]uint64{ + "known.fifo": {10, 11}, + }) + key := sqsPartitionedMsgDataKey("unknown.fifo", 0, 1, "id") + gid, ok := r.ResolveGroup(key) + require.False(t, ok) + require.Equal(t, uint64(0), gid) +} + +// TestSQSPartitionResolver_OutOfRangePartitionFallsThrough pins +// that a partition value beyond the configured PartitionCount +// returns (0, false). Same fail-closed argument as the unknown- +// queue case — if a writer produced a key with partition 4 but the +// resolver's view only has partitions [0, 3], routing through the +// engine surfaces the disagreement at the request boundary. +func TestSQSPartitionResolver_OutOfRangePartitionFallsThrough(t *testing.T) { + t.Parallel() + r := NewSQSPartitionResolver(map[string][]uint64{ + "q.fifo": {10, 11}, + }) + // PartitionCount in resolver is 2 (partitions 0 and 1); craft a + // key for partition 5 to simulate a writer at a different + // partition count. 
+ key := sqsPartitionedMsgDataKey("q.fifo", 5, 1, "id") + gid, ok := r.ResolveGroup(key) + require.False(t, ok) + require.Equal(t, uint64(0), gid) +} + +// TestSQSPartitionResolver_NilReceiverIsSafe pins defensive +// behaviour — a nil resolver pointer must not panic on +// ResolveGroup. kv.ShardRouter's resolver-first dispatch checks +// `if s.partitionResolver != nil` before calling, but a typed-nil +// can still slip through interface assertions. +func TestSQSPartitionResolver_NilReceiverIsSafe(t *testing.T) { + t.Parallel() + var r *SQSPartitionResolver + gid, ok := r.ResolveGroup([]byte("any-key")) + require.False(t, ok) + require.Equal(t, uint64(0), gid) +} + +// TestSQSPartitionResolver_PrefixesAlign pins that the resolver's +// family-prefix list matches the constants in sqs_keys.go exactly. +// A future renamed prefix or added family that touches sqs_keys.go +// without also touching sqsResolverFamilyPrefixes would silently +// stop resolving keys for the new family. +func TestSQSPartitionResolver_PrefixesAlign(t *testing.T) { + t.Parallel() + want := []string{ + SqsPartitionedMsgDataPrefix, + SqsPartitionedMsgVisPrefix, + SqsPartitionedMsgDedupPrefix, + SqsPartitionedMsgGroupPrefix, + SqsPartitionedMsgByAgePrefix, + } + require.Equal(t, want, sqsResolverFamilyPrefixes, + "sqsResolverFamilyPrefixes must mirror the constants in "+ + "sqs_keys.go — a new partitioned family added there must "+ + "be added here too, or the resolver silently stops "+ + "matching keys in that family") + for _, p := range sqsResolverFamilyPrefixes { + require.True(t, strings.HasSuffix(p, "p|"), + "every partitioned prefix must end with the p| discriminator") + } +} diff --git a/kv/shard_router.go b/kv/shard_router.go index 0b99f1673..22699927f 100644 --- a/kv/shard_router.go +++ b/kv/shard_router.go @@ -10,6 +10,22 @@ import ( "github.com/cockroachdb/errors" ) +// PartitionResolver maps a key to its owning Raft group when the +// key belongs to a partition-scheme keyspace (e.g. SQS HT-FIFO, +// where each (queue, partition) pair lives on a different group). +// ShardRouter consults the resolver before falling through to the +// byte-range engine, so partition routing can override the default +// shard-range layout without breaking the engine's non-overlapping- +// cover invariant. +// +// Implementations must be safe for concurrent use — ResolveGroup is +// called on the request hot path. Returning (0, false) for a key +// the resolver does not recognise lets the router fall through to +// the engine. +type PartitionResolver interface { + ResolveGroup(key []byte) (uint64, bool) +} + // ShardRouter routes requests to multiple raft groups based on key ranges. // // Cross-shard transactions are not supported. They require distributed @@ -17,9 +33,10 @@ import ( // // Non-transactional request batches may still partially succeed across shards. type ShardRouter struct { - engine *distribution.Engine - mu sync.RWMutex - groups map[uint64]*routerGroup + engine *distribution.Engine + partitionResolver PartitionResolver + mu sync.RWMutex + groups map[uint64]*routerGroup } var ErrCrossShardTransactionNotSupported = errors.New("cross-shard transactions are not supported") @@ -37,6 +54,40 @@ func NewShardRouter(e *distribution.Engine) *ShardRouter { } } +// WithPartitionResolver installs a partition-keyspace resolver that +// is consulted before the byte-range engine on every dispatch. A +// nil resolver clears any previously-installed resolver. Returns +// the receiver so callers can chain. 
+// +// Setting the resolver is idempotent — re-installing the same value +// is a no-op. Concurrent reads against ResolveGroup remain safe +// because both the read in resolveGroup and the assignment here +// happen against the same field; routine startup wires the resolver +// once before any request lands, so the rare write does not need a +// lock. +func (s *ShardRouter) WithPartitionResolver(r PartitionResolver) *ShardRouter { + s.partitionResolver = r + return s +} + +// resolveGroup tries the partition resolver first (when installed), +// then falls through to the byte-range engine. Returns the resolved +// Raft group ID and a found flag; (0, false) means no route in either +// the resolver or the engine — the caller surfaces this as an +// "unknown group" error. +func (s *ShardRouter) resolveGroup(key []byte) (uint64, bool) { + if s.partitionResolver != nil { + if gid, ok := s.partitionResolver.ResolveGroup(key); ok { + return gid, true + } + } + route, ok := s.engine.GetRoute(key) + if !ok { + return 0, false + } + return route.GroupID, true +} + // Register associates a raft group ID with its transactional manager and store. func (s *ShardRouter) Register(group uint64, tm Transactional, st store.MVCCStore) { s.mu.Lock() @@ -127,24 +178,24 @@ func (s *ShardRouter) groupRequests(reqs []*pb.Request) (map[uint64][]*pb.Reques if len(key) == 0 { return nil, ErrInvalidRequest } - route, ok := s.engine.GetRoute(key) + gid, ok := s.resolveGroup(key) if !ok { return nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", key) } - batches[route.GroupID] = append(batches[route.GroupID], r) + batches[gid] = append(batches[gid], r) } return batches, nil } // Get retrieves a key routed to the correct shard. func (s *ShardRouter) Get(ctx context.Context, key []byte) ([]byte, error) { - route, ok := s.engine.GetRoute(routeKey(key)) + gid, ok := s.resolveGroup(routeKey(key)) if !ok { return nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", key) } - g, ok := s.getGroup(route.GroupID) + g, ok := s.getGroup(gid) if !ok { - return nil, errors.Wrapf(ErrInvalidRequest, "unknown group %d", route.GroupID) + return nil, errors.Wrapf(ErrInvalidRequest, "unknown group %d", gid) } v, err := g.store.GetAt(ctx, key, ^uint64(0)) if err != nil { diff --git a/kv/shard_router_partition_test.go b/kv/shard_router_partition_test.go new file mode 100644 index 000000000..1f653c308 --- /dev/null +++ b/kv/shard_router_partition_test.go @@ -0,0 +1,179 @@ +package kv + +import ( + "context" + "sync/atomic" + "testing" + + "github.com/bootjp/elastickv/distribution" + pb "github.com/bootjp/elastickv/proto" + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" +) + +// fakePartitionResolver is a hand-rolled PartitionResolver for the +// router-side tests — keeps the tests free of any adapter-package +// dependency so the routing contract is verified at the kv layer in +// isolation. +type fakePartitionResolver struct { + routes map[string]uint64 +} + +func (f *fakePartitionResolver) ResolveGroup(key []byte) (uint64, bool) { + gid, ok := f.routes[string(key)] + return gid, ok +} + +// TestShardRouter_PartitionResolverWins pins that when the resolver +// claims a key, the router dispatches to the resolver's group even +// if the byte-range engine would have routed elsewhere. This is the +// whole point of the resolver — overlay routing on top of the +// existing non-overlapping cover model. 
+func TestShardRouter_PartitionResolverWins(t *testing.T) { + t.Parallel() + e := distribution.NewEngine() + e.UpdateRoute([]byte(""), nil, 1) // engine routes everything to group 1 + + router := NewShardRouter(e) + router.WithPartitionResolver(&fakePartitionResolver{ + routes: map[string]uint64{"resolver-key": 42}, + }) + + // Per-test sink so a parallel sibling test cannot perturb the + // invariant we are checking. Each fakeTxn writes its own id + // into this slot on Commit; the post-condition reads it back. + var sink atomic.Uint64 + s1 := store.NewMVCCStore() + s42 := store.NewMVCCStore() + router.Register(1, &fakeTxn{id: 1, sink: &sink}, s1) + router.Register(42, &fakeTxn{id: 42, sink: &sink}, s42) + + reqs := []*pb.Request{ + {IsTxn: false, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("resolver-key"), Value: []byte("v")}}}, + } + resp, err := router.Commit(reqs) + require.NoError(t, err) + require.NotNil(t, resp) + // Verify: the request landed on group 42's fake txn, not 1's. + require.Equal(t, uint64(42), sink.Load()) +} + +// TestShardRouter_PartitionResolverFallsThrough pins that when the +// resolver returns (0, false), dispatch falls through to the byte- +// range engine. Without this, the resolver would have to know +// every key in the cluster — which would defeat the overlay +// pattern's purpose (let the resolver answer only for partitioned- +// keyspace keys). +func TestShardRouter_PartitionResolverFallsThrough(t *testing.T) { + t.Parallel() + e := distribution.NewEngine() + e.UpdateRoute([]byte("a"), []byte("m"), 1) + e.UpdateRoute([]byte("m"), nil, 2) + + router := NewShardRouter(e) + // Resolver only knows about "resolver-only-key"; everything + // else falls through to the engine. + router.WithPartitionResolver(&fakePartitionResolver{ + routes: map[string]uint64{"resolver-only-key": 99}, + }) + + var sink atomic.Uint64 + s1 := store.NewMVCCStore() + s2 := store.NewMVCCStore() + router.Register(1, &fakeTxn{id: 1, sink: &sink}, s1) + router.Register(2, &fakeTxn{id: 2, sink: &sink}, s2) + + // "b" is in the engine's [a, m) range → group 1. + resp1, err1 := router.Commit([]*pb.Request{ + {IsTxn: false, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("b"), Value: []byte("v")}}}, + }) + require.NoError(t, err1) + require.NotNil(t, resp1) + require.Equal(t, uint64(1), sink.Load(), + "engine [a,m) range must route to group 1") + + // "x" is in the engine's [m, ∞) range → group 2. + resp2, err2 := router.Commit([]*pb.Request{ + {IsTxn: false, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("x"), Value: []byte("v")}}}, + }) + require.NoError(t, err2) + require.NotNil(t, resp2) + require.Equal(t, uint64(2), sink.Load(), + "engine [m,∞) range must route to group 2") +} + +// TestShardRouter_NilPartitionResolverIsNoOp pins that +// WithPartitionResolver(nil) leaves the router behaving exactly as +// the legacy engine-only dispatcher. This is the documented "no +// partition layer" path that a non-partitioned cluster takes. +func TestShardRouter_NilPartitionResolverIsNoOp(t *testing.T) { + t.Parallel() + e := distribution.NewEngine() + e.UpdateRoute([]byte(""), nil, 7) + + router := NewShardRouter(e) + router.WithPartitionResolver(nil) + + var sink atomic.Uint64 + s7 := store.NewMVCCStore() + router.Register(7, &fakeTxn{id: 7, sink: &sink}, s7) + + // With no resolver installed, the engine's default route owns + // the request — group 7 dispatches. 
+ resp, err := router.Commit([]*pb.Request{ + {IsTxn: false, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("any"), Value: []byte("v")}}}, + }) + require.NoError(t, err) + require.NotNil(t, resp) + require.Equal(t, uint64(7), sink.Load(), + "nil resolver must leave the engine in charge") +} + +// TestShardRouter_GetUsesResolver pins that the resolver-first path +// applies to Get as well as Commit/Abort. A regression that fixed +// only Commit's path would silently route reads through the engine +// even after the resolver claimed the key. +func TestShardRouter_GetUsesResolver(t *testing.T) { + t.Parallel() + e := distribution.NewEngine() + e.UpdateRoute([]byte(""), nil, 1) + + router := NewShardRouter(e) + router.WithPartitionResolver(&fakePartitionResolver{ + routes: map[string]uint64{"resolver-key": 42}, + }) + + var sink atomic.Uint64 + s1 := store.NewMVCCStore() + s42 := store.NewMVCCStore() + // Seed group 42 only — if Get falls through to the engine + // (which would route to group 1), the test fails because + // group 1's store is empty. + require.NoError(t, s42.PutAt(context.Background(), []byte("resolver-key"), []byte("v"), 1, 0)) + router.Register(1, &fakeTxn{id: 1, sink: &sink}, s1) + router.Register(42, &fakeTxn{id: 42, sink: &sink}, s42) + + v, err := router.Get(context.Background(), []byte("resolver-key")) + require.NoError(t, err) + require.Equal(t, []byte("v"), v) +} + +// fakeTxn is a Transactional double that records its own id into a +// caller-provided sink whenever Commit lands on it. Using a per- +// test sink (rather than a package-level variable) keeps parallel +// tests from clobbering each other's observations. +type fakeTxn struct { + id uint64 + sink *atomic.Uint64 +} + +func (f *fakeTxn) Commit(reqs []*pb.Request) (*TransactionResponse, error) { + if f.sink != nil { + f.sink.Store(f.id) + } + return &TransactionResponse{CommitIndex: 1}, nil +} + +func (f *fakeTxn) Abort(reqs []*pb.Request) (*TransactionResponse, error) { + return &TransactionResponse{}, nil +} diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index cc19bece0..878e335e3 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -171,6 +171,21 @@ func (c *ShardedCoordinator) WithSampler(s keyviz.Sampler) *ShardedCoordinator { return c } +// WithPartitionResolver wires a PartitionResolver onto the +// coordinator's underlying ShardRouter. The resolver runs before +// the byte-range engine on every dispatch, so partition-keyspace +// schemes (e.g. SQS HT-FIFO) can override the default shard layout +// without breaking the engine's non-overlapping-cover invariant. +// +// Applied after construction for the same reason as the other +// With* options on this type — NewShardedCoordinator is already +// heavily overloaded. Passing a nil resolver clears any previously- +// installed resolver. +func (c *ShardedCoordinator) WithPartitionResolver(r PartitionResolver) *ShardedCoordinator { + c.router.WithPartitionResolver(r) + return c +} + // NewShardedCoordinator builds a coordinator for the provided shard groups. // The defaultGroup is used for non-keyed leader checks. 
func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*ShardGroup, defaultGroup uint64, clock *HLC, st store.MVCCStore) *ShardedCoordinator { diff --git a/main.go b/main.go index 3169b09d6..286d0bf81 100644 --- a/main.go +++ b/main.go @@ -318,7 +318,8 @@ func run() error { sampler := buildKeyVizSampler() coordinate := kv.NewShardedCoordinator(cfg.engine, shardGroups, cfg.defaultGroup, clock, shardStore). WithLeaseReadObserver(metricsRegistry.LeaseReadObserver()). - WithSampler(keyVizSamplerForCoordinator(sampler)) + WithSampler(keyVizSamplerForCoordinator(sampler)). + WithPartitionResolver(buildSQSPartitionResolver(cfg.sqsFifoPartitionMap)) distCatalog, err := setupDistributionCatalog(ctx, runtimes, cfg.engine) if err != nil { return err @@ -477,6 +478,45 @@ func buildLeaderSQS(groups []groupSpec, sqsAddr string, raftSqsMap string) (map[ return buildLeaderAddrMap(groups, sqsAddr, raftSqsMap, parseRaftSQSMap) } +// buildSQSPartitionResolver flattens the operator-supplied partition +// map into the {queue → []groupID} shape adapter consumes and +// returns a ResolveGroup-capable resolver. Returns nil on an empty +// map so the coordinator's resolver field stays unset on a non- +// partitioned cluster — kv.ShardRouter.WithPartitionResolver(nil) +// is a documented no-op, so the request hot path keeps the existing +// engine-only dispatch. +// +// The group-reference parsing here cannot fail in practice because +// parseSQSFifoGroupList already canonicalized each entry as a +// uint64 string at flag-parse time; the conversion is repeated +// defensively so a future caller that bypasses parseSQSFifoGroupList +// (e.g. a test seeding the map programmatically) gets a clear panic +// instead of a silent route-to-group-zero. +func buildSQSPartitionResolver(partitionMap map[string]sqsFifoQueueRouting) *adapter.SQSPartitionResolver { + if len(partitionMap) == 0 { + return nil + } + flat := make(map[string][]uint64, len(partitionMap)) + for queue, routing := range partitionMap { + ids := make([]uint64, 0, len(routing.groups)) + for _, groupRef := range routing.groups { + id, err := strconv.ParseUint(groupRef, 10, 64) + if err != nil { + // parseSQSFifoGroupList canonicalized this; a + // non-uint64 string here means a programmer skipped + // the validator. Panic loudly rather than silently + // route to group 0. + panic(errors.Wrapf(err, + "queue %q: bypassed group-ref canonicalisation, %q is not uint64", + queue, groupRef)) + } + ids = append(ids, id) + } + flat[queue] = ids + } + return adapter.NewSQSPartitionResolver(flat) +} + // buildSQSFifoPartitionMap parses and validates the // --sqsFifoPartitionMap flag against the configured Raft groups. // Extracted from parseRuntimeConfig so that function stays under the From 8bbfcb94abb5cc0367743d98f9b8aca5963d9b0f Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 18:22:17 +0900 Subject: [PATCH 2/8] fix(sqs): resolver runs on raw key + coordinator helpers consult resolver MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #715 round 1 review caught two correctness bugs and two minor items. This commit addresses all four. 1) P1 (Codex): routeKey collapsed every !sqs|... key to !sqs|route|global before resolveGroup ran, so the resolver never saw a partitioned-prefix key — partitioned-FIFO traffic silently fell through to the engine's default group. ResolveGroup now runs on the RAW key. 
The fallback to the engine still uses routeKey-normalised input (so the engine's per-shard layout is unchanged for legacy traffic). New regression test TestShardRouter_ResolverSeesRawKeyNotNormalized uses a recording resolver to assert the raw key reaches it. 2) HIGH (Gemini): ShardedCoordinator's per-key helpers (groupForKey, routeAndGroupForKey, engineGroupIDForKey, groupMutations) called c.engine.GetRoute directly, bypassing the resolver. This left 2PC + read paths with unrouted partitioned traffic. Each helper now consults c.router.ResolveGroup for the gid. The engine is queried separately for the catalog RouteID (keyviz observation) — the resolver is opaque to keyviz today; partition-aware heatmap is a Phase 3.D follow-up. 3) MEDIUM (Gemini, perf): sqsResolverFamilyPrefixes was []string, forcing []byte(prefix) per check on the request hot path. Pre-converted to [][]byte; stripPartitionedFamilyPrefix loops over byte slices directly. 4) MEDIUM (Gemini, doc): WithPartitionResolver claimed concurrent reads were safe, which is incorrect — interface assignment is not atomic in Go. Comment now reflects the startup-only intent. ResolveGroup is now exported so ShardedCoordinator's helpers can share the resolver-first dispatch path with Commit / Abort / Get. --- adapter/sqs_partition_resolver.go | 20 ++++--- adapter/sqs_partition_resolver_test.go | 14 ++--- kv/shard_router.go | 59 +++++++++++++------- kv/shard_router_partition_test.go | 77 ++++++++++++++++++++++++++ kv/sharded_coordinator.go | 39 +++++++++---- 5 files changed, 164 insertions(+), 45 deletions(-) diff --git a/adapter/sqs_partition_resolver.go b/adapter/sqs_partition_resolver.go index 8d9c9df84..5774a0d7c 100644 --- a/adapter/sqs_partition_resolver.go +++ b/adapter/sqs_partition_resolver.go @@ -47,16 +47,18 @@ func NewSQSPartitionResolver(routes map[string][]uint64) *SQSPartitionResolver { } // sqsResolverFamilyPrefixes is the set of partitioned-SQS family -// prefixes ResolveGroup recognises. Kept package-internal so any -// future renamed prefix touches both this list and the constant +// prefixes ResolveGroup recognises. Pre-converted to []byte so the +// hot-path bytes.HasPrefix call avoids an allocation per check +// (gemini medium on PR #715). Kept package-internal so any future +// renamed prefix touches both this list and the constant // declaration in sqs_keys.go — TestSQSPartitionResolver_PrefixesAlign // pins the alignment. -var sqsResolverFamilyPrefixes = []string{ - SqsPartitionedMsgDataPrefix, - SqsPartitionedMsgVisPrefix, - SqsPartitionedMsgDedupPrefix, - SqsPartitionedMsgGroupPrefix, - SqsPartitionedMsgByAgePrefix, +var sqsResolverFamilyPrefixes = [][]byte{ + []byte(SqsPartitionedMsgDataPrefix), + []byte(SqsPartitionedMsgVisPrefix), + []byte(SqsPartitionedMsgDedupPrefix), + []byte(SqsPartitionedMsgGroupPrefix), + []byte(SqsPartitionedMsgByAgePrefix), } // ResolveGroup decodes the (queue, partition) embedded in a @@ -126,7 +128,7 @@ func parsePartitionedSQSKey(key []byte) (string, uint32, bool) { // partitioned family prefixes. 
func stripPartitionedFamilyPrefix(key []byte) ([]byte, bool) { for _, prefix := range sqsResolverFamilyPrefixes { - if bytes.HasPrefix(key, []byte(prefix)) { + if bytes.HasPrefix(key, prefix) { return key[len(prefix):], true } } diff --git a/adapter/sqs_partition_resolver_test.go b/adapter/sqs_partition_resolver_test.go index ebfd252d6..a693f1949 100644 --- a/adapter/sqs_partition_resolver_test.go +++ b/adapter/sqs_partition_resolver_test.go @@ -204,12 +204,12 @@ func TestSQSPartitionResolver_NilReceiverIsSafe(t *testing.T) { // stop resolving keys for the new family. func TestSQSPartitionResolver_PrefixesAlign(t *testing.T) { t.Parallel() - want := []string{ - SqsPartitionedMsgDataPrefix, - SqsPartitionedMsgVisPrefix, - SqsPartitionedMsgDedupPrefix, - SqsPartitionedMsgGroupPrefix, - SqsPartitionedMsgByAgePrefix, + want := [][]byte{ + []byte(SqsPartitionedMsgDataPrefix), + []byte(SqsPartitionedMsgVisPrefix), + []byte(SqsPartitionedMsgDedupPrefix), + []byte(SqsPartitionedMsgGroupPrefix), + []byte(SqsPartitionedMsgByAgePrefix), } require.Equal(t, want, sqsResolverFamilyPrefixes, "sqsResolverFamilyPrefixes must mirror the constants in "+ @@ -217,7 +217,7 @@ func TestSQSPartitionResolver_PrefixesAlign(t *testing.T) { "be added here too, or the resolver silently stops "+ "matching keys in that family") for _, p := range sqsResolverFamilyPrefixes { - require.True(t, strings.HasSuffix(p, "p|"), + require.True(t, strings.HasSuffix(string(p), "p|"), "every partitioned prefix must end with the p| discriminator") } } diff --git a/kv/shard_router.go b/kv/shard_router.go index 22699927f..f500be6af 100644 --- a/kv/shard_router.go +++ b/kv/shard_router.go @@ -59,29 +59,50 @@ func NewShardRouter(e *distribution.Engine) *ShardRouter { // nil resolver clears any previously-installed resolver. Returns // the receiver so callers can chain. // -// Setting the resolver is idempotent — re-installing the same value -// is a no-op. Concurrent reads against ResolveGroup remain safe -// because both the read in resolveGroup and the assignment here -// happen against the same field; routine startup wires the resolver -// once before any request lands, so the rare write does not need a -// lock. +// Intended for use during startup, before the router begins handling +// requests. Interface assignment in Go is not atomic, so a call that +// races with a concurrent ResolveGroup in resolveGroup may produce a +// torn read; callers must wire the resolver once during construction +// (parseRuntimeConfig → NewShardedCoordinator → WithPartitionResolver) +// and treat any post-startup re-assignment as undefined behaviour. func (s *ShardRouter) WithPartitionResolver(r PartitionResolver) *ShardRouter { s.partitionResolver = r return s } -// resolveGroup tries the partition resolver first (when installed), -// then falls through to the byte-range engine. Returns the resolved -// Raft group ID and a found flag; (0, false) means no route in either -// the resolver or the engine — the caller surfaces this as an -// "unknown group" error. -func (s *ShardRouter) resolveGroup(key []byte) (uint64, bool) { +// ResolveGroup tries the partition resolver first (when installed), +// then falls through to the byte-range engine. 
Exposed at package +// scope so ShardedCoordinator's per-key helpers (groupForKey, +// routeAndGroupForKey, engineGroupIDForKey, groupMutations) can +// consult the same dispatch path Commit / Abort / Get use — without +// it those helpers would bypass the resolver and partitioned-FIFO +// traffic would silently mis-route through 2PC and the read paths. +// +// The resolver runs on the RAW key before any user-key +// normalization. SQS keys in particular are collapsed to +// !sqs|route|global by routeKey to keep the engine's per-shard +// layout simple, but that collapse hides the partitioned-prefix +// information the resolver needs (issue: codex P1 / gemini high on +// PR #715). The engine still sees the post-normalization key, so +// legacy routing (catalog → !sqs|route|global → default group) +// stays unchanged. +// +// Returns (0, false) when neither the resolver nor the engine +// recognises the key. Caller surfaces this as an "unknown group" +// error so a partitioned-prefix key whose queue is missing from the +// resolver map fails closed rather than landing on whichever +// engine-default group happens to cover the raw bytes. +func (s *ShardRouter) ResolveGroup(rawKey []byte) (uint64, bool) { if s.partitionResolver != nil { - if gid, ok := s.partitionResolver.ResolveGroup(key); ok { + if gid, ok := s.partitionResolver.ResolveGroup(rawKey); ok { return gid, true } } - route, ok := s.engine.GetRoute(key) + // Engine routes against the user-key view of the byte-range + // space; routeKey may rewrite SQS / DynamoDB / Redis-internal + // keys to a stable per-table or per-namespace route key so the + // engine sees one route per logical entity. + route, ok := s.engine.GetRoute(routeKey(rawKey)) if !ok { return 0, false } @@ -174,13 +195,13 @@ func (s *ShardRouter) groupRequests(reqs []*pb.Request) (map[uint64][]*pb.Reques if len(r.Mutations) == 0 || r.Mutations[0] == nil { return nil, ErrInvalidRequest } - key := routeKey(r.Mutations[0].Key) - if len(key) == 0 { + rawKey := r.Mutations[0].Key + if len(rawKey) == 0 { return nil, ErrInvalidRequest } - gid, ok := s.resolveGroup(key) + gid, ok := s.ResolveGroup(rawKey) if !ok { - return nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", key) + return nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", rawKey) } batches[gid] = append(batches[gid], r) } @@ -189,7 +210,7 @@ func (s *ShardRouter) groupRequests(reqs []*pb.Request) (map[uint64][]*pb.Reques // Get retrieves a key routed to the correct shard. func (s *ShardRouter) Get(ctx context.Context, key []byte) ([]byte, error) { - gid, ok := s.resolveGroup(routeKey(key)) + gid, ok := s.ResolveGroup(key) if !ok { return nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", key) } diff --git a/kv/shard_router_partition_test.go b/kv/shard_router_partition_test.go index 1f653c308..ea13983ad 100644 --- a/kv/shard_router_partition_test.go +++ b/kv/shard_router_partition_test.go @@ -1,7 +1,9 @@ package kv import ( + "bytes" "context" + "sync" "sync/atomic" "testing" @@ -129,6 +131,51 @@ func TestShardRouter_NilPartitionResolverIsNoOp(t *testing.T) { "nil resolver must leave the engine in charge") } +// TestShardRouter_ResolverSeesRawKeyNotNormalized pins the codex-P1 +// fix on PR #715: resolveGroup MUST consult the resolver with the +// pre-normalization (raw) key, not the post-normalization key. 
SQS +// keys in particular collapse to !sqs|route|global via routeKey, so +// a resolver that only saw the normalized form would never match +// any partitioned-prefix key — the resolver would be a no-op for +// every Commit/Abort/Get on a partitioned-FIFO queue. +// +// The fake resolver here records every key it was asked about; the +// post-condition asserts the recorded key is the raw key the caller +// supplied, not whatever routeKey normalized it to. +func TestShardRouter_ResolverSeesRawKeyNotNormalized(t *testing.T) { + t.Parallel() + e := distribution.NewEngine() + e.UpdateRoute([]byte(""), nil, 1) + + router := NewShardRouter(e) + + rawKey := []byte("!sqs|msg|data|p|raw-test-key") + resolver := &recordingResolver{match: rawKey, gid: 42} + router.WithPartitionResolver(resolver) + + var sink atomic.Uint64 + router.Register(1, &fakeTxn{id: 1, sink: &sink}, store.NewMVCCStore()) + router.Register(42, &fakeTxn{id: 42, sink: &sink}, store.NewMVCCStore()) + + resp, err := router.Commit([]*pb.Request{ + {IsTxn: false, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: rawKey, Value: []byte("v")}}}, + }) + require.NoError(t, err) + require.NotNil(t, resp) + require.Equal(t, uint64(42), sink.Load(), + "resolver must claim the raw partitioned-prefix key; "+ + "if the router collapsed it via routeKey first, this "+ + "would route to the engine's default group (1)") + + // The resolver MUST have been called with the raw key, not the + // !sqs|route|global collapse. This is the codex P1 invariant. + seen := resolver.seenKeys() + require.Len(t, seen, 1) + require.Equal(t, rawKey, seen[0], + "resolver received normalized key — routeKey ran before "+ + "resolver, contrary to the §3.D PR 4-B-2 design") +} + // TestShardRouter_GetUsesResolver pins that the resolver-first path // applies to Get as well as Commit/Abort. A regression that fixed // only Commit's path would silently route reads through the engine @@ -158,6 +205,36 @@ func TestShardRouter_GetUsesResolver(t *testing.T) { require.Equal(t, []byte("v"), v) } +// recordingResolver is a PartitionResolver that records every key +// it was asked about and returns gid for keys equal to match. The +// recorded list is the test's evidence that the router consulted +// the resolver with the raw (pre-routeKey-normalisation) key. +type recordingResolver struct { + match []byte + gid uint64 + mu sync.Mutex + seen [][]byte +} + +func (r *recordingResolver) ResolveGroup(key []byte) (uint64, bool) { + r.mu.Lock() + defer r.mu.Unlock() + keyCopy := append([]byte(nil), key...) + r.seen = append(r.seen, keyCopy) + if bytes.Equal(key, r.match) { + return r.gid, true + } + return 0, false +} + +func (r *recordingResolver) seenKeys() [][]byte { + r.mu.Lock() + defer r.mu.Unlock() + out := make([][]byte, len(r.seen)) + copy(out, r.seen) + return out +} + // fakeTxn is a Transactional double that records its own id into a // caller-provided sink whenever Commit lands on it. 
Using a per- // test sink (rather than a package-level variable) keeps parallel diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index 878e335e3..49c61522e 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -845,11 +845,11 @@ func (c *ShardedCoordinator) Clock() *HLC { } func (c *ShardedCoordinator) groupForKey(key []byte) (*ShardGroup, bool) { - route, ok := c.engine.GetRoute(routeKey(key)) + gid, ok := c.router.ResolveGroup(key) if !ok { return nil, false } - g, ok := c.groups[route.GroupID] + g, ok := c.groups[gid] return g, ok } @@ -859,24 +859,36 @@ func (c *ShardedCoordinator) groupForKey(key []byte) (*ShardGroup, bool) { // Leadership-only callers (IsLeaderForKey / VerifyLeaderForKey / // RaftLeaderForKey) keep using groupForKey because they don't need // the route ID. +// +// The gid comes from the partition-aware router (resolver-first, +// engine-fallback) so partitioned-FIFO traffic lands on the +// operator-chosen group. RouteID is read from the engine on the +// normalized key; the resolver does not have a notion of catalog +// RouteID, so partition-resolved keys observe under the engine's +// catalog RouteID for !sqs|route|global. Partition-aware keyviz +// is a Phase 3.D follow-up. func (c *ShardedCoordinator) routeAndGroupForKey(key []byte) (uint64, *ShardGroup, bool) { - route, ok := c.engine.GetRoute(routeKey(key)) + gid, ok := c.router.ResolveGroup(key) if !ok { return 0, nil, false } - g, ok := c.groups[route.GroupID] + g, ok := c.groups[gid] if !ok { return 0, nil, false } - return route.RouteID, g, true + var routeID uint64 + if route, found := c.engine.GetRoute(routeKey(key)); found { + routeID = route.RouteID + } + return routeID, g, true } func (c *ShardedCoordinator) engineGroupIDForKey(key []byte) uint64 { - route, ok := c.engine.GetRoute(routeKey(key)) + gid, ok := c.router.ResolveGroup(key) if !ok { return 0 } - return route.GroupID + return gid } func (c *ShardedCoordinator) groupReadKeysByShardID(readKeys [][]byte) map[uint64][][]byte { @@ -1052,12 +1064,19 @@ func (c *ShardedCoordinator) groupMutations(reqs []*Elem[OP]) (map[uint64][]*pb. return nil, nil, ErrInvalidRequest } mut := elemToMutation(req) - route, ok := c.engine.GetRoute(routeKey(mut.Key)) + gid, ok := c.router.ResolveGroup(mut.Key) if !ok { return nil, nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", mut.Key) } - c.observeMutation(route.RouteID, mut) - grouped[route.GroupID] = append(grouped[route.GroupID], mut) + // Engine RouteID for keyviz observation; partition-resolved + // keys observe under the !sqs|route|global RouteID until + // partition-aware keyviz lands. + var routeID uint64 + if route, found := c.engine.GetRoute(routeKey(mut.Key)); found { + routeID = route.RouteID + } + c.observeMutation(routeID, mut) + grouped[gid] = append(grouped[gid], mut) } gids := make([]uint64, 0, len(grouped)) for gid := range grouped { From 7f3a643406ee9f227905e773040d70d9bacd9c02 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 18:26:46 +0900 Subject: [PATCH 3/8] fix(sqs): buildSQSPartitionResolver returns interface to avoid typed-nil MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round 1 Claude review on PR #715 caught a typed-nil interface bug: buildSQSPartitionResolver had return type *adapter.SQSPartitionResolver, so on a non-partitioned cluster it returned a typed-nil pointer. 
When that pointer was passed into kv.ShardRouter.WithPartitionResolver (parameter type kv.PartitionResolver), Go wrapped it into a NON-NIL interface — the resolver-first short-circuit `s.partitionResolver != nil` would always pass on every request, defeating the "non-partitioned cluster keeps engine-only hot path" contract from the PR description. The (*SQSPartitionResolver).ResolveGroup nil-receiver guard kept this functionally safe (correct routing) but not free (extra map lookup per request). Fix - Return type changed from *adapter.SQSPartitionResolver to kv.PartitionResolver. Untyped `nil` returns now propagate as a true nil interface, the short-circuit fires correctly, and the hot path stays engine-only. - Defensive nil guard after NewSQSPartitionResolver in case canonicalisation collapses every entry — the typed pointer from the constructor would otherwise wrap to a non-nil interface even when its underlying pointer is nil. - New main_sqs_resolver_test.go regression test: TestBuildSQSPartitionResolver_NilOnEmpty uses requireNilInterface to force the kv.PartitionResolver conversion at the call boundary (a plain require.Nil on the concrete pointer would pass even with the bug present, since the pointer itself IS nil — only the interface wrap exposes the failure mode). - TestBuildSQSPartitionResolver_NonEmptyReturnsResolver pins the happy path so a future "always return nil" regression is caught. --- main.go | 24 +++++++++++-- main_sqs_resolver_test.go | 71 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 2 deletions(-) create mode 100644 main_sqs_resolver_test.go diff --git a/main.go b/main.go index 286d0bf81..bd201ef9a 100644 --- a/main.go +++ b/main.go @@ -486,13 +486,24 @@ func buildLeaderSQS(groups []groupSpec, sqsAddr string, raftSqsMap string) (map[ // is a documented no-op, so the request hot path keeps the existing // engine-only dispatch. // +// Return type is the kv.PartitionResolver interface, NOT the +// concrete *adapter.SQSPartitionResolver, because Go wraps a typed +// nil pointer into a NON-NIL interface value when the function +// signature is the concrete type. With a concrete return type, a +// non-partitioned cluster would carry a non-nil interface whose +// underlying pointer is nil, the resolver-first short-circuit +// `s.partitionResolver != nil` would always pass, and every request +// would pay an extra ResolveGroup call (which the nil-receiver +// guard makes safe but not free). The interface return type makes +// the untyped `nil` propagate as a true nil interface. +// // The group-reference parsing here cannot fail in practice because // parseSQSFifoGroupList already canonicalized each entry as a // uint64 string at flag-parse time; the conversion is repeated // defensively so a future caller that bypasses parseSQSFifoGroupList // (e.g. a test seeding the map programmatically) gets a clear panic // instead of a silent route-to-group-zero. -func buildSQSPartitionResolver(partitionMap map[string]sqsFifoQueueRouting) *adapter.SQSPartitionResolver { +func buildSQSPartitionResolver(partitionMap map[string]sqsFifoQueueRouting) kv.PartitionResolver { if len(partitionMap) == 0 { return nil } @@ -514,7 +525,16 @@ func buildSQSPartitionResolver(partitionMap map[string]sqsFifoQueueRouting) *ada } flat[queue] = ids } - return adapter.NewSQSPartitionResolver(flat) + r := adapter.NewSQSPartitionResolver(flat) + if r == nil { + // Defensive: NewSQSPartitionResolver returns nil on an + // empty input. 
The len-check above already short-circuits + // for empty partitionMap, so reaching this branch means + // the canonicalisation collapsed every entry — surface as + // a true nil interface, not a typed-nil pointer wrapper. + return nil + } + return r } // buildSQSFifoPartitionMap parses and validates the diff --git a/main_sqs_resolver_test.go b/main_sqs_resolver_test.go new file mode 100644 index 000000000..70962c13e --- /dev/null +++ b/main_sqs_resolver_test.go @@ -0,0 +1,71 @@ +package main + +import ( + "testing" + + "github.com/bootjp/elastickv/kv" + "github.com/stretchr/testify/require" +) + +// TestBuildSQSPartitionResolver_NilOnEmpty pins the typed-nil +// interface invariant flagged by the Phase 3.D PR 4-B-2 round-1 +// review: buildSQSPartitionResolver MUST return a true-nil +// kv.PartitionResolver interface (not a typed-nil pointer wrapped +// in a non-nil interface) when the partition map is empty or nil. +// +// If the function's return type were the concrete +// *adapter.SQSPartitionResolver and the body returned a nil +// pointer, Go would wrap that pointer into a NON-NIL interface +// when assigned to kv.ShardRouter.partitionResolver. The router's +// resolver-first short-circuit `s.partitionResolver != nil` would +// always pass on a non-partitioned cluster, and every request +// would pay an extra ResolveGroup call (which the nil-receiver +// guard inside (*SQSPartitionResolver).ResolveGroup makes safe +// but not free). +// +// requireNilInterface forces the interface conversion at the call +// boundary, which is what makes this regression observable. A +// plain require.Nil(t, r) would pass even with a typed-nil because +// at that point r still has the concrete pointer type. +func TestBuildSQSPartitionResolver_NilOnEmpty(t *testing.T) { + t.Parallel() + cases := []struct { + name string + in map[string]sqsFifoQueueRouting + }{ + {"nil map", nil}, + {"empty map", map[string]sqsFifoQueueRouting{}}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + requireNilInterface(t, buildSQSPartitionResolver(tc.in), + "buildSQSPartitionResolver("+tc.name+ + ") must produce a true nil interface, "+ + "not a typed-nil pointer wrapper") + }) + } +} + +// TestBuildSQSPartitionResolver_NonEmptyReturnsResolver pins the +// happy path: a non-empty partition map yields a non-nil resolver +// that dispatches partition keys correctly. +func TestBuildSQSPartitionResolver_NonEmptyReturnsResolver(t *testing.T) { + t.Parallel() + in := map[string]sqsFifoQueueRouting{ + "orders.fifo": {partitionCount: 2, groups: []string{"10", "11"}}, + } + r := buildSQSPartitionResolver(in) + require.NotNil(t, r, + "non-empty partition map must produce a non-nil resolver") +} + +// requireNilInterface accepts a kv.PartitionResolver and asserts +// the interface value (NOT just the underlying pointer) is nil. +// The function-parameter conversion forces a typed-nil pointer to +// be wrapped into a non-nil interface, which is exactly the +// failure mode the regression test guards against. 
+func requireNilInterface(t *testing.T, r kv.PartitionResolver, msg string) { + t.Helper() + require.Nil(t, r, msg) +} From f4a12e418baa7a78886c1e1c71c7b53d63022f7e Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 18:30:02 +0900 Subject: [PATCH 4/8] test(sqs): coordinator-level regression for resolver dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round 2 Claude review on PR #715 flagged that the Gemini-HIGH fix (ShardedCoordinator's groupMutations / groupForKey / etc. now route through c.router.ResolveGroup) had no coordinator-level regression test — the existing TestShardRouter_* tests pin the dispatch logic at the router layer but don't exercise the path Dispatch → groupMutations → c.router.ResolveGroup. Per CLAUDE.md ("when code review surfaces a defect, first add a failing test that reproduces the issue, then make it pass with the fix"), this commit lands the missing test. Two new tests in kv/sharded_coordinator_partition_test.go: - TestShardedCoordinator_DispatchHonoursPartitionResolver pins the Gemini HIGH fix: with the engine routing everything to group 1 but the resolver claiming a specific key for group 42, Dispatch on that key MUST hit group 42's recordingTransactional. Before the round-2 fix the request would have landed on group 1 because groupMutations called c.engine.GetRoute directly. Also asserts the resolver received the RAW partitioned key — pins the codex-P1 fix at the coordinator-call boundary. - TestShardedCoordinator_DispatchFallsThroughForUnclaimedKeys pins the inverse: keys NOT claimed by the resolver continue to route via the byte-range engine. Without this, the resolver-first short-circuit could mask engine routing decisions. stubResolver is a kv-internal PartitionResolver double so the tests don't pull in the adapter package. Each call records the raw key bytes (defensive copy) so concurrent reads stay race-safe under -race. --- kv/sharded_coordinator_partition_test.go | 152 +++++++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 kv/sharded_coordinator_partition_test.go diff --git a/kv/sharded_coordinator_partition_test.go b/kv/sharded_coordinator_partition_test.go new file mode 100644 index 000000000..774447d05 --- /dev/null +++ b/kv/sharded_coordinator_partition_test.go @@ -0,0 +1,152 @@ +package kv + +import ( + "context" + "sync" + "testing" + + "github.com/bootjp/elastickv/distribution" + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" +) + +// stubResolver routes any key in claim to the named group; everything +// else returns (0, false). Used by the coordinator-level resolver +// regression tests so they don't depend on the adapter package. 
+type stubResolver struct { + mu sync.Mutex + claim map[string]uint64 + calls [][]byte +} + +func (s *stubResolver) ResolveGroup(key []byte) (uint64, bool) { + s.mu.Lock() + defer s.mu.Unlock() + s.calls = append(s.calls, append([]byte(nil), key...)) + if gid, ok := s.claim[string(key)]; ok { + return gid, true + } + return 0, false +} + +func (s *stubResolver) callKeys() [][]byte { + s.mu.Lock() + defer s.mu.Unlock() + out := make([][]byte, len(s.calls)) + copy(out, s.calls) + return out +} + +// TestShardedCoordinator_DispatchHonoursPartitionResolver pins the +// Gemini-HIGH fix: ShardedCoordinator's groupMutations path now +// calls c.router.ResolveGroup, so a Dispatch whose key is claimed +// by the partition resolver MUST land on the resolver's group, not +// the engine's default group. Before the fix the coordinator +// bypassed the resolver entirely and partitioned-FIFO traffic +// silently mis-routed through 2PC. +// +// Two-group setup: engine routes everything to group 1; resolver +// claims one specific key for group 42. Dispatch on that key must +// hit group 42's recordingTransactional, leaving group 1's +// recorder empty. +func TestShardedCoordinator_DispatchHonoursPartitionResolver(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte(""), nil, 1) + + g1 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}, {CommitIndex: 1}}, + } + g42 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}, {CommitIndex: 1}}, + } + + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{ + 1: {Txn: g1, Store: store.NewMVCCStore()}, + 42: {Txn: g42, Store: store.NewMVCCStore()}, + }, 1, NewHLC(), nil) + + resolver := &stubResolver{claim: map[string]uint64{ + "!sqs|msg|data|p|partitioned-key": 42, + }} + coord.WithPartitionResolver(resolver) + + resp, err := coord.Dispatch(context.Background(), &OperationGroup[OP]{ + Elems: []*Elem[OP]{ + {Op: Put, Key: []byte("!sqs|msg|data|p|partitioned-key"), Value: []byte("v")}, + }, + }) + require.NoError(t, err) + require.NotNil(t, resp) + + // The whole point: the request landed on group 42, NOT 1. + g1.mu.Lock() + g42.mu.Lock() + g1Count := len(g1.requests) + g42Count := len(g42.requests) + g1.mu.Unlock() + g42.mu.Unlock() + + require.Zero(t, g1Count, + "engine's default group must NOT receive a request when the "+ + "resolver claimed the key — coordinator's groupMutations "+ + "would otherwise bypass the partition resolver") + require.Equal(t, 1, g42Count, + "resolver-claimed group must receive exactly one request") + + // And the resolver was indeed consulted with the raw partitioned + // key — pins the codex-P1 fix at the coordinator-call boundary. + calls := resolver.callKeys() + require.NotEmpty(t, calls) + require.Equal(t, []byte("!sqs|msg|data|p|partitioned-key"), calls[0]) +} + +// TestShardedCoordinator_DispatchFallsThroughForUnclaimedKeys pins +// the inverse: a key the resolver does NOT claim must continue to +// route via the byte-range engine. Without this guard the resolver- +// first short-circuit could mask engine routing decisions. 
+func TestShardedCoordinator_DispatchFallsThroughForUnclaimedKeys(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), []byte("m"), 1) + engine.UpdateRoute([]byte("m"), nil, 2) + + g1 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}}, + } + g2 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 2}}, + } + + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{ + 1: {Txn: g1, Store: store.NewMVCCStore()}, + 2: {Txn: g2, Store: store.NewMVCCStore()}, + }, 1, NewHLC(), nil) + + // Resolver only claims a key that ISN'T in the request — the + // dispatch must fall through to the engine. + coord.WithPartitionResolver(&stubResolver{claim: map[string]uint64{ + "!sqs|msg|data|p|other-key": 42, + }}) + + // "x" lands in [m, ∞) → engine routes to group 2. + resp, err := coord.Dispatch(context.Background(), &OperationGroup[OP]{ + Elems: []*Elem[OP]{ + {Op: Put, Key: []byte("x"), Value: []byte("v")}, + }, + }) + require.NoError(t, err) + require.NotNil(t, resp) + + g1.mu.Lock() + g2.mu.Lock() + g1Count := len(g1.requests) + g2Count := len(g2.requests) + g1.mu.Unlock() + g2.mu.Unlock() + + require.Zero(t, g1Count, + "unclaimed key must engine-route to group 2, not group 1") + require.Equal(t, 1, g2Count, + "engine fallthrough must dispatch to group 2 for keys in [m, ∞)") +} From 9cfc7798f8f6e8fd442e97510bc5133a96ddc78c Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 18:37:47 +0900 Subject: [PATCH 5/8] fix(sqs): fail-closed for recognised-but-unresolved partition keys MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round 2 Codex P1 on PR #715: when a partition resolver is installed, an unresolved partitioned-shape key (unknown queue / out-of-range partition) silently falls through to engine.GetRoute(routeKey(rawKey)). For !sqs|... keys, routeKey collapses to !sqs|route|global, so the engine routes the misconfiguration to the SQS catalog default group instead of returning a routing error. During partition-map drift / partial rollout this would silently mis-route HT-FIFO traffic. Fix - kv.PartitionResolver gains RecognisesPartitionedKey(key) — a shape-only predicate. Implementations answer purely on prefix / structural inspection so the router can use it independently of the in-memory mapping. - ShardRouter.ResolveGroup: after ResolveGroup returns ok=false, consult RecognisesPartitionedKey. If the resolver recognises the shape, return (0, false) WITHOUT engine fallback — the caller surfaces a routing error. If the shape is not recognised, fall through to the engine as before. - adapter.SQSPartitionResolver: implements RecognisesPartitionedKey via parsePartitionedSQSKey (the same parser ResolveGroup uses). Nil-receiver and empty-key return false. Tests - kv/shard_router_partition_test.go: new TestShardRouter_FailClosedOnRecognisedButUnresolved — pins that a recognised-but-unresolved partitioned key returns (0, false) AND that a non-recognised key still falls through to the engine. - adapter/sqs_partition_resolver_test.go: new TestSQSPartitionResolver_RecognisesPartitionedKey covers 8 shape cases (data/vis/byage families with known + unknown queues, OOR partition, legacy SQS, queue meta, non-SQS, empty, nil). New TestSQSPartitionResolver_RecognisesPartitionedKey_NilReceiver pins the typed-nil-safe branch. 
- Renamed and expanded TestSQSPartitionResolver_UnknownQueueRecognisedButUnresolved / TestSQSPartitionResolver_OutOfRangePartitionRecognisedButUnresolved to assert RecognisesPartitionedKey == true, pinning the router- side fail-closed contract. - Existing fakePartitionResolver / stubResolver / recordingResolver test doubles get the new method (the kv-internal tests don't depend on the adapter resolver). --- adapter/sqs_partition_resolver.go | 28 ++++++ adapter/sqs_partition_resolver_test.go | 111 ++++++++++++++++++++--- kv/shard_router.go | 38 +++++++- kv/shard_router_partition_test.go | 65 ++++++++++++- kv/sharded_coordinator_partition_test.go | 20 +++- 5 files changed, 243 insertions(+), 19 deletions(-) diff --git a/adapter/sqs_partition_resolver.go b/adapter/sqs_partition_resolver.go index 5774a0d7c..cd6fdb861 100644 --- a/adapter/sqs_partition_resolver.go +++ b/adapter/sqs_partition_resolver.go @@ -68,6 +68,14 @@ var sqsResolverFamilyPrefixes = [][]byte{ // family prefix (legacy SQS, KV, S3, DynamoDB, queue-meta records, // …) so kv.ShardRouter falls through to its byte-range engine for // default routing. +// +// Returns (0, false) for a partitioned-shaped key whose queue is +// not in the routes map or whose partition index is beyond +// len(routes[queue]). The router pairs this with +// RecognisesPartitionedKey to fail closed instead of falling +// through — silently routing through the engine's +// !sqs|route|global default would mis-route HT-FIFO traffic during +// partition-map drift (codex P1 round 2 on PR #715). func (r *SQSPartitionResolver) ResolveGroup(key []byte) (uint64, bool) { if r == nil || len(key) == 0 { return 0, false @@ -90,6 +98,26 @@ func (r *SQSPartitionResolver) ResolveGroup(key []byte) (uint64, bool) { return groups[partition], true } +// RecognisesPartitionedKey reports whether key has the structural +// shape of a partitioned-SQS key — a partitioned family prefix +// followed by an encoded queue segment, a '|' terminator, and a +// fixed-width uint32 partition index. Implementations of +// kv.PartitionResolver must answer purely on shape so the router +// can use the predicate to decide between fall-through (not +// partitioned) and fail-closed (partitioned but unresolved); see +// kv.PartitionResolver doc and codex P1 round 2 on PR #715. +// +// A nil receiver returns false so kv.ShardRouter's typed-nil case +// (ResolveGroup(nil) == (0, false)) pairs with an honest "I don't +// recognise anything" answer instead of falsely claiming a shape. +func (r *SQSPartitionResolver) RecognisesPartitionedKey(key []byte) bool { + if r == nil || len(key) == 0 { + return false + } + _, _, ok := parsePartitionedSQSKey(key) + return ok +} + // parsePartitionedSQSKey extracts the (queue, partition) pair from // a partitioned-SQS key. Returns ok=false for any key that does not // match a partitioned family prefix or that has a malformed queue / diff --git a/adapter/sqs_partition_resolver_test.go b/adapter/sqs_partition_resolver_test.go index a693f1949..be5db8d88 100644 --- a/adapter/sqs_partition_resolver_test.go +++ b/adapter/sqs_partition_resolver_test.go @@ -147,13 +147,14 @@ func TestSQSPartitionResolver_LegacyKeyFallsThrough(t *testing.T) { } } -// TestSQSPartitionResolver_UnknownQueueFallsThrough pins that a -// well-formed partitioned key for a queue that is NOT in the -// partition map returns (0, false). 
This shape can only happen if -// the cluster lost partition-map entries between writer and reader -// (an operational misconfiguration); routing through the byte-range -// engine is the correct fail-closed behaviour. -func TestSQSPartitionResolver_UnknownQueueFallsThrough(t *testing.T) { +// TestSQSPartitionResolver_UnknownQueueRecognisedButUnresolved +// pins the round-5 fail-closed contract: a well-formed partitioned +// key for a queue that is NOT in the partition map returns +// ResolveGroup=(0, false) AND RecognisesPartitionedKey=true. The +// router pairs these so the request fails closed — engine +// fall-through would silently route the misconfiguration through +// the SQS catalog default group (codex round-2 P1 on PR #715). +func TestSQSPartitionResolver_UnknownQueueRecognisedButUnresolved(t *testing.T) { t.Parallel() r := NewSQSPartitionResolver(map[string][]uint64{ "known.fifo": {10, 11}, @@ -162,15 +163,17 @@ func TestSQSPartitionResolver_UnknownQueueFallsThrough(t *testing.T) { gid, ok := r.ResolveGroup(key) require.False(t, ok) require.Equal(t, uint64(0), gid) + require.True(t, r.RecognisesPartitionedKey(key), + "unknown-queue partitioned key must still be recognised — "+ + "the router pairs Recognised=true with ok=false to fail "+ + "closed instead of falling through to the engine") } -// TestSQSPartitionResolver_OutOfRangePartitionFallsThrough pins -// that a partition value beyond the configured PartitionCount -// returns (0, false). Same fail-closed argument as the unknown- -// queue case — if a writer produced a key with partition 4 but the -// resolver's view only has partitions [0, 3], routing through the -// engine surfaces the disagreement at the request boundary. -func TestSQSPartitionResolver_OutOfRangePartitionFallsThrough(t *testing.T) { +// TestSQSPartitionResolver_OutOfRangePartitionRecognisedButUnresolved +// pins the round-5 fail-closed contract for a partition value +// beyond the configured PartitionCount. Same router-side +// fail-closed argument as the unknown-queue case. +func TestSQSPartitionResolver_OutOfRangePartitionRecognisedButUnresolved(t *testing.T) { t.Parallel() r := NewSQSPartitionResolver(map[string][]uint64{ "q.fifo": {10, 11}, @@ -182,6 +185,10 @@ func TestSQSPartitionResolver_OutOfRangePartitionFallsThrough(t *testing.T) { gid, ok := r.ResolveGroup(key) require.False(t, ok) require.Equal(t, uint64(0), gid) + require.True(t, r.RecognisesPartitionedKey(key), + "OOR-partition key must still be recognised — the router "+ + "pairs Recognised=true with ok=false to fail closed "+ + "instead of falling through to the engine") } // TestSQSPartitionResolver_NilReceiverIsSafe pins defensive @@ -197,6 +204,82 @@ func TestSQSPartitionResolver_NilReceiverIsSafe(t *testing.T) { require.Equal(t, uint64(0), gid) } +// TestSQSPartitionResolver_RecognisesPartitionedKey pins the +// shape-only predicate the router uses to decide between +// fall-through and fail-closed (codex round-2 P1 on PR #715). +// RecognisesPartitionedKey MUST answer purely on the structural +// shape — partitioned family prefix + queue + '|' terminator + +// be32 partition — independent of whether the queue is in the +// routes map. Otherwise the router could not reliably fail-closed +// for unresolved-but-recognised keys. 
+func TestSQSPartitionResolver_RecognisesPartitionedKey(t *testing.T) { + t.Parallel() + r := NewSQSPartitionResolver(map[string][]uint64{ + "known.fifo": {10, 11}, + }) + cases := []struct { + name string + key []byte + want bool + }{ + { + name: "data family + known queue", + key: sqsPartitionedMsgDataKey("known.fifo", 0, 1, "id"), + want: true, + }, + { + name: "vis family + unknown queue (recognised, unresolved)", + key: sqsPartitionedMsgVisKey("not-in-routes.fifo", 0, 1, 1, "id"), + want: true, + }, + { + name: "byage family + OOR partition (recognised, unresolved)", + key: sqsPartitionedMsgByAgeKey("known.fifo", 99, 1, 1, "id"), + want: true, + }, + { + name: "legacy SQS key — not partitioned", + key: sqsMsgDataKey("known.fifo", 1, "id"), + want: false, + }, + { + name: "non-SQS key", + key: []byte("/foo/bar"), + want: false, + }, + { + name: "queue meta key", + key: sqsQueueMetaKey("known.fifo"), + want: false, + }, + { + name: "empty", + key: []byte{}, + want: false, + }, + { + name: "nil", + key: nil, + want: false, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require.Equal(t, tc.want, r.RecognisesPartitionedKey(tc.key)) + }) + } +} + +// TestSQSPartitionResolver_RecognisesPartitionedKey_NilReceiver +// pins the typed-nil-safe branch — same as ResolveGroup, but for +// the new predicate. +func TestSQSPartitionResolver_RecognisesPartitionedKey_NilReceiver(t *testing.T) { + t.Parallel() + var r *SQSPartitionResolver + require.False(t, r.RecognisesPartitionedKey([]byte("any-key"))) +} + // TestSQSPartitionResolver_PrefixesAlign pins that the resolver's // family-prefix list matches the constants in sqs_keys.go exactly. // A future renamed prefix or added family that touches sqs_keys.go diff --git a/kv/shard_router.go b/kv/shard_router.go index f500be6af..deaba662a 100644 --- a/kv/shard_router.go +++ b/kv/shard_router.go @@ -21,9 +21,28 @@ import ( // Implementations must be safe for concurrent use — ResolveGroup is // called on the request hot path. Returning (0, false) for a key // the resolver does not recognise lets the router fall through to -// the engine. +// the engine. Returning (0, false) for a key the resolver DOES +// recognise (the partitioned shape matches but the queue is not +// in the map, or the partition is out of range) is also valid; +// the router uses RecognisesPartitionedKey to distinguish "not +// partitioned, fall through" from "partitioned but unresolved, +// fail closed". +// +// The fail-closed split matters under partition-map drift / a +// partial rollout: without it, an unresolved partitioned key +// would silently land on the engine's SQS-catalog default group +// (because routeKey normalises every !sqs|... key to +// !sqs|route|global) instead of surfacing a routing error. type PartitionResolver interface { ResolveGroup(key []byte) (uint64, bool) + + // RecognisesPartitionedKey reports whether the key SHAPE is + // one this resolver is responsible for. Implementations + // answer based on prefix / structural inspection only — the + // answer must NOT depend on any in-memory mapping that could + // drift out of sync, otherwise the router cannot reliably + // fail-closed for unresolved-but-recognised keys. + RecognisesPartitionedKey(key []byte) bool } // ShardRouter routes requests to multiple raft groups based on key ranges. @@ -87,6 +106,15 @@ func (s *ShardRouter) WithPartitionResolver(r PartitionResolver) *ShardRouter { // legacy routing (catalog → !sqs|route|global → default group) // stays unchanged. 
// +// Fail-closed for recognised-but-unresolved keys: when the resolver +// recognises a partitioned shape (RecognisesPartitionedKey == true) +// but cannot resolve the queue/partition pair (ResolveGroup returns +// ok=false), the router refuses to fall through to the engine. +// Otherwise the engine would route the partitioned key to +// !sqs|route|global's default group, silently mis-routing HT-FIFO +// traffic during partition-map drift or partial rollout (codex P1 +// round 2 on PR #715). +// // Returns (0, false) when neither the resolver nor the engine // recognises the key. Caller surfaces this as an "unknown group" // error so a partitioned-prefix key whose queue is missing from the @@ -97,6 +125,14 @@ func (s *ShardRouter) ResolveGroup(rawKey []byte) (uint64, bool) { if gid, ok := s.partitionResolver.ResolveGroup(rawKey); ok { return gid, true } + if s.partitionResolver.RecognisesPartitionedKey(rawKey) { + // Partitioned shape, but the resolver cannot map it + // (unknown queue, out-of-range partition). Fail closed + // — the engine's catalog-level default route would + // silently misroute this through routeKey's + // !sqs|route|global collapse. + return 0, false + } } // Engine routes against the user-key view of the byte-range // space; routeKey may rewrite SQS / DynamoDB / Redis-internal diff --git a/kv/shard_router_partition_test.go b/kv/shard_router_partition_test.go index ea13983ad..c5fd86fab 100644 --- a/kv/shard_router_partition_test.go +++ b/kv/shard_router_partition_test.go @@ -17,8 +17,16 @@ import ( // router-side tests — keeps the tests free of any adapter-package // dependency so the routing contract is verified at the kv layer in // isolation. +// +// recognisedPrefix lets a test simulate the "recognised but +// unresolved" failure mode (codex round-2 P1 on PR #715): keys +// that start with this prefix and miss the routes map cause +// ResolveGroup to return (0, false) AND +// RecognisesPartitionedKey to return true, so the router fails +// closed instead of falling through to the engine. type fakePartitionResolver struct { - routes map[string]uint64 + routes map[string]uint64 + recognisedPrefix []byte } func (f *fakePartitionResolver) ResolveGroup(key []byte) (uint64, bool) { @@ -26,6 +34,14 @@ func (f *fakePartitionResolver) ResolveGroup(key []byte) (uint64, bool) { return gid, ok } +func (f *fakePartitionResolver) RecognisesPartitionedKey(key []byte) bool { + if len(f.recognisedPrefix) == 0 { + return false + } + return len(key) >= len(f.recognisedPrefix) && + bytes.HasPrefix(key, f.recognisedPrefix) +} + // TestShardRouter_PartitionResolverWins pins that when the resolver // claims a key, the router dispatches to the resolver's group even // if the byte-range engine would have routed elsewhere. This is the @@ -176,6 +192,45 @@ func TestShardRouter_ResolverSeesRawKeyNotNormalized(t *testing.T) { "resolver, contrary to the §3.D PR 4-B-2 design") } +// TestShardRouter_FailClosedOnRecognisedButUnresolved pins the +// codex round-2 P1 fix on PR #715: a key the resolver recognises +// (partitioned shape) but cannot resolve (unknown queue / OOR +// partition) must NOT fall through to the engine. Without the +// fail-closed branch, the engine's routeKey-collapsed +// !sqs|route|global default would silently swallow the misroute +// during partition-map drift / partial rollout. 
+func TestShardRouter_FailClosedOnRecognisedButUnresolved(t *testing.T) { + t.Parallel() + e := distribution.NewEngine() + e.UpdateRoute([]byte(""), nil, 1) // engine would route everything to group 1 + + router := NewShardRouter(e) + // Resolver claims nothing but RECOGNISES the partitioned shape + // for any key starting with "!sqs|msg|data|p|". + router.WithPartitionResolver(&fakePartitionResolver{ + routes: map[string]uint64{}, + recognisedPrefix: []byte("!sqs|msg|data|p|"), + }) + + // A partitioned-shaped key the resolver recognises but cannot + // resolve. Without fail-closed this would silently route to + // group 1 via the engine's default. + _, ok := router.ResolveGroup([]byte("!sqs|msg|data|p|unknown-queue")) + require.False(t, ok, + "recognised-but-unresolved partitioned key must fail closed; "+ + "engine fall-through would silently mis-route HT-FIFO "+ + "traffic to the !sqs|route|global default group during "+ + "partition-map drift") + + // A non-partitioned key is NOT recognised → must still fall + // through to the engine. This pins that fail-closed only fires + // for the recognised-but-unresolved case. + gid, ok := router.ResolveGroup([]byte("legacy-key")) + require.True(t, ok) + require.Equal(t, uint64(1), gid, + "non-recognised key must fall through to engine default") +} + // TestShardRouter_GetUsesResolver pins that the resolver-first path // applies to Get as well as Commit/Abort. A regression that fixed // only Commit's path would silently route reads through the engine @@ -227,6 +282,14 @@ func (r *recordingResolver) ResolveGroup(key []byte) (uint64, bool) { return 0, false } +// RecognisesPartitionedKey returns false unconditionally — these +// router-side tests use the resolver-wins / fall-through paths +// only; the recognised-but-unresolved path is exercised separately +// via fakePartitionResolver.recognisedPrefix. +func (r *recordingResolver) RecognisesPartitionedKey([]byte) bool { + return false +} + func (r *recordingResolver) seenKeys() [][]byte { r.mu.Lock() defer r.mu.Unlock() diff --git a/kv/sharded_coordinator_partition_test.go b/kv/sharded_coordinator_partition_test.go index 774447d05..57206c492 100644 --- a/kv/sharded_coordinator_partition_test.go +++ b/kv/sharded_coordinator_partition_test.go @@ -13,10 +13,16 @@ import ( // stubResolver routes any key in claim to the named group; everything // else returns (0, false). Used by the coordinator-level resolver // regression tests so they don't depend on the adapter package. +// +// recognisedPrefix simulates the "recognised but unresolved" path +// (codex round-2 P1 on PR #715) — keys with this prefix that miss +// claim cause RecognisesPartitionedKey to return true so the +// router fails closed instead of falling through to the engine. 
type stubResolver struct { - mu sync.Mutex - claim map[string]uint64 - calls [][]byte + mu sync.Mutex + claim map[string]uint64 + recognisedPrefix []byte + calls [][]byte } func (s *stubResolver) ResolveGroup(key []byte) (uint64, bool) { @@ -29,6 +35,14 @@ func (s *stubResolver) ResolveGroup(key []byte) (uint64, bool) { return 0, false } +func (s *stubResolver) RecognisesPartitionedKey(key []byte) bool { + if len(s.recognisedPrefix) == 0 { + return false + } + return len(key) >= len(s.recognisedPrefix) && + string(key[:len(s.recognisedPrefix)]) == string(s.recognisedPrefix) +} + func (s *stubResolver) callKeys() [][]byte { s.mu.Lock() defer s.mu.Unlock() From b4bf81c35522ab3a36a9903102146f9b4960ce49 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 18:41:42 +0900 Subject: [PATCH 6/8] test(sqs): tighten typed-nil regression + add genuine groupMutations test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round 3 + Round 4 review on PR #715 caught two test gaps. Both were real — the regressions they claim to catch were not actually caught by the tests as written. Round 3: requireNilInterface used testify's require.Nil testify's require.Nil reflects through to the underlying pointer and considers a nil pointer wrapped in a non-nil interface as "nil". So if buildSQSPartitionResolver's return type were reverted to *adapter.SQSPartitionResolver (the typed-nil bug), the regression test would still pass — require.Nil on the typed nil returns true. Switch to require.True(t, r == nil, msg). Go's `==` operator on the interface checks BOTH the type tag AND the value tag — it only returns true for a true nil interface, which is the exact invariant the typed-nil fix produces. Round 4: TestShardedCoordinator_DispatchHonoursPartitionResolver did not actually regress the groupMutations bypass For a single-mutation batch, the test passes even if groupMutations bypasses the resolver, because rawLogs produces one pb.Request and router.Commit's groupRequests re-routes by the raw key — the router rescues the mis-routing the coordinator would have introduced. To genuinely regress the bypass, the test must dispatch TWO mutations belonging to TWO different partition groups. With the buggy groupMutations both end up under the engine-default group, rawLogs produces one request, and router.Commit puts both mutations on whichever group claims Mutations[0].Key — the second group receives nothing. Added TestShardedCoordinator_DispatchSplitsMutationsByResolverGroup: - Engine routes everything to group 1. - Resolver claims keyP0 → group 42, keyP1 → group 43. - Dispatch with [Put keyP0, Put keyP1]. - Asserts BOTH g42 and g43 each receive exactly one request. - Pre-fix: g43 receives ZERO (bypass put both under group 1's rawLog, then router put them under g42 because of Mutations[0]). - Post-fix: groupMutations splits via c.router.ResolveGroup, two separate requests, each group gets its own. Updated TestShardedCoordinator_DispatchHonoursPartitionResolver comment to reflect what it actually pins (WithPartitionResolver wiring + raw-key dispatch, not groupMutations). 
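As a reference for the round-3 point, a minimal standalone sketch of the typed-nil behaviour the switch relies on (illustrative only; Resolver, impl and typedNil are hypothetical names, not project code):

    package main

    import "fmt"

    type Resolver interface {
        ResolveGroup(key []byte) (uint64, bool)
    }

    type impl struct{}

    func (*impl) ResolveGroup([]byte) (uint64, bool) { return 0, false }

    // typedNil mimics the regressed shape: a constructor with a concrete
    // pointer return type whose nil result later lands in the interface.
    func typedNil() *impl { return nil }

    func main() {
        var r Resolver = typedNil()
        fmt.Println(r == nil) // false: the interface still carries the *impl type tag
        var s Resolver        // true nil interface: no type tag, no value
        fmt.Println(s == nil) // true: only this shape satisfies the == nil check
    }

testify's require.Nil reports both r and s as nil because it reflects into the underlying pointer, which is why the helper has to use the native interface comparison instead.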
--- kv/sharded_coordinator_partition_test.go | 102 ++++++++++++++++++++--- main_sqs_resolver_test.go | 17 +++- 2 files changed, 104 insertions(+), 15 deletions(-) diff --git a/kv/sharded_coordinator_partition_test.go b/kv/sharded_coordinator_partition_test.go index 57206c492..42fdd05a7 100644 --- a/kv/sharded_coordinator_partition_test.go +++ b/kv/sharded_coordinator_partition_test.go @@ -51,18 +51,18 @@ func (s *stubResolver) callKeys() [][]byte { return out } -// TestShardedCoordinator_DispatchHonoursPartitionResolver pins the -// Gemini-HIGH fix: ShardedCoordinator's groupMutations path now -// calls c.router.ResolveGroup, so a Dispatch whose key is claimed -// by the partition resolver MUST land on the resolver's group, not -// the engine's default group. Before the fix the coordinator -// bypassed the resolver entirely and partitioned-FIFO traffic -// silently mis-routed through 2PC. +// TestShardedCoordinator_DispatchHonoursPartitionResolver pins +// resolver wiring at the coordinator level: a Dispatch whose key +// is claimed by the partition resolver must land on the +// resolver's group, not the engine's default group. // -// Two-group setup: engine routes everything to group 1; resolver -// claims one specific key for group 42. Dispatch on that key must -// hit group 42's recordingTransactional, leaving group 1's -// recorder empty. +// NOTE (round 4 review): for a single-mutation batch, this test +// passes even if groupMutations bypasses the resolver — rawLogs +// produces one request, and router.Commit's groupRequests +// re-routes by raw key. This test pins the WithPartitionResolver +// fluent wiring + raw-key dispatch, NOT the groupMutations bypass +// regression. The 2-mutation test below is the actual regression +// for groupMutations (codex P1 + gemini HIGH). func TestShardedCoordinator_DispatchHonoursPartitionResolver(t *testing.T) { t.Parallel() engine := distribution.NewEngine() @@ -115,6 +115,86 @@ func TestShardedCoordinator_DispatchHonoursPartitionResolver(t *testing.T) { require.Equal(t, []byte("!sqs|msg|data|p|partitioned-key"), calls[0]) } +// TestShardedCoordinator_DispatchSplitsMutationsByResolverGroup is +// the genuine regression for the Gemini-HIGH groupMutations +// bypass: a Dispatch with mutations belonging to TWO different +// partitions must split into two requests, one per group. +// +// Pre-fix path (groupMutations called c.engine.GetRoute directly): +// - groupMutations → engine route → both mutations bundled under +// the engine default group. +// - rawLogs → ONE pb.Request with both mutations. +// - router.Commit → groupRequests routes by Mutations[0].Key only +// → request goes to group A only; group B receives nothing. +// +// Post-fix path (groupMutations consults c.router.ResolveGroup): +// - groupMutations → resolver → mut0→A, mut1→B; grouped split. +// - rawLogs → TWO pb.Requests, one per group. +// - Each group receives its own request. +// +// The assertion is that BOTH groups receive a request — pre-fix, +// only one would. This is the test the round 4 review asked for. 
+func TestShardedCoordinator_DispatchSplitsMutationsByResolverGroup(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte(""), nil, 1) + + g1 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}}, + } + g42 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}}, + } + g43 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}}, + } + + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{ + 1: {Txn: g1, Store: store.NewMVCCStore()}, + 42: {Txn: g42, Store: store.NewMVCCStore()}, + 43: {Txn: g43, Store: store.NewMVCCStore()}, + }, 1, NewHLC(), nil) + + keyP0 := []byte("!sqs|msg|data|p|partition-0-key") + keyP1 := []byte("!sqs|msg|data|p|partition-1-key") + coord.WithPartitionResolver(&stubResolver{claim: map[string]uint64{ + string(keyP0): 42, + string(keyP1): 43, + }}) + + resp, err := coord.Dispatch(context.Background(), &OperationGroup[OP]{ + Elems: []*Elem[OP]{ + {Op: Put, Key: keyP0, Value: []byte("v0")}, + {Op: Put, Key: keyP1, Value: []byte("v1")}, + }, + }) + require.NoError(t, err) + require.NotNil(t, resp) + + g1.mu.Lock() + g42.mu.Lock() + g43.mu.Lock() + defer g1.mu.Unlock() + defer g42.mu.Unlock() + defer g43.mu.Unlock() + + require.Zero(t, len(g1.requests), + "engine default group must not receive a request — both "+ + "keys are partitioned-resolver claims") + require.Equal(t, 1, len(g42.requests), + "partition-0 group must receive exactly one request "+ + "(pre-groupMutations-fix: would receive both mutations "+ + "in one request because router.Commit routes by "+ + "Mutations[0].Key only)") + require.Equal(t, 1, len(g43.requests), + "partition-1 group must receive exactly one request "+ + "(pre-groupMutations-fix: would receive ZERO requests "+ + "because both mutations were bundled under the "+ + "engine-route group). This is the genuine regression "+ + "for the Gemini HIGH bypass — the fix splits "+ + "mutations across groups via c.router.ResolveGroup.") +} + // TestShardedCoordinator_DispatchFallsThroughForUnclaimedKeys pins // the inverse: a key the resolver does NOT claim must continue to // route via the byte-range engine. Without this guard the resolver- diff --git a/main_sqs_resolver_test.go b/main_sqs_resolver_test.go index 70962c13e..8f6981e6a 100644 --- a/main_sqs_resolver_test.go +++ b/main_sqs_resolver_test.go @@ -61,11 +61,20 @@ func TestBuildSQSPartitionResolver_NonEmptyReturnsResolver(t *testing.T) { } // requireNilInterface accepts a kv.PartitionResolver and asserts -// the interface value (NOT just the underlying pointer) is nil. -// The function-parameter conversion forces a typed-nil pointer to -// be wrapped into a non-nil interface, which is exactly the +// the INTERFACE value is nil — both type AND value tags must be +// nil. The function-parameter conversion forces a typed-nil +// pointer to be wrapped into a non-nil interface, which is the // failure mode the regression test guards against. +// +// Uses Go's `==` operator on the interface, NOT testify's +// require.Nil. require.Nil reflects through to the underlying +// pointer and considers a nil pointer wrapped in a non-nil +// interface as "nil", so it would pass even with the typed-nil +// regression present. require.True(t, r == nil, ...) uses Go's +// native interface comparison, which only returns true when the +// interface's type tag is also nil — that is the exact invariant +// this test was designed to catch (round 3 review on PR #715). 
func requireNilInterface(t *testing.T, r kv.PartitionResolver, msg string) { t.Helper() - require.Nil(t, r, msg) + require.True(t, r == nil, msg) } From 87561fd9f304d37907a90ecd06dcc6087501eba0 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 18:45:26 +0900 Subject: [PATCH 7/8] fix(sqs): RecognisesPartitionedKey checks prefix only, not full parse MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round 5 review on PR #715 raised a nit: RecognisesPartitionedKey delegated to parsePartitionedSQSKey, which fails when the queue segment is malformed base64 or the partition segment is truncated. For such corrupt-shape keys the predicate returned false, so the router fell through to the engine and silently routed via routeKey's !sqs|route|global collapse to the SQS catalog default group — exactly the failure mode the round 5 fail-closed change was introduced to prevent. The fix is the structural-only intent the kv.PartitionResolver contract already documents: "Implementations answer purely on prefix / structural inspection". Make RecognisesPartitionedKey match the intent — accept ANY key that starts with one of the partitioned family prefixes, regardless of subsequent corruption. ResolveGroup still returns (0, false) for malformed keys, and the router pairs that with Recognised=true to fail closed. Tests - TestSQSPartitionResolver_RecognisesMalformedPartitionedKey: three sub-cases pin the new contract — prefix-only, prefix + invalid base64 queue segment, prefix + valid queue + '|' but truncated partition bytes. All assert Recognised=true and ResolveGroup ok=false, which is exactly the fail-closed pairing the router consumes. - Existing TestSQSPartitionResolver_RecognisesPartitionedKey cases stay valid (the well-formed shapes still match). --- adapter/sqs_partition_resolver.go | 17 +++++---- adapter/sqs_partition_resolver_test.go | 51 ++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 8 deletions(-) diff --git a/adapter/sqs_partition_resolver.go b/adapter/sqs_partition_resolver.go index cd6fdb861..9e7ac02c1 100644 --- a/adapter/sqs_partition_resolver.go +++ b/adapter/sqs_partition_resolver.go @@ -99,13 +99,14 @@ func (r *SQSPartitionResolver) ResolveGroup(key []byte) (uint64, bool) { } // RecognisesPartitionedKey reports whether key has the structural -// shape of a partitioned-SQS key — a partitioned family prefix -// followed by an encoded queue segment, a '|' terminator, and a -// fixed-width uint32 partition index. Implementations of -// kv.PartitionResolver must answer purely on shape so the router -// can use the predicate to decide between fall-through (not -// partitioned) and fail-closed (partitioned but unresolved); see -// kv.PartitionResolver doc and codex P1 round 2 on PR #715. +// shape of a partitioned-SQS key — i.e. starts with one of the +// partitioned family prefixes. The check is PREFIX-ONLY, not a +// full parse: a key with a partitioned prefix followed by a +// malformed queue / partition segment still answers true, so the +// router fails closed via kv.PartitionResolver semantics instead +// of falling through to the engine and silently routing to the +// SQS catalog default group via routeKey's !sqs|route|global +// collapse (round 5 review nit on PR #715). 
// // A nil receiver returns false so kv.ShardRouter's typed-nil case // (ResolveGroup(nil) == (0, false)) pairs with an honest "I don't @@ -114,7 +115,7 @@ func (r *SQSPartitionResolver) RecognisesPartitionedKey(key []byte) bool { if r == nil || len(key) == 0 { return false } - _, _, ok := parsePartitionedSQSKey(key) + _, ok := stripPartitionedFamilyPrefix(key) return ok } diff --git a/adapter/sqs_partition_resolver_test.go b/adapter/sqs_partition_resolver_test.go index be5db8d88..d9cb9fe23 100644 --- a/adapter/sqs_partition_resolver_test.go +++ b/adapter/sqs_partition_resolver_test.go @@ -280,6 +280,57 @@ func TestSQSPartitionResolver_RecognisesPartitionedKey_NilReceiver(t *testing.T) require.False(t, r.RecognisesPartitionedKey([]byte("any-key"))) } +// TestSQSPartitionResolver_RecognisesMalformedPartitionedKey pins +// the prefix-only check (round 5 nit on PR #715): a key with a +// valid partitioned family prefix but a corrupt / truncated queue +// or partition segment MUST still be recognised so the router +// fails closed rather than falling through to engine routing +// (which would silently mis-route to the SQS catalog default +// group via routeKey's !sqs|route|global collapse). +func TestSQSPartitionResolver_RecognisesMalformedPartitionedKey(t *testing.T) { + t.Parallel() + r := NewSQSPartitionResolver(map[string][]uint64{ + "q.fifo": {10, 11}, + }) + cases := []struct { + name string + key []byte + }{ + { + // Prefix only, nothing after — parsePartitionedSQSKey + // would fail on the missing '|' terminator, but the + // shape IS recognised. + name: "prefix only", + key: []byte(SqsPartitionedMsgDataPrefix), + }, + { + // Prefix + base64 garbage that decodes invalidly. + // parsePartitionedSQSKey would fail at decodeSQSSegment. + name: "prefix + invalid base64", + key: []byte(SqsPartitionedMsgDataPrefix + "!!!|"), + }, + { + // Prefix + valid queue segment + '|' but no partition + // bytes (truncated). + name: "prefix + queue + truncated partition", + key: []byte(SqsPartitionedMsgVisPrefix + encodeSQSSegment("q.fifo") + "|"), + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require.True(t, r.RecognisesPartitionedKey(tc.key), + "malformed partitioned key MUST be recognised so "+ + "the router fails closed rather than mis-routing "+ + "through the engine's SQS catalog default") + gid, ok := r.ResolveGroup(tc.key) + require.False(t, ok, + "malformed key cannot resolve") + require.Equal(t, uint64(0), gid) + }) + } +} + // TestSQSPartitionResolver_PrefixesAlign pins that the resolver's // family-prefix list matches the constants in sqs_keys.go exactly. // A future renamed prefix or added family that touches sqs_keys.go From eedc17cdcb10c9861353abcf5e074a87e89174b8 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 19:37:32 +0900 Subject: [PATCH 8/8] fix(sqs): fail-closed for unresolved txn read keys MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round 5 introduced fail-closed semantics in ShardRouter.ResolveGroup (recognised-but-unresolved partitioned keys return ok=false instead of falling through to the engine), but the read-key path inside ShardedCoordinator was not audited for the new contract. engineGroupIDForKey discards the resolver's ok flag and returns 0 for any failure. groupReadKeysByShardID then loops `if gid == 0 { continue }`, silently dropping unrouted read keys from the prewrite payload. 
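For illustration, the pre-fix shape of that loop, paraphrased from the removed lines in the diff below (not verbatim source):

    for _, key := range readKeys {
        gid := c.engineGroupIDForKey(key) // collapses the resolver's (gid, ok) to a bare 0 on any failure
        if gid == 0 {
            continue // unrouted read key silently dropped from the prewrite payload
        }
        grouped[gid] = append(grouped[gid], key)
    }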
With the new fail-closed resolver, a partitioned read key whose queue has drifted out of --sqsFifoPartitionMap (partial rollout / config drift) gets dropped from OCC validation. The FSM never sees that key in ReadKeys, a concurrent write to the same key commits without conflict, and SSI is broken. Codex round-2 P1 on PR #715 caught this — addressed here. Fix groupReadKeysByShardID returns (map, error). Any read key that fails to route surfaces as ErrInvalidRequest, dispatchTxn propagates the error, and the transaction aborts before prewrite. Calls c.router.ResolveGroup directly (rather than via engineGroupIDForKey) so the (gid, ok) signal is preserved through the boundary. Tests - TestGroupReadKeysByShardID_FailsClosedOnUnroutable replaces TestGroupReadKeysByShardID_SkipsUnroutableKeys (which had been pinning the buggy skip-silently behaviour). Asserts the new fail-closed contract: unroutable keys → error, no partial map. - TestShardedCoordinator_TxnFailsClosedForUnresolvedReadKey is the coordinator-level regression: a transaction with a recognised-but-unresolved partitioned read key MUST abort before any prewrite. Asserts no group received an RPC. - The three existing TestGroupReadKeysByShardID_* tests are updated for the new (map, error) signature. --- kv/sharded_coordinator.go | 39 +++++++++--- kv/sharded_coordinator_partition_test.go | 79 ++++++++++++++++++++++++ kv/sharded_coordinator_txn_test.go | 37 ++++++++--- 3 files changed, 139 insertions(+), 16 deletions(-) diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index 49c61522e..54c24115d 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -395,7 +395,13 @@ func (c *ShardedCoordinator) dispatchTxn(ctx context.Context, startTS uint64, co // Multi-shard path: group read keys by shard now. The result is passed // directly to prewriteTxn to avoid a second iteration inside that function. - groupedReadKeys := c.groupReadKeysByShardID(readKeys) + // A routing failure here aborts the transaction before any prewrite — + // silently dropping unresolvable read keys would let OCC validation run + // with an incomplete read set and break SSI. + groupedReadKeys, err := c.groupReadKeysByShardID(readKeys) + if err != nil { + return nil, err + } prepared, err := c.prewriteTxn(ctx, startTS, commitTS, primaryKey, grouped, gids, groupedReadKeys) if err != nil { return nil, err @@ -891,19 +897,38 @@ func (c *ShardedCoordinator) engineGroupIDForKey(key []byte) uint64 { return gid } -func (c *ShardedCoordinator) groupReadKeysByShardID(readKeys [][]byte) map[uint64][][]byte { +// groupReadKeysByShardID groups txn read keys by their owning Raft +// group. Returns an error when ANY read key cannot be routed — +// silently skipping unresolvable keys would let a transaction +// commit with an incomplete OCC read-set, which breaks SSI under +// the §5 ShardRouter resolver-first dispatch (codex round-2 P1 on +// PR #715). +// +// The fail-closed semantic the resolver gained in PR 4-B-2 makes +// this path matter: a partitioned-shape read key whose queue is +// missing from --sqsFifoPartitionMap (drift / partial rollout) +// returns gid=0 from c.router.ResolveGroup. If we silently dropped +// those keys, the prewrite Raft entry would carry an empty +// ReadKeys slice for that key, the FSM's read-write conflict +// validation would never see it, and a concurrent write could +// commit alongside a stale read. Surface the routing failure as +// an error so the transaction aborts before any FSM apply. 
+func (c *ShardedCoordinator) groupReadKeysByShardID(readKeys [][]byte) (map[uint64][][]byte, error) { if len(readKeys) == 0 { - return nil + return nil, nil } grouped := make(map[uint64][][]byte) for _, key := range readKeys { - gid := c.engineGroupIDForKey(key) - if gid == 0 { - continue + gid, ok := c.router.ResolveGroup(key) + if !ok || gid == 0 { + return nil, errors.Wrapf(ErrInvalidRequest, + "no route for txn read key %q — recognised-but-"+ + "unresolved partition keys must fail closed to "+ + "preserve OCC read-set integrity", key) } grouped[gid] = append(grouped[gid], key) } - return grouped + return grouped, nil } // validateReadOnlyShards checks read-write conflicts on shards that have diff --git a/kv/sharded_coordinator_partition_test.go b/kv/sharded_coordinator_partition_test.go index 42fdd05a7..ae46301a1 100644 --- a/kv/sharded_coordinator_partition_test.go +++ b/kv/sharded_coordinator_partition_test.go @@ -195,6 +195,85 @@ func TestShardedCoordinator_DispatchSplitsMutationsByResolverGroup(t *testing.T) "mutations across groups via c.router.ResolveGroup.") } +// TestShardedCoordinator_TxnFailsClosedForUnresolvedReadKey pins +// the codex round-2 P1 fix on PR #715: a transaction whose read +// set contains a partitioned-shape key the resolver recognises +// but cannot route (queue missing from --sqsFifoPartitionMap +// during drift / partial rollout) MUST abort before prewrite. +// +// Pre-fix path: groupReadKeysByShardID silently skipped read keys +// where engineGroupIDForKey returned 0 (which is what the +// fail-closed resolver returns for recognised-but-unresolved +// keys). The resulting prewrite Raft entry carried an empty +// ReadKeys slice for that key, the FSM's read-write conflict +// validation never saw it, and a concurrent write could commit +// alongside the stale read — SSI violated. +// +// Post-fix path: groupReadKeysByShardID returns an error when +// any read key cannot be routed; dispatchTxn propagates the +// error and the transaction aborts. +func TestShardedCoordinator_TxnFailsClosedForUnresolvedReadKey(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte(""), nil, 1) + + g1 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}}, + } + g42 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}}, + } + + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{ + 1: {Txn: g1, Store: store.NewMVCCStore()}, + 42: {Txn: g42, Store: store.NewMVCCStore()}, + }, 1, NewHLC(), nil) + + // Resolver claims the WRITE key but RECOGNISES the read key as + // partitioned-shape and returns ok=false (simulating the + // queue-missing-from-map drift scenario). The router pairs + // Recognised=true with ok=false to fail closed. + writeKey := []byte("!sqs|msg|data|p|claimed-write-key") + readKey := []byte("!sqs|msg|data|p|drift-unresolved-read") + coord.WithPartitionResolver(&stubResolver{ + claim: map[string]uint64{string(writeKey): 42}, + recognisedPrefix: []byte("!sqs|msg|data|p|"), + }) + + // Issue a transaction that reads the unresolvable partitioned + // key and writes the claimed key. The transaction MUST abort — + // pre-fix it would silently drop the read key and proceed. 
+ resp, err := coord.Dispatch(context.Background(), &OperationGroup[OP]{ + IsTxn: true, + StartTS: 100, + CommitTS: 200, + Elems: []*Elem[OP]{ + {Op: Put, Key: writeKey, Value: []byte("v")}, + }, + ReadKeys: [][]byte{readKey}, + }) + require.Error(t, err, + "transaction with unresolvable partitioned read key MUST "+ + "abort — silently dropping the key would let OCC "+ + "validation run with an incomplete read set and "+ + "break SSI") + require.Nil(t, resp) + + // Neither group should have received a prepare — the routing + // failure surfaces before any prewrite. + g1.mu.Lock() + g42.mu.Lock() + defer g1.mu.Unlock() + defer g42.mu.Unlock() + require.Zero(t, len(g1.requests), + "engine default group must not see a prewrite — fail-closed "+ + "happens before any RPC") + require.Zero(t, len(g42.requests), + "resolver-claimed group must not see a prewrite either — "+ + "fail-closed aborts the txn before prewrite even though "+ + "the write key is routable") +} + // TestShardedCoordinator_DispatchFallsThroughForUnclaimedKeys pins // the inverse: a key the resolver does NOT claim must continue to // route via the byte-range engine. Without this guard the resolver- diff --git a/kv/sharded_coordinator_txn_test.go b/kv/sharded_coordinator_txn_test.go index a75a3394c..6eaf567c3 100644 --- a/kv/sharded_coordinator_txn_test.go +++ b/kv/sharded_coordinator_txn_test.go @@ -309,7 +309,9 @@ func TestGroupReadKeysByShardID_NilReturnsNil(t *testing.T) { engine := distribution.NewEngine() engine.UpdateRoute([]byte(""), nil, 1) coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}}, 1, NewHLC(), nil) - require.Nil(t, coord.groupReadKeysByShardID(nil)) + grouped, err := coord.groupReadKeysByShardID(nil) + require.NoError(t, err) + require.Nil(t, grouped) } func TestGroupReadKeysByShardID_EmptyReturnsNil(t *testing.T) { @@ -317,7 +319,9 @@ func TestGroupReadKeysByShardID_EmptyReturnsNil(t *testing.T) { engine := distribution.NewEngine() engine.UpdateRoute([]byte(""), nil, 1) coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}}, 1, NewHLC(), nil) - require.Nil(t, coord.groupReadKeysByShardID([][]byte{})) + grouped, err := coord.groupReadKeysByShardID([][]byte{}) + require.NoError(t, err) + require.Nil(t, grouped) } func TestGroupReadKeysByShardID_GroupsByShardID(t *testing.T) { @@ -327,11 +331,12 @@ func TestGroupReadKeysByShardID_GroupsByShardID(t *testing.T) { engine.UpdateRoute([]byte("m"), nil, 2) coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}, 2: {}}, 1, NewHLC(), nil) - grouped := coord.groupReadKeysByShardID([][]byte{ + grouped, err := coord.groupReadKeysByShardID([][]byte{ []byte("b"), // shard 1 []byte("c"), // shard 1 []byte("x"), // shard 2 }) + require.NoError(t, err) require.Len(t, grouped, 2) require.Len(t, grouped[1], 2) require.Equal(t, []byte("b"), grouped[1][0]) @@ -340,20 +345,34 @@ func TestGroupReadKeysByShardID_GroupsByShardID(t *testing.T) { require.Equal(t, []byte("x"), grouped[2][0]) } -func TestGroupReadKeysByShardID_SkipsUnroutableKeys(t *testing.T) { +// TestGroupReadKeysByShardID_FailsClosedOnUnroutable pins the +// codex round-2 P1 fix on PR #715: a read key the resolver cannot +// route (recognised-but-unresolved partition key during drift, or +// any key outside the engine's range cover) MUST surface as an +// error so the transaction aborts before any prewrite. 
Silently +// skipping unroutable keys would let OCC validation run with an +// incomplete read set and break SSI — a concurrent write to that +// key could commit alongside a stale read. +// +// This test was previously TestGroupReadKeysByShardID_SkipsUnroutableKeys +// and pinned the BUGGY skip-silently behaviour. Renamed and rewritten +// to pin the new fail-closed contract. +func TestGroupReadKeysByShardID_FailsClosedOnUnroutable(t *testing.T) { t.Parallel() // Only route "a"-"m" to shard 1. Keys outside this range are unroutable. engine := distribution.NewEngine() engine.UpdateRoute([]byte("a"), []byte("m"), 1) coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}}, 1, NewHLC(), nil) - grouped := coord.groupReadKeysByShardID([][]byte{ + grouped, err := coord.groupReadKeysByShardID([][]byte{ []byte("b"), // routable → shard 1 - []byte("zzz"), // unroutable → skipped + []byte("zzz"), // unroutable → MUST surface as error }) - require.Len(t, grouped, 1) - require.Len(t, grouped[1], 1) - require.Equal(t, []byte("b"), grouped[1][0]) + require.Error(t, err, + "unroutable read key MUST fail closed — silently skipping "+ + "would drop the key from OCC validation and break SSI") + require.Nil(t, grouped) + require.ErrorIs(t, err, ErrInvalidRequest) } // ---------------------------------------------------------------------------