
Commit f4a12e4

test(sqs): coordinator-level regression for resolver dispatch
Round 2 Claude review on PR #715 flagged that the Gemini-HIGH fix
(ShardedCoordinator's groupMutations / groupForKey / etc. now route through
c.router.ResolveGroup) had no coordinator-level regression test: the existing
TestShardRouter_* tests pin the dispatch logic at the router layer, but don't
exercise the path Dispatch → groupMutations → c.router.ResolveGroup. Per
CLAUDE.md ("when code review surfaces a defect, first add a failing test that
reproduces the issue, then make it pass with the fix"), this commit lands the
missing test.

Two new tests in kv/sharded_coordinator_partition_test.go:

- TestShardedCoordinator_DispatchHonoursPartitionResolver pins the Gemini-HIGH
  fix: with the engine routing everything to group 1 but the resolver claiming
  a specific key for group 42, Dispatch on that key MUST hit group 42's
  recordingTransactional. Before the round-2 fix the request would have landed
  on group 1, because groupMutations called c.engine.GetRoute directly. It
  also asserts that the resolver received the RAW partitioned key, pinning the
  codex-P1 fix at the coordinator-call boundary.

- TestShardedCoordinator_DispatchFallsThroughForUnclaimedKeys pins the
  inverse: keys NOT claimed by the resolver continue to route via the
  byte-range engine. Without this, the resolver-first short-circuit could mask
  engine routing decisions.

stubResolver is a kv-internal PartitionResolver double, so the tests don't
pull in the adapter package. Each call records the raw key bytes (a defensive
copy) so concurrent reads stay race-safe under -race.
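For orientation before the diff, here is a minimal, self-contained sketch of
the resolver-first routing order these tests pin. Every name and shape in it
(PartitionResolver, rangeEngine, groupForKey, claimMap) is an assumption
paraphrased from the commit message, not the repository's actual code.

// Sketch only: illustrates the resolver-first routing order described in the
// commit message. All types and names here are assumed, not the real code.
package main

import "fmt"

// PartitionResolver mirrors the contract stubResolver implements in the diff
// below: claim a key for a group, or decline with ok == false.
type PartitionResolver interface {
	ResolveGroup(key []byte) (gid uint64, ok bool)
}

// rangeEngine stands in for the byte-range distribution engine's routing.
type rangeEngine func(key []byte) uint64

// groupForKey shows the fixed ordering: consult the resolver first, with the
// RAW key, and fall through to the byte-range engine only for unclaimed keys.
// Pre-fix, only the engine path existed, so resolver claims were ignored.
func groupForKey(r PartitionResolver, e rangeEngine, key []byte) uint64 {
	if r != nil {
		if gid, ok := r.ResolveGroup(key); ok {
			return gid // claimed: bypass byte-range routing
		}
	}
	return e(key) // unclaimed: the engine decides, as before the fix
}

// claimMap is a toy resolver, equivalent in spirit to the tests' stubResolver.
type claimMap map[string]uint64

func (c claimMap) ResolveGroup(key []byte) (uint64, bool) {
	gid, ok := c[string(key)]
	return gid, ok
}

func main() {
	allToGroup1 := rangeEngine(func([]byte) uint64 { return 1 })
	res := claimMap{"!sqs|msg|data|p|partitioned-key": 42}

	fmt.Println(groupForKey(res, allToGroup1, []byte("!sqs|msg|data|p|partitioned-key"))) // 42
	fmt.Println(groupForKey(res, allToGroup1, []byte("x")))                               // 1
}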
1 parent 7f3a643 commit f4a12e4

1 file changed: 152 additions & 0 deletions

kv/sharded_coordinator_partition_test.go
@@ -0,0 +1,152 @@
package kv

import (
	"context"
	"sync"
	"testing"

	"github.com/bootjp/elastickv/distribution"
	"github.com/bootjp/elastickv/store"
	"github.com/stretchr/testify/require"
)

// stubResolver routes any key in claim to the named group; everything
// else returns (0, false). Used by the coordinator-level resolver
// regression tests so they don't depend on the adapter package.
type stubResolver struct {
	mu    sync.Mutex
	claim map[string]uint64
	calls [][]byte
}

func (s *stubResolver) ResolveGroup(key []byte) (uint64, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.calls = append(s.calls, append([]byte(nil), key...))
	if gid, ok := s.claim[string(key)]; ok {
		return gid, true
	}
	return 0, false
}

func (s *stubResolver) callKeys() [][]byte {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make([][]byte, len(s.calls))
	copy(out, s.calls)
	return out
}

// TestShardedCoordinator_DispatchHonoursPartitionResolver pins the
// Gemini-HIGH fix: ShardedCoordinator's groupMutations path now
// calls c.router.ResolveGroup, so a Dispatch whose key is claimed
// by the partition resolver MUST land on the resolver's group, not
// the engine's default group. Before the fix the coordinator
// bypassed the resolver entirely and partitioned-FIFO traffic
// silently mis-routed through 2PC.
//
// Two-group setup: engine routes everything to group 1; resolver
// claims one specific key for group 42. Dispatch on that key must
// hit group 42's recordingTransactional, leaving group 1's
// recorder empty.
func TestShardedCoordinator_DispatchHonoursPartitionResolver(t *testing.T) {
	t.Parallel()
	engine := distribution.NewEngine()
	engine.UpdateRoute([]byte(""), nil, 1)

	g1 := &recordingTransactional{
		responses: []*TransactionResponse{{CommitIndex: 1}, {CommitIndex: 1}},
	}
	g42 := &recordingTransactional{
		responses: []*TransactionResponse{{CommitIndex: 1}, {CommitIndex: 1}},
	}

	coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{
		1:  {Txn: g1, Store: store.NewMVCCStore()},
		42: {Txn: g42, Store: store.NewMVCCStore()},
	}, 1, NewHLC(), nil)

	resolver := &stubResolver{claim: map[string]uint64{
		"!sqs|msg|data|p|partitioned-key": 42,
	}}
	coord.WithPartitionResolver(resolver)

	resp, err := coord.Dispatch(context.Background(), &OperationGroup[OP]{
		Elems: []*Elem[OP]{
			{Op: Put, Key: []byte("!sqs|msg|data|p|partitioned-key"), Value: []byte("v")},
		},
	})
	require.NoError(t, err)
	require.NotNil(t, resp)

	// The whole point: the request landed on group 42, NOT 1.
	g1.mu.Lock()
	g42.mu.Lock()
	g1Count := len(g1.requests)
	g42Count := len(g42.requests)
	g1.mu.Unlock()
	g42.mu.Unlock()

	require.Zero(t, g1Count,
		"engine's default group must NOT receive a request when the "+
			"resolver claimed the key — coordinator's groupMutations "+
			"would otherwise bypass the partition resolver")
	require.Equal(t, 1, g42Count,
		"resolver-claimed group must receive exactly one request")

	// And the resolver was indeed consulted with the raw partitioned
	// key — pins the codex-P1 fix at the coordinator-call boundary.
	calls := resolver.callKeys()
	require.NotEmpty(t, calls)
	require.Equal(t, []byte("!sqs|msg|data|p|partitioned-key"), calls[0])
}

// TestShardedCoordinator_DispatchFallsThroughForUnclaimedKeys pins
// the inverse: a key the resolver does NOT claim must continue to
// route via the byte-range engine. Without this guard the resolver-
// first short-circuit could mask engine routing decisions.
func TestShardedCoordinator_DispatchFallsThroughForUnclaimedKeys(t *testing.T) {
	t.Parallel()
	engine := distribution.NewEngine()
	engine.UpdateRoute([]byte("a"), []byte("m"), 1)
	engine.UpdateRoute([]byte("m"), nil, 2)

	g1 := &recordingTransactional{
		responses: []*TransactionResponse{{CommitIndex: 1}},
	}
	g2 := &recordingTransactional{
		responses: []*TransactionResponse{{CommitIndex: 2}},
	}

	coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{
		1: {Txn: g1, Store: store.NewMVCCStore()},
		2: {Txn: g2, Store: store.NewMVCCStore()},
	}, 1, NewHLC(), nil)

	// Resolver only claims a key that ISN'T in the request — the
	// dispatch must fall through to the engine.
	coord.WithPartitionResolver(&stubResolver{claim: map[string]uint64{
		"!sqs|msg|data|p|other-key": 42,
	}})

	// "x" lands in [m, ∞) → engine routes to group 2.
	resp, err := coord.Dispatch(context.Background(), &OperationGroup[OP]{
		Elems: []*Elem[OP]{
			{Op: Put, Key: []byte("x"), Value: []byte("v")},
		},
	})
	require.NoError(t, err)
	require.NotNil(t, resp)

	g1.mu.Lock()
	g2.mu.Lock()
	g1Count := len(g1.requests)
	g2Count := len(g2.requests)
	g1.mu.Unlock()
	g2.mu.Unlock()

	require.Zero(t, g1Count,
		"unclaimed key must engine-route to group 2, not group 1")
	require.Equal(t, 1, g2Count,
		"engine fallthrough must dispatch to group 2 for keys in [m, ∞)")
}
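Since the commit message calls out race safety of stubResolver's defensive key
copy under -race, a natural way to exercise both new tests is the race
detector with a -run pattern that matches both names (assuming the standard Go
toolchain, run from the repository root; the package path follows from the
file's location under kv/):

go test -race -run 'TestShardedCoordinator_Dispatch' ./kv/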
