Commit bce448f

feat(sqs): partition resolver for HT-FIFO routing (Phase 3.D PR 4-B-2) (#715)
## Summary

Routing-layer half of PR 4-B. Adds a `PartitionResolver` that `ShardRouter` consults BEFORE falling through to the byte-range engine. SQS HT-FIFO needs partition-aware dispatch, but the engine's non-overlapping-cover model can't express overlay routes — the resolver-first dispatch sidesteps this cleanly.

Stacks on top of #708 (PR 4-B-1, capability JSON). Next is PR 4-B-3 (leadership-refusal + catalog polling + flip `htfifoCapabilityAdvertised` to `true`).

## What's added

- `kv.PartitionResolver` interface — `ResolveGroup([]byte) (uint64, bool)`.
- `kv.ShardRouter.WithPartitionResolver(...)` — fluent option, nil-safe.
- `kv.ShardRouter.resolveGroup(...)` — unified dispatch path: resolver first, engine fallback. Both `groupRequests` (Commit/Abort) and `Get` route through it.
- `kv.ShardedCoordinator.WithPartitionResolver(...)` — delegates to the router so `main.go` can install via the existing fluent-construction style.
- `adapter.SQSPartitionResolver` — parses `(queue, partition)` from the partitioned key shape, looks up the operator-chosen group. Defensive copy at construction, nil-safe `ResolveGroup`, returns `(0, false)` for legacy / non-matching keys.
- `main.go` — builds the resolver from `runtimeConfig.sqsFifoPartitionMap` and installs it. Resolver is `nil` on a non-partitioned cluster — hot path stays engine-only.

## What's NOT added (deferred to PR 4-B-3)

- §8 leadership-refusal hook in `kv` (refuses leadership for an SQS Raft group hosting a partitioned queue when the binary lacks `htfifo`).
- Catalog-polling helper for the CreateQueue capability gate (PR 5 starts using it).
- Flipping `htfifoCapabilityAdvertised` from `false` to `true`. The design's "advertise htfifo only when both routing AND leadership-refusal are in place" rule keeps the constant `false` in this PR — PR 4-B-3 flips it.

## Test plan

- [x] `adapter/sqs_partition_resolver_test.go` — 9 top-level tests: nil-on-empty, defensive-copy, partition dispatch across all 5 families, queue-name prefix isolation (`"queue"` vs `"queue1"`), legacy fall-through (8 sub-cases), unknown queue, out-of-range partition, nil receiver, prefix alignment with `sqs_keys.go` constants.
- [x] `kv/shard_router_partition_test.go` — 4 tests: resolver wins over engine, engine fallthrough on resolver-miss, nil resolver no-op, `Get` path also routes through the resolver.
- [x] `go test -race ./kv/...` pass.
- [x] `go test -race ./adapter/...` pass.
- [x] `golangci-lint ./kv/... ./adapter/... .` clean.

## Self-review (per CLAUDE.md)

1. **Data loss** — routing layer only; no FSM/Pebble/retention path. No issue.
2. **Concurrency** — `partitionResolver` is set once at startup before any request. `ResolveGroup` reads a constructor-time defensive copy, so a future hot-reload of `--sqsFifoPartitionMap` cannot perturb in-flight requests. No issue.
3. **Performance** — one map lookup + 4-byte BigEndian decode per resolver hit (only on partitioned-prefix matches). Engine-only path adds a single `if s.partitionResolver != nil` branch. No issue.
4. **Data consistency** — resolver output strictly OVERRIDES the engine for partitioned keys; legacy keys flow through unchanged. "queue not found" / "partition out of range" branches return `(0, false)` so the router surfaces an explicit error rather than silently mis-routing. No issue.
5. **Test coverage** — 13 tests across two new files; existing `TestShardRouter*` tests unchanged. Both override and fall-through paths pinned, plus the queue-name prefix-isolation invariant from PR #703.

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

* **New Features**
  * Added partition-based routing for distributed coordination. Requests are now directed to specific nodes based on configured partition mappings.
  * Enhanced SQS queue handling with optional partition-aware routing. When partition information is unavailable or unconfigured, the system automatically falls back to the existing routing mechanism.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
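The resolver-first dispatch described above can be sketched roughly as follows. This is a minimal illustration and not the actual `kv.ShardRouter` code: `demoRouter`, `toyResolver`, `shapeRecogniser`, `errNoRoute`, the `part|` prefix, and the always-matching engine function are hypothetical stand-ins. Only the `ResolveGroup([]byte) (uint64, bool)` signature and the `RecognisesPartitionedKey` method name come from this commit.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// PartitionResolver mirrors the interface shape named in the PR description.
type PartitionResolver interface {
	ResolveGroup(key []byte) (uint64, bool)
}

// shapeRecogniser is a hypothetical optional interface used here to model
// the fail-closed check for keys that look partitioned but have no route.
type shapeRecogniser interface {
	RecognisesPartitionedKey(key []byte) bool
}

var errNoRoute = errors.New("partitioned key has no route")

// demoRouter is a hypothetical stand-in for the real router.
type demoRouter struct {
	resolver PartitionResolver
	engine   func(key []byte) (uint64, bool) // stand-in for the byte-range engine
}

// resolveGroup asks the resolver first, fails closed for partitioned-shaped
// keys the resolver cannot route, and otherwise falls back to the engine.
func (r *demoRouter) resolveGroup(key []byte) (uint64, error) {
	if r.resolver != nil {
		if gid, ok := r.resolver.ResolveGroup(key); ok {
			return gid, nil
		}
		if rec, ok := r.resolver.(shapeRecogniser); ok && rec.RecognisesPartitionedKey(key) {
			return 0, errNoRoute
		}
	}
	if gid, ok := r.engine(key); ok {
		return gid, nil
	}
	return 0, errors.New("engine has no route")
}

// toyResolver routes exact keys and recognises a made-up "part|" prefix.
type toyResolver struct {
	prefix []byte
	groups map[string]uint64
}

func (t toyResolver) ResolveGroup(key []byte) (uint64, bool) {
	gid, ok := t.groups[string(key)]
	return gid, ok
}

func (t toyResolver) RecognisesPartitionedKey(key []byte) bool {
	return bytes.HasPrefix(key, t.prefix)
}

func newDemoRouter() *demoRouter {
	return &demoRouter{
		resolver: toyResolver{
			prefix: []byte("part|"),
			groups: map[string]uint64{"part|orders|0": 7},
		},
		// The toy engine covers the whole keyspace with default group 1.
		engine: func([]byte) (uint64, bool) { return 1, true },
	}
}

func main() {
	r := newDemoRouter()
	for _, k := range []string{"part|orders|0", "part|ghost|0", "legacy|x"} {
		gid, err := r.resolveGroup([]byte(k))
		fmt.Printf("%-14s group=%d err=%v\n", k, gid, err)
	}
}
```

In this sketch a resolver hit wins over the engine, a legacy key reaches the engine untouched, and a partitioned-shaped key with no configured route returns an error instead of landing on the engine's default group.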
2 parents 2dda2cb + eedc17c commit bce448f

9 files changed

Lines changed: 1530 additions & 38 deletions

adapter/sqs_partition_resolver.go

Lines changed: 165 additions & 0 deletions
@@ -0,0 +1,165 @@
package adapter

import (
	"bytes"
	"encoding/binary"
)

// SQSPartitionResolver maps a partitioned-SQS key to the operator-
// chosen Raft group for the (queue, partition) tuple. Implements
// kv.PartitionResolver via duck typing — see the integration in
// main.go where the resolver is installed on ShardedCoordinator.
//
// The byte-range engine cannot route partitioned queues because
// adding per-partition routes would break its non-overlapping-cover
// invariant (a partition route for partition K of one queue would
// leave a gap for legacy keys that fall lexicographically between
// partitions K and K+1). The resolver-first dispatch path avoids
// this — it answers only for keys that match a partitioned family
// prefix and otherwise lets the engine handle dispatch.
type SQSPartitionResolver struct {
	routes map[string][]uint64
}

// NewSQSPartitionResolver builds a resolver from the operator-
// supplied partition map. routes[queue][k] is the Raft group ID
// that owns partition k of queue, with len(routes[queue]) equal to
// the queue's PartitionCount.
//
// Returns nil when routes is empty so callers can keep the resolver
// out of the request path entirely on a non-partitioned cluster
// (kv.ShardRouter.WithPartitionResolver(nil) is a documented no-op).
//
// The constructor takes a defensive copy so a later caller mutation
// to the input map does not leak into the resolver's view at
// runtime.
func NewSQSPartitionResolver(routes map[string][]uint64) *SQSPartitionResolver {
	if len(routes) == 0 {
		return nil
	}
	cp := make(map[string][]uint64, len(routes))
	for queue, groups := range routes {
		ids := make([]uint64, len(groups))
		copy(ids, groups)
		cp[queue] = ids
	}
	return &SQSPartitionResolver{routes: cp}
}

// sqsResolverFamilyPrefixes is the set of partitioned-SQS family
// prefixes ResolveGroup recognises. Pre-converted to []byte so the
// hot-path bytes.HasPrefix call avoids an allocation per check
// (gemini medium on PR #715). Kept package-internal so any future
// renamed prefix touches both this list and the constant
// declaration in sqs_keys.go — TestSQSPartitionResolver_PrefixesAlign
// pins the alignment.
var sqsResolverFamilyPrefixes = [][]byte{
	[]byte(SqsPartitionedMsgDataPrefix),
	[]byte(SqsPartitionedMsgVisPrefix),
	[]byte(SqsPartitionedMsgDedupPrefix),
	[]byte(SqsPartitionedMsgGroupPrefix),
	[]byte(SqsPartitionedMsgByAgePrefix),
}

// ResolveGroup decodes the (queue, partition) embedded in a
// partitioned-SQS key and returns the operator-chosen Raft group.
//
// Returns (0, false) for any key that does not match a partitioned
// family prefix (legacy SQS, KV, S3, DynamoDB, queue-meta records,
// …) so kv.ShardRouter falls through to its byte-range engine for
// default routing.
//
// Returns (0, false) for a partitioned-shaped key whose queue is
// not in the routes map or whose partition index is beyond
// len(routes[queue]). The router pairs this with
// RecognisesPartitionedKey to fail closed instead of falling
// through — silently routing through the engine's
// !sqs|route|global default would mis-route HT-FIFO traffic during
// partition-map drift (codex P1 round 2 on PR #715).
func (r *SQSPartitionResolver) ResolveGroup(key []byte) (uint64, bool) {
	if r == nil || len(key) == 0 {
		return 0, false
	}
	queue, partition, ok := parsePartitionedSQSKey(key)
	if !ok {
		return 0, false
	}
	groups, found := r.routes[queue]
	if !found {
		return 0, false
	}
	// Defensive: a partition value outside the slice is a config /
	// upstream-bug signal, not a routable key. Returning false
	// surfaces it as "no route" at the router boundary, which is
	// the correct fail-closed behaviour.
	if uint64(partition) >= uint64(len(groups)) {
		return 0, false
	}
	return groups[partition], true
}

// RecognisesPartitionedKey reports whether key has the structural
// shape of a partitioned-SQS key — i.e. starts with one of the
// partitioned family prefixes. The check is PREFIX-ONLY, not a
// full parse: a key with a partitioned prefix followed by a
// malformed queue / partition segment still answers true, so the
// router fails closed via kv.PartitionResolver semantics instead
// of falling through to the engine and silently routing to the
// SQS catalog default group via routeKey's !sqs|route|global
// collapse (round 5 review nit on PR #715).
//
// A nil receiver returns false so kv.ShardRouter's typed-nil case
// (ResolveGroup(nil) == (0, false)) pairs with an honest "I don't
// recognise anything" answer instead of falsely claiming a shape.
func (r *SQSPartitionResolver) RecognisesPartitionedKey(key []byte) bool {
	if r == nil || len(key) == 0 {
		return false
	}
	_, ok := stripPartitionedFamilyPrefix(key)
	return ok
}

// parsePartitionedSQSKey extracts the (queue, partition) pair from
// a partitioned-SQS key. Returns ok=false for any key that does not
// match a partitioned family prefix or that has a malformed queue /
// partition segment. Exposed at package-internal scope so the
// adapter's reaper / fanout reader can share the same parser
// (Phase 3.D PR 5).
func parsePartitionedSQSKey(key []byte) (string, uint32, bool) {
	rest, matched := stripPartitionedFamilyPrefix(key)
	if !matched {
		return "", 0, false
	}
	// After the family prefix, the variable-length encoded queue
	// segment is terminated by '|' (sqsPartitionedQueueTerminator).
	// base64.RawURLEncoding never emits '|', so the first '|' in
	// rest is unambiguously the queue terminator.
	pipeIdx := bytes.IndexByte(rest, sqsPartitionedQueueTerminator)
	if pipeIdx <= 0 {
		return "", 0, false
	}
	encQueue := rest[:pipeIdx]
	rest = rest[pipeIdx+1:]
	const partitionLen = 4
	if len(rest) < partitionLen {
		return "", 0, false
	}
	partition := binary.BigEndian.Uint32(rest[:partitionLen])
	queue, err := decodeSQSSegment(string(encQueue))
	if err != nil {
		return "", 0, false
	}
	return queue, partition, true
}

// stripPartitionedFamilyPrefix returns the bytes after the matched
// family prefix. matched=false if key has none of the known
// partitioned family prefixes.
func stripPartitionedFamilyPrefix(key []byte) ([]byte, bool) {
	for _, prefix := range sqsResolverFamilyPrefixes {
		if bytes.HasPrefix(key, prefix) {
			return key[len(prefix):], true
		}
	}
	return nil, false
}
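To make the key layout that `parsePartitionedSQSKey` expects easier to picture, here is a self-contained round-trip sketch. It rests on several assumptions that the diff does not confirm: the prefix `!demo|part|data|` is invented (the real constants live in `sqs_keys.go` and are not shown here), `decodeSQSSegment` is assumed to be a plain `base64.RawURLEncoding` decode, and `encodeDemoKey`, `decodeDemoKey`, and `lookupGroup` are hypothetical helper names. The layout itself (family prefix, encoded queue, `|` terminator, 4-byte big-endian partition) follows the comments in the file above.

```go
package main

import (
	"bytes"
	"encoding/base64"
	"encoding/binary"
	"fmt"
)

// demoPrefix is a made-up family prefix, used only for this illustration.
const demoPrefix = "!demo|part|data|"

// encodeDemoKey builds prefix + base64url(queue) + '|' + BE32(partition) + suffix.
func encodeDemoKey(queue string, partition uint32, suffix []byte) []byte {
	enc := base64.RawURLEncoding.EncodeToString([]byte(queue))
	var p [4]byte
	binary.BigEndian.PutUint32(p[:], partition)
	key := make([]byte, 0, len(demoPrefix)+len(enc)+1+len(p)+len(suffix))
	key = append(key, demoPrefix...)
	key = append(key, enc...)
	key = append(key, '|')
	key = append(key, p[:]...)
	return append(key, suffix...)
}

// decodeDemoKey reverses encodeDemoKey and reports ok=false on any mismatch.
func decodeDemoKey(key []byte) (string, uint32, bool) {
	if !bytes.HasPrefix(key, []byte(demoPrefix)) {
		return "", 0, false
	}
	rest := key[len(demoPrefix):]
	// The base64url alphabet has no '|', so the first '|' ends the queue
	// segment even when the partition bytes themselves contain 0x7C.
	i := bytes.IndexByte(rest, '|')
	if i <= 0 {
		return "", 0, false
	}
	enc, rest := rest[:i], rest[i+1:]
	if len(rest) < 4 {
		return "", 0, false
	}
	queue, err := base64.RawURLEncoding.DecodeString(string(enc))
	if err != nil {
		return "", 0, false
	}
	return string(queue), binary.BigEndian.Uint32(rest[:4]), true
}

// lookupGroup applies the same unknown-queue and out-of-range checks as
// ResolveGroup, against a plain routes map.
func lookupGroup(routes map[string][]uint64, key []byte) (uint64, bool) {
	queue, partition, ok := decodeDemoKey(key)
	if !ok {
		return 0, false
	}
	groups, found := routes[queue]
	if !found || uint64(partition) >= uint64(len(groups)) {
		return 0, false
	}
	return groups[partition], true
}

func main() {
	routes := map[string][]uint64{"orders.fifo": {11, 12}}
	key := encodeDemoKey("orders.fifo", 1, []byte("msg-1"))
	gid, ok := lookupGroup(routes, key)
	fmt.Println(gid, ok)
}
```

Because the queue name is matched after decoding and only up to the terminator, a queue whose name is a prefix of another (for example `"queue"` and `"queue1"`) cannot collide in the map lookup.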
