Skip to content

Commit 5d0021b

Browse files
committed
feat(cluster-agent): enhance DatadogPodAutoscaler metrics with leader checks and tests
Improve the metrics generation and submission for DatadogPodAutoscaler: * **Observer pattern enhancement**: Changed autoscaling.ObserverFunc signature from func(string, string) to func(string, interface{}) to allow observers direct access to stored objects without additional lookups * **Centralized tag generation**: Refactored generator.go to use helper functions (baseAutoscalerTags, autoscalerTagsWithSource, autoscalerTagsWithContainer, conditionTags) making it easier to add or modify tags across all metrics * **Leader-only metric submission**: Added isLeader checks to SenderMetricsWriter, horizontalController, and verticalController to prevent non-leader instances from submitting duplicate metrics * **Backward compatibility**: Retained isLeader:true tag on all metrics via baseAutoscalerTags helper function
1 parent 10bbc8e commit 5d0021b

33 files changed

Lines changed: 1429 additions & 378 deletions

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,7 @@
464464
/pkg/cloudfoundry @DataDog/agent-integrations
465465
/pkg/clusteragent/ @DataDog/container-platform
466466
/pkg/clusteragent/autoscaling/ @DataDog/container-autoscaling
467+
/pkg/clusteragent/metricsstore/ @DataDog/container-autoscaling
467468
/pkg/clusteragent/admission/mutate/autoscaling @DataDog/container-integrations
468469
/pkg/clusteragent/admission/mutate/appsec @DataDog/container-platform @DataDog/asm-go
469470
/pkg/clusteragent/admission/mutate/autoinstrumentation/ @DataDog/container-platform @DataDog/injection-platform

cmd/cluster-agent/subcommands/start/command.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ func start(log log.Component,
538538
log.Error("Admission controller is disabled, vertical autoscaling requires the admission controller to be enabled. Vertical scaling will be disabled.")
539539
}
540540

541-
if adapter, err := provider.StartWorkloadAutoscaling(mainCtx, clusterID, clusterName, le.IsLeader, apiCl, rcClient, wmeta, demultiplexer); err == nil {
541+
if adapter, err := provider.StartWorkloadAutoscaling(mainCtx, clusterID, clusterName, le.IsLeader, apiCl, rcClient, wmeta, taggerComp, demultiplexer); err == nil {
542542
pa = adapter
543543
} else {
544544
return fmt.Errorf("Error while starting workload autoscaling: %v", err)

pkg/clusteragent/autoscaling/cluster/config_retriever.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@ import (
1212

1313
"k8s.io/utils/clock"
1414

15+
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
1516
"github.com/DataDog/datadog-agent/pkg/config/remote/data"
1617
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
1718
"github.com/DataDog/datadog-agent/pkg/util/log"
1819
)
1920

2021
const (
21-
configRetrieverStoreID string = "cacr"
22+
configRetrieverStoreID autoscaling.SenderID = "cacr"
2223
)
2324

2425
// RcClient is a subinterface of rcclient.Component to allow mocking

pkg/clusteragent/autoscaling/cluster/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ var (
4040
// Only support EC2 for now
4141
nodeClassGVR = schema.GroupVersionResource{Group: "karpenter.k8s.aws", Version: "v1", Resource: "ec2nodeclasses"}
4242

43-
controllerID = "dca-c"
43+
controllerID autoscaling.SenderID = "dca-c"
4444
)
4545

4646
type Controller struct {

pkg/clusteragent/autoscaling/cluster/telemetry.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,3 @@ const (
1616
)
1717

1818
var autoscalingQueueMetricsProvider = workqueuetelemetry.NewQueueMetricsProvider()
19-
20-
// TODO add more telemetry collection

pkg/clusteragent/autoscaling/controller.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type Controller struct {
3030
context context.Context
3131

3232
// Fields available to child controllers
33-
ID string
33+
ID SenderID
3434
Client dynamic.Interface
3535
Lister cache.GenericLister
3636
Workqueue workqueue.TypedRateLimitingInterface[string]
@@ -39,7 +39,7 @@ type Controller struct {
3939

4040
// NewController returns a new workload autoscaling controller
4141
func NewController(
42-
controllerID string,
42+
controllerID SenderID,
4343
processor Processor,
4444
client dynamic.Interface,
4545
informer dynamicinformer.DynamicSharedInformerFactory,
@@ -120,7 +120,7 @@ func (c *Controller) enqueue(obj interface{}) {
120120
c.Workqueue.AddRateLimited(key)
121121
}
122122

123-
func (c *Controller) enqueueID(id, sender string) {
123+
func (c *Controller) enqueueID(id string, sender SenderID, _ interface{}) {
124124
// Do not enqueue our own updates (avoid infinite loops)
125125
if sender != c.ID {
126126
log.Tracef("Enqueueing from observer update id: %s from sender: %s", id, sender)

pkg/clusteragent/autoscaling/externalmetrics/autoscaler_watcher.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,19 @@ import (
2727

2828
"github.com/DataDog/watermarkpodautoscaler/apis/datadoghq/v1alpha1"
2929

30+
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
3031
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
3132
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/controllers"
3233
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/autoscalers"
3334
"github.com/DataDog/datadog-agent/pkg/util/log"
3435
)
3536

3637
const (
37-
autoscalerWatcherStoreID string = "aw"
38-
autoscalerReferencesSep string = ", "
39-
autoscalerReferencesKindSep string = ":"
40-
autoscalerWPAKindKey string = "wpa"
41-
autoscalerHPAKindKey string = "hpa"
38+
autoscalerWatcherStoreID autoscaling.SenderID = "aw"
39+
autoscalerReferencesSep string = ", "
40+
autoscalerReferencesKindSep string = ":"
41+
autoscalerWPAKindKey string = "wpa"
42+
autoscalerHPAKindKey string = "hpa"
4243
)
4344

4445
// AutoscalerWatcher watches autoscaling objects and reconciles the corresponding external metrics

pkg/clusteragent/autoscaling/externalmetrics/datadogmetric_controller.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"fmt"
1414
"time"
1515

16+
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
1617
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
1718
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
1819
"github.com/DataDog/datadog-agent/pkg/util/log"
@@ -29,9 +30,9 @@ import (
2930
)
3031

3132
const (
32-
maxRetry int = 3
33-
requeueDelaySeconds int = 2
34-
ddmControllerStoreID string = "ddmc"
33+
maxRetry int = 3
34+
requeueDelaySeconds int = 2
35+
ddmControllerStoreID autoscaling.SenderID = "ddmc"
3536
)
3637

3738
type controllerOperation string
@@ -146,8 +147,8 @@ func (c *DatadogMetricController) enqueue(obj interface{}) {
146147
c.workqueue.AddRateLimited(key)
147148
}
148149

149-
func (c *DatadogMetricController) enqueueID(id, sender string) {
150-
// Do not enqueue our own updates (avoid infinite loops)
150+
func (c *DatadogMetricController) enqueueID(id string, sender autoscaling.SenderID, _ interface{}) {
151+
// Avoid re-enqueuing events triggered by this controller's own store updates
151152
if sender != ddmControllerStoreID {
152153
c.workqueue.AddRateLimited(id)
153154
}
@@ -352,7 +353,7 @@ func (c *DatadogMetricController) deleteDatadogMetric(ns, name string) error {
352353
return nil
353354
}
354355

355-
func (c *DatadogMetricController) deleteTelemetry(id, _ string) {
356+
func (c *DatadogMetricController) deleteTelemetry(id string, _ autoscaling.SenderID, _ interface{}) {
356357
ns, name, err := cache.SplitMetaNamespaceKey(id)
357358
if err != nil {
358359
log.Debugf("Unable to split meta namespace key to delete telemetry: %v", err)

pkg/clusteragent/autoscaling/externalmetrics/metrics_retriever.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"errors"
1212
"time"
1313

14+
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
1415
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
1516
"github.com/DataDog/datadog-agent/pkg/util/backoff"
1617
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
@@ -19,7 +20,7 @@ import (
1920
)
2021

2122
const (
22-
metricRetrieverStoreID string = "mr"
23+
metricRetrieverStoreID autoscaling.SenderID = "mr"
2324
)
2425

2526
// Backoff range for number of retries R:

pkg/clusteragent/autoscaling/externalmetrics/store.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ package externalmetrics
1010
import (
1111
"sync"
1212

13+
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
1314
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
1415
)
1516

@@ -19,7 +20,8 @@ const (
1920
)
2021

2122
// DatadogMetricInternalObserverFunc represents observer functions of the datadog metrics store
22-
type DatadogMetricInternalObserverFunc func(string, string)
23+
// First parameter is the key, second parameter is the sender identifier, third parameter is the object (or nil for delete operations)
24+
type DatadogMetricInternalObserverFunc func(string, autoscaling.SenderID, interface{})
2325

2426
// DatadogMetricInternalObserver allows to define functions to watch changes in Store
2527
type DatadogMetricInternalObserver struct {
@@ -108,23 +110,23 @@ func (ds *DatadogMetricsInternalStore) Count() int {
108110
}
109111

110112
// Set `DatadogMetricInternal` for id
111-
func (ds *DatadogMetricsInternalStore) Set(id string, datadogMetric model.DatadogMetricInternal, sender string) {
113+
func (ds *DatadogMetricsInternalStore) Set(id string, datadogMetric model.DatadogMetricInternal, sender autoscaling.SenderID) {
112114
ds.lock.Lock()
113115
ds.store[id] = datadogMetric
114116
ds.lock.Unlock()
115117

116-
ds.notify(setOperation, id, sender)
118+
ds.notify(setOperation, id, sender, datadogMetric)
117119
}
118120

119121
// Delete `DatadogMetricInternal` corresponding to id if present
120-
func (ds *DatadogMetricsInternalStore) Delete(id, sender string) {
122+
func (ds *DatadogMetricsInternalStore) Delete(id string, sender autoscaling.SenderID) {
121123
ds.lock.Lock()
122-
_, exists := ds.store[id]
124+
obj, exists := ds.store[id]
123125
delete(ds.store, id)
124126
ds.lock.Unlock()
125127

126128
if exists {
127-
ds.notify(deleteOperation, id, sender)
129+
ds.notify(deleteOperation, id, sender, obj)
128130
}
129131
}
130132

@@ -152,31 +154,31 @@ func (ds *DatadogMetricsInternalStore) Unlock(string) {
152154
}
153155

154156
// UnlockSet sets the new DatadogMetricInternal value and releases the lock (previously acquired by `LockRead`)
155-
func (ds *DatadogMetricsInternalStore) UnlockSet(id string, datadogMetric model.DatadogMetricInternal, sender string) {
157+
func (ds *DatadogMetricsInternalStore) UnlockSet(id string, datadogMetric model.DatadogMetricInternal, sender autoscaling.SenderID) {
156158
ds.store[id] = datadogMetric
157159
ds.lock.Unlock()
158160

159-
ds.notify(setOperation, id, sender)
161+
ds.notify(setOperation, id, sender, datadogMetric)
160162
}
161163

162164
// UnlockDelete deletes a DatadogMetricInternal and releases the lock (previously acquired by `LockRead`)
163-
func (ds *DatadogMetricsInternalStore) UnlockDelete(id, sender string) {
164-
_, exists := ds.store[id]
165+
func (ds *DatadogMetricsInternalStore) UnlockDelete(id string, sender autoscaling.SenderID) {
166+
obj, exists := ds.store[id]
165167

166168
delete(ds.store, id)
167169
ds.lock.Unlock()
168170

169171
if exists {
170-
ds.notify(deleteOperation, id, sender)
172+
ds.notify(deleteOperation, id, sender, obj)
171173
}
172174
}
173175

174176
// It's a very simple implementation of a notify process, but it's enough in our case as we aim at only 1 or 2 observers
175-
func (ds *DatadogMetricsInternalStore) notify(operationType storeOperation, key, sender string) {
177+
func (ds *DatadogMetricsInternalStore) notify(operationType storeOperation, key string, sender autoscaling.SenderID, obj interface{}) {
176178
ds.observersLock.RLock()
177179
defer ds.observersLock.RUnlock()
178180

179181
for _, observer := range ds.observers[operationType] {
180-
observer(key, sender)
182+
observer(key, sender, obj)
181183
}
182184
}

0 commit comments

Comments
 (0)