Skip to content

Commit 9cfc779

Browse files
committed
fix(sqs): fail-closed for recognised-but-unresolved partition keys
Round 2 Codex P1 on PR #715: when a partition resolver is installed, an unresolved partitioned-shape key (unknown queue / out-of-range partition) silently falls through to engine.GetRoute(routeKey(rawKey)). For !sqs|... keys, routeKey collapses to !sqs|route|global, so the engine routes the misconfiguration to the SQS catalog default group instead of returning a routing error. During partition-map drift / partial rollout this would silently mis-route HT-FIFO traffic.

Fix
- kv.PartitionResolver gains RecognisesPartitionedKey(key) — a shape-only predicate. Implementations answer purely on prefix / structural inspection so the router can use it independently of the in-memory mapping.
- ShardRouter.ResolveGroup: after the resolver's ResolveGroup returns ok=false, consult RecognisesPartitionedKey. If the resolver recognises the shape, return (0, false) WITHOUT engine fallback — the caller surfaces a routing error. If the shape is not recognised, fall through to the engine as before.
- adapter.SQSPartitionResolver: implements RecognisesPartitionedKey via parsePartitionedSQSKey (the same parser ResolveGroup uses). Nil-receiver and empty-key return false.

Tests
- kv/shard_router_partition_test.go: new TestShardRouter_FailClosedOnRecognisedButUnresolved — pins that a recognised-but-unresolved partitioned key returns (0, false) AND that a non-recognised key still falls through to the engine.
- adapter/sqs_partition_resolver_test.go: new TestSQSPartitionResolver_RecognisesPartitionedKey covers 8 shape cases (data/vis/byage families with known + unknown queues, OOR partition, legacy SQS, queue meta, non-SQS, empty, nil). New TestSQSPartitionResolver_RecognisesPartitionedKey_NilReceiver pins the typed-nil-safe branch.
- Renamed and expanded TestSQSPartitionResolver_UnknownQueueRecognisedButUnresolved / TestSQSPartitionResolver_OutOfRangePartitionRecognisedButUnresolved to assert RecognisesPartitionedKey == true, pinning the router-side fail-closed contract.
- Existing fakePartitionResolver / stubResolver / recordingResolver test doubles get the new method (the kv-internal tests don't depend on the adapter resolver).
1 parent f4a12e4 commit 9cfc779

5 files changed

Lines changed: 243 additions & 19 deletions

adapter/sqs_partition_resolver.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,14 @@ var sqsResolverFamilyPrefixes = [][]byte{
6868
// family prefix (legacy SQS, KV, S3, DynamoDB, queue-meta records,
6969
// …) so kv.ShardRouter falls through to its byte-range engine for
7070
// default routing.
71+
//
72+
// Returns (0, false) for a partitioned-shaped key whose queue is
73+
// not in the routes map or whose partition index is beyond
74+
// len(routes[queue]). The router pairs this with
75+
// RecognisesPartitionedKey to fail closed instead of falling
76+
// through — silently routing through the engine's
77+
// !sqs|route|global default would mis-route HT-FIFO traffic during
78+
// partition-map drift (codex P1 round 2 on PR #715).
7179
func (r *SQSPartitionResolver) ResolveGroup(key []byte) (uint64, bool) {
7280
if r == nil || len(key) == 0 {
7381
return 0, false
@@ -90,6 +98,26 @@ func (r *SQSPartitionResolver) ResolveGroup(key []byte) (uint64, bool) {
9098
return groups[partition], true
9199
}
92100

101+
// RecognisesPartitionedKey reports whether key has the structural
102+
// shape of a partitioned-SQS key — a partitioned family prefix
103+
// followed by an encoded queue segment, a '|' terminator, and a
104+
// fixed-width uint32 partition index. Implementations of
105+
// kv.PartitionResolver must answer purely on shape so the router
106+
// can use the predicate to decide between fall-through (not
107+
// partitioned) and fail-closed (partitioned but unresolved); see
108+
// kv.PartitionResolver doc and codex P1 round 2 on PR #715.
109+
//
110+
// A nil receiver returns false so kv.ShardRouter's typed-nil case
111+
// (ResolveGroup(nil) == (0, false)) pairs with an honest "I don't
112+
// recognise anything" answer instead of falsely claiming a shape.
113+
func (r *SQSPartitionResolver) RecognisesPartitionedKey(key []byte) bool {
114+
if r == nil || len(key) == 0 {
115+
return false
116+
}
117+
_, _, ok := parsePartitionedSQSKey(key)
118+
return ok
119+
}
120+
93121
// parsePartitionedSQSKey extracts the (queue, partition) pair from
94122
// a partitioned-SQS key. Returns ok=false for any key that does not
95123
// match a partitioned family prefix or that has a malformed queue /

adapter/sqs_partition_resolver_test.go

Lines changed: 97 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -147,13 +147,14 @@ func TestSQSPartitionResolver_LegacyKeyFallsThrough(t *testing.T) {
147147
}
148148
}
149149

150-
// TestSQSPartitionResolver_UnknownQueueFallsThrough pins that a
151-
// well-formed partitioned key for a queue that is NOT in the
152-
// partition map returns (0, false). This shape can only happen if
153-
// the cluster lost partition-map entries between writer and reader
154-
// (an operational misconfiguration); routing through the byte-range
155-
// engine is the correct fail-closed behaviour.
156-
func TestSQSPartitionResolver_UnknownQueueFallsThrough(t *testing.T) {
150+
// TestSQSPartitionResolver_UnknownQueueRecognisedButUnresolved
151+
// pins the round-5 fail-closed contract: a well-formed partitioned
152+
// key for a queue that is NOT in the partition map returns
153+
// ResolveGroup=(0, false) AND RecognisesPartitionedKey=true. The
154+
// router pairs these so the request fails closed — engine
155+
// fall-through would silently route the misconfiguration through
156+
// the SQS catalog default group (codex round-2 P1 on PR #715).
157+
func TestSQSPartitionResolver_UnknownQueueRecognisedButUnresolved(t *testing.T) {
157158
t.Parallel()
158159
r := NewSQSPartitionResolver(map[string][]uint64{
159160
"known.fifo": {10, 11},
@@ -162,15 +163,17 @@ func TestSQSPartitionResolver_UnknownQueueFallsThrough(t *testing.T) {
162163
gid, ok := r.ResolveGroup(key)
163164
require.False(t, ok)
164165
require.Equal(t, uint64(0), gid)
166+
require.True(t, r.RecognisesPartitionedKey(key),
167+
"unknown-queue partitioned key must still be recognised — "+
168+
"the router pairs Recognised=true with ok=false to fail "+
169+
"closed instead of falling through to the engine")
165170
}
166171

167-
// TestSQSPartitionResolver_OutOfRangePartitionFallsThrough pins
168-
// that a partition value beyond the configured PartitionCount
169-
// returns (0, false). Same fail-closed argument as the unknown-
170-
// queue case — if a writer produced a key with partition 4 but the
171-
// resolver's view only has partitions [0, 3], routing through the
172-
// engine surfaces the disagreement at the request boundary.
173-
func TestSQSPartitionResolver_OutOfRangePartitionFallsThrough(t *testing.T) {
172+
// TestSQSPartitionResolver_OutOfRangePartitionRecognisedButUnresolved
173+
// pins the round-5 fail-closed contract for a partition value
174+
// beyond the configured PartitionCount. Same router-side
175+
// fail-closed argument as the unknown-queue case.
176+
func TestSQSPartitionResolver_OutOfRangePartitionRecognisedButUnresolved(t *testing.T) {
174177
t.Parallel()
175178
r := NewSQSPartitionResolver(map[string][]uint64{
176179
"q.fifo": {10, 11},
@@ -182,6 +185,10 @@ func TestSQSPartitionResolver_OutOfRangePartitionFallsThrough(t *testing.T) {
182185
gid, ok := r.ResolveGroup(key)
183186
require.False(t, ok)
184187
require.Equal(t, uint64(0), gid)
188+
require.True(t, r.RecognisesPartitionedKey(key),
189+
"OOR-partition key must still be recognised — the router "+
190+
"pairs Recognised=true with ok=false to fail closed "+
191+
"instead of falling through to the engine")
185192
}
186193

187194
// TestSQSPartitionResolver_NilReceiverIsSafe pins defensive
@@ -197,6 +204,82 @@ func TestSQSPartitionResolver_NilReceiverIsSafe(t *testing.T) {
197204
require.Equal(t, uint64(0), gid)
198205
}
199206

207+
// TestSQSPartitionResolver_RecognisesPartitionedKey pins the
208+
// shape-only predicate the router uses to decide between
209+
// fall-through and fail-closed (codex round-2 P1 on PR #715).
210+
// RecognisesPartitionedKey MUST answer purely on the structural
211+
// shape — partitioned family prefix + queue + '|' terminator +
212+
// be32 partition — independent of whether the queue is in the
213+
// routes map. Otherwise the router could not reliably fail-closed
214+
// for unresolved-but-recognised keys.
215+
func TestSQSPartitionResolver_RecognisesPartitionedKey(t *testing.T) {
216+
t.Parallel()
217+
r := NewSQSPartitionResolver(map[string][]uint64{
218+
"known.fifo": {10, 11},
219+
})
220+
cases := []struct {
221+
name string
222+
key []byte
223+
want bool
224+
}{
225+
{
226+
name: "data family + known queue",
227+
key: sqsPartitionedMsgDataKey("known.fifo", 0, 1, "id"),
228+
want: true,
229+
},
230+
{
231+
name: "vis family + unknown queue (recognised, unresolved)",
232+
key: sqsPartitionedMsgVisKey("not-in-routes.fifo", 0, 1, 1, "id"),
233+
want: true,
234+
},
235+
{
236+
name: "byage family + OOR partition (recognised, unresolved)",
237+
key: sqsPartitionedMsgByAgeKey("known.fifo", 99, 1, 1, "id"),
238+
want: true,
239+
},
240+
{
241+
name: "legacy SQS key — not partitioned",
242+
key: sqsMsgDataKey("known.fifo", 1, "id"),
243+
want: false,
244+
},
245+
{
246+
name: "non-SQS key",
247+
key: []byte("/foo/bar"),
248+
want: false,
249+
},
250+
{
251+
name: "queue meta key",
252+
key: sqsQueueMetaKey("known.fifo"),
253+
want: false,
254+
},
255+
{
256+
name: "empty",
257+
key: []byte{},
258+
want: false,
259+
},
260+
{
261+
name: "nil",
262+
key: nil,
263+
want: false,
264+
},
265+
}
266+
for _, tc := range cases {
267+
t.Run(tc.name, func(t *testing.T) {
268+
t.Parallel()
269+
require.Equal(t, tc.want, r.RecognisesPartitionedKey(tc.key))
270+
})
271+
}
272+
}
273+
274+
// TestSQSPartitionResolver_RecognisesPartitionedKey_NilReceiver
275+
// pins the typed-nil-safe branch — same as ResolveGroup, but for
276+
// the new predicate.
277+
func TestSQSPartitionResolver_RecognisesPartitionedKey_NilReceiver(t *testing.T) {
278+
t.Parallel()
279+
var r *SQSPartitionResolver
280+
require.False(t, r.RecognisesPartitionedKey([]byte("any-key")))
281+
}
282+
200283
// TestSQSPartitionResolver_PrefixesAlign pins that the resolver's
201284
// family-prefix list matches the constants in sqs_keys.go exactly.
202285
// A future renamed prefix or added family that touches sqs_keys.go

kv/shard_router.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,28 @@ import (
2121
// Implementations must be safe for concurrent use — ResolveGroup is
2222
// called on the request hot path. Returning (0, false) for a key
2323
// the resolver does not recognise lets the router fall through to
24-
// the engine.
24+
// the engine. Returning (0, false) for a key the resolver DOES
25+
// recognise (the partitioned shape matches but the queue is not
26+
// in the map, or the partition is out of range) is also valid;
27+
// the router uses RecognisesPartitionedKey to distinguish "not
28+
// partitioned, fall through" from "partitioned but unresolved,
29+
// fail closed".
30+
//
31+
// The fail-closed split matters under partition-map drift / a
32+
// partial rollout: without it, an unresolved partitioned key
33+
// would silently land on the engine's SQS-catalog default group
34+
// (because routeKey normalises every !sqs|... key to
35+
// !sqs|route|global) instead of surfacing a routing error.
2536
type PartitionResolver interface {
2637
ResolveGroup(key []byte) (uint64, bool)
38+
39+
// RecognisesPartitionedKey reports whether the key SHAPE is
40+
// one this resolver is responsible for. Implementations
41+
// answer based on prefix / structural inspection only — the
42+
// answer must NOT depend on any in-memory mapping that could
43+
// drift out of sync, otherwise the router cannot reliably
44+
// fail-closed for unresolved-but-recognised keys.
45+
RecognisesPartitionedKey(key []byte) bool
2746
}
2847

2948
// ShardRouter routes requests to multiple raft groups based on key ranges.
@@ -87,6 +106,15 @@ func (s *ShardRouter) WithPartitionResolver(r PartitionResolver) *ShardRouter {
87106
// legacy routing (catalog → !sqs|route|global → default group)
88107
// stays unchanged.
89108
//
109+
// Fail-closed for recognised-but-unresolved keys: when the resolver
110+
// recognises a partitioned shape (RecognisesPartitionedKey == true)
111+
// but cannot resolve the queue/partition pair (ResolveGroup returns
112+
// ok=false), the router refuses to fall through to the engine.
113+
// Otherwise the engine would route the partitioned key to
114+
// !sqs|route|global's default group, silently mis-routing HT-FIFO
115+
// traffic during partition-map drift or partial rollout (codex P1
116+
// round 2 on PR #715).
117+
//
90118
// Returns (0, false) when neither the resolver nor the engine
91119
// recognises the key. Caller surfaces this as an "unknown group"
92120
// error so a partitioned-prefix key whose queue is missing from the
@@ -97,6 +125,14 @@ func (s *ShardRouter) ResolveGroup(rawKey []byte) (uint64, bool) {
97125
if gid, ok := s.partitionResolver.ResolveGroup(rawKey); ok {
98126
return gid, true
99127
}
128+
if s.partitionResolver.RecognisesPartitionedKey(rawKey) {
129+
// Partitioned shape, but the resolver cannot map it
130+
// (unknown queue, out-of-range partition). Fail closed
131+
// — the engine's catalog-level default route would
132+
// silently misroute this through routeKey's
133+
// !sqs|route|global collapse.
134+
return 0, false
135+
}
100136
}
101137
// Engine routes against the user-key view of the byte-range
102138
// space; routeKey may rewrite SQS / DynamoDB / Redis-internal

kv/shard_router_partition_test.go

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,31 @@ import (
1717
// router-side tests — keeps the tests free of any adapter-package
1818
// dependency so the routing contract is verified at the kv layer in
1919
// isolation.
20+
//
21+
// recognisedPrefix lets a test simulate the "recognised but
22+
// unresolved" failure mode (codex round-2 P1 on PR #715): keys
23+
// that start with this prefix and miss the routes map cause
24+
// ResolveGroup to return (0, false) AND
25+
// RecognisesPartitionedKey to return true, so the router fails
26+
// closed instead of falling through to the engine.
2027
type fakePartitionResolver struct {
21-
routes map[string]uint64
28+
routes map[string]uint64
29+
recognisedPrefix []byte
2230
}
2331

2432
func (f *fakePartitionResolver) ResolveGroup(key []byte) (uint64, bool) {
2533
gid, ok := f.routes[string(key)]
2634
return gid, ok
2735
}
2836

37+
func (f *fakePartitionResolver) RecognisesPartitionedKey(key []byte) bool {
38+
if len(f.recognisedPrefix) == 0 {
39+
return false
40+
}
41+
return len(key) >= len(f.recognisedPrefix) &&
42+
bytes.HasPrefix(key, f.recognisedPrefix)
43+
}
44+
2945
// TestShardRouter_PartitionResolverWins pins that when the resolver
3046
// claims a key, the router dispatches to the resolver's group even
3147
// if the byte-range engine would have routed elsewhere. This is the
@@ -176,6 +192,45 @@ func TestShardRouter_ResolverSeesRawKeyNotNormalized(t *testing.T) {
176192
"resolver, contrary to the §3.D PR 4-B-2 design")
177193
}
178194

195+
// TestShardRouter_FailClosedOnRecognisedButUnresolved pins the
196+
// codex round-2 P1 fix on PR #715: a key the resolver recognises
197+
// (partitioned shape) but cannot resolve (unknown queue / OOR
198+
// partition) must NOT fall through to the engine. Without the
199+
// fail-closed branch, the engine's routeKey-collapsed
200+
// !sqs|route|global default would silently swallow the misroute
201+
// during partition-map drift / partial rollout.
202+
func TestShardRouter_FailClosedOnRecognisedButUnresolved(t *testing.T) {
203+
t.Parallel()
204+
e := distribution.NewEngine()
205+
e.UpdateRoute([]byte(""), nil, 1) // engine would route everything to group 1
206+
207+
router := NewShardRouter(e)
208+
// Resolver claims nothing but RECOGNISES the partitioned shape
209+
// for any key starting with "!sqs|msg|data|p|".
210+
router.WithPartitionResolver(&fakePartitionResolver{
211+
routes: map[string]uint64{},
212+
recognisedPrefix: []byte("!sqs|msg|data|p|"),
213+
})
214+
215+
// A partitioned-shaped key the resolver recognises but cannot
216+
// resolve. Without fail-closed this would silently route to
217+
// group 1 via the engine's default.
218+
_, ok := router.ResolveGroup([]byte("!sqs|msg|data|p|unknown-queue"))
219+
require.False(t, ok,
220+
"recognised-but-unresolved partitioned key must fail closed; "+
221+
"engine fall-through would silently mis-route HT-FIFO "+
222+
"traffic to the !sqs|route|global default group during "+
223+
"partition-map drift")
224+
225+
// A non-partitioned key is NOT recognised → must still fall
226+
// through to the engine. This pins that fail-closed only fires
227+
// for the recognised-but-unresolved case.
228+
gid, ok := router.ResolveGroup([]byte("legacy-key"))
229+
require.True(t, ok)
230+
require.Equal(t, uint64(1), gid,
231+
"non-recognised key must fall through to engine default")
232+
}
233+
179234
// TestShardRouter_GetUsesResolver pins that the resolver-first path
180235
// applies to Get as well as Commit/Abort. A regression that fixed
181236
// only Commit's path would silently route reads through the engine
@@ -227,6 +282,14 @@ func (r *recordingResolver) ResolveGroup(key []byte) (uint64, bool) {
227282
return 0, false
228283
}
229284

285+
// RecognisesPartitionedKey returns false unconditionally — these
286+
// router-side tests use the resolver-wins / fall-through paths
287+
// only; the recognised-but-unresolved path is exercised separately
288+
// via fakePartitionResolver.recognisedPrefix.
289+
func (r *recordingResolver) RecognisesPartitionedKey([]byte) bool {
290+
return false
291+
}
292+
230293
func (r *recordingResolver) seenKeys() [][]byte {
231294
r.mu.Lock()
232295
defer r.mu.Unlock()

kv/sharded_coordinator_partition_test.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,16 @@ import (
1313
// stubResolver routes any key in claim to the named group; everything
1414
// else returns (0, false). Used by the coordinator-level resolver
1515
// regression tests so they don't depend on the adapter package.
16+
//
17+
// recognisedPrefix simulates the "recognised but unresolved" path
18+
// (codex round-2 P1 on PR #715) — keys with this prefix that miss
19+
// claim cause RecognisesPartitionedKey to return true so the
20+
// router fails closed instead of falling through to the engine.
1621
type stubResolver struct {
17-
mu sync.Mutex
18-
claim map[string]uint64
19-
calls [][]byte
22+
mu sync.Mutex
23+
claim map[string]uint64
24+
recognisedPrefix []byte
25+
calls [][]byte
2026
}
2127

2228
func (s *stubResolver) ResolveGroup(key []byte) (uint64, bool) {
@@ -29,6 +35,14 @@ func (s *stubResolver) ResolveGroup(key []byte) (uint64, bool) {
2935
return 0, false
3036
}
3137

38+
func (s *stubResolver) RecognisesPartitionedKey(key []byte) bool {
39+
if len(s.recognisedPrefix) == 0 {
40+
return false
41+
}
42+
return len(key) >= len(s.recognisedPrefix) &&
43+
string(key[:len(s.recognisedPrefix)]) == string(s.recognisedPrefix)
44+
}
45+
3246
func (s *stubResolver) callKeys() [][]byte {
3347
s.mu.Lock()
3448
defer s.mu.Unlock()

0 commit comments

Comments
 (0)