Commit 04a997a

feat(sqs): HT-FIFO partition metrics counter (Phase 3.D PR 7a) (#737)
## Summary

Phase 3.D PR 7a — adds the `elastickv_sqs_partition_messages_total{queue, partition, action}` Prometheus counter so dashboards and alerts can spot uneven `MessageGroupId` distributions across partitioned-FIFO queues. Metrics-only: the Jepsen HT-FIFO workload (PR 7b) ships separately so the two review loops do not cross.

- `monitoring/sqs.go` (new): `SQSMetrics` with a cardinality cap (`sqsMaxTrackedQueues = 512`; overflow collapses to `_other`) mirroring `DynamoDBMetrics`. Drops empty queue names and unknown action labels so a future call-site bug cannot pollute the series space dashboards have to learn about. Nil-receiver-safe so adapter call sites do not need to nil-guard.
- `adapter/sqs.go`: `SQSPartitionObserver` interface + `WithSQSPartitionObserver` option. Re-declared in `adapter` so it doesn't import `monitoring` at the package boundary (matches the DynamoDB/Redis observer pattern). Action constants (`send`/`receive`/`delete`) are re-declared on the adapter side and validated at runtime by the monitoring side — drift between the two surfaces as a dropped observation, not a wedge.
- `adapter/sqs_fifo.go`, `adapter/sqs_messages.go`: emit the counter on the **partitioned** commit branch only (`PartitionCount > 1`) for send / receive / delete. Legacy single-partition queues stay off the metric since partition is always 0 and the cardinality cost would buy nothing.
- `monitoring/registry.go`, `main_sqs.go`, `main.go`: wire the registry's `SQSPartitionObserver()` into `startSQSServer` so the SQS server picks up the production observer on cluster boot. Test fixtures and CLI tools that build `SQSServer` without a registry pass `nil` and the metric stays at zero.

## Tests

`monitoring/sqs_test.go` (new, 6 cases):

- `TestSQSMetrics_ObservePartitionMessage_IncrementsByLabelTriple` — pin the `(queue, partition, action)` counter contract.
- `TestSQSMetrics_ObservePartitionMessage_DropsInvalidAction` — pin the typo guard against future drift between adapter and monitoring constants.
- `TestSQSMetrics_ObservePartitionMessage_DropsEmptyQueue` — pin that an empty queue name does not collapse with valid observations onto a shared series.
- `TestSQSMetrics_NilReceiverIsSafe` — pin the nil-receiver short-circuit the adapter relies on.
- `TestSQSMetrics_QueueLabelOverflow` — pin the cap-and-collapse so a misbehaving caller cannot exhaust the Prometheus series budget.
- `TestSQSMetrics_RegistryWiring` — pin that the public `Registry` exposes the metric under the documented name.

## Self-review (5 lenses)

1. **Data loss** — N/A; metrics-only, no storage / Raft / FSM touch.
2. **Concurrency** — counter increments are atomic via Prometheus; the `trackedQueues` map is guarded by `SQSMetrics`' own `sync.Mutex` and only consulted from the dispatch-success path under the SQS server's existing concurrency model. No new locks in the adapter.
3. **Performance** — one map lookup + one `CounterVec` lookup per partitioned send/receive/delete on the success branch. Legacy queues skip the call entirely. Cardinality bounded at 512 queues × 32 partitions (`htfifoMaxPartitions`) × 3 actions ≈ 49k series worst case; in practice a 32-partition queue yields 96 series, so the budget is plenty for the SLO panels.
4. **Data consistency** — the metric is observed AFTER OCC dispatch succeeds, so the counter reflects committed state. Receive/delete branches that return on retryable errors deliberately do not increment (the retry path will observe on the eventual success).
5. **Test coverage** — 6 unit tests in `monitoring/`, plus the adapter-side nil-observer path is exercised by all existing partitioned-FIFO tests in `adapter/sqs_partitioned_dispatch_test.go` (they pass a `nil` observer through the test fixture).

## Test plan

- [x] `go test -race -count=1 ./monitoring/...`
- [x] `go test -race -count=1 -run 'TestSQS' ./adapter/...`
- [x] `go test -race -count=1 ./...` (full suite)
- [x] `golangci-lint --config=.golangci.yaml run ./...` (full repo)
- [ ] Jepsen HT-FIFO workload — deferred to PR 7b

## Refs

- `docs/design/2026_05_01_partial_split_queue_fifo.md` §11 PR 7
- Builds on PR 5b-3 (#734) capability gate, PR 6a (#735) tombstone reaper, PR 6b (#736, in flight) live-queue reaper.
2 parents f489669 + 8230365 commit 04a997a

8 files changed

Lines changed: 370 additions & 1 deletion

adapter/sqs.go

Lines changed: 50 additions & 0 deletions
```diff
@@ -200,8 +200,36 @@ type SQSServer struct {
 	// check so partitioned queues can land on a single-shard
 	// cluster and route through the engine's default group.
 	partitionResolver *SQSPartitionResolver
+	// partitionObserver records the
+	// elastickv_sqs_partition_messages_total{queue, partition,
+	// action} counter for HT-FIFO operations (PR 7a). nil on
+	// non-monitored test fixtures and on single-binary CLI
+	// tools that build SQSServer without a monitoring registry.
+	// Increment call sites use a nil-receiver-safe call so the
+	// metrics path costs nothing when unwired.
+	partitionObserver SQSPartitionObserver
 }
 
+// SQSPartitionObserver is the metrics-package interface
+// (monitoring.SQSPartitionObserver) re-declared here so the
+// adapter does not import monitoring at the package boundary —
+// matches the existing observer pattern for DynamoDB / Redis.
+type SQSPartitionObserver interface {
+	ObservePartitionMessage(queue string, partition uint32, action string)
+}
+
+// SQSPartitionAction* mirror the action label values from
+// monitoring.SQSPartitionAction*. Re-declared so adapter call
+// sites do not need a monitoring import; the observer interface
+// validates the value at runtime so a drift between these
+// constants and the monitoring side surfaces as a dropped
+// observation rather than a wedge.
+const (
+	SQSPartitionActionSend    = "send"
+	SQSPartitionActionReceive = "receive"
+	SQSPartitionActionDelete  = "delete"
+)
+
 // WithSQSLeaderMap configures the Raft-address-to-SQS-address mapping used to
 // forward requests from followers to the current leader. Format mirrors
 // WithDynamoDBLeaderMap / WithS3LeaderMap.
@@ -214,6 +242,28 @@ func WithSQSLeaderMap(m map[string]string) SQSServerOption {
 	}
 }
 
+// WithSQSPartitionObserver installs the
+// elastickv_sqs_partition_messages_total counter observer on the
+// SQS server. Pass nil (the default) on non-monitored test
+// fixtures; the partitioned send / receive / delete paths then
+// observe via a nil interface and the metric stays at zero. The
+// monitoring registry's SQSPartitionObserver() returns the
+// concrete implementation in production.
+func WithSQSPartitionObserver(o SQSPartitionObserver) SQSServerOption {
+	return func(s *SQSServer) { s.partitionObserver = o }
+}
+
+// observePartitionMessage is a nil-receiver-safe wrapper around
+// the configured observer. Pulled into a helper so the call
+// sites in send / receive / delete each cost one branch instead
+// of repeating the nil check.
+func (s *SQSServer) observePartitionMessage(queue string, partition uint32, action string) {
+	if s == nil || s.partitionObserver == nil {
+		return
+	}
+	s.partitionObserver.ObservePartitionMessage(queue, partition, action)
+}
+
 // WithSQSPartitionResolver installs the cluster's partition
 // resolver on the SQS server so the CreateQueue capability gate
 // (validateHTFIFOCapability) can verify routing coverage before
```

adapter/sqs_fifo.go

Lines changed: 8 additions & 0 deletions
```diff
@@ -254,6 +254,14 @@ func (s *SQSServer) sendFifoMessage(
 		}
 		return nil, false, errors.WithStack(err)
 	}
+	// Hot-partition observability (§11 PR 7): record the per-
+	// (queue, partition) send so dashboards can spot uneven
+	// MessageGroupId distributions. Only partitioned queues emit
+	// (PartitionCount > 1); for legacy queues partition is always
+	// 0 and the metric would be uninformative + cardinality cost.
+	if meta.PartitionCount > 1 {
+		s.observePartitionMessage(queueName, partition, SQSPartitionActionSend)
+	}
 	return map[string]string{
 		"MessageId":        rec.MessageID,
 		"MD5OfMessageBody": rec.MD5OfBody,
```

adapter/sqs_messages.go

Lines changed: 15 additions & 0 deletions
```diff
@@ -1269,6 +1269,15 @@ func (s *SQSServer) commitReceiveRotation(ctx context.Context, queueName string,
 		}
 		return nil, false, errors.WithStack(err)
 	}
+	// Hot-partition observability (§11 PR 7): record the per-
+	// (queue, partition) receive-rotation. Same gating rule as
+	// the send path — only partitioned queues emit (the metric
+	// would be uninformative for legacy queues that always sit
+	// on partition 0). meta is dereferenced above, so we don't
+	// re-check for nil here.
+	if meta.PartitionCount > 1 {
+		s.observePartitionMessage(queueName, cand.partition, SQSPartitionActionReceive)
+	}
 
 	handle, err := encodeReceiptHandleDispatch(meta, cand.partition, gen, cand.messageID, newToken)
 	if err != nil {
@@ -1432,6 +1441,12 @@ func (s *SQSServer) deleteMessageWithRetry(ctx context.Context, queueName string
 		return err
 	}
 	if _, err := s.coordinator.Dispatch(ctx, req); err == nil {
+		// Hot-partition observability (§11 PR 7): record the
+		// successful delete on the partitioned commit branch
+		// only. Legacy queues stay off the metric.
+		if meta != nil && meta.PartitionCount > 1 {
+			s.observePartitionMessage(queueName, handle.Partition, SQSPartitionActionDelete)
+		}
 		return nil
 	} else if !isRetryableTransactWriteError(err) {
 		return errors.WithStack(err)
```

main.go

Lines changed: 12 additions & 1 deletion
```diff
@@ -893,6 +893,10 @@ func startServers(in serversInput) error {
 		// which case validateHTFIFOCapability skips the routing-
 		// coverage check (Codex P1 review on PR #734, round 2).
 		sqsPartitionResolver: buildSQSPartitionResolverConcrete(in.cfg.sqsFifoPartitionMap),
+		// sqsPartitionObserver: the metrics registry's HT-FIFO
+		// partition counter observer. nil when --metricsAddress is
+		// empty (the adapter then no-ops the observe call).
+		sqsPartitionObserver: in.metricsRegistry.SQSPartitionObserver(),
 		metricsAddress: *metricsAddr,
 		metricsToken:   *metricsToken,
 		pprofAddress:   *pprofAddr,
@@ -1480,6 +1484,13 @@ type runtimeServerRunner struct {
 	// the coverage check.
 	sqsPartitionResolver *adapter.SQSPartitionResolver
 
+	// sqsPartitionObserver records the
+	// elastickv_sqs_partition_messages_total counter (PR 7a) for
+	// HT-FIFO send / receive / delete operations. Sourced from
+	// the monitoring registry; nil-receiver-safe on the adapter
+	// side so a test fixture without a registry can omit it.
+	sqsPartitionObserver adapter.SQSPartitionObserver
+
 	// roleStore is the access-key → role index the leader-side
 	// gRPC AdminForward service uses to re-validate the principal
 	// on every forwarded write. Mirrors what admin.Config.RoleIndex
@@ -1535,7 +1546,7 @@ func (r *runtimeServerRunner) start() error {
 	); err != nil {
 		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
 	}
-	sqsServer, err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile, r.sqsPartitionResolver)
+	sqsServer, err := startSQSServer(r.ctx, r.lc, r.eg, r.sqsAddress, r.shardStore, r.coordinate, r.leaderSQS, r.sqsRegion, r.sqsCredsFile, r.sqsPartitionResolver, r.sqsPartitionObserver)
 	if err != nil {
 		return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
 	}
```

main_sqs.go

Lines changed: 2 additions & 0 deletions
```diff
@@ -27,6 +27,7 @@ func startSQSServer(
 	region string,
 	credentialsFile string,
 	partitionResolver *adapter.SQSPartitionResolver,
+	partitionObserver adapter.SQSPartitionObserver,
 ) (*adapter.SQSServer, error) {
 	sqsAddr = strings.TrimSpace(sqsAddr)
 	if sqsAddr == "" {
@@ -49,6 +50,7 @@
 		adapter.WithSQSRegion(region),
 		adapter.WithSQSStaticCredentials(staticCreds),
 		adapter.WithSQSPartitionResolver(partitionResolver),
+		adapter.WithSQSPartitionObserver(partitionObserver),
 	)
 	// Two-goroutine shutdown pattern mirrors startS3Server: one goroutine waits
 	// on either ctx.Done() or Run completion to call Stop, the other runs the
```

monitoring/registry.go

Lines changed: 14 additions & 0 deletions
```diff
@@ -21,6 +21,7 @@ type Registry struct {
 	hotPath       *HotPathMetrics
 	pebble        *PebbleMetrics
 	writeConflict *WriteConflictMetrics
+	sqs           *SQSMetrics
 }
 
 // NewRegistry builds a registry with constant labels that identify the local node.
@@ -43,6 +44,7 @@ func NewRegistry(nodeID string, nodeAddress string) *Registry {
 	r.hotPath = newHotPathMetrics(registerer)
 	r.pebble = newPebbleMetrics(registerer)
 	r.writeConflict = newWriteConflictMetrics(registerer)
+	r.sqs = newSQSMetrics(registerer)
 	return r
 }
 
@@ -158,6 +160,18 @@ func (r *Registry) SetFSMApplySyncMode(activeLabel string) {
 	r.pebble.SetFSMApplySyncMode(activeLabel)
 }
 
+// SQSPartitionObserver returns the HT-FIFO partition-messages
+// observer backed by this registry. Returns nil when the registry
+// itself is nil so adapter call sites can pass the result through
+// without checking; SQSMetrics.ObservePartitionMessage is also
+// nil-receiver safe.
+func (r *Registry) SQSPartitionObserver() SQSPartitionObserver {
+	if r == nil {
+		return nil
+	}
+	return r.sqs
+}
+
 // WriteConflictCollector returns a collector that polls each MVCC
 // store's per-(kind, key_prefix) OCC conflict counters and mirrors
 // them into the elastickv_store_write_conflict_total Prometheus
```

monitoring/sqs.go

Lines changed: 128 additions & 0 deletions
New file:

```go
package monitoring

import (
	"strconv"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

// SQS HT-FIFO partition action labels. Stable string set so
// dashboards / alerts can rely on the values not changing.
const (
	SQSPartitionActionSend    = "send"
	SQSPartitionActionReceive = "receive"
	SQSPartitionActionDelete  = "delete"

	// sqsMaxTrackedQueues caps the number of distinct queue names
	// the metrics layer will emit a per-(queue, partition, action)
	// series for. Any queue beyond this cap collapses to the
	// _other label so a misbehaving caller (e.g. a script that
	// generates random queue names) cannot blow up the
	// Prometheus cardinality budget. Mirrors dynamoMaxTrackedTables.
	sqsMaxTrackedQueues = 512

	// sqsQueueOverflow is the placeholder label used when a queue
	// name is not in the tracked set (cap exceeded). Operators see
	// the overflow as a single _other series and know to look at
	// the application logs for the real names.
	sqsQueueOverflow = "_other"
)

// SQSPartitionObserver records per-(queue, partition, action)
// counters for HT-FIFO operations. The interface is small so
// adapter call sites can pass a no-op observer in tests without
// pulling in the full Prometheus registry.
type SQSPartitionObserver interface {
	// ObservePartitionMessage increments the
	// sqs_partition_messages_total counter for one operation on
	// one (queue, partition) pair. Action must be one of
	// SQSPartitionActionSend / Receive / Delete; any other value
	// is silently dropped so a typo at a future call site cannot
	// crash the process.
	ObservePartitionMessage(queue string, partition uint32, action string)
}

// SQSMetrics owns the Prometheus counter for HT-FIFO partition
// operations. Mirrors DynamoDBMetrics' shape: per-Registry
// instance, label-cardinality-bounded by sqsMaxTrackedQueues.
type SQSMetrics struct {
	partitionMessages *prometheus.CounterVec

	mu            sync.Mutex
	trackedQueues map[string]struct{}
}

func newSQSMetrics(registerer prometheus.Registerer) *SQSMetrics {
	m := &SQSMetrics{
		partitionMessages: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "elastickv_sqs_partition_messages_total",
				Help: "Total HT-FIFO partition operations by queue, partition, and action (send / receive / delete). Non-zero only for queues with PartitionCount > 1; use to spot uneven MessageGroupId distributions across partitions.",
			},
			[]string{"queue", "partition", "action"},
		),
		trackedQueues: map[string]struct{}{},
	}
	registerer.MustRegister(m.partitionMessages)
	return m
}

// ObservePartitionMessage implements SQSPartitionObserver. The
// (queue, action) pair is validated and (queue) is collapsed to
// the overflow label past sqsMaxTrackedQueues distinct names.
func (m *SQSMetrics) ObservePartitionMessage(queue string, partition uint32, action string) {
	if m == nil {
		return
	}
	if !sqsValidPartitionAction(action) {
		return
	}
	if queue == "" {
		// Defensive: an empty queue name would collapse all
		// requests onto a single series — almost certainly a bug
		// at the call site. Drop silently rather than emit
		// poisoned data.
		return
	}
	queueLabel := m.queueLabelForCardinalityBudget(queue)
	// WithLabelValues avoids the prometheus.Labels map allocation
	// on every observe call. Label order matches the
	// NewCounterVec declaration: queue, partition, action.
	// Mirrors DynamoDBMetrics.
	m.partitionMessages.WithLabelValues(
		queueLabel,
		strconv.FormatUint(uint64(partition), 10),
		action,
	).Inc()
}

// queueLabelForCardinalityBudget returns queue if the metric has
// already emitted a series for it OR there is room in the
// tracked-queues set; returns sqsQueueOverflow otherwise. The
// cap-and-collapse pattern mirrors DynamoDBMetrics.tableLabel
// so a misbehaving caller cannot exhaust the Prometheus
// cardinality budget.
func (m *SQSMetrics) queueLabelForCardinalityBudget(queue string) string {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.trackedQueues[queue]; ok {
		return queue
	}
	if len(m.trackedQueues) >= sqsMaxTrackedQueues {
		return sqsQueueOverflow
	}
	m.trackedQueues[queue] = struct{}{}
	return queue
}

// sqsValidPartitionAction returns true iff action is one of the
// stable label values. Keeps a typo at the call site (e.g.
// "Send" vs "send") from polluting the metric.
func sqsValidPartitionAction(action string) bool {
	switch action {
	case SQSPartitionActionSend, SQSPartitionActionReceive, SQSPartitionActionDelete:
		return true
	}
	return false
}
```
