From 3ebc8ae36ea6ec56ce39db7f410268d19baa0b87 Mon Sep 17 00:00:00 2001 From: Dylan Date: Tue, 2 Dec 2025 09:11:52 -0500 Subject: [PATCH] Add scaling support --- api/v1alpha1/asset_types.go | 5 + api/v1alpha1/propagation_types.go | 9 ++ api/v1alpha1/subtreevalidator_types.go | 5 + cmd/main.go | 44 +++++-- .../teranode.bsvblockchain.org_assets.yaml | 13 +++ ...ranode.bsvblockchain.org_propagations.yaml | 31 ++++- ...e.bsvblockchain.org_subtreevalidators.yaml | 13 +++ ...ranode_service_runner_service_account.yaml | 2 +- .../teranode_v1alpha1_propagation.yaml | 11 +- internal/controller/asset_controller.go | 22 ++++ internal/controller/asset_deployment.go | 8 +- .../controller/blockassembly_deployment.go | 2 +- internal/controller/blockchain_controller.go | 6 + internal/controller/blockchain_deployment.go | 2 +- internal/controller/blockchain_fsm.go | 15 +++ .../controller/blockpersister_deployment.go | 5 +- .../controller/blockvalidator_deployment.go | 2 +- internal/controller/cluster_asset.go | 32 ++++- internal/controller/cluster_controller.go | 6 +- .../controller/cluster_controller_test.go | 2 +- internal/controller/cluster_helpers.go | 61 ++++++++++ internal/controller/cluster_propagation.go | 41 ++++++- internal/controller/cluster_rpc.go | 20 +++- .../controller/cluster_subtreevalidator.go | 26 ++++- internal/controller/cluster_utxopersister.go | 5 +- internal/controller/cluster_validator.go | 6 +- internal/controller/constants.go | 30 +++++ internal/controller/legacy_deployment.go | 2 +- internal/controller/peer_deployment.go | 2 +- internal/controller/propagation_controller.go | 36 +++++- internal/controller/propagation_deployment.go | 24 ++-- internal/controller/rpc_deployment.go | 2 +- .../controller/subtreevalidator_controller.go | 24 +++- .../controller/subtreevalidator_deployment.go | 2 +- .../controller/utxopersister_deployment.go | 2 +- internal/utils/deployment.go | 110 +++++++++++++++--- test/utils/utils.go | 1 + 37 files changed, 555 insertions(+), 74 deletions(-) create mode 100644 internal/controller/cluster_helpers.go diff --git a/api/v1alpha1/asset_types.go b/api/v1alpha1/asset_types.go index 7b446d6..d93d9f7 100644 --- a/api/v1alpha1/asset_types.go +++ b/api/v1alpha1/asset_types.go @@ -33,10 +33,15 @@ type AssetStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file Conditions []metav1.Condition `json:"conditions,omitempty"` + // Replicas is the number of actual replicas of the asset deployment + Replicas int32 `json:"replicas,omitempty"` + // Selector is the label selector for pods corresponding to this asset deployment + Selector string `json:"selector,omitempty"` } //+kubebuilder:object:root=true //+kubebuilder:subresource:status +//+kubebuilder:subresource:scale:specpath=.spec.deploymentOverrides.replicas,statuspath=.status.replicas,selectorpath=.status.selector // Asset is the Schema for the assets API type Asset struct { diff --git a/api/v1alpha1/propagation_types.go b/api/v1alpha1/propagation_types.go index c76fb2f..52c287a 100644 --- a/api/v1alpha1/propagation_types.go +++ b/api/v1alpha1/propagation_types.go @@ -34,10 +34,19 @@ type PropagationSpec struct { // PropagationStatus defines the observed state of Propagation type PropagationStatus struct { Conditions []metav1.Condition `json:"conditions,omitempty"` + // Replicas is the number of actual replicas of the propagation deployment + Replicas int32 `json:"replicas,omitempty"` + // Selector is the label selector for pods corresponding to this propagation deployment + Selector string `json:"selector,omitempty"` } //+kubebuilder:object:root=true //+kubebuilder:subresource:status +//+kubebuilder:subresource:scale:specpath=.spec.deploymentOverrides.replicas,statuspath=.status.replicas,selectorpath=.status.selector +//+kubebuilder:printcolumn:name="Desired",type=integer,JSONPath=`.spec.deploymentOverrides.replicas`,description="Desired number of replicas" +//+kubebuilder:printcolumn:name="Current",type=integer,JSONPath=`.status.replicas`,description="Current number of replicas" +//+kubebuilder:printcolumn:name="Ready",type=string,JSONPath=`.status.conditions[?(@.type=="Reconciled")].status`,description="Ready status" +//+kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp` // Propagation is the Schema for the propagations API type Propagation struct { diff --git a/api/v1alpha1/subtreevalidator_types.go b/api/v1alpha1/subtreevalidator_types.go index 22d19c5..8d16ea2 100644 --- a/api/v1alpha1/subtreevalidator_types.go +++ b/api/v1alpha1/subtreevalidator_types.go @@ -31,10 +31,15 @@ type SubtreeValidatorStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file Conditions []metav1.Condition `json:"conditions,omitempty"` + // Replicas is the number of actual replicas of the subtreevalidator deployment + Replicas int32 `json:"replicas,omitempty"` + // Selector is the label selector for pods corresponding to this subtreevalidator deployment + Selector string `json:"selector,omitempty"` } //+kubebuilder:object:root=true //+kubebuilder:subresource:status +//+kubebuilder:subresource:scale:specpath=.spec.deploymentOverrides.replicas,statuspath=.status.replicas,selectorpath=.status.selector // SubtreeValidator is the Schema for the subtreevalidators API type SubtreeValidator struct { diff --git a/cmd/main.go b/cmd/main.go index 3d9aa16..db19b40 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -25,9 +25,12 @@ import ( "sort" "strings" + teranodev1alpha1 "github.com/bsv-blockchain/teranode-operator/api/v1alpha1" + appsv1 "k8s.io/api/apps/v1" + scalev1 "k8s.io/api/autoscaling/v1" + corev1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. @@ -39,7 +42,6 @@ import ( metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/controller-runtime/pkg/webhook" - teranodev1alpha1 "github.com/bsv-blockchain/teranode-operator/api/v1alpha1" "github.com/bsv-blockchain/teranode-operator/internal/controller" ) @@ -55,13 +57,6 @@ const ( CreateControllerError = "unable to create controller" ) -func init() { - utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - - utilruntime.Must(teranodev1alpha1.AddToScheme(scheme)) - //+kubebuilder:scaffold:scheme -} - //nolint:gocognit,gocyclo // Main function complexity is acceptable for initialization func main() { var metricsAddr string @@ -70,7 +65,7 @@ func main() { var secureMetrics bool var enableHTTP2 bool flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") - flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") + flag.StringVar(&probeAddr, "health-probe-bind-address", ":8181", "The address the probe endpoint binds to.") flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") @@ -84,6 +79,33 @@ func main() { opts.BindFlags(flag.CommandLine) flag.Parse() + if err := scalev1.AddToScheme(scheme); err != nil { + setupLog.Error(err, "unable to add autoscaling/v1 scheme") + os.Exit(1) + } + + if err := teranodev1alpha1.AddToScheme(scheme); err != nil { + setupLog.Error(err, "unable to add v1alpha1 scheme") + os.Exit(1) + } + + if err := appsv1.AddToScheme(scheme); err != nil { + setupLog.Error(err, "unable to add apps/v1 scheme") + os.Exit(1) + } + + if err := corev1.AddToScheme(scheme); err != nil { + setupLog.Error(err, "unable to add core/v1 scheme") + os.Exit(1) + } + + if err := networkingv1.AddToScheme(scheme); err != nil { + setupLog.Error(err, "unable to add networking/v1 scheme") + os.Exit(1) + } + + //+kubebuilder:scaffold:scheme + ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) // if the enable-http2 flag is false (the default), http/2 should be disabled diff --git a/config/crd/bases/teranode.bsvblockchain.org_assets.yaml b/config/crd/bases/teranode.bsvblockchain.org_assets.yaml index b992dd8..71a0189 100644 --- a/config/crd/bases/teranode.bsvblockchain.org_assets.yaml +++ b/config/crd/bases/teranode.bsvblockchain.org_assets.yaml @@ -3820,9 +3820,22 @@ spec: - type type: object type: array + replicas: + description: Replicas is the number of actual replicas of the asset + deployment + format: int32 + type: integer + selector: + description: Selector is the label selector for pods corresponding + to this asset deployment + type: string type: object type: object served: true storage: true subresources: + scale: + labelSelectorPath: .status.selector + specReplicasPath: .spec.deploymentOverrides.replicas + statusReplicasPath: .status.replicas status: {} diff --git a/config/crd/bases/teranode.bsvblockchain.org_propagations.yaml b/config/crd/bases/teranode.bsvblockchain.org_propagations.yaml index f61d0b7..0dba386 100644 --- a/config/crd/bases/teranode.bsvblockchain.org_propagations.yaml +++ b/config/crd/bases/teranode.bsvblockchain.org_propagations.yaml @@ -14,7 +14,23 @@ spec: singular: propagation scope: Namespaced versions: - - name: v1alpha1 + - additionalPrinterColumns: + - description: Desired number of replicas + jsonPath: .spec.deploymentOverrides.replicas + name: Desired + type: integer + - description: Current number of replicas + jsonPath: .status.replicas + name: Current + type: integer + - description: Ready status + jsonPath: .status.conditions[?(@.type=="Reconciled")].status + name: Ready + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1alpha1 schema: openAPIV3Schema: description: Propagation is the Schema for the propagations API @@ -3849,9 +3865,22 @@ spec: - type type: object type: array + replicas: + description: Replicas is the number of actual replicas of the propagation + deployment + format: int32 + type: integer + selector: + description: Selector is the label selector for pods corresponding + to this propagation deployment + type: string type: object type: object served: true storage: true subresources: + scale: + labelSelectorPath: .status.selector + specReplicasPath: .spec.deploymentOverrides.replicas + statusReplicasPath: .status.replicas status: {} diff --git a/config/crd/bases/teranode.bsvblockchain.org_subtreevalidators.yaml b/config/crd/bases/teranode.bsvblockchain.org_subtreevalidators.yaml index 9703fa5..01de53c 100644 --- a/config/crd/bases/teranode.bsvblockchain.org_subtreevalidators.yaml +++ b/config/crd/bases/teranode.bsvblockchain.org_subtreevalidators.yaml @@ -3782,9 +3782,22 @@ spec: - type type: object type: array + replicas: + description: Replicas is the number of actual replicas of the subtreevalidator + deployment + format: int32 + type: integer + selector: + description: Selector is the label selector for pods corresponding + to this subtreevalidator deployment + type: string type: object type: object served: true storage: true subresources: + scale: + labelSelectorPath: .status.selector + specReplicasPath: .spec.deploymentOverrides.replicas + statusReplicasPath: .status.replicas status: {} diff --git a/config/rbac/teranode_service_runner_service_account.yaml b/config/rbac/teranode_service_runner_service_account.yaml index 916487a..a92cf7e 100644 --- a/config/rbac/teranode_service_runner_service_account.yaml +++ b/config/rbac/teranode_service_runner_service_account.yaml @@ -2,4 +2,4 @@ apiVersion: v1 kind: ServiceAccount metadata: name: service-runner - namespace: system + namespace: system \ No newline at end of file diff --git a/config/samples/teranode_v1alpha1_propagation.yaml b/config/samples/teranode_v1alpha1_propagation.yaml index c6011da..aed9faf 100644 --- a/config/samples/teranode_v1alpha1_propagation.yaml +++ b/config/samples/teranode_v1alpha1_propagation.yaml @@ -9,7 +9,6 @@ metadata: app.kubernetes.io/created-by: teranode-operator name: propagation-sample spec: - # TODO(user): Add fields here resources: requests: cpu: 1 @@ -17,3 +16,13 @@ spec: limits: memory: 2Gi imagePullPolicy: Always + replicas: 1 + deploymentOverrides: + replicas: 1 + resources: + requests: + cpu: 1 + memory: 1Gi + limits: + memory: 2Gi + imagePullPolicy: Always diff --git a/internal/controller/asset_controller.go b/internal/controller/asset_controller.go index f0eb593..7cb71df 100644 --- a/internal/controller/asset_controller.go +++ b/internal/controller/asset_controller.go @@ -26,6 +26,8 @@ import ( networkingv1 "k8s.io/api/networking/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -77,6 +79,17 @@ func (r *AssetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl r.ReconcileHTTPSIngress, ) + // Update scale status (replicas and selector) from deployment + deployment := &appsv1.Deployment{} + if err := r.Get(ctx, types.NamespacedName{ + Name: AssetDeploymentName, + Namespace: asset.Namespace, + }, deployment); err == nil { + replicas, selector := utils.GetScaleStatusFromDeployment(deployment) + asset.Status.Replicas = replicas + asset.Status.Selector = selector + } + if err != nil { apimeta.SetStatusCondition(&asset.Status.Conditions, metav1.Condition{ @@ -103,6 +116,14 @@ func (r *AssetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } err = r.Client.Status().Update(ctx, &asset) + + if asset.Spec.DeploymentOverrides != nil && asset.Spec.DeploymentOverrides.Replicas != nil { + if asset.Status.Replicas != *asset.Spec.DeploymentOverrides.Replicas { + r.Log.Info("requeuing to monitor replica status", "status", asset.Status.Replicas, "spec", asset.Spec.DeploymentOverrides.Replicas) + return ctrl.Result{RequeueAfter: time.Second}, nil + } + } + return ctrl.Result{Requeue: false, RequeueAfter: 0}, err } @@ -113,5 +134,6 @@ func (r *AssetReconciler) SetupWithManager(mgr ctrl.Manager) error { Owns(&appsv1.Deployment{}). Owns(&networkingv1.Ingress{}). Owns(&corev1.Service{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). Complete(r) } diff --git a/internal/controller/asset_deployment.go b/internal/controller/asset_deployment.go index 4965379..aaba8e2 100644 --- a/internal/controller/asset_deployment.go +++ b/internal/controller/asset_deployment.go @@ -22,9 +22,9 @@ func (r *AssetReconciler) ReconcileDeployment(log logr.Logger) (bool, error) { } dep := appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Name: "asset", + Name: AssetDeploymentName, Namespace: r.NamespacedName.Namespace, - Labels: getAppLabels("asset"), + Labels: getAppLabels(AssetDeploymentName), }, } _, err := controllerutil.CreateOrUpdate(r.Context, r.Client, &dep, func() error { @@ -50,7 +50,7 @@ func (r *AssetReconciler) updateDeployment(dep *appsv1.Deployment, asset *terano } func defaultAssetDeploymentSpec() *appsv1.DeploymentSpec { - labels := getAppLabels("asset") + labels := getAppLabels(AssetDeploymentName) envFrom := []corev1.EnvFromSource{} env := []corev1.EnvVar{ { @@ -59,7 +59,7 @@ func defaultAssetDeploymentSpec() *appsv1.DeploymentSpec { }, } return &appsv1.DeploymentSpec{ - Replicas: ptr.To(int32(2)), + Replicas: ptr.To(int32(DefaultAssetReplicas)), Selector: metav1.SetAsLabelSelector(labels), Strategy: appsv1.DeploymentStrategy{ Type: appsv1.RecreateDeploymentStrategyType, diff --git a/internal/controller/blockassembly_deployment.go b/internal/controller/blockassembly_deployment.go index 8020dd4..aa218b3 100644 --- a/internal/controller/blockassembly_deployment.go +++ b/internal/controller/blockassembly_deployment.go @@ -65,7 +65,7 @@ func defaultBlockAssemblyDeploymentSpec() *appsv1.DeploymentSpec { Strategy: appsv1.DeploymentStrategy{ Type: appsv1.RecreateDeploymentStrategyType, }, - Replicas: ptr.To(int32(1)), + Replicas: ptr.To(int32(DefaultBlockAssemblyReplicas)), Selector: metav1.SetAsLabelSelector(labels), Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ diff --git a/internal/controller/blockchain_controller.go b/internal/controller/blockchain_controller.go index 035173c..42f5c6d 100644 --- a/internal/controller/blockchain_controller.go +++ b/internal/controller/blockchain_controller.go @@ -123,6 +123,12 @@ func (r *BlockchainReconciler) Reconcile(ctx context.Context, req ctrl.Request) // FSM is disabled, skip FSM reconciliation return ctrl.Result{}, nil } + + if isConnectionError(err) { + r.Log.Info("FSM monitoring unavailable - operator may be running out-of-cluster. FSM state cannot be retrieved when .svc.cluster.local DNS is not resolvable", "error", err.Error()) + // Don't requeue aggressively if we know we're out-of-cluster + return ctrl.Result{Requeue: true, RequeueAfter: time.Minute * 5}, nil + } r.Log.Error(err, "unable to get FSM state, trying again in a minute") return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil } diff --git a/internal/controller/blockchain_deployment.go b/internal/controller/blockchain_deployment.go index 582c618..bc6c150 100644 --- a/internal/controller/blockchain_deployment.go +++ b/internal/controller/blockchain_deployment.go @@ -60,7 +60,7 @@ func defaultBlockchainDeploymentSpec() *appsv1.DeploymentSpec { } // TODO: set a default return &appsv1.DeploymentSpec{ - Replicas: ptr.To(int32(1)), + Replicas: ptr.To(int32(DefaultBlockchainReplicas)), Selector: metav1.SetAsLabelSelector(podLabels), Strategy: appsv1.DeploymentStrategy{ Type: appsv1.RecreateDeploymentStrategyType, diff --git a/internal/controller/blockchain_fsm.go b/internal/controller/blockchain_fsm.go index c9d6086..03682da 100644 --- a/internal/controller/blockchain_fsm.go +++ b/internal/controller/blockchain_fsm.go @@ -3,6 +3,7 @@ package controller import ( "errors" "fmt" + "strings" "github.com/bsv-blockchain/teranode/services/blockchain" "github.com/bsv-blockchain/teranode/settings" @@ -90,3 +91,17 @@ func (r *BlockchainReconciler) ReconcileState(state blockchain.FSMStateType) err } return nil } + +// isConnectionError checks if the error is a gRPC connection/unavailability error +// This typically happens when the operator runs out-of-cluster and cannot resolve +// Kubernetes internal DNS names like *.svc.cluster.local +func isConnectionError(err error) bool { + if err == nil { + return false + } + errMsg := err.Error() + return strings.Contains(errMsg, "Unavailable") || + strings.Contains(errMsg, "no children to pick from") || + strings.Contains(errMsg, "connection refused") || + strings.Contains(errMsg, "no such host") +} diff --git a/internal/controller/blockpersister_deployment.go b/internal/controller/blockpersister_deployment.go index 21dd2cd..03f642a 100644 --- a/internal/controller/blockpersister_deployment.go +++ b/internal/controller/blockpersister_deployment.go @@ -62,11 +62,8 @@ func defaultBlockPersisterDeploymentSpec() *appsv1.DeploymentSpec { }, } return &appsv1.DeploymentSpec{ - Replicas: ptr.To(int32(2)), + Replicas: ptr.To(int32(DefaultBlockPersisterReplicas)), Selector: metav1.SetAsLabelSelector(labels), - // Strategy: appsv1.DeploymentStrategy{ // TODO: confirm the lack of defined update strategy - // Type: appsv1.RecreateDeploymentStrategyType, - // }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ CreationTimestamp: metav1.Time{}, diff --git a/internal/controller/blockvalidator_deployment.go b/internal/controller/blockvalidator_deployment.go index abab273..cdb8133 100644 --- a/internal/controller/blockvalidator_deployment.go +++ b/internal/controller/blockvalidator_deployment.go @@ -60,7 +60,7 @@ func defaultBlockValidatorDeploymentSpec() *appsv1.DeploymentSpec { } // TODO: set a default return &appsv1.DeploymentSpec{ - Replicas: ptr.To(int32(1)), + Replicas: ptr.To(int32(DefaultBlockValidationReplicas)), Selector: metav1.SetAsLabelSelector(podLabels), Strategy: appsv1.DeploymentStrategy{ Type: appsv1.RecreateDeploymentStrategyType, diff --git a/internal/controller/cluster_asset.go b/internal/controller/cluster_asset.go index c243f71..04552a8 100644 --- a/internal/controller/cluster_asset.go +++ b/internal/controller/cluster_asset.go @@ -55,18 +55,44 @@ func (r *ClusterReconciler) updateAsset(asset *teranodev1alpha1.Asset, cluster * if err != nil { return err } - asset.Spec = *defaultAssetSpec() - // if user configures a spec + // Only set defaults if this is a new resource (no spec configured yet) + if asset.Spec.DeploymentOverrides == nil && cluster.Spec.Asset.Spec == nil { + asset.Spec = *defaultAssetSpec() + } + + // Selectively merge cluster spec - only override fields that are explicitly set if cluster.Spec.Asset.Spec != nil { - asset.Spec = *cluster.Spec.Asset.Spec + clusterSpec := cluster.Spec.Asset.Spec + + // Merge ingress configurations + if clusterSpec.GrpcIngress != nil { + asset.Spec.GrpcIngress = clusterSpec.GrpcIngress + } + if clusterSpec.HTTPIngress != nil { + asset.Spec.HTTPIngress = clusterSpec.HTTPIngress + } + if clusterSpec.HTTPSIngress != nil { + asset.Spec.HTTPSIngress = clusterSpec.HTTPSIngress + } + + // Merge deployment overrides selectively + if clusterSpec.DeploymentOverrides != nil { + if asset.Spec.DeploymentOverrides == nil { + asset.Spec.DeploymentOverrides = &teranodev1alpha1.DeploymentOverrides{} + } + mergeDeploymentOverrides(asset.Spec.DeploymentOverrides, clusterSpec.DeploymentOverrides) + } } + + // Apply cluster-level defaults if asset.Spec.DeploymentOverrides == nil { asset.Spec.DeploymentOverrides = &teranodev1alpha1.DeploymentOverrides{} } if cluster.Spec.Image != "" && asset.Spec.DeploymentOverrides.Image == "" { asset.Spec.DeploymentOverrides.Image = cluster.Spec.Image } + // Always apply cluster-level ImagePullSecrets (they override or are the default) if cluster.Spec.ImagePullSecrets != nil { asset.Spec.DeploymentOverrides.ImagePullSecrets = cluster.Spec.ImagePullSecrets } diff --git a/internal/controller/cluster_controller.go b/internal/controller/cluster_controller.go index 94d31e4..cdfca42 100644 --- a/internal/controller/cluster_controller.go +++ b/internal/controller/cluster_controller.go @@ -20,9 +20,12 @@ import ( "context" "time" - "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/predicate" + networkingv1 "k8s.io/api/networking/v1" + + "github.com/go-logr/logr" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -144,5 +147,6 @@ func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { Owns(&corev1.PersistentVolumeClaim{}). Owns(&networkingv1.NetworkPolicy{}). Owns(&networkingv1.Ingress{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). Complete(r) } diff --git a/internal/controller/cluster_controller_test.go b/internal/controller/cluster_controller_test.go index 37d877b..8cbdc88 100644 --- a/internal/controller/cluster_controller_test.go +++ b/internal/controller/cluster_controller_test.go @@ -443,7 +443,7 @@ var _ = Describe("Cluster Controller", func() { // fetch asset deployment to verify pull secrets are set there too dep := &appsv1.Deployment{} Expect(k8sClient.Get(ctx, types.NamespacedName{ - Name: "asset", + Name: AssetDeploymentName, Namespace: "default", }, dep)).To(Succeed()) Expect(dep.Spec.Template.Spec.ImagePullSecrets).To(ContainElements(customSecrets)) diff --git a/internal/controller/cluster_helpers.go b/internal/controller/cluster_helpers.go new file mode 100644 index 0000000..9faae29 --- /dev/null +++ b/internal/controller/cluster_helpers.go @@ -0,0 +1,61 @@ +package controller + +import ( + teranodev1alpha1 "github.com/bsv-blockchain/teranode-operator/api/v1alpha1" +) + +// mergeDeploymentOverrides selectively merges deployment overrides from cluster spec +// Only fields explicitly set in clusterOverrides will override the target +func mergeDeploymentOverrides(target *teranodev1alpha1.DeploymentOverrides, clusterOverrides *teranodev1alpha1.DeploymentOverrides) { + if clusterOverrides.Replicas != nil { + target.Replicas = clusterOverrides.Replicas + } + if clusterOverrides.Image != "" { + target.Image = clusterOverrides.Image + } + if clusterOverrides.ImagePullPolicy != "" { + target.ImagePullPolicy = clusterOverrides.ImagePullPolicy + } + if clusterOverrides.ImagePullSecrets != nil { + target.ImagePullSecrets = clusterOverrides.ImagePullSecrets + } + if clusterOverrides.ServiceAccount != "" { + target.ServiceAccount = clusterOverrides.ServiceAccount + } + if clusterOverrides.ConfigMapName != "" { + target.ConfigMapName = clusterOverrides.ConfigMapName + } + if clusterOverrides.Resources != nil { + target.Resources = clusterOverrides.Resources + } + if clusterOverrides.NodeSelector != nil { + target.NodeSelector = clusterOverrides.NodeSelector + } + if clusterOverrides.Tolerations != nil { + target.Tolerations = clusterOverrides.Tolerations + } + if clusterOverrides.Affinity != nil { + target.Affinity = clusterOverrides.Affinity + } + if clusterOverrides.PodAntiAffinity != nil { + target.PodAntiAffinity = clusterOverrides.PodAntiAffinity + } + if clusterOverrides.Strategy != nil { + target.Strategy = clusterOverrides.Strategy + } + if len(clusterOverrides.Command) > 0 { + target.Command = clusterOverrides.Command + } + if len(clusterOverrides.Args) > 0 { + target.Args = clusterOverrides.Args + } + if len(clusterOverrides.Env) > 0 { + target.Env = clusterOverrides.Env + } + if len(clusterOverrides.EnvFrom) > 0 { + target.EnvFrom = clusterOverrides.EnvFrom + } + if clusterOverrides.ServiceAnnotations != nil { + target.ServiceAnnotations = clusterOverrides.ServiceAnnotations + } +} diff --git a/internal/controller/cluster_propagation.go b/internal/controller/cluster_propagation.go index 38465bd..c703e43 100644 --- a/internal/controller/cluster_propagation.go +++ b/internal/controller/cluster_propagation.go @@ -55,18 +55,53 @@ func (r *ClusterReconciler) updatePropagation(propagation *teranodev1alpha1.Prop if err != nil { return err } - propagation.Spec = *defaultPropagationSpec() - // if user configures a spec + // Only set defaults if this is a new resource (no spec configured yet) + if propagation.Spec.DeploymentOverrides == nil && cluster.Spec.Propagation.Spec == nil { + propagation.Spec = *defaultPropagationSpec() + } + + // Selectively merge cluster spec - only override fields that are explicitly set if cluster.Spec.Propagation.Spec != nil { - propagation.Spec = *cluster.Spec.Propagation.Spec + clusterSpec := cluster.Spec.Propagation.Spec + + // Merge ingress configurations + if clusterSpec.DelveIngress != nil { + propagation.Spec.DelveIngress = clusterSpec.DelveIngress + } + if clusterSpec.QuicIngress != nil { + propagation.Spec.QuicIngress = clusterSpec.QuicIngress + } + if clusterSpec.GrpcIngress != nil { + propagation.Spec.GrpcIngress = clusterSpec.GrpcIngress + } + if clusterSpec.HTTPIngress != nil { + propagation.Spec.HTTPIngress = clusterSpec.HTTPIngress + } + if clusterSpec.ProfilerIngress != nil { + propagation.Spec.ProfilerIngress = clusterSpec.ProfilerIngress + } + if clusterSpec.ServiceAnnotations != nil { + propagation.Spec.ServiceAnnotations = clusterSpec.ServiceAnnotations + } + + // Merge deployment overrides selectively + if clusterSpec.DeploymentOverrides != nil { + if propagation.Spec.DeploymentOverrides == nil { + propagation.Spec.DeploymentOverrides = &teranodev1alpha1.DeploymentOverrides{} + } + mergeDeploymentOverrides(propagation.Spec.DeploymentOverrides, clusterSpec.DeploymentOverrides) + } } + + // Apply cluster-level defaults if propagation.Spec.DeploymentOverrides == nil { propagation.Spec.DeploymentOverrides = &teranodev1alpha1.DeploymentOverrides{} } if cluster.Spec.Image != "" && propagation.Spec.DeploymentOverrides.Image == "" { propagation.Spec.DeploymentOverrides.Image = cluster.Spec.Image } + // Always apply cluster-level ImagePullSecrets (they override or are the default) if cluster.Spec.ImagePullSecrets != nil { propagation.Spec.DeploymentOverrides.ImagePullSecrets = cluster.Spec.ImagePullSecrets } diff --git a/internal/controller/cluster_rpc.go b/internal/controller/cluster_rpc.go index 4d1b496..c360917 100644 --- a/internal/controller/cluster_rpc.go +++ b/internal/controller/cluster_rpc.go @@ -55,12 +55,26 @@ func (r *ClusterReconciler) updateRPC(rpc *teranodev1alpha1.RPC, cluster *terano if err != nil { return err } - rpc.Spec = *defaultRPCSpec() - // if user configures a config map name + // Only set defaults if this is a new resource (no spec configured yet) + if rpc.Spec.DeploymentOverrides == nil && cluster.Spec.RPC.Spec == nil { + rpc.Spec = *defaultRPCSpec() + } + + // Selectively merge cluster spec - only override fields that are explicitly set if cluster.Spec.RPC.Spec != nil { - rpc.Spec = *cluster.Spec.RPC.Spec + clusterSpec := cluster.Spec.RPC.Spec + + // Merge deployment overrides selectively + if clusterSpec.DeploymentOverrides != nil { + if rpc.Spec.DeploymentOverrides == nil { + rpc.Spec.DeploymentOverrides = &teranodev1alpha1.DeploymentOverrides{} + } + mergeDeploymentOverrides(rpc.Spec.DeploymentOverrides, clusterSpec.DeploymentOverrides) + } } + + // Apply cluster-level defaults (only if not already set) if rpc.Spec.DeploymentOverrides == nil { rpc.Spec.DeploymentOverrides = &teranodev1alpha1.DeploymentOverrides{} } diff --git a/internal/controller/cluster_subtreevalidator.go b/internal/controller/cluster_subtreevalidator.go index f42fcb0..7b8d558 100644 --- a/internal/controller/cluster_subtreevalidator.go +++ b/internal/controller/cluster_subtreevalidator.go @@ -55,18 +55,38 @@ func (r *ClusterReconciler) updateSubtreeValidator(subtreeValidator *teranodev1a if err != nil { return err } - subtreeValidator.Spec = *defaultSubtreeValidatorSpec() - // if user configures a config map name + // Only set defaults if this is a new resource (no spec configured yet) + if subtreeValidator.Spec.DeploymentOverrides == nil && cluster.Spec.SubtreeValidator.Spec == nil { + subtreeValidator.Spec = *defaultSubtreeValidatorSpec() + } + + // Selectively merge cluster spec - only override fields that are explicitly set if cluster.Spec.SubtreeValidator.Spec != nil { - subtreeValidator.Spec = *cluster.Spec.SubtreeValidator.Spec + clusterSpec := cluster.Spec.SubtreeValidator.Spec + + // Merge service-specific fields + if clusterSpec.PodTemplateAnnotations != nil { + subtreeValidator.Spec.PodTemplateAnnotations = clusterSpec.PodTemplateAnnotations + } + + // Merge deployment overrides selectively + if clusterSpec.DeploymentOverrides != nil { + if subtreeValidator.Spec.DeploymentOverrides == nil { + subtreeValidator.Spec.DeploymentOverrides = &teranodev1alpha1.DeploymentOverrides{} + } + mergeDeploymentOverrides(subtreeValidator.Spec.DeploymentOverrides, clusterSpec.DeploymentOverrides) + } } + + // Apply cluster-level defaults (only if not already set) if subtreeValidator.Spec.DeploymentOverrides == nil { subtreeValidator.Spec.DeploymentOverrides = &teranodev1alpha1.DeploymentOverrides{} } if cluster.Spec.Image != "" && subtreeValidator.Spec.DeploymentOverrides.Image == "" { subtreeValidator.Spec.DeploymentOverrides.Image = cluster.Spec.Image } + // Always apply cluster-level ImagePullSecrets (they override or are the default) if cluster.Spec.ImagePullSecrets != nil { subtreeValidator.Spec.DeploymentOverrides.ImagePullSecrets = cluster.Spec.ImagePullSecrets } diff --git a/internal/controller/cluster_utxopersister.go b/internal/controller/cluster_utxopersister.go index 21cdf6a..f7d52c1 100644 --- a/internal/controller/cluster_utxopersister.go +++ b/internal/controller/cluster_utxopersister.go @@ -55,16 +55,17 @@ func (r *ClusterReconciler) updateUtxoPersister(up *teranodev1alpha1.UtxoPersist if err != nil { return err } + up.Spec = *defaultUtxoPersisterSpec() - // if user configures a config map name if cluster.Spec.UtxoPersister.Spec != nil { up.Spec = *cluster.Spec.UtxoPersister.Spec } + if up.Spec.DeploymentOverrides == nil { up.Spec.DeploymentOverrides = &teranodev1alpha1.DeploymentOverrides{} } - if cluster.Spec.Image != "" && up.Spec.DeploymentOverrides.Image == "" { + if cluster.Spec.Image != "" { up.Spec.DeploymentOverrides.Image = cluster.Spec.Image } if cluster.Spec.ImagePullSecrets != nil { diff --git a/internal/controller/cluster_validator.go b/internal/controller/cluster_validator.go index fc7d495..4c6be74 100644 --- a/internal/controller/cluster_validator.go +++ b/internal/controller/cluster_validator.go @@ -55,16 +55,18 @@ func (r *ClusterReconciler) updateValidator(validator *teranodev1alpha1.Validato if err != nil { return err } + validator.Spec = *defaultValidatorSpec() - // if user configures a config map name + // Selectively merge cluster spec - only override fields that are explicitly set if cluster.Spec.Validator.Spec != nil { validator.Spec = *cluster.Spec.Validator.Spec } + if validator.Spec.DeploymentOverrides == nil { validator.Spec.DeploymentOverrides = &teranodev1alpha1.DeploymentOverrides{} } - if cluster.Spec.Image != "" && validator.Spec.DeploymentOverrides.Image == "" { + if cluster.Spec.Image != "" { validator.Spec.DeploymentOverrides.Image = cluster.Spec.Image } if cluster.Spec.ImagePullSecrets != nil { diff --git a/internal/controller/constants.go b/internal/controller/constants.go index f438a89..fa1ae5c 100644 --- a/internal/controller/constants.go +++ b/internal/controller/constants.go @@ -48,3 +48,33 @@ const ( DebuggerPort = 4040 HealthPort = 8000 ) + +// Deployment Names +const ( + AssetDeploymentName = "asset" + BlockAssemblyDeploymentName = "block-assembly" + BlockchainDeploymentName = "blockchain" + BlockValidationDeploymentName = "block-validation" + BootstrapDeploymentName = "bootstrap" + CoinbaseDeploymentName = "coinbase" + MinerDeploymentName = "miner" + PropagationDeploymentName = "propagation" + SubtreeValidatorDeploymentName = "subtree-validator" + AlertSystemDeploymentName = "alert-system" +) + +// Replicas +const ( + DefaultAssetReplicas = 2 + DefaultBlockAssemblyReplicas = 2 + DefaultBlockchainReplicas = 1 + DefaultBlockPersisterReplicas = 1 + DefaultBlockValidationReplicas = 1 + DefaultLegacyReplicas = 1 + DefaultPeerReplicas = 1 + DefaultRPCReplicas = 1 + DefaultUtxoPersisterReplicas = 1 + DefaultPropagationReplicas = 2 + DefaultSubtreeValidatorReplicas = 2 + DefaultAlertSystemReplicas = 1 +) diff --git a/internal/controller/legacy_deployment.go b/internal/controller/legacy_deployment.go index 954b855..dd501e4 100644 --- a/internal/controller/legacy_deployment.go +++ b/internal/controller/legacy_deployment.go @@ -58,7 +58,7 @@ func defaultLegacyDeploymentSpec() *appsv1.DeploymentSpec { }, } return &appsv1.DeploymentSpec{ - Replicas: ptr.To(int32(1)), + Replicas: ptr.To(int32(DefaultLegacyReplicas)), Selector: metav1.SetAsLabelSelector(labels), Strategy: appsv1.DeploymentStrategy{ Type: appsv1.RecreateDeploymentStrategyType, diff --git a/internal/controller/peer_deployment.go b/internal/controller/peer_deployment.go index e0f95ba..13cee9f 100644 --- a/internal/controller/peer_deployment.go +++ b/internal/controller/peer_deployment.go @@ -58,7 +58,7 @@ func defaultPeerDeploymentSpec() *appsv1.DeploymentSpec { }, } return &appsv1.DeploymentSpec{ - Replicas: ptr.To(int32(1)), + Replicas: ptr.To(int32(DefaultPeerReplicas)), Selector: metav1.SetAsLabelSelector(podLabels), Strategy: appsv1.DeploymentStrategy{ Type: appsv1.RecreateDeploymentStrategyType, diff --git a/internal/controller/propagation_controller.go b/internal/controller/propagation_controller.go index 9c9ae3c..3c2fb6c 100644 --- a/internal/controller/propagation_controller.go +++ b/internal/controller/propagation_controller.go @@ -26,6 +26,9 @@ import ( networkingv1 "k8s.io/api/networking/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "sigs.k8s.io/controller-runtime/pkg/predicate" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -65,18 +68,32 @@ func (r *PropagationReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.Log = log.FromContext(ctx).WithValues("propagation", req.NamespacedName) r.Context = ctx r.NamespacedName = req.NamespacedName + + // Fetch the latest version of the Propagation CR propagation := teranodev1alpha1.Propagation{} if err := r.Get(ctx, req.NamespacedName, &propagation); err != nil { r.Log.Error(err, "unable to fetch propagation CR") return result, nil } - _, err := utils.ReconcileBatch(r.Log, + var err error + _, err = utils.ReconcileBatch(r.Log, r.ReconcileDeployment, r.ReconcileService, r.ReconcileGrpcIngress, ) + // Update scale status (replicas and selector) from deployment + deployment := &appsv1.Deployment{} + if err := r.Get(ctx, types.NamespacedName{ + Name: PropagationDeploymentName, + Namespace: propagation.Namespace, + }, deployment); err == nil { + replicas, selector := utils.GetScaleStatusFromDeployment(deployment) + propagation.Status.Replicas = replicas + propagation.Status.Selector = selector + } + if err != nil { apimeta.SetStatusCondition(&propagation.Status.Conditions, metav1.Condition{ @@ -86,11 +103,11 @@ func (r *PropagationReconciler) Reconcile(ctx context.Context, req ctrl.Request) Message: err.Error(), }, ) - _ = r.Client.Status().Update(ctx, &propagation) + _ = r.Status().Update(ctx, &propagation) // Since error is written on the status, let's log it and requeue // Returning error here is redundant r.Log.Error(err, "requeuing object for reconciliation") - return ctrl.Result{Requeue: true, RequeueAfter: time.Second}, nil + return ctrl.Result{RequeueAfter: time.Second}, nil } else { apimeta.SetStatusCondition(&propagation.Status.Conditions, metav1.Condition{ @@ -102,8 +119,16 @@ func (r *PropagationReconciler) Reconcile(ctx context.Context, req ctrl.Request) ) } - err = r.Client.Status().Update(ctx, &propagation) - return ctrl.Result{Requeue: false, RequeueAfter: 0}, err + _ = r.Status().Update(ctx, &propagation) + + if propagation.Spec.DeploymentOverrides != nil && propagation.Spec.DeploymentOverrides.Replicas != nil { + if propagation.Status.Replicas != *propagation.Spec.DeploymentOverrides.Replicas { + r.Log.Info("requeuing to monitor replica status", "status", propagation.Status.Replicas, "spec", propagation.Spec.DeploymentOverrides.Replicas) + return ctrl.Result{RequeueAfter: time.Second}, nil + } + } + + return ctrl.Result{RequeueAfter: 0}, nil } // SetupWithManager sets up the controller with the Manager. @@ -113,5 +138,6 @@ func (r *PropagationReconciler) SetupWithManager(mgr ctrl.Manager) error { Owns(&appsv1.Deployment{}). Owns(&networkingv1.Ingress{}). Owns(&corev1.Service{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). Complete(r) } diff --git a/internal/controller/propagation_deployment.go b/internal/controller/propagation_deployment.go index bad25bc..619f2df 100644 --- a/internal/controller/propagation_deployment.go +++ b/internal/controller/propagation_deployment.go @@ -22,15 +22,18 @@ func (r *PropagationReconciler) ReconcileDeployment(log logr.Logger) (bool, erro } dep := appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Name: "propagation", + Name: PropagationDeploymentName, Namespace: r.NamespacedName.Namespace, Labels: getAppLabels("propagation"), }, } + + // CreateOrUpdate with retry on conflicts - controller-runtime handles retries internally _, err := controllerutil.CreateOrUpdate(r.Context, r.Client, &dep, func() error { return r.updateDeployment(&dep, &propagation) }) if err != nil { + // Let the controller-runtime retry mechanism handle conflicts return false, err } return true, nil @@ -41,8 +44,18 @@ func (r *PropagationReconciler) updateDeployment(dep *appsv1.Deployment, propaga if err != nil { return err } - dep.Spec = *defaultPropagationDeploymentSpec() - utils.SetDeploymentOverrides(r.Client, dep, propagation) + + // Check if this is a new deployment (no ResourceVersion yet) + isNewDeployment := dep.ResourceVersion == "" + + // Only set the full spec for new deployments + // For existing deployments, we'll selectively update fields to avoid conflicts + if isNewDeployment { + dep.Spec = *defaultPropagationDeploymentSpec() + } + + // Apply CR spec to deployment + utils.SetDeploymentOverridesWithContext(r.Context, r.Log, r.Client, dep, propagation, "Propagation") utils.SetClusterOverrides(r.Client, dep, propagation) return nil @@ -58,11 +71,8 @@ func defaultPropagationDeploymentSpec() *appsv1.DeploymentSpec { }, } return &appsv1.DeploymentSpec{ - Replicas: ptr.To(int32(2)), // TODO: verify the replicas number, the spec has 2 + Replicas: ptr.To(int32(DefaultPropagationReplicas)), Selector: metav1.SetAsLabelSelector(labels), - // Strategy: appsv1.DeploymentStrategy{ // TODO: verify if no strategy should be used by default - // Type: appsv1.RecreateDeploymentStrategyType, - // }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ CreationTimestamp: metav1.Time{}, diff --git a/internal/controller/rpc_deployment.go b/internal/controller/rpc_deployment.go index c8ff3f5..05cc072 100644 --- a/internal/controller/rpc_deployment.go +++ b/internal/controller/rpc_deployment.go @@ -58,7 +58,7 @@ func defaultRPCDeploymentSpec() *appsv1.DeploymentSpec { }, } return &appsv1.DeploymentSpec{ - Replicas: ptr.To(int32(1)), + Replicas: ptr.To(int32(DefaultRPCReplicas)), Selector: metav1.SetAsLabelSelector(labels), Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ diff --git a/internal/controller/subtreevalidator_controller.go b/internal/controller/subtreevalidator_controller.go index ec2df3e..7af8ac1 100644 --- a/internal/controller/subtreevalidator_controller.go +++ b/internal/controller/subtreevalidator_controller.go @@ -25,8 +25,10 @@ import ( corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -75,6 +77,17 @@ func (r *SubtreeValidatorReconciler) Reconcile(ctx context.Context, req ctrl.Req r.ReconcileService, ) + // Update scale status (replicas and selector) from deployment + deployment := &appsv1.Deployment{} + if err := r.Get(ctx, types.NamespacedName{ + Name: SubtreeValidatorDeploymentName, + Namespace: req.Namespace, + }, deployment); err == nil { + replicas, selector := utils.GetScaleStatusFromDeployment(deployment) + subtreeValidator.Status.Replicas = replicas + subtreeValidator.Status.Selector = selector + } + if err != nil { apimeta.SetStatusCondition(&subtreeValidator.Status.Conditions, metav1.Condition{ @@ -101,6 +114,14 @@ func (r *SubtreeValidatorReconciler) Reconcile(ctx context.Context, req ctrl.Req } err = r.Client.Status().Update(ctx, &subtreeValidator) + + if subtreeValidator.Spec.DeploymentOverrides != nil && subtreeValidator.Spec.DeploymentOverrides.Replicas != nil { + if subtreeValidator.Status.Replicas != *subtreeValidator.Spec.DeploymentOverrides.Replicas { + r.Log.Info("requeuing to monitor replica status", "status", subtreeValidator.Status.Replicas, "spec", subtreeValidator.Spec.DeploymentOverrides.Replicas) + return ctrl.Result{RequeueAfter: time.Second}, nil + } + } + return ctrl.Result{Requeue: false, RequeueAfter: 0}, err } @@ -111,5 +132,6 @@ func (r *SubtreeValidatorReconciler) SetupWithManager(mgr ctrl.Manager) error { Owns(&appsv1.Deployment{}). Owns(&corev1.Service{}). Owns(&corev1.PersistentVolumeClaim{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). Complete(r) } diff --git a/internal/controller/subtreevalidator_deployment.go b/internal/controller/subtreevalidator_deployment.go index 72b0cc4..7d6cd25 100644 --- a/internal/controller/subtreevalidator_deployment.go +++ b/internal/controller/subtreevalidator_deployment.go @@ -58,7 +58,7 @@ func defaultSubtreeValidatorDeploymentSpec() *appsv1.DeploymentSpec { }, } return &appsv1.DeploymentSpec{ - Replicas: ptr.To(int32(2)), + Replicas: ptr.To(int32(DefaultSubtreeValidatorReplicas)), Selector: metav1.SetAsLabelSelector(labels), Strategy: appsv1.DeploymentStrategy{ Type: appsv1.RecreateDeploymentStrategyType, diff --git a/internal/controller/utxopersister_deployment.go b/internal/controller/utxopersister_deployment.go index 6758f1c..8676627 100644 --- a/internal/controller/utxopersister_deployment.go +++ b/internal/controller/utxopersister_deployment.go @@ -62,7 +62,7 @@ func defaultUtxoPersisterDeploymentSpec() *appsv1.DeploymentSpec { }, } return &appsv1.DeploymentSpec{ - Replicas: ptr.To(int32(1)), + Replicas: ptr.To(int32(DefaultUtxoPersisterReplicas)), Selector: metav1.SetAsLabelSelector(labels), Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ diff --git a/internal/utils/deployment.go b/internal/utils/deployment.go index baf1b4d..a15d3e7 100644 --- a/internal/utils/deployment.go +++ b/internal/utils/deployment.go @@ -19,8 +19,11 @@ package utils import ( "context" + "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/bsv-blockchain/teranode-operator/api/v1alpha1" @@ -28,6 +31,10 @@ import ( //nolint:gocognit,gocyclo // Function complexity is inherent to handling multiple override cases func SetDeploymentOverrides(client client.Client, dep *appsv1.Deployment, cr v1alpha1.TeranodeService) { + SetDeploymentOverridesWithContext(context.Background(), logr.Logger{}, client, dep, cr, "") +} + +func SetDeploymentOverridesWithContext(ctx context.Context, log logr.Logger, client client.Client, dep *appsv1.Deployment, cr v1alpha1.TeranodeService, crKind string) { if cr.DeploymentOverrides() == nil { return } @@ -51,11 +58,6 @@ func SetDeploymentOverrides(client client.Client, dep *appsv1.Deployment, cr v1a dep.Spec.Template.Spec.Containers[0].Resources = *cr.DeploymentOverrides().Resources } - // if user configures replicas - if cr.DeploymentOverrides().Replicas != nil { - dep.Spec.Replicas = cr.DeploymentOverrides().Replicas - } - // if user configures image or image pull policy if cr.DeploymentOverrides().Image != "" { dep.Spec.Template.Spec.Containers[0].Image = cr.DeploymentOverrides().Image @@ -114,6 +116,9 @@ func SetDeploymentOverrides(client client.Client, dep *appsv1.Deployment, cr v1a } if len(cr.DeploymentOverrides().VolumeMounts) > 0 { dep.Spec.Template.Spec.Containers[0].VolumeMounts = append(dep.Spec.Template.Spec.Containers[0].VolumeMounts, cr.DeploymentOverrides().VolumeMounts...) + if cr.DeploymentOverrides().Replicas != nil { + dep.Spec.Replicas = cr.DeploymentOverrides().Replicas + } } } @@ -124,18 +129,24 @@ func SetClusterOverrides(client client.Client, dep *appsv1.Deployment, cr v1alph return } if clusterOwner.Spec.ConfigMapName != "" { - dep.Spec.Template.Spec.Containers[0].EnvFrom = append(dep.Spec.Template.Spec.Containers[0].EnvFrom, corev1.EnvFromSource{ - ConfigMapRef: &corev1.ConfigMapEnvSource{ - LocalObjectReference: corev1.LocalObjectReference{Name: clusterOwner.Spec.ConfigMapName}, - }, - }) + exists := false + for _, envFrom := range dep.Spec.Template.Spec.Containers[0].EnvFrom { + if envFrom.ConfigMapRef != nil && envFrom.ConfigMapRef.Name == clusterOwner.Spec.ConfigMapName { + // ConfigMap already present, skip adding + exists = true + } + } + if !exists { + dep.Spec.Template.Spec.Containers[0].EnvFrom = append(dep.Spec.Template.Spec.Containers[0].EnvFrom, corev1.EnvFromSource{ + ConfigMapRef: &corev1.ConfigMapEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: clusterOwner.Spec.ConfigMapName}, + }, + }) + } } if len(clusterOwner.Spec.Env) > 0 { dep.Spec.Template.Spec.Containers[0].Env = append(dep.Spec.Template.Spec.Containers[0].Env, clusterOwner.Spec.Env...) } - if len(clusterOwner.Spec.EnvFrom) > 0 { - dep.Spec.Template.Spec.Containers[0].EnvFrom = append(dep.Spec.Template.Spec.Containers[0].EnvFrom, clusterOwner.Spec.EnvFrom...) - } if clusterOwner.Spec.ImagePullSecrets != nil { if dep.Spec.Template.Spec.ImagePullSecrets == nil { dep.Spec.Template.Spec.ImagePullSecrets = []corev1.LocalObjectReference{} @@ -145,4 +156,77 @@ func SetClusterOverrides(client client.Client, dep *appsv1.Deployment, cr v1alph if clusterOwner.Spec.Image != "" { dep.Spec.Template.Spec.Containers[0].Image = clusterOwner.Spec.Image } + if len(clusterOwner.Spec.EnvFrom) == 0 { + return + } + + // Append cluster envFrom vars, avoiding duplicates + for _, clusterEnvFrom := range clusterOwner.Spec.EnvFrom { + found := false + for _, envFrom := range dep.Spec.Template.Spec.Containers[0].EnvFrom { + if envFrom.ConfigMapRef != nil && clusterEnvFrom.ConfigMapRef != nil && + envFrom.ConfigMapRef.Name == clusterEnvFrom.ConfigMapRef.Name { + // ConfigMap already present, skip adding + found = true + } + if envFrom.SecretRef != nil && clusterEnvFrom.SecretRef != nil && + envFrom.SecretRef.Name == clusterEnvFrom.SecretRef.Name { + // Secret already present, skip adding + found = true + } + } + if !found { + dep.Spec.Template.Spec.Containers[0].EnvFrom = append(dep.Spec.Template.Spec.Containers[0].EnvFrom, clusterEnvFrom) + } + } +} + +// ScaleStatus defines the interface for CRs that support scale subresource +type ScaleStatus interface { + SetReplicas(replicas int32) + SetSelector(selector string) +} + +// UpdateScaleStatus updates the scale-related status fields (replicas and selector) for a CR +// based on the actual deployment state. This is required for the /scale subresource to work properly. +func UpdateScaleStatus(ctx context.Context, c client.Client, namespace, deploymentName string, status ScaleStatus) error { + deployment := &appsv1.Deployment{} + err := c.Get(ctx, types.NamespacedName{ + Name: deploymentName, + Namespace: namespace, + }, deployment) + if err != nil { + return err + } + + // Update replicas from deployment status + if deployment.Status.Replicas != 0 { + status.SetReplicas(deployment.Status.Replicas) + } else if deployment.Spec.Replicas != nil { + status.SetReplicas(*deployment.Spec.Replicas) + } + + // Update selector from deployment spec + if deployment.Spec.Selector != nil { + selector := metav1.FormatLabelSelector(deployment.Spec.Selector) + status.SetSelector(selector) + } + + return nil +} + +// GetScaleStatusFromDeployment extracts replicas and selector from a deployment +func GetScaleStatusFromDeployment(deployment *appsv1.Deployment) (replicas int32, selector string) { + if deployment.Status.ReadyReplicas != 0 { + replicas = deployment.Status.ReadyReplicas + } else if deployment.Spec.Replicas != nil { + replicas = *deployment.Spec.Replicas + } + + if deployment.Spec.Selector != nil { + // Use FormatLabelSelector for proper formatting + selector = metav1.FormatLabelSelector(deployment.Spec.Selector) + } + + return replicas, selector } diff --git a/test/utils/utils.go b/test/utils/utils.go index e773788..db26e96 100644 --- a/test/utils/utils.go +++ b/test/utils/utils.go @@ -64,6 +64,7 @@ func Run(cmd *exec.Cmd) ([]byte, error) { cmd.Env = append(os.Environ(), "GO111MODULE=on") command := strings.Join(cmd.Args, " ") _, _ = fmt.Fprintf(ginkgo.GinkgoWriter, "running: %s\n", command) + output, err := cmd.CombinedOutput() if err != nil { return output, fmt.Errorf("%s failed with error: %w %s", command, err, string(output))