Skip to content

Commit 3f35fb4

Browse files
committed
fix(controllers): wait for connection secret before ready
1 parent 56bc51c commit 3f35fb4

12 files changed

Lines changed: 558 additions & 30 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
- **BREAKING**: Change `KafkaSchema` deletion to perform a hard delete instead of soft delete only.
3030
The subject is no longer visible in the registry's listing after deletion,
3131
and re-applying a `KafkaSchema` with the same `subjectName` after deletion starts at version 1.
32+
- Fix service resources reporting Ready before their connection Secret has been published.
3233

3334
## v0.38.0 - 2026-05-18
3435

controllers/basic_controller.go

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/go-logr/logr"
1414
corev1 "k8s.io/api/core/v1"
1515
"k8s.io/apimachinery/pkg/api/equality"
16+
apierrors "k8s.io/apimachinery/pkg/api/errors"
1617
"k8s.io/apimachinery/pkg/api/meta"
1718
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1819
"k8s.io/apimachinery/pkg/runtime"
@@ -191,7 +192,7 @@ func (i *instanceReconcilerHelper) reconcile(ctx context.Context, o v1alpha1.Aiv
191192
return false, nil
192193
}
193194

194-
if IsReadyToUse(o) && !hasPendingMigration(o) {
195+
if IsReadyToUse(o) && !hasPendingMigration(o) && !i.resourceNeedsConnectionSecretSync(ctx, o) {
195196
return false, nil
196197
}
197198

@@ -327,6 +328,34 @@ func hasPendingMigration(o v1alpha1.AivenManagedObject) bool {
327328
return cond != nil && cond.Reason == v1alpha1.MigrationReasonInProgress
328329
}
329330

331+
func (i *instanceReconcilerHelper) resourceNeedsConnectionSecretSync(ctx context.Context, o v1alpha1.AivenManagedObject) bool {
332+
if IsMarkedAsPoweredOff(o) {
333+
return false
334+
}
335+
336+
withSecret, ok := connectionSecretOwner(o)
337+
if !ok {
338+
return false
339+
}
340+
341+
if hasConnectionSecretPublishError(o) {
342+
return true
343+
}
344+
345+
secret := &corev1.Secret{}
346+
if err := i.k8s.Get(ctx, types.NamespacedName{Name: connectionSecretName(withSecret), Namespace: withSecret.GetNamespace()}, secret); err != nil {
347+
if !apierrors.IsNotFound(err) {
348+
i.log.Info("unable to verify connection secret ownership", "error", err)
349+
}
350+
return true
351+
}
352+
353+
// Don't treat a same-named Secret as fresh connection details unless it was published for this exact object.
354+
// Otherwise a Secret left behind by a resource deleted and recreated quickly could make the new resource look Ready too early.
355+
ref := metav1.GetControllerOf(secret)
356+
return ref == nil || o.GetUID() == "" || ref.UID != o.GetUID()
357+
}
358+
330359
func (i *instanceReconcilerHelper) checkPreconditions(ctx context.Context, o client.Object, refs []client.Object) (bool, error) {
331360
i.rec.Event(o, corev1.EventTypeNormal, eventWaitingForPreconditions, "waiting for preconditions of the instance")
332361

@@ -555,11 +584,18 @@ func (i *instanceReconcilerHelper) updateInstanceStateAndSecretUntilRunning(ctx
555584
goalSecret, err := i.h.get(ctx, i.avnGen, o)
556585

557586
if goalSecret == nil || err != nil {
587+
if err != nil && goalSecret != nil {
588+
markConnectionSecretPublishFailed(o, err)
589+
}
590+
if err == nil && hasIsRunningAnnotation(o) {
591+
clearConnectionSecretPublishError(o)
592+
}
558593
return err
559594
}
560595

561596
if o.NoSecret() {
562597
i.rec.Event(o, corev1.EventTypeNormal, eventConnInfoSecretCreationDisabled, "connInfoSecretTargetDisabled is true, secret will not be created")
598+
clearConnectionSecretPublishError(o)
563599
return nil
564600
}
565601

@@ -590,8 +626,47 @@ func (i *instanceReconcilerHelper) updateInstanceStateAndSecretUntilRunning(ctx
590626

591627
return controllerutil.SetControllerReference(o, secret, i.k8s.Scheme())
592628
})
629+
if err != nil {
630+
markConnectionSecretPublishFailed(o, err)
631+
return err
632+
}
633+
634+
clearConnectionSecretPublishError(o)
635+
return nil
636+
}
637+
638+
func connectionSecretOwner(o v1alpha1.AivenManagedObject) (objWithSecret, bool) {
639+
if o.NoSecret() {
640+
return nil, false
641+
}
642+
643+
withSecret, ok := any(o).(objWithSecret)
644+
return withSecret, ok
645+
}
646+
647+
func hasConnectionSecretPublishError(o v1alpha1.AivenManagedObject) bool {
648+
cond := meta.FindStatusCondition(*o.Conditions(), ConditionTypeError)
649+
return cond != nil && cond.Reason == string(errConditionConnInfoSecret)
650+
}
651+
652+
func markConnectionSecretPublishFailed(o v1alpha1.AivenManagedObject, err error) {
653+
if _, ok := connectionSecretOwner(o); !ok {
654+
return
655+
}
656+
657+
delete(o.GetAnnotations(), instanceIsRunningAnnotation)
658+
meta.SetStatusCondition(o.Conditions(), getRunningCondition(
659+
metav1.ConditionUnknown,
660+
string(errConditionConnInfoSecret),
661+
"Connection details are not published",
662+
))
663+
meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionConnInfoSecret, err))
664+
}
593665

594-
return err
666+
func clearConnectionSecretPublishError(o v1alpha1.AivenManagedObject) {
667+
if hasConnectionSecretPublishError(o) {
668+
meta.RemoveStatusCondition(o.Conditions(), ConditionTypeError)
669+
}
595670
}
596671

597672
func setupLogger(log logr.Logger, o client.Object) logr.Logger {
Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
package controllers
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
avngen "github.com/aiven/go-client-codegen"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/mock"
10+
"github.com/stretchr/testify/require"
11+
corev1 "k8s.io/api/core/v1"
12+
"k8s.io/apimachinery/pkg/api/meta"
13+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14+
"k8s.io/apimachinery/pkg/runtime"
15+
"k8s.io/apimachinery/pkg/types"
16+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
17+
"k8s.io/client-go/tools/record"
18+
"sigs.k8s.io/controller-runtime/pkg/client"
19+
"sigs.k8s.io/controller-runtime/pkg/client/fake"
20+
21+
"github.com/aiven/aiven-operator/api/v1alpha1"
22+
)
23+
24+
func TestInstanceReconcilerHelper_updateInstanceStateAndSecretUntilRunning(t *testing.T) {
25+
connectionSecret := func(pg *v1alpha1.PostgreSQL) *corev1.Secret {
26+
return &corev1.Secret{
27+
ObjectMeta: metav1.ObjectMeta{
28+
Name: pg.Name,
29+
Namespace: pg.Namespace,
30+
},
31+
StringData: map[string]string{
32+
"PGPASSWORD": "pw",
33+
},
34+
}
35+
}
36+
37+
t.Run("Marks resource not ready when connection Secret publish fails", func(t *testing.T) {
38+
pg := newReadyPostgreSQL(t)
39+
controller := true
40+
41+
scheme := newBasicControllerTestScheme(t)
42+
k8sClient := fake.NewClientBuilder().
43+
WithScheme(scheme).
44+
WithObjects(&corev1.Secret{
45+
ObjectMeta: metav1.ObjectMeta{
46+
Name: pg.Name,
47+
Namespace: pg.Namespace,
48+
OwnerReferences: []metav1.OwnerReference{
49+
{
50+
APIVersion: "aiven.io/v1alpha1",
51+
Kind: "PostgreSQL",
52+
Name: "other",
53+
UID: types.UID("other-uid"),
54+
Controller: &controller,
55+
},
56+
},
57+
},
58+
}).
59+
Build()
60+
61+
h := NewMockHandlers(t)
62+
h.EXPECT().
63+
get(mock.Anything, mock.Anything, pg).
64+
Run(func(_ context.Context, _ avngen.Client, _ client.Object) {
65+
metav1.SetMetaDataAnnotation(&pg.ObjectMeta, instanceIsRunningAnnotation, "true")
66+
meta.SetStatusCondition(&pg.Status.Conditions, getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
67+
}).
68+
Return(connectionSecret(pg), nil).
69+
Once()
70+
71+
helper := &instanceReconcilerHelper{
72+
k8s: k8sClient,
73+
h: h,
74+
rec: record.NewFakeRecorder(10),
75+
}
76+
77+
err := helper.updateInstanceStateAndSecretUntilRunning(t.Context(), pg)
78+
require.Error(t, err)
79+
80+
require.False(t, hasIsRunningAnnotation(pg))
81+
82+
running := meta.FindStatusCondition(pg.Status.Conditions, conditionTypeRunning)
83+
require.NotNil(t, running)
84+
require.Equal(t, metav1.ConditionUnknown, running.Status)
85+
require.Equal(t, string(errConditionConnInfoSecret), running.Reason)
86+
87+
errCond := meta.FindStatusCondition(pg.Status.Conditions, ConditionTypeError)
88+
require.NotNil(t, errCond)
89+
require.Equal(t, metav1.ConditionUnknown, errCond.Status)
90+
require.Equal(t, string(errConditionConnInfoSecret), errCond.Reason)
91+
})
92+
93+
t.Run("Clears connection Secret publish error after successful publish", func(t *testing.T) {
94+
pg := newReadyPostgreSQL(t)
95+
meta.SetStatusCondition(&pg.Status.Conditions, getErrorCondition(errConditionConnInfoSecret, assert.AnError))
96+
97+
scheme := newBasicControllerTestScheme(t)
98+
k8sClient := fake.NewClientBuilder().
99+
WithScheme(scheme).
100+
Build()
101+
102+
h := NewMockHandlers(t)
103+
h.EXPECT().
104+
get(mock.Anything, mock.Anything, pg).
105+
Run(func(_ context.Context, _ avngen.Client, _ client.Object) {
106+
metav1.SetMetaDataAnnotation(&pg.ObjectMeta, instanceIsRunningAnnotation, "true")
107+
meta.SetStatusCondition(&pg.Status.Conditions, getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
108+
}).
109+
Return(connectionSecret(pg), nil).
110+
Once()
111+
112+
helper := &instanceReconcilerHelper{
113+
k8s: k8sClient,
114+
h: h,
115+
rec: record.NewFakeRecorder(10),
116+
}
117+
118+
require.NoError(t, helper.updateInstanceStateAndSecretUntilRunning(t.Context(), pg))
119+
require.True(t, hasIsRunningAnnotation(pg))
120+
require.Nil(t, meta.FindStatusCondition(pg.Status.Conditions, ConditionTypeError))
121+
122+
got := &corev1.Secret{}
123+
require.NoError(t, k8sClient.Get(t.Context(), types.NamespacedName{Name: pg.Name, Namespace: pg.Namespace}, got))
124+
require.Equal(t, []byte("pw"), got.Data["PGPASSWORD"])
125+
})
126+
}
127+
128+
func TestInstanceReconcilerHelper_resourceNeedsConnectionSecretSync(t *testing.T) {
129+
ownedSecret := func(pg *v1alpha1.PostgreSQL) *corev1.Secret {
130+
controller := true
131+
return &corev1.Secret{
132+
ObjectMeta: metav1.ObjectMeta{
133+
Name: pg.Name,
134+
Namespace: pg.Namespace,
135+
OwnerReferences: []metav1.OwnerReference{
136+
{
137+
APIVersion: "aiven.io/v1alpha1",
138+
Kind: "PostgreSQL",
139+
Name: pg.Name,
140+
UID: pg.UID,
141+
Controller: &controller,
142+
},
143+
},
144+
},
145+
Data: map[string][]byte{
146+
"PGPASSWORD": []byte("pw"),
147+
},
148+
}
149+
}
150+
151+
tests := map[string]struct {
152+
mutate func(*v1alpha1.PostgreSQL)
153+
objects []runtime.Object
154+
wantNeedsSync bool
155+
}{
156+
"does not need sync for powered-off resource": {
157+
mutate: func(pg *v1alpha1.PostgreSQL) {
158+
metav1.SetMetaDataAnnotation(&pg.ObjectMeta, instanceIsRunningAnnotation, "false")
159+
},
160+
wantNeedsSync: false,
161+
},
162+
"does not need sync when connection Secret creation is disabled": {
163+
mutate: func(pg *v1alpha1.PostgreSQL) {
164+
pg.Spec.ConnInfoSecretTargetDisabled = new(bool)
165+
*pg.Spec.ConnInfoSecretTargetDisabled = true
166+
},
167+
wantNeedsSync: false,
168+
},
169+
"needs sync when connection Secret is missing": {
170+
wantNeedsSync: true,
171+
},
172+
"needs sync when connection Secret is not controlled by the resource": {
173+
objects: []runtime.Object{
174+
&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "pg-no-ref", Namespace: "default"}},
175+
},
176+
wantNeedsSync: true,
177+
},
178+
"needs sync when a stale connection Secret error is present": {
179+
mutate: func(pg *v1alpha1.PostgreSQL) {
180+
meta.SetStatusCondition(&pg.Status.Conditions, getErrorCondition(errConditionConnInfoSecret, assert.AnError))
181+
},
182+
objects: []runtime.Object{
183+
ownedSecret(newReadyPostgreSQL(t)),
184+
},
185+
wantNeedsSync: true,
186+
},
187+
"does not need sync when connection Secret is controlled by the resource": {
188+
objects: []runtime.Object{
189+
ownedSecret(newReadyPostgreSQL(t)),
190+
},
191+
wantNeedsSync: false,
192+
},
193+
}
194+
195+
for name, tt := range tests {
196+
t.Run(name, func(t *testing.T) {
197+
pg := newReadyPostgreSQL(t)
198+
if tt.mutate != nil {
199+
tt.mutate(pg)
200+
}
201+
202+
scheme := newBasicControllerTestScheme(t)
203+
k8sClient := fake.NewClientBuilder().
204+
WithScheme(scheme).
205+
WithRuntimeObjects(tt.objects...).
206+
Build()
207+
208+
helper := &instanceReconcilerHelper{k8s: k8sClient}
209+
require.Equal(t, tt.wantNeedsSync, helper.resourceNeedsConnectionSecretSync(t.Context(), pg))
210+
})
211+
}
212+
}
213+
214+
func newBasicControllerTestScheme(t *testing.T) *runtime.Scheme {
215+
t.Helper()
216+
217+
scheme := runtime.NewScheme()
218+
require.NoError(t, clientgoscheme.AddToScheme(scheme))
219+
require.NoError(t, v1alpha1.AddToScheme(scheme))
220+
return scheme
221+
}
222+
223+
func newReadyPostgreSQL(t *testing.T) *v1alpha1.PostgreSQL {
224+
t.Helper()
225+
226+
pg := newObjectFromYAML[v1alpha1.PostgreSQL](t, yamlPostgres)
227+
pg.UID = types.UID("pg-uid")
228+
pg.Generation = 1
229+
metav1.SetMetaDataAnnotation(&pg.ObjectMeta, processedGenerationAnnotation, "1")
230+
metav1.SetMetaDataAnnotation(&pg.ObjectMeta, instanceIsRunningAnnotation, "true")
231+
meta.SetStatusCondition(&pg.Status.Conditions, getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side"))
232+
return pg
233+
}

0 commit comments

Comments
 (0)