Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@
/pkg/cloudfoundry @DataDog/agent-integrations
/pkg/clusteragent/ @DataDog/container-platform
/pkg/clusteragent/autoscaling/ @DataDog/container-autoscaling
/pkg/clusteragent/metricsstore/ @DataDog/container-autoscaling
/pkg/clusteragent/admission/mutate/autoscaling @DataDog/container-integrations
/pkg/clusteragent/admission/mutate/appsec @DataDog/container-platform @DataDog/asm-go
/pkg/clusteragent/admission/mutate/autoinstrumentation/ @DataDog/container-platform @DataDog/injection-platform
Expand Down
2 changes: 1 addition & 1 deletion cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func start(log log.Component,
log.Error("Admission controller is disabled, vertical autoscaling requires the admission controller to be enabled. Vertical scaling will be disabled.")
}

if adapter, err := provider.StartWorkloadAutoscaling(mainCtx, clusterID, clusterName, le.IsLeader, apiCl, rcClient, wmeta, demultiplexer); err == nil {
if adapter, err := provider.StartWorkloadAutoscaling(mainCtx, clusterID, clusterName, le.IsLeader, apiCl, rcClient, wmeta, taggerComp, demultiplexer); err == nil {
pa = adapter
} else {
return fmt.Errorf("Error while starting workload autoscaling: %v", err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/clusteragent/autoscaling/cluster/config_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import (

"k8s.io/utils/clock"

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
"github.com/DataDog/datadog-agent/pkg/config/remote/data"
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

const (
configRetrieverStoreID string = "cacr"
configRetrieverStoreID autoscaling.SenderID = "cacr"
)

// RcClient is a subinterface of rcclient.Component to allow mocking
Expand Down
2 changes: 1 addition & 1 deletion pkg/clusteragent/autoscaling/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var (
// Only support EC2 for now
nodeClassGVR = schema.GroupVersionResource{Group: "karpenter.k8s.aws", Version: "v1", Resource: "ec2nodeclasses"}

controllerID = "dca-c"
controllerID autoscaling.SenderID = "dca-c"
)

type Controller struct {
Expand Down
2 changes: 0 additions & 2 deletions pkg/clusteragent/autoscaling/cluster/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,3 @@ const (
)

var autoscalingQueueMetricsProvider = workqueuetelemetry.NewQueueMetricsProvider()

// TODO add more telemetry collection
6 changes: 3 additions & 3 deletions pkg/clusteragent/autoscaling/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Controller struct {
context context.Context

// Fields available to child controllers
ID string
ID SenderID
Client dynamic.Interface
Lister cache.GenericLister
Workqueue workqueue.TypedRateLimitingInterface[string]
Expand All @@ -39,7 +39,7 @@ type Controller struct {

// NewController returns a new workload autoscaling controller
func NewController(
controllerID string,
controllerID SenderID,
processor Processor,
client dynamic.Interface,
informer dynamicinformer.DynamicSharedInformerFactory,
Expand Down Expand Up @@ -120,7 +120,7 @@ func (c *Controller) enqueue(obj interface{}) {
c.Workqueue.AddRateLimited(key)
}

func (c *Controller) enqueueID(id, sender string) {
func (c *Controller) enqueueID(id string, sender SenderID, _ interface{}) {
// Do not enqueue our own updates (avoid infinite loops)
if sender != c.ID {
log.Tracef("Enqueueing from observer update id: %s from sender: %s", id, sender)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,19 @@ import (

"github.com/DataDog/watermarkpodautoscaler/apis/datadoghq/v1alpha1"

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/controllers"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/autoscalers"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

const (
autoscalerWatcherStoreID string = "aw"
autoscalerReferencesSep string = ", "
autoscalerReferencesKindSep string = ":"
autoscalerWPAKindKey string = "wpa"
autoscalerHPAKindKey string = "hpa"
autoscalerWatcherStoreID autoscaling.SenderID = "aw"
autoscalerReferencesSep string = ", "
autoscalerReferencesKindSep string = ":"
autoscalerWPAKindKey string = "wpa"
autoscalerHPAKindKey string = "hpa"
)

// AutoscalerWatcher watches autoscaling objects and reconciles the corresponding external metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"fmt"
"time"

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
"github.com/DataDog/datadog-agent/pkg/util/log"
Expand All @@ -29,9 +30,9 @@ import (
)

const (
maxRetry int = 3
requeueDelaySeconds int = 2
ddmControllerStoreID string = "ddmc"
maxRetry int = 3
requeueDelaySeconds int = 2
ddmControllerStoreID autoscaling.SenderID = "ddmc"
)

type controllerOperation string
Expand Down Expand Up @@ -146,8 +147,8 @@ func (c *DatadogMetricController) enqueue(obj interface{}) {
c.workqueue.AddRateLimited(key)
}

func (c *DatadogMetricController) enqueueID(id, sender string) {
// Do not enqueue our own updates (avoid infinite loops)
func (c *DatadogMetricController) enqueueID(id string, sender autoscaling.SenderID, _ interface{}) {
// Avoid re-enqueuing events triggered by this controller's own store updates
if sender != ddmControllerStoreID {
c.workqueue.AddRateLimited(id)
}
Expand Down Expand Up @@ -352,7 +353,7 @@ func (c *DatadogMetricController) deleteDatadogMetric(ns, name string) error {
return nil
}

func (c *DatadogMetricController) deleteTelemetry(id, _ string) {
func (c *DatadogMetricController) deleteTelemetry(id string, _ autoscaling.SenderID, _ interface{}) {
ns, name, err := cache.SplitMetaNamespaceKey(id)
if err != nil {
log.Debugf("Unable to split meta namespace key to delete telemetry: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"errors"
"time"

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
"github.com/DataDog/datadog-agent/pkg/util/backoff"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
Expand All @@ -19,7 +20,7 @@ import (
)

const (
metricRetrieverStoreID string = "mr"
metricRetrieverStoreID autoscaling.SenderID = "mr"
)

// Backoff range for number of retries R:
Expand Down
28 changes: 15 additions & 13 deletions pkg/clusteragent/autoscaling/externalmetrics/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package externalmetrics
import (
"sync"

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
)

Expand All @@ -19,7 +20,8 @@ const (
)

// DatadogMetricInternalObserverFunc represents observer functions of the datadog metrics store
type DatadogMetricInternalObserverFunc func(string, string)
// First parameter is the key, second parameter is the sender identifier, third parameter is the object (the value just stored for set operations, the value just removed for delete operations)
type DatadogMetricInternalObserverFunc func(string, autoscaling.SenderID, interface{})

// DatadogMetricInternalObserver allows defining functions that watch changes in the Store
type DatadogMetricInternalObserver struct {
Expand Down Expand Up @@ -108,23 +110,23 @@ func (ds *DatadogMetricsInternalStore) Count() int {
}

// Set `DatadogMetricInternal` for id
func (ds *DatadogMetricsInternalStore) Set(id string, datadogMetric model.DatadogMetricInternal, sender string) {
func (ds *DatadogMetricsInternalStore) Set(id string, datadogMetric model.DatadogMetricInternal, sender autoscaling.SenderID) {
ds.lock.Lock()
ds.store[id] = datadogMetric
ds.lock.Unlock()

ds.notify(setOperation, id, sender)
ds.notify(setOperation, id, sender, datadogMetric)
}

// Delete `DatadogMetricInternal` corresponding to id if present
func (ds *DatadogMetricsInternalStore) Delete(id, sender string) {
func (ds *DatadogMetricsInternalStore) Delete(id string, sender autoscaling.SenderID) {
ds.lock.Lock()
_, exists := ds.store[id]
obj, exists := ds.store[id]
delete(ds.store, id)
ds.lock.Unlock()

if exists {
ds.notify(deleteOperation, id, sender)
ds.notify(deleteOperation, id, sender, obj)
}
}

Expand Down Expand Up @@ -152,31 +154,31 @@ func (ds *DatadogMetricsInternalStore) Unlock(string) {
}

// UnlockSet sets the new DatadogMetricInternal value and releases the lock (previously acquired by `LockRead`)
func (ds *DatadogMetricsInternalStore) UnlockSet(id string, datadogMetric model.DatadogMetricInternal, sender string) {
func (ds *DatadogMetricsInternalStore) UnlockSet(id string, datadogMetric model.DatadogMetricInternal, sender autoscaling.SenderID) {
ds.store[id] = datadogMetric
ds.lock.Unlock()

ds.notify(setOperation, id, sender)
ds.notify(setOperation, id, sender, datadogMetric)
}

// UnlockDelete deletes a DatadogMetricInternal and releases the lock (previously acquired by `LockRead`)
func (ds *DatadogMetricsInternalStore) UnlockDelete(id, sender string) {
_, exists := ds.store[id]
func (ds *DatadogMetricsInternalStore) UnlockDelete(id string, sender autoscaling.SenderID) {
obj, exists := ds.store[id]

delete(ds.store, id)
ds.lock.Unlock()

if exists {
ds.notify(deleteOperation, id, sender)
ds.notify(deleteOperation, id, sender, obj)
}
}

// It's a very simple implementation of a notify process, but it's enough in our case as we aim at only 1 or 2 observers
func (ds *DatadogMetricsInternalStore) notify(operationType storeOperation, key, sender string) {
func (ds *DatadogMetricsInternalStore) notify(operationType storeOperation, key string, sender autoscaling.SenderID, obj interface{}) {
ds.observersLock.RLock()
defer ds.observersLock.RUnlock()

for _, observer := range ds.observers[operationType] {
observer(key, sender)
observer(key, sender, obj)
}
}
8 changes: 4 additions & 4 deletions pkg/clusteragent/autoscaling/max_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ func NewHashHeap[T any](maxSize int, store *Store[T], creationTimestamp func(*T)
}

// InsertIntoHeap attempts to insert the given key into the max heap; it returns no value and does nothing when the key is not found in the store
// Used as an ObserverFunc; accept sender as parameter to match ObserverFunc signature
func (h *HashHeap[T]) InsertIntoHeap(key, _sender string) {
// Used as an ObserverFunc; accept sender and obj as parameters to match ObserverFunc signature
func (h *HashHeap[T]) InsertIntoHeap(key string, _ SenderID, _ interface{}) {
obj, found := h.store.Get(key)
if !found {
return
Expand Down Expand Up @@ -145,8 +145,8 @@ func (h *HashHeap[T]) InsertIntoHeap(key, _sender string) {
}

// DeleteFromHeap removes the given key from the max heap
// Used as an ObserverFunc; accept sender as parameter to match ObserverFunc signature
func (h *HashHeap[T]) DeleteFromHeap(key, _sender string) {
// Used as an ObserverFunc; accept sender and obj as parameters to match ObserverFunc signature
func (h *HashHeap[T]) DeleteFromHeap(key string, _ SenderID, _ interface{}) {
// Key did not exist in heap, return early
if !h.Exists(key) {
return
Expand Down
44 changes: 26 additions & 18 deletions pkg/clusteragent/autoscaling/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ const (
deleteOperation
)

// SenderID identifies the component that triggered a store change
type SenderID string

// ObserverFunc represents observer functions of the store
type ObserverFunc func(string, string)
// First parameter is the key, second parameter is the sender identifier, third parameter is the object (the value just stored for set operations, the value just removed for delete operations)
type ObserverFunc func(string, SenderID, interface{})

// Observer allows defining functions that watch changes in the Store
type Observer struct {
Expand Down Expand Up @@ -101,21 +105,25 @@ func (s *Store[T]) GetFiltered(filter func(T) bool) []T {
// Update updates all objects in the store with the result of the `updator` function.
// Updator func is expected to return the new object and a boolean indicating if the object has changed.
// The object is updated only if boolean is true, observers are notified only for updated objects after all objects have been updated.
func (s *Store[T]) Update(updator func(T) (T, bool), sender string) {
var changedIDs []string
func (s *Store[T]) Update(updator func(T) (T, bool), sender SenderID) {
type change struct {
id string
obj T
}
var changes []change
s.lock.Lock()
for id, object := range s.store {
newObject, changed := updator(object)
if changed {
s.store[id] = newObject
changedIDs = append(changedIDs, id)
changes = append(changes, change{id: id, obj: newObject})
}
}
s.lock.Unlock()

// Notifying must be done after releasing the lock
for _, id := range changedIDs {
s.notify(setOperation, id, sender)
for _, ch := range changes {
s.notify(setOperation, ch.id, sender, ch.obj)
}
}

Expand All @@ -128,23 +136,23 @@ func (s *Store[T]) Count() int {
}

// Set object for id
func (s *Store[T]) Set(id string, obj T, sender string) {
func (s *Store[T]) Set(id string, obj T, sender SenderID) {
s.lock.Lock()
s.store[id] = obj
s.lock.Unlock()

s.notify(setOperation, id, sender)
s.notify(setOperation, id, sender, obj)
}

// Delete object corresponding to id if present
func (s *Store[T]) Delete(id, sender string) {
func (s *Store[T]) Delete(id string, sender SenderID) {
s.lock.Lock()
_, exists := s.store[id]
obj, exists := s.store[id]
delete(s.store, id)
s.lock.Unlock()

if exists {
s.notify(deleteOperation, id, sender)
s.notify(deleteOperation, id, sender, obj)
}
}

Expand All @@ -170,31 +178,31 @@ func (s *Store[T]) Unlock(string) {
}

// UnlockSet sets the new object value and releases the lock (previously acquired by `LockRead`)
func (s *Store[T]) UnlockSet(id string, obj T, sender string) {
func (s *Store[T]) UnlockSet(id string, obj T, sender SenderID) {
s.store[id] = obj
s.lock.Unlock()

s.notify(setOperation, id, sender)
s.notify(setOperation, id, sender, obj)
}

// UnlockDelete deletes an object and releases the lock (previously acquired by `LockRead`)
func (s *Store[T]) UnlockDelete(id, sender string) {
_, exists := s.store[id]
func (s *Store[T]) UnlockDelete(id string, sender SenderID) {
obj, exists := s.store[id]

delete(s.store, id)
s.lock.Unlock()

if exists {
s.notify(deleteOperation, id, sender)
s.notify(deleteOperation, id, sender, obj)
}
}

// It's a very simple implementation of a notify process, but it's enough in our case as we aim at only 1 or 2 observers
func (s *Store[T]) notify(operationType storeOperation, key, sender string) {
func (s *Store[T]) notify(operationType storeOperation, key string, sender SenderID, obj interface{}) {
s.observersLock.RLock()
defer s.observersLock.RUnlock()

for _, observer := range s.observers[operationType] {
observer(key, sender)
observer(key, sender, obj)
}
}
7 changes: 4 additions & 3 deletions pkg/clusteragent/autoscaling/workload/config_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ import (

"k8s.io/utils/clock"

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling"
"github.com/DataDog/datadog-agent/pkg/config/remote/data"
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

const (
configRetrieverStoreID string = "cr"
settingsReconcileInterval time.Duration = 5 * time.Minute
valuesReconcileInterval time.Duration = 5 * time.Minute
configRetrieverStoreID autoscaling.SenderID = "cr"
settingsReconcileInterval time.Duration = 5 * time.Minute
valuesReconcileInterval time.Duration = 5 * time.Minute
)

// RcClient is a subinterface of rcclient.Component to allow mocking
Expand Down
Loading