diff --git a/internal/pkg/callbacks/pause_deployment.go b/internal/pkg/callbacks/pause_deployment.go
new file mode 100644
index 000000000..a1a80f983
--- /dev/null
+++ b/internal/pkg/callbacks/pause_deployment.go
@@ -0,0 +1,247 @@
+package callbacks
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/sirupsen/logrus"
+	"github.com/stakater/Reloader/internal/pkg/options"
+	"github.com/stakater/Reloader/pkg/kube"
+	app "k8s.io/api/apps/v1"
+	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+)
+
+// Keeps track of currently active timers
+var activeTimers = make(map[string]*time.Timer)
+
+// Returns unique key for the activeTimers map
+func getTimerKey(namespace, deploymentName string) string {
+	return fmt.Sprintf("%s/%s", namespace, deploymentName)
+}
+
+// Checks if a deployment is currently paused
+func IsPaused(deployment *app.Deployment) bool {
+	return deployment.Spec.Paused
+}
+
+// Deployment paused by reloader ?
+func IsPausedByReloader(deployment *app.Deployment) bool {
+	// The pause-time annotation is what this package writes when it pauses a
+	// deployment (see CreatePausePatch / ApplyPauseAnnotation), so its presence
+	// on a paused deployment marks the pause as ours.
+	return IsPaused(deployment) && deployment.Annotations[options.PauseDeploymentTimeAnnotation] != ""
+}
+
+// GetPauseStartTime returns the time at which Reloader paused the deployment,
+// parsed from the pause-time annotation as RFC 3339. It returns (nil, nil)
+// when the deployment is not paused by Reloader, and a non-nil error when the
+// annotation value cannot be parsed.
+func GetPauseStartTime(deployment *app.Deployment) (*time.Time, error) {
+	if !IsPausedByReloader(deployment) {
+		return nil, nil
+	}
+
+	pausedAtStr := deployment.Annotations[options.PauseDeploymentTimeAnnotation]
+	parsedTime, err := time.Parse(time.RFC3339, pausedAtStr)
+	if err != nil {
+		return nil, fmt.Errorf("parsing annotation %q: %w", options.PauseDeploymentTimeAnnotation, err)
+	}
+
+	return &parsedTime, nil
+}
+
+// ParsePauseDuration parses pauseIntervalValue with time.ParseDuration
+// (e.g. "10s", "5m"). On failure it logs a warning and returns the error
+// together with a zero duration.
+func ParsePauseDuration(pauseIntervalValue string) (time.Duration, error) {
+	pauseDuration, err := time.ParseDuration(pauseIntervalValue)
+	if err != nil {
+		logrus.Warnf("Failed to parse pause interval value '%s': %v", pauseIntervalValue, err)
+		return 0, err
+	}
+	return pauseDuration, nil
+}
+
+// GetPauseDuration reads the pause-interval annotation of the deployment and
+// returns both its raw string form and the parsed duration. It returns
+// ("", 0) when the annotation is absent, empty or not a valid duration.
+func GetPauseDuration(deployment app.Deployment) (string, time.Duration) {
+	pauseDurationStr := deployment.Annotations[options.PauseDeploymentAnnotation]
+	if pauseDurationStr == "" {
+		return "", 0
+	}
+	// ParsePauseDuration already logs the warning for unparsable values.
+	pauseDuration, err := ParsePauseDuration(pauseDurationStr)
+	if err != nil {
+		return "", 0
+	}
+	return pauseDurationStr, pauseDuration
+}
+
+// HandleMissingTimer restores the resume schedule of a deployment that is
+// paused by Reloader but has no timer in this process, which could occur
+// after a new leader election or a Reloader restart.
+//
+// If the pause start time cannot be parsed, or the pause period has already
+// elapsed, the deployment is resumed immediately. Otherwise a timer is
+// created for the remaining part of pauseDuration. Nothing happens when the
+// deployment is not paused by Reloader.
+func HandleMissingTimer(deployment *app.Deployment, pauseDuration time.Duration, clients kube.Clients) {
+	pauseStartTime, err := GetPauseStartTime(deployment)
+	if err != nil {
+		logrus.Errorf("Error parsing pause start time for deployment '%s' in namespace '%s': %v. Resuming deployment immediately",
+			deployment.Name, deployment.Namespace, err)
+		ResumeDeployment(deployment.Name, deployment.Namespace, clients)
+		return
+	}
+
+	if pauseStartTime == nil {
+		return
+	}
+
+	elapsedPauseTime := time.Since(*pauseStartTime)
+	remainingPauseTime := pauseDuration - elapsedPauseTime
+
+	if remainingPauseTime <= 0 {
+		logrus.Infof("Pause period for deployment '%s' in namespace '%s' has expired. Resuming immediately",
+			deployment.Name, deployment.Namespace)
+		ResumeDeployment(deployment.Name, deployment.Namespace, clients)
+		return
+	}
+
+	logrus.Infof("Creating missing timer for already paused deployment '%s' in namespace '%s' with remaining time %s",
+		deployment.Name, deployment.Namespace, remainingPauseTime)
+	CreateResumeTimer(deployment, clients, remainingPauseTime)
+}
+
+// activeTimersMu guards activeTimers. The map is touched both by callers of
+// this package and by the goroutine in which time.AfterFunc runs the resume
+// callback, so unsynchronized access would be a concurrent map access.
+var activeTimersMu sync.Mutex
+
+// CreateResumeTimer schedules ResumeDeployment for the deployment after
+// pauseDuration and records the timer in activeTimers. It does nothing when
+// a timer is already recorded for the deployment.
+func CreateResumeTimer(deployment *app.Deployment, clients kube.Clients, pauseDuration time.Duration) {
+	// Capture the identifiers now: the callback runs later in another
+	// goroutine and must not read the caller's object at that point.
+	name, namespace := deployment.Name, deployment.Namespace
+	timerKey := getTimerKey(namespace, name)
+
+	activeTimersMu.Lock()
+	defer activeTimersMu.Unlock()
+
+	if _, exists := activeTimers[timerKey]; exists {
+		return
+	}
+
+	activeTimers[timerKey] = time.AfterFunc(pauseDuration, func() {
+		ResumeDeployment(name, namespace, clients)
+	})
+}
+
+// removeResumeTimer stops and forgets the timer recorded for the deployment, if any.
+func removeResumeTimer(namespace, deploymentName string) {
+	timerKey := getTimerKey(namespace, deploymentName)
+
+	activeTimersMu.Lock()
+	timer, exists := activeTimers[timerKey]
+	delete(activeTimers, timerKey)
+	activeTimersMu.Unlock()
+
+	if exists {
+		timer.Stop()
+		logrus.Debugf("Removed pause timer for deployment '%s' in namespace '%s'", deploymentName, namespace)
+	}
+}
+
+// ResumeDeployment unpauses a deployment that has been paused by Reloader: it
+// fetches the current object, clears Spec.Paused, removes the pause-time
+// annotation and updates the deployment. Deployments that are not paused by
+// Reloader are left untouched. Failures are logged, not returned.
+func ResumeDeployment(deploymentName, namespace string, clients kube.Clients) {
+	// Forget the timer on every path, not only on success. A timer entry that
+	// outlives this call would make CreateResumeTimer and ResumeTimerExists
+	// believe a resume is still scheduled, and the deployment could then stay
+	// paused indefinitely.
+	removeResumeTimer(namespace, deploymentName)
+
+	deployment, err := clients.KubernetesClient.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentName, metav1.GetOptions{})
+	if err != nil {
+		logrus.Errorf("Failed to get deployment '%s' in namespace '%s': %v", deploymentName, namespace, err)
+		return
+	}
+
+	if !IsPausedByReloader(deployment) {
+		logrus.Infof("Deployment '%s' in namespace '%s' not paused by Reloader. Skipping resume", deploymentName, namespace)
+		return
+	}
+
+	deployment.Spec.Paused = false
+	delete(deployment.Annotations, options.PauseDeploymentTimeAnnotation)
+
+	_, err = clients.KubernetesClient.AppsV1().Deployments(namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{})
+	if err != nil {
+		logrus.Errorf("Failed to resume deployment '%s' in namespace '%s': %v", deploymentName, namespace, err)
+		return
+	}
+
+	logrus.Infof("Successfully resumed deployment '%s' in namespace '%s'", deploymentName, namespace)
+}
+
+// CreatePausePatch returns a JSON patch document that sets spec.paused to
+// true and writes two annotations: the pause time (now, RFC 3339) and the
+// pause interval given by pauseIntervalValue.
+func CreatePausePatch(pauseIntervalValue string) string {
+	timeAnnotation := time.Now().Format(time.RFC3339)
+	return fmt.Sprintf(`{
+	"spec": {"paused": true},
+	"metadata": {"annotations": {"%s": "%s","%s": "%s" }}}`,
+		options.PauseDeploymentTimeAnnotation, timeAnnotation,
+		options.PauseDeploymentAnnotation, pauseIntervalValue)
+}
+
+// ApplyPauseAnnotation writes the pause-time (now, RFC 3339) and
+// pause-interval annotations onto the in-memory deployment object, creating
+// the annotations map when it is nil. It does not talk to the cluster.
+func ApplyPauseAnnotation(deployment *app.Deployment, pauseIntervalValue string) {
+	if deployment.Annotations == nil {
+		deployment.Annotations = make(map[string]string)
+	}
+	deployment.Annotations[options.PauseDeploymentTimeAnnotation] = time.Now().Format(time.RFC3339)
+	deployment.Annotations[options.PauseDeploymentAnnotation] = pauseIntervalValue
+}
+
+// ResumeTimerExists reports whether a resume timer is recorded for the
+// deployment in this process; it logs a warning when there is none.
+func ResumeTimerExists(deployment *app.Deployment) bool {
+	timerKey := getTimerKey(deployment.Namespace, deployment.Name)
+
+	activeTimersMu.Lock()
+	_, exists := activeTimers[timerKey]
+	activeTimersMu.Unlock()
+
+	if !exists {
+		logrus.Warnf("Timer does not exist for deployment '%s' in namespace '%s'", deployment.Name, deployment.Namespace)
+	}
+	return exists
+}
+
+// PauseDeployment is the entry point for pausing a deployment that carries
+// the pause-interval annotation.
+//
+//   - No (valid) pause-interval annotation: nothing is done, returns false.
+//   - Already paused by Reloader: makes sure a resume timer exists (it may be
+//     missing after a leader election or a Reloader restart), returns true.
+//   - Paused, but not by Reloader: left untouched, returns false.
+//   - Not paused: pauses the deployment, via a patch when doPatch is true and
+//     on the in-memory object otherwise; returns whether that succeeded.
+func PauseDeployment(deployment *app.Deployment, clients kube.Clients, doPatch bool) bool {
+	pauseDurationString, pauseDuration := GetPauseDuration(*deployment)
+	if pauseDurationString == "" {
+		return false
+	}
+
+	// This check has to come before the generic IsPaused guard below:
+	// IsPausedByReloader implies IsPaused, so testing IsPaused first would
+	// make the missing-timer recovery unreachable.
+	if IsPausedByReloader(deployment) {
+		return CheckForMissingTimer(deployment, pauseDuration, clients)
+	}
+
+	if IsPaused(deployment) {
+		// Paused by someone other than Reloader; do not take over the pause.
+		return false
+	}
+
+	return PerformUpdate(deployment, clients, pauseDurationString, pauseDuration, doPatch)
+}
+
+// CheckForMissingTimer makes sure a deployment that is paused by Reloader has
+// a resume timer, delegating to HandleMissingTimer when none is recorded.
+// It always returns true.
+func CheckForMissingTimer(deployment *app.Deployment, pauseDuration time.Duration, clients kube.Clients) bool {
+	if !ResumeTimerExists(deployment) {
+		HandleMissingTimer(deployment, pauseDuration, clients)
+	}
+	return true
+}
+
+// PerformUpdate pauses the deployment. With doPatch it patches the deployment
+// in the cluster (see PauseWithPatch) and returns whether the patch
+// succeeded. Without doPatch it only marks the in-memory object as paused and
+// returns true; the caller is expected to persist the object afterwards.
+func PerformUpdate(deployment *app.Deployment, clients kube.Clients, pauseDurationString string, pauseDuration time.Duration, doPatch bool) bool {
+	logrus.Infof("Pausing Deployment '%s' in namespace '%s' for %s", deployment.Name, deployment.Namespace, pauseDuration)
+
+	if doPatch {
+		return PauseWithPatch(deployment, clients, pauseDurationString, pauseDuration)
+	}
+
+	ApplyPauseToDeployment(deployment, pauseDurationString, pauseDuration)
+	// The patch path schedules the resume inside PauseWithPatch. The in-memory
+	// path has to schedule it here, otherwise nothing would ever unpause the
+	// deployment. If the caller's update is never persisted, the timer finds
+	// the deployment not paused by Reloader and ResumeDeployment skips it.
+	CreateResumeTimer(deployment, clients, pauseDuration)
+	return true
+}
+
+// PauseWithPatch pauses the deployment in the cluster by applying the
+// strategic-merge patch built by CreatePausePatch. It returns false, after
+// logging the error, when the patch fails; on success it schedules the
+// resume timer and returns true.
+func PauseWithPatch(deployment *app.Deployment, clients kube.Clients, pauseDurationString string, pauseDuration time.Duration) bool {
+	pausePatch := CreatePausePatch(pauseDurationString)
+
+	_, err := clients.KubernetesClient.AppsV1().Deployments(deployment.Namespace).Patch(
+		context.TODO(),
+		deployment.Name,
+		types.StrategicMergePatchType,
+		[]byte(pausePatch),
+		meta_v1.PatchOptions{FieldManager: "Reloader"},
+	)
+
+	if err != nil {
+		logrus.Errorf("Failed to pause deployment %s: %v", deployment.Name, err)
+		return false
+	}
+
+
CreateResumeTimer(deployment, clients, pauseDuration) + return true +} + +// Applies the pause annotation directly +func ApplyPauseToDeployment(deployment *app.Deployment, pauseDurationString string, pauseDuration time.Duration) { + ApplyPauseAnnotation(deployment, pauseDurationString) + deployment.Spec.Paused = true +} diff --git a/internal/pkg/callbacks/pause_deployment_test.go b/internal/pkg/callbacks/pause_deployment_test.go new file mode 100644 index 000000000..b0aadaf36 --- /dev/null +++ b/internal/pkg/callbacks/pause_deployment_test.go @@ -0,0 +1,653 @@ +package callbacks + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stakater/Reloader/internal/pkg/options" + "github.com/stakater/Reloader/pkg/kube" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + app "k8s.io/api/apps/v1" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + testclient "k8s.io/client-go/kubernetes/fake" +) + +func TestIsPaused(t *testing.T) { + tests := []struct { + name string + deployment *appsv1.Deployment + paused bool + }{ + { + name: "paused deployment", + deployment: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Paused: true, + }, + }, + paused: true, + }, + { + name: "unpaused deployment", + deployment: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + paused: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + result := IsPaused(test.deployment) + assert.Equal(t, test.paused, result) + }) + } +} + +func TestIsPausedByReloader(t *testing.T) { + tests := []struct { + name string + deployment *appsv1.Deployment + pausedByReloader bool + }{ + { + name: "paused by reloader", + deployment: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Paused: true, + }, + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + 
options.PauseDeploymentTimeAnnotation: time.Now().Format(time.RFC3339), + options.PauseDeploymentAnnotation: "10s", + }, + }, + }, + pausedByReloader: true, + }, + { + name: "not paused by reloader", + deployment: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Paused: true, + }, + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{}, + }, + }, + pausedByReloader: false, + }, + { + name: "not paused", + deployment: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + pausedByReloader: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + pausedByReloader := IsPausedByReloader(test.deployment) + assert.Equal(t, test.pausedByReloader, pausedByReloader) + }) + } +} + +func TestGetPauseStartTime(t *testing.T) { + now := time.Now() + nowStr := now.Format(time.RFC3339) + + tests := []struct { + name string + deployment *appsv1.Deployment + pausedByReloader bool + expectedStartTime time.Time + }{ + { + name: "valid pause time", + deployment: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Paused: true, + }, + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + options.PauseDeploymentTimeAnnotation: nowStr, + }, + }, + }, + pausedByReloader: true, + expectedStartTime: now, + }, + { + name: "not paused by reloader", + deployment: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + pausedByReloader: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actualStartTime, err := GetPauseStartTime(test.deployment) + + require.NoError(t, err) + + if !test.pausedByReloader { + assert.Nil(t, actualStartTime) + } else { + require.NotNil(t, actualStartTime) + assert.WithinDuration(t, test.expectedStartTime, *actualStartTime, time.Second) + } + }) + } +} + +func TestParsePauseDuration(t *testing.T) { + tests := []struct { + name string + pauseIntervalValue string + expectedDuration time.Duration + invalidDuration bool + 
}{ + { + name: "valid duration", + pauseIntervalValue: "10s", + expectedDuration: 10 * time.Second, + invalidDuration: false, + }, + { + name: "valid minute duration", + pauseIntervalValue: "2m", + expectedDuration: 2 * time.Minute, + invalidDuration: false, + }, + { + name: "invalid duration", + pauseIntervalValue: "invalid", + expectedDuration: 0, + invalidDuration: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actualDuration, err := ParsePauseDuration(test.pauseIntervalValue) + + if test.invalidDuration { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expectedDuration, actualDuration) + } + }) + } +} + +func TestHandleMissingTimerSimple(t *testing.T) { + tests := []struct { + name string + deployment *appsv1.Deployment + shouldBePaused bool // Should be unpaused after HandleMissingTimer ? + }{ + { + name: "deployment paused by reloader, pause period has expired and no timer", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment-1", + Namespace: "default", + Annotations: map[string]string{ + options.PauseDeploymentTimeAnnotation: time.Now().Add(-6 * time.Minute).Format(time.RFC3339), + options.PauseDeploymentAnnotation: "5m", + }, + }, + Spec: appsv1.DeploymentSpec{ + Paused: true, + }, + }, + shouldBePaused: false, + }, + { + name: "deployment paused by reloader, pause period expires in the future and no timer", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment-2", + Namespace: "default", + Annotations: map[string]string{ + options.PauseDeploymentTimeAnnotation: time.Now().Add(1 * time.Minute).Format(time.RFC3339), + options.PauseDeploymentAnnotation: "5m", + }, + }, + Spec: appsv1.DeploymentSpec{ + Paused: true, + }, + }, + shouldBePaused: true, + }, + } + + for _, test := range tests { + // Clean up any timers at the end of the test + defer func() { + for key, timer := range activeTimers { + 
timer.Stop() + delete(activeTimers, key) + } + }() + + t.Run(test.name, func(t *testing.T) { + fakeClient := testclient.NewSimpleClientset() + clients := kube.Clients{ + KubernetesClient: fakeClient, + } + + _, err := fakeClient.AppsV1().Deployments("default").Create( + context.TODO(), + test.deployment, + metav1.CreateOptions{}) + require.NoError(t, err, "Expected no error when creating deployment") + + pauseDuration, _ := ParsePauseDuration(test.deployment.Annotations[options.PauseDeploymentAnnotation]) + HandleMissingTimer(test.deployment, pauseDuration, clients) + + updatedDeployment, _ := fakeClient.AppsV1().Deployments("default").Get(context.TODO(), test.deployment.Name, metav1.GetOptions{}) + + assert.Equal(t, test.shouldBePaused, updatedDeployment.Spec.Paused, + "Deployment should have correct paused state after timer expiration") + + if test.shouldBePaused { + pausedAtAnnotationValue := updatedDeployment.Annotations[options.PauseDeploymentTimeAnnotation] + assert.NotEmpty(t, pausedAtAnnotationValue, + "Pause annotation should be present and contain a value when deployment is paused") + } + }) + } +} + +func TestPauseDeployment(t *testing.T) { + tests := []struct { + name string + deployment *appsv1.Deployment + expectedError bool + expectedPaused bool + shouldBePaused bool // Return value of PauseDeployment + expectedAnnotation bool // Should have pause time annotation + pauseInterval string + }{ + { + name: "deployment without pause annotation", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + Annotations: map[string]string{}, + }, + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + expectedError: true, + expectedPaused: false, + shouldBePaused: false, + expectedAnnotation: false, + pauseInterval: "", + }, + { + name: "deployment already paused but not by reloader", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + Annotations: map[string]string{ + 
options.PauseDeploymentAnnotation: "5m", + }, + }, + Spec: appsv1.DeploymentSpec{ + Paused: true, + }, + }, + expectedError: false, + expectedPaused: true, + shouldBePaused: false, + expectedAnnotation: false, + pauseInterval: "5m", + }, + { + name: "deployment unpaused that needs to be paused by reloader", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment-3", + Annotations: map[string]string{ + options.PauseDeploymentAnnotation: "5m", + }, + }, + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + expectedError: false, + expectedPaused: true, + shouldBePaused: true, + expectedAnnotation: true, + pauseInterval: "5m", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeClient := testclient.NewSimpleClientset() + clients := kube.Clients{ + KubernetesClient: fakeClient, + } + + hasBeenPaused := PauseDeployment(test.deployment, clients, false) + + require.Equal(t, test.shouldBePaused, hasBeenPaused, + "PauseDeployment should have returned correct paused state") + + assert.Equal(t, test.expectedPaused, test.deployment.Spec.Paused, + "Deployment should have correct paused state after pause") + + if test.expectedAnnotation { + pausedAtAnnotationValue := test.deployment.Annotations[options.PauseDeploymentTimeAnnotation] + assert.NotEmpty(t, pausedAtAnnotationValue, + "Pause annotation should be present and contain a value when deployment is paused") + } else { + pausedAtAnnotationValue := test.deployment.Annotations[options.PauseDeploymentTimeAnnotation] + assert.Empty(t, pausedAtAnnotationValue, + "Pause annotation should not be present when deployment has not been paused by reloader") + } + }) + } +} + +func TestPauseWithPatch(t *testing.T) { + tests := []struct { + name string + deployment *appsv1.Deployment + pauseDurationString string + pauseDuration time.Duration + expectedResult bool + expectTimer bool + shouldPatchSucceed bool + }{ + { + name: "successful pause deployment with 
patch", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment-1", + Namespace: "default", + Annotations: map[string]string{ + options.PauseDeploymentAnnotation: "5m", + }, + }, + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + pauseDurationString: "5m", + pauseDuration: 5 * time.Minute, + expectedResult: true, + expectTimer: true, + shouldPatchSucceed: true, + }, + { + name: "deployment without annotations", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment-2", + Namespace: "default", + Annotations: nil, + }, + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + pauseDurationString: "30s", + pauseDuration: 30 * time.Second, + expectedResult: true, + expectTimer: true, + shouldPatchSucceed: true, + }, + } + + for _, test := range tests { + // Clean up any timers at the end of each test + defer func() { + for key, timer := range activeTimers { + timer.Stop() + delete(activeTimers, key) + } + }() + + t.Run(test.name, func(t *testing.T) { + fakeClient := testclient.NewSimpleClientset() + clients := kube.Clients{ + KubernetesClient: fakeClient, + } + + _, err := fakeClient.AppsV1().Deployments(test.deployment.Namespace).Create( + context.TODO(), + test.deployment, + metav1.CreateOptions{}) + require.NoError(t, err, "Expected no error when creating deployment") + + result := PauseWithPatch(test.deployment, clients, test.pauseDurationString, test.pauseDuration) + + require.Equal(t, test.expectedResult, result, + "PauseWithPatch should return correct state") + + // Check if timer was created + timerKey := getTimerKey(test.deployment.Namespace, test.deployment.Name) + _, timerExists := activeTimers[timerKey] + assert.Equal(t, test.expectTimer, timerExists, + "Timer should exist if pause was successful") + + if test.shouldPatchSucceed { + updatedDeployment, err := fakeClient.AppsV1().Deployments(test.deployment.Namespace).Get( + context.TODO(), + test.deployment.Name, + 
metav1.GetOptions{}) + + require.NoError(t, err, "Should be able to get updated deployment") + assert.True(t, updatedDeployment.Spec.Paused, "Deployment should be paused after patch") + + pauseTimeAnnotation := updatedDeployment.Annotations[options.PauseDeploymentTimeAnnotation] + assert.NotEmpty(t, pauseTimeAnnotation, "Pause time annotation should be set") + + pauseIntervalAnnotation := updatedDeployment.Annotations[options.PauseDeploymentAnnotation] + assert.Equal(t, test.pauseDurationString, pauseIntervalAnnotation, "Pause interval annotation should match") + } + }) + } +} + +func TestApplyPauseToDeployment(t *testing.T) { + tests := []struct { + name string + deployment *appsv1.Deployment + pauseDurationString string + pauseDuration time.Duration + expectPaused bool + expectAnnotations bool + }{ + { + name: "apply pause to unpaused deployment", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment-1", + Namespace: "default", + }, + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + pauseDurationString: "5m", + pauseDuration: 5 * time.Minute, + expectPaused: true, + expectAnnotations: true, + }, + { + name: "apply pause to deployment with no annotations map", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment-3", + Namespace: "default", + Annotations: nil, + }, + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + pauseDurationString: "30s", + pauseDuration: 30 * time.Second, + expectPaused: true, + expectAnnotations: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ApplyPauseToDeployment(test.deployment, test.pauseDurationString, test.pauseDuration) + + require.Equal(t, test.expectPaused, test.deployment.Spec.Paused, + "Deployment should be paused after applying pause") + + if test.expectAnnotations { + require.NotNil(t, test.deployment.Annotations, + "Annotations map should be created if it didn't exist") + + pauseTimeAnnotation 
:= test.deployment.Annotations[options.PauseDeploymentTimeAnnotation] + assert.NotEmpty(t, pauseTimeAnnotation, + "Pause time annotation should be set") + + pauseIntervalAnnotation := test.deployment.Annotations[options.PauseDeploymentAnnotation] + assert.Equal(t, test.pauseDurationString, pauseIntervalAnnotation, + "Pause interval annotation should match the input") + } + }) + } +} + +func TestPerformUpdate(t *testing.T) { + tests := []struct { + name string + deployment *appsv1.Deployment + pauseDurationString string + pauseDuration time.Duration + doPatch bool + updateSucceeded bool + }{ + { + name: "update deployment with patch", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment-patch", + Namespace: "default", + }, + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + pauseDurationString: "5m", + pauseDuration: 5 * time.Minute, + doPatch: true, + updateSucceeded: true, + }, + { + name: "update deployment without patch", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment-no-patch", + Namespace: "default", + }, + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + pauseDurationString: "10m", + pauseDuration: 10 * time.Minute, + doPatch: false, + updateSucceeded: true, + }, + } + + for _, test := range tests { + defer func() { + for key, timer := range activeTimers { + timer.Stop() + delete(activeTimers, key) + } + }() + + t.Run(test.name, func(t *testing.T) { + fakeClient := testclient.NewSimpleClientset() + clients := kube.Clients{ + KubernetesClient: fakeClient, + } + + _, err := fakeClient.AppsV1().Deployments(test.deployment.Namespace).Create( + context.TODO(), + test.deployment, + metav1.CreateOptions{}) + require.Nil(t, err, "Expected no error when creating deployment") + + result := PerformUpdate(test.deployment, clients, test.pauseDurationString, test.pauseDuration, test.doPatch) + + require.Equal(t, test.updateSucceeded, result, + "PerformUpdate should return 
correct result") + + if test.doPatch { + // For patch, get deployment + updatedDeployment, err := fakeClient.AppsV1().Deployments(test.deployment.Namespace).Get( + context.TODO(), + test.deployment.Name, + metav1.GetOptions{}) + + require.NoError(t, err, "Should be able to get updated deployment") + assert.True(t, updatedDeployment.Spec.Paused, "Deployment should be paused") + + assert.NotEmpty(t, updatedDeployment.Annotations[options.PauseDeploymentTimeAnnotation], + "Pause time annotation should be present") + assert.Equal(t, test.pauseDurationString, updatedDeployment.Annotations[options.PauseDeploymentAnnotation], + "Pause interval annotation should match") + } else { + assert.True(t, test.deployment.Spec.Paused, "Deployment should be paused") + + assert.NotEmpty(t, test.deployment.Annotations[options.PauseDeploymentTimeAnnotation], + "Pause time annotation should be present") + assert.Equal(t, test.pauseDurationString, test.deployment.Annotations[options.PauseDeploymentAnnotation], + "Pause interval annotation should match") + } + }) + } +} + +// Simple helper function for the test cases +func FindDeploymentByName(deployments []runtime.Object, deploymentName string) (*app.Deployment, error) { + for _, deployment := range deployments { + accessor, err := meta.Accessor(deployment) + if err != nil { + return nil, fmt.Errorf("error getting accessor for item: %v", err) + } + if accessor.GetName() == deploymentName { + deploymentObj, ok := deployment.(*app.Deployment) + if !ok { + return nil, fmt.Errorf("failed to cast to Deployment") + } + return deploymentObj, nil + } + } + return nil, fmt.Errorf("deployment '%s' not found", deploymentName) +} diff --git a/internal/pkg/callbacks/rolling_upgrade.go b/internal/pkg/callbacks/rolling_upgrade.go index c7b2e5ccb..f4db889c2 100644 --- a/internal/pkg/callbacks/rolling_upgrade.go +++ b/internal/pkg/callbacks/rolling_upgrade.go @@ -431,6 +431,9 @@ func GetPatchTemplates() PatchTemplates { // UpdateDeployment performs 
rolling upgrade on deployment func UpdateDeployment(clients kube.Clients, namespace string, resource runtime.Object) error { deployment := resource.(*appsv1.Deployment) + + PauseDeployment(deployment, clients, false) + _, err := clients.KubernetesClient.AppsV1().Deployments(namespace).Update(context.TODO(), deployment, meta_v1.UpdateOptions{FieldManager: "Reloader"}) return err } @@ -438,6 +441,9 @@ func UpdateDeployment(clients kube.Clients, namespace string, resource runtime.O // PatchDeployment performs rolling upgrade on deployment func PatchDeployment(clients kube.Clients, namespace string, resource runtime.Object, patchType patchtypes.PatchType, bytes []byte) error { deployment := resource.(*appsv1.Deployment) + + PauseDeployment(deployment, clients, true) + _, err := clients.KubernetesClient.AppsV1().Deployments(namespace).Patch(context.TODO(), deployment.Name, patchType, bytes, meta_v1.PatchOptions{FieldManager: "Reloader"}) return err } diff --git a/internal/pkg/handler/upgrade_test.go b/internal/pkg/handler/upgrade_test.go index 5c77cae65..bdd2436d0 100644 --- a/internal/pkg/handler/upgrade_test.go +++ b/internal/pkg/handler/upgrade_test.go @@ -18,6 +18,7 @@ import ( "github.com/stakater/Reloader/internal/pkg/util" "github.com/stakater/Reloader/pkg/kube" "github.com/stretchr/testify/assert" + apps "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -52,6 +53,7 @@ var ( arsConfigmapWithConfigMapAutoAnnotation = "testconfigmapwithconfigmapautoannotationdeployment-handler-" + testutil.RandSeq(5) arsSecretWithExcludeSecretAnnotation = "testsecretwithsecretexcludeannotationdeployment-handler-" + testutil.RandSeq(5) arsConfigmapWithExcludeConfigMapAnnotation = "testconfigmapwithconfigmapexcludeannotationdeployment-handler-" + testutil.RandSeq(5) + arsConfigmapWithPausedDeployment = "testconfigmapWithPausedDeployment-handler-" + testutil.RandSeq(5) 
arsConfigmapWithIgnoreAnnotation = "testconfigmapWithIgnoreAnnotation-handler-" + testutil.RandSeq(5) arsSecretWithIgnoreAnnotation = "testsecretWithIgnoreAnnotation-handler-" + testutil.RandSeq(5) @@ -77,6 +79,7 @@ var ( ersConfigmapWithConfigMapAutoAnnotation = "testconfigmapwithconfigmapautoannotationdeployment-handler-" + testutil.RandSeq(5) ersSecretWithSecretExcludeAnnotation = "testsecretwithsecretexcludeannotationdeployment-handler-" + testutil.RandSeq(5) ersConfigmapWithConfigMapExcludeAnnotation = "testconfigmapwithconfigmapexcludeannotationdeployment-handler-" + testutil.RandSeq(5) + ersConfigmapWithPausedDeployment = "testconfigmapWithPausedDeployment-handler-" + testutil.RandSeq(5) ersConfigmapWithIgnoreAnnotation = "testconfigmapWithIgnoreAnnotation-handler-" + testutil.RandSeq(5) ersSecretWithIgnoreAnnotation = "testsecretWithIgnoreAnnotation-handler-" + testutil.RandSeq(5) ) @@ -207,6 +210,12 @@ func setupArs() { logrus.Errorf("Error in configmap creation: %v", err) } + // Creating configmap for testing pausing deployments + _, err = testutil.CreateConfigMap(clients.KubernetesClient, arsNamespace, arsConfigmapWithPausedDeployment, "www.google.com") + if err != nil { + logrus.Errorf("Error in configmap creation: %v", err) + } + // Creating secret used with secret auto annotation _, err = testutil.CreateSecret(clients.KubernetesClient, arsNamespace, arsSecretWithExcludeSecretAnnotation, data) if err != nil { @@ -457,6 +466,12 @@ func setupArs() { if err != nil { logrus.Errorf("Error in Deployment with both annotations: %v", err) } + + // Creating Deployment with pause annotation + _, err = testutil.CreateDeploymentWithAnnotations(clients.KubernetesClient, arsConfigmapWithPausedDeployment, arsNamespace, map[string]string{options.PauseDeploymentAnnotation: "10s"}, false) + if err != nil { + logrus.Errorf("Error in Deployment with configmap creation: %v", err) + } } func teardownArs() { @@ -658,6 +673,12 @@ func teardownArs() { logrus.Errorf("Error while 
deleting statefulSet with secret as env var source %v", statefulSetError) } + // Deleting Deployment with pasuse annotation + deploymentError = testutil.DeleteDeployment(clients.KubernetesClient, arsNamespace, arsConfigmapWithPausedDeployment) + if deploymentError != nil { + logrus.Errorf("Error while deleting deployment with configmap %v", deploymentError) + } + // Deleting Configmap err := testutil.DeleteConfigMap(clients.KubernetesClient, arsNamespace, arsConfigmapName) if err != nil { @@ -771,6 +792,12 @@ func teardownArs() { logrus.Errorf("Error while deleting the configmap used with configmap auto annotations: %v", err) } + // Deleting configmap for testing pausing deployments + err = testutil.DeleteConfigMap(clients.KubernetesClient, arsNamespace, arsConfigmapWithPausedDeployment) + if err != nil { + logrus.Errorf("Error while deleting the configmap: %v", err) + } + // Deleting namespace testutil.DeleteNamespace(arsNamespace, clients.KubernetesClient) @@ -830,6 +857,12 @@ func setupErs() { logrus.Errorf("Error in configmap creation: %v", err) } + // Creating configmap for testing pausing deployments + _, err = testutil.CreateConfigMap(clients.KubernetesClient, ersNamespace, ersConfigmapWithPausedDeployment, "www.google.com") + if err != nil { + logrus.Errorf("Error in configmap creation: %v", err) + } + // Creating secret _, err = testutil.CreateSecret(clients.KubernetesClient, ersNamespace, ersSecretWithInitEnv, data) if err != nil { @@ -1034,6 +1067,12 @@ func setupErs() { logrus.Errorf("Error in Deployment with configmap and with configmap exclude annotation: %v", err) } + // Creating Deployment with pause annotation + _, err = testutil.CreateDeploymentWithAnnotations(clients.KubernetesClient, ersConfigmapWithPausedDeployment, ersNamespace, map[string]string{options.PauseDeploymentAnnotation: "10s"}, false) + if err != nil { + logrus.Errorf("Error in Deployment with configmap creation: %v", err) + } + // Creating DaemonSet with configmap _, err = 
testutil.CreateDaemonSet(clients.KubernetesClient, ersConfigmapName, ersNamespace, true) if err != nil { @@ -1318,6 +1357,12 @@ func teardownErs() { logrus.Errorf("Error while deleting statefulSet with secret as env var source %v", statefulSetError) } + // Deleting Deployment for testing pausing deployments + deploymentError = testutil.DeleteDeployment(clients.KubernetesClient, ersNamespace, ersConfigmapWithPausedDeployment) + if deploymentError != nil { + logrus.Errorf("Error while deleting deployment with configmap %v", deploymentError) + } + // Deleting Configmap err := testutil.DeleteConfigMap(clients.KubernetesClient, ersNamespace, ersConfigmapName) if err != nil { @@ -1431,6 +1476,12 @@ func teardownErs() { logrus.Errorf("Error while deleting the configmap used with configmap exclude annotation: %v", err) } + // Deleting ConfigMap for testins pausing deployments + err = testutil.DeleteConfigMap(clients.KubernetesClient, ersNamespace, ersConfigmapWithPausedDeployment) + if err != nil { + logrus.Errorf("Error while deleting the configmap: %v", err) + } + // Deleting namespace testutil.DeleteNamespace(ersNamespace, clients.KubernetesClient) @@ -4053,3 +4104,108 @@ func TestFailedRollingUpgradeUsingErs(t *testing.T) { t.Errorf("Counter by namespace was not increased") } } + +func TestPausingDeploymentUsingErs(t *testing.T) { + options.ReloadStrategy = constants.EnvVarsReloadStrategy + testPausingDeployment(t, options.ReloadStrategy, ersConfigmapWithPausedDeployment, ersNamespace) +} + +func TestPausingDeploymentUsingArs(t *testing.T) { + options.ReloadStrategy = constants.AnnotationsReloadStrategy + testPausingDeployment(t, options.ReloadStrategy, arsConfigmapWithPausedDeployment, arsNamespace) +} + +func testPausingDeployment(t *testing.T, reloadStrategy string, testName string, namespace string) { + options.ReloadStrategy = reloadStrategy + envVarPostfix := constants.ConfigmapEnvVarPostfix + + shaData := 
testutil.ConvertResourceToSHA(testutil.ConfigmapResourceType, namespace, testName, "pause.stakater.com") + config := getConfigWithAnnotations(envVarPostfix, testName, shaData, options.ConfigmapUpdateOnChangeAnnotation, options.ConfigmapReloaderAutoAnnotation) + deploymentFuncs := GetDeploymentRollingUpgradeFuncs() + collectors := getCollectors() + + _ = PerformAction(clients, config, deploymentFuncs, collectors, nil, invokeReloadStrategy) + + if promtestutil.ToFloat64(collectors.Reloaded.With(labelSucceeded)) != 1 { + t.Errorf("Counter was not increased") + } + + if promtestutil.ToFloat64(collectors.ReloadedByNamespace.With(prometheus.Labels{"success": "true", "namespace": namespace})) != 1 { + t.Errorf("Counter by namespace was not increased") + } + + logrus.Infof("Verifying deployment has been paused") + items := deploymentFuncs.ItemsFunc(clients, config.Namespace) + deploymentPaused, err := isDeploymentPaused(items, testName) + if err != nil { + t.Errorf("%s", err.Error()) + } + if !deploymentPaused { + t.Errorf("Deployment has not been paused") + } + + shaData = testutil.ConvertResourceToSHA(testutil.ConfigmapResourceType, namespace, testName, "pause-changed.stakater.com") + config = getConfigWithAnnotations(envVarPostfix, testName, shaData, options.ConfigmapUpdateOnChangeAnnotation, options.ConfigmapReloaderAutoAnnotation) + + _ = PerformAction(clients, config, deploymentFuncs, collectors, nil, invokeReloadStrategy) + + if promtestutil.ToFloat64(collectors.Reloaded.With(labelSucceeded)) != 2 { + t.Errorf("Counter was not increased") + } + + if promtestutil.ToFloat64(collectors.ReloadedByNamespace.With(prometheus.Labels{"success": "true", "namespace": namespace})) != 2 { + t.Errorf("Counter by namespace was not increased") + } + + logrus.Infof("Verifying deployment is still paused") + items = deploymentFuncs.ItemsFunc(clients, config.Namespace) + deploymentPaused, err = isDeploymentPaused(items, testName) + if err != nil { + t.Errorf("%s", err.Error()) + } + if 
!deploymentPaused { + t.Errorf("Deployment should still be paused") + } + + logrus.Infof("Verifying deployment has been resumed after pause interval") + time.Sleep(11 * time.Second) + items = deploymentFuncs.ItemsFunc(clients, config.Namespace) + deploymentPaused, err = isDeploymentPaused(items, testName) + if err != nil { + t.Errorf("%s", err.Error()) + } + if deploymentPaused { + t.Errorf("Deployment should have been resumed after pause interval") + } +} + +func isDeploymentPaused(deployments []runtime.Object, deploymentName string) (bool, error) { + deployment, err := FindDeploymentByName(deployments, deploymentName) + if err != nil { + return false, err + } + return IsPaused(deployment), nil +} + +// Simple helper function for test cases +func FindDeploymentByName(deployments []runtime.Object, deploymentName string) (*apps.Deployment, error) { + for _, deployment := range deployments { + accessor, err := meta.Accessor(deployment) + if err != nil { + return nil, fmt.Errorf("error getting accessor for item: %v", err) + } + if accessor.GetName() == deploymentName { + deploymentObj, ok := deployment.(*apps.Deployment) + if !ok { + return nil, fmt.Errorf("failed to cast to Deployment") + } + return deploymentObj, nil + } + } + return nil, fmt.Errorf("deployment '%s' not found", deploymentName) +} + +// Checks if a deployment is currently paused +func IsPaused(deployment *apps.Deployment) bool { + return deployment.Spec.Paused +} diff --git a/internal/pkg/options/flags.go b/internal/pkg/options/flags.go index 7c0e14e13..bfa63fc97 100644 --- a/internal/pkg/options/flags.go +++ b/internal/pkg/options/flags.go @@ -40,6 +40,12 @@ var ( SearchMatchAnnotation = "reloader.stakater.com/match" // RolloutStrategyAnnotation is an annotation to define rollout update strategy RolloutStrategyAnnotation = "reloader.stakater.com/rollout-strategy" + // PauseDeploymentAnnotation is an annotation to define the time period to pause a deployment after + // a configmap/secret change has 
been detected. Valid values are described here: https://pkg.go.dev/time#ParseDuration + // only positive values are allowed + PauseDeploymentAnnotation = "deployment.reloader.stakater.com/pause-period" + // Annotation set by reloader to indicate that the deployment has been paused + PauseDeploymentTimeAnnotation = "deployment.reloader.stakater.com/paused-at" // LogFormat is the log format to use (json, or empty string for default) LogFormat = "" // LogLevel is the log level to use (trace, debug, info, warning, error, fatal and panic) diff --git a/internal/pkg/testutil/kube.go b/internal/pkg/testutil/kube.go index f2d3bb4ad..7a1aae3ac 100644 --- a/internal/pkg/testutil/kube.go +++ b/internal/pkg/testutil/kube.go @@ -794,6 +794,26 @@ func CreateDeployment(client kubernetes.Interface, deploymentName string, namesp return deployment, err } +// CreateDeployment creates a deployment in given namespace and returns the Deployment +func CreateDeploymentWithAnnotations(client kubernetes.Interface, deploymentName string, namespace string, additionalAnnotations map[string]string, volumeMount bool) (*appsv1.Deployment, error) { + logrus.Infof("Creating Deployment") + deploymentClient := client.AppsV1().Deployments(namespace) + var deploymentObj *appsv1.Deployment + if volumeMount { + deploymentObj = GetDeployment(namespace, deploymentName) + } else { + deploymentObj = GetDeploymentWithEnvVars(namespace, deploymentName) + } + + for annotationKey, annotationValue := range additionalAnnotations { + deploymentObj.Annotations[annotationKey] = annotationValue + } + + deployment, err := deploymentClient.Create(context.TODO(), deploymentObj, metav1.CreateOptions{}) + time.Sleep(3 * time.Second) + return deployment, err +} + // CreateDeploymentConfig creates a deploymentConfig in given namespace and returns the DeploymentConfig func CreateDeploymentConfig(client appsclient.Interface, deploymentName string, namespace string, volumeMount bool) (*openshiftv1.DeploymentConfig, error) { 
logrus.Infof("Creating DeploymentConfig")