Skip to content

Commit d36c398

Browse files
committed
backup: M5-3 reject non-power-of-two partition_count at load (coderabbit Major #929)
partitionForGroup masks with (PartitionCount - 1) which assumes PartitionCount is a power of two. The live adapter validator (adapter/sqs_partitioning.go:isPowerOfTwo) enforces this for newly- created queues, but a malformed dump like partition_count=3 would pass readQueueMeta as-is and then hash inconsistently: (h & 0b10) masks bit 2 only, so partition 1 is unreachable and message routing breaks. Fix: in readQueueMeta, after the format_version check, fail closed on (pub.PartitionCount > 1 && PartitionCount & (PartitionCount-1) != 0) with ErrSQSEncodeInvalidQueue. Caught at load time so the operator sees the invalid dump before any record is staged. Caller audit: readQueueMeta is the single source of meta values for the encoder pipeline. The new gate fires before encodeQueue's addQueueMeta, validatePartitioning, and stageMessageRecords - none of which need to repeat the check. Regression tests: - TestSQSEncodeRejectsNonPowerOfTwoPartitionCount: 3/5/6/7/9/10 all fail closed. - TestSQSEncodeAcceptsPowerOfTwoPartitionCount: 2/4/8/16/32 all pass.
1 parent 8ff0af4 commit d36c398

2 files changed

Lines changed: 57 additions & 0 deletions

File tree

internal/backup/encode_sqs.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,16 @@ func (e *SQSRecordEncoder) readQueueMeta(root *os.Root, queueDir string) (sqsQue
624624
return sqsQueueMetaPublic{}, errors.Wrapf(ErrSQSEncodeInvalidQueue,
625625
"%s: unsupported format_version %d", rel, pub.FormatVersion)
626626
}
627+
// PartitionCount must be a power of two when > 1. The live
628+
// validator (adapter/sqs_partitioning.go:isPowerOfTwo) enforces
629+
// this so partitionFor's mask AND (h & (n-1)) is equivalent to
630+
// (h % n). A malformed dump with e.g. partition_count=3 would
631+
// hash inconsistently and route messages to wrong partitions.
632+
// Coderabbit Major #929.
633+
if pub.PartitionCount > 1 && pub.PartitionCount&(pub.PartitionCount-1) != 0 {
634+
return sqsQueueMetaPublic{}, errors.Wrapf(ErrSQSEncodeInvalidQueue,
635+
"%s: partition_count %d must be a power of two", rel, pub.PartitionCount)
636+
}
627637
return pub, nil
628638
}
629639

internal/backup/encode_sqs_partitioned_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package backup
33
import (
44
"bytes"
55
"encoding/binary"
6+
"fmt"
67
"testing"
78

89
"github.com/cockroachdb/errors"
@@ -373,6 +374,52 @@ func TestSQSEncodeClassicDedupKeepsLatestOnExpiredCollision(t *testing.T) {
373374
}
374375
}
375376

377+
// TestSQSEncodeRejectsNonPowerOfTwoPartitionCount pins coderabbit
378+
// Major #929. partitionForGroup masks with (PartitionCount - 1)
379+
// which assumes a power of two. A malformed dump like
380+
// partition_count=3 would mask via (h & 0b10) and route messages
381+
// inconsistently. readQueueMeta must fail closed.
382+
func TestSQSEncodeRejectsNonPowerOfTwoPartitionCount(t *testing.T) {
383+
t.Parallel()
384+
for _, n := range []uint32{3, 5, 6, 7, 9, 10} {
385+
t.Run(fmt.Sprintf("count=%d", n), func(t *testing.T) {
386+
t.Parallel()
387+
in := t.TempDir()
388+
writeSQSQueue(t, in, "bad.fifo",
389+
[]byte(fmt.Sprintf(`{"format_version":1,"name":"bad.fifo","fifo":true,`+
390+
`"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+
391+
`"delay_seconds":0,"partition_count":%d,"fifo_throughput_limit":"perMessageGroupId"}`, n)),
392+
nil)
393+
b := newSnapshotBuilder(sqsEncTS)
394+
err := NewSQSRecordEncoder(in).Encode(b)
395+
if !errors.Is(err, ErrSQSEncodeInvalidQueue) {
396+
t.Fatalf("partition_count=%d: err = %v, want ErrSQSEncodeInvalidQueue", n, err)
397+
}
398+
})
399+
}
400+
}
401+
402+
// TestSQSEncodeAcceptsPowerOfTwoPartitionCount confirms the
403+
// power-of-two guard does not reject legitimate values.
404+
func TestSQSEncodeAcceptsPowerOfTwoPartitionCount(t *testing.T) {
405+
t.Parallel()
406+
for _, n := range []uint32{2, 4, 8, 16, 32} {
407+
t.Run(fmt.Sprintf("count=%d", n), func(t *testing.T) {
408+
t.Parallel()
409+
in := t.TempDir()
410+
writeSQSQueue(t, in, "ok.fifo",
411+
[]byte(fmt.Sprintf(`{"format_version":1,"name":"ok.fifo","fifo":true,`+
412+
`"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+
413+
`"delay_seconds":0,"partition_count":%d,"fifo_throughput_limit":"perQueue"}`, n)),
414+
nil)
415+
b := newSnapshotBuilder(sqsEncTS)
416+
if err := NewSQSRecordEncoder(in).Encode(b); err != nil {
417+
t.Fatalf("partition_count=%d: unexpected err = %v", n, err)
418+
}
419+
})
420+
}
421+
}
422+
376423
// TestSQSEncodeClassicDedupKeepsLatestAcrossGroups pins codex P2
377424
// #929 (round 2): for classic FIFO queues the live dedup key is
378425
// (queue, generation, dedupID) — NO group / partition. Two retained

0 commit comments

Comments
 (0)