Skip to content

Commit c0c8731

Browse files
committed
Control pods directly instead of StatefulSet
1 parent 3dc30b5 commit c0c8731

1 file changed

Lines changed: 107 additions & 106 deletions

File tree

internal/controller/appruntime_controller.go

Lines changed: 107 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ import (
1212
enterpriseApi "github.com/splunk/splunk-operator/api/v4"
1313
"github.com/splunk/splunk-operator/pkg/splunk/client/metrics"
1414
"github.com/splunk/splunk-operator/pkg/splunk/enterprise"
15-
appsv1 "k8s.io/api/apps/v1"
1615
corev1 "k8s.io/api/core/v1"
1716
k8serrors "k8s.io/apimachinery/pkg/api/errors"
1817
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18+
"k8s.io/apimachinery/pkg/labels"
1919
"k8s.io/apimachinery/pkg/runtime"
2020
"k8s.io/apimachinery/pkg/types"
2121
"k8s.io/client-go/tools/record"
@@ -37,6 +37,7 @@ type AppRuntimeReconciler struct {
3737
// +kubebuilder:rbac:groups=enterprise.splunk.com,resources=appruntimes,verbs=get;list;watch;create;update;patch;delete
3838
// +kubebuilder:rbac:groups=enterprise.splunk.com,resources=appruntimes/status,verbs=get;update;patch
3939
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
40+
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete
4041

4142
// Reconcile reconciles the AppRuntime
4243
func (r *AppRuntimeReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
@@ -94,48 +95,59 @@ func (r *AppRuntimeReconciler) Reconcile(ctx context.Context, req reconcile.Requ
9495
}
9596
}
9697

97-
// Fetch or create StatefulSet
98-
statefulSetNN := types.NamespacedName{
99-
Name: getCommonName(req.Name),
100-
Namespace: req.Namespace,
101-
}
102-
statefulSet := &appsv1.StatefulSet{}
103-
err = r.Get(ctx, statefulSetNN, statefulSet)
104-
if err != nil {
105-
if k8serrors.IsNotFound(err) {
106-
reqLogger.Info(statefulSetNN.Name + " statefulSet not found; creating new one")
107-
statefulSet, err = r.createStatefulSet(ctx, appRuntime, statefulSetNN)
108-
if err != nil {
109-
reqLogger.Error(err, "failed to create statefulset; returning reconcilation")
110-
return reconcile.Result{}, nil
98+
// Reconcile individual Pods (one per replica, each with its own Splunk PVCs)
99+
parentName := getParentName(appRuntime.Name)
100+
parentKind := getParentKind(appRuntime.Name)
101+
splunkStsName := getSplunkStatefulSetName(parentName, parentKind)
102+
103+
// Create missing pods
104+
for i := int32(0); i < appRuntime.Spec.Replicas; i++ {
105+
podName := getPodName(appRuntime.Name, i)
106+
podNN := types.NamespacedName{Name: podName, Namespace: req.Namespace}
107+
pod := &corev1.Pod{}
108+
err = r.Get(ctx, podNN, pod)
109+
if err != nil {
110+
if k8serrors.IsNotFound(err) {
111+
reqLogger.Info(fmt.Sprintf("pod %s not found; creating", podName))
112+
err = r.createPod(ctx, appRuntime, podNN, splunkStsName, i)
113+
if err != nil {
114+
reqLogger.Error(err, fmt.Sprintf("failed to create pod %s", podName))
115+
return reconcile.Result{}, err
116+
}
117+
reqLogger.Info(fmt.Sprintf("created pod %s", podName))
118+
} else {
119+
reqLogger.Error(err, fmt.Sprintf("failed to get pod %s", podName))
120+
return reconcile.Result{}, err
111121
}
112-
reqLogger.Info("successfully created statefulset")
113-
} else {
114-
reqLogger.Error(err, "failed to get statefulset; returning reconciliation with error")
115-
return reconcile.Result{}, err
116122
}
117123
}
118-
// Check statefulSet Replicas
119-
err = r.checkStatefulSetReplicas(ctx, statefulSet, appRuntime, err, reqLogger)
124+
125+
// Delete excess pods (scale down)
126+
existingPods := &corev1.PodList{}
127+
err = r.List(ctx, existingPods, &client.ListOptions{
128+
Namespace: req.Namespace,
129+
LabelSelector: labels.SelectorFromSet(getCommonLabels(appRuntime.Name)),
130+
})
120131
if err != nil {
121-
reqLogger.Error(err, "failed to check statefulset replicas")
132+
reqLogger.Error(err, "failed to list pods")
122133
return reconcile.Result{}, err
123134
}
124-
125-
return reconcile.Result{}, nil
126-
}
127-
128-
func (r *AppRuntimeReconciler) checkStatefulSetReplicas(ctx context.Context, statefulSet *appsv1.StatefulSet, appRuntime *enterpriseApi.AppRuntime, err error, reqLogger logr.Logger) error {
129-
if *statefulSet.Spec.Replicas != appRuntime.Spec.Replicas {
130-
statefulSet.Spec.Replicas = &appRuntime.Spec.Replicas
131-
err = r.Update(ctx, statefulSet)
135+
for idx := range existingPods.Items {
136+
pod := &existingPods.Items[idx]
137+
ordinal, err := getPodOrdinal(pod.Name)
132138
if err != nil {
133-
reqLogger.Error(err, "cannot update statefulset")
134-
return err
139+
continue
140+
}
141+
if ordinal >= appRuntime.Spec.Replicas {
142+
reqLogger.Info(fmt.Sprintf("deleting excess pod %s", pod.Name))
143+
if err := r.Delete(ctx, pod); err != nil {
144+
reqLogger.Error(err, fmt.Sprintf("failed to delete pod %s", pod.Name))
145+
return reconcile.Result{}, err
146+
}
135147
}
136-
reqLogger.Info("updated statefulset replicas")
137148
}
138-
return nil
149+
150+
return reconcile.Result{}, nil
139151
}
140152

141153
// checkReplicas check if replicas number is correct
@@ -230,7 +242,7 @@ func (r *AppRuntimeReconciler) createCR(ctx context.Context, crNN types.Namespac
230242
return cr, r.Create(ctx, cr)
231243
}
232244
case enterprise.SplunkSearchHead.ToString():
233-
searchHead := &enterpriseApi.SearchHeadCluster{} // can it be client.Object and then reuse the following code?
245+
searchHead := &enterpriseApi.SearchHeadCluster{}
234246
if err := r.Get(ctx, parentName, searchHead); err == nil {
235247
cr.Spec.Replicas = searchHead.Spec.Replicas
236248
err = ctrl.SetControllerReference(searchHead, cr, r.Scheme)
@@ -266,50 +278,19 @@ func (r *AppRuntimeReconciler) createHeadlessService(ctx context.Context, ar *en
266278
return svc, nil
267279
}
268280

269-
func (r *AppRuntimeReconciler) createStatefulSet(ctx context.Context, appRuntime *enterpriseApi.AppRuntime, nn types.NamespacedName) (*appsv1.StatefulSet, error) {
270-
ss := &appsv1.StatefulSet{
281+
func (r *AppRuntimeReconciler) createPod(ctx context.Context, appRuntime *enterpriseApi.AppRuntime, nn types.NamespacedName, splunkStsName string, ordinal int32) error {
282+
etcPvcName := fmt.Sprintf("pvc-etc-%s-%d", splunkStsName, ordinal)
283+
varPvcName := fmt.Sprintf("pvc-var-%s-%d", splunkStsName, ordinal)
284+
285+
pod := &corev1.Pod{
271286
ObjectMeta: v1.ObjectMeta{
272287
Name: nn.Name,
273288
Namespace: nn.Namespace,
274-
},
275-
}
276-
err := ctrl.SetControllerReference(appRuntime, ss, r.Scheme)
277-
if err != nil {
278-
return nil, err
279-
}
280-
ss.Spec.Replicas = &appRuntime.Spec.Replicas
281-
ss.Labels = getCommonLabels(appRuntime.Name)
282-
ss.Spec.Selector = &v1.LabelSelector{MatchLabels: ss.Labels}
283-
ss.Spec.ServiceName = getHeadlessName(appRuntime.Name)
284-
ss.Spec.PodManagementPolicy = appsv1.ParallelPodManagement
285-
286-
//quantity, err := resource2.ParseQuantity(splcommon.DefaultEtcVolumeStorageCapacity)
287-
//if err != nil {
288-
// return nil, err
289-
//}
290-
//
291-
//pvc := corev1.PersistentVolumeClaim{
292-
// ObjectMeta: v1.ObjectMeta{
293-
// Name: "pvc-etc",
294-
// Namespace: nn.Namespace,
295-
// Labels: getCommonLabels(appRuntime.Name),
296-
// },
297-
// Spec: corev1.PersistentVolumeClaimSpec{
298-
// AccessModes: []corev1.PersistentVolumeAccessMode{"ReadWriteOnce"},
299-
// Resources: corev1.VolumeResourceRequirements{
300-
// Requests: corev1.ResourceList{
301-
// corev1.ResourceStorage: quantity,
302-
// },
303-
// },
304-
// },
305-
//}
306-
//ss.Spec.VolumeClaimTemplates = append(ss.Spec.VolumeClaimTemplates, pvc)
307-
308-
ss.Spec.Template = corev1.PodTemplateSpec{
309-
ObjectMeta: v1.ObjectMeta{
310-
Labels: ss.Labels,
289+
Labels: getCommonLabels(appRuntime.Name),
311290
},
312291
Spec: corev1.PodSpec{
292+
Hostname: nn.Name,
293+
Subdomain: getHeadlessName(appRuntime.Name),
313294
Containers: []corev1.Container{
314295
{
315296
Image: appRuntime.Spec.Image,
@@ -318,44 +299,45 @@ func (r *AppRuntimeReconciler) createStatefulSet(ctx context.Context, appRuntime
318299
"sleep",
319300
"infinity",
320301
},
321-
//VolumeMounts: []corev1.VolumeMount{
322-
// {
323-
// Name: "pvc-etc",
324-
// MountPath: "/opt/splunk/etc",
325-
// },
326-
// {
327-
// Name: "pvc-var",
328-
// MountPath: "/opt/splunk/var",
329-
// },
330-
//},
302+
VolumeMounts: []corev1.VolumeMount{
303+
{
304+
Name: "pvc-etc",
305+
MountPath: "/opt/splunk/etc",
306+
},
307+
{
308+
Name: "pvc-var",
309+
MountPath: "/opt/splunk/var",
310+
},
311+
},
312+
},
313+
},
314+
Volumes: []corev1.Volume{
315+
{
316+
Name: "pvc-etc",
317+
VolumeSource: corev1.VolumeSource{
318+
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
319+
ClaimName: etcPvcName,
320+
},
321+
},
322+
},
323+
{
324+
Name: "pvc-var",
325+
VolumeSource: corev1.VolumeSource{
326+
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
327+
ClaimName: varPvcName,
328+
},
329+
},
331330
},
332331
},
333-
//Volumes: []corev1.Volume{
334-
// {
335-
// Name: "pvc-etc",
336-
// VolumeSource: corev1.VolumeSource{
337-
// PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
338-
// ClaimName: "pvc-etc-splunk",
339-
// },
340-
// },
341-
// },
342-
// {
343-
// Name: "pvc-var",
344-
// VolumeSource: corev1.VolumeSource{
345-
// PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
346-
// ClaimName: "pvc-var-splunk",
347-
// },
348-
// },
349-
// },
350-
//},
351332
},
352333
}
353-
err = r.Create(ctx, ss)
334+
335+
err := ctrl.SetControllerReference(appRuntime, pod, r.Scheme)
354336
if err != nil {
355-
return nil, err
337+
return err
356338
}
357339

358-
return &appsv1.StatefulSet{}, nil
340+
return r.Create(ctx, pod)
359341
}
360342

361343
func (r *AppRuntimeReconciler) updateStatus(ctx context.Context, appRuntime *enterpriseApi.AppRuntime, phase enterpriseApi.Phase, message string) error {
@@ -381,7 +363,7 @@ func (r *AppRuntimeReconciler) SetupWithManager(mgr ctrl.Manager) error {
381363
Watches(&enterpriseApi.Standalone{}, getEventHandlerForAppRuntime(enterprise.SplunkStandalone)).
382364
Watches(&enterpriseApi.IndexerCluster{}, getEventHandlerForAppRuntime(enterprise.SplunkIndexer)).
383365
Watches(&enterpriseApi.SearchHeadCluster{}, getEventHandlerForAppRuntime(enterprise.SplunkSearchHead)).
384-
Owns(&appsv1.StatefulSet{}).
366+
Owns(&corev1.Pod{}).
385367
Owns(&corev1.Service{}).
386368
WithOptions(controller.Options{MaxConcurrentReconciles: enterpriseApi.TotalWorker}).
387369
Named("appruntime-controller").
@@ -423,6 +405,25 @@ func getHeadlessName(appRuntimeName string) string {
423405
return fmt.Sprintf("%s-%s-%s", "splunk", appRuntimeName, "headless")
424406
}
425407

408+
// getSplunkStatefulSetName builds the name of the Splunk StatefulSet that
// belongs to the given parent, in the form "splunk-{parentName}-{parentKind}".
func getSplunkStatefulSetName(parentName string, parentKind string) string {
	// All operands are strings, so fmt.Sprint concatenates them without
	// inserting any separating spaces.
	return fmt.Sprint("splunk-", parentName, "-", parentKind)
}
412+
413+
// getPodName builds the name of the AppRuntime pod for one replica, in the
// form "splunk-{appRuntimeName}-{ordinal}".
func getPodName(appRuntimeName string, ordinal int32) string {
	base := "splunk-" + appRuntimeName
	return fmt.Sprintf("%s-%d", base, ordinal)
}
417+
418+
// getPodOrdinal extracts the ordinal index from a pod name, i.e. the text
// after the last "-" (the whole name when it contains no "-").
//
// The suffix must be the canonical decimal form of a non-negative int32
// ("0", "7", "12"). Anything else — an empty suffix, trailing characters
// such as "5x", a sign, leading zeros, or a value that overflows int32 —
// yields an error and a zero ordinal. Being strict matters because the
// scale-down path deletes pods based on the returned ordinal, so a pod whose
// name merely starts with digits must not be mistaken for a replica.
func getPodOrdinal(podName string) (int32, error) {
	// LastIndex returns -1 when there is no "-", which makes the slice start
	// at 0 and cover the whole name.
	suffix := podName[strings.LastIndex(podName, "-")+1:]

	var ordinal int32
	if _, err := fmt.Sscanf(suffix, "%d", &ordinal); err != nil {
		return 0, fmt.Errorf("pod name %q has no numeric ordinal suffix: %w", podName, err)
	}
	// Sscanf stops at the first non-digit and accepts a leading sign or
	// zeros; require that the parsed value reproduces the suffix exactly.
	if fmt.Sprintf("%d", ordinal) != suffix {
		return 0, fmt.Errorf("pod name %q has a malformed ordinal suffix %q", podName, suffix)
	}
	return ordinal, nil
}
426+
426427
func getCommonLabels(appRuntimeName string) map[string]string {
427428
labels := make(map[string]string)
428429
labels["app.kubernetes.io/managed-by"] = "splunk-operator"

0 commit comments

Comments
 (0)