diff --git a/adapter/sqs_partition_resolver.go b/adapter/sqs_partition_resolver.go new file mode 100644 index 000000000..9e7ac02c1 --- /dev/null +++ b/adapter/sqs_partition_resolver.go @@ -0,0 +1,165 @@ +package adapter + +import ( + "bytes" + "encoding/binary" +) + +// SQSPartitionResolver maps a partitioned-SQS key to the operator- +// chosen Raft group for the (queue, partition) tuple. Implements +// kv.PartitionResolver via duck typing — see the integration in +// main.go where the resolver is installed on ShardedCoordinator. +// +// The byte-range engine cannot route partitioned queues because +// adding per-partition routes would break its non-overlapping-cover +// invariant (a partition route for partition K of one queue would +// leave a gap for legacy keys that fall lexicographically between +// partitions K and K+1). The resolver-first dispatch path avoids +// this — it answers only for keys that match a partitioned family +// prefix and otherwise lets the engine handle dispatch. +type SQSPartitionResolver struct { + routes map[string][]uint64 +} + +// NewSQSPartitionResolver builds a resolver from the operator- +// supplied partition map. routes[queue][k] is the Raft group ID +// that owns partition k of queue, with len(routes[queue]) equal to +// the queue's PartitionCount. +// +// Returns nil when routes is empty so callers can keep the resolver +// out of the request path entirely on a non-partitioned cluster +// (kv.ShardRouter.WithPartitionResolver(nil) is a documented no-op). +// +// The constructor takes a defensive copy so a later caller mutation +// to the input map does not leak into the resolver's view at +// runtime. +func NewSQSPartitionResolver(routes map[string][]uint64) *SQSPartitionResolver { + if len(routes) == 0 { + return nil + } + cp := make(map[string][]uint64, len(routes)) + for queue, groups := range routes { + ids := make([]uint64, len(groups)) + copy(ids, groups) + cp[queue] = ids + } + return &SQSPartitionResolver{routes: cp} +} + +// sqsResolverFamilyPrefixes is the set of partitioned-SQS family +// prefixes ResolveGroup recognises. Pre-converted to []byte so the +// hot-path bytes.HasPrefix call avoids an allocation per check +// (gemini medium on PR #715). Kept package-internal so any future +// renamed prefix touches both this list and the constant +// declaration in sqs_keys.go — TestSQSPartitionResolver_PrefixesAlign +// pins the alignment. +var sqsResolverFamilyPrefixes = [][]byte{ + []byte(SqsPartitionedMsgDataPrefix), + []byte(SqsPartitionedMsgVisPrefix), + []byte(SqsPartitionedMsgDedupPrefix), + []byte(SqsPartitionedMsgGroupPrefix), + []byte(SqsPartitionedMsgByAgePrefix), +} + +// ResolveGroup decodes the (queue, partition) embedded in a +// partitioned-SQS key and returns the operator-chosen Raft group. +// +// Returns (0, false) for any key that does not match a partitioned +// family prefix (legacy SQS, KV, S3, DynamoDB, queue-meta records, +// …) so kv.ShardRouter falls through to its byte-range engine for +// default routing. +// +// Returns (0, false) for a partitioned-shaped key whose queue is +// not in the routes map or whose partition index is beyond +// len(routes[queue]). The router pairs this with +// RecognisesPartitionedKey to fail closed instead of falling +// through — silently routing through the engine's +// !sqs|route|global default would mis-route HT-FIFO traffic during +// partition-map drift (codex P1 round 2 on PR #715). 
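+//
+// Caller-side pairing, as a sketch (the helper names and error value
+// below are illustrative, not part of this change — kv.ShardRouter's
+// ResolveGroup is the real consumer):
+//
+//	gid, ok := r.ResolveGroup(rawKey)
+//	switch {
+//	case ok:
+//	    dispatchTo(gid) // operator-chosen group for (queue, partition)
+//	case r.RecognisesPartitionedKey(rawKey):
+//	    return errUnresolvedPartitionedKey // fail closed; never fall through
+//	default:
+//	    dispatchViaEngine(rawKey) // legacy byte-range routing
+//	}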
+func (r *SQSPartitionResolver) ResolveGroup(key []byte) (uint64, bool) { + if r == nil || len(key) == 0 { + return 0, false + } + queue, partition, ok := parsePartitionedSQSKey(key) + if !ok { + return 0, false + } + groups, found := r.routes[queue] + if !found { + return 0, false + } + // Defensive: a partition value outside the slice is a config / + // upstream-bug signal, not a routable key. Returning false + // surfaces it as "no route" at the router boundary, which is + // the correct fail-closed behaviour. + if uint64(partition) >= uint64(len(groups)) { + return 0, false + } + return groups[partition], true +} + +// RecognisesPartitionedKey reports whether key has the structural +// shape of a partitioned-SQS key — i.e. starts with one of the +// partitioned family prefixes. The check is PREFIX-ONLY, not a +// full parse: a key with a partitioned prefix followed by a +// malformed queue / partition segment still answers true, so the +// router fails closed via kv.PartitionResolver semantics instead +// of falling through to the engine and silently routing to the +// SQS catalog default group via routeKey's !sqs|route|global +// collapse (round 5 review nit on PR #715). +// +// A nil receiver returns false so kv.ShardRouter's typed-nil case +// (ResolveGroup(nil) == (0, false)) pairs with an honest "I don't +// recognise anything" answer instead of falsely claiming a shape. +func (r *SQSPartitionResolver) RecognisesPartitionedKey(key []byte) bool { + if r == nil || len(key) == 0 { + return false + } + _, ok := stripPartitionedFamilyPrefix(key) + return ok +} + +// parsePartitionedSQSKey extracts the (queue, partition) pair from +// a partitioned-SQS key. Returns ok=false for any key that does not +// match a partitioned family prefix or that has a malformed queue / +// partition segment. Exposed at package-internal scope so the +// adapter's reaper / fanout reader can share the same parser +// (Phase 3.D PR 5). +func parsePartitionedSQSKey(key []byte) (string, uint32, bool) { + rest, matched := stripPartitionedFamilyPrefix(key) + if !matched { + return "", 0, false + } + // After the family prefix, the variable-length encoded queue + // segment is terminated by '|' (sqsPartitionedQueueTerminator). + // base64.RawURLEncoding never emits '|', so the first '|' in + // rest is unambiguously the queue terminator. + pipeIdx := bytes.IndexByte(rest, sqsPartitionedQueueTerminator) + if pipeIdx <= 0 { + return "", 0, false + } + encQueue := rest[:pipeIdx] + rest = rest[pipeIdx+1:] + const partitionLen = 4 + if len(rest) < partitionLen { + return "", 0, false + } + partition := binary.BigEndian.Uint32(rest[:partitionLen]) + queue, err := decodeSQSSegment(string(encQueue)) + if err != nil { + return "", 0, false + } + return queue, partition, true +} + +// stripPartitionedFamilyPrefix returns the bytes after the matched +// family prefix. matched=false if key has none of the known +// partitioned family prefixes. 
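+//
+// Shape of the keys both helpers operate on (a sketch — the concrete
+// prefix strings are the Sqs*Prefix constants in sqs_keys.go):
+//
+//	<family prefix><base64url(queue)>'|'<be32 partition><remainder>
+//
+// stripPartitionedFamilyPrefix consumes the first element;
+// parsePartitionedSQSKey then splits on the '|' terminator and reads
+// the 4-byte big-endian partition.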
+func stripPartitionedFamilyPrefix(key []byte) ([]byte, bool) { + for _, prefix := range sqsResolverFamilyPrefixes { + if bytes.HasPrefix(key, prefix) { + return key[len(prefix):], true + } + } + return nil, false +} diff --git a/adapter/sqs_partition_resolver_test.go b/adapter/sqs_partition_resolver_test.go new file mode 100644 index 000000000..d9cb9fe23 --- /dev/null +++ b/adapter/sqs_partition_resolver_test.go @@ -0,0 +1,357 @@ +package adapter + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestNewSQSPartitionResolver_NilOnEmpty pins the constructor's +// nil-on-empty contract — kv.ShardRouter.WithPartitionResolver(nil) +// is documented as a no-op, so a non-partitioned cluster MUST end +// up with no resolver in the request hot path. +func TestNewSQSPartitionResolver_NilOnEmpty(t *testing.T) { + t.Parallel() + require.Nil(t, NewSQSPartitionResolver(nil)) + require.Nil(t, NewSQSPartitionResolver(map[string][]uint64{})) +} + +// TestNewSQSPartitionResolver_DefensiveCopy pins that mutating the +// caller's input map after construction does NOT change the +// resolver's view. Without this, a hot-reload of --sqsFifoPartitionMap +// would partially alter the resolver mid-request and produce +// transient mis-routes. +func TestNewSQSPartitionResolver_DefensiveCopy(t *testing.T) { + t.Parallel() + input := map[string][]uint64{"q.fifo": {10, 11}} + r := NewSQSPartitionResolver(input) + // Mutate after construction. + input["q.fifo"][0] = 999 + delete(input, "q.fifo") + + key := sqsPartitionedMsgDataKey("q.fifo", 0, 1, "msg") + gid, ok := r.ResolveGroup(key) + require.True(t, ok) + require.Equal(t, uint64(10), gid, + "resolver must keep its constructor-time view; "+ + "caller mutation cannot leak in") +} + +// TestSQSPartitionResolver_ResolveByPartition pins the core dispatch +// contract: a key produced by sqsPartitionedMsg*Key for (queue, +// partition) resolves to the operator-chosen group for that exact +// partition. +func TestSQSPartitionResolver_ResolveByPartition(t *testing.T) { + t.Parallel() + r := NewSQSPartitionResolver(map[string][]uint64{ + "orders.fifo": {10, 11, 12, 13}, + "events.fifo": {20, 21}, + }) + cases := []struct { + name string + queue string + partition uint32 + wantGroup uint64 + }{ + {"orders p0", "orders.fifo", 0, 10}, + {"orders p1", "orders.fifo", 1, 11}, + {"orders p3", "orders.fifo", 3, 13}, + {"events p0", "events.fifo", 0, 20}, + {"events p1", "events.fifo", 1, 21}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + // Every family must resolve to the same group for the + // same (queue, partition) — the reaper / fanout reader + // / send path all depend on this invariant. 
+ for familyName, key := range map[string][]byte{ + "data": sqsPartitionedMsgDataKey(tc.queue, tc.partition, 1, "msg-id"), + "vis": sqsPartitionedMsgVisKey(tc.queue, tc.partition, 1, 1700000000000, "msg-id"), + "dedup": sqsPartitionedMsgDedupKey(tc.queue, tc.partition, 1, "dedup-id"), + "group": sqsPartitionedMsgGroupKey(tc.queue, tc.partition, 1, "group-id"), + "byage": sqsPartitionedMsgByAgeKey(tc.queue, tc.partition, 1, 1700000000000, "msg-id"), + } { + gid, ok := r.ResolveGroup(key) + require.True(t, ok, "family %s key for (%s, p%d) must resolve", + familyName, tc.queue, tc.partition) + require.Equal(t, tc.wantGroup, gid, + "family %s key for (%s, p%d) resolved to wrong group", + familyName, tc.queue, tc.partition) + } + }) + } +} + +// TestSQSPartitionResolver_QueueIsolation pins the queue-name +// terminator contract from the routing layer's perspective. Two +// queues with overlapping encoded prefixes (base64("queue") is a +// strict prefix of base64("queue1")) MUST resolve to their own +// groups, not each other's. The '|' terminator after the queue +// segment is what makes this safe — same invariant +// TestSqsPartitionedMsgKeys_QueueNamePrefixIsolation pins for the +// keyspace itself. +func TestSQSPartitionResolver_QueueIsolation(t *testing.T) { + t.Parallel() + r := NewSQSPartitionResolver(map[string][]uint64{ + "queue": {500}, + "queue1": {600}, + }) + queueKey := sqsPartitionedMsgDataKey("queue", 0, 1, "id") + queue1Key := sqsPartitionedMsgDataKey("queue1", 0, 1, "id") + + queueGID, ok := r.ResolveGroup(queueKey) + require.True(t, ok) + require.Equal(t, uint64(500), queueGID) + + queue1GID, ok := r.ResolveGroup(queue1Key) + require.True(t, ok) + require.Equal(t, uint64(600), queue1GID, + "queue1 keys must NOT resolve to queue's group; "+ + "the '|' terminator after the queue segment is what makes this safe") +} + +// TestSQSPartitionResolver_LegacyKeyFallsThrough pins that a non- +// partitioned (legacy) SQS key returns (0, false) — the +// kv.ShardRouter caller then falls through to the byte-range +// engine for default routing. A regression here would route every +// legacy queue's traffic to whichever group the resolver guessed +// from its partition map. +func TestSQSPartitionResolver_LegacyKeyFallsThrough(t *testing.T) { + t.Parallel() + r := NewSQSPartitionResolver(map[string][]uint64{ + "orders.fifo": {10, 11}, + }) + cases := []struct { + name string + key []byte + }{ + {"legacy data", sqsMsgDataKey("orders.fifo", 1, "id")}, + {"legacy vis", sqsMsgVisKey("orders.fifo", 1, 1700000000000, "id")}, + {"legacy dedup", sqsMsgDedupKey("orders.fifo", 1, "dedup")}, + {"legacy group", sqsMsgGroupKey("orders.fifo", 1, "group")}, + {"legacy byage", sqsMsgByAgeKey("orders.fifo", 1, 1700000000000, "id")}, + {"queue meta", sqsQueueMetaKey("orders.fifo")}, + {"non-sqs key", []byte("/some/other/key")}, + {"empty", []byte{}}, + {"nil", nil}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + gid, ok := r.ResolveGroup(tc.key) + require.False(t, ok, "non-partitioned key must NOT resolve") + require.Equal(t, uint64(0), gid) + }) + } +} + +// TestSQSPartitionResolver_UnknownQueueRecognisedButUnresolved +// pins the round-5 fail-closed contract: a well-formed partitioned +// key for a queue that is NOT in the partition map returns +// ResolveGroup=(0, false) AND RecognisesPartitionedKey=true. 
The +// router pairs these so the request fails closed — engine +// fall-through would silently route the misconfiguration through +// the SQS catalog default group (codex round-2 P1 on PR #715). +func TestSQSPartitionResolver_UnknownQueueRecognisedButUnresolved(t *testing.T) { + t.Parallel() + r := NewSQSPartitionResolver(map[string][]uint64{ + "known.fifo": {10, 11}, + }) + key := sqsPartitionedMsgDataKey("unknown.fifo", 0, 1, "id") + gid, ok := r.ResolveGroup(key) + require.False(t, ok) + require.Equal(t, uint64(0), gid) + require.True(t, r.RecognisesPartitionedKey(key), + "unknown-queue partitioned key must still be recognised — "+ + "the router pairs Recognised=true with ok=false to fail "+ + "closed instead of falling through to the engine") +} + +// TestSQSPartitionResolver_OutOfRangePartitionRecognisedButUnresolved +// pins the round-5 fail-closed contract for a partition value +// beyond the configured PartitionCount. Same router-side +// fail-closed argument as the unknown-queue case. +func TestSQSPartitionResolver_OutOfRangePartitionRecognisedButUnresolved(t *testing.T) { + t.Parallel() + r := NewSQSPartitionResolver(map[string][]uint64{ + "q.fifo": {10, 11}, + }) + // PartitionCount in resolver is 2 (partitions 0 and 1); craft a + // key for partition 5 to simulate a writer at a different + // partition count. + key := sqsPartitionedMsgDataKey("q.fifo", 5, 1, "id") + gid, ok := r.ResolveGroup(key) + require.False(t, ok) + require.Equal(t, uint64(0), gid) + require.True(t, r.RecognisesPartitionedKey(key), + "OOR-partition key must still be recognised — the router "+ + "pairs Recognised=true with ok=false to fail closed "+ + "instead of falling through to the engine") +} + +// TestSQSPartitionResolver_NilReceiverIsSafe pins defensive +// behaviour — a nil resolver pointer must not panic on +// ResolveGroup. kv.ShardRouter's resolver-first dispatch checks +// `if s.partitionResolver != nil` before calling, but a typed-nil +// can still slip through interface assertions. +func TestSQSPartitionResolver_NilReceiverIsSafe(t *testing.T) { + t.Parallel() + var r *SQSPartitionResolver + gid, ok := r.ResolveGroup([]byte("any-key")) + require.False(t, ok) + require.Equal(t, uint64(0), gid) +} + +// TestSQSPartitionResolver_RecognisesPartitionedKey pins the +// shape-only predicate the router uses to decide between +// fall-through and fail-closed (codex round-2 P1 on PR #715). +// RecognisesPartitionedKey MUST answer purely on the structural +// shape — partitioned family prefix + queue + '|' terminator + +// be32 partition — independent of whether the queue is in the +// routes map. Otherwise the router could not reliably fail-closed +// for unresolved-but-recognised keys. 
+func TestSQSPartitionResolver_RecognisesPartitionedKey(t *testing.T) { + t.Parallel() + r := NewSQSPartitionResolver(map[string][]uint64{ + "known.fifo": {10, 11}, + }) + cases := []struct { + name string + key []byte + want bool + }{ + { + name: "data family + known queue", + key: sqsPartitionedMsgDataKey("known.fifo", 0, 1, "id"), + want: true, + }, + { + name: "vis family + unknown queue (recognised, unresolved)", + key: sqsPartitionedMsgVisKey("not-in-routes.fifo", 0, 1, 1, "id"), + want: true, + }, + { + name: "byage family + OOR partition (recognised, unresolved)", + key: sqsPartitionedMsgByAgeKey("known.fifo", 99, 1, 1, "id"), + want: true, + }, + { + name: "legacy SQS key — not partitioned", + key: sqsMsgDataKey("known.fifo", 1, "id"), + want: false, + }, + { + name: "non-SQS key", + key: []byte("/foo/bar"), + want: false, + }, + { + name: "queue meta key", + key: sqsQueueMetaKey("known.fifo"), + want: false, + }, + { + name: "empty", + key: []byte{}, + want: false, + }, + { + name: "nil", + key: nil, + want: false, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require.Equal(t, tc.want, r.RecognisesPartitionedKey(tc.key)) + }) + } +} + +// TestSQSPartitionResolver_RecognisesPartitionedKey_NilReceiver +// pins the typed-nil-safe branch — same as ResolveGroup, but for +// the new predicate. +func TestSQSPartitionResolver_RecognisesPartitionedKey_NilReceiver(t *testing.T) { + t.Parallel() + var r *SQSPartitionResolver + require.False(t, r.RecognisesPartitionedKey([]byte("any-key"))) +} + +// TestSQSPartitionResolver_RecognisesMalformedPartitionedKey pins +// the prefix-only check (round 5 nit on PR #715): a key with a +// valid partitioned family prefix but a corrupt / truncated queue +// or partition segment MUST still be recognised so the router +// fails closed rather than falling through to engine routing +// (which would silently mis-route to the SQS catalog default +// group via routeKey's !sqs|route|global collapse). +func TestSQSPartitionResolver_RecognisesMalformedPartitionedKey(t *testing.T) { + t.Parallel() + r := NewSQSPartitionResolver(map[string][]uint64{ + "q.fifo": {10, 11}, + }) + cases := []struct { + name string + key []byte + }{ + { + // Prefix only, nothing after — parsePartitionedSQSKey + // would fail on the missing '|' terminator, but the + // shape IS recognised. + name: "prefix only", + key: []byte(SqsPartitionedMsgDataPrefix), + }, + { + // Prefix + base64 garbage that decodes invalidly. + // parsePartitionedSQSKey would fail at decodeSQSSegment. + name: "prefix + invalid base64", + key: []byte(SqsPartitionedMsgDataPrefix + "!!!|"), + }, + { + // Prefix + valid queue segment + '|' but no partition + // bytes (truncated). + name: "prefix + queue + truncated partition", + key: []byte(SqsPartitionedMsgVisPrefix + encodeSQSSegment("q.fifo") + "|"), + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require.True(t, r.RecognisesPartitionedKey(tc.key), + "malformed partitioned key MUST be recognised so "+ + "the router fails closed rather than mis-routing "+ + "through the engine's SQS catalog default") + gid, ok := r.ResolveGroup(tc.key) + require.False(t, ok, + "malformed key cannot resolve") + require.Equal(t, uint64(0), gid) + }) + } +} + +// TestSQSPartitionResolver_PrefixesAlign pins that the resolver's +// family-prefix list matches the constants in sqs_keys.go exactly. 
+// A future renamed prefix or added family that touches sqs_keys.go +// without also touching sqsResolverFamilyPrefixes would silently +// stop resolving keys for the new family. +func TestSQSPartitionResolver_PrefixesAlign(t *testing.T) { + t.Parallel() + want := [][]byte{ + []byte(SqsPartitionedMsgDataPrefix), + []byte(SqsPartitionedMsgVisPrefix), + []byte(SqsPartitionedMsgDedupPrefix), + []byte(SqsPartitionedMsgGroupPrefix), + []byte(SqsPartitionedMsgByAgePrefix), + } + require.Equal(t, want, sqsResolverFamilyPrefixes, + "sqsResolverFamilyPrefixes must mirror the constants in "+ + "sqs_keys.go — a new partitioned family added there must "+ + "be added here too, or the resolver silently stops "+ + "matching keys in that family") + for _, p := range sqsResolverFamilyPrefixes { + require.True(t, strings.HasSuffix(string(p), "p|"), + "every partitioned prefix must end with the p| discriminator") + } +} diff --git a/kv/shard_router.go b/kv/shard_router.go index 0b99f1673..deaba662a 100644 --- a/kv/shard_router.go +++ b/kv/shard_router.go @@ -10,6 +10,41 @@ import ( "github.com/cockroachdb/errors" ) +// PartitionResolver maps a key to its owning Raft group when the +// key belongs to a partition-scheme keyspace (e.g. SQS HT-FIFO, +// where each (queue, partition) pair lives on a different group). +// ShardRouter consults the resolver before falling through to the +// byte-range engine, so partition routing can override the default +// shard-range layout without breaking the engine's non-overlapping- +// cover invariant. +// +// Implementations must be safe for concurrent use — ResolveGroup is +// called on the request hot path. Returning (0, false) for a key +// the resolver does not recognise lets the router fall through to +// the engine. Returning (0, false) for a key the resolver DOES +// recognise (the partitioned shape matches but the queue is not +// in the map, or the partition is out of range) is also valid; +// the router uses RecognisesPartitionedKey to distinguish "not +// partitioned, fall through" from "partitioned but unresolved, +// fail closed". +// +// The fail-closed split matters under partition-map drift / a +// partial rollout: without it, an unresolved partitioned key +// would silently land on the engine's SQS-catalog default group +// (because routeKey normalises every !sqs|... key to +// !sqs|route|global) instead of surfacing a routing error. +type PartitionResolver interface { + ResolveGroup(key []byte) (uint64, bool) + + // RecognisesPartitionedKey reports whether the key SHAPE is + // one this resolver is responsible for. Implementations + // answer based on prefix / structural inspection only — the + // answer must NOT depend on any in-memory mapping that could + // drift out of sync, otherwise the router cannot reliably + // fail-closed for unresolved-but-recognised keys. + RecognisesPartitionedKey(key []byte) bool +} + // ShardRouter routes requests to multiple raft groups based on key ranges. // // Cross-shard transactions are not supported. They require distributed @@ -17,9 +52,10 @@ import ( // // Non-transactional request batches may still partially succeed across shards. 
type ShardRouter struct { - engine *distribution.Engine - mu sync.RWMutex - groups map[uint64]*routerGroup + engine *distribution.Engine + partitionResolver PartitionResolver + mu sync.RWMutex + groups map[uint64]*routerGroup } var ErrCrossShardTransactionNotSupported = errors.New("cross-shard transactions are not supported") @@ -37,6 +73,78 @@ func NewShardRouter(e *distribution.Engine) *ShardRouter { } } +// WithPartitionResolver installs a partition-keyspace resolver that +// is consulted before the byte-range engine on every dispatch. A +// nil resolver clears any previously-installed resolver. Returns +// the receiver so callers can chain. +// +// Intended for use during startup, before the router begins handling +// requests. Interface assignment in Go is not atomic, so a call that +// races with a concurrent ResolveGroup in resolveGroup may produce a +// torn read; callers must wire the resolver once during construction +// (parseRuntimeConfig → NewShardedCoordinator → WithPartitionResolver) +// and treat any post-startup re-assignment as undefined behaviour. +func (s *ShardRouter) WithPartitionResolver(r PartitionResolver) *ShardRouter { + s.partitionResolver = r + return s +} + +// ResolveGroup tries the partition resolver first (when installed), +// then falls through to the byte-range engine. Exposed at package +// scope so ShardedCoordinator's per-key helpers (groupForKey, +// routeAndGroupForKey, engineGroupIDForKey, groupMutations) can +// consult the same dispatch path Commit / Abort / Get use — without +// it those helpers would bypass the resolver and partitioned-FIFO +// traffic would silently mis-route through 2PC and the read paths. +// +// The resolver runs on the RAW key before any user-key +// normalization. SQS keys in particular are collapsed to +// !sqs|route|global by routeKey to keep the engine's per-shard +// layout simple, but that collapse hides the partitioned-prefix +// information the resolver needs (issue: codex P1 / gemini high on +// PR #715). The engine still sees the post-normalization key, so +// legacy routing (catalog → !sqs|route|global → default group) +// stays unchanged. +// +// Fail-closed for recognised-but-unresolved keys: when the resolver +// recognises a partitioned shape (RecognisesPartitionedKey == true) +// but cannot resolve the queue/partition pair (ResolveGroup returns +// ok=false), the router refuses to fall through to the engine. +// Otherwise the engine would route the partitioned key to +// !sqs|route|global's default group, silently mis-routing HT-FIFO +// traffic during partition-map drift or partial rollout (codex P1 +// round 2 on PR #715). +// +// Returns (0, false) when neither the resolver nor the engine +// recognises the key. Caller surfaces this as an "unknown group" +// error so a partitioned-prefix key whose queue is missing from the +// resolver map fails closed rather than landing on whichever +// engine-default group happens to cover the raw bytes. +func (s *ShardRouter) ResolveGroup(rawKey []byte) (uint64, bool) { + if s.partitionResolver != nil { + if gid, ok := s.partitionResolver.ResolveGroup(rawKey); ok { + return gid, true + } + if s.partitionResolver.RecognisesPartitionedKey(rawKey) { + // Partitioned shape, but the resolver cannot map it + // (unknown queue, out-of-range partition). Fail closed + // — the engine's catalog-level default route would + // silently misroute this through routeKey's + // !sqs|route|global collapse. 
+ return 0, false + } + } + // Engine routes against the user-key view of the byte-range + // space; routeKey may rewrite SQS / DynamoDB / Redis-internal + // keys to a stable per-table or per-namespace route key so the + // engine sees one route per logical entity. + route, ok := s.engine.GetRoute(routeKey(rawKey)) + if !ok { + return 0, false + } + return route.GroupID, true +} + // Register associates a raft group ID with its transactional manager and store. func (s *ShardRouter) Register(group uint64, tm Transactional, st store.MVCCStore) { s.mu.Lock() @@ -123,28 +231,28 @@ func (s *ShardRouter) groupRequests(reqs []*pb.Request) (map[uint64][]*pb.Reques if len(r.Mutations) == 0 || r.Mutations[0] == nil { return nil, ErrInvalidRequest } - key := routeKey(r.Mutations[0].Key) - if len(key) == 0 { + rawKey := r.Mutations[0].Key + if len(rawKey) == 0 { return nil, ErrInvalidRequest } - route, ok := s.engine.GetRoute(key) + gid, ok := s.ResolveGroup(rawKey) if !ok { - return nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", key) + return nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", rawKey) } - batches[route.GroupID] = append(batches[route.GroupID], r) + batches[gid] = append(batches[gid], r) } return batches, nil } // Get retrieves a key routed to the correct shard. func (s *ShardRouter) Get(ctx context.Context, key []byte) ([]byte, error) { - route, ok := s.engine.GetRoute(routeKey(key)) + gid, ok := s.ResolveGroup(key) if !ok { return nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", key) } - g, ok := s.getGroup(route.GroupID) + g, ok := s.getGroup(gid) if !ok { - return nil, errors.Wrapf(ErrInvalidRequest, "unknown group %d", route.GroupID) + return nil, errors.Wrapf(ErrInvalidRequest, "unknown group %d", gid) } v, err := g.store.GetAt(ctx, key, ^uint64(0)) if err != nil { diff --git a/kv/shard_router_partition_test.go b/kv/shard_router_partition_test.go new file mode 100644 index 000000000..c5fd86fab --- /dev/null +++ b/kv/shard_router_partition_test.go @@ -0,0 +1,319 @@ +package kv + +import ( + "bytes" + "context" + "sync" + "sync/atomic" + "testing" + + "github.com/bootjp/elastickv/distribution" + pb "github.com/bootjp/elastickv/proto" + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" +) + +// fakePartitionResolver is a hand-rolled PartitionResolver for the +// router-side tests — keeps the tests free of any adapter-package +// dependency so the routing contract is verified at the kv layer in +// isolation. +// +// recognisedPrefix lets a test simulate the "recognised but +// unresolved" failure mode (codex round-2 P1 on PR #715): keys +// that start with this prefix and miss the routes map cause +// ResolveGroup to return (0, false) AND +// RecognisesPartitionedKey to return true, so the router fails +// closed instead of falling through to the engine. +type fakePartitionResolver struct { + routes map[string]uint64 + recognisedPrefix []byte +} + +func (f *fakePartitionResolver) ResolveGroup(key []byte) (uint64, bool) { + gid, ok := f.routes[string(key)] + return gid, ok +} + +func (f *fakePartitionResolver) RecognisesPartitionedKey(key []byte) bool { + if len(f.recognisedPrefix) == 0 { + return false + } + return len(key) >= len(f.recognisedPrefix) && + bytes.HasPrefix(key, f.recognisedPrefix) +} + +// TestShardRouter_PartitionResolverWins pins that when the resolver +// claims a key, the router dispatches to the resolver's group even +// if the byte-range engine would have routed elsewhere. 
This is the +// whole point of the resolver — overlay routing on top of the +// existing non-overlapping cover model. +func TestShardRouter_PartitionResolverWins(t *testing.T) { + t.Parallel() + e := distribution.NewEngine() + e.UpdateRoute([]byte(""), nil, 1) // engine routes everything to group 1 + + router := NewShardRouter(e) + router.WithPartitionResolver(&fakePartitionResolver{ + routes: map[string]uint64{"resolver-key": 42}, + }) + + // Per-test sink so a parallel sibling test cannot perturb the + // invariant we are checking. Each fakeTxn writes its own id + // into this slot on Commit; the post-condition reads it back. + var sink atomic.Uint64 + s1 := store.NewMVCCStore() + s42 := store.NewMVCCStore() + router.Register(1, &fakeTxn{id: 1, sink: &sink}, s1) + router.Register(42, &fakeTxn{id: 42, sink: &sink}, s42) + + reqs := []*pb.Request{ + {IsTxn: false, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("resolver-key"), Value: []byte("v")}}}, + } + resp, err := router.Commit(reqs) + require.NoError(t, err) + require.NotNil(t, resp) + // Verify: the request landed on group 42's fake txn, not 1's. + require.Equal(t, uint64(42), sink.Load()) +} + +// TestShardRouter_PartitionResolverFallsThrough pins that when the +// resolver returns (0, false), dispatch falls through to the byte- +// range engine. Without this, the resolver would have to know +// every key in the cluster — which would defeat the overlay +// pattern's purpose (let the resolver answer only for partitioned- +// keyspace keys). +func TestShardRouter_PartitionResolverFallsThrough(t *testing.T) { + t.Parallel() + e := distribution.NewEngine() + e.UpdateRoute([]byte("a"), []byte("m"), 1) + e.UpdateRoute([]byte("m"), nil, 2) + + router := NewShardRouter(e) + // Resolver only knows about "resolver-only-key"; everything + // else falls through to the engine. + router.WithPartitionResolver(&fakePartitionResolver{ + routes: map[string]uint64{"resolver-only-key": 99}, + }) + + var sink atomic.Uint64 + s1 := store.NewMVCCStore() + s2 := store.NewMVCCStore() + router.Register(1, &fakeTxn{id: 1, sink: &sink}, s1) + router.Register(2, &fakeTxn{id: 2, sink: &sink}, s2) + + // "b" is in the engine's [a, m) range → group 1. + resp1, err1 := router.Commit([]*pb.Request{ + {IsTxn: false, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("b"), Value: []byte("v")}}}, + }) + require.NoError(t, err1) + require.NotNil(t, resp1) + require.Equal(t, uint64(1), sink.Load(), + "engine [a,m) range must route to group 1") + + // "x" is in the engine's [m, ∞) range → group 2. + resp2, err2 := router.Commit([]*pb.Request{ + {IsTxn: false, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("x"), Value: []byte("v")}}}, + }) + require.NoError(t, err2) + require.NotNil(t, resp2) + require.Equal(t, uint64(2), sink.Load(), + "engine [m,∞) range must route to group 2") +} + +// TestShardRouter_NilPartitionResolverIsNoOp pins that +// WithPartitionResolver(nil) leaves the router behaving exactly as +// the legacy engine-only dispatcher. This is the documented "no +// partition layer" path that a non-partitioned cluster takes. 
+func TestShardRouter_NilPartitionResolverIsNoOp(t *testing.T) { + t.Parallel() + e := distribution.NewEngine() + e.UpdateRoute([]byte(""), nil, 7) + + router := NewShardRouter(e) + router.WithPartitionResolver(nil) + + var sink atomic.Uint64 + s7 := store.NewMVCCStore() + router.Register(7, &fakeTxn{id: 7, sink: &sink}, s7) + + // With no resolver installed, the engine's default route owns + // the request — group 7 dispatches. + resp, err := router.Commit([]*pb.Request{ + {IsTxn: false, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: []byte("any"), Value: []byte("v")}}}, + }) + require.NoError(t, err) + require.NotNil(t, resp) + require.Equal(t, uint64(7), sink.Load(), + "nil resolver must leave the engine in charge") +} + +// TestShardRouter_ResolverSeesRawKeyNotNormalized pins the codex-P1 +// fix on PR #715: resolveGroup MUST consult the resolver with the +// pre-normalization (raw) key, not the post-normalization key. SQS +// keys in particular collapse to !sqs|route|global via routeKey, so +// a resolver that only saw the normalized form would never match +// any partitioned-prefix key — the resolver would be a no-op for +// every Commit/Abort/Get on a partitioned-FIFO queue. +// +// The fake resolver here records every key it was asked about; the +// post-condition asserts the recorded key is the raw key the caller +// supplied, not whatever routeKey normalized it to. +func TestShardRouter_ResolverSeesRawKeyNotNormalized(t *testing.T) { + t.Parallel() + e := distribution.NewEngine() + e.UpdateRoute([]byte(""), nil, 1) + + router := NewShardRouter(e) + + rawKey := []byte("!sqs|msg|data|p|raw-test-key") + resolver := &recordingResolver{match: rawKey, gid: 42} + router.WithPartitionResolver(resolver) + + var sink atomic.Uint64 + router.Register(1, &fakeTxn{id: 1, sink: &sink}, store.NewMVCCStore()) + router.Register(42, &fakeTxn{id: 42, sink: &sink}, store.NewMVCCStore()) + + resp, err := router.Commit([]*pb.Request{ + {IsTxn: false, Mutations: []*pb.Mutation{{Op: pb.Op_PUT, Key: rawKey, Value: []byte("v")}}}, + }) + require.NoError(t, err) + require.NotNil(t, resp) + require.Equal(t, uint64(42), sink.Load(), + "resolver must claim the raw partitioned-prefix key; "+ + "if the router collapsed it via routeKey first, this "+ + "would route to the engine's default group (1)") + + // The resolver MUST have been called with the raw key, not the + // !sqs|route|global collapse. This is the codex P1 invariant. + seen := resolver.seenKeys() + require.Len(t, seen, 1) + require.Equal(t, rawKey, seen[0], + "resolver received normalized key — routeKey ran before "+ + "resolver, contrary to the §3.D PR 4-B-2 design") +} + +// TestShardRouter_FailClosedOnRecognisedButUnresolved pins the +// codex round-2 P1 fix on PR #715: a key the resolver recognises +// (partitioned shape) but cannot resolve (unknown queue / OOR +// partition) must NOT fall through to the engine. Without the +// fail-closed branch, the engine's routeKey-collapsed +// !sqs|route|global default would silently swallow the misroute +// during partition-map drift / partial rollout. +func TestShardRouter_FailClosedOnRecognisedButUnresolved(t *testing.T) { + t.Parallel() + e := distribution.NewEngine() + e.UpdateRoute([]byte(""), nil, 1) // engine would route everything to group 1 + + router := NewShardRouter(e) + // Resolver claims nothing but RECOGNISES the partitioned shape + // for any key starting with "!sqs|msg|data|p|". 
+ router.WithPartitionResolver(&fakePartitionResolver{ + routes: map[string]uint64{}, + recognisedPrefix: []byte("!sqs|msg|data|p|"), + }) + + // A partitioned-shaped key the resolver recognises but cannot + // resolve. Without fail-closed this would silently route to + // group 1 via the engine's default. + _, ok := router.ResolveGroup([]byte("!sqs|msg|data|p|unknown-queue")) + require.False(t, ok, + "recognised-but-unresolved partitioned key must fail closed; "+ + "engine fall-through would silently mis-route HT-FIFO "+ + "traffic to the !sqs|route|global default group during "+ + "partition-map drift") + + // A non-partitioned key is NOT recognised → must still fall + // through to the engine. This pins that fail-closed only fires + // for the recognised-but-unresolved case. + gid, ok := router.ResolveGroup([]byte("legacy-key")) + require.True(t, ok) + require.Equal(t, uint64(1), gid, + "non-recognised key must fall through to engine default") +} + +// TestShardRouter_GetUsesResolver pins that the resolver-first path +// applies to Get as well as Commit/Abort. A regression that fixed +// only Commit's path would silently route reads through the engine +// even after the resolver claimed the key. +func TestShardRouter_GetUsesResolver(t *testing.T) { + t.Parallel() + e := distribution.NewEngine() + e.UpdateRoute([]byte(""), nil, 1) + + router := NewShardRouter(e) + router.WithPartitionResolver(&fakePartitionResolver{ + routes: map[string]uint64{"resolver-key": 42}, + }) + + var sink atomic.Uint64 + s1 := store.NewMVCCStore() + s42 := store.NewMVCCStore() + // Seed group 42 only — if Get falls through to the engine + // (which would route to group 1), the test fails because + // group 1's store is empty. + require.NoError(t, s42.PutAt(context.Background(), []byte("resolver-key"), []byte("v"), 1, 0)) + router.Register(1, &fakeTxn{id: 1, sink: &sink}, s1) + router.Register(42, &fakeTxn{id: 42, sink: &sink}, s42) + + v, err := router.Get(context.Background(), []byte("resolver-key")) + require.NoError(t, err) + require.Equal(t, []byte("v"), v) +} + +// recordingResolver is a PartitionResolver that records every key +// it was asked about and returns gid for keys equal to match. The +// recorded list is the test's evidence that the router consulted +// the resolver with the raw (pre-routeKey-normalisation) key. +type recordingResolver struct { + match []byte + gid uint64 + mu sync.Mutex + seen [][]byte +} + +func (r *recordingResolver) ResolveGroup(key []byte) (uint64, bool) { + r.mu.Lock() + defer r.mu.Unlock() + keyCopy := append([]byte(nil), key...) + r.seen = append(r.seen, keyCopy) + if bytes.Equal(key, r.match) { + return r.gid, true + } + return 0, false +} + +// RecognisesPartitionedKey returns false unconditionally — these +// router-side tests use the resolver-wins / fall-through paths +// only; the recognised-but-unresolved path is exercised separately +// via fakePartitionResolver.recognisedPrefix. +func (r *recordingResolver) RecognisesPartitionedKey([]byte) bool { + return false +} + +func (r *recordingResolver) seenKeys() [][]byte { + r.mu.Lock() + defer r.mu.Unlock() + out := make([][]byte, len(r.seen)) + copy(out, r.seen) + return out +} + +// fakeTxn is a Transactional double that records its own id into a +// caller-provided sink whenever Commit lands on it. Using a per- +// test sink (rather than a package-level variable) keeps parallel +// tests from clobbering each other's observations. 
+type fakeTxn struct { + id uint64 + sink *atomic.Uint64 +} + +func (f *fakeTxn) Commit(reqs []*pb.Request) (*TransactionResponse, error) { + if f.sink != nil { + f.sink.Store(f.id) + } + return &TransactionResponse{CommitIndex: 1}, nil +} + +func (f *fakeTxn) Abort(reqs []*pb.Request) (*TransactionResponse, error) { + return &TransactionResponse{}, nil +} diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index cc19bece0..54c24115d 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -171,6 +171,21 @@ func (c *ShardedCoordinator) WithSampler(s keyviz.Sampler) *ShardedCoordinator { return c } +// WithPartitionResolver wires a PartitionResolver onto the +// coordinator's underlying ShardRouter. The resolver runs before +// the byte-range engine on every dispatch, so partition-keyspace +// schemes (e.g. SQS HT-FIFO) can override the default shard layout +// without breaking the engine's non-overlapping-cover invariant. +// +// Applied after construction for the same reason as the other +// With* options on this type — NewShardedCoordinator is already +// heavily overloaded. Passing a nil resolver clears any previously- +// installed resolver. +func (c *ShardedCoordinator) WithPartitionResolver(r PartitionResolver) *ShardedCoordinator { + c.router.WithPartitionResolver(r) + return c +} + // NewShardedCoordinator builds a coordinator for the provided shard groups. // The defaultGroup is used for non-keyed leader checks. func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*ShardGroup, defaultGroup uint64, clock *HLC, st store.MVCCStore) *ShardedCoordinator { @@ -380,7 +395,13 @@ func (c *ShardedCoordinator) dispatchTxn(ctx context.Context, startTS uint64, co // Multi-shard path: group read keys by shard now. The result is passed // directly to prewriteTxn to avoid a second iteration inside that function. - groupedReadKeys := c.groupReadKeysByShardID(readKeys) + // A routing failure here aborts the transaction before any prewrite — + // silently dropping unresolvable read keys would let OCC validation run + // with an incomplete read set and break SSI. + groupedReadKeys, err := c.groupReadKeysByShardID(readKeys) + if err != nil { + return nil, err + } prepared, err := c.prewriteTxn(ctx, startTS, commitTS, primaryKey, grouped, gids, groupedReadKeys) if err != nil { return nil, err @@ -830,11 +851,11 @@ func (c *ShardedCoordinator) Clock() *HLC { } func (c *ShardedCoordinator) groupForKey(key []byte) (*ShardGroup, bool) { - route, ok := c.engine.GetRoute(routeKey(key)) + gid, ok := c.router.ResolveGroup(key) if !ok { return nil, false } - g, ok := c.groups[route.GroupID] + g, ok := c.groups[gid] return g, ok } @@ -844,39 +865,70 @@ func (c *ShardedCoordinator) groupForKey(key []byte) (*ShardGroup, bool) { // Leadership-only callers (IsLeaderForKey / VerifyLeaderForKey / // RaftLeaderForKey) keep using groupForKey because they don't need // the route ID. +// +// The gid comes from the partition-aware router (resolver-first, +// engine-fallback) so partitioned-FIFO traffic lands on the +// operator-chosen group. RouteID is read from the engine on the +// normalized key; the resolver does not have a notion of catalog +// RouteID, so partition-resolved keys observe under the engine's +// catalog RouteID for !sqs|route|global. Partition-aware keyviz +// is a Phase 3.D follow-up. 
func (c *ShardedCoordinator) routeAndGroupForKey(key []byte) (uint64, *ShardGroup, bool) { - route, ok := c.engine.GetRoute(routeKey(key)) + gid, ok := c.router.ResolveGroup(key) if !ok { return 0, nil, false } - g, ok := c.groups[route.GroupID] + g, ok := c.groups[gid] if !ok { return 0, nil, false } - return route.RouteID, g, true + var routeID uint64 + if route, found := c.engine.GetRoute(routeKey(key)); found { + routeID = route.RouteID + } + return routeID, g, true } func (c *ShardedCoordinator) engineGroupIDForKey(key []byte) uint64 { - route, ok := c.engine.GetRoute(routeKey(key)) + gid, ok := c.router.ResolveGroup(key) if !ok { return 0 } - return route.GroupID + return gid } -func (c *ShardedCoordinator) groupReadKeysByShardID(readKeys [][]byte) map[uint64][][]byte { +// groupReadKeysByShardID groups txn read keys by their owning Raft +// group. Returns an error when ANY read key cannot be routed — +// silently skipping unresolvable keys would let a transaction +// commit with an incomplete OCC read-set, which breaks SSI under +// the §5 ShardRouter resolver-first dispatch (codex round-2 P1 on +// PR #715). +// +// The fail-closed semantic the resolver gained in PR 4-B-2 makes +// this path matter: a partitioned-shape read key whose queue is +// missing from --sqsFifoPartitionMap (drift / partial rollout) +// returns gid=0 from c.router.ResolveGroup. If we silently dropped +// those keys, the prewrite Raft entry would carry an empty +// ReadKeys slice for that key, the FSM's read-write conflict +// validation would never see it, and a concurrent write could +// commit alongside a stale read. Surface the routing failure as +// an error so the transaction aborts before any FSM apply. +func (c *ShardedCoordinator) groupReadKeysByShardID(readKeys [][]byte) (map[uint64][][]byte, error) { if len(readKeys) == 0 { - return nil + return nil, nil } grouped := make(map[uint64][][]byte) for _, key := range readKeys { - gid := c.engineGroupIDForKey(key) - if gid == 0 { - continue + gid, ok := c.router.ResolveGroup(key) + if !ok || gid == 0 { + return nil, errors.Wrapf(ErrInvalidRequest, + "no route for txn read key %q — recognised-but-"+ + "unresolved partition keys must fail closed to "+ + "preserve OCC read-set integrity", key) } grouped[gid] = append(grouped[gid], key) } - return grouped + return grouped, nil } // validateReadOnlyShards checks read-write conflicts on shards that have @@ -1037,12 +1089,19 @@ func (c *ShardedCoordinator) groupMutations(reqs []*Elem[OP]) (map[uint64][]*pb. return nil, nil, ErrInvalidRequest } mut := elemToMutation(req) - route, ok := c.engine.GetRoute(routeKey(mut.Key)) + gid, ok := c.router.ResolveGroup(mut.Key) if !ok { return nil, nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", mut.Key) } - c.observeMutation(route.RouteID, mut) - grouped[route.GroupID] = append(grouped[route.GroupID], mut) + // Engine RouteID for keyviz observation; partition-resolved + // keys observe under the !sqs|route|global RouteID until + // partition-aware keyviz lands. 
+ var routeID uint64 + if route, found := c.engine.GetRoute(routeKey(mut.Key)); found { + routeID = route.RouteID + } + c.observeMutation(routeID, mut) + grouped[gid] = append(grouped[gid], mut) } gids := make([]uint64, 0, len(grouped)) for gid := range grouped { diff --git a/kv/sharded_coordinator_partition_test.go b/kv/sharded_coordinator_partition_test.go new file mode 100644 index 000000000..ae46301a1 --- /dev/null +++ b/kv/sharded_coordinator_partition_test.go @@ -0,0 +1,325 @@ +package kv + +import ( + "context" + "sync" + "testing" + + "github.com/bootjp/elastickv/distribution" + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" +) + +// stubResolver routes any key in claim to the named group; everything +// else returns (0, false). Used by the coordinator-level resolver +// regression tests so they don't depend on the adapter package. +// +// recognisedPrefix simulates the "recognised but unresolved" path +// (codex round-2 P1 on PR #715) — keys with this prefix that miss +// claim cause RecognisesPartitionedKey to return true so the +// router fails closed instead of falling through to the engine. +type stubResolver struct { + mu sync.Mutex + claim map[string]uint64 + recognisedPrefix []byte + calls [][]byte +} + +func (s *stubResolver) ResolveGroup(key []byte) (uint64, bool) { + s.mu.Lock() + defer s.mu.Unlock() + s.calls = append(s.calls, append([]byte(nil), key...)) + if gid, ok := s.claim[string(key)]; ok { + return gid, true + } + return 0, false +} + +func (s *stubResolver) RecognisesPartitionedKey(key []byte) bool { + if len(s.recognisedPrefix) == 0 { + return false + } + return len(key) >= len(s.recognisedPrefix) && + string(key[:len(s.recognisedPrefix)]) == string(s.recognisedPrefix) +} + +func (s *stubResolver) callKeys() [][]byte { + s.mu.Lock() + defer s.mu.Unlock() + out := make([][]byte, len(s.calls)) + copy(out, s.calls) + return out +} + +// TestShardedCoordinator_DispatchHonoursPartitionResolver pins +// resolver wiring at the coordinator level: a Dispatch whose key +// is claimed by the partition resolver must land on the +// resolver's group, not the engine's default group. +// +// NOTE (round 4 review): for a single-mutation batch, this test +// passes even if groupMutations bypasses the resolver — rawLogs +// produces one request, and router.Commit's groupRequests +// re-routes by raw key. This test pins the WithPartitionResolver +// fluent wiring + raw-key dispatch, NOT the groupMutations bypass +// regression. The 2-mutation test below is the actual regression +// for groupMutations (codex P1 + gemini HIGH). 
+func TestShardedCoordinator_DispatchHonoursPartitionResolver(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte(""), nil, 1) + + g1 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}, {CommitIndex: 1}}, + } + g42 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}, {CommitIndex: 1}}, + } + + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{ + 1: {Txn: g1, Store: store.NewMVCCStore()}, + 42: {Txn: g42, Store: store.NewMVCCStore()}, + }, 1, NewHLC(), nil) + + resolver := &stubResolver{claim: map[string]uint64{ + "!sqs|msg|data|p|partitioned-key": 42, + }} + coord.WithPartitionResolver(resolver) + + resp, err := coord.Dispatch(context.Background(), &OperationGroup[OP]{ + Elems: []*Elem[OP]{ + {Op: Put, Key: []byte("!sqs|msg|data|p|partitioned-key"), Value: []byte("v")}, + }, + }) + require.NoError(t, err) + require.NotNil(t, resp) + + // The whole point: the request landed on group 42, NOT 1. + g1.mu.Lock() + g42.mu.Lock() + g1Count := len(g1.requests) + g42Count := len(g42.requests) + g1.mu.Unlock() + g42.mu.Unlock() + + require.Zero(t, g1Count, + "engine's default group must NOT receive a request when the "+ + "resolver claimed the key — coordinator's groupMutations "+ + "would otherwise bypass the partition resolver") + require.Equal(t, 1, g42Count, + "resolver-claimed group must receive exactly one request") + + // And the resolver was indeed consulted with the raw partitioned + // key — pins the codex-P1 fix at the coordinator-call boundary. + calls := resolver.callKeys() + require.NotEmpty(t, calls) + require.Equal(t, []byte("!sqs|msg|data|p|partitioned-key"), calls[0]) +} + +// TestShardedCoordinator_DispatchSplitsMutationsByResolverGroup is +// the genuine regression for the Gemini-HIGH groupMutations +// bypass: a Dispatch with mutations belonging to TWO different +// partitions must split into two requests, one per group. +// +// Pre-fix path (groupMutations called c.engine.GetRoute directly): +// - groupMutations → engine route → both mutations bundled under +// the engine default group. +// - rawLogs → ONE pb.Request with both mutations. +// - router.Commit → groupRequests routes by Mutations[0].Key only +// → request goes to group A only; group B receives nothing. +// +// Post-fix path (groupMutations consults c.router.ResolveGroup): +// - groupMutations → resolver → mut0→A, mut1→B; grouped split. +// - rawLogs → TWO pb.Requests, one per group. +// - Each group receives its own request. +// +// The assertion is that BOTH groups receive a request — pre-fix, +// only one would. This is the test the round 4 review asked for. 
+func TestShardedCoordinator_DispatchSplitsMutationsByResolverGroup(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte(""), nil, 1) + + g1 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}}, + } + g42 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}}, + } + g43 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}}, + } + + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{ + 1: {Txn: g1, Store: store.NewMVCCStore()}, + 42: {Txn: g42, Store: store.NewMVCCStore()}, + 43: {Txn: g43, Store: store.NewMVCCStore()}, + }, 1, NewHLC(), nil) + + keyP0 := []byte("!sqs|msg|data|p|partition-0-key") + keyP1 := []byte("!sqs|msg|data|p|partition-1-key") + coord.WithPartitionResolver(&stubResolver{claim: map[string]uint64{ + string(keyP0): 42, + string(keyP1): 43, + }}) + + resp, err := coord.Dispatch(context.Background(), &OperationGroup[OP]{ + Elems: []*Elem[OP]{ + {Op: Put, Key: keyP0, Value: []byte("v0")}, + {Op: Put, Key: keyP1, Value: []byte("v1")}, + }, + }) + require.NoError(t, err) + require.NotNil(t, resp) + + g1.mu.Lock() + g42.mu.Lock() + g43.mu.Lock() + defer g1.mu.Unlock() + defer g42.mu.Unlock() + defer g43.mu.Unlock() + + require.Zero(t, len(g1.requests), + "engine default group must not receive a request — both "+ + "keys are partitioned-resolver claims") + require.Equal(t, 1, len(g42.requests), + "partition-0 group must receive exactly one request "+ + "(pre-groupMutations-fix: would receive both mutations "+ + "in one request because router.Commit routes by "+ + "Mutations[0].Key only)") + require.Equal(t, 1, len(g43.requests), + "partition-1 group must receive exactly one request "+ + "(pre-groupMutations-fix: would receive ZERO requests "+ + "because both mutations were bundled under the "+ + "engine-route group). This is the genuine regression "+ + "for the Gemini HIGH bypass — the fix splits "+ + "mutations across groups via c.router.ResolveGroup.") +} + +// TestShardedCoordinator_TxnFailsClosedForUnresolvedReadKey pins +// the codex round-2 P1 fix on PR #715: a transaction whose read +// set contains a partitioned-shape key the resolver recognises +// but cannot route (queue missing from --sqsFifoPartitionMap +// during drift / partial rollout) MUST abort before prewrite. +// +// Pre-fix path: groupReadKeysByShardID silently skipped read keys +// where engineGroupIDForKey returned 0 (which is what the +// fail-closed resolver returns for recognised-but-unresolved +// keys). The resulting prewrite Raft entry carried an empty +// ReadKeys slice for that key, the FSM's read-write conflict +// validation never saw it, and a concurrent write could commit +// alongside the stale read — SSI violated. +// +// Post-fix path: groupReadKeysByShardID returns an error when +// any read key cannot be routed; dispatchTxn propagates the +// error and the transaction aborts. 
+func TestShardedCoordinator_TxnFailsClosedForUnresolvedReadKey(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte(""), nil, 1) + + g1 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}}, + } + g42 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}}, + } + + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{ + 1: {Txn: g1, Store: store.NewMVCCStore()}, + 42: {Txn: g42, Store: store.NewMVCCStore()}, + }, 1, NewHLC(), nil) + + // Resolver claims the WRITE key but RECOGNISES the read key as + // partitioned-shape and returns ok=false (simulating the + // queue-missing-from-map drift scenario). The router pairs + // Recognised=true with ok=false to fail closed. + writeKey := []byte("!sqs|msg|data|p|claimed-write-key") + readKey := []byte("!sqs|msg|data|p|drift-unresolved-read") + coord.WithPartitionResolver(&stubResolver{ + claim: map[string]uint64{string(writeKey): 42}, + recognisedPrefix: []byte("!sqs|msg|data|p|"), + }) + + // Issue a transaction that reads the unresolvable partitioned + // key and writes the claimed key. The transaction MUST abort — + // pre-fix it would silently drop the read key and proceed. + resp, err := coord.Dispatch(context.Background(), &OperationGroup[OP]{ + IsTxn: true, + StartTS: 100, + CommitTS: 200, + Elems: []*Elem[OP]{ + {Op: Put, Key: writeKey, Value: []byte("v")}, + }, + ReadKeys: [][]byte{readKey}, + }) + require.Error(t, err, + "transaction with unresolvable partitioned read key MUST "+ + "abort — silently dropping the key would let OCC "+ + "validation run with an incomplete read set and "+ + "break SSI") + require.Nil(t, resp) + + // Neither group should have received a prepare — the routing + // failure surfaces before any prewrite. + g1.mu.Lock() + g42.mu.Lock() + defer g1.mu.Unlock() + defer g42.mu.Unlock() + require.Zero(t, len(g1.requests), + "engine default group must not see a prewrite — fail-closed "+ + "happens before any RPC") + require.Zero(t, len(g42.requests), + "resolver-claimed group must not see a prewrite either — "+ + "fail-closed aborts the txn before prewrite even though "+ + "the write key is routable") +} + +// TestShardedCoordinator_DispatchFallsThroughForUnclaimedKeys pins +// the inverse: a key the resolver does NOT claim must continue to +// route via the byte-range engine. Without this guard the resolver- +// first short-circuit could mask engine routing decisions. +func TestShardedCoordinator_DispatchFallsThroughForUnclaimedKeys(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), []byte("m"), 1) + engine.UpdateRoute([]byte("m"), nil, 2) + + g1 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 1}}, + } + g2 := &recordingTransactional{ + responses: []*TransactionResponse{{CommitIndex: 2}}, + } + + coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{ + 1: {Txn: g1, Store: store.NewMVCCStore()}, + 2: {Txn: g2, Store: store.NewMVCCStore()}, + }, 1, NewHLC(), nil) + + // Resolver only claims a key that ISN'T in the request — the + // dispatch must fall through to the engine. + coord.WithPartitionResolver(&stubResolver{claim: map[string]uint64{ + "!sqs|msg|data|p|other-key": 42, + }}) + + // "x" lands in [m, ∞) → engine routes to group 2. 
+	resp, err := coord.Dispatch(context.Background(), &OperationGroup[OP]{
+		Elems: []*Elem[OP]{
+			{Op: Put, Key: []byte("x"), Value: []byte("v")},
+		},
+	})
+	require.NoError(t, err)
+	require.NotNil(t, resp)
+
+	g1.mu.Lock()
+	g2.mu.Lock()
+	g1Count := len(g1.requests)
+	g2Count := len(g2.requests)
+	g1.mu.Unlock()
+	g2.mu.Unlock()
+
+	require.Zero(t, g1Count,
+		"unclaimed key must engine-route to group 2, not group 1")
+	require.Equal(t, 1, g2Count,
+		"engine fallthrough must dispatch to group 2 for keys in [m, ∞)")
+}
diff --git a/kv/sharded_coordinator_txn_test.go b/kv/sharded_coordinator_txn_test.go
index a75a3394c..6eaf567c3 100644
--- a/kv/sharded_coordinator_txn_test.go
+++ b/kv/sharded_coordinator_txn_test.go
@@ -309,7 +309,9 @@ func TestGroupReadKeysByShardID_NilReturnsNil(t *testing.T) {
 	engine := distribution.NewEngine()
 	engine.UpdateRoute([]byte(""), nil, 1)
 	coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}}, 1, NewHLC(), nil)
-	require.Nil(t, coord.groupReadKeysByShardID(nil))
+	grouped, err := coord.groupReadKeysByShardID(nil)
+	require.NoError(t, err)
+	require.Nil(t, grouped)
 }
 
 func TestGroupReadKeysByShardID_EmptyReturnsNil(t *testing.T) {
@@ -317,7 +319,9 @@ func TestGroupReadKeysByShardID_EmptyReturnsNil(t *testing.T) {
 	engine := distribution.NewEngine()
 	engine.UpdateRoute([]byte(""), nil, 1)
 	coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}}, 1, NewHLC(), nil)
-	require.Nil(t, coord.groupReadKeysByShardID([][]byte{}))
+	grouped, err := coord.groupReadKeysByShardID([][]byte{})
+	require.NoError(t, err)
+	require.Nil(t, grouped)
 }
 
 func TestGroupReadKeysByShardID_GroupsByShardID(t *testing.T) {
@@ -327,11 +331,12 @@
 	engine.UpdateRoute([]byte("m"), nil, 2)
 	coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}, 2: {}}, 1, NewHLC(), nil)
-	grouped := coord.groupReadKeysByShardID([][]byte{
+	grouped, err := coord.groupReadKeysByShardID([][]byte{
 		[]byte("b"), // shard 1
 		[]byte("c"), // shard 1
 		[]byte("x"), // shard 2
 	})
+	require.NoError(t, err)
 	require.Len(t, grouped, 2)
 	require.Len(t, grouped[1], 2)
 	require.Equal(t, []byte("b"), grouped[1][0])
@@ -340,20 +345,34 @@
 	require.Equal(t, []byte("x"), grouped[2][0])
 }
 
-func TestGroupReadKeysByShardID_SkipsUnroutableKeys(t *testing.T) {
+// TestGroupReadKeysByShardID_FailsClosedOnUnroutable pins the
+// codex round-2 P1 fix on PR #715: a read key the resolver cannot
+// route (recognised-but-unresolved partition key during drift, or
+// any key outside the engine's range cover) MUST surface as an
+// error so the transaction aborts before any prewrite. Silently
+// skipping unroutable keys would let OCC validation run with an
+// incomplete read set and break SSI — a concurrent write to that
+// key could commit alongside a stale read.
+//
+// This test was previously TestGroupReadKeysByShardID_SkipsUnroutableKeys
+// and pinned the BUGGY skip-silently behaviour. Renamed and rewritten
+// to pin the new fail-closed contract.
+func TestGroupReadKeysByShardID_FailsClosedOnUnroutable(t *testing.T) {
 	t.Parallel()
 	// Only route "a"-"m" to shard 1. Keys outside this range are unroutable.
 	engine := distribution.NewEngine()
 	engine.UpdateRoute([]byte("a"), []byte("m"), 1)
 	coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{1: {}}, 1, NewHLC(), nil)
-	grouped := coord.groupReadKeysByShardID([][]byte{
+	grouped, err := coord.groupReadKeysByShardID([][]byte{
 		[]byte("b"),   // routable → shard 1
-		[]byte("zzz"), // unroutable → skipped
+		[]byte("zzz"), // unroutable → MUST surface as error
 	})
-	require.Len(t, grouped, 1)
-	require.Len(t, grouped[1], 1)
-	require.Equal(t, []byte("b"), grouped[1][0])
+	require.Error(t, err,
+		"unroutable read key MUST fail closed — silently skipping "+
+			"would drop the key from OCC validation and break SSI")
+	require.Nil(t, grouped)
+	require.ErrorIs(t, err, ErrInvalidRequest)
 }
 
 // ---------------------------------------------------------------------------
diff --git a/main.go b/main.go
index 3169b09d6..bd201ef9a 100644
--- a/main.go
+++ b/main.go
@@ -318,7 +318,8 @@ func run() error {
 	sampler := buildKeyVizSampler()
 	coordinate := kv.NewShardedCoordinator(cfg.engine, shardGroups, cfg.defaultGroup, clock, shardStore).
 		WithLeaseReadObserver(metricsRegistry.LeaseReadObserver()).
-		WithSampler(keyVizSamplerForCoordinator(sampler))
+		WithSampler(keyVizSamplerForCoordinator(sampler)).
+		WithPartitionResolver(buildSQSPartitionResolver(cfg.sqsFifoPartitionMap))
 	distCatalog, err := setupDistributionCatalog(ctx, runtimes, cfg.engine)
 	if err != nil {
 		return err
@@ -477,6 +478,65 @@ func buildLeaderSQS(groups []groupSpec, sqsAddr string, raftSqsMap string) (map[
 	return buildLeaderAddrMap(groups, sqsAddr, raftSqsMap, parseRaftSQSMap)
 }
 
+// buildSQSPartitionResolver flattens the operator-supplied partition
+// map into the {queue → []groupID} shape adapter consumes and
+// returns a ResolveGroup-capable resolver. Returns nil on an empty
+// map so the coordinator's resolver field stays unset on a non-
+// partitioned cluster — kv.ShardRouter.WithPartitionResolver(nil)
+// is a documented no-op, so the request hot path keeps the existing
+// engine-only dispatch.
+//
+// Return type is the kv.PartitionResolver interface, NOT the
+// concrete *adapter.SQSPartitionResolver, because Go wraps a typed
+// nil pointer into a NON-NIL interface value when the function
+// signature is the concrete type. With a concrete return type, a
+// non-partitioned cluster would carry a non-nil interface whose
+// underlying pointer is nil, the resolver-first short-circuit
+// `s.partitionResolver != nil` would always pass, and every request
+// would pay an extra ResolveGroup call (which the nil-receiver
+// guard makes safe but not free). The interface return type makes
+// the untyped `nil` propagate as a true nil interface.
+//
+// The group-reference parsing here cannot fail in practice because
+// parseSQSFifoGroupList already canonicalized each entry as a
+// uint64 string at flag-parse time; the conversion is repeated
+// defensively so a future caller that bypasses parseSQSFifoGroupList
+// (e.g. a test seeding the map programmatically) gets a clear panic
+// instead of a silent route-to-group-zero.
+func buildSQSPartitionResolver(partitionMap map[string]sqsFifoQueueRouting) kv.PartitionResolver {
+	if len(partitionMap) == 0 {
+		return nil
+	}
+	flat := make(map[string][]uint64, len(partitionMap))
+	for queue, routing := range partitionMap {
+		ids := make([]uint64, 0, len(routing.groups))
+		for _, groupRef := range routing.groups {
+			id, err := strconv.ParseUint(groupRef, 10, 64)
+			if err != nil {
+				// parseSQSFifoGroupList canonicalized this; a
+				// non-uint64 string here means a programmer skipped
+				// the validator. Panic loudly rather than silently
+				// route to group 0.
+				panic(errors.Wrapf(err,
+					"queue %q: bypassed group-ref canonicalisation, %q is not uint64",
+					queue, groupRef))
+			}
+			ids = append(ids, id)
+		}
+		flat[queue] = ids
+	}
+	r := adapter.NewSQSPartitionResolver(flat)
+	if r == nil {
+		// Defensive: NewSQSPartitionResolver returns nil on an
+		// empty input. The len-check above already short-circuits
+		// for empty partitionMap, so reaching this branch means
+		// the canonicalisation collapsed every entry — surface as
+		// a true nil interface, not a typed-nil pointer wrapper.
+		return nil
+	}
+	return r
+}
+
 // buildSQSFifoPartitionMap parses and validates the
 // --sqsFifoPartitionMap flag against the configured Raft groups.
 // Extracted from parseRuntimeConfig so that function stays under the
diff --git a/main_sqs_resolver_test.go b/main_sqs_resolver_test.go
new file mode 100644
index 000000000..8f6981e6a
--- /dev/null
+++ b/main_sqs_resolver_test.go
@@ -0,0 +1,80 @@
+package main
+
+import (
+	"testing"
+
+	"github.com/bootjp/elastickv/kv"
+	"github.com/stretchr/testify/require"
+)
+
+// TestBuildSQSPartitionResolver_NilOnEmpty pins the typed-nil
+// interface invariant flagged by the Phase 3.D PR 4-B-2 round-1
+// review: buildSQSPartitionResolver MUST return a true-nil
+// kv.PartitionResolver interface (not a typed-nil pointer wrapped
+// in a non-nil interface) when the partition map is empty or nil.
+//
+// If the function's return type were the concrete
+// *adapter.SQSPartitionResolver and the body returned a nil
+// pointer, Go would wrap that pointer into a NON-NIL interface
+// when assigned to kv.ShardRouter.partitionResolver. The router's
+// resolver-first short-circuit `s.partitionResolver != nil` would
+// always pass on a non-partitioned cluster, and every request
+// would pay an extra ResolveGroup call (which the nil-receiver
+// guard inside (*SQSPartitionResolver).ResolveGroup makes safe
+// but not free).
+//
+// requireNilInterface forces the interface conversion at the call
+// boundary, which is what makes this regression observable. A
+// plain require.Nil(t, r) would pass even with the regression
+// present, because testify treats a nil concrete pointer as nil.
+func TestBuildSQSPartitionResolver_NilOnEmpty(t *testing.T) {
+	t.Parallel()
+	cases := []struct {
+		name string
+		in   map[string]sqsFifoQueueRouting
+	}{
+		{"nil map", nil},
+		{"empty map", map[string]sqsFifoQueueRouting{}},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			requireNilInterface(t, buildSQSPartitionResolver(tc.in),
+				"buildSQSPartitionResolver("+tc.name+
+					") must produce a true nil interface, "+
+					"not a typed-nil pointer wrapper")
+		})
+	}
+}
+
+// TestBuildSQSPartitionResolver_NonEmptyReturnsResolver pins the
+// happy path: a non-empty partition map yields a non-nil resolver
+// ready to be installed on the coordinator's dispatch path.
+func TestBuildSQSPartitionResolver_NonEmptyReturnsResolver(t *testing.T) {
+	t.Parallel()
+	in := map[string]sqsFifoQueueRouting{
+		"orders.fifo": {partitionCount: 2, groups: []string{"10", "11"}},
+	}
+	r := buildSQSPartitionResolver(in)
+	require.NotNil(t, r,
+		"non-empty partition map must produce a non-nil resolver")
+}
+
+// requireNilInterface accepts a kv.PartitionResolver and asserts
+// the INTERFACE value is nil — both type AND value tags must be
+// nil. The function-parameter conversion forces a typed-nil
+// pointer to be wrapped into a non-nil interface, which is the
+// failure mode the regression test guards against.
+//
+// Uses Go's `==` operator on the interface, NOT testify's
+// require.Nil. require.Nil reflects through to the underlying
+// pointer and considers a nil pointer wrapped in a non-nil
+// interface as "nil", so it would pass even with the typed-nil
+// regression present. require.True(t, r == nil, ...) uses Go's
+// native interface comparison, which only returns true when the
+// interface's type tag is also nil — that is the exact invariant
+// this test was designed to catch (round 3 review on PR #715).
+func requireNilInterface(t *testing.T, r kv.PartitionResolver, msg string) {
+	t.Helper()
+	require.True(t, r == nil, msg)
+}
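
The coordinator tests in this patch drive routing through a stubResolver test double whose definition lives in the kv test helpers added earlier in the series. A minimal sketch of such a double, reconstructed only from how the tests use it (the field names claim and recognisedPrefix come from the tests; the package name and everything else here is an assumption, not the repo's actual helper):

// Sketch of a resolver test double, assuming only the behaviour the
// coordinator tests above rely on.
package kvsketch

import "bytes"

type stubResolver struct {
	claim            map[string]uint64 // exact-key claims → Raft group ID
	recognisedPrefix []byte            // prefix treated as "partitioned shape"
}

// ResolveGroup answers only for explicitly claimed keys.
func (s *stubResolver) ResolveGroup(key []byte) (uint64, bool) {
	group, ok := s.claim[string(key)]
	return group, ok
}

// RecognisesPartitionedKey reports a partitioned shape for claimed keys and
// for keys under recognisedPrefix, which lets tests simulate the
// recognised-but-unresolved drift case (prefix matches, no claim).
func (s *stubResolver) RecognisesPartitionedKey(key []byte) bool {
	if _, ok := s.claim[string(key)]; ok {
		return true
	}
	return len(s.recognisedPrefix) > 0 && bytes.HasPrefix(key, s.recognisedPrefix)
}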
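
The fail-closed behaviour these tests pin reduces to a three-step routing rule. A self-contained sketch under assumed names (groupForKey, engineLookup and errUnroutablePartitionKey are illustrative only; the real logic lives in kv.ShardRouter / ShardedCoordinator and is not reproduced here):

// Sketch of the resolver-first, fail-closed routing rule.
package routesketch

import "errors"

var errUnroutablePartitionKey = errors.New("partitioned key recognised but not routable")

// partitionResolver mirrors the two calls the coordinator tests exercise.
type partitionResolver interface {
	ResolveGroup(key []byte) (uint64, bool)
	RecognisesPartitionedKey(key []byte) bool
}

// groupForKey applies the rule the tests pin:
//  1. a claimed partitioned key routes to the operator-chosen group;
//  2. a partitioned-shaped key with no route aborts instead of falling
//     through to the engine's default group;
//  3. everything else is routed by the byte-range engine.
func groupForKey(r partitionResolver, engineLookup func([]byte) uint64, key []byte) (uint64, error) {
	if r != nil {
		if group, ok := r.ResolveGroup(key); ok {
			return group, nil
		}
		if r.RecognisesPartitionedKey(key) {
			return 0, errUnroutablePartitionKey
		}
	}
	return engineLookup(key), nil
}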
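
The typed-nil discussion around buildSQSPartitionResolver and requireNilInterface rests on standard Go interface semantics. A tiny runnable demonstration, independent of any repo types (resolver, sqsResolver, concreteNil and interfaceNil are made-up names for illustration):

// Demonstration of the typed-nil interface pitfall; plain Go semantics.
package main

import "fmt"

type resolver interface {
	ResolveGroup(key []byte) (uint64, bool)
}

type sqsResolver struct{}

func (*sqsResolver) ResolveGroup([]byte) (uint64, bool) { return 0, false }

// concreteNil mimics a constructor whose signature is the concrete pointer type.
func concreteNil() *sqsResolver { return nil }

// interfaceNil mimics the fixed constructor whose signature is the interface.
func interfaceNil() resolver { return nil }

func main() {
	var viaConcrete resolver = concreteNil() // nil pointer wrapped into a non-nil interface
	viaInterface := interfaceNil()           // true nil interface

	fmt.Println(viaConcrete == nil)  // false: a `!= nil` short-circuit would still fire
	fmt.Println(viaInterface == nil) // true: the resolver stays off the hot path
}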