Skip to content
134 changes: 134 additions & 0 deletions adapter/sqs_partition_resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package adapter

import (
"bytes"
"encoding/binary"
)

// SQSPartitionResolver maps a partitioned-SQS key to the operator-
// chosen Raft group for the (queue, partition) tuple. Implements
// kv.PartitionResolver via duck typing — see the integration in
// main.go where the resolver is installed on ShardedCoordinator.
//
// The byte-range engine cannot route partitioned queues because
// adding per-partition routes would break its non-overlapping-cover
// invariant (a partition route for partition K of one queue would
// leave a gap for legacy keys that fall lexicographically between
// partitions K and K+1). The resolver-first dispatch path avoids
// this — it answers only for keys that match a partitioned family
// prefix and otherwise lets the engine handle dispatch.
type SQSPartitionResolver struct {
routes map[string][]uint64
}

// NewSQSPartitionResolver builds a resolver from the operator-
// supplied partition map. routes[queue][k] is the Raft group ID
// that owns partition k of queue, with len(routes[queue]) equal to
// the queue's PartitionCount.
//
// Returns nil when routes is empty so callers can keep the resolver
// out of the request path entirely on a non-partitioned cluster
// (kv.ShardRouter.WithPartitionResolver(nil) is a documented no-op).
//
// The constructor takes a defensive copy so a later caller mutation
// to the input map does not leak into the resolver's view at
// runtime.
func NewSQSPartitionResolver(routes map[string][]uint64) *SQSPartitionResolver {
if len(routes) == 0 {
return nil
}
cp := make(map[string][]uint64, len(routes))
for queue, groups := range routes {
ids := make([]uint64, len(groups))
copy(ids, groups)
cp[queue] = ids
}
return &SQSPartitionResolver{routes: cp}
}

// sqsResolverFamilyPrefixes is the set of partitioned-SQS family
// prefixes ResolveGroup recognises. Kept package-internal so any
// future renamed prefix touches both this list and the constant
// declaration in sqs_keys.go — TestSQSPartitionResolver_PrefixesAlign
// pins the alignment.
var sqsResolverFamilyPrefixes = []string{
SqsPartitionedMsgDataPrefix,
SqsPartitionedMsgVisPrefix,
SqsPartitionedMsgDedupPrefix,
SqsPartitionedMsgGroupPrefix,
SqsPartitionedMsgByAgePrefix,
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Defining sqsResolverFamilyPrefixes as a slice of strings leads to repeated string-to-byte conversions in stripPartitionedFamilyPrefix on the request hot path. Pre-calculating these as a slice of byte slices would improve performance and avoid unnecessary allocations.

Suggested change
var sqsResolverFamilyPrefixes = []string{
SqsPartitionedMsgDataPrefix,
SqsPartitionedMsgVisPrefix,
SqsPartitionedMsgDedupPrefix,
SqsPartitionedMsgGroupPrefix,
SqsPartitionedMsgByAgePrefix,
}
var sqsResolverFamilyPrefixes = [][]byte{
[]byte(SqsPartitionedMsgDataPrefix),
[]byte(SqsPartitionedMsgVisPrefix),
[]byte(SqsPartitionedMsgDedupPrefix),
[]byte(SqsPartitionedMsgGroupPrefix),
[]byte(SqsPartitionedMsgByAgePrefix),
}


// ResolveGroup decodes the (queue, partition) embedded in a
// partitioned-SQS key and returns the operator-chosen Raft group.
//
// Returns (0, false) for any key that does not match a partitioned
// family prefix (legacy SQS, KV, S3, DynamoDB, queue-meta records,
// …) so kv.ShardRouter falls through to its byte-range engine for
// default routing.
func (r *SQSPartitionResolver) ResolveGroup(key []byte) (uint64, bool) {
if r == nil || len(key) == 0 {
return 0, false
}
queue, partition, ok := parsePartitionedSQSKey(key)
if !ok {
return 0, false
}
groups, found := r.routes[queue]
if !found {
return 0, false
}
// Defensive: a partition value outside the slice is a config /
// upstream-bug signal, not a routable key. Returning false
// surfaces it as "no route" at the router boundary, which is
// the correct fail-closed behaviour.
if uint64(partition) >= uint64(len(groups)) {
return 0, false
}
return groups[partition], true
}

// parsePartitionedSQSKey extracts the (queue, partition) pair from
// a partitioned-SQS key. Returns ok=false for any key that does not
// match a partitioned family prefix or that has a malformed queue /
// partition segment. Exposed at package-internal scope so the
// adapter's reaper / fanout reader can share the same parser
// (Phase 3.D PR 5).
func parsePartitionedSQSKey(key []byte) (string, uint32, bool) {
rest, matched := stripPartitionedFamilyPrefix(key)
if !matched {
return "", 0, false
}
// After the family prefix, the variable-length encoded queue
// segment is terminated by '|' (sqsPartitionedQueueTerminator).
// base64.RawURLEncoding never emits '|', so the first '|' in
// rest is unambiguously the queue terminator.
pipeIdx := bytes.IndexByte(rest, sqsPartitionedQueueTerminator)
if pipeIdx <= 0 {
return "", 0, false
}
encQueue := rest[:pipeIdx]
rest = rest[pipeIdx+1:]
const partitionLen = 4
if len(rest) < partitionLen {
return "", 0, false
}
partition := binary.BigEndian.Uint32(rest[:partitionLen])
queue, err := decodeSQSSegment(string(encQueue))
if err != nil {
return "", 0, false
}
return queue, partition, true
}

// stripPartitionedFamilyPrefix returns the bytes after the matched
// family prefix. matched=false if key has none of the known
// partitioned family prefixes.
func stripPartitionedFamilyPrefix(key []byte) ([]byte, bool) {
for _, prefix := range sqsResolverFamilyPrefixes {
if bytes.HasPrefix(key, []byte(prefix)) {
return key[len(prefix):], true
}
}
return nil, false
Comment on lines +158 to +164
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

With sqsResolverFamilyPrefixes updated to [][]byte, the loop here can avoid the []byte(prefix) conversion.

Suggested change
func stripPartitionedFamilyPrefix(key []byte) ([]byte, bool) {
for _, prefix := range sqsResolverFamilyPrefixes {
if bytes.HasPrefix(key, []byte(prefix)) {
return key[len(prefix):], true
}
}
return nil, false
func stripPartitionedFamilyPrefix(key []byte) ([]byte, bool) {
for _, prefix := range sqsResolverFamilyPrefixes {
if bytes.HasPrefix(key, prefix) {
return key[len(prefix):], true
}
}
return nil, false
}

}
223 changes: 223 additions & 0 deletions adapter/sqs_partition_resolver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package adapter

import (
"strings"
"testing"

"github.com/stretchr/testify/require"
)

// TestNewSQSPartitionResolver_NilOnEmpty pins the constructor's
// nil-on-empty contract — kv.ShardRouter.WithPartitionResolver(nil)
// is documented as a no-op, so a non-partitioned cluster MUST end
// up with no resolver in the request hot path.
func TestNewSQSPartitionResolver_NilOnEmpty(t *testing.T) {
t.Parallel()
require.Nil(t, NewSQSPartitionResolver(nil))
require.Nil(t, NewSQSPartitionResolver(map[string][]uint64{}))
}

// TestNewSQSPartitionResolver_DefensiveCopy pins that mutating the
// caller's input map after construction does NOT change the
// resolver's view. Without this, a hot-reload of --sqsFifoPartitionMap
// would partially alter the resolver mid-request and produce
// transient mis-routes.
func TestNewSQSPartitionResolver_DefensiveCopy(t *testing.T) {
t.Parallel()
input := map[string][]uint64{"q.fifo": {10, 11}}
r := NewSQSPartitionResolver(input)
// Mutate after construction.
input["q.fifo"][0] = 999
delete(input, "q.fifo")

key := sqsPartitionedMsgDataKey("q.fifo", 0, 1, "msg")
gid, ok := r.ResolveGroup(key)
require.True(t, ok)
require.Equal(t, uint64(10), gid,
"resolver must keep its constructor-time view; "+
"caller mutation cannot leak in")
}

// TestSQSPartitionResolver_ResolveByPartition pins the core dispatch
// contract: a key produced by sqsPartitionedMsg*Key for (queue,
// partition) resolves to the operator-chosen group for that exact
// partition.
func TestSQSPartitionResolver_ResolveByPartition(t *testing.T) {
t.Parallel()
r := NewSQSPartitionResolver(map[string][]uint64{
"orders.fifo": {10, 11, 12, 13},
"events.fifo": {20, 21},
})
cases := []struct {
name string
queue string
partition uint32
wantGroup uint64
}{
{"orders p0", "orders.fifo", 0, 10},
{"orders p1", "orders.fifo", 1, 11},
{"orders p3", "orders.fifo", 3, 13},
{"events p0", "events.fifo", 0, 20},
{"events p1", "events.fifo", 1, 21},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
// Every family must resolve to the same group for the
// same (queue, partition) — the reaper / fanout reader
// / send path all depend on this invariant.
for familyName, key := range map[string][]byte{
"data": sqsPartitionedMsgDataKey(tc.queue, tc.partition, 1, "msg-id"),
"vis": sqsPartitionedMsgVisKey(tc.queue, tc.partition, 1, 1700000000000, "msg-id"),
"dedup": sqsPartitionedMsgDedupKey(tc.queue, tc.partition, 1, "dedup-id"),
"group": sqsPartitionedMsgGroupKey(tc.queue, tc.partition, 1, "group-id"),
"byage": sqsPartitionedMsgByAgeKey(tc.queue, tc.partition, 1, 1700000000000, "msg-id"),
} {
gid, ok := r.ResolveGroup(key)
require.True(t, ok, "family %s key for (%s, p%d) must resolve",
familyName, tc.queue, tc.partition)
require.Equal(t, tc.wantGroup, gid,
"family %s key for (%s, p%d) resolved to wrong group",
familyName, tc.queue, tc.partition)
}
})
}
}

// TestSQSPartitionResolver_QueueIsolation pins the queue-name
// terminator contract from the routing layer's perspective. Two
// queues with overlapping encoded prefixes (base64("queue") is a
// strict prefix of base64("queue1")) MUST resolve to their own
// groups, not each other's. The '|' terminator after the queue
// segment is what makes this safe — same invariant
// TestSqsPartitionedMsgKeys_QueueNamePrefixIsolation pins for the
// keyspace itself.
func TestSQSPartitionResolver_QueueIsolation(t *testing.T) {
t.Parallel()
r := NewSQSPartitionResolver(map[string][]uint64{
"queue": {500},
"queue1": {600},
})
queueKey := sqsPartitionedMsgDataKey("queue", 0, 1, "id")
queue1Key := sqsPartitionedMsgDataKey("queue1", 0, 1, "id")

queueGID, ok := r.ResolveGroup(queueKey)
require.True(t, ok)
require.Equal(t, uint64(500), queueGID)

queue1GID, ok := r.ResolveGroup(queue1Key)
require.True(t, ok)
require.Equal(t, uint64(600), queue1GID,
"queue1 keys must NOT resolve to queue's group; "+
"the '|' terminator after the queue segment is what makes this safe")
}

// TestSQSPartitionResolver_LegacyKeyFallsThrough pins that a non-
// partitioned (legacy) SQS key returns (0, false) — the
// kv.ShardRouter caller then falls through to the byte-range
// engine for default routing. A regression here would route every
// legacy queue's traffic to whichever group the resolver guessed
// from its partition map.
func TestSQSPartitionResolver_LegacyKeyFallsThrough(t *testing.T) {
t.Parallel()
r := NewSQSPartitionResolver(map[string][]uint64{
"orders.fifo": {10, 11},
})
cases := []struct {
name string
key []byte
}{
{"legacy data", sqsMsgDataKey("orders.fifo", 1, "id")},
{"legacy vis", sqsMsgVisKey("orders.fifo", 1, 1700000000000, "id")},
{"legacy dedup", sqsMsgDedupKey("orders.fifo", 1, "dedup")},
{"legacy group", sqsMsgGroupKey("orders.fifo", 1, "group")},
{"legacy byage", sqsMsgByAgeKey("orders.fifo", 1, 1700000000000, "id")},
{"queue meta", sqsQueueMetaKey("orders.fifo")},
{"non-sqs key", []byte("/some/other/key")},
{"empty", []byte{}},
{"nil", nil},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
gid, ok := r.ResolveGroup(tc.key)
require.False(t, ok, "non-partitioned key must NOT resolve")
require.Equal(t, uint64(0), gid)
})
}
}

// TestSQSPartitionResolver_UnknownQueueFallsThrough pins that a
// well-formed partitioned key for a queue that is NOT in the
// partition map returns (0, false). This shape can only happen if
// the cluster lost partition-map entries between writer and reader
// (an operational misconfiguration); routing through the byte-range
// engine is the correct fail-closed behaviour.
func TestSQSPartitionResolver_UnknownQueueFallsThrough(t *testing.T) {
t.Parallel()
r := NewSQSPartitionResolver(map[string][]uint64{
"known.fifo": {10, 11},
})
key := sqsPartitionedMsgDataKey("unknown.fifo", 0, 1, "id")
gid, ok := r.ResolveGroup(key)
require.False(t, ok)
require.Equal(t, uint64(0), gid)
}

// TestSQSPartitionResolver_OutOfRangePartitionFallsThrough pins
// that a partition value beyond the configured PartitionCount
// returns (0, false). Same fail-closed argument as the unknown-
// queue case — if a writer produced a key with partition 4 but the
// resolver's view only has partitions [0, 3], routing through the
// engine surfaces the disagreement at the request boundary.
func TestSQSPartitionResolver_OutOfRangePartitionFallsThrough(t *testing.T) {
t.Parallel()
r := NewSQSPartitionResolver(map[string][]uint64{
"q.fifo": {10, 11},
})
// PartitionCount in resolver is 2 (partitions 0 and 1); craft a
// key for partition 5 to simulate a writer at a different
// partition count.
key := sqsPartitionedMsgDataKey("q.fifo", 5, 1, "id")
gid, ok := r.ResolveGroup(key)
require.False(t, ok)
require.Equal(t, uint64(0), gid)
}

// TestSQSPartitionResolver_NilReceiverIsSafe pins defensive
// behaviour — a nil resolver pointer must not panic on
// ResolveGroup. kv.ShardRouter's resolver-first dispatch checks
// `if s.partitionResolver != nil` before calling, but a typed-nil
// can still slip through interface assertions.
func TestSQSPartitionResolver_NilReceiverIsSafe(t *testing.T) {
t.Parallel()
var r *SQSPartitionResolver
gid, ok := r.ResolveGroup([]byte("any-key"))
require.False(t, ok)
require.Equal(t, uint64(0), gid)
}

// TestSQSPartitionResolver_PrefixesAlign pins that the resolver's
// family-prefix list matches the constants in sqs_keys.go exactly.
// A future renamed prefix or added family that touches sqs_keys.go
// without also touching sqsResolverFamilyPrefixes would silently
// stop resolving keys for the new family.
func TestSQSPartitionResolver_PrefixesAlign(t *testing.T) {
t.Parallel()
want := []string{
SqsPartitionedMsgDataPrefix,
SqsPartitionedMsgVisPrefix,
SqsPartitionedMsgDedupPrefix,
SqsPartitionedMsgGroupPrefix,
SqsPartitionedMsgByAgePrefix,
}
require.Equal(t, want, sqsResolverFamilyPrefixes,
"sqsResolverFamilyPrefixes must mirror the constants in "+
"sqs_keys.go — a new partitioned family added there must "+
"be added here too, or the resolver silently stops "+
"matching keys in that family")
for _, p := range sqsResolverFamilyPrefixes {
require.True(t, strings.HasSuffix(p, "p|"),
"every partitioned prefix must end with the p| discriminator")
}
}
Loading
Loading