Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- **BREAKING**: Change `KafkaSchema` deletion to perform a hard delete instead of soft delete only.
The subject is no longer visible in the registry's listing after deletion,
and re-applying a `KafkaSchema` with the same `subjectName` after deletion starts at version 1.
- Fix service resources: wait for connection Secret publication before reporting Ready.

## v0.38.0 - 2026-05-18

Expand Down
79 changes: 77 additions & 2 deletions controllers/basic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -191,7 +192,7 @@ func (i *instanceReconcilerHelper) reconcile(ctx context.Context, o v1alpha1.Aiv
return false, nil
}

if IsReadyToUse(o) && !hasPendingMigration(o) {
if IsReadyToUse(o) && !hasPendingMigration(o) && !i.resourceNeedsConnectionSecretSync(ctx, o) {
return false, nil
}

Expand Down Expand Up @@ -327,6 +328,34 @@ func hasPendingMigration(o v1alpha1.AivenManagedObject) bool {
return cond != nil && cond.Reason == v1alpha1.MigrationReasonInProgress
}

func (i *instanceReconcilerHelper) resourceNeedsConnectionSecretSync(ctx context.Context, o v1alpha1.AivenManagedObject) bool {
if IsMarkedAsPoweredOff(o) {
return false
}

withSecret, ok := connectionSecretOwner(o)
if !ok {
return false
}

if hasConnectionSecretPublishError(o) {
return true
}

secret := &corev1.Secret{}
if err := i.k8s.Get(ctx, types.NamespacedName{Name: connectionSecretName(withSecret), Namespace: withSecret.GetNamespace()}, secret); err != nil {
if !apierrors.IsNotFound(err) {
i.log.Info("unable to verify connection secret ownership", "error", err)
}
return true
}

// Don't treat a same-named Secret as fresh connection details unless it was published for this exact object.
// Otherwise a Secret left behind by a resource deleted and recreated quickly could make the new resource look Ready too early.
ref := metav1.GetControllerOf(secret)
return ref == nil || o.GetUID() == "" || ref.UID != o.GetUID()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a bit of explanation what happens here.

}

func (i *instanceReconcilerHelper) checkPreconditions(ctx context.Context, o client.Object, refs []client.Object) (bool, error) {
i.rec.Event(o, corev1.EventTypeNormal, eventWaitingForPreconditions, "waiting for preconditions of the instance")

Expand Down Expand Up @@ -555,11 +584,18 @@ func (i *instanceReconcilerHelper) updateInstanceStateAndSecretUntilRunning(ctx
goalSecret, err := i.h.get(ctx, i.avnGen, o)

if goalSecret == nil || err != nil {
if err != nil && goalSecret != nil {
markConnectionSecretPublishFailed(o, err)
}
if err == nil && hasIsRunningAnnotation(o) {
clearConnectionSecretPublishError(o)
}
return err
}

if o.NoSecret() {
i.rec.Event(o, corev1.EventTypeNormal, eventConnInfoSecretCreationDisabled, "connInfoSecretTargetDisabled is true, secret will not be created")
clearConnectionSecretPublishError(o)
return nil
}

Expand Down Expand Up @@ -590,8 +626,47 @@ func (i *instanceReconcilerHelper) updateInstanceStateAndSecretUntilRunning(ctx

return controllerutil.SetControllerReference(o, secret, i.k8s.Scheme())
})
if err != nil {
markConnectionSecretPublishFailed(o, err)
return err
}

clearConnectionSecretPublishError(o)
return nil
}

func connectionSecretOwner(o v1alpha1.AivenManagedObject) (objWithSecret, bool) {
if o.NoSecret() {
return nil, false
}

withSecret, ok := any(o).(objWithSecret)
return withSecret, ok
}

func hasConnectionSecretPublishError(o v1alpha1.AivenManagedObject) bool {
cond := meta.FindStatusCondition(*o.Conditions(), ConditionTypeError)
return cond != nil && cond.Reason == string(errConditionConnInfoSecret)
}

func markConnectionSecretPublishFailed(o v1alpha1.AivenManagedObject, err error) {
if _, ok := connectionSecretOwner(o); !ok {
return
}

delete(o.GetAnnotations(), instanceIsRunningAnnotation)
meta.SetStatusCondition(o.Conditions(), getRunningCondition(
metav1.ConditionUnknown,
string(errConditionConnInfoSecret),
"Connection details are not published",
))
meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionConnInfoSecret, err))
}

return err
func clearConnectionSecretPublishError(o v1alpha1.AivenManagedObject) {
if hasConnectionSecretPublishError(o) {
meta.RemoveStatusCondition(o.Conditions(), ConditionTypeError)
}
}

func setupLogger(log logr.Logger, o client.Object) logr.Logger {
Expand Down
233 changes: 233 additions & 0 deletions controllers/basic_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
package controllers

import (
"context"
"testing"

avngen "github.com/aiven/go-client-codegen"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/aiven/aiven-operator/api/v1alpha1"
)

func TestInstanceReconcilerHelper_updateInstanceStateAndSecretUntilRunning(t *testing.T) {
connectionSecret := func(pg *v1alpha1.PostgreSQL) *corev1.Secret {
return &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: pg.Name,
Namespace: pg.Namespace,
},
StringData: map[string]string{
"PGPASSWORD": "pw",
},
}
}

t.Run("Marks resource not ready when connection Secret publish fails", func(t *testing.T) {
pg := newReadyPostgreSQL(t)
controller := true

scheme := newBasicControllerTestScheme(t)
k8sClient := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: pg.Name,
Namespace: pg.Namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "aiven.io/v1alpha1",
Kind: "PostgreSQL",
Name: "other",
UID: types.UID("other-uid"),
Controller: &controller,
},
},
},
}).
Build()

h := NewMockHandlers(t)
h.EXPECT().
get(mock.Anything, mock.Anything, pg).
Run(func(_ context.Context, _ avngen.Client, _ client.Object) {
metav1.SetMetaDataAnnotation(&pg.ObjectMeta, instanceIsRunningAnnotation, "true")
meta.SetStatusCondition(&pg.Status.Conditions, getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
}).
Return(connectionSecret(pg), nil).
Once()

helper := &instanceReconcilerHelper{
k8s: k8sClient,
h: h,
rec: record.NewFakeRecorder(10),
}

err := helper.updateInstanceStateAndSecretUntilRunning(t.Context(), pg)
require.Error(t, err)

require.False(t, hasIsRunningAnnotation(pg))

running := meta.FindStatusCondition(pg.Status.Conditions, conditionTypeRunning)
require.NotNil(t, running)
require.Equal(t, metav1.ConditionUnknown, running.Status)
require.Equal(t, string(errConditionConnInfoSecret), running.Reason)

errCond := meta.FindStatusCondition(pg.Status.Conditions, ConditionTypeError)
require.NotNil(t, errCond)
require.Equal(t, metav1.ConditionUnknown, errCond.Status)
require.Equal(t, string(errConditionConnInfoSecret), errCond.Reason)
})

t.Run("Clears connection Secret publish error after successful publish", func(t *testing.T) {
pg := newReadyPostgreSQL(t)
meta.SetStatusCondition(&pg.Status.Conditions, getErrorCondition(errConditionConnInfoSecret, assert.AnError))

scheme := newBasicControllerTestScheme(t)
k8sClient := fake.NewClientBuilder().
WithScheme(scheme).
Build()

h := NewMockHandlers(t)
h.EXPECT().
get(mock.Anything, mock.Anything, pg).
Run(func(_ context.Context, _ avngen.Client, _ client.Object) {
metav1.SetMetaDataAnnotation(&pg.ObjectMeta, instanceIsRunningAnnotation, "true")
meta.SetStatusCondition(&pg.Status.Conditions, getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
}).
Return(connectionSecret(pg), nil).
Once()

helper := &instanceReconcilerHelper{
k8s: k8sClient,
h: h,
rec: record.NewFakeRecorder(10),
}

require.NoError(t, helper.updateInstanceStateAndSecretUntilRunning(t.Context(), pg))
require.True(t, hasIsRunningAnnotation(pg))
require.Nil(t, meta.FindStatusCondition(pg.Status.Conditions, ConditionTypeError))

got := &corev1.Secret{}
require.NoError(t, k8sClient.Get(t.Context(), types.NamespacedName{Name: pg.Name, Namespace: pg.Namespace}, got))
require.Equal(t, []byte("pw"), got.Data["PGPASSWORD"])
})
}

func TestInstanceReconcilerHelper_resourceNeedsConnectionSecretSync(t *testing.T) {
ownedSecret := func(pg *v1alpha1.PostgreSQL) *corev1.Secret {
controller := true
return &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: pg.Name,
Namespace: pg.Namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "aiven.io/v1alpha1",
Kind: "PostgreSQL",
Name: pg.Name,
UID: pg.UID,
Controller: &controller,
},
},
},
Data: map[string][]byte{
"PGPASSWORD": []byte("pw"),
},
}
}

tests := map[string]struct {
mutate func(*v1alpha1.PostgreSQL)
objects []runtime.Object
wantNeedsSync bool
}{
"does not need sync for powered-off resource": {
mutate: func(pg *v1alpha1.PostgreSQL) {
metav1.SetMetaDataAnnotation(&pg.ObjectMeta, instanceIsRunningAnnotation, "false")
},
wantNeedsSync: false,
},
"does not need sync when connection Secret creation is disabled": {
mutate: func(pg *v1alpha1.PostgreSQL) {
pg.Spec.ConnInfoSecretTargetDisabled = new(bool)
*pg.Spec.ConnInfoSecretTargetDisabled = true
},
wantNeedsSync: false,
},
"needs sync when connection Secret is missing": {
wantNeedsSync: true,
},
"needs sync when connection Secret is not controlled by the resource": {
objects: []runtime.Object{
&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "pg-no-ref", Namespace: "default"}},
},
wantNeedsSync: true,
},
"needs sync when a stale connection Secret error is present": {
mutate: func(pg *v1alpha1.PostgreSQL) {
meta.SetStatusCondition(&pg.Status.Conditions, getErrorCondition(errConditionConnInfoSecret, assert.AnError))
},
objects: []runtime.Object{
ownedSecret(newReadyPostgreSQL(t)),
},
wantNeedsSync: true,
},
"does not need sync when connection Secret is controlled by the resource": {
objects: []runtime.Object{
ownedSecret(newReadyPostgreSQL(t)),
},
wantNeedsSync: false,
},
}

for name, tt := range tests {
t.Run(name, func(t *testing.T) {
pg := newReadyPostgreSQL(t)
if tt.mutate != nil {
tt.mutate(pg)
}

scheme := newBasicControllerTestScheme(t)
k8sClient := fake.NewClientBuilder().
WithScheme(scheme).
WithRuntimeObjects(tt.objects...).
Build()

helper := &instanceReconcilerHelper{k8s: k8sClient}
require.Equal(t, tt.wantNeedsSync, helper.resourceNeedsConnectionSecretSync(t.Context(), pg))
})
}
}

func newBasicControllerTestScheme(t *testing.T) *runtime.Scheme {
t.Helper()

scheme := runtime.NewScheme()
require.NoError(t, clientgoscheme.AddToScheme(scheme))
require.NoError(t, v1alpha1.AddToScheme(scheme))
return scheme
}

func newReadyPostgreSQL(t *testing.T) *v1alpha1.PostgreSQL {
t.Helper()

pg := newObjectFromYAML[v1alpha1.PostgreSQL](t, yamlPostgres)
pg.UID = types.UID("pg-uid")
pg.Generation = 1
metav1.SetMetaDataAnnotation(&pg.ObjectMeta, processedGenerationAnnotation, "1")
metav1.SetMetaDataAnnotation(&pg.ObjectMeta, instanceIsRunningAnnotation, "true")
meta.SetStatusCondition(&pg.Status.Conditions, getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
return pg
}
Loading
Loading