diff --git a/src/code.cloudfoundry.org/gorouter/metrics/compositereporter.go b/src/code.cloudfoundry.org/gorouter/metrics/compositereporter.go index 290d08219..18f082d30 100644 --- a/src/code.cloudfoundry.org/gorouter/metrics/compositereporter.go +++ b/src/code.cloudfoundry.org/gorouter/metrics/compositereporter.go @@ -47,6 +47,8 @@ type MetricReporter interface { CaptureNATSBufferedMessages(messages int) CaptureNATSDroppedMessages(messages int) UnmuzzleRouteRegistrationLatency() + CaptureEndpointsPerPool(endpoints int, route string, lbAlgo string) + UncaptureEndpointsPerPool(route, lbAlgo string) } type ComponentTagged interface { @@ -230,6 +232,18 @@ func (m MultiMetricReporter) CaptureNATSDroppedMessages(messages int) { } } +func (m MultiMetricReporter) CaptureEndpointsPerPool(endpoints int, route string, lbAlgo string) { + for _, r := range m { + r.CaptureEndpointsPerPool(endpoints, route, lbAlgo) + } +} + +func (m MultiMetricReporter) UncaptureEndpointsPerPool(route, lbAlgo string) { + for _, r := range m { + r.UncaptureEndpointsPerPool(route, lbAlgo) + } +} + func (c *CompositeReporter) CaptureBadRequest() { c.VarzReporter.CaptureBadRequest() c.MetricReporter.CaptureBadRequest() @@ -257,3 +271,11 @@ func (c *CompositeReporter) CaptureRoutingResponseLatency(b *route.Endpoint, sta func (c *CompositeReporter) CaptureHTTPLatency(d time.Duration, sourceID string) { c.MetricReporter.CaptureHTTPLatency(d, sourceID) } + +func (c *CompositeReporter) CaptureEndpointsPerPool(endpoints int, route string, lbAlgo string) { + c.MetricReporter.CaptureEndpointsPerPool(endpoints, route, lbAlgo) +} + +func (c *CompositeReporter) UncaptureEndpointsPerPool(route, lbAlgo string) { + c.MetricReporter.UncaptureEndpointsPerPool(route, lbAlgo) +} diff --git a/src/code.cloudfoundry.org/gorouter/metrics/fakes/fake_metricreporter.go b/src/code.cloudfoundry.org/gorouter/metrics/fakes/fake_metricreporter.go index ceb391e80..6a7002591 100644 --- a/src/code.cloudfoundry.org/gorouter/metrics/fakes/fake_metricreporter.go +++ b/src/code.cloudfoundry.org/gorouter/metrics/fakes/fake_metricreporter.go @@ -39,6 +39,13 @@ type FakeMetricReporter struct { captureEmptyContentLengthHeaderMutex sync.RWMutex captureEmptyContentLengthHeaderArgsForCall []struct { } + CaptureEndpointsPerPoolStub func(int, string, string) + captureEndpointsPerPoolMutex sync.RWMutex + captureEndpointsPerPoolArgsForCall []struct { + arg1 int + arg2 string + arg3 string + } CaptureFoundFileDescriptorsStub func(int) captureFoundFileDescriptorsMutex sync.RWMutex captureFoundFileDescriptorsArgsForCall []struct { @@ -136,6 +143,12 @@ type FakeMetricReporter struct { captureWebSocketUpdateMutex sync.RWMutex captureWebSocketUpdateArgsForCall []struct { } + UncaptureEndpointsPerPoolStub func(string, string) + uncaptureEndpointsPerPoolMutex sync.RWMutex + uncaptureEndpointsPerPoolArgsForCall []struct { + arg1 string + arg2 string + } UnmuzzleRouteRegistrationLatencyStub func() unmuzzleRouteRegistrationLatencyMutex sync.RWMutex unmuzzleRouteRegistrationLatencyArgsForCall []struct { @@ -312,6 +325,40 @@ func (fake *FakeMetricReporter) CaptureEmptyContentLengthHeaderCalls(stub func() fake.CaptureEmptyContentLengthHeaderStub = stub } +func (fake *FakeMetricReporter) CaptureEndpointsPerPool(arg1 int, arg2 string, arg3 string) { + fake.captureEndpointsPerPoolMutex.Lock() + fake.captureEndpointsPerPoolArgsForCall = append(fake.captureEndpointsPerPoolArgsForCall, struct { + arg1 int + arg2 string + arg3 string + }{arg1, arg2, arg3}) + stub := fake.CaptureEndpointsPerPoolStub + fake.recordInvocation("CaptureEndpointsPerPool", []interface{}{arg1, arg2, arg3}) + fake.captureEndpointsPerPoolMutex.Unlock() + if stub != nil { + fake.CaptureEndpointsPerPoolStub(arg1, arg2, arg3) + } +} + +func (fake *FakeMetricReporter) CaptureEndpointsPerPoolCallCount() int { + fake.captureEndpointsPerPoolMutex.RLock() + defer fake.captureEndpointsPerPoolMutex.RUnlock() + return len(fake.captureEndpointsPerPoolArgsForCall) +} + +func (fake *FakeMetricReporter) CaptureEndpointsPerPoolCalls(stub func(int, string, string)) { + fake.captureEndpointsPerPoolMutex.Lock() + defer fake.captureEndpointsPerPoolMutex.Unlock() + fake.CaptureEndpointsPerPoolStub = stub +} + +func (fake *FakeMetricReporter) CaptureEndpointsPerPoolArgsForCall(i int) (int, string, string) { + fake.captureEndpointsPerPoolMutex.RLock() + defer fake.captureEndpointsPerPoolMutex.RUnlock() + argsForCall := fake.captureEndpointsPerPoolArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + func (fake *FakeMetricReporter) CaptureFoundFileDescriptors(arg1 int) { fake.captureFoundFileDescriptorsMutex.Lock() fake.captureFoundFileDescriptorsArgsForCall = append(fake.captureFoundFileDescriptorsArgsForCall, struct { @@ -894,6 +941,39 @@ func (fake *FakeMetricReporter) CaptureWebSocketUpdateCalls(stub func()) { fake.CaptureWebSocketUpdateStub = stub } +func (fake *FakeMetricReporter) UncaptureEndpointsPerPool(arg1 string, arg2 string) { + fake.uncaptureEndpointsPerPoolMutex.Lock() + fake.uncaptureEndpointsPerPoolArgsForCall = append(fake.uncaptureEndpointsPerPoolArgsForCall, struct { + arg1 string + arg2 string + }{arg1, arg2}) + stub := fake.UncaptureEndpointsPerPoolStub + fake.recordInvocation("UncaptureEndpointsPerPool", []interface{}{arg1, arg2}) + fake.uncaptureEndpointsPerPoolMutex.Unlock() + if stub != nil { + fake.UncaptureEndpointsPerPoolStub(arg1, arg2) + } +} + +func (fake *FakeMetricReporter) UncaptureEndpointsPerPoolCallCount() int { + fake.uncaptureEndpointsPerPoolMutex.RLock() + defer fake.uncaptureEndpointsPerPoolMutex.RUnlock() + return len(fake.uncaptureEndpointsPerPoolArgsForCall) +} + +func (fake *FakeMetricReporter) UncaptureEndpointsPerPoolCalls(stub func(string, string)) { + fake.uncaptureEndpointsPerPoolMutex.Lock() + defer fake.uncaptureEndpointsPerPoolMutex.Unlock() + fake.UncaptureEndpointsPerPoolStub = stub +} + +func (fake *FakeMetricReporter) UncaptureEndpointsPerPoolArgsForCall(i int) (string, string) { + fake.uncaptureEndpointsPerPoolMutex.RLock() + defer fake.uncaptureEndpointsPerPoolMutex.RUnlock() + argsForCall := fake.uncaptureEndpointsPerPoolArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + func (fake *FakeMetricReporter) UnmuzzleRouteRegistrationLatency() { fake.unmuzzleRouteRegistrationLatencyMutex.Lock() fake.unmuzzleRouteRegistrationLatencyArgsForCall = append(fake.unmuzzleRouteRegistrationLatencyArgsForCall, struct { @@ -921,60 +1001,6 @@ func (fake *FakeMetricReporter) UnmuzzleRouteRegistrationLatencyCalls(stub func( func (fake *FakeMetricReporter) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.captureBackendExhaustedConnsMutex.RLock() - defer fake.captureBackendExhaustedConnsMutex.RUnlock() - fake.captureBackendInvalidIDMutex.RLock() - defer fake.captureBackendInvalidIDMutex.RUnlock() - fake.captureBackendInvalidTLSCertMutex.RLock() - defer fake.captureBackendInvalidTLSCertMutex.RUnlock() - fake.captureBackendTLSHandshakeFailedMutex.RLock() - defer fake.captureBackendTLSHandshakeFailedMutex.RUnlock() - fake.captureBadGatewayMutex.RLock() - defer fake.captureBadGatewayMutex.RUnlock() - fake.captureBadRequestMutex.RLock() - defer fake.captureBadRequestMutex.RUnlock() - fake.captureEmptyContentLengthHeaderMutex.RLock() - defer fake.captureEmptyContentLengthHeaderMutex.RUnlock() - fake.captureFoundFileDescriptorsMutex.RLock() - defer fake.captureFoundFileDescriptorsMutex.RUnlock() - fake.captureGorouterTimeMutex.RLock() - defer fake.captureGorouterTimeMutex.RUnlock() - fake.captureHTTPLatencyMutex.RLock() - defer fake.captureHTTPLatencyMutex.RUnlock() - fake.captureLookupTimeMutex.RLock() - defer fake.captureLookupTimeMutex.RUnlock() - fake.captureNATSBufferedMessagesMutex.RLock() - defer fake.captureNATSBufferedMessagesMutex.RUnlock() - fake.captureNATSDroppedMessagesMutex.RLock() - defer fake.captureNATSDroppedMessagesMutex.RUnlock() - fake.captureRegistryMessageMutex.RLock() - defer fake.captureRegistryMessageMutex.RUnlock() - fake.captureRouteRegistrationLatencyMutex.RLock() - defer fake.captureRouteRegistrationLatencyMutex.RUnlock() - fake.captureRouteServiceResponseMutex.RLock() - defer fake.captureRouteServiceResponseMutex.RUnlock() - fake.captureRouteStatsMutex.RLock() - defer fake.captureRouteStatsMutex.RUnlock() - fake.captureRoutesPrunedMutex.RLock() - defer fake.captureRoutesPrunedMutex.RUnlock() - fake.captureRoutesRegisteredMutex.RLock() - defer fake.captureRoutesRegisteredMutex.RUnlock() - fake.captureRoutesUnregisteredMutex.RLock() - defer fake.captureRoutesUnregisteredMutex.RUnlock() - fake.captureRoutingRequestMutex.RLock() - defer fake.captureRoutingRequestMutex.RUnlock() - fake.captureRoutingResponseMutex.RLock() - defer fake.captureRoutingResponseMutex.RUnlock() - fake.captureRoutingResponseLatencyMutex.RLock() - defer fake.captureRoutingResponseLatencyMutex.RUnlock() - fake.captureUnregistryMessageMutex.RLock() - defer fake.captureUnregistryMessageMutex.RUnlock() - fake.captureWebSocketFailureMutex.RLock() - defer fake.captureWebSocketFailureMutex.RUnlock() - fake.captureWebSocketUpdateMutex.RLock() - defer fake.captureWebSocketUpdateMutex.RUnlock() - fake.unmuzzleRouteRegistrationLatencyMutex.RLock() - defer fake.unmuzzleRouteRegistrationLatencyMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/src/code.cloudfoundry.org/gorouter/metrics/fakes/fake_subscriber.go b/src/code.cloudfoundry.org/gorouter/metrics/fakes/fake_subscriber.go index 2e827ce81..6183894c1 100644 --- a/src/code.cloudfoundry.org/gorouter/metrics/fakes/fake_subscriber.go +++ b/src/code.cloudfoundry.org/gorouter/metrics/fakes/fake_subscriber.go @@ -151,10 +151,6 @@ func (fake *FakeSubscriber) PendingReturnsOnCall(i int, result1 int, result2 err func (fake *FakeSubscriber) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.droppedMutex.RLock() - defer fake.droppedMutex.RUnlock() - fake.pendingMutex.RLock() - defer fake.pendingMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value diff --git a/src/code.cloudfoundry.org/gorouter/metrics/fakes/fake_varzreporter.go b/src/code.cloudfoundry.org/gorouter/metrics/fakes/fake_varzreporter.go index 9ee26b714..0a0cc3fe6 100644 --- a/src/code.cloudfoundry.org/gorouter/metrics/fakes/fake_varzreporter.go +++ b/src/code.cloudfoundry.org/gorouter/metrics/fakes/fake_varzreporter.go @@ -153,14 +153,6 @@ func (fake *FakeVarzReporter) CaptureRoutingResponseLatencyArgsForCall(i int) (* func (fake *FakeVarzReporter) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.captureBadGatewayMutex.RLock() - defer fake.captureBadGatewayMutex.RUnlock() - fake.captureBadRequestMutex.RLock() - defer fake.captureBadRequestMutex.RUnlock() - fake.captureRoutingRequestMutex.RLock() - defer fake.captureRoutingRequestMutex.RUnlock() - fake.captureRoutingResponseLatencyMutex.RLock() - defer fake.captureRoutingResponseLatencyMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value @@ -180,5 +172,4 @@ func (fake *FakeVarzReporter) recordInvocation(key string, args []interface{}) { fake.invocations[key] = append(fake.invocations[key], args) } -//lint:ignore SA1019 - auto-generated fake will go away when Varz goes away var _ metrics.VarzReporter = new(FakeVarzReporter) diff --git a/src/code.cloudfoundry.org/gorouter/metrics/metricsreporter.go b/src/code.cloudfoundry.org/gorouter/metrics/metricsreporter.go index c406b2065..ab5e59ba1 100644 --- a/src/code.cloudfoundry.org/gorouter/metrics/metricsreporter.go +++ b/src/code.cloudfoundry.org/gorouter/metrics/metricsreporter.go @@ -201,6 +201,14 @@ func (m *Metrics) CaptureNATSDroppedMessages(messages int) { func (m *Metrics) CaptureHTTPLatency(_ time.Duration, _ string) { } +// Empty implementation here is to fulfil interface +func (m *Metrics) CaptureEndpointsPerPool(endpoints int, route string, lbAlgo string) { +} + +// Empty implementation here is to fulfil interface +func (m *Metrics) UncaptureEndpointsPerPool(route string, lbAlgo string) { +} + func getResponseCounterName(statusCode int) string { statusCode = statusCode / 100 if statusCode >= 2 && statusCode <= 5 { diff --git a/src/code.cloudfoundry.org/gorouter/metrics_prometheus/metrics.go b/src/code.cloudfoundry.org/gorouter/metrics_prometheus/metrics.go index e623692c4..287b6bf25 100644 --- a/src/code.cloudfoundry.org/gorouter/metrics_prometheus/metrics.go +++ b/src/code.cloudfoundry.org/gorouter/metrics_prometheus/metrics.go @@ -43,6 +43,7 @@ type Metrics struct { NATSDroppedMessages mr.Gauge HTTPLatency mr.GaugeVec perRequestMetricsReporting bool + EndpointsPerPool mr.GaugeVec } func NewMetricsRegistry(config config.PrometheusConfig) *mr.Registry { @@ -91,6 +92,7 @@ func NewMetrics(registry *mr.Registry, perRequestMetricsReporting bool) *Metrics NATSDroppedMessages: registry.NewGauge("total_dropped_messages", "number of total dropped messages in NATS"), HTTPLatency: registry.NewGaugeVec("http_latency_seconds", "the latency of http requests from gorouter and back in sec", []string{"source_id"}), perRequestMetricsReporting: perRequestMetricsReporting, + EndpointsPerPool: registry.NewGaugeVec("endpoints_per_pool", "number of endpoints per pool", []string{"route", "lb_algorithm"}), } } @@ -220,6 +222,15 @@ func (metrics *Metrics) CaptureHTTPLatency(d time.Duration, sourceID string) { metrics.HTTPLatency.Set(float64(d)/float64(time.Second), []string{sourceID}) } +// CaptureEndpointsPerPool sets the number of endpoints for a given route and load balancing algorithm +func (metrics *Metrics) CaptureEndpointsPerPool(count int, route string, loadBalancingAlgo string) { + metrics.EndpointsPerPool.Set(float64(count), []string{route, loadBalancingAlgo}) +} + +func (metrics *Metrics) UncaptureEndpointsPerPool(route string, lbAlgo string) { + metrics.EndpointsPerPool.Delete([]string{route, lbAlgo}) +} + func statusGroupName(statusCode int) string { statusGroupNum := statusCode / 100 if statusGroupNum >= 2 && statusGroupNum <= 5 { diff --git a/src/code.cloudfoundry.org/gorouter/metrics_prometheus/metrics_test.go b/src/code.cloudfoundry.org/gorouter/metrics_prometheus/metrics_test.go index 0e46fb037..1a90d81b0 100644 --- a/src/code.cloudfoundry.org/gorouter/metrics_prometheus/metrics_test.go +++ b/src/code.cloudfoundry.org/gorouter/metrics_prometheus/metrics_test.go @@ -450,6 +450,54 @@ var _ = Describe("Metrics", func() { Expect(getMetrics(r.Port())).To(ContainSubstring("http_latency_seconds{source_id=\"some-source\"} 0.63")) }) }) + + Context("endpoints_per_pool metric", func() { + BeforeEach(func() { + var config = config.PrometheusConfig{Port: 0} + r = NewMetricsRegistry(config) + m = NewMetrics(r, true) + }) + + It("reports the number of endpoints per pool with correct labels", func() { + m.CaptureEndpointsPerPool(5, "routeA", "round_robin") + metricsOutput := getMetrics(r.Port()) + expected := "endpoints_per_pool{lb_algorithm=\"round_robin\",route=\"routeA\"} 5" + Expect(metricsOutput).To(ContainSubstring(expected)) + }) + + It("updates the value for the same label combination", func() { + m.CaptureEndpointsPerPool(5, "routeA", "round_robin") + m.CaptureEndpointsPerPool(7, "routeA", "round_robin") + metricsOutput := getMetrics(r.Port()) + expected := "endpoints_per_pool{lb_algorithm=\"round_robin\",route=\"routeA\"} 7" + Expect(metricsOutput).To(ContainSubstring(expected)) + }) + + It("reports multiple values for different label combinations", func() { + m.CaptureEndpointsPerPool(5, "routeA", "round_robin") + m.CaptureEndpointsPerPool(3, "routeB", "least_conn") + metricsOutput := getMetrics(r.Port()) + expectedA := "endpoints_per_pool{lb_algorithm=\"round_robin\",route=\"routeA\"} 5" + expectedB := "endpoints_per_pool{lb_algorithm=\"least_conn\",route=\"routeB\"} 3" + Expect(metricsOutput).To(ContainSubstring(expectedA)) + Expect(metricsOutput).To(ContainSubstring(expectedB)) + }) + + It("deletes the metric for a given route and LB algorithm", func() { + m.CaptureEndpointsPerPool(5, "routeA", "round_robin") + Expect(getMetrics(r.Port())).To(ContainSubstring("endpoints_per_pool{lb_algorithm=\"round_robin\",route=\"routeA\"} 5")) + + m.UncaptureEndpointsPerPool("routeA", "round_robin") + Expect(getMetrics(r.Port())).NotTo(ContainSubstring("endpoints_per_pool{lb_algorithm=\"round_robin\",route=\"routeA\"}")) + }) + + It("does nothing when deleting a non-existent label combination", func() { + m.CaptureEndpointsPerPool(5, "routeA", "round_robin") + + m.UncaptureEndpointsPerPool("routeX", "round_robin") + Expect(getMetrics(r.Port())).To(ContainSubstring("endpoints_per_pool{lb_algorithm=\"round_robin\",route=\"routeA\"} 5")) + }) + }) }) func getMetrics(port string) string { diff --git a/src/code.cloudfoundry.org/gorouter/registry/registry.go b/src/code.cloudfoundry.org/gorouter/registry/registry.go index 11821e985..1ab2df53e 100644 --- a/src/code.cloudfoundry.org/gorouter/registry/registry.go +++ b/src/code.cloudfoundry.org/gorouter/registry/registry.go @@ -108,10 +108,12 @@ func (r *RouteRegistry) Register(uri route.Uri, endpoint *route.Endpoint) { if r.logger.Enabled(context.Background(), slog.LevelInfo) { r.logger.Info("endpoint-registered", buildSlogAttrs(uri, endpoint)...) } + r.reportEndpointsPerPool(uri, endpoint) case route.EndpointUpdated: if r.logger.Enabled(context.Background(), slog.LevelInfo) { r.logger.Info("endpoint-registered", buildSlogAttrs(uri, endpoint)...) } + r.reportEndpointsPerPool(uri, endpoint) case route.EndpointUnmodified: if r.logger.Enabled(context.Background(), slog.LevelDebug) { r.logger.Debug("endpoint-not-registered", buildSlogAttrs(uri, endpoint)...) @@ -186,6 +188,7 @@ func (r *RouteRegistry) Unregister(uri route.Uri, endpoint *route.Endpoint) { if endpointRemoved { r.logger.Info("endpoint-unregistered", buildSlogAttrs(uri, endpoint)...) + r.reportEndpointsPerPool(uri, endpoint) } else { if r.logger.Enabled(context.Background(), slog.LevelDebug) { r.logger.Debug("endpoint-not-unregistered", buildSlogAttrs(uri, endpoint)...) @@ -420,6 +423,8 @@ func (r *RouteRegistry) pruneStaleDroplets() { r.byURI.EachNodeWithPool(func(t *container.Trie) { endpoints := t.Pool.PruneEndpoints() + lbAlgo := t.Pool.LoadBalancingAlgorithm + uri := t.ToPath() if r.EmptyPoolResponseCode503 && r.EmptyPoolTimeout > 0 { if time.Since(t.Pool.LastUpdated()) > r.EmptyPoolTimeout { t.Snip() @@ -438,11 +443,19 @@ func (r *RouteRegistry) pruneStaleDroplets() { isolationSegment = "-" } r.logger.Info("pruned-route", - slog.String("uri", t.ToPath()), + slog.String("uri", uri), slog.Any("endpoints", addresses), slog.String("isolation_segment", isolationSegment), ) r.reporter.CaptureRoutesPruned(uint64(len(endpoints))) + + if lbAlgo == config.LOAD_BALANCE_HB { + if t.Pool == nil || t.Pool.NumEndpoints() == 0 { + r.reporter.UncaptureEndpointsPerPool(uri, config.LOAD_BALANCE_HB) + } else { + r.reporter.CaptureEndpointsPerPool(t.Pool.NumEndpoints(), uri, config.LOAD_BALANCE_HB) + } + } } }) } @@ -461,6 +474,25 @@ func (r *RouteRegistry) freshenRoutes() { }) } +// reportEndpointsPerPool reports the endpoints_per_pool metric for hash-based (HB) route pools. +// For non-HB endpoints, it deletes any stale HB metric entries (e.g. after switching away from HB). +func (r *RouteRegistry) reportEndpointsPerPool(uri route.Uri, endpoint *route.Endpoint) { + if endpoint.LoadBalancingAlgorithm != config.LOAD_BALANCE_HB { + r.reporter.UncaptureEndpointsPerPool(string(uri), config.LOAD_BALANCE_HB) + return + } + + r.RLock() + pool := r.byURI.Find(uri.RouteKey()) + r.RUnlock() + + if pool == nil || pool.NumEndpoints() == 0 { + r.reporter.UncaptureEndpointsPerPool(string(uri), config.LOAD_BALANCE_HB) + return + } + r.reporter.CaptureEndpointsPerPool(pool.NumEndpoints(), string(uri), config.LOAD_BALANCE_HB) +} + func splitHostAndContextPath(uri route.Uri) (string, string) { contextPath := "/" trimmedUri := strings.TrimPrefix(uri.String(), "/") diff --git a/src/code.cloudfoundry.org/gorouter/registry/registry_test.go b/src/code.cloudfoundry.org/gorouter/registry/registry_test.go index c14dbf627..5e828186b 100644 --- a/src/code.cloudfoundry.org/gorouter/registry/registry_test.go +++ b/src/code.cloudfoundry.org/gorouter/registry/registry_test.go @@ -558,6 +558,310 @@ var _ = Describe("RouteRegistry", func() { }) }) + Context("endpoints_per_pool metric", func() { + It("reports the metric when registering an endpoint with hash-based routing", func() { + hbEndpoint1 := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.1", + Port: 8080, + PrivateInstanceId: "id-hb-1", + LoadBalancingAlgorithm: config.LOAD_BALANCE_HB, + }) + hbEndpoint2 := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.2", + Port: 8080, + PrivateInstanceId: "id-hb-2", + LoadBalancingAlgorithm: config.LOAD_BALANCE_HB, + }) + + r.Register("hb.example.com", hbEndpoint1) + Expect(reporter.CaptureEndpointsPerPoolCallCount()).To(BeNumerically(">=", 1)) + count, uri, algo := reporter.CaptureEndpointsPerPoolArgsForCall(0) + Expect(count).To(Equal(1)) + Expect(uri).To(Equal("hb.example.com")) + Expect(algo).To(Equal(config.LOAD_BALANCE_HB)) + + r.Register("hb.example.com", hbEndpoint2) + lastCall := reporter.CaptureEndpointsPerPoolCallCount() - 1 + count, uri, algo = reporter.CaptureEndpointsPerPoolArgsForCall(lastCall) + Expect(count).To(Equal(2)) + Expect(uri).To(Equal("hb.example.com")) + Expect(algo).To(Equal(config.LOAD_BALANCE_HB)) + }) + + It("does not delete the metric immediately after capturing it for HB endpoints", func() { + hbEndpoint := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.1", + Port: 8080, + PrivateInstanceId: "id-hb-1", + LoadBalancingAlgorithm: config.LOAD_BALANCE_HB, + }) + + r.Register("hb.example.com", hbEndpoint) + + captureCount := reporter.CaptureEndpointsPerPoolCallCount() + deleteCount := reporter.UncaptureEndpointsPerPoolCallCount() + + // The metric should be captured but NOT deleted for an active HB route + Expect(captureCount).To(Equal(1)) + Expect(deleteCount).To(Equal(0), "UncaptureEndpointsPerPool should not be called when registering an HB endpoint with a live pool") + }) + + It("does not report the metric for non-HB endpoints", func() { + rrEndpoint := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.1", + Port: 8080, + PrivateInstanceId: "id-rr-1", + LoadBalancingAlgorithm: config.LOAD_BALANCE_RR, + }) + + r.Register("rr.example.com", rrEndpoint) + Expect(reporter.CaptureEndpointsPerPoolCallCount()).To(Equal(0)) + }) + + It("deletes the metric when the last HB endpoint is unregistered", func() { + hbEndpoint := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.1", + Port: 8080, + PrivateInstanceId: "id-hb-1", + LoadBalancingAlgorithm: config.LOAD_BALANCE_HB, + }) + + r.Register("hb.example.com", hbEndpoint) + Expect(reporter.CaptureEndpointsPerPoolCallCount()).To(BeNumerically(">=", 1)) + + r.Unregister("hb.example.com", hbEndpoint) + // After unregistering the last endpoint, the metric should be deleted + Expect(reporter.UncaptureEndpointsPerPoolCallCount()).To(BeNumerically(">=", 1)) + lastCall := reporter.UncaptureEndpointsPerPoolCallCount() - 1 + uri, algo := reporter.UncaptureEndpointsPerPoolArgsForCall(lastCall) + Expect(uri).To(Equal("hb.example.com")) + Expect(algo).To(Equal(config.LOAD_BALANCE_HB)) + }) + + It("updates the metric when an HB endpoint is removed but others remain", func() { + hbEndpoint1 := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.1", + Port: 8080, + PrivateInstanceId: "id-hb-1", + LoadBalancingAlgorithm: config.LOAD_BALANCE_HB, + }) + hbEndpoint2 := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.2", + Port: 8080, + PrivateInstanceId: "id-hb-2", + LoadBalancingAlgorithm: config.LOAD_BALANCE_HB, + }) + + r.Register("hb.example.com", hbEndpoint1) + r.Register("hb.example.com", hbEndpoint2) + + r.Unregister("hb.example.com", hbEndpoint1) + lastCall := reporter.CaptureEndpointsPerPoolCallCount() - 1 + count, uri, algo := reporter.CaptureEndpointsPerPoolArgsForCall(lastCall) + Expect(count).To(Equal(1)) + Expect(uri).To(Equal("hb.example.com")) + Expect(algo).To(Equal(config.LOAD_BALANCE_HB)) + }) + + It("deletes the HB metric when a route switches from hash-based to round-robin", func() { + hbEndpoint := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.1", + Port: 8080, + PrivateInstanceId: "id-1", + LoadBalancingAlgorithm: config.LOAD_BALANCE_HB, + }) + + r.Register("switching.example.com", hbEndpoint) + Expect(reporter.CaptureEndpointsPerPoolCallCount()).To(BeNumerically(">=", 1)) + + // Same endpoint now registers with round-robin algorithm + rrEndpoint := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.1", + Port: 8080, + PrivateInstanceId: "id-1", + LoadBalancingAlgorithm: config.LOAD_BALANCE_RR, + }) + + deleteBefore := reporter.UncaptureEndpointsPerPoolCallCount() + r.Register("switching.example.com", rrEndpoint) + + // Should have called delete to clean up the stale HB metric + Expect(reporter.UncaptureEndpointsPerPoolCallCount()).To(BeNumerically(">", deleteBefore)) + found := false + for i := deleteBefore; i < reporter.UncaptureEndpointsPerPoolCallCount(); i++ { + uri, algo := reporter.UncaptureEndpointsPerPoolArgsForCall(i) + if uri == "switching.example.com" && algo == config.LOAD_BALANCE_HB { + found = true + break + } + } + Expect(found).To(BeTrue(), "expected UncaptureEndpointsPerPool to be called for the switched route") + }) + + It("deletes the HB metric when the HB algorithm is removed and endpoint registers with the platform default", func() { + hbEndpoint := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.1", + Port: 8080, + PrivateInstanceId: "id-1", + LoadBalancingAlgorithm: config.LOAD_BALANCE_HB, + }) + + r.Register("removed-hb.example.com", hbEndpoint) + Expect(reporter.CaptureEndpointsPerPoolCallCount()).To(BeNumerically(">=", 1)) + + // Endpoint re-registers with platform default algorithm (HB setting explicitly removed from route) + defaultAlgoEndpoint := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.1", + Port: 8080, + PrivateInstanceId: "id-1", + LoadBalancingAlgorithm: config.LOAD_BALANCE_RR, + }) + + deleteBefore := reporter.UncaptureEndpointsPerPoolCallCount() + r.Register("removed-hb.example.com", defaultAlgoEndpoint) + + Expect(reporter.UncaptureEndpointsPerPoolCallCount()).To(BeNumerically(">", deleteBefore)) + found := false + for i := deleteBefore; i < reporter.UncaptureEndpointsPerPoolCallCount(); i++ { + uri, algo := reporter.UncaptureEndpointsPerPoolArgsForCall(i) + if uri == "removed-hb.example.com" && algo == config.LOAD_BALANCE_HB { + found = true + break + } + } + Expect(found).To(BeTrue(), "expected UncaptureEndpointsPerPool to be called when HB algorithm is removed") + }) + + It("calls uncapture for non-HB endpoints", func() { + rrEndpoint := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.1", + Port: 8080, + PrivateInstanceId: "id-1", + LoadBalancingAlgorithm: config.LOAD_BALANCE_RR, + }) + + r.Register("rr-route.example.com", rrEndpoint) + Expect(reporter.CaptureEndpointsPerPoolCallCount()).To(Equal(0)) + Expect(reporter.UncaptureEndpointsPerPoolCallCount()).To(BeNumerically(">=", 1)) + }) + + It("does not change the metric count when an existing HB endpoint is updated", func() { + hbEndpoint := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.1", + Port: 8080, + PrivateInstanceId: "id-hb-1", + LoadBalancingAlgorithm: config.LOAD_BALANCE_HB, + Tags: map[string]string{"version": "v1"}, + }) + + r.Register("hb.example.com", hbEndpoint) + Expect(reporter.CaptureEndpointsPerPoolCallCount()).To(Equal(1)) + count, _, _ := reporter.CaptureEndpointsPerPoolArgsForCall(0) + Expect(count).To(Equal(1)) + + // Re-register the same endpoint with updated tags (triggers EndpointUpdated) + hbEndpointUpdated := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.1", + Port: 8080, + PrivateInstanceId: "id-hb-1", + LoadBalancingAlgorithm: config.LOAD_BALANCE_HB, + Tags: map[string]string{"version": "v2"}, + }) + + r.Register("hb.example.com", hbEndpointUpdated) + lastCall := reporter.CaptureEndpointsPerPoolCallCount() - 1 + count, _, _ = reporter.CaptureEndpointsPerPoolArgsForCall(lastCall) + Expect(count).To(Equal(1), "endpoint count should remain 1 after updating an existing endpoint") + }) + + It("deletes the metric when HB endpoints are pruned", func() { + hbEndpoint := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.1", + Port: 8080, + PrivateInstanceId: "id-hb-1", + LoadBalancingAlgorithm: config.LOAD_BALANCE_HB, + StaleThresholdInSeconds: int(configObj.DropletStaleThreshold.Seconds()), + }) + + r.Register("hb-prune.example.com", hbEndpoint) + Expect(reporter.CaptureEndpointsPerPoolCallCount()).To(BeNumerically(">=", 1)) + + r.StartPruningCycle() + defer r.StopPruningCycle() + + // Wait for the endpoint to go stale and be pruned + Eventually(func() int { + return reporter.UncaptureEndpointsPerPoolCallCount() + }, configObj.PruneStaleDropletsInterval+configObj.DropletStaleThreshold+100*time.Millisecond).Should(BeNumerically(">=", 1)) + + found := false + for i := 0; i < reporter.UncaptureEndpointsPerPoolCallCount(); i++ { + uri, algo := reporter.UncaptureEndpointsPerPoolArgsForCall(i) + if uri == "hb-prune.example.com" && algo == config.LOAD_BALANCE_HB { + found = true + break + } + } + Expect(found).To(BeTrue(), "expected UncaptureEndpointsPerPool to be called when HB endpoints are pruned") + }) + + It("updates the metric when some HB endpoints are pruned but others remain", func() { + freshEndpoint := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.1", + Port: 8080, + PrivateInstanceId: "id-hb-1", + LoadBalancingAlgorithm: config.LOAD_BALANCE_HB, + }) + staleEndpoint := route.NewEndpoint(&route.EndpointOpts{ + Host: "192.168.1.2", + Port: 8080, + PrivateInstanceId: "id-hb-2", + LoadBalancingAlgorithm: config.LOAD_BALANCE_HB, + StaleThresholdInSeconds: int(configObj.DropletStaleThreshold.Seconds()), + }) + + r.Register("hb-partial-prune.example.com", freshEndpoint) + r.Register("hb-partial-prune.example.com", staleEndpoint) + + doneChan := make(chan struct{}) + defer close(doneChan) + + // Keep the fresh endpoint alive during pruning + go func() { + for { + select { + case <-doneChan: + return + default: + r.Register("hb-partial-prune.example.com", freshEndpoint) + time.Sleep(configObj.DropletStaleThreshold / 2) + } + } + }() + + captureCountBefore := reporter.CaptureEndpointsPerPoolCallCount() + + r.StartPruningCycle() + defer r.StopPruningCycle() + + // Wait for the stale endpoint to be pruned + Eventually(func() int { + return reporter.CaptureEndpointsPerPoolCallCount() + }, 2*(configObj.PruneStaleDropletsInterval+configObj.DropletStaleThreshold)).Should(BeNumerically(">", captureCountBefore)) + + // Find the capture call from pruning that shows count decreased to 1 + found := false + for i := captureCountBefore; i < reporter.CaptureEndpointsPerPoolCallCount(); i++ { + count, uri, algo := reporter.CaptureEndpointsPerPoolArgsForCall(i) + if uri == "hb-partial-prune.example.com" && algo == config.LOAD_BALANCE_HB && count == 1 { + found = true + break + } + } + Expect(found).To(BeTrue(), "expected CaptureEndpointsPerPool to be called with count=1 after partial pruning") + }) + }) + Context("Unregister", func() { Context("when endpoint has component tagged", func() { BeforeEach(func() {