Skip to content

Commit d4d7ad8

Browse files
authored
feat(sqs): --sqsFifoPartitionMap flag + parser + validator (Phase 3.D PR 4-A) (#704)
## Summary Phase 3.D PR 4-A of the split-queue FIFO rollout. Operator-config foundation: parses `--sqsFifoPartitionMap`, validates against `--raftGroups`, plumbs through `parseRuntimeConfig` into `runtimeConfig.sqsFifoPartitionMap`. Stacks on top of #703 (PR 3, partitioned-keyspace constructors). ## What's added - `--sqsFifoPartitionMap` flag with grammar `queue.fifo:N=group_0,...,group_{N-1};...` - `parseSQSFifoPartitionMap` + `parseSQSFifoPartitionMapEntry` (grammar + per-entry consistency) - `validateSQSFifoPartitionMap` (cross-check against `--raftGroups`) - `sqsFifoQueueRouting` struct (PartitionCount + Groups) - `runtimeConfig.sqsFifoPartitionMap` field ## What's NOT added (deferred to PR 4-B) - Routing layer wiring through `ShardedCoordinator` - `htfifo` capability advertisement on `/sqs_health` - `kv/lease_state.go` leadership-refusal hook for non-htfifo binaries - Catalog polling for partitioned-queue discovery at startup These are coupled by the design's "binary advertises htfifo only when both routing AND leadership-refusal are in place" rule, so they belong in one PR. ## Validation coverage - PartitionCount > 0 - PartitionCount ≤ 32 (per-queue cap) - PartitionCount must be a power of two - `len(Groups) == PartitionCount` - No duplicate queues across entries - Every named group exists in `--raftGroups` - Empty queue / empty group / trailing-comma / missing-`=` all rejected with the offending entry quoted in the error ## Test plan - [x] `TestParseSQSFifoPartitionMap` covers grammar, whitespace, multiple queues, all rejection paths. - [x] `TestValidateSQSFifoPartitionMap` covers the missing-group case with queue + partition-index pointer. - [x] `parseRuntimeConfig` integration: `main_bootstrap_e2e_test.go` updated for the new parameter. - [x] `go test -race ./adapter/... .` pass. - [x] `golangci-lint ./...` clean. ## Self-review (per CLAUDE.md) 1. **Data loss** — config-only change. No FSM, Pebble, or retention path is touched. No issue. 2. 
**Concurrency / distributed failures** — single-shot parse at startup, no goroutines or shared state. No issue. 3. **Performance** — startup-time validation only. No hot-path effect. No issue. 4. **Data consistency** — partition count and group list must agree at parse time, so the runtime cannot see a half-shaped routing map. The flag has no effect on production traffic until PR 4-B + PR 5 land. No issue. 5. **Test coverage** — 11 sub-tests in `TestParseSQSFifoPartitionMap` and 3 in `TestValidateSQSFifoPartitionMap`, covering every documented rejection branch.
2 parents c153659 + 5d7cb96 commit d4d7ad8

4 files changed

Lines changed: 419 additions & 19 deletions

File tree

main.go

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,19 @@ var (
104104
raftS3Map = flag.String("raftS3Map", "", "Map of Raft address to S3 address (raftAddr=s3Addr,...)")
105105
raftDynamoMap = flag.String("raftDynamoMap", "", "Map of Raft address to DynamoDB address (raftAddr=dynamoAddr,...)")
106106
raftSqsMap = flag.String("raftSqsMap", "", "Map of Raft address to SQS address (raftAddr=sqsAddr,...)")
107+
// HT-FIFO partition assignment (Phase 3.D §5). Distinct from
108+
// --raftSqsMap (which maps raftAddr=sqsAddr for the
109+
// proxyToLeader endpoint resolution). The grammar is
110+
// `queue.fifo:N=group_0,...,group_{N-1}` with multiple queues
111+
// separated by `;`. Empty by default — leaving the flag empty
112+
// means no FIFO queue is partitioned and the legacy
113+
// single-partition layout applies to every queue. PR 5 of the
114+
// rollout plan consumes this map to dispatch SendMessage and
115+
// fan out ReceiveMessage; the §11 PR 2 dormancy gate currently
116+
// rejects PartitionCount > 1 on CreateQueue regardless of this
117+
// flag, so populating it has no effect on production traffic
118+
// until PR 5 lands.
119+
sqsFifoPartitionMap = flag.String("sqsFifoPartitionMap", "", "HT-FIFO partition map (queue.fifo:N=group_0,...,group_{N-1};...)")
107120
// Admin gRPC service flags (this PR — wired into the per-group raft
108121
// listeners; consumed by cmd/elastickv-admin via the bearer-token
109122
// gateway). These are independent of the admin HTTP listener flags
@@ -369,7 +382,7 @@ func resolveRuntimeInputs() (runtimeConfig, raftEngineType, []raftengine.Server,
369382
return runtimeConfig{}, "", nil, false, err
370383
}
371384

372-
cfg, err := parseRuntimeConfig(*myAddr, *redisAddr, *s3Addr, *dynamoAddr, *sqsAddr, *raftGroups, *shardRanges, *raftRedisMap, *raftS3Map, *raftDynamoMap, *raftSqsMap)
385+
cfg, err := parseRuntimeConfig(*myAddr, *redisAddr, *s3Addr, *dynamoAddr, *sqsAddr, *raftGroups, *shardRanges, *raftRedisMap, *raftS3Map, *raftDynamoMap, *raftSqsMap, *sqsFifoPartitionMap)
373386
if err != nil {
374387
return runtimeConfig{}, "", nil, false, err
375388
}
@@ -383,17 +396,18 @@ func resolveRuntimeInputs() (runtimeConfig, raftEngineType, []raftengine.Server,
383396
}
384397

385398
type runtimeConfig struct {
386-
groups []groupSpec
387-
defaultGroup uint64
388-
engine *distribution.Engine
389-
leaderRedis map[string]string
390-
leaderS3 map[string]string
391-
leaderDynamo map[string]string
392-
leaderSQS map[string]string
393-
multi bool
399+
groups []groupSpec
400+
defaultGroup uint64
401+
engine *distribution.Engine
402+
leaderRedis map[string]string
403+
leaderS3 map[string]string
404+
leaderDynamo map[string]string
405+
leaderSQS map[string]string
406+
sqsFifoPartitionMap map[string]sqsFifoQueueRouting
407+
multi bool
394408
}
395409

396-
func parseRuntimeConfig(myAddr, redisAddr, s3Addr, dynamoAddr, sqsAddr, raftGroups, shardRanges, raftRedisMap, raftS3Map, raftDynamoMap, raftSqsMap string) (runtimeConfig, error) {
410+
func parseRuntimeConfig(myAddr, redisAddr, s3Addr, dynamoAddr, sqsAddr, raftGroups, shardRanges, raftRedisMap, raftS3Map, raftDynamoMap, raftSqsMap, sqsFifoPartitionMapRaw string) (runtimeConfig, error) {
397411
groups, err := parseRaftGroups(raftGroups, myAddr)
398412
if err != nil {
399413
return runtimeConfig{}, errors.Wrapf(err, "failed to parse raft groups")
@@ -425,15 +439,21 @@ func parseRuntimeConfig(myAddr, redisAddr, s3Addr, dynamoAddr, sqsAddr, raftGrou
425439
return runtimeConfig{}, errors.Wrapf(err, "failed to parse raft sqs map")
426440
}
427441

442+
sqsFifoPartitionMap, err := buildSQSFifoPartitionMap(groups, sqsFifoPartitionMapRaw)
443+
if err != nil {
444+
return runtimeConfig{}, err
445+
}
446+
428447
return runtimeConfig{
429-
groups: groups,
430-
defaultGroup: defaultGroup,
431-
engine: engine,
432-
leaderRedis: leaderRedis,
433-
leaderS3: leaderS3,
434-
leaderDynamo: leaderDynamo,
435-
leaderSQS: leaderSQS,
436-
multi: len(groups) > 1,
448+
groups: groups,
449+
defaultGroup: defaultGroup,
450+
engine: engine,
451+
leaderRedis: leaderRedis,
452+
leaderS3: leaderS3,
453+
leaderDynamo: leaderDynamo,
454+
leaderSQS: leaderSQS,
455+
sqsFifoPartitionMap: sqsFifoPartitionMap,
456+
multi: len(groups) > 1,
437457
}, nil
438458
}
439459

@@ -457,6 +477,28 @@ func buildLeaderSQS(groups []groupSpec, sqsAddr string, raftSqsMap string) (map[
457477
return buildLeaderAddrMap(groups, sqsAddr, raftSqsMap, parseRaftSQSMap)
458478
}
459479

480+
// buildSQSFifoPartitionMap parses and validates the
481+
// --sqsFifoPartitionMap flag against the configured Raft groups.
482+
// Extracted from parseRuntimeConfig so that function stays under the
483+
// cyclop ceiling once the SQS HT-FIFO config plumbing landed.
484+
func buildSQSFifoPartitionMap(groups []groupSpec, raw string) (map[string]sqsFifoQueueRouting, error) {
485+
parsed, err := parseSQSFifoPartitionMap(raw)
486+
if err != nil {
487+
return nil, errors.Wrapf(err, "failed to parse sqs fifo partition map")
488+
}
489+
if len(parsed) == 0 {
490+
return parsed, nil
491+
}
492+
groupIDs := make(map[string]struct{}, len(groups))
493+
for _, g := range groups {
494+
groupIDs[strconv.FormatUint(g.id, 10)] = struct{}{}
495+
}
496+
if err := validateSQSFifoPartitionMap(parsed, groupIDs); err != nil {
497+
return nil, errors.Wrapf(err, "invalid sqs fifo partition map")
498+
}
499+
return parsed, nil
500+
}
501+
460502
// buildLeaderDynamo builds the DynamoDB address map by handing the
// configured groups, the local DynamoDB address and the raw
// --raftDynamoMap value to buildLeaderAddrMap together with the
// DynamoDB-specific map parser (parseRaftDynamoMap).
func buildLeaderDynamo(groups []groupSpec, dynamoAddr string, raftDynamoMap string) (map[string]string, error) {
	leaders, err := buildLeaderAddrMap(groups, dynamoAddr, raftDynamoMap, parseRaftDynamoMap)
	return leaders, err
}

main_bootstrap_e2e_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ func startBootstrapE2ENode(
340340
bootstrapMembers string,
341341
engineType raftEngineType,
342342
) (*bootstrapE2ENode, error) {
343-
cfg, err := parseRuntimeConfig(ep.raftAddr, ep.redisAddr, "", "", "", "", "", "", "", "", "")
343+
cfg, err := parseRuntimeConfig(ep.raftAddr, ep.redisAddr, "", "", "", "", "", "", "", "", "", "")
344344
if err != nil {
345345
return nil, err
346346
}

shard_config.go

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package main
22

33
import (
44
"bytes"
5+
"maps"
6+
"slices"
57
"strconv"
68
"strings"
79

@@ -33,9 +35,29 @@ var (
3335
ErrInvalidRaftS3MapEntry = errors.New("invalid raftS3Map entry")
3436
ErrInvalidRaftDynamoMapEntry = errors.New("invalid raftDynamoMap entry")
3537
ErrInvalidRaftSQSMapEntry = errors.New("invalid raftSqsMap entry")
38+
ErrInvalidSQSFifoPartitionMapEntry = errors.New("invalid sqsFifoPartitionMap entry")
3639
ErrInvalidRaftBootstrapMembersEntry = errors.New("invalid raftBootstrapMembers entry")
3740
)
3841

42+
// sqsFifoPartitionMaxPartitions caps the per-queue partition count so
43+
// the partitionFor mask + bucket-store sizing arguments in
44+
// docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md §3.1 stay
45+
// honest: 32 partitions × ~1k RPS per shard ≈ 30k aggregate RPS per
46+
// queue, which matches the design's stated ceiling. Operators who
47+
// need more should split the workload across queues rather than
48+
// raising this value.
49+
const sqsFifoPartitionMaxPartitions = 32
50+
51+
// sqsFifoQueueRouting captures the operator-supplied partition-to-
52+
// group assignment for a single FIFO queue. groups is listed in
53+
// partition-index order — groups[k] owns partition k. The parser
54+
// (parseSQSFifoPartitionCount and parseSQSFifoGroupList) enforces that
55+
// partitionCount equals len(groups), is a power of two, and is within
// the per-queue cap (sqsFifoPartitionMaxPartitions). The validator
// (validateSQSFifoPartitionMap) only checks that every listed group is
// present in --raftGroups.
56+
type sqsFifoQueueRouting struct {
57+
partitionCount uint32 // N from `queue.fifo:N=...`
58+
groups []string // canonical decimal group IDs; groups[k] owns partition k
59+
}
60+
3961
func parseRaftGroups(raw, defaultAddr string) ([]groupSpec, error) {
4062
if raw == "" {
4163
if defaultAddr == "" {
@@ -133,6 +155,175 @@ func parseRaftSQSMap(raw string) (map[string]string, error) {
133155
return parseRaftAddressMap(raw, ErrInvalidRaftSQSMapEntry)
134156
}
135157

158+
// parseSQSFifoPartitionMap reads the `--sqsFifoPartitionMap` operator
159+
// flag. The grammar is:
160+
//
161+
// queue1.fifo:N=group_0,group_1,...,group_{N-1}
162+
// ;queue2.fifo:M=group_0,...,group_{M-1}
163+
//
164+
// Multiple queue entries are separated by ';' (commas are reserved for
165+
// the per-queue group list). Each queue's PartitionCount must equal
166+
// len(Groups) — a mismatch is rejected at parse time so a config error
167+
// cannot silently produce a wrong-shaped routing map at runtime.
168+
//
169+
// This function does not validate that referenced Raft groups exist
170+
// or that the queue exists in the catalog; that's
171+
// validateSQSFifoPartitionMap's job and runs against the parsed
172+
// runtime config after both --raftGroups and --sqsFifoPartitionMap
173+
// have been parsed.
174+
func parseSQSFifoPartitionMap(raw string) (map[string]sqsFifoQueueRouting, error) {
175+
out := make(map[string]sqsFifoQueueRouting)
176+
if raw == "" {
177+
return out, nil
178+
}
179+
entries := strings.SplitSeq(raw, ";")
180+
for entry := range entries {
181+
entry = strings.TrimSpace(entry)
182+
if entry == "" {
183+
continue
184+
}
185+
queue, routing, err := parseSQSFifoPartitionMapEntry(entry)
186+
if err != nil {
187+
return nil, err
188+
}
189+
if _, dup := out[queue]; dup {
190+
return nil, errors.Wrapf(ErrInvalidSQSFifoPartitionMapEntry,
191+
"duplicate queue %q", queue)
192+
}
193+
out[queue] = routing
194+
}
195+
return out, nil
196+
}
197+
198+
func parseSQSFifoPartitionMapEntry(entry string) (string, sqsFifoQueueRouting, error) {
199+
// Shape: queue.fifo:N=g0,g1,...,g{N-1}
200+
colonIdx := strings.Index(entry, ":")
201+
eqIdx := strings.Index(entry, "=")
202+
if colonIdx <= 0 || eqIdx <= colonIdx+1 {
203+
return "", sqsFifoQueueRouting{}, errors.Wrapf(ErrInvalidSQSFifoPartitionMapEntry,
204+
"%q: expected queue.fifo:N=group_0,...,group_{N-1}", entry)
205+
}
206+
queue := strings.TrimSpace(entry[:colonIdx])
207+
if queue == "" {
208+
return "", sqsFifoQueueRouting{}, errors.Wrapf(ErrInvalidSQSFifoPartitionMapEntry,
209+
"%q: empty queue name", entry)
210+
}
211+
count, err := parseSQSFifoPartitionCount(entry, entry[colonIdx+1:eqIdx])
212+
if err != nil {
213+
return "", sqsFifoQueueRouting{}, err
214+
}
215+
groups, err := parseSQSFifoGroupList(entry, entry[eqIdx+1:], count)
216+
if err != nil {
217+
return "", sqsFifoQueueRouting{}, err
218+
}
219+
return queue, sqsFifoQueueRouting{partitionCount: count, groups: groups}, nil
220+
}
221+
222+
// parseSQSFifoPartitionCount validates the N in `queue.fifo:N=...`.
223+
// Extracted from parseSQSFifoPartitionMapEntry to keep that function
224+
// under the cyclop ceiling once the validation surface grew to four
225+
// distinct rejections (parse error, zero, overflow cap, non-power-of-2).
226+
func parseSQSFifoPartitionCount(entry, countStr string) (uint32, error) {
227+
countStr = strings.TrimSpace(countStr)
228+
count64, err := strconv.ParseUint(countStr, 10, 32)
229+
if err != nil {
230+
return 0, errors.Wrapf(ErrInvalidSQSFifoPartitionMapEntry,
231+
"%q: PartitionCount %q must be a positive decimal integer", entry, countStr)
232+
}
233+
if count64 == 0 {
234+
return 0, errors.Wrapf(ErrInvalidSQSFifoPartitionMapEntry,
235+
"%q: PartitionCount must be > 0", entry)
236+
}
237+
if count64 > sqsFifoPartitionMaxPartitions {
238+
return 0, errors.Wrapf(ErrInvalidSQSFifoPartitionMapEntry,
239+
"%q: PartitionCount %d exceeds the per-queue cap of %d",
240+
entry, count64, sqsFifoPartitionMaxPartitions)
241+
}
242+
if count64&(count64-1) != 0 {
243+
return 0, errors.Wrapf(ErrInvalidSQSFifoPartitionMapEntry,
244+
"%q: PartitionCount %d must be a power of two", entry, count64)
245+
}
246+
// count64 is bounded by sqsFifoPartitionMaxPartitions (32) — well
247+
// inside uint32 range — so the narrowing is safe by construction.
248+
return uint32(count64), nil
249+
}
250+
251+
// parseSQSFifoGroupList validates the comma-separated group list and
252+
// asserts its length matches the parsed PartitionCount. Extracted for
253+
// the same cyclop reason as parseSQSFifoPartitionCount.
254+
//
255+
// Group tokens are canonicalized as uint64 (parsed via strconv.ParseUint
256+
// then re-formatted via strconv.FormatUint) so they line up with the
257+
// canonical IDs parseRaftGroups produces. This drops leading-zero
258+
// formatting (e.g. "01" → "1") and rejects non-numeric group references
259+
// at parse time — without this round-trip, a config like
260+
// --raftGroups "01=a" with --sqsFifoPartitionMap "q.fifo:1=01" would
261+
// fail validation because raftGroups becomes {"1": ...} while the
262+
// partition map keeps "01". Catching the bad shape at parse time keeps
263+
// the validator's error free to point at the missing group.
264+
func parseSQSFifoGroupList(entry, groupRaw string, count uint32) ([]string, error) {
265+
groupRaw = strings.TrimSpace(groupRaw)
266+
if groupRaw == "" {
267+
return nil, errors.Wrapf(ErrInvalidSQSFifoPartitionMapEntry,
268+
"%q: empty group list", entry)
269+
}
270+
groups := make([]string, 0, count)
271+
for g := range strings.SplitSeq(groupRaw, ",") {
272+
g = strings.TrimSpace(g)
273+
if g == "" {
274+
return nil, errors.Wrapf(ErrInvalidSQSFifoPartitionMapEntry,
275+
"%q: empty group name in list", entry)
276+
}
277+
id, err := strconv.ParseUint(g, 10, 64)
278+
if err != nil {
279+
return nil, errors.Wrapf(ErrInvalidSQSFifoPartitionMapEntry,
280+
"%q: group %q is not a uint64 ID (raftGroups uses numeric IDs)",
281+
entry, g)
282+
}
283+
groups = append(groups, strconv.FormatUint(id, 10))
284+
}
285+
// Compare lengths in int rather than narrowing len(groups) to
286+
// uint32 — the narrowing would trip gosec G115 even though the
287+
// per-queue cap (32) keeps the value well in range. count is at
288+
// most sqsFifoPartitionMaxPartitions (32) so the int(count)
289+
// widening is safe and exact.
290+
if len(groups) != int(count) {
291+
return nil, errors.Wrapf(ErrInvalidSQSFifoPartitionMapEntry,
292+
"%q: PartitionCount=%d but %d groups listed; both must agree",
293+
entry, count, len(groups))
294+
}
295+
return groups, nil
296+
}
297+
298+
// validateSQSFifoPartitionMap checks the parsed map against the
299+
// configured Raft groups. Every group named in any queue's routing
300+
// must appear in the --raftGroups list with a matching ID — otherwise
301+
// the operator has typed a group ID that does not exist and the
302+
// runtime would route partition traffic to a non-existent shard.
303+
//
304+
// raftGroupIDs is the canonical-uint64-string set of IDs produced by
305+
// parseRaftGroups; partition-map group references are normalized to
306+
// the same canonical form by parseSQSFifoGroupList, so the lookup is
307+
// a plain set-membership check.
308+
func validateSQSFifoPartitionMap(m map[string]sqsFifoQueueRouting, raftGroupIDs map[string]struct{}) error {
309+
// Sort the queue names so a config with multiple misconfigured
310+
// queues always reports the lexicographically-first failure —
311+
// otherwise Go's randomised map iteration would surface a
312+
// different queue on each run, which makes operator triage
313+
// (and golden-test asserts) flaky for no benefit.
314+
for _, queue := range slices.Sorted(maps.Keys(m)) {
315+
routing := m[queue]
316+
for partition, group := range routing.groups {
317+
if _, ok := raftGroupIDs[group]; !ok {
318+
return errors.Wrapf(ErrInvalidSQSFifoPartitionMapEntry,
319+
"queue %q partition %d: group %q is not in --raftGroups",
320+
queue, partition, group)
321+
}
322+
}
323+
}
324+
return nil
325+
}
326+
136327
func parseRaftAddressMap(raw string, invalidEntry error) (map[string]string, error) {
137328
out := make(map[string]string)
138329
if raw == "" {

0 commit comments

Comments
 (0)