Commit c7eae63

feat(dist): add migration-source observability for hint queue
Tag hints at queue time with their origin (replication fan-out vs rebalance migration) and track five new per-source OTel metrics (four counters and one gauge):

- dist.migration.queued: migration hints enqueued
- dist.migration.replayed: migration hints successfully delivered
- dist.migration.expired: migration hints aged past TTL
- dist.migration.dropped: migration hints discarded (transport error or global cap)
- dist.migration.last_age_ns: gauge holding the queue residency of the most recently replayed migration hint; a direct signal of new-primary reachability during rolling deploys

Existing dist.hinted.* counters continue to aggregate across both sources; replication-only counts are derivable as (aggregate - migration).

No second queue or drain loop is introduced. The implementation extends the existing hinted-handoff infrastructure with a lightweight hintSource tag on hintedEntry and matching per-source counter branches on every terminal path in queueHint and processHint (global-cap drop, queue success, expiry, replay success, and transport-error drop).

Adds pkg/backend/dist_migration_hint_test.go with six focused tests covering source-tag preservation through queue → replay, per-source counter increments on every terminal path, and the not-found keep path.
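The "(aggregate - migration)" derivation can be sketched as below. This is a minimal illustration, not code from the commit: `hintCounters`, `replicationView`, and `replicationOnly` are hypothetical names, and the struct copies only the hint-related field names that the diff adds to or keeps in `DistMetrics`. Note that the migration drop counter spans both aggregate drop counters, so the two aggregates are summed before subtracting. Because each counter is read separately, a difference taken while hints are in flight may be off by a small amount.

```go
package main

import "fmt"

// hintCounters mirrors the hint-related fields of the DistMetrics snapshot
// shown in this commit. The type itself is a hypothetical stand-in.
type hintCounters struct {
	HintedQueued          int64
	HintedReplayed        int64
	HintedExpired         int64
	HintedDropped         int64
	HintedGlobalDropped   int64
	MigrationHintQueued   int64
	MigrationHintReplayed int64
	MigrationHintExpired  int64
	MigrationHintDropped  int64
}

// replicationView holds the replication-only share of each counter.
type replicationView struct {
	Queued, Replayed, Expired, Dropped int64
}

// replicationOnly derives replication-source counts as aggregate minus migration.
func replicationOnly(c hintCounters) replicationView {
	return replicationView{
		Queued:   c.HintedQueued - c.MigrationHintQueued,
		Replayed: c.HintedReplayed - c.MigrationHintReplayed,
		Expired:  c.HintedExpired - c.MigrationHintExpired,
		// MigrationHintDropped covers replay-error drops and global-cap
		// drops, so both aggregate drop counters are summed first.
		Dropped: c.HintedDropped + c.HintedGlobalDropped - c.MigrationHintDropped,
	}
}

func main() {
	snap := hintCounters{
		HintedQueued: 10, MigrationHintQueued: 4,
		HintedReplayed: 7, MigrationHintReplayed: 3,
		HintedDropped: 2, HintedGlobalDropped: 1, MigrationHintDropped: 1,
	}
	fmt.Printf("%+v\n", replicationOnly(snap))
}
```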
1 parent 74bfe2b commit c7eae63

3 files changed

Lines changed: 424 additions & 8 deletions

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
@@ -8,6 +8,17 @@ All notable changes to HyperCache are recorded here. The format follows
 
 ### Added
 
+- **Migration-source observability for the hint queue.** Hints produced by rebalance migrations are now
+  tagged at queue time and tracked in a dedicated set of counters alongside the existing aggregate
+  metrics. Five new OTel metrics: `dist.migration.queued`, `dist.migration.replayed`,
+  `dist.migration.expired`, `dist.migration.dropped`, and `dist.migration.last_age_ns` (queue residency of
+  the most-recently-replayed migration hint — direct signal of new-primary reachability during rolling
+  deploys). Existing `dist.hinted.*` counters keep their meaning as the aggregate across both sources, so
+  operators can derive replication-only as `aggregate - migration`. Implementation reuses the proven hint
+  queue infrastructure (TTL, caps, replay, drop logic) — no second queue, no second drain loop.
+  Tests in [`pkg/backend/dist_migration_hint_test.go`](pkg/backend/dist_migration_hint_test.go) cover
+  source-tag preservation through queue→replay, per-source counter increments on every terminal path
+  (replay success, expired, transport drop, global-cap drop), and the not-found keep-in-queue path.
 - **Adaptive Merkle anti-entropy scheduling.** New
   [`backend.WithDistMerkleAdaptiveBackoff(maxFactor)`](pkg/backend/dist_memory.go) option lets the auto-sync
   loop double its sleep interval after each tick that finds zero divergence across every peer, capped at

pkg/backend/dist_memory.go

Lines changed: 104 additions & 8 deletions
@@ -254,11 +254,30 @@ type distTransportSlot struct{ t DistTransport }
 
 // hintedEntry represents a deferred replica write.
 type hintedEntry struct {
-	item *cache.Item
-	expire time.Time
-	size int64 // approximate bytes for global cap accounting
+	item *cache.Item
+	queuedAt time.Time
+	expire time.Time
+	size int64 // approximate bytes for global cap accounting
+	source hintSource // why this hint exists (replication fan-out vs rebalance migration)
 }
 
+// hintSource records why a hint landed in the queue. Used to split
+// the aggregate hint counters (which sum across both sources) into a
+// migration-only view for operators tracking ownership-transfer
+// liveness during rebalance phases. Replication-only counts are derivable
+// as (aggregate - migration); we don't expose a third metric for it.
+type hintSource int8
+
+const (
+	// hintSourceReplication marks hints produced by a failed write fan-out
+	// to a peer replica. This is the historical default — every hint
+	// landed in this bucket before migration-source tagging existed.
+	hintSourceReplication hintSource = iota
+	// hintSourceMigration marks hints produced by a rebalance tick that
+	// could not deliver an item to its new primary owner.
+	hintSourceMigration
+)
+
 // tombstone marks a delete intent with version ordering to prevent resurrection.
 type tombstone struct {
 	version uint64
@@ -1179,12 +1198,17 @@ type distMetrics struct {
 	versionConflicts atomic.Int64 // times a newer version (or tie-broken origin) replaced previous candidate
 	versionTieBreaks atomic.Int64 // subset of conflicts decided by origin tie-break
 	readPrimaryPromote atomic.Int64 // times read path skipped unreachable primary and promoted next owner
-	hintedQueued atomic.Int64 // hints queued
+	hintedQueued atomic.Int64 // hints queued (both sources)
 	hintedReplayed atomic.Int64 // hints successfully replayed
 	hintedExpired atomic.Int64 // hints expired before delivery
 	hintedDropped atomic.Int64 // hints dropped due to non-not-found transport errors
 	hintedGlobalDropped atomic.Int64 // hints dropped due to global caps (count/bytes)
 	hintedBytes atomic.Int64 // approximate total bytes currently queued (best-effort)
+	migrationHintQueued atomic.Int64 // subset of hintedQueued: rebalance migration source only
+	migrationHintReplayed atomic.Int64 // subset of hintedReplayed: migration-source hints that drained
+	migrationHintExpired atomic.Int64 // subset of hintedExpired: migration-source hints aged out
+	migrationHintDropped atomic.Int64 // subset of hintedDropped + hintedGlobalDropped: migration-source hints that died
+	migrationHintLastAgeNanos atomic.Int64 // queue residency of the most-recently-replayed migration hint (ns)
 	merkleSyncs atomic.Int64 // merkle sync operations completed
 	merkleKeysPulled atomic.Int64 // keys applied during sync
 	merkleBuildNanos atomic.Int64 // last build duration (ns)
@@ -1232,6 +1256,11 @@ type DistMetrics struct {
 	HintedDropped int64
 	HintedGlobalDropped int64
 	HintedBytes int64
+	MigrationHintQueued int64 // subset of HintedQueued attributable to rebalance migrations
+	MigrationHintReplayed int64 // subset of HintedReplayed for migration hints
+	MigrationHintExpired int64 // subset of HintedExpired for migration hints
+	MigrationHintDropped int64 // subset of HintedDropped + HintedGlobalDropped for migration hints
+	MigrationHintLastAgeNanos int64 // queue residency of the most-recently-replayed migration hint (ns)
 	MerkleSyncs int64
 	MerkleKeysPulled int64
 	MerkleBuildNanos int64
@@ -1298,6 +1327,11 @@ func (dm *DistMemory) Metrics() DistMetrics {
 		HintedDropped: dm.metrics.hintedDropped.Load(),
 		HintedGlobalDropped: dm.metrics.hintedGlobalDropped.Load(),
 		HintedBytes: dm.metrics.hintedBytes.Load(),
+		MigrationHintQueued: dm.metrics.migrationHintQueued.Load(),
+		MigrationHintReplayed: dm.metrics.migrationHintReplayed.Load(),
+		MigrationHintExpired: dm.metrics.migrationHintExpired.Load(),
+		MigrationHintDropped: dm.metrics.migrationHintDropped.Load(),
+		MigrationHintLastAgeNanos: dm.metrics.migrationHintLastAgeNanos.Load(),
 		MerkleSyncs: dm.metrics.merkleSyncs.Load(),
 		MerkleKeysPulled: dm.metrics.merkleKeysPulled.Load(),
 		MerkleBuildNanos: dm.metrics.merkleBuildNanos.Load(),
@@ -2242,7 +2276,7 @@ func (dm *DistMemory) migrateIfNeeded(ctx context.Context, item *cache.Item) {
 			slog.Any("err", migrationErr),
 		)
 
-		dm.queueHint(string(owners[0]), item)
+		dm.queueHint(string(owners[0]), item, hintSourceMigration)
 	}
 
 	// Update originalPrimary so we don't recount repeatedly.
@@ -2723,7 +2757,7 @@ func (dm *DistMemory) replicateTo(ctx context.Context, item *cache.Item, replica
 		// (timeout, 5xx, connection reset) silently dropped replicas.
 		// The hint TTL bounds total retry time, so a target that's
 		// permanently gone still drains rather than ballooning.
-		dm.queueHint(string(oid), item)
+		dm.queueHint(string(oid), item, hintSourceReplication)
 	}
 
 	return acks
@@ -2798,7 +2832,7 @@ func (dm *DistMemory) getWithConsistencyParallel(
 }
 
 // --- Hinted handoff implementation ---.
-func (dm *DistMemory) queueHint(nodeID string, item *cache.Item) { // reduced complexity
+func (dm *DistMemory) queueHint(nodeID string, item *cache.Item, source hintSource) { // reduced complexity
 	if dm.hintTTL <= 0 {
 		return
 	}
@@ -2825,12 +2859,24 @@ func (dm *DistMemory) queueHint(nodeID string, item *cache.Item) { // reduced co
 		dm.hintsMu.Unlock()
 		dm.metrics.hintedGlobalDropped.Add(1)
 
+		if source == hintSourceMigration {
+			dm.metrics.migrationHintDropped.Add(1)
+		}
+
 		return
 	}
 
 	cloned := *item
 
-	queue = append(queue, hintedEntry{item: &cloned, expire: time.Now().Add(dm.hintTTL), size: size})
+	now := time.Now()
+
+	queue = append(queue, hintedEntry{
+		item: &cloned,
+		queuedAt: now,
+		expire: now.Add(dm.hintTTL),
+		size: size,
+		source: source,
+	})
 	dm.hints[nodeID] = queue
 	dm.adjustHintAccounting(1, size)
 
@@ -2842,6 +2888,10 @@ func (dm *DistMemory) queueHint(nodeID string, item *cache.Item) { // reduced co
 
 	dm.metrics.hintedQueued.Add(1)
 	dm.metrics.hintedBytes.Store(bytesNow)
+
+	if source == hintSourceMigration {
+		dm.metrics.migrationHintQueued.Add(1)
+	}
 }
 
 // approxHintSize estimates the size of a hinted item for global caps.
@@ -2960,6 +3010,10 @@ func (dm *DistMemory) processHint(ctx context.Context, nodeID string, entry hint
 	if now.After(entry.expire) {
 		dm.metrics.hintedExpired.Add(1)
 
+		if entry.source == hintSourceMigration {
+			dm.metrics.migrationHintExpired.Add(1)
+		}
+
 		return 1
 	}
 
@@ -2972,6 +3026,19 @@ func (dm *DistMemory) processHint(ctx context.Context, nodeID string, entry hint
 	if err == nil {
 		dm.metrics.hintedReplayed.Add(1)
 
+		if entry.source == hintSourceMigration {
+			dm.metrics.migrationHintReplayed.Add(1)
+
+			// last_age_ns reflects queue residency of the most-recently
+			// replayed migration hint. Operators read this as "how long did
+			// the last migration retry wait for the new primary to come
+			// back?" — a direct signal of cross-node reachability during
+			// rolling deploys.
+			if !entry.queuedAt.IsZero() {
+				dm.metrics.migrationHintLastAgeNanos.Store(now.Sub(entry.queuedAt).Nanoseconds())
+			}
+		}
+
 		return 1
 	}
 
@@ -2981,6 +3048,10 @@ func (dm *DistMemory) processHint(ctx context.Context, nodeID string, entry hint
 
 	dm.metrics.hintedDropped.Add(1)
 
+	if entry.source == hintSourceMigration {
+		dm.metrics.migrationHintDropped.Add(1)
+	}
+
 	dm.logger.Warn(
 		"hint dropped after replay error",
 		slog.String("peer_id", nodeID),
@@ -3879,6 +3950,31 @@ var distMetricSpecs = []distMetricSpec{
 		desc: "Approximate total bytes currently queued in hints",
 		get: func(m DistMetrics) int64 { return m.HintedBytes },
 	},
+	{
+		name: "dist.migration.queued", unit: unitHint, counter: true,
+		desc: "Migration-source hints queued (subset of dist.hinted.queued from rebalance ticks)",
+		get: func(m DistMetrics) int64 { return m.MigrationHintQueued },
+	},
+	{
+		name: "dist.migration.replayed", unit: unitHint, counter: true,
+		desc: "Migration-source hints successfully delivered on replay",
+		get: func(m DistMetrics) int64 { return m.MigrationHintReplayed },
+	},
+	{
+		name: "dist.migration.expired", unit: unitHint, counter: true,
+		desc: "Migration-source hints that aged past hint TTL before delivery",
+		get: func(m DistMetrics) int64 { return m.MigrationHintExpired },
+	},
+	{
+		name: "dist.migration.dropped", unit: unitHint, counter: true,
+		desc: "Migration-source hints discarded by replay error or queue caps",
+		get: func(m DistMetrics) int64 { return m.MigrationHintDropped },
+	},
+	{
+		name: "dist.migration.last_age_ns", unit: unitNanos, counter: false,
+		desc: "Queue residency of the most-recently-replayed migration hint (ns)",
+		get: func(m DistMetrics) int64 { return m.MigrationHintLastAgeNanos },
+	},
 
 	// --- Anti-entropy (Merkle) ---
 	{
