From a596e6f9be33f1366b4185a87dc148a441b6891e Mon Sep 17 00:00:00 2001 From: Austin Beattie Date: Wed, 25 Mar 2026 12:08:21 -0700 Subject: [PATCH] Add support for DaemonSet, StatefulSet, Job, and CronJob workloads Extend the controller to track pods owned by all standard Kubernetes workload types, not just Deployments. Includes short-lived Job detection via terminal phase handling, CronJob name resolution through Job ownership chain, and RBAC/docs updates. --- README.md | 10 +- .../templates/clusterrole.yaml | 25 + deploy/manifest.yaml | 9 + internal/controller/controller.go | 379 +++++++++++- .../controller/controller_integration_test.go | 570 +++++++++++++++++- internal/controller/controller_test.go | 372 +++++++++++- internal/metadata/metadata.go | 15 +- internal/metadata/metadata_test.go | 66 +- 8 files changed, 1398 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index 986dd6c..9f134ba 100644 --- a/README.md +++ b/README.md @@ -80,11 +80,11 @@ Two modes of authentication are supported: The `DN_TEMPLATE` supports the following placeholders: - `{{namespace}}` - Pod namespace -- `{{deploymentName}}` - Name of the owning Deployment +- `{{deploymentName}}` - Name of the owning workload (Deployment, DaemonSet, StatefulSet, CronJob, or Job) - `{{containerName}}` - Container name ## Annotations -Runtime risks and custom tags can be added to deployment records using annotations. Annotations will be aggregated from the pod and its owner reference objects (e.g. Deployment, ReplicaSet) so they can be added at any level of the ownership hierarchy. +Runtime risks and custom tags can be added to deployment records using annotations. Annotations will be aggregated from the pod and its owner reference objects (e.g. Deployment, ReplicaSet, DaemonSet, StatefulSet, CronJob, Job) so they can be added at any level of the ownership hierarchy. 
### Runtime Risks @@ -110,7 +110,7 @@ which includes: - **Namespace**: `deployment-tracker` - **ServiceAccount**: Identity for the controller pod -- **ClusterRole**: Minimal permissions (`get`, `list`, `watch` on pods and deployments; `get` on other supported objects) +- **ClusterRole**: Minimal permissions (`get`, `list`, `watch` on pods, deployments, daemonsets, statefulsets, jobs, and cronjobs; `get` on replicasets) - **ClusterRoleBinding**: Binds the ServiceAccount to the ClusterRole - **Deployment**: Runs the controller with security hardening @@ -142,6 +142,10 @@ The controller requires the following minimum permissions: | `""` (core) | `pods` | `get`, `list`, `watch` | | `apps` | `deployments` | `get`, `list`, `watch` | | `apps` | `replicasets` | `get` | +| `apps` | `daemonsets` | `get`, `list`, `watch` | +| `apps` | `statefulsets` | `get`, `list`, `watch` | +| `batch` | `jobs` | `get`, `list`, `watch` | +| `batch` | `cronjobs` | `get`, `list`, `watch` | If you only need to monitor a single namespace, you can modify the manifest to use a `Role` and `RoleBinding` instead of `ClusterRole` and `ClusterRoleBinding` for more restricted permissions. 
diff --git a/deploy/charts/deployment-tracker/templates/clusterrole.yaml b/deploy/charts/deployment-tracker/templates/clusterrole.yaml index fa69c59..8112882 100644 --- a/deploy/charts/deployment-tracker/templates/clusterrole.yaml +++ b/deploy/charts/deployment-tracker/templates/clusterrole.yaml @@ -27,3 +27,28 @@ rules: - replicasets verbs: - get + - apiGroups: + - apps + resources: + - daemonsets + verbs: + - get + - list + - watch + - apiGroups: + - apps + resources: + - statefulsets + verbs: + - get + - list + - watch + - apiGroups: + - batch + resources: + - jobs + - cronjobs + verbs: + - get + - list + - watch diff --git a/deploy/manifest.yaml b/deploy/manifest.yaml index 8df9ceb..4f2a5a8 100644 --- a/deploy/manifest.yaml +++ b/deploy/manifest.yaml @@ -23,6 +23,15 @@ rules: - apiGroups: ["apps"] resources: ["replicasets"] verbs: ["get"] + - apiGroups: ["apps"] + resources: ["daemonsets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["apps"] + resources: ["statefulsets"] + verbs: ["get", "list", "watch"] + - apiGroups: ["batch"] + resources: ["jobs", "cronjobs"] + verbs: ["get", "list", "watch"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding diff --git a/internal/controller/controller.go b/internal/controller/controller.go index abc0367..52febfb 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -23,6 +23,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" appslisters "k8s.io/client-go/listers/apps/v1" + batchlisters "k8s.io/client-go/listers/batch/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) @@ -60,16 +61,30 @@ type PodEvent struct { DeletedPod *corev1.Pod // Only populated for delete events } +// workloadRef describes the top-level workload that owns a pod. +type workloadRef struct { + Name string + Kind string // "Deployment", "DaemonSet", "StatefulSet", "CronJob", or "Job" +} + // Controller is the Kubernetes controller for tracking deployments. 
type Controller struct { - clientset kubernetes.Interface - metadataAggregator podMetadataAggregator - podInformer cache.SharedIndexInformer - deploymentInformer cache.SharedIndexInformer - deploymentLister appslisters.DeploymentLister - workqueue workqueue.TypedRateLimitingInterface[PodEvent] - apiClient deploymentRecordPoster - cfg *Config + clientset kubernetes.Interface + metadataAggregator podMetadataAggregator + podInformer cache.SharedIndexInformer + deploymentInformer cache.SharedIndexInformer + deploymentLister appslisters.DeploymentLister + daemonSetInformer cache.SharedIndexInformer + daemonSetLister appslisters.DaemonSetLister + statefulSetInformer cache.SharedIndexInformer + statefulSetLister appslisters.StatefulSetLister + jobInformer cache.SharedIndexInformer + jobLister batchlisters.JobLister + cronJobInformer cache.SharedIndexInformer + cronJobLister batchlisters.CronJobLister + workqueue workqueue.TypedRateLimitingInterface[PodEvent] + apiClient deploymentRecordPoster + cfg *Config // best effort cache to avoid redundant posts // post requests are idempotent, so if this cache fails due to // restarts or other events, nothing will break. 
@@ -87,6 +102,14 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato podInformer := factory.Core().V1().Pods().Informer() deploymentInformer := factory.Apps().V1().Deployments().Informer() deploymentLister := factory.Apps().V1().Deployments().Lister() + daemonSetInformer := factory.Apps().V1().DaemonSets().Informer() + daemonSetLister := factory.Apps().V1().DaemonSets().Lister() + statefulSetInformer := factory.Apps().V1().StatefulSets().Informer() + statefulSetLister := factory.Apps().V1().StatefulSets().Lister() + jobInformer := factory.Batch().V1().Jobs().Informer() + jobLister := factory.Batch().V1().Jobs().Lister() + cronJobInformer := factory.Batch().V1().CronJobs().Informer() + cronJobLister := factory.Batch().V1().CronJobs().Lister() // Create work queue with rate limiting queue := workqueue.NewTypedRateLimitingQueue( @@ -124,6 +147,14 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato podInformer: podInformer, deploymentInformer: deploymentInformer, deploymentLister: deploymentLister, + daemonSetInformer: daemonSetInformer, + daemonSetLister: daemonSetLister, + statefulSetInformer: statefulSetInformer, + statefulSetLister: statefulSetLister, + jobInformer: jobInformer, + jobLister: jobLister, + cronJobInformer: cronJobInformer, + cronJobLister: cronJobLister, workqueue: queue, apiClient: apiClient, cfg: cfg, @@ -143,8 +174,8 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato } // Only process pods that are running and belong - // to a deployment - if pod.Status.Phase == corev1.PodRunning && getDeploymentName(pod) != "" { + // to a supported workload (Deployment, DaemonSet, StatefulSet, Job, or CronJob) + if pod.Status.Phase == corev1.PodRunning && hasSupportedOwner(pod) { key, err := cache.MetaNamespaceKeyFunc(obj) // For our purposes, there are in practice @@ -157,6 +188,18 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato }) 
} } + + // Also process Job-owned pods that completed before + // we observed them in Running phase (e.g. sub-second Jobs). + if isTerminalPhase(pod) && getJobOwnerName(pod) != "" { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + queue.Add(PodEvent{ + Key: key, + EventType: EventCreated, + }) + } + } }, UpdateFunc: func(oldObj, newObj any) { oldPod, ok := oldObj.(*corev1.Pod) @@ -175,9 +218,18 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato } // Skip if pod is being deleted or doesn't belong - // to a deployment - if newPod.DeletionTimestamp != nil || getDeploymentName(newPod) == "" { - return + // to a supported workload. + // Exception: Job-owned pods transitioning to a terminal phase + // (Succeeded/Failed) from a non-Running state should still be + // processed — this catches short-lived Jobs that skip Running. + // We exclude Running→terminal transitions since those pods + // were already enqueued when they entered Running. + isJobTerminal := !isTerminalPhase(oldPod) && isTerminalPhase(newPod) && + oldPod.Status.Phase != corev1.PodRunning && getJobOwnerName(newPod) != "" + if !isJobTerminal { + if newPod.DeletionTimestamp != nil || !hasSupportedOwner(newPod) { + return + } } // Only process if pod just became running. @@ -199,6 +251,18 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato }) } } + + // Also catch Job-owned pods that transitioned directly + // to a terminal phase without us seeing them as Running. 
+ if isJobTerminal { + key, err := cache.MetaNamespaceKeyFunc(newObj) + if err == nil { + queue.Add(PodEvent{ + Key: key, + EventType: EventCreated, + }) + } + } }, DeleteFunc: func(obj any) { pod, ok := obj.(*corev1.Pod) @@ -214,8 +278,8 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato } } - // Only process pods that belong to a deployment - if getDeploymentName(pod) == "" { + // Only process pods that belong to a supported workload + if !hasSupportedOwner(pod) { return } @@ -249,10 +313,21 @@ func (c *Controller) Run(ctx context.Context, workers int) error { // Start the informers go c.podInformer.Run(ctx.Done()) go c.deploymentInformer.Run(ctx.Done()) + go c.daemonSetInformer.Run(ctx.Done()) + go c.statefulSetInformer.Run(ctx.Done()) + go c.jobInformer.Run(ctx.Done()) + go c.cronJobInformer.Run(ctx.Done()) // Wait for the caches to be synced slog.Info("Waiting for informer caches to sync") - if !cache.WaitForCacheSync(ctx.Done(), c.podInformer.HasSynced, c.deploymentInformer.HasSynced) { + if !cache.WaitForCacheSync(ctx.Done(), + c.podInformer.HasSynced, + c.deploymentInformer.HasSynced, + c.daemonSetInformer.HasSynced, + c.statefulSetInformer.HasSynced, + c.jobInformer.HasSynced, + c.cronJobInformer.HasSynced, + ) { return errors.New("timed out waiting for caches to sync") } @@ -314,6 +389,7 @@ func (c *Controller) processNextItem(ctx context.Context) bool { // processEvent processes a single pod event. func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { var pod *corev1.Pod + var wl workloadRef if event.EventType == EventDeleted { // For delete events, use the pod captured at deletion time @@ -325,20 +401,22 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { return nil } - // Check if the parent deployment still exists - // If it does, this is just a scale-down event, skip it. + // Check if the parent workload still exists. 
+ // If it does, this is just a scale-down event (or a completed + // Job pod while the CronJob is still active), skip it. // - // If a deployment changes image versions, this will not + // If a workload changes image versions, this will not // fire delete/decommissioned events to the remote API. // This is as intended, as the server will keep track of // the (cluster unique) deployment name, and just update // the referenced image digest to the newly observed (via // the create event). - deploymentName := getDeploymentName(pod) - if deploymentName != "" && c.deploymentExists(pod.Namespace, deploymentName) { - slog.Debug("Deployment still exists, skipping pod delete (scale down)", + wl = c.getWorkloadRef(pod) + if wl.Name != "" && c.workloadActive(pod.Namespace, wl) { + slog.Debug("Parent workload still exists, skipping pod delete", "namespace", pod.Namespace, - "deployment", deploymentName, + "workload_kind", wl.Kind, + "workload_name", wl.Name, "pod", pod.Name, ) return nil @@ -368,29 +446,37 @@ func (c *Controller) processEvent(ctx context.Context, event PodEvent) error { } } - status := deploymentrecord.StatusDeployed - if event.EventType == EventDeleted { - status = deploymentrecord.StatusDecommissioned + // Resolve the workload name for the deployment record. + // For delete events, wl was already resolved above. 
+ if wl.Name == "" { + wl = c.getWorkloadRef(pod) + } + if wl.Name == "" { + slog.Debug("Could not resolve workload name for pod, skipping", + "namespace", pod.Namespace, + "pod", pod.Name, + ) + return nil } var lastErr error // Gather aggregate metadata for adds/updates var aggPodMetadata *metadata.AggregatePodMetadata - if status != deploymentrecord.StatusDecommissioned { + if event.EventType != EventDeleted { aggPodMetadata = c.metadataAggregator.BuildAggregatePodMetadata(ctx, podToPartialMetadata(pod)) } // Record info for each container in the pod for _, container := range pod.Spec.Containers { - if err := c.recordContainer(ctx, pod, container, status, event.EventType, aggPodMetadata); err != nil { + if err := c.recordContainer(ctx, pod, container, event.EventType, wl.Name, aggPodMetadata); err != nil { lastErr = err } } // Also record init containers for _, container := range pod.Spec.InitContainers { - if err := c.recordContainer(ctx, pod, container, status, event.EventType, aggPodMetadata); err != nil { + if err := c.recordContainer(ctx, pod, container, event.EventType, wl.Name, aggPodMetadata); err != nil { lastErr = err } } @@ -416,10 +502,15 @@ func (c *Controller) deploymentExists(namespace, name string) bool { } // recordContainer records a single container's deployment info. 
-func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, status, eventType string, aggPodMetadata *metadata.AggregatePodMetadata) error { +func (c *Controller) recordContainer(ctx context.Context, pod *corev1.Pod, container corev1.Container, eventType, workloadName string, aggPodMetadata *metadata.AggregatePodMetadata) error { var cacheKey string - dn := getARDeploymentName(pod, container, c.cfg.Template) + status := deploymentrecord.StatusDeployed + if eventType == EventDeleted { + status = deploymentrecord.StatusDecommissioned + } + + dn := getARDeploymentName(pod, container, c.cfg.Template, workloadName) digest := getContainerDigest(pod, container.Name) if dn == "" || digest == "" { @@ -567,6 +658,19 @@ func getCacheKey(ev, dn, digest string) string { return ev + "||" + dn + "||" + digest } +// isNumeric returns true if s is non-empty and consists entirely of ASCII digits. +func isNumeric(s string) bool { + if s == "" { + return false + } + for _, c := range s { + if c < '0' || c > '9' { + return false + } + } + return true +} + // createInformerFactory creates a shared informer factory with the given resync period. // If excludeNamespaces is non-empty, it will exclude those namespaces from being watched. // If namespace is non-empty, it will only watch that namespace. @@ -622,10 +726,10 @@ func createInformerFactory(clientset kubernetes.Interface, namespace string, exc // as the K8s deployment's name!) // The deployment name must unique within logical, physical environment and // the cluster. 
-func getARDeploymentName(p *corev1.Pod, c corev1.Container, tmpl string) string { +func getARDeploymentName(p *corev1.Pod, c corev1.Container, tmpl, workloadName string) string { res := tmpl res = strings.ReplaceAll(res, TmplNS, p.Namespace) - res = strings.ReplaceAll(res, TmplDN, getDeploymentName(p)) + res = strings.ReplaceAll(res, TmplDN, workloadName) res = strings.ReplaceAll(res, TmplCN, c.Name) return res } @@ -651,6 +755,24 @@ func getContainerDigest(pod *corev1.Pod, containerName string) string { return "" } +// hasSupportedOwner returns true if the pod is owned by a supported +// workload controller (ReplicaSet for Deployments, DaemonSet, StatefulSet, or Job for Jobs/CronJobs). +func hasSupportedOwner(pod *corev1.Pod) bool { + for _, owner := range pod.OwnerReferences { + if owner.Kind == "ReplicaSet" || owner.Kind == "DaemonSet" || owner.Kind == "StatefulSet" || owner.Kind == "Job" { + return true + } + } + return false +} + +// isTerminalPhase returns true if the pod has reached a terminal phase +// (Succeeded or Failed). Used to catch short-lived Job pods that complete +// before the controller observes them in the Running phase. +func isTerminalPhase(pod *corev1.Pod) bool { + return pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed +} + // getDeploymentName returns the deployment name for a pod, if it belongs // to one. func getDeploymentName(pod *corev1.Pod) string { @@ -671,6 +793,197 @@ func getDeploymentName(pod *corev1.Pod) string { return "" } +// getJobOwnerName returns the Job name from the pod's owner references, +// if the pod is owned by a Job. +func getJobOwnerName(pod *corev1.Pod) string { + for _, owner := range pod.OwnerReferences { + if owner.Kind == "Job" { + return owner.Name + } + } + return "" +} + +// getDaemonSetName returns the DaemonSet name for a pod, if it belongs +// to one. DaemonSet pods are owned directly by the DaemonSet. 
+func getDaemonSetName(pod *corev1.Pod) string { + for _, owner := range pod.OwnerReferences { + if owner.Kind == "DaemonSet" { + return owner.Name + } + } + return "" +} + +// getStatefulSetName returns the StatefulSet name for a pod, if it belongs +// to one. StatefulSet pods are owned directly by the StatefulSet. +func getStatefulSetName(pod *corev1.Pod) string { + for _, owner := range pod.OwnerReferences { + if owner.Kind == "StatefulSet" { + return owner.Name + } + } + return "" +} + +// getWorkloadRef resolves the top-level workload that owns a pod. +// For Deployment-owned pods (via ReplicaSets), returns the Deployment name. +// For DaemonSet-owned pods, returns the DaemonSet name. +// For StatefulSet-owned pods, returns the StatefulSet name. +// For CronJob-owned pods (via Jobs), returns the CronJob name. +// For standalone Job-owned pods, returns the Job name. +func (c *Controller) getWorkloadRef(pod *corev1.Pod) workloadRef { + // Check for Deployment (via ReplicaSet) + if dn := getDeploymentName(pod); dn != "" { + return workloadRef{Name: dn, Kind: "Deployment"} + } + + // Check for DaemonSet (direct ownership) + if dsn := getDaemonSetName(pod); dsn != "" { + return workloadRef{Name: dsn, Kind: "DaemonSet"} + } + + // Check for StatefulSet (direct ownership) + if ssn := getStatefulSetName(pod); ssn != "" { + return workloadRef{Name: ssn, Kind: "StatefulSet"} + } + + // Check for Job + jobName := getJobOwnerName(pod) + if jobName == "" { + return workloadRef{} + } + + return c.resolveJobWorkload(pod.Namespace, jobName) +} + +// resolveJobWorkload determines whether a Job is owned by a CronJob or is standalone. 
+func (c *Controller) resolveJobWorkload(namespace, jobName string) workloadRef {
+	// Try to look up the Job to check for CronJob ownership
+	if c.jobLister != nil {
+		job, err := c.jobLister.Jobs(namespace).Get(jobName)
+		if err == nil {
+			for _, owner := range job.OwnerReferences {
+				if owner.Kind == "CronJob" {
+					return workloadRef{Name: owner.Name, Kind: "CronJob"}
+				}
+			}
+			return workloadRef{Name: jobName, Kind: "Job"}
+		}
+	}
+
+	// Job not found in cache - try CronJob name derivation as fallback.
+	// CronJob-created Jobs follow the naming pattern: <cronjob-name>-<numeric-suffix>
+	// where the suffix is always numeric. We validate the suffix is all digits to
+	// reduce false matches from standalone Jobs that coincidentally share a prefix
+	// with an existing CronJob. A residual false positive is still possible if a
+	// standalone Job is named exactly <cronjob-name>-<digits>, but the primary path
+	// (checking Job OwnerReferences) handles the common case; this fallback only
+	// fires when the Job has already been garbage-collected.
+	if c.cronJobLister != nil {
+		lastDash := strings.LastIndex(jobName, "-")
+		if lastDash > 0 {
+			suffix := jobName[lastDash+1:]
+			if isNumeric(suffix) {
+				potentialCronJobName := jobName[:lastDash]
+				if c.cronJobExists(namespace, potentialCronJobName) {
+					return workloadRef{Name: potentialCronJobName, Kind: "CronJob"}
+				}
+			}
+		}
+	}
+
+	// Standalone Job (possibly already deleted)
+	return workloadRef{Name: jobName, Kind: "Job"}
+}
+
+// workloadActive checks if the parent workload for a pod still exists
+// in the local informer cache.
+func (c *Controller) workloadActive(namespace string, ref workloadRef) bool { + switch ref.Kind { + case "Deployment": + return c.deploymentExists(namespace, ref.Name) + case "DaemonSet": + return c.daemonSetExists(namespace, ref.Name) + case "StatefulSet": + return c.statefulSetExists(namespace, ref.Name) + case "CronJob": + return c.cronJobExists(namespace, ref.Name) + case "Job": + return c.jobExists(namespace, ref.Name) + default: + return false + } +} + +// jobExists checks if a job exists in the local informer cache. +func (c *Controller) jobExists(namespace, name string) bool { + _, err := c.jobLister.Jobs(namespace).Get(name) + if err != nil { + if k8serrors.IsNotFound(err) { + return false + } + slog.Warn("Failed to check if job exists in cache, assuming it does", + "namespace", namespace, + "job", name, + "error", err, + ) + return true + } + return true +} + +// daemonSetExists checks if a daemonset exists in the local informer cache. +func (c *Controller) daemonSetExists(namespace, name string) bool { + _, err := c.daemonSetLister.DaemonSets(namespace).Get(name) + if err != nil { + if k8serrors.IsNotFound(err) { + return false + } + slog.Warn("Failed to check if daemonset exists in cache, assuming it does", + "namespace", namespace, + "daemonset", name, + "error", err, + ) + return true + } + return true +} + +// statefulSetExists checks if a statefulset exists in the local informer cache. +func (c *Controller) statefulSetExists(namespace, name string) bool { + _, err := c.statefulSetLister.StatefulSets(namespace).Get(name) + if err != nil { + if k8serrors.IsNotFound(err) { + return false + } + slog.Warn("Failed to check if statefulset exists in cache, assuming it does", + "namespace", namespace, + "statefulset", name, + "error", err, + ) + return true + } + return true +} + +// cronJobExists checks if a cronjob exists in the local informer cache. 
+func (c *Controller) cronJobExists(namespace, name string) bool { + _, err := c.cronJobLister.CronJobs(namespace).Get(name) + if err != nil { + if k8serrors.IsNotFound(err) { + return false + } + slog.Warn("Failed to check if cronjob exists in cache, assuming it does", + "namespace", namespace, + "cronjob", name, + "error", err, + ) + return true + } + return true +} + func podToPartialMetadata(pod *corev1.Pod) *metav1.PartialObjectMetadata { return &metav1.PartialObjectMetadata{ TypeMeta: metav1.TypeMeta{ diff --git a/internal/controller/controller_integration_test.go b/internal/controller/controller_integration_test.go index 75e6265..bfc3818 100644 --- a/internal/controller/controller_integration_test.go +++ b/internal/controller/controller_integration_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -119,7 +120,14 @@ func setup(t *testing.T, onlyNamespace string, excludeNamespaces string) (*kuber go func() { _ = ctrl.Run(ctx, 1) }() - if !cache.WaitForCacheSync(ctx.Done(), ctrl.podInformer.HasSynced, ctrl.deploymentInformer.HasSynced) { + if !cache.WaitForCacheSync(ctx.Done(), + ctrl.podInformer.HasSynced, + ctrl.deploymentInformer.HasSynced, + ctrl.daemonSetInformer.HasSynced, + ctrl.statefulSetInformer.HasSynced, + ctrl.jobInformer.HasSynced, + ctrl.cronJobInformer.HasSynced, + ) { t.Fatal("timed out waiting for informer cache to sync") } @@ -286,6 +294,231 @@ func deletePod(t *testing.T, clientset *kubernetes.Clientset, namespace, name st } } +func makeDaemonSet(t *testing.T, clientset *kubernetes.Clientset, namespace, name string) *appsv1.DaemonSet { + t.Helper() + ctx := context.Background() + labels := map[string]string{"app": name} + ds := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + 
}, + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{MatchLabels: labels}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: labels}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "agent", Image: "fluentd:latest"}}, + }, + }, + }, + } + created, err := clientset.AppsV1().DaemonSets(namespace).Create(ctx, ds, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create DaemonSet: %v", err) + } + return created +} + +func makeDaemonSetPod(t *testing.T, clientset *kubernetes.Clientset, owners []metav1.OwnerReference, namespace, name string) *corev1.Pod { + t.Helper() + ctx := context.Background() + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + OwnerReferences: owners, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "agent", Image: "fluentd:latest"}}, + }, + } + created, err := clientset.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create Pod: %v", err) + } + + created.Status.Phase = corev1.PodPending + pending, err := clientset.CoreV1().Pods(namespace).UpdateStatus(ctx, created, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to update Pod status to Pending: %v", err) + } + + pending.Status.Phase = corev1.PodRunning + pending.Status.ContainerStatuses = []corev1.ContainerStatus{{ + Name: "agent", + ImageID: "docker-pullable://fluentd@sha256:dsdigest123", + }} + updated, err := clientset.CoreV1().Pods(namespace).UpdateStatus(ctx, pending, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to update Pod status to Running: %v", err) + } + return updated +} + +func deleteDaemonSet(t *testing.T, clientset *kubernetes.Clientset, namespace, name string) { + t.Helper() + ctx := context.Background() + err := clientset.AppsV1().DaemonSets(namespace).Delete(ctx, name, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("failed to delete DaemonSet: %v", err) + } 
+} + +func makeCronJob(t *testing.T, clientset *kubernetes.Clientset, namespace, name string) *batchv1.CronJob { + t.Helper() + ctx := context.Background() + cronJob := &batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: batchv1.CronJobSpec{ + Schedule: "*/5 * * * *", + JobTemplate: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "worker", Image: "busybox:latest"}}, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + }, + }, + } + created, err := clientset.BatchV1().CronJobs(namespace).Create(ctx, cronJob, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create CronJob: %v", err) + } + return created +} + +func makeJob(t *testing.T, clientset *kubernetes.Clientset, owners []metav1.OwnerReference, namespace, name string) *batchv1.Job { + t.Helper() + ctx := context.Background() + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + OwnerReferences: owners, + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "worker", Image: "busybox:latest"}}, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + } + created, err := clientset.BatchV1().Jobs(namespace).Create(ctx, job, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create Job: %v", err) + } + return created +} + +func makeJobPod(t *testing.T, clientset *kubernetes.Clientset, owners []metav1.OwnerReference, namespace, name string) *corev1.Pod { + t.Helper() + ctx := context.Background() + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + OwnerReferences: owners, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "worker", Image: "busybox:latest"}}, + RestartPolicy: corev1.RestartPolicyNever, + }, + } + created, err := 
clientset.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create Pod: %v", err) + } + + created.Status.Phase = corev1.PodPending + pending, err := clientset.CoreV1().Pods(namespace).UpdateStatus(ctx, created, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to update Pod status to Pending: %v", err) + } + + pending.Status.Phase = corev1.PodRunning + pending.Status.ContainerStatuses = []corev1.ContainerStatus{{ + Name: "worker", + ImageID: "docker-pullable://busybox@sha256:jobdigest789", + }} + updated, err := clientset.CoreV1().Pods(namespace).UpdateStatus(ctx, pending, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to update Pod status to Running: %v", err) + } + return updated +} + +// makeCompletedJobPod creates a Job-owned pod that transitions directly from +// Pending to Succeeded, skipping the Running phase. This simulates a very +// short-lived Job (e.g. sub-second container execution) where the kubelet +// reports the final status without an intermediate Running update. 
+func makeCompletedJobPod(t *testing.T, clientset *kubernetes.Clientset, owners []metav1.OwnerReference, namespace, name string) *corev1.Pod { + t.Helper() + ctx := context.Background() + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + OwnerReferences: owners, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "worker", Image: "busybox:latest"}}, + RestartPolicy: corev1.RestartPolicyNever, + }, + } + created, err := clientset.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create Pod: %v", err) + } + + created.Status.Phase = corev1.PodPending + pending, err := clientset.CoreV1().Pods(namespace).UpdateStatus(ctx, created, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to update Pod status to Pending: %v", err) + } + + // Transition directly to Succeeded without passing through Running + pending.Status.Phase = corev1.PodSucceeded + pending.Status.ContainerStatuses = []corev1.ContainerStatus{{ + Name: "worker", + ImageID: "docker-pullable://busybox@sha256:jobdigest789", + }} + updated, err := clientset.CoreV1().Pods(namespace).UpdateStatus(ctx, pending, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to update Pod status to Succeeded: %v", err) + } + return updated +} + +func deleteCronJob(t *testing.T, clientset *kubernetes.Clientset, namespace, name string) { + t.Helper() + ctx := context.Background() + err := clientset.BatchV1().CronJobs(namespace).Delete(ctx, name, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("failed to delete CronJob: %v", err) + } +} + +func deleteJob(t *testing.T, clientset *kubernetes.Clientset, namespace, name string) { + t.Helper() + ctx := context.Background() + propagation := metav1.DeletePropagationBackground + err := clientset.BatchV1().Jobs(namespace).Delete(ctx, name, metav1.DeleteOptions{ + PropagationPolicy: &propagation, + }) + if err != nil { + t.Fatalf("failed to delete Job: 
%v", err) + } +} + func TestControllerIntegration_KubernetesDeploymentLifecycle(t *testing.T) { if testing.Short() { t.Skip("skipping integration test in short mode") @@ -527,3 +760,338 @@ func TestControllerIntegration_ExcludeNamespaces(t *testing.T) { return len(mock.getRecords()) != 1 }, 3*time.Second, 100*time.Millisecond) } + +func TestControllerIntegration_StandaloneJobLifecycle(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + t.Parallel() + namespace := "test-controller-ns" + clientset, mock := setup(t, "", "") + + // Create a standalone Job and its pod; expect 1 record + job := makeJob(t, clientset, []metav1.OwnerReference{}, namespace, "standalone-job") + _ = makeJobPod(t, clientset, []metav1.OwnerReference{{ + APIVersion: "batch/v1", + Kind: "Job", + Name: job.Name, + UID: job.UID, + }}, namespace, "standalone-job-pod-1") + + require.Eventually(t, func() bool { + return len(mock.getRecords()) >= 1 + }, 3*time.Second, 100*time.Millisecond) + records := mock.getRecords() + require.Len(t, records, 1) + assert.Equal(t, deploymentrecord.StatusDeployed, records[0].Status) + assert.Equal(t, fmt.Sprintf("%s/standalone-job/worker", namespace), records[0].DeploymentName) + + // Delete the pod while the Job still exists; should not decommission (like scale-down) + deletePod(t, clientset, namespace, "standalone-job-pod-1") + require.Never(t, func() bool { + return len(mock.getRecords()) != 1 + }, 3*time.Second, 100*time.Millisecond) + + // Create a new pod for the same job and then delete both. + // The second pod has the same deployment name and digest, so the dedup + // cache suppresses a duplicate CREATED record (2-minute TTL). 
+ _ = makeJobPod(t, clientset, []metav1.OwnerReference{{ + APIVersion: "batch/v1", + Kind: "Job", + Name: job.Name, + UID: job.UID, + }}, namespace, "standalone-job-pod-2") + + // Delete the Job first, then the pod manually (envtest has no garbage + // collector, so Background propagation does not cascade pod deletion). + deleteJob(t, clientset, namespace, "standalone-job") + deletePod(t, clientset, namespace, "standalone-job-pod-2") + + require.Eventually(t, func() bool { + return len(mock.getRecords()) >= 2 + }, 3*time.Second, 100*time.Millisecond) + records = mock.getRecords() + require.Len(t, records, 2) + assert.Equal(t, deploymentrecord.StatusDecommissioned, records[1].Status) + assert.Equal(t, fmt.Sprintf("%s/standalone-job/worker", namespace), records[1].DeploymentName) +} + +func TestControllerIntegration_ShortLivedJobCaughtOnCompletion(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + t.Parallel() + namespace := "test-controller-ns" + clientset, mock := setup(t, "", "") + + // Create a Job and a pod that goes directly from Pending to Succeeded + // (simulating a sub-second Job that completes before the Running phase is observed). 
+ job := makeJob(t, clientset, []metav1.OwnerReference{}, namespace, "fast-job") + _ = makeCompletedJobPod(t, clientset, []metav1.OwnerReference{{ + APIVersion: "batch/v1", + Kind: "Job", + Name: job.Name, + UID: job.UID, + }}, namespace, "fast-job-pod-1") + + // The controller should still catch it via the terminal phase handler + require.Eventually(t, func() bool { + return len(mock.getRecords()) >= 1 + }, 3*time.Second, 100*time.Millisecond) + records := mock.getRecords() + require.Len(t, records, 1) + assert.Equal(t, deploymentrecord.StatusDeployed, records[0].Status) + assert.Equal(t, fmt.Sprintf("%s/fast-job/worker", namespace), records[0].DeploymentName) +} + +func TestControllerIntegration_CronJobLifecycle(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + t.Parallel() + namespace := "test-controller-ns" + clientset, mock := setup(t, "", "") + + // Create a CronJob, a Job owned by it, and a pod; expect 1 record + cronJob := makeCronJob(t, clientset, namespace, "my-cronjob") + job := makeJob(t, clientset, []metav1.OwnerReference{{ + APIVersion: "batch/v1", + Kind: "CronJob", + Name: cronJob.Name, + UID: cronJob.UID, + }}, namespace, "my-cronjob-28485120") + _ = makeJobPod(t, clientset, []metav1.OwnerReference{{ + APIVersion: "batch/v1", + Kind: "Job", + Name: job.Name, + UID: job.UID, + }}, namespace, "my-cronjob-28485120-pod-1") + + require.Eventually(t, func() bool { + return len(mock.getRecords()) >= 1 + }, 3*time.Second, 100*time.Millisecond) + records := mock.getRecords() + require.Len(t, records, 1) + assert.Equal(t, deploymentrecord.StatusDeployed, records[0].Status) + // The deployment name should use the CronJob name, not the Job name + assert.Equal(t, fmt.Sprintf("%s/my-cronjob/worker", namespace), records[0].DeploymentName) + + // Delete the Job and pod while CronJob still exists; should not decommission + deleteJob(t, clientset, namespace, "my-cronjob-28485120") + deletePod(t, clientset, namespace, 
"my-cronjob-28485120-pod-1") + require.Never(t, func() bool { + return len(mock.getRecords()) != 1 + }, 3*time.Second, 100*time.Millisecond) + + // Now delete the CronJob and create a new job+pod to simulate final cleanup + deleteCronJob(t, clientset, namespace, "my-cronjob") + + // Create another job+pod that gets cleaned up after CronJob deletion. + // The dedup cache suppresses a new CREATED since the deployment name + // and digest match the earlier record (2-minute TTL). + job2 := makeJob(t, clientset, []metav1.OwnerReference{{ + APIVersion: "batch/v1", + Kind: "CronJob", + Name: cronJob.Name, + UID: cronJob.UID, + }}, namespace, "my-cronjob-28485240") + pod2 := makeJobPod(t, clientset, []metav1.OwnerReference{{ + APIVersion: "batch/v1", + Kind: "Job", + Name: job2.Name, + UID: job2.UID, + }}, namespace, "my-cronjob-28485240-pod-1") + + // Delete the pod first so the Job is still in the informer cache + // when the delete event is processed — this allows resolveJobWorkload + // to find the CronJob owner via the Job's OwnerReferences. + // Then delete the Job for cleanup. 
+ deletePod(t, clientset, namespace, pod2.Name) + deleteJob(t, clientset, namespace, "my-cronjob-28485240") + + require.Eventually(t, func() bool { + return len(mock.getRecords()) >= 2 + }, 3*time.Second, 100*time.Millisecond) + records = mock.getRecords() + require.Len(t, records, 2) + assert.Equal(t, deploymentrecord.StatusDecommissioned, records[1].Status) + assert.Equal(t, fmt.Sprintf("%s/my-cronjob/worker", namespace), records[1].DeploymentName) +} + +func TestControllerIntegration_DaemonSetLifecycle(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + t.Parallel() + namespace := "test-controller-ns" + clientset, mock := setup(t, "", "") + + // Create a DaemonSet and a pod owned by it; expect 1 record + ds := makeDaemonSet(t, clientset, namespace, "logging-agent") + _ = makeDaemonSetPod(t, clientset, []metav1.OwnerReference{{ + APIVersion: "apps/v1", + Kind: "DaemonSet", + Name: ds.Name, + UID: ds.UID, + }}, namespace, "logging-agent-node1") + + require.Eventually(t, func() bool { + return len(mock.getRecords()) >= 1 + }, 3*time.Second, 100*time.Millisecond) + records := mock.getRecords() + require.Len(t, records, 1) + assert.Equal(t, deploymentrecord.StatusDeployed, records[0].Status) + assert.Equal(t, fmt.Sprintf("%s/logging-agent/agent", namespace), records[0].DeploymentName) + + // Delete the pod while DaemonSet still exists; should NOT decommission + deletePod(t, clientset, namespace, "logging-agent-node1") + require.Never(t, func() bool { + return len(mock.getRecords()) != 1 + }, 3*time.Second, 100*time.Millisecond) + + // Create another pod, then delete both the DaemonSet and pod. + // The dedup cache suppresses a new CREATED since the deployment name + // and digest match the earlier record (2-minute TTL). 
+ pod2 := makeDaemonSetPod(t, clientset, []metav1.OwnerReference{{ + APIVersion: "apps/v1", + Kind: "DaemonSet", + Name: ds.Name, + UID: ds.UID, + }}, namespace, "logging-agent-node2") + + deleteDaemonSet(t, clientset, namespace, "logging-agent") + deletePod(t, clientset, namespace, pod2.Name) + + require.Eventually(t, func() bool { + return len(mock.getRecords()) >= 2 + }, 3*time.Second, 100*time.Millisecond) + records = mock.getRecords() + require.Len(t, records, 2) + assert.Equal(t, deploymentrecord.StatusDecommissioned, records[1].Status) + assert.Equal(t, fmt.Sprintf("%s/logging-agent/agent", namespace), records[1].DeploymentName) +} + +func makeStatefulSet(t *testing.T, clientset *kubernetes.Clientset, namespace, name string) *appsv1.StatefulSet { + t.Helper() + ctx := context.Background() + labels := map[string]string{"app": name} + ss := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: appsv1.StatefulSetSpec{ + ServiceName: name, + Selector: &metav1.LabelSelector{MatchLabels: labels}, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: labels}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "db", Image: "postgres:latest"}}, + }, + }, + }, + } + created, err := clientset.AppsV1().StatefulSets(namespace).Create(ctx, ss, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create StatefulSet: %v", err) + } + return created +} + +func makeStatefulSetPod(t *testing.T, clientset *kubernetes.Clientset, owners []metav1.OwnerReference, namespace, name string) *corev1.Pod { + t.Helper() + ctx := context.Background() + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + OwnerReferences: owners, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "db", Image: "postgres:latest"}}, + }, + } + created, err := clientset.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{}) + if err != nil { + 
t.Fatalf("failed to create Pod: %v", err) + } + + created.Status.Phase = corev1.PodPending + pending, err := clientset.CoreV1().Pods(namespace).UpdateStatus(ctx, created, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to update Pod status to Pending: %v", err) + } + + pending.Status.Phase = corev1.PodRunning + pending.Status.ContainerStatuses = []corev1.ContainerStatus{{ + Name: "db", + ImageID: "docker-pullable://postgres@sha256:ssdigest456", + }} + updated, err := clientset.CoreV1().Pods(namespace).UpdateStatus(ctx, pending, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to update Pod status to Running: %v", err) + } + return updated +} + +func deleteStatefulSet(t *testing.T, clientset *kubernetes.Clientset, namespace, name string) { + t.Helper() + ctx := context.Background() + err := clientset.AppsV1().StatefulSets(namespace).Delete(ctx, name, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("failed to delete StatefulSet: %v", err) + } +} + +func TestControllerIntegration_StatefulSetLifecycle(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + t.Parallel() + namespace := "test-controller-ns" + clientset, mock := setup(t, "", "") + + // Create a StatefulSet and a pod owned by it; expect 1 record + ss := makeStatefulSet(t, clientset, namespace, "my-db") + _ = makeStatefulSetPod(t, clientset, []metav1.OwnerReference{{ + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: ss.Name, + UID: ss.UID, + }}, namespace, "my-db-0") + + require.Eventually(t, func() bool { + return len(mock.getRecords()) >= 1 + }, 3*time.Second, 100*time.Millisecond) + records := mock.getRecords() + require.Len(t, records, 1) + assert.Equal(t, deploymentrecord.StatusDeployed, records[0].Status) + assert.Equal(t, fmt.Sprintf("%s/my-db/db", namespace), records[0].DeploymentName) + + // Delete the pod while StatefulSet still exists; should NOT decommission + deletePod(t, clientset, namespace, "my-db-0") + 
require.Never(t, func() bool { + return len(mock.getRecords()) != 1 + }, 3*time.Second, 100*time.Millisecond) + + // Create another pod, then delete both the StatefulSet and pod. + // The dedup cache suppresses a new CREATED since the deployment name + // and digest match the earlier record (2-minute TTL). + pod2 := makeStatefulSetPod(t, clientset, []metav1.OwnerReference{{ + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: ss.Name, + UID: ss.UID, + }}, namespace, "my-db-1") + + deleteStatefulSet(t, clientset, namespace, "my-db") + deletePod(t, clientset, namespace, pod2.Name) + + require.Eventually(t, func() bool { + return len(mock.getRecords()) >= 2 + }, 3*time.Second, 100*time.Millisecond) + records = mock.getRecords() + require.Len(t, records, 2) + assert.Equal(t, deploymentrecord.StatusDecommissioned, records[1].Status) + assert.Equal(t, fmt.Sprintf("%s/my-db/db", namespace), records[1].DeploymentName) +} diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go index 8d7388f..f758a4d 100644 --- a/internal/controller/controller_test.go +++ b/internal/controller/controller_test.go @@ -87,7 +87,7 @@ func TestRecordContainer_UnknownArtifactCachePopulatedOn404(t *testing.T) { pod, container := testPod(digest) // First call should hit the API and get a 404 - err := ctrl.recordContainer(context.Background(), pod, container, deploymentrecord.StatusDeployed, EventCreated, nil) + err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) assert.Equal(t, 1, poster.getCalls()) @@ -106,12 +106,12 @@ func TestRecordContainer_UnknownArtifactCacheSkipsAPICall(t *testing.T) { pod, container := testPod(digest) // First call — API returns 404, populates cache - err := ctrl.recordContainer(context.Background(), pod, container, deploymentrecord.StatusDeployed, EventCreated, nil) + err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, 
"test-deployment", nil) require.NoError(t, err) assert.Equal(t, 1, poster.getCalls()) // Second call — should be served from cache, no API call - err = ctrl.recordContainer(context.Background(), pod, container, deploymentrecord.StatusDeployed, EventCreated, nil) + err = ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) assert.Equal(t, 1, poster.getCalls(), "API should not be called for cached unknown artifact") } @@ -126,12 +126,12 @@ func TestRecordContainer_UnknownArtifactCacheAppliesToDecommission(t *testing.T) pod, container := testPod(digest) // Deploy call — 404, populates cache - err := ctrl.recordContainer(context.Background(), pod, container, deploymentrecord.StatusDeployed, EventCreated, nil) + err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) assert.Equal(t, 1, poster.getCalls()) // Decommission call for same digest — should skip API - err = ctrl.recordContainer(context.Background(), pod, container, deploymentrecord.StatusDecommissioned, EventDeleted, nil) + err = ctrl.recordContainer(context.Background(), pod, container, EventDeleted, "test-deployment", nil) require.NoError(t, err) assert.Equal(t, 1, poster.getCalls(), "decommission should also be skipped for cached unknown artifact") } @@ -149,7 +149,7 @@ func TestRecordContainer_UnknownArtifactCacheExpires(t *testing.T) { ctrl.unknownArtifacts.Set(digest, true, 50*time.Millisecond) // Immediately — should be cached - err := ctrl.recordContainer(context.Background(), pod, container, deploymentrecord.StatusDeployed, EventCreated, nil) + err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) assert.Equal(t, 0, poster.getCalls(), "should skip API while cached") @@ -157,7 +157,7 @@ func TestRecordContainer_UnknownArtifactCacheExpires(t *testing.T) { time.Sleep(100 * time.Millisecond) // 
After expiry — should call API again - err = ctrl.recordContainer(context.Background(), pod, container, deploymentrecord.StatusDeployed, EventCreated, nil) + err = ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) assert.Equal(t, 1, poster.getCalls(), "should call API after cache expiry") } @@ -169,7 +169,7 @@ func TestRecordContainer_SuccessfulPostDoesNotPopulateUnknownCache(t *testing.T) ctrl := newTestController(poster) pod, container := testPod(digest) - err := ctrl.recordContainer(context.Background(), pod, container, deploymentrecord.StatusDeployed, EventCreated, nil) + err := ctrl.recordContainer(context.Background(), pod, container, EventCreated, "test-deployment", nil) require.NoError(t, err) assert.Equal(t, 1, poster.getCalls()) @@ -177,3 +177,359 @@ func TestRecordContainer_SuccessfulPostDoesNotPopulateUnknownCache(t *testing.T) _, exists := ctrl.unknownArtifacts.Get(digest) assert.False(t, exists, "successful post should not cache digest as unknown") } + +func TestHasSupportedOwner(t *testing.T) { + t.Parallel() + tests := []struct { + name string + pod *corev1.Pod + expected bool + }{ + { + name: "pod owned by ReplicaSet", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "ReplicaSet", + Name: "test-rs-abc123", + }}, + }, + }, + expected: true, + }, + { + name: "pod owned by Job", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "Job", + Name: "test-job", + }}, + }, + }, + expected: true, + }, + { + name: "pod with no owner", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{}, + }, + expected: false, + }, + { + name: "pod owned by DaemonSet", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "DaemonSet", + Name: "test-ds", + }}, + }, + }, + expected: true, + }, + { + name: "pod owned by StatefulSet", + pod: 
&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "StatefulSet", + Name: "test-ss", + }}, + }, + }, + expected: true, + }, + { + name: "pod owned by ReplicationController", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "ReplicationController", + Name: "test-rc", + }}, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result := hasSupportedOwner(tt.pod) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestGetJobOwnerName(t *testing.T) { + t.Parallel() + tests := []struct { + name string + pod *corev1.Pod + expected string + }{ + { + name: "pod owned by Job", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "Job", + Name: "my-job", + }}, + }, + }, + expected: "my-job", + }, + { + name: "pod not owned by Job", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "ReplicaSet", + Name: "my-rs-abc123", + }}, + }, + }, + expected: "", + }, + { + name: "pod with no owner", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{}, + }, + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result := getJobOwnerName(tt.pod) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestGetWorkloadRef_Deployment(t *testing.T) { + t.Parallel() + ctrl := newTestController(&mockPoster{}) + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "ReplicaSet", + Name: "my-deployment-abc123", + }}, + }, + } + wl := ctrl.getWorkloadRef(pod) + assert.Equal(t, "my-deployment", wl.Name) + assert.Equal(t, "Deployment", wl.Kind) +} + +func TestGetWorkloadRef_DaemonSet(t *testing.T) { + t.Parallel() + ctrl := newTestController(&mockPoster{}) + pod := &corev1.Pod{ + ObjectMeta: 
metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "DaemonSet", + Name: "my-daemonset", + }}, + }, + } + wl := ctrl.getWorkloadRef(pod) + assert.Equal(t, "my-daemonset", wl.Name) + assert.Equal(t, "DaemonSet", wl.Kind) +} + +func TestGetDaemonSetName(t *testing.T) { + t.Parallel() + tests := []struct { + name string + pod *corev1.Pod + expected string + }{ + { + name: "pod owned by DaemonSet", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "DaemonSet", + Name: "my-daemonset", + }}, + }, + }, + expected: "my-daemonset", + }, + { + name: "pod not owned by DaemonSet", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "ReplicaSet", + Name: "my-rs-abc123", + }}, + }, + }, + expected: "", + }, + { + name: "pod with no owner", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{}, + }, + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result := getDaemonSetName(tt.pod) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestGetStatefulSetName(t *testing.T) { + t.Parallel() + tests := []struct { + name string + pod *corev1.Pod + expected string + }{ + { + name: "pod owned by StatefulSet", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "StatefulSet", + Name: "my-statefulset", + }}, + }, + }, + expected: "my-statefulset", + }, + { + name: "pod not owned by StatefulSet", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "ReplicaSet", + Name: "my-rs-abc123", + }}, + }, + }, + expected: "", + }, + { + name: "pod with no owner", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{}, + }, + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result := getStatefulSetName(tt.pod) + assert.Equal(t, tt.expected, 
result) + }) + } +} + +func TestGetWorkloadRef_StatefulSet(t *testing.T) { + t.Parallel() + ctrl := newTestController(&mockPoster{}) + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "StatefulSet", + Name: "my-statefulset", + }}, + }, + } + wl := ctrl.getWorkloadRef(pod) + assert.Equal(t, "my-statefulset", wl.Name) + assert.Equal(t, "StatefulSet", wl.Kind) +} + +func TestIsNumeric(t *testing.T) { + t.Parallel() + tests := []struct { + input string + expected bool + }{ + {"28485120", true}, + {"0", true}, + {"123456789", true}, + {"", false}, + {"abc", false}, + {"123abc", false}, + {"12-34", false}, + } + + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tt.expected, isNumeric(tt.input)) + }) + } +} + +func TestGetWorkloadRef_StandaloneJob(t *testing.T) { + t.Parallel() + // With nil listers, resolveJobWorkload falls back to standalone Job + ctrl := newTestController(&mockPoster{}) + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{{ + Kind: "Job", + Name: "my-standalone-job", + }}, + }, + } + wl := ctrl.getWorkloadRef(pod) + assert.Equal(t, "my-standalone-job", wl.Name) + assert.Equal(t, "Job", wl.Kind) +} + +func TestGetWorkloadRef_NoOwner(t *testing.T) { + t.Parallel() + ctrl := newTestController(&mockPoster{}) + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{}, + } + wl := ctrl.getWorkloadRef(pod) + assert.Empty(t, wl.Name) + assert.Empty(t, wl.Kind) +} + +func TestIsTerminalPhase(t *testing.T) { + t.Parallel() + tests := []struct { + name string + phase corev1.PodPhase + expected bool + }{ + {"Succeeded", corev1.PodSucceeded, true}, + {"Failed", corev1.PodFailed, true}, + {"Running", corev1.PodRunning, false}, + {"Pending", corev1.PodPending, false}, + {"Unknown", corev1.PodUnknown, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + pod := &corev1.Pod{ + 
Status: corev1.PodStatus{Phase: tt.phase}, + } + assert.Equal(t, tt.expected, isTerminalPhase(pod)) + }) + } +} diff --git a/internal/metadata/metadata.go b/internal/metadata/metadata.go index 039c192..9a1733e 100644 --- a/internal/metadata/metadata.go +++ b/internal/metadata/metadata.go @@ -101,15 +101,28 @@ func (m *Aggregator) addOwnersToQueue(ctx context.Context, current *metav1.Parti // getOwnerMetadata retrieves partial object metadata for an owner ref. func (m *Aggregator) getOwnerMetadata(ctx context.Context, namespace string, owner metav1.OwnerReference) (*metav1.PartialObjectMetadata, error) { gvr := schema.GroupVersionResource{ - Group: "apps", Version: "v1", } switch owner.Kind { case "ReplicaSet": + gvr.Group = "apps" gvr.Resource = "replicasets" case "Deployment": + gvr.Group = "apps" gvr.Resource = "deployments" + case "DaemonSet": + gvr.Group = "apps" + gvr.Resource = "daemonsets" + case "StatefulSet": + gvr.Group = "apps" + gvr.Resource = "statefulsets" + case "Job": + gvr.Group = "batch" + gvr.Resource = "jobs" + case "CronJob": + gvr.Group = "batch" + gvr.Resource = "cronjobs" default: slog.Debug("Unsupported owner kind for metadata collection", "kind", owner.Kind, diff --git a/internal/metadata/metadata_test.go b/internal/metadata/metadata_test.go index 02e0d23..8c4c9db 100644 --- a/internal/metadata/metadata_test.go +++ b/internal/metadata/metadata_test.go @@ -16,8 +16,12 @@ import ( ) func newPartialObject(uid, name, kind string, annotations map[string]string, owners []metav1.OwnerReference) *metav1.PartialObjectMetadata { + apiVersion := "apps/v1" + if kind == "Job" || kind == "CronJob" { + apiVersion = "batch/v1" + } return &metav1.PartialObjectMetadata{ - TypeMeta: metav1.TypeMeta{Kind: kind, APIVersion: "apps/v1"}, + TypeMeta: metav1.TypeMeta{Kind: kind, APIVersion: apiVersion}, ObjectMeta: metav1.ObjectMeta{ UID: types.UID(uid), Name: name, @@ -29,8 +33,12 @@ func newPartialObject(uid, name, kind string, annotations map[string]string, own 
} func ownerRef(kind, name, uid string) metav1.OwnerReference { + apiVersion := "apps/v1" + if kind == "Job" || kind == "CronJob" { + apiVersion = "batch/v1" + } return metav1.OwnerReference{ - APIVersion: "apps/v1", + APIVersion: apiVersion, Kind: kind, Name: name, UID: types.UID(uid), @@ -319,6 +327,56 @@ func TestBuildAggregatePodMetadata(t *testing.T) { }, expectedTags: map[string]string{"team": "platform", "org": "engineering", "env": "prod"}, }, + { + name: "aggregates through job to cronjob ownership chain: pod -> job -> cronjob", + pod: newPartialObject("pod-1", "my-pod", "Pod", map[string]string{ + MetadataAnnotationPrefix + "team": "data", + }, []metav1.OwnerReference{ownerRef("Job", "my-cronjob-28485120", "job-1")}), + clusterObjects: []runtime.Object{ + newPartialObject("job-1", "my-cronjob-28485120", "Job", map[string]string{ + MetadataAnnotationPrefix + RuntimeRisksAnnotationKey: "sensitive-data", + }, []metav1.OwnerReference{ownerRef("CronJob", "my-cronjob", "cj-1")}), + newPartialObject("cj-1", "my-cronjob", "CronJob", map[string]string{ + MetadataAnnotationPrefix + "env": "prod", + }, nil), + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{ + deploymentrecord.SensitiveData: true, + }, + expectedTags: map[string]string{"team": "data", "env": "prod"}, + }, + { + name: "aggregates metadata from daemonset owner", + pod: newPartialObject("pod-1", "my-pod", "Pod", map[string]string{ + MetadataAnnotationPrefix + "team": "infra", + }, []metav1.OwnerReference{ownerRef("DaemonSet", "my-ds", "ds-1")}), + clusterObjects: []runtime.Object{ + newPartialObject("ds-1", "my-ds", "DaemonSet", map[string]string{ + MetadataAnnotationPrefix + RuntimeRisksAnnotationKey: "critical-resource", + MetadataAnnotationPrefix + "env": "prod", + }, nil), + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{ + deploymentrecord.CriticalResource: true, + }, + expectedTags: map[string]string{"team": "infra", "env": "prod"}, + }, + { + name: "aggregates 
metadata from statefulset owner", + pod: newPartialObject("pod-1", "my-pod", "Pod", map[string]string{ + MetadataAnnotationPrefix + "team": "data", + }, []metav1.OwnerReference{ownerRef("StatefulSet", "my-ss", "ss-1")}), + clusterObjects: []runtime.Object{ + newPartialObject("ss-1", "my-ss", "StatefulSet", map[string]string{ + MetadataAnnotationPrefix + RuntimeRisksAnnotationKey: "sensitive-data", + MetadataAnnotationPrefix + "env": "prod", + }, nil), + }, + expectedRuntimeRisks: map[deploymentrecord.RuntimeRisk]bool{ + deploymentrecord.SensitiveData: true, + }, + expectedTags: map[string]string{"team": "data", "env": "prod"}, + }, } for _, tt := range tests { @@ -327,6 +385,10 @@ func TestBuildAggregatePodMetadata(t *testing.T) { _ = metav1.AddMetaToScheme(scheme) scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "ReplicaSet"}, &metav1.PartialObjectMetadata{}) scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}, &metav1.PartialObjectMetadata{}) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "DaemonSet"}, &metav1.PartialObjectMetadata{}) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "StatefulSet"}, &metav1.PartialObjectMetadata{}) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "Job"}, &metav1.PartialObjectMetadata{}) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "CronJob"}, &metav1.PartialObjectMetadata{}) fakeClient := metadatafake.NewSimpleMetadataClient(scheme, tt.clusterObjects...) m := NewAggregator(fakeClient)