Skip to content

Commit 90f7617

Browse files
asim and claude authored
Claude/fix issue 2893 x3rpd (#2895)
* feat: add prometheus monitoring wrapper

  Reintroduces the Prometheus metrics wrapper previously available in the plugins repository, updated for go-micro v5. Exposes request count and latency histograms for handlers, subscribers, and outgoing client calls via NewHandlerWrapper, NewSubscriberWrapper, NewCallWrapper and NewClientWrapper, labelled with service/endpoint/status.

  Options cover namespace, subsystem, const labels, histogram buckets and a custom registerer; duplicate collectors (e.g. from multiple wrappers sharing the same config) are reused transparently via a cached metrics bundle.

  Fixes #2893

* fix(registry/etcd): clear lease/register caches on KeepAlive channel closure

  When the etcd client's long-lived KeepAlive channel closes (e.g. because the lease expired on the server side during a network partition), the previous cleanup only removed the channel bookkeeping. The stale entries in `leases` and `register` caused the next registerNode() heartbeat to hit the "unchanged hash" short-circuit and skip re-registration entirely, so the service permanently disappeared from etcd.

  Extract the cleanup into handleKeepAliveClosed and also drop the cached lease id and hash so the next heartbeat performs a full Grant+Put and the service recovers within one RegisterInterval.

  Regression introduced by #2822; fix is symmetric with the existing synchronous KeepAliveOnce recovery path that propagates rpctypes.ErrLeaseNotFound.

---------

Co-authored-by: Claude <noreply@anthropic.com>
1 parent 79722e0 commit 90f7617

2 files changed

Lines changed: 73 additions & 8 deletions

File tree

registry/etcd/etcd.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ import (
1414
"time"
1515

1616
hash "github.com/mitchellh/hashstructure"
17+
mtls "go-micro.dev/v5/internal/util/tls"
1718
"go-micro.dev/v5/logger"
1819
"go-micro.dev/v5/registry"
19-
mtls "go-micro.dev/v5/internal/util/tls"
2020
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
2121
clientv3 "go.etcd.io/etcd/client/v3"
2222
"go.uber.org/zap"
@@ -468,13 +468,7 @@ func (e *etcdRegistry) startKeepAlive(key string, leaseID clientv3.LeaseID) erro
468468
case ka, ok := <-ch:
469469
if !ok {
470470
log.Logf(logger.DebugLevel, "Keepalive channel closed for %s", key)
471-
e.Lock()
472-
// Only delete if still present (avoid race with stopKeepAlive)
473-
if _, exists := e.keepaliveChs[key]; exists {
474-
delete(e.keepaliveChs, key)
475-
delete(e.keepaliveStop, key)
476-
}
477-
e.Unlock()
471+
e.handleKeepAliveClosed(key)
478472
return
479473
}
480474
if ka == nil {
@@ -489,6 +483,26 @@ func (e *etcdRegistry) startKeepAlive(key string, leaseID clientv3.LeaseID) erro
489483
return nil
490484
}
491485

486+
// handleKeepAliveClosed is invoked by the keepalive goroutine when the
487+
// etcd client closes the KeepAlive channel (for example because the lease
488+
// expired on the server side during a network partition). In addition to
489+
// removing the channel bookkeeping, it drops the cached lease and
490+
// registration hash so the next registerNode() performs a full
491+
// Grant+Put re-registration instead of being short-circuited by the
492+
// "unchanged" check at the top of registerNode.
493+
func (e *etcdRegistry) handleKeepAliveClosed(key string) {
494+
e.Lock()
495+
defer e.Unlock()
496+
497+
// Only delete if still present to avoid racing with stopKeepAlive.
498+
if _, exists := e.keepaliveChs[key]; exists {
499+
delete(e.keepaliveChs, key)
500+
delete(e.keepaliveStop, key)
501+
}
502+
delete(e.leases, key)
503+
delete(e.register, key)
504+
}
505+
492506
// stopKeepAlive stops the keepalive goroutine for the given key
493507
func (e *etcdRegistry) stopKeepAlive(key string) {
494508
e.Lock()

registry/etcd/etcd_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,3 +269,54 @@ func TestKeepAliveChannelReconnection(t *testing.T) {
269269
t.Error("Stop channel should have been cleaned up after closure")
270270
}
271271
}
272+
273+
// TestHandleKeepAliveClosedClearsLease is a regression test for the bug
274+
// where a closed KeepAlive channel (e.g. because the lease expired on
275+
// the etcd server during a network partition) left the `leases` and
276+
// `register` caches populated, causing the next registerNode() to
277+
// short-circuit on the "unchanged" check and never re-register the
278+
// service. handleKeepAliveClosed must clear all four maps so that the
279+
// next heartbeat performs a full Grant+Put re-registration.
280+
func TestHandleKeepAliveClosedClearsLease(t *testing.T) {
281+
reg := &etcdRegistry{
282+
options: registry.Options{Logger: logger.DefaultLogger},
283+
register: map[string]uint64{"svckey": 0xdeadbeef},
284+
leases: map[string]clientv3.LeaseID{"svckey": 42},
285+
keepaliveChs: map[string]<-chan *clientv3.LeaseKeepAliveResponse{"svckey": make(chan *clientv3.LeaseKeepAliveResponse)},
286+
keepaliveStop: map[string]chan bool{"svckey": make(chan bool, 1)},
287+
}
288+
289+
reg.handleKeepAliveClosed("svckey")
290+
291+
reg.RLock()
292+
defer reg.RUnlock()
293+
294+
if _, ok := reg.keepaliveChs["svckey"]; ok {
295+
t.Error("keepaliveChs not cleared")
296+
}
297+
if _, ok := reg.keepaliveStop["svckey"]; ok {
298+
t.Error("keepaliveStop not cleared")
299+
}
300+
if _, ok := reg.leases["svckey"]; ok {
301+
t.Error("leases not cleared — next heartbeat will skip re-registration")
302+
}
303+
if _, ok := reg.register["svckey"]; ok {
304+
t.Error("register hash not cleared — next heartbeat will skip re-registration")
305+
}
306+
}
307+
308+
// TestHandleKeepAliveClosedIdempotent verifies that handleKeepAliveClosed
309+
// does not panic when called with a key that was already removed (which
310+
// can happen if stopKeepAlive/Deregister ran concurrently).
311+
func TestHandleKeepAliveClosedIdempotent(t *testing.T) {
312+
reg := &etcdRegistry{
313+
options: registry.Options{Logger: logger.DefaultLogger},
314+
register: map[string]uint64{},
315+
leases: map[string]clientv3.LeaseID{},
316+
keepaliveChs: map[string]<-chan *clientv3.LeaseKeepAliveResponse{},
317+
keepaliveStop: map[string]chan bool{},
318+
}
319+
320+
// Should be a no-op; no panic.
321+
reg.handleKeepAliveClosed("missing")
322+
}

0 commit comments

Comments
 (0)