Skip to content

Commit 9a266c5

Browse files
committed
chore(controller): spread out API sync requests
Rate-limit API sync requests on the client side to avoid hitting the backend's rate limit.
1 parent 64f9e7a commit 9a266c5

15 files changed

Lines changed: 476 additions & 23 deletions

internal/controller/api_sync_common.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,16 @@ import (
1313
"slices"
1414
"strings"
1515

16+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
17+
"sigs.k8s.io/controller-runtime/pkg/client"
18+
1619
dash0common "github.com/dash0hq/dash0-operator/api/operator/common"
1720
dash0v1beta1 "github.com/dash0hq/dash0-operator/api/operator/v1beta1"
1821
"github.com/dash0hq/dash0-operator/internal/resources"
1922
"github.com/dash0hq/dash0-operator/internal/util"
2023
"github.com/dash0hq/dash0-operator/internal/util/logd"
21-
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
22-
"sigs.k8s.io/controller-runtime/pkg/client"
24+
"github.com/dash0hq/dash0-operator/internal/util/rate"
25+
utilResources "github.com/dash0hq/dash0-operator/internal/util/resources"
2326
)
2427

2528
type ApiConfig struct {
@@ -337,6 +340,7 @@ func synchronizeViaApiAndUpdateStatus(
337340
dash0ApiResource *unstructured.Unstructured,
338341
ownedResource client.Object,
339342
action apiAction,
343+
limiter *rate.CappedRateLimiter,
340344
logger logd.Logger,
341345
) {
342346
preconditionChecksResult := validatePreconditionsAndPreprocess(
@@ -349,6 +353,18 @@ func synchronizeViaApiAndUpdateStatus(
349353
return
350354
}
351355

356+
// limiter.Wait deliberately slows down API sync requests to avoid running into the backend's rate limit,
357+
// at the cost of keeping the associated goroutine blocked while it waits. It also protects against an ever-growing backlog of API sync
358+
// operations by dropping requests when the number of pending items exceeds the cap.
359+
if proceed := limiter.Wait(
360+
ctx,
361+
apiSyncReconciler.KindDisplayName(),
362+
utilResources.QualifiedResourceName(dash0ApiResource),
363+
logger,
364+
); !proceed {
365+
return
366+
}
367+
352368
resourceHasBeenDeleted := action == deleteAction
353369

354370
if preconditionChecksResult.syncDisabledViaLabel {

internal/controller/perses_dashboards_controller.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/dash0hq/dash0-operator/internal/selfmonitoringapiaccess"
3333
"github.com/dash0hq/dash0-operator/internal/util"
3434
"github.com/dash0hq/dash0-operator/internal/util/logd"
35+
"github.com/dash0hq/dash0-operator/internal/util/rate"
3536
)
3637

3738
type PersesDashboardCrdReconciler struct {
@@ -40,6 +41,7 @@ type PersesDashboardCrdReconciler struct {
4041
leaderElectionAware util.LeaderElectionAware
4142
mgr ctrl.Manager
4243
httpClient *http.Client
44+
limiter *rate.CappedRateLimiter
4345
skipNameValidation bool
4446
persesDashboardReconciler *PersesDashboardReconciler
4547
persesDashboardCrdExists atomic.Bool
@@ -50,6 +52,7 @@ type PersesDashboardReconciler struct {
5052
pseudoClusterUid types.UID
5153
queue *workqueue.Typed[ThirdPartyResourceSyncJob]
5254
httpClient *http.Client
55+
limiter *rate.CappedRateLimiter
5356
defaultApiConfigs selfmonitoringapiaccess.SynchronizedSlice[ApiConfig]
5457
namespacedApiConfigs selfmonitoringapiaccess.SynchronizedMapSlice[ApiConfig]
5558
namespacedSyncEnabled sync.Map
@@ -67,12 +70,14 @@ func NewPersesDashboardCrdReconciler(
6770
queue *workqueue.Typed[ThirdPartyResourceSyncJob],
6871
leaderElectionAware util.LeaderElectionAware,
6972
httpClient *http.Client,
73+
limiter *rate.CappedRateLimiter,
7074
) *PersesDashboardCrdReconciler {
7175
return &PersesDashboardCrdReconciler{
7276
Client: k8sClient,
7377
queue: queue,
7478
leaderElectionAware: leaderElectionAware,
7579
httpClient: httpClient,
80+
limiter: limiter,
7681
}
7782
}
7883

@@ -126,6 +131,7 @@ func (r *PersesDashboardCrdReconciler) CreateThirdPartyResourceReconciler(pseudo
126131
queue: r.queue,
127132
pseudoClusterUid: pseudoClusterUid,
128133
httpClient: r.httpClient,
134+
limiter: r.limiter,
129135
defaultApiConfigs: *selfmonitoringapiaccess.NewSynchronizedSlice[ApiConfig](),
130136
namespacedApiConfigs: *selfmonitoringapiaccess.NewSynchronizedMapSlice[ApiConfig](),
131137
}
@@ -723,9 +729,6 @@ func (r *PersesDashboardReconciler) synchronizeNamespacedResources(
723729
Object: dashboardResource,
724730
}
725731
r.Generic(ctx, evt, nil)
726-
727-
// stagger API requests a bit
728-
time.Sleep(50 * time.Millisecond)
729732
}
730733
logger.Info(fmt.Sprintf("Triggering synchronization of dashboards in namespace %s has finished.", namespace))
731734
}()

internal/controller/perses_dashboards_controller_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
dash0v1beta1 "github.com/dash0hq/dash0-operator/api/operator/v1beta1"
2828
"github.com/dash0hq/dash0-operator/internal/util"
2929
"github.com/dash0hq/dash0-operator/internal/util/logd"
30+
"github.com/dash0hq/dash0-operator/internal/util/rate"
3031

3132
"github.com/h2non/gock"
3233
. "github.com/onsi/ginkgo/v2"
@@ -392,7 +393,11 @@ var _ = Describe(
392393

393394
Expect(persesDashboardCrdReconciler.SetupWithManager(ctx, mgr, k8sClient, logger)).To(Succeed())
394395

395-
StartProcessingThirdPartySynchronizationQueue(testQueuePersesDashboards, logger)
396+
StartProcessingThirdPartySynchronizationQueue(
397+
testQueuePersesDashboards,
398+
rate.NewNoOpCappedRateLimiter(),
399+
logger,
400+
)
396401
},
397402
)
398403

@@ -1018,6 +1023,7 @@ func createPersesDashboardCrdReconciler() *PersesDashboardCrdReconciler {
10181023
testQueuePersesDashboards,
10191024
&DummyLeaderElectionAware{Leader: true},
10201025
TestHTTPClient(),
1026+
rate.NewNoOpCappedRateLimiter(),
10211027
)
10221028

10231029
// We create the controller multiple times in tests; this option is required, otherwise the controller

internal/controller/prometheus_rules_controller.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/dash0hq/dash0-operator/internal/selfmonitoringapiaccess"
3838
"github.com/dash0hq/dash0-operator/internal/util"
3939
"github.com/dash0hq/dash0-operator/internal/util/logd"
40+
"github.com/dash0hq/dash0-operator/internal/util/rate"
4041
)
4142

4243
type PrometheusRuleCrdReconciler struct {
@@ -45,6 +46,7 @@ type PrometheusRuleCrdReconciler struct {
4546
leaderElectionAware util.LeaderElectionAware
4647
mgr ctrl.Manager
4748
httpClient *http.Client
49+
limiter *rate.CappedRateLimiter
4850
skipNameValidation bool
4951
prometheusRuleReconciler *PrometheusRuleReconciler
5052
prometheusRuleCrdExists atomic.Bool
@@ -55,6 +57,7 @@ type PrometheusRuleReconciler struct {
5557
pseudoClusterUid types.UID
5658
queue *workqueue.Typed[ThirdPartyResourceSyncJob]
5759
httpClient *http.Client
60+
limiter *rate.CappedRateLimiter
5861
defaultApiConfigs selfmonitoringapiaccess.SynchronizedSlice[ApiConfig]
5962
namespacedApiConfigs selfmonitoringapiaccess.SynchronizedMapSlice[ApiConfig]
6063
namespacedSyncEnabled sync.Map
@@ -92,12 +95,14 @@ func NewPrometheusRuleCrdReconciler(
9295
queue *workqueue.Typed[ThirdPartyResourceSyncJob],
9396
leaderElectionAware util.LeaderElectionAware,
9497
httpClient *http.Client,
98+
limiter *rate.CappedRateLimiter,
9599
) *PrometheusRuleCrdReconciler {
96100
return &PrometheusRuleCrdReconciler{
97101
Client: k8sClient,
98102
queue: queue,
99103
leaderElectionAware: leaderElectionAware,
100104
httpClient: httpClient,
105+
limiter: limiter,
101106
}
102107
}
103108

@@ -151,6 +156,7 @@ func (r *PrometheusRuleCrdReconciler) CreateThirdPartyResourceReconciler(pseudoC
151156
queue: r.queue,
152157
pseudoClusterUid: pseudoClusterUid,
153158
httpClient: r.httpClient,
159+
limiter: r.limiter,
154160
defaultApiConfigs: *selfmonitoringapiaccess.NewSynchronizedSlice[ApiConfig](),
155161
namespacedApiConfigs: *selfmonitoringapiaccess.NewSynchronizedMapSlice[ApiConfig](),
156162
}
@@ -559,7 +565,7 @@ func (r *PrometheusRuleReconciler) MapResourceToHttpRequests(
559565
allValidationIssues[checkRuleName] = validationIssues
560566
// If a rule becomes temporarily invalid due to a bad edit, we do not want to delete it in Dash0
561567
// (assuming it has been valid at some point before and has been synchronized). Instead, we keep the
562-
// most recent valid state. To do that, we add its id to the list of ids we have seen (the list will
568+
// most recent valid state. To do that, we add its id to the list of ids we have seen. The list will
563569
// later be used to determine which of the checks in Dash0 need to be removed because they are no longer
564570
// in the K8s resource.
565571
originsInResource = append(originsInResource, checkRuleOriginNotUrlEncoded)
@@ -568,7 +574,7 @@ func (r *PrometheusRuleReconciler) MapResourceToHttpRequests(
568574
if syncError != nil {
569575
// If a rule cannot be synchronized temporarily, we do not want to delete it in Dash0 immediately
570576
// (assuming the rule has been synchronized before at some point). Instead, we keep the
571-
// most recent state. To do that, we add its id to the list of ids we have seen (the list will later
577+
// most recent state. To do that, we add its id to the list of ids we have seen. The list will later
572578
// be used to determine which of the checks in Dash0 need to be removed because they are no longer
573579
// in the K8s resource.
574580
allSynchronizationErrors[checkRuleName] = syncError.Error()
@@ -1133,9 +1139,6 @@ func (r *PrometheusRuleReconciler) synchronizeNamespacedResources(
11331139
Object: ruleResource,
11341140
}
11351141
r.Generic(ctx, evt, nil)
1136-
1137-
// stagger API requests a bit
1138-
time.Sleep(50 * time.Millisecond)
11391142
}
11401143
logger.Info(fmt.Sprintf("Triggering synchronization of check rules in namespace %s has finished.", namespace))
11411144
}()

internal/controller/prometheus_rules_controller_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
dash0v1beta1 "github.com/dash0hq/dash0-operator/api/operator/v1beta1"
3232
"github.com/dash0hq/dash0-operator/internal/util"
3333
"github.com/dash0hq/dash0-operator/internal/util/logd"
34+
"github.com/dash0hq/dash0-operator/internal/util/rate"
3435

3536
"github.com/h2non/gock"
3637
. "github.com/onsi/ginkgo/v2"
@@ -397,7 +398,11 @@ var _ = Describe(
397398

398399
Expect(prometheusRuleCrdReconciler.SetupWithManager(ctx, mgr, k8sClient, logger)).To(Succeed())
399400

400-
StartProcessingThirdPartySynchronizationQueue(testQueuePrometheusRules, logger)
401+
StartProcessingThirdPartySynchronizationQueue(
402+
testQueuePrometheusRules,
403+
rate.NewNoOpCappedRateLimiter(),
404+
logger,
405+
)
401406
},
402407
)
403408

@@ -2215,6 +2220,7 @@ func createPrometheusRuleCrdReconciler() *PrometheusRuleCrdReconciler {
22152220
testQueuePrometheusRules,
22162221
&DummyLeaderElectionAware{Leader: true},
22172222
TestHTTPClient(),
2223+
rate.NewNoOpCappedRateLimiter(),
22182224
)
22192225

22202226
// We create the controller multiple times in tests; this option is required, otherwise the controller

internal/controller/synthetic_check_controller.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@ import (
3434
"github.com/dash0hq/dash0-operator/internal/selfmonitoringapiaccess"
3535
"github.com/dash0hq/dash0-operator/internal/util"
3636
"github.com/dash0hq/dash0-operator/internal/util/logd"
37+
"github.com/dash0hq/dash0-operator/internal/util/rate"
3738
)
3839

3940
type SyntheticCheckReconciler struct {
4041
client.Client
4142
pseudoClusterUid types.UID
4243
leaderElectionAware util.LeaderElectionAware
4344
httpClient *http.Client
45+
limiter *rate.CappedRateLimiter
4446
defaultApiConfigs selfmonitoringapiaccess.SynchronizedSlice[ApiConfig]
4547
namespacedApiConfigs selfmonitoringapiaccess.SynchronizedMapSlice[ApiConfig]
4648
initialSyncMutex sync.Mutex
@@ -57,12 +59,14 @@ func NewSyntheticCheckReconciler(
5759
pseudoClusterUid types.UID,
5860
leaderElectionAware util.LeaderElectionAware,
5961
httpClient *http.Client,
62+
limiter *rate.CappedRateLimiter,
6063
) *SyntheticCheckReconciler {
6164
return &SyntheticCheckReconciler{
6265
Client: k8sClient,
6366
pseudoClusterUid: pseudoClusterUid,
6467
leaderElectionAware: leaderElectionAware,
6568
httpClient: httpClient,
69+
limiter: limiter,
6670
defaultApiConfigs: *selfmonitoringapiaccess.NewSynchronizedSlice[ApiConfig](),
6771
namespacedApiConfigs: *selfmonitoringapiaccess.NewSynchronizedMapSlice[ApiConfig](),
6872
namespacedSyncMutex: *selfmonitoringapiaccess.NewNamespaceMutex(),
@@ -231,9 +235,8 @@ func (r *SyntheticCheckReconciler) maybeDoInitialSynchronizationOfAllResources(
231235
},
232236
}
233237
_, _ = r.Reconcile(ctx, pseudoReconcileRequest)
234-
// stagger API requests a bit
235-
time.Sleep(50 * time.Millisecond)
236238
}
239+
237240
logger.Info("Initial synchronization of synthetic checks has finished.")
238241
r.initialSyncHasHappend.Store(true)
239242
}()
@@ -284,8 +287,6 @@ func (r *SyntheticCheckReconciler) synchronizeNamespacedResources(
284287
},
285288
}
286289
_, _ = r.Reconcile(ctx, pseudoReconcileRequest)
287-
// stagger API requests a bit
288-
time.Sleep(50 * time.Millisecond)
289290
}
290291
logger.Info(fmt.Sprintf("Synchronization of synthetic checks in namespace %s has finished.", namespace))
291292
}()
@@ -345,6 +346,7 @@ func (r *SyntheticCheckReconciler) Reconcile(ctx context.Context, req reconcile.
345346
unstructuredResource,
346347
syntheticCheckResource,
347348
action,
349+
r.limiter,
348350
logger,
349351
)
350352

internal/controller/synthetic_check_controller_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
dash0v1alpha1 "github.com/dash0hq/dash0-operator/api/operator/v1alpha1"
2323
"github.com/dash0hq/dash0-operator/internal/util"
2424
"github.com/dash0hq/dash0-operator/internal/util/logd"
25+
"github.com/dash0hq/dash0-operator/internal/util/rate"
2526

2627
"github.com/h2non/gock"
2728
. "github.com/onsi/ginkgo/v2"
@@ -831,6 +832,7 @@ func createSyntheticCheckReconciler(clusterId string) *SyntheticCheckReconciler
831832
types.UID(clusterId),
832833
leaderElectionAware,
833834
TestHTTPClient(),
835+
rate.NewNoOpCappedRateLimiter(),
834836
)
835837
return syntheticCheckReconciler
836838
}

internal/controller/third_party_crd_common.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
dash0v1beta1 "github.com/dash0hq/dash0-operator/api/operator/v1beta1"
3434
"github.com/dash0hq/dash0-operator/internal/util"
3535
"github.com/dash0hq/dash0-operator/internal/util/logd"
36+
"github.com/dash0hq/dash0-operator/internal/util/rate"
3637
)
3738

3839
// ThirdPartyCrdReconciler is an interface for reconcilers that act on CRDs of third-party resource types (i.e. when a
@@ -430,7 +431,11 @@ func upsertViaApi(
430431
// attempts that are related to third-party resources types (Prometheus rules, Perses dashboards) end with writing
431432
// to the Dash0 monitoring resource status in the same namespace. These write attempts will fail if they happen
432433
// concurrently. Therefore, we add all synchronization attempts to one queue that is shared across resource types,
433-
// and only process them one at a time.
434+
// and only process them one at a time. This queueing is independent of the rate limiting which happens later in
435+
// api_sync_common#synchronizeViaApiAndUpdateStatus via limiter.Wait(ctx), which spreads out API requests for all
436+
// types (not only third-party CRDs) to avoid running into Dash0 API rate limits. That rate limiting does not offer
437+
// any guarantee that API sync operations (including updating the Dash0 monitoring resource's status) are handled
438+
// sequentially.
434439
thirdPartyResourceReconciler.Queue().Add(
435440
ThirdPartyResourceSyncJob{
436441
thirdPartyResourceReconciler: thirdPartyResourceReconciler,
@@ -456,6 +461,7 @@ func deleteViaApi(
456461

457462
func StartProcessingThirdPartySynchronizationQueue(
458463
thirdPartyResourceSynchronizationQueue *workqueue.Typed[ThirdPartyResourceSyncJob],
464+
limiter *rate.CappedRateLimiter,
459465
setupLog logd.Logger,
460466
) {
461467
setupLog.Info("Starting the Dash0 API resource synchronization queue.")
@@ -483,6 +489,7 @@ func StartProcessingThirdPartySynchronizationQueue(
483489
item.dash0ApiResource,
484490
nil,
485491
item.action,
492+
limiter,
486493
logger,
487494
)
488495
logger.Info(

0 commit comments

Comments
 (0)