Skip to content

Commit 4dc1932

Browse files
david-yuclaude
andcommitted
operator: add PodDisruptionBudget support to Pipeline CRD
Add spec.budget field to Pipeline with maxUnavailable/minAvailable options, following the convention used by Strimzi and Prometheus Operator. The PDB is rendered by the Syncer alongside the Deployment and ConfigMap, so it is automatically garbage-collected on CR deletion. CRD validation enforces exactly one of maxUnavailable or minAvailable via CEL rule. RBAC updated for policy/poddisruptionbudgets. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent d46f37b commit 4dc1932

6 files changed

Lines changed: 198 additions & 0 deletions

File tree

operator/api/redpanda/v1alpha2/pipeline_types.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ package v1alpha2
1212
import (
1313
corev1 "k8s.io/api/core/v1"
1414
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/apimachinery/pkg/util/intstr"
1516
"k8s.io/utils/ptr"
1617

1718
"github.com/redpanda-data/redpanda-operator/operator/pkg/functional"
@@ -178,11 +179,33 @@ type PipelineSpec struct {
178179
// +optional
179180
Zones []string `json:"zones,omitempty"`
180181

182+
// Budget configures a PodDisruptionBudget for the pipeline Deployment,
183+
// protecting pipeline pods from voluntary disruptions such as node drains
184+
// and cluster autoscaler evictions. When not set, no PDB is created.
185+
// +optional
186+
Budget *PipelineBudget `json:"budget,omitempty"`
187+
181188
// ClusterSource is a reference to the Redpanda cluster this pipeline connects to.
182189
// +optional
183190
ClusterSource *ClusterSource `json:"cluster,omitempty"`
184191
}
185192

193+
// PipelineBudget configures a PodDisruptionBudget for the pipeline.
194+
// Exactly one of MaxUnavailable or MinAvailable must be specified.
195+
// +kubebuilder:validation:XValidation:message="exactly one of maxUnavailable or minAvailable must be set",rule="has(self.maxUnavailable) != has(self.minAvailable)"
196+
type PipelineBudget struct {
197+
// MaxUnavailable is the maximum number of pipeline pods that can be
198+
// unavailable during a voluntary disruption. Can be an absolute number
199+
// (e.g. 1) or a percentage (e.g. "25%").
200+
// +optional
201+
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
202+
// MinAvailable is the minimum number of pipeline pods that must remain
203+
// available during a voluntary disruption. Can be an absolute number
204+
// (e.g. 2) or a percentage (e.g. "75%").
205+
// +optional
206+
MinAvailable *intstr.IntOrString `json:"minAvailable,omitempty"`
207+
}
208+
186209
// PipelineStatus defines the observed state of a Connect resource.
187210
type PipelineStatus struct {
188211
// ObservedGeneration is the last observed generation of the Connect resource.

operator/chart/files/rbac/pipeline.ClusterRole.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,18 @@ rules:
7676
- get
7777
- list
7878
- watch
79+
- apiGroups:
80+
- policy
81+
resources:
82+
- poddisruptionbudgets
83+
verbs:
84+
- get
85+
- list
86+
- watch
87+
- create
88+
- update
89+
- patch
90+
- delete
7991
- apiGroups:
8092
- monitoring.coreos.com
8193
resources:

operator/config/crd/bases/cluster.redpanda.com_pipelines.yaml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,34 @@ spec:
6666
per-pipeline annotations taking precedence. Useful for integrations like
6767
Datadog autodiscovery that rely on pod annotations.
6868
type: object
69+
budget:
70+
description: |-
71+
Budget configures a PodDisruptionBudget for the pipeline Deployment,
72+
protecting pipeline pods from voluntary disruptions such as node drains
73+
and cluster autoscaler evictions. When not set, no PDB is created.
74+
properties:
75+
maxUnavailable:
76+
anyOf:
77+
- type: integer
78+
- type: string
79+
description: |-
80+
MaxUnavailable is the maximum number of pipeline pods that can be
81+
unavailable during a voluntary disruption. Can be an absolute number
82+
(e.g. 1) or a percentage (e.g. "25%").
83+
x-kubernetes-int-or-string: true
84+
minAvailable:
85+
anyOf:
86+
- type: integer
87+
- type: string
88+
description: |-
89+
MinAvailable is the minimum number of pipeline pods that must remain
90+
available during a voluntary disruption. Can be an absolute number
91+
(e.g. 2) or a percentage (e.g. "75%").
92+
x-kubernetes-int-or-string: true
93+
type: object
94+
x-kubernetes-validations:
95+
- message: exactly one of maxUnavailable or minAvailable must be set
96+
rule: has(self.maxUnavailable) != has(self.minAvailable)
6997
cluster:
7098
description: ClusterSource is a reference to the Redpanda cluster
7199
this pipeline connects to.

operator/config/rbac/itemized/pipeline.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,18 @@ rules:
7676
- get
7777
- list
7878
- watch
79+
- apiGroups:
80+
- policy
81+
resources:
82+
- poddisruptionbudgets
83+
verbs:
84+
- get
85+
- list
86+
- watch
87+
- create
88+
- update
89+
- patch
90+
- delete
7991
- apiGroups:
8092
- monitoring.coreos.com
8193
resources:

operator/internal/controller/pipeline/controller_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ import (
2727
"github.com/stretchr/testify/require"
2828
appsv1 "k8s.io/api/apps/v1"
2929
corev1 "k8s.io/api/core/v1"
30+
policyv1 "k8s.io/api/policy/v1"
3031
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
3132
apierrors "k8s.io/apimachinery/pkg/api/errors"
3233
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3334
"k8s.io/apimachinery/pkg/runtime"
35+
"k8s.io/apimachinery/pkg/util/intstr"
3436
ctrl "sigs.k8s.io/controller-runtime"
3537
"sigs.k8s.io/controller-runtime/pkg/client"
3638
"sigs.k8s.io/yaml"
@@ -667,6 +669,87 @@ func TestRender_Deployment_Zones(t *testing.T) {
667669
assert.Equal(t, zoneTopologyKey, dp.Spec.Template.Spec.TopologySpreadConstraints[0].TopologyKey)
668670
}
669671

672+
// PodDisruptionBudget tests.
673+
674+
func TestRender_PDB_NotConfigured(t *testing.T) {
675+
pipeline := &redpandav1alpha2.Pipeline{
676+
ObjectMeta: metav1.ObjectMeta{Name: "no-pdb", Namespace: "default"},
677+
Spec: redpandav1alpha2.PipelineSpec{ConfigYAML: "input:\n stdin: {}\noutput:\n stdout: {}\n"},
678+
}
679+
680+
r := &render{pipeline: pipeline, labels: Labels(pipeline)}
681+
objs, err := r.Render(t.Context())
682+
require.NoError(t, err)
683+
684+
// Should only have ConfigMap + Deployment, no PDB.
685+
for _, obj := range objs {
686+
assert.NotEqual(t, "PodDisruptionBudget", obj.GetObjectKind().GroupVersionKind().Kind)
687+
}
688+
}
689+
690+
func TestRender_PDB_MaxUnavailable(t *testing.T) {
691+
maxUnavail := intstr.FromInt32(1)
692+
pipeline := &redpandav1alpha2.Pipeline{
693+
ObjectMeta: metav1.ObjectMeta{Name: "pdb-max", Namespace: "default"},
694+
Spec: redpandav1alpha2.PipelineSpec{
695+
ConfigYAML: "input:\n stdin: {}\noutput:\n stdout: {}\n",
696+
Budget: &redpandav1alpha2.PipelineBudget{
697+
MaxUnavailable: &maxUnavail,
698+
},
699+
},
700+
}
701+
702+
labels := Labels(pipeline)
703+
r := &render{pipeline: pipeline, labels: labels}
704+
objs, err := r.Render(t.Context())
705+
require.NoError(t, err)
706+
707+
// Find the PDB.
708+
var pdb *policyv1.PodDisruptionBudget
709+
for _, obj := range objs {
710+
if p, ok := obj.(*policyv1.PodDisruptionBudget); ok {
711+
pdb = p
712+
}
713+
}
714+
require.NotNil(t, pdb, "expected a PodDisruptionBudget in rendered objects")
715+
assert.Equal(t, "pdb-max", pdb.Name)
716+
assert.Equal(t, "default", pdb.Namespace)
717+
assert.Equal(t, labels, pdb.Labels)
718+
assert.Equal(t, labels, pdb.Spec.Selector.MatchLabels)
719+
require.NotNil(t, pdb.Spec.MaxUnavailable)
720+
assert.Equal(t, int32(1), pdb.Spec.MaxUnavailable.IntVal)
721+
assert.Nil(t, pdb.Spec.MinAvailable)
722+
}
723+
724+
func TestRender_PDB_MinAvailable(t *testing.T) {
725+
minAvail := intstr.FromString("75%")
726+
pipeline := &redpandav1alpha2.Pipeline{
727+
ObjectMeta: metav1.ObjectMeta{Name: "pdb-min", Namespace: "default"},
728+
Spec: redpandav1alpha2.PipelineSpec{
729+
ConfigYAML: "input:\n stdin: {}\noutput:\n stdout: {}\n",
730+
Budget: &redpandav1alpha2.PipelineBudget{
731+
MinAvailable: &minAvail,
732+
},
733+
},
734+
}
735+
736+
labels := Labels(pipeline)
737+
r := &render{pipeline: pipeline, labels: labels}
738+
objs, err := r.Render(t.Context())
739+
require.NoError(t, err)
740+
741+
var pdb *policyv1.PodDisruptionBudget
742+
for _, obj := range objs {
743+
if p, ok := obj.(*policyv1.PodDisruptionBudget); ok {
744+
pdb = p
745+
}
746+
}
747+
require.NotNil(t, pdb, "expected a PodDisruptionBudget in rendered objects")
748+
require.NotNil(t, pdb.Spec.MinAvailable)
749+
assert.Equal(t, "75%", pdb.Spec.MinAvailable.StrVal)
750+
assert.Nil(t, pdb.Spec.MaxUnavailable)
751+
}
752+
670753
// License validation unit tests.
671754

672755
func TestValidateLicenseNoPath(t *testing.T) {

operator/internal/controller/pipeline/render.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/redpanda-data/common-go/kube"
1919
appsv1 "k8s.io/api/apps/v1"
2020
corev1 "k8s.io/api/core/v1"
21+
policyv1 "k8s.io/api/policy/v1"
2122
"k8s.io/apimachinery/pkg/api/resource"
2223
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2324
"k8s.io/apimachinery/pkg/util/intstr"
@@ -52,6 +53,7 @@ func Types() []kube.Object {
5253
return []kube.Object{
5354
&appsv1.Deployment{},
5455
&corev1.ConfigMap{},
56+
&policyv1.PodDisruptionBudget{},
5557
&monitoringv1.PodMonitor{},
5658
}
5759
}
@@ -71,6 +73,10 @@ func (r *render) Render(_ context.Context) ([]kube.Object, error) {
7173

7274
objs := []kube.Object{cm, dp}
7375

76+
if pdb := r.podDisruptionBudget(); pdb != nil {
77+
objs = append(objs, pdb)
78+
}
79+
7480
if pm := r.podMonitor(); pm != nil {
7581
objs = append(objs, pm)
7682
}
@@ -264,6 +270,40 @@ func (r *render) deployment() *appsv1.Deployment {
264270
}
265271
}
266272

273+
func (r *render) podDisruptionBudget() *policyv1.PodDisruptionBudget {
274+
budget := r.pipeline.Spec.Budget
275+
if budget == nil {
276+
return nil
277+
}
278+
279+
pdb := &policyv1.PodDisruptionBudget{
280+
TypeMeta: metav1.TypeMeta{
281+
APIVersion: "policy/v1",
282+
Kind: "PodDisruptionBudget",
283+
},
284+
ObjectMeta: metav1.ObjectMeta{
285+
Name: r.pipeline.Name,
286+
Namespace: r.pipeline.Namespace,
287+
Labels: r.labels,
288+
Annotations: r.annotations(),
289+
},
290+
Spec: policyv1.PodDisruptionBudgetSpec{
291+
Selector: &metav1.LabelSelector{
292+
MatchLabels: r.labels,
293+
},
294+
},
295+
}
296+
297+
if budget.MaxUnavailable != nil {
298+
pdb.Spec.MaxUnavailable = budget.MaxUnavailable
299+
}
300+
if budget.MinAvailable != nil {
301+
pdb.Spec.MinAvailable = budget.MinAvailable
302+
}
303+
304+
return pdb
305+
}
306+
267307
// buildClusterConnectionResources returns the env vars, volumes, and volume
268308
// mounts needed to connect to a Redpanda cluster via clusterRef.
269309
func buildClusterConnectionResources(cc *clusterConnection) ([]corev1.EnvVar, []corev1.Volume, []corev1.VolumeMount) {

0 commit comments

Comments
 (0)