Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/v1alpha1/asset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@ type AssetStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
Conditions []metav1.Condition `json:"conditions,omitempty"`
// Replicas is the number of actual replicas of the asset deployment
Replicas int32 `json:"replicas,omitempty"`
// Selector is the label selector for pods corresponding to this asset deployment
Selector string `json:"selector,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:subresource:scale:specpath=.spec.deploymentOverrides.replicas,statuspath=.status.replicas,selectorpath=.status.selector

// Asset is the Schema for the assets API
type Asset struct {
Expand Down
9 changes: 9 additions & 0 deletions api/v1alpha1/propagation_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,19 @@ type PropagationSpec struct {
// PropagationStatus defines the observed state of Propagation.
//
// The replicas and selector fields are the ones referenced by the
// scale-subresource marker on the Propagation type (statuspath=.status.replicas,
// selectorpath=.status.selector), so their JSON names must stay in sync with it.
type PropagationStatus struct {
// Conditions lists the status conditions recorded for this Propagation.
// The "Ready" print column reads the status of the condition whose type is "Reconciled".
Conditions []metav1.Condition `json:"conditions,omitempty"`
// Replicas is the number of actual replicas of the propagation deployment
Replicas int32 `json:"replicas,omitempty"`
// Selector is the label selector for pods corresponding to this propagation deployment
Selector string `json:"selector,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:subresource:scale:specpath=.spec.deploymentOverrides.replicas,statuspath=.status.replicas,selectorpath=.status.selector
//+kubebuilder:printcolumn:name="Desired",type=integer,JSONPath=`.spec.deploymentOverrides.replicas`,description="Desired number of replicas"
//+kubebuilder:printcolumn:name="Current",type=integer,JSONPath=`.status.replicas`,description="Current number of replicas"
//+kubebuilder:printcolumn:name="Ready",type=string,JSONPath=`.status.conditions[?(@.type=="Reconciled")].status`,description="Ready status"
//+kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp`

// Propagation is the Schema for the propagations API
type Propagation struct {
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/subtreevalidator_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@ type SubtreeValidatorStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
Conditions []metav1.Condition `json:"conditions,omitempty"`
// Replicas is the number of actual replicas of the subtreevalidator deployment
Replicas int32 `json:"replicas,omitempty"`
// Selector is the label selector for pods corresponding to this subtreevalidator deployment
Selector string `json:"selector,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:subresource:scale:specpath=.spec.deploymentOverrides.replicas,statuspath=.status.replicas,selectorpath=.status.selector

// SubtreeValidator is the Schema for the subtreevalidators API
type SubtreeValidator struct {
Expand Down
44 changes: 33 additions & 11 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import (
"sort"
"strings"

teranodev1alpha1 "github.com/bsv-blockchain/teranode-operator/api/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
scalev1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
Expand All @@ -39,7 +42,6 @@ import (
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"

teranodev1alpha1 "github.com/bsv-blockchain/teranode-operator/api/v1alpha1"
"github.com/bsv-blockchain/teranode-operator/internal/controller"
)

Expand All @@ -55,13 +57,6 @@ const (
CreateControllerError = "unable to create controller"
)

// init registers the client-go scheme types and the teranode v1alpha1 API
// types with scheme (declared elsewhere in this file). Each registration is
// wrapped in utilruntime.Must, which panics if AddToScheme returns an error.
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))

utilruntime.Must(teranodev1alpha1.AddToScheme(scheme))
// Scaffold marker: kubebuilder inserts further scheme registrations at this line; keep it in place.
//+kubebuilder:scaffold:scheme
}

//nolint:gocognit,gocyclo // Main function complexity is acceptable for initialization
func main() {
var metricsAddr string
Expand All @@ -70,7 +65,7 @@ func main() {
var secureMetrics bool
var enableHTTP2 bool
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8181", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
Expand All @@ -84,6 +79,33 @@ func main() {
opts.BindFlags(flag.CommandLine)
flag.Parse()

if err := scalev1.AddToScheme(scheme); err != nil {
setupLog.Error(err, "unable to add autoscaling/v1 scheme")
os.Exit(1)
}

if err := teranodev1alpha1.AddToScheme(scheme); err != nil {
setupLog.Error(err, "unable to add v1alpha1 scheme")
os.Exit(1)
}

if err := appsv1.AddToScheme(scheme); err != nil {
setupLog.Error(err, "unable to add apps/v1 scheme")
os.Exit(1)
}

if err := corev1.AddToScheme(scheme); err != nil {
setupLog.Error(err, "unable to add core/v1 scheme")
os.Exit(1)
}

if err := networkingv1.AddToScheme(scheme); err != nil {
setupLog.Error(err, "unable to add networking/v1 scheme")
os.Exit(1)
}

//+kubebuilder:scaffold:scheme

ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

// if the enable-http2 flag is false (the default), http/2 should be disabled
Expand Down
13 changes: 13 additions & 0 deletions config/crd/bases/teranode.bsvblockchain.org_assets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3820,9 +3820,22 @@ spec:
- type
type: object
type: array
replicas:
description: Replicas is the number of actual replicas of the asset
deployment
format: int32
type: integer
selector:
description: Selector is the label selector for pods corresponding
to this asset deployment
type: string
type: object
type: object
served: true
storage: true
subresources:
scale:
labelSelectorPath: .status.selector
specReplicasPath: .spec.deploymentOverrides.replicas
statusReplicasPath: .status.replicas
status: {}
31 changes: 30 additions & 1 deletion config/crd/bases/teranode.bsvblockchain.org_propagations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,23 @@ spec:
singular: propagation
scope: Namespaced
versions:
- name: v1alpha1
- additionalPrinterColumns:
- description: Desired number of replicas
jsonPath: .spec.deploymentOverrides.replicas
name: Desired
type: integer
- description: Current number of replicas
jsonPath: .status.replicas
name: Current
type: integer
- description: Ready status
jsonPath: .status.conditions[?(@.type=="Reconciled")].status
name: Ready
type: string
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
name: v1alpha1
schema:
openAPIV3Schema:
description: Propagation is the Schema for the propagations API
Expand Down Expand Up @@ -3849,9 +3865,22 @@ spec:
- type
type: object
type: array
replicas:
description: Replicas is the number of actual replicas of the propagation
deployment
format: int32
type: integer
selector:
description: Selector is the label selector for pods corresponding
to this propagation deployment
type: string
type: object
type: object
served: true
storage: true
subresources:
scale:
labelSelectorPath: .status.selector
specReplicasPath: .spec.deploymentOverrides.replicas
statusReplicasPath: .status.replicas
status: {}
Original file line number Diff line number Diff line change
Expand Up @@ -3782,9 +3782,22 @@ spec:
- type
type: object
type: array
replicas:
description: Replicas is the number of actual replicas of the subtreevalidator
deployment
format: int32
type: integer
selector:
description: Selector is the label selector for pods corresponding
to this subtreevalidator deployment
type: string
type: object
type: object
served: true
storage: true
subresources:
scale:
labelSelectorPath: .status.selector
specReplicasPath: .spec.deploymentOverrides.replicas
statusReplicasPath: .status.replicas
status: {}
2 changes: 1 addition & 1 deletion config/rbac/teranode_service_runner_service_account.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ apiVersion: v1
kind: ServiceAccount
metadata:
name: service-runner
namespace: system
namespace: system
11 changes: 10 additions & 1 deletion config/samples/teranode_v1alpha1_propagation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,20 @@ metadata:
app.kubernetes.io/created-by: teranode-operator
name: propagation-sample
spec:
# TODO(user): Add fields here
resources:
requests:
cpu: 1
memory: 1Gi
limits:
memory: 2Gi
imagePullPolicy: Always
replicas: 1
deploymentOverrides:
replicas: 1
resources:
requests:
cpu: 1
memory: 1Gi
limits:
memory: 2Gi
imagePullPolicy: Always
22 changes: 22 additions & 0 deletions internal/controller/asset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
networkingv1 "k8s.io/api/networking/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -77,6 +79,17 @@
r.ReconcileHTTPSIngress,
)

// Update scale status (replicas and selector) from deployment
deployment := &appsv1.Deployment{}
if err := r.Get(ctx, types.NamespacedName{

Check warning on line 84 in internal/controller/asset_controller.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unnecessary variable declaration and use the expression directly in the condition.

See more on https://sonarcloud.io/project/issues?id=bsv-blockchain_teranode-operator&issues=AZrfaNCA-h1a3WPhFJfS&open=AZrfaNCA-h1a3WPhFJfS&pullRequest=28

Check failure on line 84 in internal/controller/asset_controller.go

View workflow job for this annotation

GitHub Actions / 📊 Code Quality / ✨ Lint Code

shadow: declaration of "err" shadows declaration at line 75 (govet)

Check failure on line 84 in internal/controller/asset_controller.go

View workflow job for this annotation

GitHub Actions / 🪝 Pre-commit Checks / 🪝 Pre-commit Checks

shadow: declaration of "err" shadows declaration at line 75 (govet)

Check failure on line 84 in internal/controller/asset_controller.go

View workflow job for this annotation

GitHub Actions / 🪝 Pre-commit Checks / 🪝 Pre-commit Checks

shadow: declaration of "err" shadows declaration at line 75 (govet)
Name: AssetDeploymentName,
Namespace: asset.Namespace,
}, deployment); err == nil {
replicas, selector := utils.GetScaleStatusFromDeployment(deployment)
asset.Status.Replicas = replicas
asset.Status.Selector = selector
}

if err != nil {
apimeta.SetStatusCondition(&asset.Status.Conditions,
metav1.Condition{
Expand All @@ -103,6 +116,14 @@
}

err = r.Client.Status().Update(ctx, &asset)

if asset.Spec.DeploymentOverrides != nil && asset.Spec.DeploymentOverrides.Replicas != nil {
if asset.Status.Replicas != *asset.Spec.DeploymentOverrides.Replicas {
r.Log.Info("requeuing to monitor replica status", "status", asset.Status.Replicas, "spec", asset.Spec.DeploymentOverrides.Replicas)
return ctrl.Result{RequeueAfter: time.Second}, nil
}
}

return ctrl.Result{Requeue: false, RequeueAfter: 0}, err
}

Expand All @@ -113,5 +134,6 @@
Owns(&appsv1.Deployment{}).
Owns(&networkingv1.Ingress{}).
Owns(&corev1.Service{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
Complete(r)
}
8 changes: 4 additions & 4 deletions internal/controller/asset_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func (r *AssetReconciler) ReconcileDeployment(log logr.Logger) (bool, error) {
}
dep := appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "asset",
Name: AssetDeploymentName,
Namespace: r.NamespacedName.Namespace,
Labels: getAppLabels("asset"),
Labels: getAppLabels(AssetDeploymentName),
},
}
_, err := controllerutil.CreateOrUpdate(r.Context, r.Client, &dep, func() error {
Expand All @@ -50,7 +50,7 @@ func (r *AssetReconciler) updateDeployment(dep *appsv1.Deployment, asset *terano
}

func defaultAssetDeploymentSpec() *appsv1.DeploymentSpec {
labels := getAppLabels("asset")
labels := getAppLabels(AssetDeploymentName)
envFrom := []corev1.EnvFromSource{}
env := []corev1.EnvVar{
{
Expand All @@ -59,7 +59,7 @@ func defaultAssetDeploymentSpec() *appsv1.DeploymentSpec {
},
}
return &appsv1.DeploymentSpec{
Replicas: ptr.To(int32(2)),
Replicas: ptr.To(int32(DefaultAssetReplicas)),
Selector: metav1.SetAsLabelSelector(labels),
Strategy: appsv1.DeploymentStrategy{
Type: appsv1.RecreateDeploymentStrategyType,
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/blockassembly_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func defaultBlockAssemblyDeploymentSpec() *appsv1.DeploymentSpec {
Strategy: appsv1.DeploymentStrategy{
Type: appsv1.RecreateDeploymentStrategyType,
},
Replicas: ptr.To(int32(1)),
Replicas: ptr.To(int32(DefaultBlockAssemblyReplicas)),
Selector: metav1.SetAsLabelSelector(labels),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Expand Down
6 changes: 6 additions & 0 deletions internal/controller/blockchain_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ func (r *BlockchainReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// FSM is disabled, skip FSM reconciliation
return ctrl.Result{}, nil
}

if isConnectionError(err) {
r.Log.Info("FSM monitoring unavailable - operator may be running out-of-cluster. FSM state cannot be retrieved when .svc.cluster.local DNS is not resolvable", "error", err.Error())
// Don't requeue aggressively if we know we're out-of-cluster
return ctrl.Result{Requeue: true, RequeueAfter: time.Minute * 5}, nil
}
r.Log.Error(err, "unable to get FSM state, trying again in a minute")
return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/blockchain_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func defaultBlockchainDeploymentSpec() *appsv1.DeploymentSpec {
}
// TODO: set a default
return &appsv1.DeploymentSpec{
Replicas: ptr.To(int32(1)),
Replicas: ptr.To(int32(DefaultBlockchainReplicas)),
Selector: metav1.SetAsLabelSelector(podLabels),
Strategy: appsv1.DeploymentStrategy{
Type: appsv1.RecreateDeploymentStrategyType,
Expand Down
15 changes: 15 additions & 0 deletions internal/controller/blockchain_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"errors"
"fmt"
"strings"

"github.com/bsv-blockchain/teranode/services/blockchain"
"github.com/bsv-blockchain/teranode/settings"
Expand Down Expand Up @@ -90,3 +91,17 @@ func (r *BlockchainReconciler) ReconcileState(state blockchain.FSMStateType) err
}
return nil
}

// isConnectionError reports whether err looks like a gRPC
// connection/unavailability failure. The decision is made purely from the
// error's message: it returns true when the text contains any of
// "Unavailable", "no children to pick from", "connection refused" or
// "no such host" (case-sensitive). A nil error is never a connection error.
//
// This typically happens when the operator runs out-of-cluster and cannot
// resolve Kubernetes internal DNS names like *.svc.cluster.local.
func isConnectionError(err error) bool {
	if err == nil {
		return false
	}

	text := err.Error()
	for _, marker := range []string{
		"Unavailable",
		"no children to pick from",
		"connection refused",
		"no such host",
	} {
		if strings.Contains(text, marker) {
			return true
		}
	}
	return false
}
Loading
Loading