Skip to content

Commit 3561a7b

Browse files
authored
[CASCL-623] feat(cluster-agent): enhance DatadogPodAutoscaler metrics with dedicate metrics store (#46833)
## Scope of This PR This PR revisits the previous attempt: #42547 The focus here is strictly on **refactoring the existing metric generation logic**, without introducing new metrics. A follow-up PR will build on this foundation to introduce additional DPA metrics. ## Motivation Today, `DatadogPodAutoscaler` (DPA) resource metrics are exposed as OpenMetrics/Prometheus metrics and scraped by the `datadog_cluster_agent` check. While functional, this approach has several drawbacks: - All scraped metrics automatically inherit the **Cluster Agent pod context** (`pod_name`, `kube_namespace`, `kube_container_name`, etc.). - This creates confusion around DPA metric tags, as metrics appear tied to Cluster Agent pods rather than the DPA resources themselves. - It increases metric cardinality due to multiple Cluster Agent instances (leader, followers, rollouts) contributing additional metric contexts. ## Proposed Approach This PR changes how DPA metrics are generated. Instead of exposing them via OpenMetrics and relying on the `datadog_cluster_agent` check for collection, DPA metrics are now produced directly within the Cluster Agent autoscaling component — following the same pattern used for `kubernetes_state` metrics. This provides: - Better control over which tags are attached to DPA metrics - Cleaner and more accurate metric context - Reduced unnecessary metric cardinality ## Summary ### Core changes - Refactor `ObserverFunc` signature from `(string, string)` to `(string, interface{})` to pass the actual object to observers, enabling richer metric generation - Add new `pkg/clusteragent/autoscaling/workload/metrics` package with a `PodAutoscalerMetricsStore` that generates and periodically sends structured metrics (gauges/counts) for `DatadogPodAutoscaler` objects via `sender.Sender` - Replace old telemetry helpers (`telemetry.go` tag-based metrics) with leader-aware metric submission; metrics are only emitted by the leader ### Action metrics consolidation - Move horizontal/vertical scaling action metrics from event-driven `Submit*` functions in `counters.go` into the state-based `GeneratePodAutoscalerMetrics` generator - Delete `counters.go` entirely (`SubmitReceivedRecommendationsVersion` was dead code; remaining `Submit*` functions replaced by the generator) - Remove `sender` and `isLeader` from `horizontalController` and `verticalController` since they were only used for the now-removed `Submit*` calls ### Metrics emitted | Metric | Type | Notes | |--------|------|-------| | `received_recommendations_version` | Gauge | RC version of last received main scaling values; only emitted when > 0 | | `horizontal_scaling_received_replicas` | Gauge | Replicas recommended by the product recommender | | `vertical_scaling_received_requests` | Gauge | Per-container requested resources from recommender | | `vertical_scaling_received_limits` | Gauge | Per-container resource limits from recommender | | `horizontal_scaling_applied_replicas` | Gauge | Replicas from the last applied horizontal action | | `horizontal_scaling_actions` | Count | Cumulative count of horizontal scaling actions, tagged `status:ok` or `status:error` | | `vertical_rollout_triggered` | Count | Cumulative count of vertical rollout actions, tagged `status:ok` or `status:error` | | `autoscaler_conditions` | Gauge | 1.0/0.0 per condition type from CRD status | | `local_fallback_enabled` | Gauge | 1.0 when horizontal active source is local fallback | ### Model changes - Add `mainScalingValuesVersion uint64` to `PodAutoscalerInternal` to persist the remote config version of the last received main scaling values - Extend `UpdateFromMainValues` to accept and persist the RC version; `RemoveMainValues` resets it to 0 - Add `horizontalActionErrorCount`/`horizontalActionSuccessCount` and `verticalActionErrorCount`/`verticalActionSuccessCount` counter fields, incremented on each action outcome - Add public getters: `MainScalingValuesVersion()`, `HorizontalActionErrorCount()`, `HorizontalActionSuccessCount()`, `VerticalActionErrorCount()`, `VerticalActionSuccessCount()` ## Test plan - [x] Unit tests added for `metrics/store`, `metrics/generator`, and `metrics/writer` - [x] `generator_test.go` covers all 9 metrics including both `status:ok` and `status:error` count variants - [x] `config_retriever_values_test.go` updated with expected `MainScalingValuesVersion` values - [x] `controller_horizontal_test.go` updated to assert `HorizontalActionSuccessCount`/`HorizontalActionErrorCount` - [x] Existing `workload` controller tests updated to remove now-deleted `sender`/`isLeader` fixtures - [x] All tests in `pkg/clusteragent/autoscaling/...` pass locally 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: cedric.lamoriniere <cedric.lamoriniere@datadoghq.com>
1 parent f6e6779 commit 3561a7b

33 files changed

Lines changed: 1435 additions & 383 deletions

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,7 @@
466466
/pkg/cloudfoundry @DataDog/agent-integrations
467467
/pkg/clusteragent/ @DataDog/container-platform
468468
/pkg/clusteragent/autoscaling/ @DataDog/container-autoscaling
469+
/pkg/clusteragent/metricsstore/ @DataDog/container-autoscaling
469470
/pkg/clusteragent/admission/mutate/autoscaling @DataDog/container-integrations
470471
/pkg/clusteragent/admission/mutate/appsec @DataDog/container-platform @DataDog/asm-go
471472
/pkg/clusteragent/admission/mutate/autoinstrumentation/ @DataDog/container-platform @DataDog/injection-platform

cmd/cluster-agent/subcommands/start/command.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ func start(log log.Component,
538538
log.Error("Admission controller is disabled, vertical autoscaling requires the admission controller to be enabled. Vertical scaling will be disabled.")
539539
}
540540

541-
if adapter, err := provider.StartWorkloadAutoscaling(mainCtx, clusterID, clusterName, le.IsLeader, apiCl, rcClient, wmeta, demultiplexer); err == nil {
541+
if adapter, err := provider.StartWorkloadAutoscaling(mainCtx, clusterID, clusterName, le.IsLeader, apiCl, rcClient, wmeta, taggerComp, demultiplexer); err == nil {
542542
pa = adapter
543543
} else {
544544
return fmt.Errorf("Error while starting workload autoscaling: %v", err)

pkg/clusteragent/autoscaling/cluster/config_retriever.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@ import (
1212

1313
"k8s.io/utils/clock"
1414

15+
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
1516
"github.com/DataDog/datadog-agent/pkg/config/remote/data"
1617
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
1718
"github.com/DataDog/datadog-agent/pkg/util/log"
1819
)
1920

2021
const (
21-
configRetrieverStoreID string = "cacr"
22+
configRetrieverStoreID autoscaling.SenderID = "cacr"
2223
)
2324

2425
// RcClient is a subinterface of rcclient.Component to allow mocking

pkg/clusteragent/autoscaling/cluster/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ var (
4040
// Only support EC2 for now
4141
nodeClassGVR = schema.GroupVersionResource{Group: "karpenter.k8s.aws", Version: "v1", Resource: "ec2nodeclasses"}
4242

43-
controllerID = "dca-c"
43+
controllerID autoscaling.SenderID = "dca-c"
4444
)
4545

4646
type Controller struct {

pkg/clusteragent/autoscaling/cluster/telemetry.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,3 @@ const (
1616
)
1717

1818
var autoscalingQueueMetricsProvider = workqueuetelemetry.NewQueueMetricsProvider()
19-
20-
// TODO add more telemetry collection

pkg/clusteragent/autoscaling/controller.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type Controller struct {
3030
context context.Context
3131

3232
// Fields available to child controllers
33-
ID string
33+
ID SenderID
3434
Client dynamic.Interface
3535
Lister cache.GenericLister
3636
Workqueue workqueue.TypedRateLimitingInterface[string]
@@ -39,7 +39,7 @@ type Controller struct {
3939

4040
// NewController returns a new workload autoscaling controller
4141
func NewController(
42-
controllerID string,
42+
controllerID SenderID,
4343
processor Processor,
4444
client dynamic.Interface,
4545
informer dynamicinformer.DynamicSharedInformerFactory,
@@ -120,7 +120,7 @@ func (c *Controller) enqueue(obj interface{}) {
120120
c.Workqueue.AddRateLimited(key)
121121
}
122122

123-
func (c *Controller) enqueueID(id, sender string) {
123+
func (c *Controller) enqueueID(id string, sender SenderID, _ interface{}) {
124124
// Do not enqueue our own updates (avoid infinite loops)
125125
if sender != c.ID {
126126
log.Tracef("Enqueueing from observer update id: %s from sender: %s", id, sender)

pkg/clusteragent/autoscaling/externalmetrics/autoscaler_watcher.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,19 @@ import (
2727

2828
"github.com/DataDog/watermarkpodautoscaler/apis/datadoghq/v1alpha1"
2929

30+
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
3031
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
3132
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/controllers"
3233
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/autoscalers"
3334
"github.com/DataDog/datadog-agent/pkg/util/log"
3435
)
3536

3637
const (
37-
autoscalerWatcherStoreID string = "aw"
38-
autoscalerReferencesSep string = ", "
39-
autoscalerReferencesKindSep string = ":"
40-
autoscalerWPAKindKey string = "wpa"
41-
autoscalerHPAKindKey string = "hpa"
38+
autoscalerWatcherStoreID autoscaling.SenderID = "aw"
39+
autoscalerReferencesSep string = ", "
40+
autoscalerReferencesKindSep string = ":"
41+
autoscalerWPAKindKey string = "wpa"
42+
autoscalerHPAKindKey string = "hpa"
4243
)
4344

4445
// AutoscalerWatcher watches autoscaling objects and reconciles the corresponding external metrics

pkg/clusteragent/autoscaling/externalmetrics/datadogmetric_controller.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"fmt"
1414
"time"
1515

16+
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
1617
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
1718
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
1819
"github.com/DataDog/datadog-agent/pkg/util/log"
@@ -29,9 +30,9 @@ import (
2930
)
3031

3132
const (
32-
maxRetry int = 3
33-
requeueDelaySeconds int = 2
34-
ddmControllerStoreID string = "ddmc"
33+
maxRetry int = 3
34+
requeueDelaySeconds int = 2
35+
ddmControllerStoreID autoscaling.SenderID = "ddmc"
3536
)
3637

3738
type controllerOperation string
@@ -146,8 +147,8 @@ func (c *DatadogMetricController) enqueue(obj interface{}) {
146147
c.workqueue.AddRateLimited(key)
147148
}
148149

149-
func (c *DatadogMetricController) enqueueID(id, sender string) {
150-
// Do not enqueue our own updates (avoid infinite loops)
150+
func (c *DatadogMetricController) enqueueID(id string, sender autoscaling.SenderID, _ interface{}) {
151+
// Avoid re-enqueuing events triggered by this controller's own store updates
151152
if sender != ddmControllerStoreID {
152153
c.workqueue.AddRateLimited(id)
153154
}
@@ -352,7 +353,7 @@ func (c *DatadogMetricController) deleteDatadogMetric(ns, name string) error {
352353
return nil
353354
}
354355

355-
func (c *DatadogMetricController) deleteTelemetry(id, _ string) {
356+
func (c *DatadogMetricController) deleteTelemetry(id string, _ autoscaling.SenderID, _ interface{}) {
356357
ns, name, err := cache.SplitMetaNamespaceKey(id)
357358
if err != nil {
358359
log.Debugf("Unable to split meta namespace key to delete telemetry: %v", err)

pkg/clusteragent/autoscaling/externalmetrics/metrics_retriever.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"errors"
1212
"time"
1313

14+
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
1415
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
1516
"github.com/DataDog/datadog-agent/pkg/util/backoff"
1617
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
@@ -19,7 +20,7 @@ import (
1920
)
2021

2122
const (
22-
metricRetrieverStoreID string = "mr"
23+
metricRetrieverStoreID autoscaling.SenderID = "mr"
2324
)
2425

2526
// Backoff range for number of retries R:

pkg/clusteragent/autoscaling/externalmetrics/store.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ package externalmetrics
1010
import (
1111
"sync"
1212

13+
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
1314
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
1415
)
1516

@@ -19,7 +20,8 @@ const (
1920
)
2021

2122
// DatadogMetricInternalObserverFunc represents observer functions of the datadog metrics store
22-
type DatadogMetricInternalObserverFunc func(string, string)
23+
// First parameter is the key, second parameter is the sender identifier, third parameter is the object (or nil for delete operations)
24+
type DatadogMetricInternalObserverFunc func(string, autoscaling.SenderID, interface{})
2325

2426
// DatadogMetricInternalObserver allows to define functions to watch changes in Store
2527
type DatadogMetricInternalObserver struct {
@@ -108,23 +110,23 @@ func (ds *DatadogMetricsInternalStore) Count() int {
108110
}
109111

110112
// Set `DatadogMetricInternal` for id
111-
func (ds *DatadogMetricsInternalStore) Set(id string, datadogMetric model.DatadogMetricInternal, sender string) {
113+
func (ds *DatadogMetricsInternalStore) Set(id string, datadogMetric model.DatadogMetricInternal, sender autoscaling.SenderID) {
112114
ds.lock.Lock()
113115
ds.store[id] = datadogMetric
114116
ds.lock.Unlock()
115117

116-
ds.notify(setOperation, id, sender)
118+
ds.notify(setOperation, id, sender, datadogMetric)
117119
}
118120

119121
// Delete `DatadogMetricInternal` corresponding to id if present
120-
func (ds *DatadogMetricsInternalStore) Delete(id, sender string) {
122+
func (ds *DatadogMetricsInternalStore) Delete(id string, sender autoscaling.SenderID) {
121123
ds.lock.Lock()
122-
_, exists := ds.store[id]
124+
obj, exists := ds.store[id]
123125
delete(ds.store, id)
124126
ds.lock.Unlock()
125127

126128
if exists {
127-
ds.notify(deleteOperation, id, sender)
129+
ds.notify(deleteOperation, id, sender, obj)
128130
}
129131
}
130132

@@ -152,31 +154,31 @@ func (ds *DatadogMetricsInternalStore) Unlock(string) {
152154
}
153155

154156
// UnlockSet sets the new DatadogMetricInternal value and releases the lock (previously acquired by `LockRead`)
155-
func (ds *DatadogMetricsInternalStore) UnlockSet(id string, datadogMetric model.DatadogMetricInternal, sender string) {
157+
func (ds *DatadogMetricsInternalStore) UnlockSet(id string, datadogMetric model.DatadogMetricInternal, sender autoscaling.SenderID) {
156158
ds.store[id] = datadogMetric
157159
ds.lock.Unlock()
158160

159-
ds.notify(setOperation, id, sender)
161+
ds.notify(setOperation, id, sender, datadogMetric)
160162
}
161163

162164
// UnlockDelete deletes a DatadogMetricInternal and releases the lock (previously acquired by `LockRead`)
163-
func (ds *DatadogMetricsInternalStore) UnlockDelete(id, sender string) {
164-
_, exists := ds.store[id]
165+
func (ds *DatadogMetricsInternalStore) UnlockDelete(id string, sender autoscaling.SenderID) {
166+
obj, exists := ds.store[id]
165167

166168
delete(ds.store, id)
167169
ds.lock.Unlock()
168170

169171
if exists {
170-
ds.notify(deleteOperation, id, sender)
172+
ds.notify(deleteOperation, id, sender, obj)
171173
}
172174
}
173175

174176
// It's a very simple implementation of a notify process, but it's enough in our case as we aim at only 1 or 2 observers
175-
func (ds *DatadogMetricsInternalStore) notify(operationType storeOperation, key, sender string) {
177+
func (ds *DatadogMetricsInternalStore) notify(operationType storeOperation, key string, sender autoscaling.SenderID, obj interface{}) {
176178
ds.observersLock.RLock()
177179
defer ds.observersLock.RUnlock()
178180

179181
for _, observer := range ds.observers[operationType] {
180-
observer(key, sender)
182+
observer(key, sender, obj)
181183
}
182184
}

0 commit comments

Comments
 (0)