diff --git a/internal/controller/api_sync_common.go b/internal/controller/api_sync_common.go index 221d92933..ff2f1aaac 100644 --- a/internal/controller/api_sync_common.go +++ b/internal/controller/api_sync_common.go @@ -13,13 +13,14 @@ import ( "slices" "strings" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "sigs.k8s.io/controller-runtime/pkg/client" + dash0common "github.com/dash0hq/dash0-operator/api/operator/common" dash0v1beta1 "github.com/dash0hq/dash0-operator/api/operator/v1beta1" "github.com/dash0hq/dash0-operator/internal/resources" "github.com/dash0hq/dash0-operator/internal/util" "github.com/dash0hq/dash0-operator/internal/util/logd" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "sigs.k8s.io/controller-runtime/pkg/client" ) type ApiConfig struct { diff --git a/internal/controller/perses_dashboards_controller.go b/internal/controller/perses_dashboards_controller.go index 5a7643e2d..ee39681e1 100644 --- a/internal/controller/perses_dashboards_controller.go +++ b/internal/controller/perses_dashboards_controller.go @@ -32,6 +32,8 @@ import ( "github.com/dash0hq/dash0-operator/internal/selfmonitoringapiaccess" "github.com/dash0hq/dash0-operator/internal/util" "github.com/dash0hq/dash0-operator/internal/util/logd" + "github.com/dash0hq/dash0-operator/internal/util/rate" + "github.com/dash0hq/dash0-operator/internal/util/resources" ) type PersesDashboardCrdReconciler struct { @@ -40,6 +42,7 @@ type PersesDashboardCrdReconciler struct { leaderElectionAware util.LeaderElectionAware mgr ctrl.Manager httpClient *http.Client + limiter *rate.CappedRateLimiter skipNameValidation bool persesDashboardReconciler *PersesDashboardReconciler persesDashboardCrdExists atomic.Bool @@ -50,6 +53,7 @@ type PersesDashboardReconciler struct { pseudoClusterUid types.UID queue *workqueue.Typed[ThirdPartyResourceSyncJob] httpClient *http.Client + limiter *rate.CappedRateLimiter defaultApiConfigs selfmonitoringapiaccess.SynchronizedSlice[ApiConfig] namespacedApiConfigs selfmonitoringapiaccess.SynchronizedMapSlice[ApiConfig] namespacedSyncEnabled sync.Map @@ -67,12 +71,14 @@ func NewPersesDashboardCrdReconciler( queue *workqueue.Typed[ThirdPartyResourceSyncJob], leaderElectionAware util.LeaderElectionAware, httpClient *http.Client, + limiter *rate.CappedRateLimiter, ) *PersesDashboardCrdReconciler { return &PersesDashboardCrdReconciler{ Client: k8sClient, queue: queue, leaderElectionAware: leaderElectionAware, httpClient: httpClient, + limiter: limiter, } } @@ -126,6 +132,7 @@ func (r *PersesDashboardCrdReconciler) CreateThirdPartyResourceReconciler(pseudo queue: r.queue, pseudoClusterUid: pseudoClusterUid, httpClient: r.httpClient, + limiter: r.limiter, defaultApiConfigs: *selfmonitoringapiaccess.NewSynchronizedSlice[ApiConfig](), namespacedApiConfigs: *selfmonitoringapiaccess.NewSynchronizedMapSlice[ApiConfig](), } @@ -374,11 +381,15 @@ func (r *PersesDashboardReconciler) Create( e event.TypedCreateEvent[*unstructured.Unstructured], _ workqueue.TypedRateLimitingInterface[reconcile.Request], ) { + logger := logd.FromContext(ctx) + if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.Object), logger) { + return + } + if persesDashboardReconcileRequestMetric != nil { persesDashboardReconcileRequestMetric.Add(ctx, 1) } - logger := logd.FromContext(ctx) logger.Info( "Detected a new Perses dashboard resource", "namespace", @@ -395,11 +406,15 @@ func (r *PersesDashboardReconciler) Update( e event.TypedUpdateEvent[*unstructured.Unstructured], _ workqueue.TypedRateLimitingInterface[reconcile.Request], ) { + logger := logd.FromContext(ctx) + if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.ObjectNew), logger) { + return + } + if persesDashboardReconcileRequestMetric != nil { persesDashboardReconcileRequestMetric.Add(ctx, 1) } - logger := logd.FromContext(ctx) logger.Info( "Detected a change for a Perses dashboard resource", "namespace", @@ -416,11 +431,15 @@ func (r *PersesDashboardReconciler) Delete( e event.TypedDeleteEvent[*unstructured.Unstructured], _ workqueue.TypedRateLimitingInterface[reconcile.Request], ) { + logger := logd.FromContext(ctx) + if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.Object), logger) { + return + } + if persesDashboardReconcileRequestMetric != nil { persesDashboardReconcileRequestMetric.Add(ctx, 1) } - logger := logd.FromContext(ctx) logger.Info( "Detected the deletion of a Perses dashboard resource", "namespace", @@ -437,11 +456,14 @@ func (r *PersesDashboardReconciler) Generic( e event.TypedGenericEvent[*unstructured.Unstructured], _ workqueue.TypedRateLimitingInterface[reconcile.Request], ) { + logger := logd.FromContext(ctx) + if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.Object), logger) { + return + } if persesDashboardReconcileRequestMetric != nil { persesDashboardReconcileRequestMetric.Add(ctx, 1) } - logger := logd.FromContext(ctx) logger.Info( "Reconciling dashboard triggered by config event (updated API config or authorization).", "namespace", @@ -723,9 +745,6 @@ func (r *PersesDashboardReconciler) synchronizeNamespacedResources( Object: dashboardResource, } r.Generic(ctx, evt, nil) - - // stagger API requests a bit - time.Sleep(50 * time.Millisecond) } logger.Info(fmt.Sprintf("Triggering synchronization of dashboards in namespace %s has finished.", namespace)) }() diff --git a/internal/controller/perses_dashboards_controller_test.go b/internal/controller/perses_dashboards_controller_test.go index 39809a175..eca41e33c 100644 --- a/internal/controller/perses_dashboards_controller_test.go +++ b/internal/controller/perses_dashboards_controller_test.go @@ -27,6 +27,7 @@ import ( dash0v1beta1 "github.com/dash0hq/dash0-operator/api/operator/v1beta1" "github.com/dash0hq/dash0-operator/internal/util" "github.com/dash0hq/dash0-operator/internal/util/logd" + "github.com/dash0hq/dash0-operator/internal/util/rate" "github.com/h2non/gock" . "github.com/onsi/ginkgo/v2" @@ -392,7 +393,10 @@ var _ = Describe( Expect(persesDashboardCrdReconciler.SetupWithManager(ctx, mgr, k8sClient, logger)).To(Succeed()) - StartProcessingThirdPartySynchronizationQueue(testQueuePersesDashboards, logger) + StartProcessingThirdPartySynchronizationQueue( + testQueuePersesDashboards, + logger, + ) }, ) @@ -1018,6 +1022,7 @@ func createPersesDashboardCrdReconciler() *PersesDashboardCrdReconciler { testQueuePersesDashboards, &DummyLeaderElectionAware{Leader: true}, TestHTTPClient(), + rate.NewNoOpCappedRateLimiter(), ) // We create the controller multiple times in tests, this option is required, otherwise the controller diff --git a/internal/controller/prometheus_rules_controller.go b/internal/controller/prometheus_rules_controller.go index 4763a7db2..5143a76c7 100644 --- a/internal/controller/prometheus_rules_controller.go +++ b/internal/controller/prometheus_rules_controller.go @@ -37,6 +37,8 @@ import ( "github.com/dash0hq/dash0-operator/internal/selfmonitoringapiaccess" "github.com/dash0hq/dash0-operator/internal/util" "github.com/dash0hq/dash0-operator/internal/util/logd" + "github.com/dash0hq/dash0-operator/internal/util/rate" + "github.com/dash0hq/dash0-operator/internal/util/resources" ) type PrometheusRuleCrdReconciler struct { @@ -45,6 +47,7 @@ type PrometheusRuleCrdReconciler struct { leaderElectionAware util.LeaderElectionAware mgr ctrl.Manager httpClient *http.Client + limiter *rate.CappedRateLimiter skipNameValidation bool prometheusRuleReconciler *PrometheusRuleReconciler prometheusRuleCrdExists atomic.Bool @@ -55,6 +58,7 @@ type PrometheusRuleReconciler struct { pseudoClusterUid types.UID queue *workqueue.Typed[ThirdPartyResourceSyncJob] httpClient *http.Client + limiter *rate.CappedRateLimiter defaultApiConfigs selfmonitoringapiaccess.SynchronizedSlice[ApiConfig] namespacedApiConfigs selfmonitoringapiaccess.SynchronizedMapSlice[ApiConfig] namespacedSyncEnabled sync.Map @@ -92,12 +96,14 @@ func NewPrometheusRuleCrdReconciler( queue *workqueue.Typed[ThirdPartyResourceSyncJob], leaderElectionAware util.LeaderElectionAware, httpClient *http.Client, + limiter *rate.CappedRateLimiter, ) *PrometheusRuleCrdReconciler { return &PrometheusRuleCrdReconciler{ Client: k8sClient, queue: queue, leaderElectionAware: leaderElectionAware, httpClient: httpClient, + limiter: limiter, } } @@ -151,6 +157,7 @@ func (r *PrometheusRuleCrdReconciler) CreateThirdPartyResourceReconciler(pseudoC queue: r.queue, pseudoClusterUid: pseudoClusterUid, httpClient: r.httpClient, + limiter: r.limiter, defaultApiConfigs: *selfmonitoringapiaccess.NewSynchronizedSlice[ApiConfig](), namespacedApiConfigs: *selfmonitoringapiaccess.NewSynchronizedMapSlice[ApiConfig](), } @@ -399,11 +406,15 @@ func (r *PrometheusRuleReconciler) Create( e event.TypedCreateEvent[*unstructured.Unstructured], _ workqueue.TypedRateLimitingInterface[reconcile.Request], ) { + logger := logd.FromContext(ctx) + if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.Object), logger) { + return + } + if prometheusRuleReconcileRequestMetric != nil { prometheusRuleReconcileRequestMetric.Add(ctx, 1) } - logger := logd.FromContext(ctx) logger.Info( "Detected a new Prometheus rule resource", "namespace", @@ -420,11 +431,14 @@ func (r *PrometheusRuleReconciler) Update( e event.TypedUpdateEvent[*unstructured.Unstructured], _ workqueue.TypedRateLimitingInterface[reconcile.Request], ) { + logger := logd.FromContext(ctx) + if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.ObjectNew), logger) { + return + } if prometheusRuleReconcileRequestMetric != nil { prometheusRuleReconcileRequestMetric.Add(ctx, 1) } - logger := logd.FromContext(ctx) logger.Info( "Detected a change for a Prometheus rule resource", "namespace", @@ -441,11 +455,14 @@ func (r *PrometheusRuleReconciler) Delete( e event.TypedDeleteEvent[*unstructured.Unstructured], _ workqueue.TypedRateLimitingInterface[reconcile.Request], ) { + logger := logd.FromContext(ctx) + if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.Object), logger) { + return + } if prometheusRuleReconcileRequestMetric != nil { prometheusRuleReconcileRequestMetric.Add(ctx, 1) } - logger := logd.FromContext(ctx) logger.Info( "Detected the deletion of a Prometheus rule resource", "namespace", @@ -462,11 +479,15 @@ func (r *PrometheusRuleReconciler) Generic( e event.TypedGenericEvent[*unstructured.Unstructured], _ workqueue.TypedRateLimitingInterface[reconcile.Request], ) { + logger := logd.FromContext(ctx) + if !r.limiter.Wait(ctx, r.KindDisplayName(), resources.QualifiedResourceName(e.Object), logger) { + return + } + if prometheusRuleReconcileRequestMetric != nil { prometheusRuleReconcileRequestMetric.Add(ctx, 1) } - logger := logd.FromContext(ctx) logger.Info( "Reconciling check rule triggered by config event (updated API config or authorization).", "namespace", @@ -559,7 +580,7 @@ func (r *PrometheusRuleReconciler) MapResourceToHttpRequests( allValidationIssues[checkRuleName] = validationIssues // If a rule becomes temporarily invalid due to a bad edit, we do not want to delete it in Dash0 // (assuming it has been valid at some point before and has been synchronized). Instead, we keep the - // most recent valid state. To do that, we add its id to the list of ids we have seen (the list will + // most recent valid state. To do that, we add its id to the list of ids we have seen. The list will // later be used to determine which of the checks in Dash0 need to be removed because they are no longer // in the K8s resource. originsInResource = append(originsInResource, checkRuleOriginNotUrlEncoded) @@ -568,7 +589,7 @@ func (r *PrometheusRuleReconciler) MapResourceToHttpRequests( if syncError != nil { // If a rule cannot be synchronized temporarily, we do not want to delete it in Dash0 immediately // (assuming the rule has been synchronized before at some point). Instead, we keep the - // most recent state. To do that, we add its id to the list of ids we have seen (the list will later + // most recent state. To do that, we add its id to the list of ids we have seen. The list will later // be used to determine which of the checks in Dash0 need to be removed because they are no longer // in the K8s resource. allSynchronizationErrors[checkRuleName] = syncError.Error() @@ -1133,9 +1154,6 @@ func (r *PrometheusRuleReconciler) synchronizeNamespacedResources( Object: ruleResource, } r.Generic(ctx, evt, nil) - - // stagger API requests a bit - time.Sleep(50 * time.Millisecond) } logger.Info(fmt.Sprintf("Triggering synchronization of check rules in namespace %s has finished.", namespace)) }() diff --git a/internal/controller/prometheus_rules_controller_test.go b/internal/controller/prometheus_rules_controller_test.go index 51242c98e..96fc038ed 100644 --- a/internal/controller/prometheus_rules_controller_test.go +++ b/internal/controller/prometheus_rules_controller_test.go @@ -31,6 +31,7 @@ import ( dash0v1beta1 "github.com/dash0hq/dash0-operator/api/operator/v1beta1" "github.com/dash0hq/dash0-operator/internal/util" "github.com/dash0hq/dash0-operator/internal/util/logd" + "github.com/dash0hq/dash0-operator/internal/util/rate" "github.com/h2non/gock" . "github.com/onsi/ginkgo/v2" @@ -397,7 +398,10 @@ var _ = Describe( Expect(prometheusRuleCrdReconciler.SetupWithManager(ctx, mgr, k8sClient, logger)).To(Succeed()) - StartProcessingThirdPartySynchronizationQueue(testQueuePrometheusRules, logger) + StartProcessingThirdPartySynchronizationQueue( + testQueuePrometheusRules, + logger, + ) }, ) @@ -2215,6 +2219,7 @@ func createPrometheusRuleCrdReconciler() *PrometheusRuleCrdReconciler { testQueuePrometheusRules, &DummyLeaderElectionAware{Leader: true}, TestHTTPClient(), + rate.NewNoOpCappedRateLimiter(), ) // We create the controller multiple times in tests, this option is required, otherwise the controller diff --git a/internal/controller/synthetic_check_controller.go b/internal/controller/synthetic_check_controller.go index b1a5dec48..873a9945c 100644 --- a/internal/controller/synthetic_check_controller.go +++ b/internal/controller/synthetic_check_controller.go @@ -34,6 +34,7 @@ import ( "github.com/dash0hq/dash0-operator/internal/selfmonitoringapiaccess" "github.com/dash0hq/dash0-operator/internal/util" "github.com/dash0hq/dash0-operator/internal/util/logd" + "github.com/dash0hq/dash0-operator/internal/util/rate" ) type SyntheticCheckReconciler struct { @@ -41,6 +42,7 @@ type SyntheticCheckReconciler struct { pseudoClusterUid types.UID leaderElectionAware util.LeaderElectionAware httpClient *http.Client + limiter *rate.CappedRateLimiter defaultApiConfigs selfmonitoringapiaccess.SynchronizedSlice[ApiConfig] namespacedApiConfigs selfmonitoringapiaccess.SynchronizedMapSlice[ApiConfig] initialSyncMutex sync.Mutex @@ -57,12 +59,14 @@ func NewSyntheticCheckReconciler( pseudoClusterUid types.UID, leaderElectionAware util.LeaderElectionAware, httpClient *http.Client, + limiter *rate.CappedRateLimiter, ) *SyntheticCheckReconciler { return &SyntheticCheckReconciler{ Client: k8sClient, pseudoClusterUid: pseudoClusterUid, leaderElectionAware: leaderElectionAware, httpClient: httpClient, + limiter: limiter, defaultApiConfigs: *selfmonitoringapiaccess.NewSynchronizedSlice[ApiConfig](), namespacedApiConfigs: *selfmonitoringapiaccess.NewSynchronizedMapSlice[ApiConfig](), namespacedSyncMutex: *selfmonitoringapiaccess.NewNamespaceMutex(), @@ -231,9 +235,8 @@ func (r *SyntheticCheckReconciler) maybeDoInitialSynchronizationOfAllResources( }, } _, _ = r.Reconcile(ctx, pseudoReconcileRequest) - // stagger API requests a bit - time.Sleep(50 * time.Millisecond) } + logger.Info("Initial synchronization of synthetic checks has finished.") r.initialSyncHasHappend.Store(true) }() @@ -284,20 +287,23 @@ func (r *SyntheticCheckReconciler) synchronizeNamespacedResources( }, } _, _ = r.Reconcile(ctx, pseudoReconcileRequest) - // stagger API requests a bit - time.Sleep(50 * time.Millisecond) } logger.Info(fmt.Sprintf("Synchronization of synthetic checks in namespace %s has finished.", namespace)) }() } func (r *SyntheticCheckReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + qualifiedName := req.NamespacedName.String() //nolint:staticcheck + logger := logd.FromContext(ctx) + + if !r.limiter.Wait(ctx, r.KindDisplayName(), qualifiedName, logger) { + return reconcile.Result{}, nil + } + if syntheticCheckReconcileRequestMetric != nil { syntheticCheckReconcileRequestMetric.Add(ctx, 1) } - qualifiedName := req.NamespacedName.String() //nolint:staticcheck - logger := logd.FromContext(ctx) logger.Info("processing reconcile request for a synthetic check resource", "name", qualifiedName) action := upsertAction diff --git a/internal/controller/synthetic_check_controller_test.go b/internal/controller/synthetic_check_controller_test.go index 00f261adc..ec2427e36 100644 --- a/internal/controller/synthetic_check_controller_test.go +++ b/internal/controller/synthetic_check_controller_test.go @@ -22,6 +22,7 @@ import ( dash0v1alpha1 "github.com/dash0hq/dash0-operator/api/operator/v1alpha1" "github.com/dash0hq/dash0-operator/internal/util" "github.com/dash0hq/dash0-operator/internal/util/logd" + "github.com/dash0hq/dash0-operator/internal/util/rate" "github.com/h2non/gock" . "github.com/onsi/ginkgo/v2" @@ -831,6 +832,7 @@ func createSyntheticCheckReconciler(clusterId string) *SyntheticCheckReconciler types.UID(clusterId), leaderElectionAware, TestHTTPClient(), + rate.NewNoOpCappedRateLimiter(), ) return syntheticCheckReconciler } diff --git a/internal/controller/third_party_crd_common.go b/internal/controller/third_party_crd_common.go index 8f1d98c6f..5c5a75eff 100644 --- a/internal/controller/third_party_crd_common.go +++ b/internal/controller/third_party_crd_common.go @@ -429,8 +429,12 @@ func upsertViaApi( // each for a Perses dashboard and a Prometheus rules might be processed concurrently. All resource synchronization // attempts that are related to third-party resources types (Prometheus rules, Perses dashboards) end with writing // to the Dash0 monitoring resource status in the same namespace. These write attempts will fail if they happen - // concurrently. Therefore, we add all synchronization attempts to one queue that is shared across resource types, - // and only process them one at a time. + // concurrently. Therefore, we add all synchronization attempts to one queue that is shared across third-party + // resource types, and only process them one at a time. + // This queueing is independent of the rate limiting implemented by CappedRateLimer, which spreads out API request for + // all types (not only third-party CRDs) to avoid running into Dash0 API rate limits. (CappedRateLimer does not offer + // a guarantee that API sync operations including updating the Dash0 monitoring resource's status are handled + // sequentially.) thirdPartyResourceReconciler.Queue().Add( ThirdPartyResourceSyncJob{ thirdPartyResourceReconciler: thirdPartyResourceReconciler, @@ -444,7 +448,7 @@ func deleteViaApi( thirdPartyResourceReconciler ThirdPartyResourceReconciler, dash0ApiResource *unstructured.Unstructured, ) { - // See comment in upsertViaApi for an explanation why we use a shared queue for all resource types. + // See comment in upsertViaApi for an explanation why we use a shared queue for all third-pary resource types. thirdPartyResourceReconciler.Queue().Add( ThirdPartyResourceSyncJob{ thirdPartyResourceReconciler: thirdPartyResourceReconciler, diff --git a/internal/controller/view_controller.go b/internal/controller/view_controller.go index a24809c33..57353cf20 100644 --- a/internal/controller/view_controller.go +++ b/internal/controller/view_controller.go @@ -34,6 +34,7 @@ import ( "github.com/dash0hq/dash0-operator/internal/selfmonitoringapiaccess" "github.com/dash0hq/dash0-operator/internal/util" "github.com/dash0hq/dash0-operator/internal/util/logd" + "github.com/dash0hq/dash0-operator/internal/util/rate" ) type ViewReconciler struct { @@ -41,6 +42,7 @@ type ViewReconciler struct { pseudoClusterUid types.UID leaderElectionAware util.LeaderElectionAware httpClient *http.Client + limiter *rate.CappedRateLimiter defaultApiConfigs selfmonitoringapiaccess.SynchronizedSlice[ApiConfig] namespacedApiConfigs selfmonitoringapiaccess.SynchronizedMapSlice[ApiConfig] initialSyncMutex sync.Mutex @@ -57,12 +59,14 @@ func NewViewReconciler( pseudoClusterUid types.UID, leaderElectionAware util.LeaderElectionAware, httpClient *http.Client, + limiter *rate.CappedRateLimiter, ) *ViewReconciler { return &ViewReconciler{ Client: k8sClient, pseudoClusterUid: pseudoClusterUid, leaderElectionAware: leaderElectionAware, httpClient: httpClient, + limiter: limiter, defaultApiConfigs: *selfmonitoringapiaccess.NewSynchronizedSlice[ApiConfig](), namespacedApiConfigs: *selfmonitoringapiaccess.NewSynchronizedMapSlice[ApiConfig](), namespacedSyncMutex: *selfmonitoringapiaccess.NewNamespaceMutex(), @@ -220,8 +224,6 @@ func (r *ViewReconciler) maybeDoInitialSynchronizationOfAllResources(ctx context }, } _, _ = r.Reconcile(ctx, pseudoReconcileRequest) - // stagger API requests a bit - time.Sleep(50 * time.Millisecond) } logger.Info("Initial synchronization of views has finished.") r.initialSyncHasHappend.Store(true) @@ -269,20 +271,22 @@ func (r *ViewReconciler) synchronizeNamespacedResources(ctx context.Context, nam }, } _, _ = r.Reconcile(ctx, pseudoReconcileRequest) - // stagger API requests a bit - time.Sleep(50 * time.Millisecond) } logger.Info(fmt.Sprintf("Synchronization of views in namespace %s has finished.", namespace)) }() } func (r *ViewReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + qualifiedName := req.NamespacedName.String() //nolint:staticcheck + logger := logd.FromContext(ctx) + if !r.limiter.Wait(ctx, r.KindDisplayName(), qualifiedName, logger) { + return reconcile.Result{}, nil + } + if viewReconcileRequestMetric != nil { viewReconcileRequestMetric.Add(ctx, 1) } - qualifiedName := req.NamespacedName.String() //nolint:staticcheck - logger := logd.FromContext(ctx) logger.Info("processing reconcile request for a view resource", "name", qualifiedName) action := upsertAction diff --git a/internal/controller/view_controller_test.go b/internal/controller/view_controller_test.go index a96a7024e..f242a0496 100644 --- a/internal/controller/view_controller_test.go +++ b/internal/controller/view_controller_test.go @@ -22,6 +22,7 @@ import ( dash0v1alpha1 "github.com/dash0hq/dash0-operator/api/operator/v1alpha1" "github.com/dash0hq/dash0-operator/internal/util" "github.com/dash0hq/dash0-operator/internal/util/logd" + "github.com/dash0hq/dash0-operator/internal/util/rate" "github.com/h2non/gock" . "github.com/onsi/ginkgo/v2" @@ -1009,6 +1010,7 @@ func createViewReconciler(clusterId string) *ViewReconciler { types.UID(clusterId), viewLeaderElectionAware, TestHTTPClient(), + rate.NewNoOpCappedRateLimiter(), ) return viewReconciler } diff --git a/internal/startup/operator_manager_startup.go b/internal/startup/operator_manager_startup.go index 14033d07d..0066712c6 100644 --- a/internal/startup/operator_manager_startup.go +++ b/internal/startup/operator_manager_startup.go @@ -16,7 +16,7 @@ import ( "strings" "time" - dash0 "github.com/dash0hq/dash0-api-client-go" + "github.com/dash0hq/dash0-api-client-go" "github.com/go-logr/zapr" persesv1alpha1 "github.com/perses/perses-operator/api/v1alpha1" prometheusv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" @@ -55,6 +55,7 @@ import ( "github.com/dash0hq/dash0-operator/internal/targetallocator/taresources" "github.com/dash0hq/dash0-operator/internal/util" "github.com/dash0hq/dash0-operator/internal/util/logd" + "github.com/dash0hq/dash0-operator/internal/util/rate" zaputil "github.com/dash0hq/dash0-operator/internal/util/zap" "github.com/dash0hq/dash0-operator/internal/webhooks" ) @@ -1258,11 +1259,16 @@ func startDash0Controllers( } } + // Shared rate limiter for all API-sync reconcilers (Perses dashboards, Prometheus rules, synthetic checks, views, + // etc.). It enforces a maximum of 500 API sync operations per 5-minute window. + apiSyncRateLimiter := rate.NewDefaultCappedRateLimiter() + syntheticCheckReconciler := controller.NewSyntheticCheckReconciler( k8sClient, clusterUid, leaderElectionAwareRunnable, httpClient, + apiSyncRateLimiter, ) if err := syntheticCheckReconciler.SetupWithManager(mgr); err != nil { return fmt.Errorf("unable to set up the synthetic check reconciler: %w", err) @@ -1274,6 +1280,7 @@ func startDash0Controllers( clusterUid, leaderElectionAwareRunnable, httpClient, + apiSyncRateLimiter, ) if err := viewReconciler.SetupWithManager(mgr); err != nil { return fmt.Errorf("unable to set up the view reconciler: %w", err) @@ -1292,6 +1299,7 @@ func startDash0Controllers( thirdPartyResourceSynchronizationQueue, leaderElectionAwareRunnable, httpClient, + apiSyncRateLimiter, ) if err := persesDashboardCrdReconciler.SetupWithManager(ctx, mgr, startupTasksK8sClient, setupLog); err != nil { return fmt.Errorf("unable to set up the Perses dashboard reconciler: %w", err) @@ -1302,12 +1310,16 @@ func startDash0Controllers( thirdPartyResourceSynchronizationQueue, leaderElectionAwareRunnable, httpClient, + apiSyncRateLimiter, ) if err := prometheusRuleCrdReconciler.SetupWithManager(ctx, mgr, startupTasksK8sClient, setupLog); err != nil { return fmt.Errorf("unable to set up the Prometheus rule reconciler: %w", err) } - controller.StartProcessingThirdPartySynchronizationQueue(thirdPartyResourceSynchronizationQueue, setupLog) + controller.StartProcessingThirdPartySynchronizationQueue( + thirdPartyResourceSynchronizationQueue, + setupLog, + ) setupLog.Info("Creating the self-monitoring OTel SDK starter.") oTelSdkStarter = selfmonitoringapiaccess.NewOTelSdkStarter(delegatingZapCoreWrapper) diff --git a/internal/util/rate/capped_rate_limiter.go b/internal/util/rate/capped_rate_limiter.go new file mode 100644 index 000000000..de1327eca --- /dev/null +++ b/internal/util/rate/capped_rate_limiter.go @@ -0,0 +1,111 @@ +// SPDX-FileCopyrightText: Copyright 2026 Dash0 Inc. +// SPDX-License-Identifier: Apache-2.0 + +package rate + +import ( + "context" + "fmt" + "sync/atomic" + + "golang.org/x/time/rate" + + "github.com/dash0hq/dash0-operator/internal/util/logd" +) + +const ( + // Allow at most apiSyncMaxItem per apiSyncTimeWindoSeconds, by processing them at a frequency of apiSyncRateLimit + // requests per second. Naturally, this does not fully guarantee not running into backend-side API rate limits, since + // other sources might also make requests (operators in other clusters, terraform, dash0-cli etc.) + apiSyncMaxItem = 500.0 + apiSyncTimeWindoSeconds = 300.0 + + // apiSyncRateLimit per second API sync requests will be processed + apiSyncRateLimit = apiSyncMaxItem / apiSyncTimeWindoSeconds + + // maxPendingApiSyncItems is the hard limit for pending API sync requests for dashboards, check rules etc. This is + // an additional limit on top of the rate-limiting that we apply client-side. Under normal circumstances, it should + // never be reached. + maxPendingApiSyncItems = 20_000 +) + +// waiter blocks until a rate-limiting token is available or the context is canceled. +type waiter interface { + Wait(ctx context.Context) error +} + +// CappedRateLimiter combines a rate limiter with an upper limit on the number of pending operatrions. It is used to +// spread out API sync requests to avoid running into the backend's rate limit, while also protecting against an +// ever-growing backlog of API sync operations. +type CappedRateLimiter struct { + limiter waiter + pendingOperationsCount atomic.Int64 + maxPendingOperations int64 +} + +func NewDefaultCappedRateLimiter() *CappedRateLimiter { + return NewCappedRateLimiter(apiSyncRateLimit, maxPendingApiSyncItems) +} + +func NewNoOpCappedRateLimiter() *CappedRateLimiter { + return NewCappedRateLimiter(rate.Inf, 0) +} + +// NewCappedRateLimiter creates a new rate limiter that will let operations pass with a frequency of rateLimit, and drop +// operations once the hard limit of maxPendingOperations has been reached. Use golang.org/x/time#Inf to disable waiting +// entirely. Use maxPendingOperations=0 disable limiting the number of pending operations. +func NewCappedRateLimiter(rateLimit rate.Limit, maxPendingOperations int64) *CappedRateLimiter { + return &CappedRateLimiter{ + limiter: rate.NewLimiter(rateLimit, 1), + maxPendingOperations: maxPendingOperations, + } +} + +// Wait blocks requests according to the rateLimit frequency set at creation time. After the wait time, it returns true +// to signal that the operation can proceed. +// It also checks whether the limit for pending operations has been reached. It return false immediately if that is the +// case and the operation should be dropped. It also returns false if the operation should be dropped for other reason, +// for example if the context has been canceled. +func (c *CappedRateLimiter) Wait( + ctx context.Context, + kindDisplayName string, + resourceName string, + logger logd.Logger, +) bool { + if c.maxPendingOperations > 0 { + newCount := c.pendingOperationsCount.Add(1) + defer func() { + c.pendingOperationsCount.Add(-1) + }() + if c.maxPendingOperations > 0 && newCount > c.maxPendingOperations { + logger.Error( + fmt.Errorf("the limit for pending API synchronization operations has been breached, dropping request"), + fmt.Sprintf( + "Dropping API synchronization request for %s %s: too many pending items (%d/%d)", + kindDisplayName, + resourceName, + newCount-1, + c.maxPendingOperations, + ), + ) + return false + } + } + + logger.Debug( + "Waiting on rate limiter to proceed with the API synchronization operation for %s %s.", + kindDisplayName, + resourceName, + ) + if err := c.limiter.Wait(ctx); err != nil { + // context canceled (e.g. during shutdown), no need to log + logger.Debug("Context canceled while waiting on rate limiter for %s %s.", kindDisplayName, resourceName) + return false + } + logger.Debug( + "Rate limiter has allowed the API synchronization operation for %s %s to proceed.", + kindDisplayName, + resourceName, + ) + return true +} diff --git a/internal/util/rate/capped_rate_limiter_test.go b/internal/util/rate/capped_rate_limiter_test.go new file mode 100644 index 000000000..bb323c541 --- /dev/null +++ b/internal/util/rate/capped_rate_limiter_test.go @@ -0,0 +1,271 @@ +// SPDX-FileCopyrightText: Copyright 2026 Dash0 Inc. +// SPDX-License-Identifier: Apache-2.0 + +package rate + +import ( + "context" + "sync" + "time" + + "github.com/dash0hq/dash0-operator/internal/util/logd" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +// mockWaiter is a controllable waiter for tests. When blocking is true, Wait blocks until either the context is +// canceled or unblock() is called. When blocking is false, Wait returns nil immediately. +type mockWaiter struct { + blocking bool + unblockCh chan struct{} + once sync.Once +} + +func newImmediateWaiter() *mockWaiter { + return &mockWaiter{ + blocking: false, + unblockCh: make(chan struct{}), + } +} + +func newBlockingWaiter() *mockWaiter { + return &mockWaiter{ + blocking: true, + unblockCh: make(chan struct{}), + } +} + +func (m *mockWaiter) Wait(ctx context.Context) error { + if !m.blocking { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-m.unblockCh: + return nil + } +} + +// unblock releases all goroutines currently blocked in Wait and allows future calls to return immediately. +func (m *mockWaiter) unblock() { + m.once.Do(func() { close(m.unblockCh) }) +} + +var _ = Describe("CappedRateLimiter", func() { + + ctx := context.Background() + var logger = logd.Discard() + + Describe("NewDefaultCappedRateLimiter", func() { + It("passes the first call immediately using the initial burst token", func() { + crl := NewDefaultCappedRateLimiter() + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + Expect(crl.Wait(ctx, "kind", "name", logger)).To(BeTrue()) + }) + + It("blocks until the rate limiter grants a token", func() { + w := newBlockingWaiter() + crl := &CappedRateLimiter{limiter: w, maxPendingOperations: 0} + + result := make(chan bool, 1) + go func() { + defer GinkgoRecover() + result <- crl.Wait(ctx, "kind", "name", logger) + }() + + // Nothing arrives while the waiter is blocking. + Consistently(result, "100ms", "10ms").ShouldNot(Receive()) + + // Once the rate limiter grants the token, the operation completes successfully. + w.unblock() + Eventually(result).Should(Receive(BeTrue())) + }) + }) + + Describe("with maxPendingOperations limit", func() { + It("lets operations through when below the maxPendingOperations limit", func() { + crl := &CappedRateLimiter{ + limiter: newImmediateWaiter(), + maxPendingOperations: 5, + } + for range 5 { + Expect(crl.Wait(ctx, "kind", "name", logger)).To(BeTrue()) + } + }) + + It("drops an operation when the maxPendingOperations limit is exceeded", func() { + w := newBlockingWaiter() + crl := &CappedRateLimiter{ + limiter: w, + maxPendingOperations: 2, + } + + // Start 2 goroutines that will block at the waiter. + results := make(chan bool, 2) + for range 2 { + go func() { + defer GinkgoRecover() + results <- crl.Wait(ctx, "kind", "name", logger) + }() + } + + // Wait until both goroutines have incremented the counter. + Eventually(func() int64 { + return crl.pendingOperationsCount.Load() + }).Should(Equal(int64(2))) + + // The 3rd operation exceeds the maxPendingOperations limit and must be dropped. + Expect(crl.Wait(ctx, "kind", "name", logger)).To(BeFalse()) + + // Unblock the two waiting goroutines. + w.unblock() + Expect(<-results).To(BeTrue()) + Expect(<-results).To(BeTrue()) + }) + + It("permits new operations once pending operations have completed", func() { + w := newBlockingWaiter() + crl := &CappedRateLimiter{ + limiter: w, + maxPendingOperations: 1, + } + + firstResult := make(chan bool, 1) + go func() { + defer GinkgoRecover() + firstResult <- crl.Wait(ctx, "kind", "name", logger) + }() + + // Wait until the first operation is pending. + Eventually(func() int64 { + return crl.pendingOperationsCount.Load() + }).Should(Equal(int64(1))) + + // While the first is pending, a second is dropped (maxPendingOperations=1, pending count would become 2). + Expect(crl.Wait(ctx, "kind", "name", logger)).To(BeFalse()) + + // Unblock the first and verify it succeeded. + w.unblock() + Expect(<-firstResult).To(BeTrue()) + + // Counter must return to zero. + Eventually(func() int64 { + return crl.pendingOperationsCount.Load() + }).Should(Equal(int64(0))) + + // A new operation must now succeed. The closed unblockCh immediately satisfies the select in mockWaiter. + Expect(crl.Wait(ctx, "kind", "name", logger)).To(BeTrue()) + }) + }) + + Describe("context cancellation", func() { + It("returns false when the context is canceled while waiting at the rate limiter", func() { + w := newBlockingWaiter() + crl := &CappedRateLimiter{ + limiter: w, + maxPendingOperations: 5, + } + + ctx, cancel := context.WithCancel(ctx) + result := make(chan bool, 1) + go func() { + defer GinkgoRecover() + result <- crl.Wait(ctx, "kind", "name", logger) + }() + + // Wait until the goroutine is pending, then cancel. + Eventually(func() int64 { + return crl.pendingOperationsCount.Load() + }).Should(Equal(int64(1))) + cancel() + + Expect(<-result).To(BeFalse()) + }) + + It("decrements the pending counter after context cancellation", func() { + w := newBlockingWaiter() + crl := &CappedRateLimiter{ + limiter: w, + maxPendingOperations: 5, + } + + ctx, cancel := context.WithCancel(ctx) + done := make(chan struct{}) + go func() { + defer GinkgoRecover() + defer close(done) + crl.Wait(ctx, "kind", "name", logger) + }() + + Eventually(func() int64 { + return crl.pendingOperationsCount.Load() + }).Should(Equal(int64(1))) + + cancel() + <-done + + Expect(crl.pendingOperationsCount.Load()).To(Equal(int64(0))) + }) + }) + + Describe("pending operations counter", func() { + It("accurately reflects the number of concurrently pending operations", func() { + const numOps = 5 + w := newBlockingWaiter() + crl := &CappedRateLimiter{ + limiter: w, + maxPendingOperations: numOps, + } + + for range numOps { + go func() { + defer GinkgoRecover() + crl.Wait(ctx, "kind", "name", logger) + }() + } + + // Counter must reach numOps. + Eventually(func() int64 { + return crl.pendingOperationsCount.Load() + }).Should(Equal(int64(numOps))) + + // Unblock all goroutines. + w.unblock() + + // Counter must return to zero once all operations complete. + Eventually(func() int64 { + return crl.pendingOperationsCount.Load() + }).Should(Equal(int64(0))) + }) + + It("decrements the counter even when the maxPendingOperations limit is exceeded", func() { + w := newBlockingWaiter() + crl := &CappedRateLimiter{ + limiter: w, + maxPendingOperations: 1, + } + + firstDone := make(chan struct{}) + go func() { + defer GinkgoRecover() + defer close(firstDone) + crl.Wait(ctx, "kind", "name", logger) + }() + + Eventually(func() int64 { + return crl.pendingOperationsCount.Load() + }).Should(Equal(int64(1))) + + // The second operation is dropped; its counter increment must be rolled back immediately. + Expect(crl.Wait(ctx, "kind", "name", logger)).To(BeFalse()) + Expect(crl.pendingOperationsCount.Load()).To(Equal(int64(1))) + + w.unblock() + <-firstDone + Expect(crl.pendingOperationsCount.Load()).To(Equal(int64(0))) + }) + }) +}) diff --git a/internal/util/rate/rate_suite_test.go b/internal/util/rate/rate_suite_test.go new file mode 100644 index 000000000..f36c98d2b --- /dev/null +++ b/internal/util/rate/rate_suite_test.go @@ -0,0 +1,16 @@ +// SPDX-FileCopyrightText: Copyright 2026 Dash0 Inc. +// SPDX-License-Identifier: Apache-2.0 + +package rate + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestRate(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Rate Suite") +} diff --git a/internal/util/resources/util.go b/internal/util/resources/util.go index a60a61d0c..8f87c8d4c 100644 --- a/internal/util/resources/util.go +++ b/internal/util/resources/util.go @@ -12,6 +12,7 @@ import ( appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes/scheme" @@ -116,3 +117,7 @@ func SetOwnerReference( func RenderName(prefix string, parts ...string) string { return strings.Join(append([]string{prefix}, parts...), "-") } + +func QualifiedResourceName(resource *unstructured.Unstructured) string { + return resource.GetNamespace() + "/" + resource.GetName() +}