Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/controller/api_sync_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
"slices"
"strings"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/controller-runtime/pkg/client"

dash0common "github.com/dash0hq/dash0-operator/api/operator/common"
dash0v1beta1 "github.com/dash0hq/dash0-operator/api/operator/v1beta1"
"github.com/dash0hq/dash0-operator/internal/resources"
"github.com/dash0hq/dash0-operator/internal/util"
"github.com/dash0hq/dash0-operator/internal/util/logd"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type ApiConfig struct {
Expand Down
33 changes: 26 additions & 7 deletions internal/controller/perses_dashboards_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/dash0hq/dash0-operator/internal/selfmonitoringapiaccess"
"github.com/dash0hq/dash0-operator/internal/util"
"github.com/dash0hq/dash0-operator/internal/util/logd"
"github.com/dash0hq/dash0-operator/internal/util/rate"
"github.com/dash0hq/dash0-operator/internal/util/resources"
)

type PersesDashboardCrdReconciler struct {
Expand All @@ -40,6 +42,7 @@ type PersesDashboardCrdReconciler struct {
leaderElectionAware util.LeaderElectionAware
mgr ctrl.Manager
httpClient *http.Client
limiter *rate.CappedRateLimiter
skipNameValidation bool
persesDashboardReconciler *PersesDashboardReconciler
persesDashboardCrdExists atomic.Bool
Expand All @@ -50,6 +53,7 @@ type PersesDashboardReconciler struct {
pseudoClusterUid types.UID
queue *workqueue.Typed[ThirdPartyResourceSyncJob]
httpClient *http.Client
limiter *rate.CappedRateLimiter
defaultApiConfigs selfmonitoringapiaccess.SynchronizedSlice[ApiConfig]
namespacedApiConfigs selfmonitoringapiaccess.SynchronizedMapSlice[ApiConfig]
namespacedSyncEnabled sync.Map
Expand All @@ -67,12 +71,14 @@ func NewPersesDashboardCrdReconciler(
queue *workqueue.Typed[ThirdPartyResourceSyncJob],
leaderElectionAware util.LeaderElectionAware,
httpClient *http.Client,
limiter *rate.CappedRateLimiter,
) *PersesDashboardCrdReconciler {
return &PersesDashboardCrdReconciler{
Client: k8sClient,
queue: queue,
leaderElectionAware: leaderElectionAware,
httpClient: httpClient,
limiter: limiter,
}
}

Expand Down Expand Up @@ -126,6 +132,7 @@ func (r *PersesDashboardCrdReconciler) CreateThirdPartyResourceReconciler(pseudo
queue: r.queue,
pseudoClusterUid: pseudoClusterUid,
httpClient: r.httpClient,
limiter: r.limiter,
defaultApiConfigs: *selfmonitoringapiaccess.NewSynchronizedSlice[ApiConfig](),
namespacedApiConfigs: *selfmonitoringapiaccess.NewSynchronizedMapSlice[ApiConfig](),
}
Expand Down Expand Up @@ -374,11 +381,15 @@ func (r *PersesDashboardReconciler) Create(
e event.TypedCreateEvent[*unstructured.Unstructured],
_ workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
logger := logd.FromContext(ctx)
if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.Object), logger) {
return
}

if persesDashboardReconcileRequestMetric != nil {
persesDashboardReconcileRequestMetric.Add(ctx, 1)
}

logger := logd.FromContext(ctx)
logger.Info(
"Detected a new Perses dashboard resource",
"namespace",
Expand All @@ -395,11 +406,15 @@ func (r *PersesDashboardReconciler) Update(
e event.TypedUpdateEvent[*unstructured.Unstructured],
_ workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
logger := logd.FromContext(ctx)
if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.ObjectNew), logger) {
return
}

if persesDashboardReconcileRequestMetric != nil {
persesDashboardReconcileRequestMetric.Add(ctx, 1)
}

logger := logd.FromContext(ctx)
logger.Info(
"Detected a change for a Perses dashboard resource",
"namespace",
Expand All @@ -416,11 +431,15 @@ func (r *PersesDashboardReconciler) Delete(
e event.TypedDeleteEvent[*unstructured.Unstructured],
_ workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
logger := logd.FromContext(ctx)
if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.Object), logger) {
return
}

if persesDashboardReconcileRequestMetric != nil {
persesDashboardReconcileRequestMetric.Add(ctx, 1)
}

logger := logd.FromContext(ctx)
logger.Info(
"Detected the deletion of a Perses dashboard resource",
"namespace",
Expand All @@ -437,11 +456,14 @@ func (r *PersesDashboardReconciler) Generic(
e event.TypedGenericEvent[*unstructured.Unstructured],
_ workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
logger := logd.FromContext(ctx)
if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.Object), logger) {
return
}
if persesDashboardReconcileRequestMetric != nil {
persesDashboardReconcileRequestMetric.Add(ctx, 1)
}

logger := logd.FromContext(ctx)
logger.Info(
"Reconciling dashboard triggered by config event (updated API config or authorization).",
"namespace",
Expand Down Expand Up @@ -723,9 +745,6 @@ func (r *PersesDashboardReconciler) synchronizeNamespacedResources(
Object: dashboardResource,
}
r.Generic(ctx, evt, nil)

// stagger API requests a bit
time.Sleep(50 * time.Millisecond)
}
logger.Info(fmt.Sprintf("Triggering synchronization of dashboards in namespace %s has finished.", namespace))
}()
Expand Down
7 changes: 6 additions & 1 deletion internal/controller/perses_dashboards_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
dash0v1beta1 "github.com/dash0hq/dash0-operator/api/operator/v1beta1"
"github.com/dash0hq/dash0-operator/internal/util"
"github.com/dash0hq/dash0-operator/internal/util/logd"
"github.com/dash0hq/dash0-operator/internal/util/rate"

"github.com/h2non/gock"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -392,7 +393,10 @@ var _ = Describe(

Expect(persesDashboardCrdReconciler.SetupWithManager(ctx, mgr, k8sClient, logger)).To(Succeed())

StartProcessingThirdPartySynchronizationQueue(testQueuePersesDashboards, logger)
StartProcessingThirdPartySynchronizationQueue(
testQueuePersesDashboards,
logger,
)
},
)

Expand Down Expand Up @@ -1018,6 +1022,7 @@ func createPersesDashboardCrdReconciler() *PersesDashboardCrdReconciler {
testQueuePersesDashboards,
&DummyLeaderElectionAware{Leader: true},
TestHTTPClient(),
rate.NewNoOpCappedRateLimiter(),
)

// We create the controller multiple times in tests, this option is required, otherwise the controller
Expand Down
36 changes: 27 additions & 9 deletions internal/controller/prometheus_rules_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"github.com/dash0hq/dash0-operator/internal/selfmonitoringapiaccess"
"github.com/dash0hq/dash0-operator/internal/util"
"github.com/dash0hq/dash0-operator/internal/util/logd"
"github.com/dash0hq/dash0-operator/internal/util/rate"
"github.com/dash0hq/dash0-operator/internal/util/resources"
)

type PrometheusRuleCrdReconciler struct {
Expand All @@ -45,6 +47,7 @@ type PrometheusRuleCrdReconciler struct {
leaderElectionAware util.LeaderElectionAware
mgr ctrl.Manager
httpClient *http.Client
limiter *rate.CappedRateLimiter
skipNameValidation bool
prometheusRuleReconciler *PrometheusRuleReconciler
prometheusRuleCrdExists atomic.Bool
Expand All @@ -55,6 +58,7 @@ type PrometheusRuleReconciler struct {
pseudoClusterUid types.UID
queue *workqueue.Typed[ThirdPartyResourceSyncJob]
httpClient *http.Client
limiter *rate.CappedRateLimiter
defaultApiConfigs selfmonitoringapiaccess.SynchronizedSlice[ApiConfig]
namespacedApiConfigs selfmonitoringapiaccess.SynchronizedMapSlice[ApiConfig]
namespacedSyncEnabled sync.Map
Expand Down Expand Up @@ -92,12 +96,14 @@ func NewPrometheusRuleCrdReconciler(
queue *workqueue.Typed[ThirdPartyResourceSyncJob],
leaderElectionAware util.LeaderElectionAware,
httpClient *http.Client,
limiter *rate.CappedRateLimiter,
) *PrometheusRuleCrdReconciler {
return &PrometheusRuleCrdReconciler{
Client: k8sClient,
queue: queue,
leaderElectionAware: leaderElectionAware,
httpClient: httpClient,
limiter: limiter,
}
}

Expand Down Expand Up @@ -151,6 +157,7 @@ func (r *PrometheusRuleCrdReconciler) CreateThirdPartyResourceReconciler(pseudoC
queue: r.queue,
pseudoClusterUid: pseudoClusterUid,
httpClient: r.httpClient,
limiter: r.limiter,
defaultApiConfigs: *selfmonitoringapiaccess.NewSynchronizedSlice[ApiConfig](),
namespacedApiConfigs: *selfmonitoringapiaccess.NewSynchronizedMapSlice[ApiConfig](),
}
Expand Down Expand Up @@ -399,11 +406,15 @@ func (r *PrometheusRuleReconciler) Create(
e event.TypedCreateEvent[*unstructured.Unstructured],
_ workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
logger := logd.FromContext(ctx)
if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.Object), logger) {
return
}

if prometheusRuleReconcileRequestMetric != nil {
prometheusRuleReconcileRequestMetric.Add(ctx, 1)
}

logger := logd.FromContext(ctx)
logger.Info(
"Detected a new Prometheus rule resource",
"namespace",
Expand All @@ -420,11 +431,14 @@ func (r *PrometheusRuleReconciler) Update(
e event.TypedUpdateEvent[*unstructured.Unstructured],
_ workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
logger := logd.FromContext(ctx)
if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.ObjectNew), logger) {
return
}
if prometheusRuleReconcileRequestMetric != nil {
prometheusRuleReconcileRequestMetric.Add(ctx, 1)
}

logger := logd.FromContext(ctx)
logger.Info(
"Detected a change for a Prometheus rule resource",
"namespace",
Expand All @@ -441,11 +455,14 @@ func (r *PrometheusRuleReconciler) Delete(
e event.TypedDeleteEvent[*unstructured.Unstructured],
_ workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
logger := logd.FromContext(ctx)
if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.Object), logger) {
return
}
if prometheusRuleReconcileRequestMetric != nil {
prometheusRuleReconcileRequestMetric.Add(ctx, 1)
}

logger := logd.FromContext(ctx)
logger.Info(
"Detected the deletion of a Prometheus rule resource",
"namespace",
Expand All @@ -462,11 +479,15 @@ func (r *PrometheusRuleReconciler) Generic(
e event.TypedGenericEvent[*unstructured.Unstructured],
_ workqueue.TypedRateLimitingInterface[reconcile.Request],
) {
logger := logd.FromContext(ctx)
if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.Object), logger) {
return
}

if prometheusRuleReconcileRequestMetric != nil {
prometheusRuleReconcileRequestMetric.Add(ctx, 1)
}

logger := logd.FromContext(ctx)
logger.Info(
"Reconciling check rule triggered by config event (updated API config or authorization).",
"namespace",
Expand Down Expand Up @@ -559,7 +580,7 @@ func (r *PrometheusRuleReconciler) MapResourceToHttpRequests(
allValidationIssues[checkRuleName] = validationIssues
// If a rule becomes temporarily invalid due to a bad edit, we do not want to delete it in Dash0
// (assuming it has been valid at some point before and has been synchronized). Instead, we keep the
// most recent valid state. To do that, we add its id to the list of ids we have seen (the list will
// most recent valid state. To do that, we add its id to the list of ids we have seen. The list will
// later be used to determine which of the checks in Dash0 need to be removed because they are no longer
// in the K8s resource.
originsInResource = append(originsInResource, checkRuleOriginNotUrlEncoded)
Expand All @@ -568,7 +589,7 @@ func (r *PrometheusRuleReconciler) MapResourceToHttpRequests(
if syncError != nil {
// If a rule cannot be synchronized temporarily, we do not want to delete it in Dash0 immediately
// (assuming the rule has been synchronized before at some point). Instead, we keep the
// most recent state. To do that, we add its id to the list of ids we have seen (the list will later
// most recent state. To do that, we add its id to the list of ids we have seen. The list will later
// be used to determine which of the checks in Dash0 need to be removed because they are no longer
// in the K8s resource.
allSynchronizationErrors[checkRuleName] = syncError.Error()
Expand Down Expand Up @@ -1133,9 +1154,6 @@ func (r *PrometheusRuleReconciler) synchronizeNamespacedResources(
Object: ruleResource,
}
r.Generic(ctx, evt, nil)

// stagger API requests a bit
time.Sleep(50 * time.Millisecond)
}
logger.Info(fmt.Sprintf("Triggering synchronization of check rules in namespace %s has finished.", namespace))
}()
Expand Down
7 changes: 6 additions & 1 deletion internal/controller/prometheus_rules_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
dash0v1beta1 "github.com/dash0hq/dash0-operator/api/operator/v1beta1"
"github.com/dash0hq/dash0-operator/internal/util"
"github.com/dash0hq/dash0-operator/internal/util/logd"
"github.com/dash0hq/dash0-operator/internal/util/rate"

"github.com/h2non/gock"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -397,7 +398,10 @@ var _ = Describe(

Expect(prometheusRuleCrdReconciler.SetupWithManager(ctx, mgr, k8sClient, logger)).To(Succeed())

StartProcessingThirdPartySynchronizationQueue(testQueuePrometheusRules, logger)
StartProcessingThirdPartySynchronizationQueue(
testQueuePrometheusRules,
logger,
)
},
)

Expand Down Expand Up @@ -2215,6 +2219,7 @@ func createPrometheusRuleCrdReconciler() *PrometheusRuleCrdReconciler {
testQueuePrometheusRules,
&DummyLeaderElectionAware{Leader: true},
TestHTTPClient(),
rate.NewNoOpCappedRateLimiter(),
)

// We create the controller multiple times in tests, this option is required, otherwise the controller
Expand Down
Loading
Loading