Commit d46d4da

feat(sqs): HT-FIFO schema + validators + dormancy gate (Phase 3.D PR 2)
Implements PR 2 of the §11 multi-PR rollout from docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md. The schema fields land on sqsQueueMeta with the design's wire types; the validator enforces the §3.2 cross-attribute rules; a temporary CreateQueue gate rejects PartitionCount > 1 until PR 5 lifts the gate atomically with the data-plane fanout.

Schema (sqs_catalog.go):
- sqsQueueMeta gains three optional fields:
  * PartitionCount uint32: number of FIFO partitions; 0/1 means legacy single-partition; > 1 enables HT-FIFO.
  * FifoThroughputLimit string: "perMessageGroupId" (default for HT-FIFO) or "perQueue" (collapses every group to partition 0).
  * DeduplicationScope string: "messageGroup" (default for HT-FIFO) or "queue" (legacy single-window).
- attributesEqual extended via baseAttributesEqual + htfifoAttributesEqual split (kept under cyclop ceiling).
- queueMetaToAttributes surfaces the configured HT-FIFO fields via addHTFIFOAttributes so GetQueueAttributes("All") round-trips.

Routing (sqs_partitioning.go, new):
- partitionFor(meta, messageGroupId) uint32 implements §3.3: FNV-1a over MessageGroupId & (PartitionCount - 1). Edge cases documented in the godoc: PartitionCount 0/1 → 0, FifoThroughputLimit perQueue short-circuits to 0, empty MessageGroupId → 0.
- The bitwise-mask optimisation requires PartitionCount be a power of two; the validator enforces it. A future bug that leaks a non-power-of-two value is caught by TestPartitionFor_PowerOfTwoMaskingMatchesMod.

Validation (sqs_partitioning.go):
- validatePartitionConfig enforces the §3.2 cross-attribute rules:
  * PartitionCount must be a power of two in [1, htfifoMaxPartitions=32].
  * FifoThroughputLimit / DeduplicationScope are FIFO-only.
  * {PartitionCount > 1, DeduplicationScope = "queue"} rejects with InvalidParameterValue (incoherent params, vs InvalidAttributeValue for malformed individual values) per §3.2 cross-attribute gate.
- Runs in parseAttributesIntoMeta (after resolveFifoQueueFlag, so the IsFIFO check sees the post-resolution flag) and in trySetQueueAttributesOnce (after applyAttributes; IsFIFO comes from the loaded meta).
- validatePartitionImmutability is the §3.2 rule that PartitionCount, FifoThroughputLimit, and DeduplicationScope are immutable from CreateQueue. SetQueueAttributes is all-or-nothing: a request that touches a mutable attribute alongside an attempted immutable change rejects the whole request before persisting either.
- snapshotImmutableHTFIFO captures the pre-apply values so the immutability check has a clean before/after pair.

Dormancy gate (sqs_partitioning.go + sqs_catalog.go):
- validatePartitionDormancyGate is the §11 PR 2 temporary gate that rejects CreateQueue with PartitionCount > 1 with InvalidAttributeValue("PartitionCount > 1 requires HT-FIFO data plane — not yet enabled"). The schema field exists in the meta type but no partitioned data can land. Removed in PR 5 in the same commit that wires the data-plane fanout; gate-and-lift is atomic so a half-deployed cluster can never accept a partitioned queue without the data plane to serve it.

Tests:
- adapter/sqs_partitioning_test.go: 14 unit tests covering partitionFor (legacy zero/one, perQueue short-circuit, empty MessageGroupId, determinism, distribution within ±5% across 8 partitions on 100k samples, power-of-two masking < N), isPowerOfTwo, validatePartitionConfig (power-of-two, max cap, FIFO-only, cross-attr InvalidParameterValue, single-partition + queue-dedup OK), validatePartitionDormancyGate (rejects > 1, allows 0/1), validatePartitionImmutability (per-attribute change rejects, same-value no-op succeeds), htfifoAttributesPresent.
- adapter/sqs_partitioning_integration_test.go: 7 end-to-end tests against a real createNode cluster covering the wire surface: dormancy gate rejects PartitionCount > 1 (and the gate's reason surfaces to the operator), allows PartitionCount=1, validator rejects non-power-of-two, FIFO-only rejection on Standard queue, cross-attr {partitions, queue dedup} rejects, immutability rejects SetQueueAttributes change with same-value no-op succeeds, all-or-nothing (combined mutable+immutable rejects entirely without persisting the mutable change), GetQueueAttributes round-trip.
- All tests pass under -race; golangci-lint clean.

Out of scope for this PR (§11 PR 3-8):
- PR 3: keyspace threading: partitionIndex through every sqsMsg*Key constructor, defaulting to 0 so existing queues stay byte-identical. Gate from PR 2 still in place.
- PR 4: routing layer + --sqsFifoPartitionMap flag + mixed-version gate (§8.5 capability advertisement + §8 leadership-refusal hook in kv/lease_state.go). Gate still in place.
- PR 5: send/receive partition fanout, receipt-handle v2, removes the dormancy gate atomically with the data-plane fanout.
- PR 6: PurgeQueue/DeleteQueue partition iteration + tombstone + reaper.
- PR 7: Jepsen HT-FIFO workload + metrics.
- PR 8: doc lifecycle bump.
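
The routing rule described under "Routing" can be sketched as follows. This is a minimal illustration rather than the code in sqs_partitioning.go: the `queueMeta` type is a hypothetical stand-in for sqsQueueMeta, and the 32-bit FNV-1a variant is an assumption, since the commit message names FNV-1a without giving a width.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// queueMeta is a hypothetical stand-in for the routing-relevant
// fields of sqsQueueMeta.
type queueMeta struct {
	PartitionCount      uint32
	FifoThroughputLimit string
}

// isPowerOfTwo reports whether n has exactly one bit set.
func isPowerOfTwo(n uint32) bool {
	return n != 0 && n&(n-1) == 0
}

// partitionFor maps a MessageGroupId to a partition index.
// The mask h & (N-1) equals h % N only when N is a power of two,
// which is why the validator must enforce that property.
func partitionFor(meta queueMeta, messageGroupID string) uint32 {
	// Edge cases from the commit message: legacy layout, perQueue
	// short-circuit, and empty group ID all route to partition 0.
	if meta.PartitionCount <= 1 ||
		meta.FifoThroughputLimit == "perQueue" ||
		messageGroupID == "" {
		return 0
	}
	h := fnv.New32a() // assumption: 32-bit FNV-1a
	_, _ = h.Write([]byte(messageGroupID))
	return h.Sum32() & (meta.PartitionCount - 1)
}

func main() {
	meta := queueMeta{PartitionCount: 8, FifoThroughputLimit: "perMessageGroupId"}
	for _, g := range []string{"orders", "payments", ""} {
		fmt.Printf("%q -> partition %d\n", g, partitionFor(meta, g))
	}
}
```

Because the hash is deterministic, every message in a group lands on the same partition, which is what preserves within-group FIFO ordering.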
1 parent af2d12b commit d46d4da

4 files changed

Lines changed: 931 additions & 1 deletion

adapter/sqs_catalog.go

Lines changed: 127 additions & 1 deletion
@@ -60,6 +60,12 @@ const (
 	sqsErrQueueDoesNotExist = "AWS.SimpleQueueService.NonExistentQueue"
 	sqsErrInvalidAttributeName = "InvalidAttributeName"
 	sqsErrInvalidAttributeValue = "InvalidAttributeValue"
+	// sqsErrInvalidParameterValue is the AWS code for incoherent
+	// parameter combinations (vs malformed individual values, which
+	// use sqsErrInvalidAttributeValue). HT-FIFO uses this for the
+	// {PartitionCount > 1, DeduplicationScope = "queue"} cross-
+	// attribute rejection.
+	sqsErrInvalidParameterValue = "InvalidParameterValue"
 )
 
 var sqsQueueNamePattern = regexp.MustCompile(`^[a-zA-Z0-9_-]{1,80}(\.fifo)?$`)
@@ -99,6 +105,36 @@ type sqsQueueMeta struct {
 	// Persisted on the meta so a leader failover loads the configuration
 	// along with the rest of the queue.
 	Throttle *sqsQueueThrottle `json:"throttle,omitempty"`
+	// PartitionCount is the number of FIFO partitions for this queue
+	// (Phase 3.D HT-FIFO, see docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md).
+	// Zero or 1 means the legacy single-partition layout — no schema
+	// change. Greater than 1 enables HT-FIFO. Set at CreateQueue time
+	// and immutable thereafter (SetQueueAttributes rejects any change).
+	// Power-of-two values only (validator rejects others). PR 2 of the
+	// rollout introduces this field but a temporary CreateQueue gate
+	// rejects PartitionCount > 1 until PR 5 lifts the gate atomically
+	// with the data-plane fanout — so the schema exists but no
+	// partitioned data can land before the data plane is wired.
+	PartitionCount uint32 `json:"partition_count,omitempty"`
+	// FifoThroughputLimit mirrors the AWS attribute. "perMessageGroupId"
+	// (default for HT-FIFO) keeps the §3.3 hash-by-MessageGroupId
+	// routing; "perQueue" activates the partition-0 short-circuit so
+	// every group ID routes to one partition (effectively N=1).
+	// Set at CreateQueue time and immutable thereafter — flipping it
+	// live would re-route in-flight messages and silently violate
+	// within-group FIFO ordering (see §3.2 of the design).
+	FifoThroughputLimit string `json:"fifo_throughput_limit,omitempty"`
+	// DeduplicationScope mirrors the AWS attribute. "messageGroup"
+	// (default for HT-FIFO) means the dedup window is per
+	// (queue, partition, MessageGroupId, dedupId); "queue" is the
+	// legacy single-window behaviour. Set at CreateQueue time and
+	// immutable thereafter — changing live can resurrect or suppress
+	// messages depending on the direction of the change. The
+	// validator additionally rejects {PartitionCount > 1,
+	// DeduplicationScope = "queue"} at CreateQueue time because the
+	// dedup key cannot be globally unique across partitions without
+	// a cross-partition OCC transaction.
+	DeduplicationScope string `json:"deduplication_scope,omitempty"`
 }
 
 // sqsQueueThrottle is the per-queue token-bucket configuration. Three
@@ -334,6 +370,14 @@ func parseAttributesIntoMeta(name string, attrs map[string]string) (*sqsQueueMet
 	if meta.ContentBasedDedup && !meta.IsFIFO {
 		return nil, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "ContentBasedDeduplication is only valid on FIFO queues")
 	}
+	// HT-FIFO validation runs after resolveFifoQueueFlag so the
+	// IsFIFO-only checks see the post-resolution flag. The temporary
+	// dormancy gate (§11 PR 2) runs separately in createQueue so
+	// SetQueueAttributes paths share the schema validator without
+	// re-rejecting on the gate.
+	if err := validatePartitionConfig(meta); err != nil {
+		return nil, err
+	}
 	return meta, nil
 }

@@ -421,6 +465,42 @@ var sqsAttributeAppliers = map[string]attributeApplier{
 		m.ContentBasedDedup = b
 		return nil
 	},
+	// PartitionCount enables HT-FIFO when > 1 (Phase 3.D, see
+	// docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md). Set
+	// at CreateQueue time; SetQueueAttributes attempts to change it
+	// reject via the immutability check in trySetQueueAttributesOnce.
+	// PR 2 of the rollout introduces the field but the temporary
+	// dormancy gate in tryCreateQueueOnce rejects PartitionCount > 1
+	// until PR 5 lifts the gate atomically with the data plane.
+	"PartitionCount": func(m *sqsQueueMeta, v string) error {
+		n, err := strconv.ParseUint(strings.TrimSpace(v), 10, 32)
+		if err != nil {
+			return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+				"PartitionCount must be a non-negative integer")
+		}
+		m.PartitionCount = uint32(n) //nolint:gosec // bounded by ParseUint(_, _, 32) above.
+		return nil
+	},
+	"FifoThroughputLimit": func(m *sqsQueueMeta, v string) error {
+		v = strings.TrimSpace(v)
+		switch v {
+		case "", htfifoThroughputPerMessageGroupID, htfifoThroughputPerQueue:
+			m.FifoThroughputLimit = v
+			return nil
+		}
+		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			"FifoThroughputLimit must be 'perMessageGroupId' or 'perQueue'")
+	},
+	"DeduplicationScope": func(m *sqsQueueMeta, v string) error {
+		v = strings.TrimSpace(v)
+		switch v {
+		case "", htfifoDedupeScopeMessageGroup, htfifoDedupeScopeQueue:
+			m.DeduplicationScope = v
+			return nil
+		}
+		return newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue,
+			"DeduplicationScope must be 'messageGroup' or 'queue'")
+	},
 	// Throttle* are non-AWS extensions for per-queue rate limiting,
 	// see docs/design/2026_04_26_proposed_sqs_per_queue_throttling.md.
 	// Each accepts a non-negative float64; the cross-attribute
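
The appliers in the hunk above check each value in isolation; per the commit message, the cross-attribute rules are enforced separately by validatePartitionConfig. The sketch below illustrates those rules under stated assumptions: `apiError`, `queueMeta`, and `codeOf` are hypothetical stand-ins for the adapter's types, and treating PartitionCount 0 as "unset, accepted" is inferred from the "0/1 means legacy" wording rather than confirmed by the diff.

```go
package main

import (
	"errors"
	"fmt"
)

const maxPartitions = 32 // mirrors htfifoMaxPartitions from the commit message

// apiError is a hypothetical stand-in for the adapter's SQS error type.
type apiError struct {
	Code    string
	Message string
}

func (e *apiError) Error() string { return e.Code + ": " + e.Message }

// queueMeta is a hypothetical subset of sqsQueueMeta.
type queueMeta struct {
	IsFIFO              bool
	PartitionCount      uint32
	FifoThroughputLimit string
	DeduplicationScope  string
}

// validatePartitionConfig applies the three rules listed in the
// commit message, in order.
func validatePartitionConfig(m queueMeta) error {
	n := m.PartitionCount
	// Assumption: 0 means "unset" and is accepted as the legacy layout.
	if n != 0 && (n&(n-1) != 0 || n > maxPartitions) {
		return &apiError{Code: "InvalidAttributeValue",
			Message: "PartitionCount must be a power of two in [1, 32]"}
	}
	if !m.IsFIFO && (m.FifoThroughputLimit != "" || m.DeduplicationScope != "") {
		return &apiError{Code: "InvalidAttributeValue",
			Message: "FifoThroughputLimit and DeduplicationScope are FIFO-only"}
	}
	// Incoherent combination rather than a malformed single value,
	// hence the different error code.
	if n > 1 && m.DeduplicationScope == "queue" {
		return &apiError{Code: "InvalidParameterValue",
			Message: "PartitionCount > 1 cannot be combined with DeduplicationScope=queue"}
	}
	return nil
}

// codeOf extracts the error code, or "" when err is nil.
func codeOf(err error) string {
	var apiErr *apiError
	if errors.As(err, &apiErr) {
		return apiErr.Code
	}
	return ""
}

func main() {
	cases := []queueMeta{
		{IsFIFO: true, PartitionCount: 8},
		{IsFIFO: true, PartitionCount: 12},
		{IsFIFO: false, DeduplicationScope: "messageGroup"},
		{IsFIFO: true, PartitionCount: 4, DeduplicationScope: "queue"},
	}
	for _, c := range cases {
		fmt.Printf("%+v -> %q\n", c, codeOf(validatePartitionConfig(c)))
	}
}
```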
@@ -478,6 +558,12 @@ func applyAttributes(meta *sqsQueueMeta, attrs map[string]string) error {
 	if err := validateThrottleConfig(meta); err != nil {
 		return err
 	}
+	// HT-FIFO partition validation runs in parseAttributesIntoMeta /
+	// trySetQueueAttributesOnce, AFTER resolveFifoQueueFlag, so the
+	// IsFIFO-only checks see the post-resolution flag. Running here
+	// would reject a valid CreateQueue with FifoQueue=true +
+	// FifoThroughputLimit=perMessageGroupId because IsFIFO is still
+	// false at this point in the flow.
 	return nil
 }

@@ -640,7 +726,9 @@ func attributesEqual(a, b *sqsQueueMeta) bool {
 	if a == nil || b == nil {
 		return false
 	}
-	return baseAttributesEqual(a, b) && throttleConfigEqual(a.Throttle, b.Throttle)
+	return baseAttributesEqual(a, b) &&
+		throttleConfigEqual(a.Throttle, b.Throttle) &&
+		htfifoAttributesEqual(a, b)
 }
 
 // baseAttributesEqual compares the pre-Phase-3.C/3.D attribute set.
@@ -678,6 +766,13 @@ func throttleConfigEqual(a, b *sqsQueueThrottle) bool {
 		a.DefaultRefillPerSecond == b.DefaultRefillPerSecond
 }
 
+// htfifoAttributesEqual compares the Phase 3.D HT-FIFO fields.
+func htfifoAttributesEqual(a, b *sqsQueueMeta) bool {
+	return a.PartitionCount == b.PartitionCount &&
+		a.FifoThroughputLimit == b.FifoThroughputLimit &&
+		a.DeduplicationScope == b.DeduplicationScope
+}
+
 // ------------------------ storage primitives ------------------------
 
 func (s *SQSServer) nextTxnReadTS(ctx context.Context) uint64 {
@@ -745,6 +840,16 @@ func (s *SQSServer) createQueue(w http.ResponseWriter, r *http.Request) {
 		writeSQSErrorFromErr(w, err)
 		return
 	}
+	// Temporary dormancy gate (Phase 3.D §11 PR 2). PartitionCount > 1
+	// must reject until PR 5 wires the data plane atomically with the
+	// gate-lift. Without this, accepting a partitioned-queue create
+	// would let SendMessage write under the legacy single-partition
+	// prefix; the PR 5 reader would never find those messages and the
+	// reaper would not enumerate them — silent message loss.
+	if err := validatePartitionDormancyGate(requested); err != nil {
+		writeSQSErrorFromErr(w, err)
+		return
+	}
 	if len(in.Tags) > sqsMaxTagsPerQueue {
 		// AWS caps tags per queue at 50. CreateQueue must reject
 		// over-cap tag bundles up front; a silent slice-and-store
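
The ordering in the hunk above (schema validation first, dormancy gate second, and only on the CreateQueue path) might look like the following. This is a simplified sketch: `validateSchema` and `createQueue` are hypothetical stand-ins that take a bare partition count, and the error text is a paraphrase rather than the message the real gate returns.

```go
package main

import (
	"errors"
	"fmt"
)

// errDormant paraphrases the gate's rejection; the real gate returns
// an InvalidAttributeValue API error with its own message.
var errDormant = errors.New("InvalidAttributeValue: PartitionCount > 1 is not yet enabled")

// validatePartitionDormancyGate rejects any partitioned queue while
// the data plane is not wired. 0 and 1 both mean the legacy layout.
func validatePartitionDormancyGate(partitionCount uint32) error {
	if partitionCount > 1 {
		return errDormant
	}
	return nil
}

// validateSchema is a hypothetical stand-in for the shared schema
// validator; only the power-of-two rule is shown.
func validateSchema(partitionCount uint32) error {
	if partitionCount != 0 && partitionCount&(partitionCount-1) != 0 {
		return errors.New("InvalidAttributeValue: PartitionCount must be a power of two")
	}
	return nil
}

// createQueue runs the schema validator first and the gate second, so
// a malformed value reports the schema error, not the gate's.
func createQueue(partitionCount uint32) error {
	if err := validateSchema(partitionCount); err != nil {
		return err
	}
	return validatePartitionDormancyGate(partitionCount)
}

func main() {
	for _, n := range []uint32{0, 1, 2, 3} {
		fmt.Printf("PartitionCount=%d -> %v\n", n, createQueue(n))
	}
}
```

Keeping the gate out of the shared validator means removing it later is a one-call deletion that does not touch the validation rules.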
@@ -1196,6 +1301,10 @@ func queueMetaToAttributes(meta *sqsQueueMeta, selection sqsAttributeSelection,
 	// keys. Extracted into a helper so queueMetaToAttributes stays
 	// under the cyclop ceiling.
 	addThrottleAttributes(all, meta.Throttle)
+	// HT-FIFO attributes (Phase 3.D). Same omission rule as Throttle*:
+	// only present when configured. Extracted into a helper so this
+	// function stays under the cyclop ceiling.
+	addHTFIFOAttributes(all, meta)
 	if selection.expandAll {
 		return all
 	}
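
The helper called in the hunk above is not shown in this file's diff. A minimal sketch of the "only present when configured" rule, assuming that "configured" means a non-zero count or a non-empty string and using a hypothetical `queueMeta` subset, could be:

```go
package main

import (
	"fmt"
	"strconv"
)

// queueMeta is a hypothetical subset of sqsQueueMeta.
type queueMeta struct {
	PartitionCount      uint32
	FifoThroughputLimit string
	DeduplicationScope  string
}

// addHTFIFOAttributes copies only the configured HT-FIFO fields into
// out, so legacy queues keep their existing attribute set unchanged.
// Assumption: zero values mean "not configured".
func addHTFIFOAttributes(out map[string]string, m queueMeta) {
	if m.PartitionCount > 0 {
		out["PartitionCount"] = strconv.FormatUint(uint64(m.PartitionCount), 10)
	}
	if m.FifoThroughputLimit != "" {
		out["FifoThroughputLimit"] = m.FifoThroughputLimit
	}
	if m.DeduplicationScope != "" {
		out["DeduplicationScope"] = m.DeduplicationScope
	}
}

func main() {
	legacy := map[string]string{}
	addHTFIFOAttributes(legacy, queueMeta{})
	fmt.Println(len(legacy), "attributes for a legacy queue")

	htfifo := map[string]string{}
	addHTFIFOAttributes(htfifo, queueMeta{
		PartitionCount:      4,
		FifoThroughputLimit: "perMessageGroupId",
		DeduplicationScope:  "messageGroup",
	})
	fmt.Println(htfifo)
}
```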
@@ -1280,15 +1389,32 @@ func (s *SQSServer) trySetQueueAttributesOnce(ctx context.Context, queueName str
 	if !exists {
 		return false, newSQSAPIError(http.StatusBadRequest, sqsErrQueueDoesNotExist, "queue does not exist")
 	}
+	// Snapshot the on-disk values of the immutable HT-FIFO fields
+	// before applying the request so the immutability check has a
+	// clean before/after pair. SetQueueAttributes is all-or-nothing
+	// per §3.2: if any immutable attribute carries a differing value,
+	// the entire request is rejected before any attribute is
+	// persisted (including mutable attributes in the same call).
+	preApply := snapshotImmutableHTFIFO(meta)
 	if err := applyAttributes(meta, attrs); err != nil {
 		return false, err
 	}
+	if err := validatePartitionImmutability(preApply, meta); err != nil {
+		return false, err
+	}
 	// ContentBasedDeduplication is FIFO-only; a Standard queue
 	// silently accepting it would advertise unsupported behavior to
 	// clients. Same rule enforced on CreateQueue.
 	if meta.ContentBasedDedup && !meta.IsFIFO {
 		return false, newSQSAPIError(http.StatusBadRequest, sqsErrInvalidAttributeValue, "ContentBasedDeduplication is only valid on FIFO queues")
 	}
+	// HT-FIFO schema validator runs after applyAttributes so the
+	// FIFO-only checks see the post-apply state. IsFIFO comes from
+	// the loaded meta record (immutable from CreateQueue) so the
+	// validator sees the same flag CreateQueue set.
+	if err := validatePartitionConfig(meta); err != nil {
+		return false, err
+	}
 	meta.LastModifiedAtMillis = time.Now().UnixMilli()
 	metaBytes, err := encodeSQSQueueMeta(meta)
 	if err != nil {
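
The snapshot-then-compare pattern in the hunk above can be illustrated in isolation. The sketch below is an assumption-laden miniature, not the adapter's code: the types, the error value, and `setQueueAttributes` are hypothetical, and working on a copy of the stored record is just one simple way to get the all-or-nothing behaviour described in the commit message.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// queueMeta is a hypothetical subset of sqsQueueMeta with one mutable
// field and the three immutable HT-FIFO fields.
type queueMeta struct {
	VisibilityTimeoutSeconds int64
	PartitionCount           uint32
	FifoThroughputLimit      string
	DeduplicationScope       string
}

// immutableHTFIFO is a comparable snapshot of the immutable fields.
type immutableHTFIFO struct {
	PartitionCount      uint32
	FifoThroughputLimit string
	DeduplicationScope  string
}

var errImmutable = errors.New("HT-FIFO attributes are immutable after CreateQueue")

func snapshotImmutableHTFIFO(m *queueMeta) immutableHTFIFO {
	return immutableHTFIFO{m.PartitionCount, m.FifoThroughputLimit, m.DeduplicationScope}
}

// validatePartitionImmutability rejects any difference between the
// pre-apply snapshot and the post-apply state.
func validatePartitionImmutability(pre immutableHTFIFO, post *queueMeta) error {
	if pre != snapshotImmutableHTFIFO(post) {
		return errImmutable
	}
	return nil
}

func applyAttributes(m *queueMeta, attrs map[string]string) error {
	for k, v := range attrs {
		switch k {
		case "VisibilityTimeout":
			n, err := strconv.ParseInt(v, 10, 64)
			if err != nil {
				return err
			}
			m.VisibilityTimeoutSeconds = n
		case "PartitionCount":
			n, err := strconv.ParseUint(v, 10, 32)
			if err != nil {
				return err
			}
			m.PartitionCount = uint32(n)
		case "FifoThroughputLimit":
			m.FifoThroughputLimit = v
		case "DeduplicationScope":
			m.DeduplicationScope = v
		}
	}
	return nil
}

// setQueueAttributes mutates a working copy and commits it only if
// every check passes, so a rejected request persists nothing.
func setQueueAttributes(stored *queueMeta, attrs map[string]string) error {
	working := *stored
	pre := snapshotImmutableHTFIFO(&working)
	if err := applyAttributes(&working, attrs); err != nil {
		return err
	}
	if err := validatePartitionImmutability(pre, &working); err != nil {
		return err
	}
	*stored = working
	return nil
}

func main() {
	q := &queueMeta{VisibilityTimeoutSeconds: 30, PartitionCount: 1}
	err := setQueueAttributes(q, map[string]string{"VisibilityTimeout": "60", "PartitionCount": "2"})
	fmt.Println(err, q.VisibilityTimeoutSeconds)
}
```

Re-sending an immutable attribute with its current value leaves the snapshot unchanged, which is why a same-value request succeeds as a no-op.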
