Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1914,7 +1914,7 @@ func (a Allocator) RebalanceTarget(
// continues to correspond to the original candidate set and source store,
// even as candidates are removed.
for {
target, existingCandidate, bestIdx = bestRebalanceTarget(a.randGen, results, a.as)
target, existingCandidate, bestIdx = bestRebalanceTarget(ctx, a.randGen, results, a.as)
if target == nil {
return zero, zero, "", false
}
Expand Down Expand Up @@ -2555,7 +2555,7 @@ func (a *Allocator) TransferLeaseTarget(
for _, s := range sl.Stores {
targetStores = append(targetStores, s.StoreID)
}
handle := a.as.BuildMMARebalanceAdvisor(source.StoreID, targetStores)
handle := a.as.BuildMMARebalanceAdvisor(ctx, source.StoreID, targetStores)
var bestOption roachpb.ReplicaDescriptor
candidates := make([]roachpb.ReplicaDescriptor, 0, len(validTargets))
bestOptionLeaseCount := int32(math.MaxInt32)
Expand Down
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1952,7 +1952,10 @@ func rankedCandidateListForRebalancing(
// Contract: responsible for making sure that the returned bestIdx has the
// corresponding MMA advisor in advisors.
func bestRebalanceTarget(
randGen allocatorRand, options []rebalanceOptions, as *mmaintegration.AllocatorSync,
ctx context.Context,
randGen allocatorRand,
options []rebalanceOptions,
as *mmaintegration.AllocatorSync,
) (target, existingCandidate *candidate, bestIdx int) {
bestIdx = -1
var bestTarget *candidate
Expand Down Expand Up @@ -1982,7 +1985,7 @@ func bestRebalanceTarget(
for _, cand := range options[bestIdx].candidates {
stores = append(stores, cand.store.StoreID)
}
options[bestIdx].advisor = as.BuildMMARebalanceAdvisor(options[bestIdx].existing.store.StoreID, stores)
options[bestIdx].advisor = as.BuildMMARebalanceAdvisor(ctx, options[bestIdx].existing.store.StoreID, stores)
}

// Copy the selected target out of the candidates slice before modifying
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func TestBestRebalanceTarget(t *testing.T) {
var i int
for {
i++
target, existing, _ := bestRebalanceTarget(allocRand, candidates, a.as)
target, existing, _ := bestRebalanceTarget(ctx, allocRand, candidates, a.as)
if len(expectedTargets) == 0 {
if target == nil {
break
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/mmaprototype/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ go_test(
"logging_test.go",
"memo_helper_test.go",
"mma_metrics_test.go",
"rebalance_advisor_test.go",
],
data = glob(["testdata/**"]),
embed = [":mmaprototype"],
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/allocator/mmaprototype/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ type Allocator interface {
//
// TODO(sumeer): merge the above comment with the comment in the
// implementation.
BuildMMARebalanceAdvisor(existing roachpb.StoreID, cands []roachpb.StoreID) *MMARebalanceAdvisor
BuildMMARebalanceAdvisor(
ctx context.Context, existing roachpb.StoreID, cands []roachpb.StoreID,
) *MMARebalanceAdvisor

// IsInConflictWithMMA is called by the allocator sync to determine if the
// given candidate is in conflict with the existing store.
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2409,6 +2409,22 @@ func (cs *clusterState) getStoreReportedLoad(storeID roachpb.StoreID) (roachpb.N
return 0, nil
}

// hasStore returns true when storeID has an entry in cs.stores, i.e. the
// store is already tracked by this clusterState, and false otherwise.
//
// BuildMMARebalanceAdvisor relies on this check to weed out storeIDs that
// were handed to it from outside MMA (for example from the legacy
// allocator's StorePool view) before MMA itself has learned about them.
func (cs *clusterState) hasStore(storeID roachpb.StoreID) bool {
	_, known := cs.stores[storeID]
	return known
}

// notHasStore returns true when storeID is NOT tracked by this clusterState;
// it is the exact complement of hasStore.
//
// It exists as a named method so it can be handed directly, as a method
// value, to predicate-taking helpers such as slices.IndexFunc and
// slices.DeleteFunc instead of wrapping hasStore in an inline closure.
func (cs *clusterState) notHasStore(storeID roachpb.StoreID) bool {
	known := cs.hasStore(storeID)
	return !known
}

func (cs *clusterState) getNodeReportedLoad(nodeID roachpb.NodeID) *NodeLoad {
if nodeState, ok := cs.nodes[nodeID]; ok {
return &nodeState.NodeLoad
Expand Down
25 changes: 25 additions & 0 deletions pkg/kv/kvserver/allocator/mmaprototype/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,15 @@ func (mm *meansMemo) getStoreLoadSummary(
// stores may contain duplicate storeIDs, in which case computeMeansForStoreSet
// should deduplicate processing of the stores. stores should be immutable.
//
// Precondition: every storeID in stores must be known to loadProvider
// (loadProvider.getStoreReportedLoad must return a non-nil *storeLoad).
// Callers receiving storeIDs from outside MMA (notably the legacy
// allocator's StorePool view, which can run ahead of MMA's gossip
// updates) must filter unknown stores out first. BuildMMARebalanceAdvisor
// is the canonical filtering point; clusterState.hasStore is the helper
// to use. A violation is asserted in test builds and logged-and-skipped
// in production rather than nil-dereferenced (see #170703).
//
// If stores is empty, computeMeansForStoreSet returns the zero meansLoad
// and ok=false. Callers must check ok before consuming the returned means;
// the zero value would otherwise misclassify stores as overloaded due to
Expand All @@ -510,6 +519,17 @@ func computeMeansForStoreSet(
if _, ok := scratchStores[storeID]; ok {
continue
}
if sload == nil {
// Precondition violation: a caller passed an unknown storeID. In
// test builds this panics, surfacing the bug; in production it
// logs and skips, avoiding the nil-pointer crash from #170703.
// context.Background is used because plumbing ctx into this
// internal helper would touch every memo path; the assertion is a
// last-resort safety net and is not expected to fire.
assertTruef(context.Background(), false,
"computeMeansForStoreSet: storeID %d not known to loadProvider", storeID)
continue
}
n++
scratchStores[storeID] = struct{}{}
for j := range sload.reportedLoad {
Expand All @@ -530,6 +550,11 @@ func computeMeansForStoreSet(
scratchNodes[nodeID] = loadProvider.getNodeReportedLoad(nodeID)
}
}
if n == 0 {
// Every storeID hit the nil-sload guard above. Bail out before the
// divisions below would panic with division by zero.
return meansLoad{}, false
}
for i := range means.storeLoad.load {
if means.storeLoad.capacity[i] != UnknownCapacity {
// NB: capacity can be 0 for CPURate when nodeCPURateUsage >>
Expand Down
43 changes: 37 additions & 6 deletions pkg/kv/kvserver/allocator/mmaprototype/rebalance_advisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package mmaprototype

import (
"context"
"slices"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -71,29 +72,59 @@ func NoopMMARebalanceAdvisor() *MMARebalanceAdvisor {
// duplicate storeIDs. It is up to computeMeansForStoreSet to handle
// de-duplication of storeIDs from the cands list.
//
// Callers may pass storeIDs (in `existing` or `cands`) that MMA's
// clusterState has not yet seen. This happens during startup, when the
// legacy allocator's StorePool reflects gossiped store descriptors that
// MMA has not yet been notified of via SetStore. Such stores are filtered
// out of `cands` before computing means; if `existing` itself is unknown,
// a NoopMMARebalanceAdvisor is returned because MMA has no load history
// against which to judge candidates. The same asymmetry is acknowledged
// in updateStoreStatuses, which logs and skips unknown stores rather
// than panicking. See #170703.
//
// The returned advisor should be passed to IsInConflictWithMMA as a helper to
// determine if a candidate is vetoed by the multi-metric allocator due to
// running counter to its goals.
func (a *allocatorState) BuildMMARebalanceAdvisor(
existing roachpb.StoreID, cands []roachpb.StoreID,
ctx context.Context, existing roachpb.StoreID, cands []roachpb.StoreID,
) *MMARebalanceAdvisor {
// a.cs is mutated by gossip-driven callbacks (e.g. ProcessStoreLoadMsg) and
// must only be accessed under a.mu. The other public methods on
// allocatorState follow this discipline.
a.mu.Lock()
defer a.mu.Unlock()
if !a.cs.hasStore(existing) {
// MMA has no load history for the source store, so it cannot judge
// whether any candidate is more overloaded than existing. Fall back to
// a no-op advisor rather than risk a misclassification (or, before the
// hasStore filter on cands below was added, a nil-pointer panic).
log.KvDistribution.VEventf(ctx, 2,
"mma skipping advisor: existing store s%d not yet known to mma", existing)
return NoopMMARebalanceAdvisor()
}
// Drop any cand the integration layer learned about (via gossip / StorePool)
// before MMA did. computeMeansForStoreSet would otherwise nil-deref on the
// missing storeLoad. In the steady state every cand is known, so the
// IndexFunc walk completes without allocating; only when an unknown cand
// is present do we copy and compact.
if slices.IndexFunc(cands, a.cs.notHasStore) != -1 {
cp := make([]roachpb.StoreID, len(cands))
copy(cp, cands)
cands = slices.DeleteFunc(cp, a.cs.notHasStore)
}
// TODO(wenyihu6): for simplicity, we create a new scratchNodes every call.
// We should reuse the scratchNodes instead.
scratchNodes := map[roachpb.NodeID]*NodeLoad{}
scratchStores := map[roachpb.StoreID]struct{}{}
cands = append(cands, existing)
means, ok := computeMeansForStoreSet(a.cs, cands, scratchNodes, scratchStores)
if !ok {
// Unreachable: cands always contains at least `existing`. Assert in
// test builds; in production, fall back to a no-op advisor rather than
// return a zero-valued means that would misclassify stores. Gating on
// !ok avoids variadic arg boxing on the success path.
assertTruef(context.Background(), false, "computeMeansForStoreSet returned !ok for non-empty cands=%v", cands)
// Unreachable: cands always contains at least `existing`, which we
// just verified is known to MMA. Assert in test builds; in production,
// fall back to a no-op advisor rather than return a zero-valued means
// that would misclassify stores. Gating on !ok avoids variadic arg
// boxing on the success path.
assertTruef(ctx, false, "computeMeansForStoreSet returned !ok for non-empty cands=%v", cands)
return NoopMMARebalanceAdvisor()
}
return &MMARebalanceAdvisor{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2026 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package mmaprototype

import (
"context"
"math/rand"
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

// TestBuildMMARebalanceAdvisorUnknownStores is the regression test for
// #170703. BuildMMARebalanceAdvisor used to dereference a nil pointer when
// handed a storeID (as `existing` or inside `cands`) that MMA's clusterState
// had not been told about yet. That situation arises at startup: the legacy
// allocator assembles candidate lists from StorePool's gossip-driven view,
// which can be ahead of the SetStore notifications MMA receives.
//
// The expected behavior exercised here:
//   - an unknown candidate is dropped without error, and a real (enabled)
//     advisor is still built around the known existing store;
//   - an unknown existing store yields a no-op (disabled) advisor, which
//     never reports a conflict.
func TestBuildMMARebalanceAdvisorUnknownStores(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	const (
		knownStore   roachpb.StoreID = 1
		unknownStore roachpb.StoreID = 99
	)
	ctx := context.Background()

	// newAllocator builds a fresh allocator state in which knownStore is the
	// only store that has been registered via SetStore.
	newAllocator := func() *allocatorState {
		state := NewAllocatorState(timeutil.DefaultTimeSource{}, rand.New(rand.NewSource(0)))
		state.SetStore(StoreAttributesAndLocality{
			StoreID: knownStore,
			NodeID:  roachpb.NodeID(knownStore),
		})
		return state
	}

	t.Run("unknown cand", func(t *testing.T) {
		alloc := newAllocator()
		// The sole candidate is a store MMA has never heard of. It must be
		// filtered out, leaving an enabled advisor anchored on knownStore.
		candidates := []roachpb.StoreID{unknownStore}
		adv := alloc.BuildMMARebalanceAdvisor(ctx, knownStore, candidates)
		require.NotNil(t, adv)
		require.False(t, adv.disabled)
		require.Equal(t, knownStore, adv.existingStoreID)
	})

	t.Run("unknown existing", func(t *testing.T) {
		alloc := newAllocator()
		// With no load history for the source store MMA has nothing to
		// compare candidates against, so it must hand back the disabled
		// advisor, for which IsInConflictWithMMA always answers false.
		var noCands []roachpb.StoreID
		adv := alloc.BuildMMARebalanceAdvisor(ctx, unknownStore, noCands)
		require.NotNil(t, adv)
		require.True(t, adv.disabled)
		conflict := alloc.IsInConflictWithMMA(ctx, knownStore, adv, false /* cpuOnly */)
		require.False(t, conflict)
	})
}
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/mmaintegration/allocator_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ type mmaState interface {
// MMARebalanceAdvisor for the given existing store and candidates. The
// advisor should be later passed to IsInConflictWithMMA to determine if a
// given candidate is in conflict with the existing store.
BuildMMARebalanceAdvisor(existing roachpb.StoreID, cands []roachpb.StoreID) *mmaprototype.MMARebalanceAdvisor
BuildMMARebalanceAdvisor(
ctx context.Context, existing roachpb.StoreID, cands []roachpb.StoreID,
) *mmaprototype.MMARebalanceAdvisor
// IsInConflictWithMMA is called by the allocator sync to determine if the
// given candidate is in conflict with the existing store.
IsInConflictWithMMA(ctx context.Context, cand roachpb.StoreID, advisor *mmaprototype.MMARebalanceAdvisor, cpuOnly bool) bool
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/mmaintegration/thrashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ import (
// after calling this function, so the caller is responsible for keeping track
// of the returned advisor and associating it.
func (as *AllocatorSync) BuildMMARebalanceAdvisor(
existing roachpb.StoreID, cands []roachpb.StoreID,
ctx context.Context, existing roachpb.StoreID, cands []roachpb.StoreID,
) *mmaprototype.MMARebalanceAdvisor {
if kvserverbase.GetLoadBasedRebalancingMode(&as.st.SV) != kvserverbase.LBRebalancingMultiMetricAndCount {
return mmaprototype.NoopMMARebalanceAdvisor()
}
return as.mmaAllocator.BuildMMARebalanceAdvisor(existing, cands)
return as.mmaAllocator.BuildMMARebalanceAdvisor(ctx, existing, cands)
}

// IsInConflictWithMMA determines if a candidate conflicts with MMA's goals.
Expand Down
Loading