Skip to content

Commit df75a4f

Browse files
vitess-bot[bot]Kyle Johnsonclaude
authored
[release-24.0] discovery: fix keyspaceState leak for keyspaces missing in localCell (#19993) (#20109)
Signed-off-by: Kyle Johnson <kyjohnson@hubspot.com> Signed-off-by: Kyle Johnson <kylej@hubspot.com> Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com> Co-authored-by: Kyle Johnson <kyjohnson@hubspot.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 1c95e0e commit df75a4f

2 files changed

Lines changed: 183 additions & 14 deletions

File tree

go/vt/discovery/keyspace_events.go

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,15 @@ var (
4343
// waitConsistentKeyspacesCheck is the amount of time to wait for between checks to verify the keyspace is consistent.
4444
waitConsistentKeyspacesCheck = 100 * time.Millisecond
4545
kewHcSubscriberName = "KeyspaceEventWatcher"
46+
// missingKeyspaceTTL is how long getKeyspaceStatus caches a NoNode result
47+
// for a keyspace's SrvKeyspace in the local cell. Without this cache,
48+
// every healthcheck event for a tablet whose keyspace has no SrvKeyspace
49+
// in the local cell would allocate a keyspaceState, register watchers,
50+
// and immediately tear it down — leaking the SrvVSchema listener (whose
51+
// callback always returns true) on every event. This is observable in
52+
// multi-cell deployments where some keyspaces are only served from a
53+
// subset of cells.
54+
missingKeyspaceTTL = 30 * time.Second
4655
)
4756

4857
// KeyspaceEventWatcher is an auxiliary watcher that watches all availability incidents
@@ -60,6 +69,9 @@ type KeyspaceEventWatcher struct {
6069

6170
mu sync.Mutex
6271
keyspaces map[string]*keyspaceState
72+
// missingKeyspaces is a negative cache for keyspaces whose SrvKeyspace
73+
// does not exist in localCell. Entries expire after missingKeyspaceTTL.
74+
missingKeyspaces map[string]time.Time
6375

6476
subsMu sync.Mutex
6577
subs map[chan *KeyspaceEvent]struct{}
@@ -91,11 +103,12 @@ type ShardEvent struct {
91103
// will be used to detect unhealthy nodes.
92104
func NewKeyspaceEventWatcher(ctx context.Context, topoServer srvtopo.Server, hc HealthCheck, localCell string) *KeyspaceEventWatcher {
93105
kew := &KeyspaceEventWatcher{
94-
hc: hc,
95-
ts: topoServer,
96-
localCell: localCell,
97-
keyspaces: make(map[string]*keyspaceState),
98-
subs: make(map[chan *KeyspaceEvent]struct{}),
106+
hc: hc,
107+
ts: topoServer,
108+
localCell: localCell,
109+
keyspaces: make(map[string]*keyspaceState),
110+
missingKeyspaces: make(map[string]time.Time),
111+
subs: make(map[chan *KeyspaceEvent]struct{}),
99112
}
100113
kew.run(ctx)
101114
log.Info(fmt.Sprintf("started watching keyspace events in %q", localCell))
@@ -126,6 +139,14 @@ func (kss *keyspaceState) isConsistent() bool {
126139
return kss.consistent
127140
}
128141

142+
// isDeleted returns whether the keyspace has been marked deleted by a NoNode
143+
// SrvKeyspace watch result.
144+
func (kss *keyspaceState) isDeleted() bool {
145+
kss.mu.Lock()
146+
defer kss.mu.Unlock()
147+
return kss.deleted
148+
}
149+
129150
// Format prints the internal state for this keyspace for debug purposes.
130151
func (kss *keyspaceState) Format(f fmt.State, verb rune) {
131152
kss.mu.Lock()
@@ -618,17 +639,30 @@ func (kss *keyspaceState) isServing() bool {
618639
// In addition, the traffic switcher updates SrvVSchema when the DeniedTables attributes in a Shard
619640
// record is modified.
620641
func (kss *keyspaceState) onSrvVSchema(vs *vschemapb.SrvVSchema, err error) bool {
642+
kss.mu.Lock()
643+
defer kss.mu.Unlock()
644+
// If onSrvKeyspace has already marked this keyspace deleted (NoNode in
645+
// localCell), unregister this listener too — including on nil-payload
646+
// updates from a server shutdown. onSrvVSchema otherwise always returns
647+
// true, so the resilient SrvVSchema watcher would keep the closure —
648+
// and the orphan keyspaceState it captures — in its listeners slice
649+
// forever and re-run this callback's work on every update.
650+
if kss.deleted {
651+
return false
652+
}
621653
// The vschema can be nil if the server is currently shutting down.
622654
if vs == nil {
623655
return true
624656
}
625-
626-
kss.mu.Lock()
627-
defer kss.mu.Unlock()
628-
var kerr error
629-
if kss.moveTablesState, kerr = kss.getMoveTablesStatus(vs); err != nil {
657+
// Use a local for the new state — getMoveTablesStatus returns (nil, err)
658+
// on failure, and assigning directly into kss.moveTablesState would
659+
// silently clobber the previously-tracked state on a transient topo blip.
660+
newState, kerr := kss.getMoveTablesStatus(vs)
661+
if kerr != nil {
630662
log.Error(fmt.Sprintf("onSrvVSchema: keyspace %s failed to get move tables status: %v", kss.keyspace, kerr))
663+
return true
631664
}
665+
kss.moveTablesState = newState
632666
if kss.moveTablesState != nil && kss.moveTablesState.Typ != MoveTablesNone {
633667
// Mark the keyspace as inconsistent. ensureConsistentLocked() checks if the workflow is
634668
// switched, and if so, it will send an event to the buffering subscribers to indicate that
@@ -650,6 +684,14 @@ func newKeyspaceState(ctx context.Context, kew *KeyspaceEventWatcher, cell, keys
650684
shards: make(map[string]*shardState),
651685
}
652686
kew.ts.WatchSrvKeyspace(ctx, cell, keyspace, kss.onSrvKeyspace)
687+
if kss.isDeleted() {
688+
// SrvKeyspace returned NoNode synchronously via the cached value
689+
// during addListener, so this keyspaceState is already discarded.
690+
// Skip the SrvVSchema listener: onSrvVSchema always returns true,
691+
// and the listener would never be reaped — pinning the orphan
692+
// keyspaceState in the SrvVSchema watcher's listeners slice.
693+
return kss
694+
}
653695
kew.ts.WatchSrvVSchema(ctx, cell, kss.onSrvVSchema)
654696
return kss
655697
}
@@ -673,12 +715,22 @@ func (kew *KeyspaceEventWatcher) getKeyspaceStatus(ctx context.Context, keyspace
673715
defer kew.mu.Unlock()
674716
kss := kew.keyspaces[keyspace]
675717
if kss == nil {
718+
// Skip allocation if we recently confirmed this keyspace has no
719+
// SrvKeyspace in the local cell. Healthchecks for tablets in other
720+
// watched cells can otherwise drive this path >1k times/sec.
721+
if t, ok := kew.missingKeyspaces[keyspace]; ok {
722+
if time.Since(t) < missingKeyspaceTTL {
723+
return nil
724+
}
725+
delete(kew.missingKeyspaces, keyspace)
726+
}
676727
kss = newKeyspaceState(ctx, kew, kew.localCell, keyspace)
677728
kew.keyspaces[keyspace] = kss
678729
}
679-
if kss.deleted {
730+
if kss.isDeleted() {
680731
kss = nil
681732
delete(kew.keyspaces, keyspace)
733+
kew.missingKeyspaces[keyspace] = time.Now()
682734
// Delete from the sidecar database identifier cache as well.
683735
// Ignore any errors as they should all mean that the entry
684736
// does not exist in the cache (which will be common).

go/vt/discovery/keyspace_events_test.go

Lines changed: 120 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"testing"
2525
"time"
2626

27+
"github.com/stretchr/testify/assert"
2728
"github.com/stretchr/testify/require"
2829

2930
"vitess.io/vitess/go/test/utils"
@@ -396,9 +397,10 @@ func TestWaitForConsistentKeyspaces(t *testing.T) {
396397
ctx, cancel := context.WithCancel(context.Background())
397398
cancel()
398399
kew := KeyspaceEventWatcher{
399-
keyspaces: tt.ksMap,
400-
mu: sync.Mutex{},
401-
ts: &fakeTopoServer{},
400+
keyspaces: tt.ksMap,
401+
missingKeyspaces: make(map[string]time.Time),
402+
mu: sync.Mutex{},
403+
ts: &fakeTopoServer{},
402404
}
403405
err := kew.WaitForConsistentKeyspaces(ctx, tt.ksList)
404406
if tt.errExpected != "" {
@@ -701,3 +703,118 @@ func (f *fakeTopoServer) WatchSrvVSchema(ctx context.Context, cell string, callb
701703
sv, err := f.GetSrvVSchema(ctx, cell)
702704
callback(sv, err)
703705
}
706+
707+
// fakeMissingKeyspaceTopoServer is a fakeTopoServer whose WatchSrvKeyspace
708+
// returns NoNode for keyspaces in the missing set, simulating a keyspace
709+
// that exists in the cluster but has no SrvKeyspace in the local cell.
710+
// It also counts how many times each Watch is invoked so tests can assert
711+
// the negative cache and the SrvVSchema-skip behavior in
712+
// KeyspaceEventWatcher.
713+
type fakeMissingKeyspaceTopoServer struct {
714+
fakeTopoServer
715+
missing map[string]struct{}
716+
watchSrvKeyspaceCalls atomic.Int64
717+
watchSrvVSchemaCalls atomic.Int64
718+
}
719+
720+
func (f *fakeMissingKeyspaceTopoServer) WatchSrvKeyspace(ctx context.Context, cell, keyspace string, callback func(*topodatapb.SrvKeyspace, error) bool) {
721+
f.watchSrvKeyspaceCalls.Add(1)
722+
if _, ok := f.missing[keyspace]; ok {
723+
callback(nil, topo.NewError(topo.NoNode, keyspace))
724+
return
725+
}
726+
f.fakeTopoServer.WatchSrvKeyspace(ctx, cell, keyspace, callback)
727+
}
728+
729+
func (f *fakeMissingKeyspaceTopoServer) WatchSrvVSchema(ctx context.Context, cell string, callback func(*vschemapb.SrvVSchema, error) bool) {
730+
f.watchSrvVSchemaCalls.Add(1)
731+
f.fakeTopoServer.WatchSrvVSchema(ctx, cell, callback)
732+
}
733+
734+
// TestKeyspaceEventWatcherMissingKeyspaceCache verifies that healthchecks for
735+
// a keyspace whose SrvKeyspace does not exist in the local cell don't allocate
736+
// a new keyspaceState (and don't register a SrvVSchema listener) on every
737+
// event. Without the negative cache and the SrvVSchema-skip, this path used
738+
// to fire on every healthcheck event and pin orphan keyspaceStates in the
739+
// SrvVSchema watcher's listeners slice (onSrvVSchema always returns true,
740+
// so the listener was never reaped).
741+
func TestKeyspaceEventWatcherMissingKeyspaceCache(t *testing.T) {
742+
cell := "cell1"
743+
missing := "missing-keyspace"
744+
745+
sts := &fakeMissingKeyspaceTopoServer{
746+
missing: map[string]struct{}{missing: {}},
747+
}
748+
hc := NewFakeHealthCheck(make(chan *TabletHealth))
749+
t.Cleanup(func() { hc.Close() })
750+
751+
kew := &KeyspaceEventWatcher{
752+
hc: hc,
753+
ts: sts,
754+
localCell: cell,
755+
keyspaces: make(map[string]*keyspaceState),
756+
missingKeyspaces: make(map[string]time.Time),
757+
subs: make(map[chan *KeyspaceEvent]struct{}),
758+
}
759+
760+
const lookups = 100
761+
for range lookups {
762+
require.Nil(t, kew.getKeyspaceStatus(t.Context(), missing),
763+
"getKeyspaceStatus must return nil for a keyspace missing from localCell")
764+
}
765+
766+
assert.Equal(t, int64(1), sts.watchSrvKeyspaceCalls.Load(),
767+
"WatchSrvKeyspace should be called at most once within missingKeyspaceTTL — the negative cache should short-circuit subsequent lookups")
768+
assert.Equal(t, int64(0), sts.watchSrvVSchemaCalls.Load(),
769+
"WatchSrvVSchema must never be called when SrvKeyspace returns NoNode synchronously — otherwise onSrvVSchema (always returning true) pins an orphan keyspaceState in the listeners slice")
770+
771+
// Force expiry of the negative cache and confirm exactly one re-allocation.
772+
kew.mu.Lock()
773+
for k := range kew.missingKeyspaces {
774+
kew.missingKeyspaces[k] = time.Now().Add(-2 * missingKeyspaceTTL)
775+
}
776+
kew.mu.Unlock()
777+
778+
require.Nil(t, kew.getKeyspaceStatus(t.Context(), missing))
779+
assert.Equal(t, int64(2), sts.watchSrvKeyspaceCalls.Load(),
780+
"after the negative-cache TTL elapses, the next lookup should re-probe SrvKeyspace exactly once")
781+
assert.Equal(t, int64(0), sts.watchSrvVSchemaCalls.Load(),
782+
"WatchSrvVSchema must still not be called after re-probing a missing keyspace")
783+
}
784+
785+
// TestOnSrvVSchemaUnregistersAfterDelete covers the async-deletion path: a
786+
// keyspace exists in localCell when newKeyspaceState registers onSrvVSchema,
787+
// then later disappears. onSrvKeyspace marks the state deleted and returns
788+
// false (reaping itself), but onSrvVSchema must also return false on its next
789+
// invocation — for every payload shape, including the nil "server shutting
790+
// down" case. Otherwise the resilient SrvVSchema watcher keeps the closure
791+
// (and the orphan keyspaceState it captures) in its listeners slice forever
792+
// and re-runs the callback's work on every SrvVSchema update.
793+
func TestOnSrvVSchemaUnregistersAfterDelete(t *testing.T) {
794+
cases := []struct {
795+
name string
796+
vs *vschemapb.SrvVSchema
797+
err error
798+
}{
799+
{"non-nil payload", &vschemapb.SrvVSchema{}, nil},
800+
{"nil payload (server shutdown)", nil, nil},
801+
}
802+
for _, tc := range cases {
803+
t.Run(tc.name, func(t *testing.T) {
804+
kss := &keyspaceState{
805+
keyspace: "ks1",
806+
shards: make(map[string]*shardState),
807+
}
808+
809+
// Simulate the async deletion: SrvKeyspace returns NoNode for localCell.
810+
require.False(t, kss.onSrvKeyspace(nil, topo.NewError(topo.NoNode, "ks1")),
811+
"onSrvKeyspace must return false on NoNode so the SrvKeyspace watcher reaps it")
812+
require.True(t, kss.isDeleted())
813+
814+
// The next SrvVSchema update must reap the orphan listener
815+
// regardless of payload shape.
816+
require.False(t, kss.onSrvVSchema(tc.vs, tc.err),
817+
"onSrvVSchema must return false once the keyspace is deleted, otherwise the listener pins an orphan keyspaceState")
818+
})
819+
}
820+
}

0 commit comments

Comments
 (0)