diff --git a/cmd/single/start.go b/cmd/single/start.go index f9e38a96260..bd6e3878757 100644 --- a/cmd/single/start.go +++ b/cmd/single/start.go @@ -14,6 +14,7 @@ import ( "golang.org/x/sync/errgroup" _ "gorm.io/driver/postgres" // Required to import database driver. "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics" @@ -125,8 +126,9 @@ func startPropeller(ctx context.Context, cfg Propeller) error { SyncPeriod: &propellerCfg.DownstreamEval.Duration, DefaultNamespaces: namespaceConfigs, }, - NewCache: executors.NewCache, - NewClient: executors.BuildNewClientFunc(propellerScope), + NewCache: executors.NewCache, + NewClient: executors.BuildNewClientFunc(propellerScope), + MaxConcurrentReconciles: propellerCfg.MaxConcurrentReconciles, Metrics: metricsserver.Options{ // Disable metrics serving BindAddress: "0", diff --git a/flytepropeller/cmd/controller/cmd/root.go b/flytepropeller/cmd/controller/cmd/root.go index 580a064cef9..f1c9437e1bb 100644 --- a/flytepropeller/cmd/controller/cmd/root.go +++ b/flytepropeller/cmd/controller/cmd/root.go @@ -14,6 +14,7 @@ import ( "golang.org/x/sync/errgroup" "k8s.io/klog" "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -143,8 +144,9 @@ func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error { SyncPeriod: &cfg.DownstreamEval.Duration, DefaultNamespaces: namespaceConfigs, }, - NewCache: executors.NewCache, - NewClient: executors.BuildNewClientFunc(propellerScope), + NewCache: executors.NewCache, + NewClient: executors.BuildNewClientFunc(propellerScope), + MaxConcurrentReconciles: cfg.MaxConcurrentReconciles, Metrics: metricsserver.Options{ // Disable metrics serving BindAddress: "0", diff --git a/flytepropeller/cmd/controller/cmd/webhook.go b/flytepropeller/cmd/controller/cmd/webhook.go index ae538385fbf..24f310c4591 100644 --- a/flytepropeller/cmd/controller/cmd/webhook.go +++ b/flytepropeller/cmd/controller/cmd/webhook.go @@ -6,6 +6,7 @@ import ( "github.com/spf13/cobra" "golang.org/x/sync/errgroup" "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/manager" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" ctrlWebhook "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -108,8 +109,9 @@ func runWebhook(origContext context.Context, propellerCfg *config.Config, cfg *w SyncPeriod: &propellerCfg.DownstreamEval.Duration, DefaultNamespaces: namespaceConfigs, }, - NewCache: executors.NewCache, - NewClient: executors.BuildNewClientFunc(webhookScope), + NewCache: executors.NewCache, + NewClient: executors.BuildNewClientFunc(webhookScope), + MaxConcurrentReconciles: propellerCfg.MaxConcurrentReconciles, Metrics: metricsserver.Options{ // Disable metrics serving BindAddress: "0", diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 8d077f2a5e8..804833a9f5c 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -136,6 +136,7 @@ var ( MinSizeInMBForOffloading: 10, // 10 MB is the default size for offloading MaxSizeInMBForOffloading: 1000, // 1 GB is the default size before failing fast. }, + MaxConcurrentReconciles: 1, } // This regex is used to sanitize semver versions passed to IsSupportedSDK checks for literal offloading feature. @@ -182,6 +183,7 @@ type Config struct { NodeExecutionWorkerCount int `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"` ArrayNode ArrayNodeConfig `json:"array-node-config,omitempty" pflag:",Configuration for array nodes"` LiteralOffloadingConfig LiteralOffloadingConfig `json:"literal-offloading-config" pflag:",config used for literal offloading."` + MaxConcurrentReconciles int `json:"max-concurrent-reconciles" pflag:",Max concurrent reconciles for controller-runtime controllers"` } type LiteralOffloadingConfig struct {