diff --git a/.changes/unreleased/operator-Added-20260409-120000.yaml b/.changes/unreleased/operator-Added-20260409-120000.yaml new file mode 100644 index 000000000..a5c2a569f --- /dev/null +++ b/.changes/unreleased/operator-Added-20260409-120000.yaml @@ -0,0 +1,4 @@ +project: operator +kind: Added +body: Support importing existing Redpanda users into operator management and ongoing credential sync via spec.authentication.syncCredentials for external secret rotation (e.g. ESO). +time: 2026-04-09T12:00:00.000000-04:00 diff --git a/operator/api/applyconfiguration/redpanda/v1alpha2/userauthenticationspec.go b/operator/api/applyconfiguration/redpanda/v1alpha2/userauthenticationspec.go index ee1b6954e..8967a187d 100644 --- a/operator/api/applyconfiguration/redpanda/v1alpha2/userauthenticationspec.go +++ b/operator/api/applyconfiguration/redpanda/v1alpha2/userauthenticationspec.go @@ -18,8 +18,9 @@ import ( // UserAuthenticationSpecApplyConfiguration represents a declarative configuration of the UserAuthenticationSpec type for use // with apply. type UserAuthenticationSpecApplyConfiguration struct { - Type *redpandav1alpha2.SASLMechanism `json:"type,omitempty"` - Password *PasswordApplyConfiguration `json:"password,omitempty"` + Type *redpandav1alpha2.SASLMechanism `json:"type,omitempty"` + Password *PasswordApplyConfiguration `json:"password,omitempty"` + SyncCredentials *bool `json:"syncCredentials,omitempty"` } // UserAuthenticationSpecApplyConfiguration constructs a declarative configuration of the UserAuthenticationSpec type for use with @@ -43,3 +44,11 @@ func (b *UserAuthenticationSpecApplyConfiguration) WithPassword(value *PasswordA b.Password = value return b } + +// WithSyncCredentials sets the SyncCredentials field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the SyncCredentials field is set to the value of the last call. 
+func (b *UserAuthenticationSpecApplyConfiguration) WithSyncCredentials(value bool) *UserAuthenticationSpecApplyConfiguration { + b.SyncCredentials = &value + return b +} diff --git a/operator/api/redpanda/v1alpha2/testdata/crd-docs.adoc b/operator/api/redpanda/v1alpha2/testdata/crd-docs.adoc index ba6a379ae..e65dd4885 100644 --- a/operator/api/redpanda/v1alpha2/testdata/crd-docs.adoc +++ b/operator/api/redpanda/v1alpha2/testdata/crd-docs.adoc @@ -4748,6 +4748,11 @@ UserAuthenticationSpec defines the authentication mechanism enabled for this Red - scram-sha-256 + | scram-sha-512 | Enum: [scram-sha-256 scram-sha-512 SCRAM-SHA-256 SCRAM-SHA-512] + | *`password`* __xref:{anchor_prefix}-github-com-redpanda-data-redpanda-operator-operator-api-redpanda-v1alpha2-password[$$Password$$]__ | Password specifies where a password is read from. + | | +| *`syncCredentials`* __boolean__ | SyncCredentials when set to true causes the operator to re-read the + +password from the referenced Secret on every reconciliation cycle + +(default: every 5 minutes) and upsert the credentials to Redpanda. + +This enables password rotation via external systems like the External + +Secrets Operator (ESO) without requiring user recreation. + | | |=== diff --git a/operator/api/redpanda/v1alpha2/user_types.go b/operator/api/redpanda/v1alpha2/user_types.go index 2e38b8fdf..a2e09c286 100644 --- a/operator/api/redpanda/v1alpha2/user_types.go +++ b/operator/api/redpanda/v1alpha2/user_types.go @@ -72,6 +72,28 @@ func (u *User) HasManagedUser() bool { return u.Status.ManagedUser } +// ShouldSyncCredentials returns true if the user has credential sync enabled, +// meaning the operator should re-read the password from the referenced Secret +// and upsert it to Redpanda on each reconciliation cycle. 
+func (u *User) ShouldSyncCredentials() bool { + return u.Spec.Authentication != nil && u.Spec.Authentication.SyncCredentials +} + +// GetPasswordSecretName returns the name of the Secret referenced by the +// user's password configuration, or an empty string when Authentication, ValueFrom, or SecretKeyRef is unset. +func (u *User) GetPasswordSecretName() string { + if u.Spec.Authentication == nil { + return "" + } + if u.Spec.Authentication.Password.ValueFrom == nil { + return "" + } + if u.Spec.Authentication.Password.ValueFrom.SecretKeyRef == nil { + return "" + } + return u.Spec.Authentication.Password.ValueFrom.SecretKeyRef.Name +} + func (u *User) ShouldManageACLs() bool { return u.Spec.Authorization != nil } @@ -125,6 +147,12 @@ type UserAuthenticationSpec struct { Type *SASLMechanism `json:"type,omitempty"` // Password specifies where a password is read from. Password Password `json:"password"` + // SyncCredentials when set to true causes the operator to re-read the + // password from the referenced Secret on every reconciliation cycle + // (default: every 5 minutes) and upsert the credentials to Redpanda. + // This enables password rotation via external systems like the External + // Secrets Operator (ESO) without requiring user recreation. + SyncCredentials bool `json:"syncCredentials,omitempty"` } // Password specifies a password for the user.
diff --git a/operator/config/crd/bases/cluster.redpanda.com_users.yaml b/operator/config/crd/bases/cluster.redpanda.com_users.yaml index 7764dcb5f..00cc53806 100644 --- a/operator/config/crd/bases/cluster.redpanda.com_users.yaml +++ b/operator/config/crd/bases/cluster.redpanda.com_users.yaml @@ -110,6 +110,14 @@ spec: x-kubernetes-validations: - message: valueFrom must not be empty if no value supplied rule: self.value != "" || has(self.valueFrom) + syncCredentials: + description: |- + SyncCredentials when set to true causes the operator to re-read the + password from the referenced Secret on every reconciliation cycle + (default: every 5 minutes) and upsert the credentials to Redpanda. + This enables password rotation via external systems like the External + Secrets Operator (ESO) without requiring user recreation. + type: boolean type: default: scram-sha-512 description: |- diff --git a/operator/internal/controller/redpanda/user_controller.go b/operator/internal/controller/redpanda/user_controller.go index 3e15f55a6..b78ec35f1 100644 --- a/operator/internal/controller/redpanda/user_controller.go +++ b/operator/internal/controller/redpanda/user_controller.go @@ -18,8 +18,12 @@ import ( "github.com/twmb/franz-go/pkg/kgo" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder" + mchandler "sigs.k8s.io/multicluster-runtime/pkg/handler" redpandav1alpha2ac "github.com/redpanda-data/redpanda-operator/operator/api/applyconfiguration/redpanda/v1alpha2" redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2" @@ -87,14 +91,25 @@ func (r *UserReconciler) SyncResource(ctx context.Context, request ResourceReque defer usersClient.Close() defer syncer.Close() - if !hasUser && shouldManageUser { + switch { + 
case shouldManageUser && (!hasUser || !hasManagedUser): + // Create a new user, adopt an existing one, or recreate a managed + // user that was removed from Redpanda out-of-band. UpsertSCRAM is + // idempotent so this is safe regardless of whether the user exists. + if err := usersClient.Create(ctx, user); err != nil { return createPatch(err) } hasManagedUser = true - } - if hasUser && !shouldManageUser { + case shouldManageUser && hasManagedUser && user.ShouldSyncCredentials(): + // Re-read the password from the referenced Secret and upsert + // credentials. This enables external rotation (e.g. via ESO) + // to propagate on each reconciliation cycle. + if err := usersClient.Update(ctx, user); err != nil { + return createPatch(err) + } + + case !shouldManageUser && hasUser && hasManagedUser: if err := usersClient.Delete(ctx, user); err != nil { return createPatch(err) } @@ -171,6 +186,8 @@ func (r *UserReconciler) userAndACLClients(ctx context.Context, request Resource return usersClient, syncer, hasUser, nil } +const userPasswordSecretIndex = "__user_referencing_password_secret" + func SetupUserController(ctx context.Context, mgr multicluster.Manager, expander *secrets.CloudExpander, includeV1, includeV2 bool, namespace string) error { factory := internalclient.NewFactory(mgr, expander) @@ -194,12 +211,67 @@ func SetupUserController(ctx context.Context, mgr multicluster.Manager, expander } builder.Watches(&redpandav1alpha2.Redpanda{}, enqueueV2User, controller.WatchOptions(clusterName)...) } + + // Index Users by the password Secret they reference so we can + // watch for external Secret changes (e.g. from ESO) and + // immediately reconcile the referencing User.
+ cluster, err := mgr.GetCluster(ctx, clusterName) + if err != nil { + return err + } + if err := cluster.GetFieldIndexer().IndexField(ctx, &redpandav1alpha2.User{}, userPasswordSecretIndex, func(o client.Object) []string { + user, ok := o.(*redpandav1alpha2.User) + if !ok { + return nil + } + if name := user.GetPasswordSecretName(); name != "" { + return []string{types.NamespacedName{Namespace: user.Namespace, Name: name}.String()} + } + return nil + }); err != nil { + return err + } + + builder.Watches(&corev1.Secret{}, enqueueUsersForSecret(mgr, clusterName), controller.WatchOptions(clusterName)...) } - controller := NewResourceController(mgr, factory, &UserReconciler{}, "UserReconciler") + ctrl := NewResourceController(mgr, factory, &UserReconciler{}, "UserReconciler") // Every 5 minutes try and check to make sure no manual modifications // happened on the resource synced to the cluster and attempt to correct // any drift. - return builder.Complete(controller.PeriodicallyReconcile(5 * time.Minute).FilterNamespace(namespace)) + return builder.Complete(ctrl.PeriodicallyReconcile(5 * time.Minute).FilterNamespace(namespace)) +} + +// enqueueUsersForSecret returns an event handler that, when a Secret changes, +// looks up all User resources that reference that Secret via +// spec.authentication.password.valueFrom.secretKeyRef and enqueues them for +// reconciliation. This enables immediate reconciliation when an external +// system (e.g. ESO) updates a password Secret. Lookup failures are swallowed: if the cluster or the User list cannot be fetched, no requests are enqueued.
+func enqueueUsersForSecret(mgr multicluster.Manager, clusterName string) mchandler.EventHandlerFunc { + return mchandler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request { + cluster, err := mgr.GetCluster(ctx, clusterName) + if err != nil { + return nil + } + + var userList redpandav1alpha2.UserList + nn := types.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} + if err := cluster.GetClient().List(ctx, &userList, &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(userPasswordSecretIndex, nn.String()), + }); err != nil { + return nil + } + + requests := make([]reconcile.Request, 0, len(userList.Items)) + for i := range userList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: userList.Items[i].Namespace, + Name: userList.Items[i].Name, + }, + }) + } + return requests + }) } diff --git a/operator/internal/controller/redpanda/user_controller_test.go b/operator/internal/controller/redpanda/user_controller_test.go index c03239b10..754b94875 100644 --- a/operator/internal/controller/redpanda/user_controller_test.go +++ b/operator/internal/controller/redpanda/user_controller_test.go @@ -30,6 +30,193 @@ import ( redpandav1alpha2 "github.com/redpanda-data/redpanda-operator/operator/api/redpanda/v1alpha2" ) +// TestUserAdoptExisting verifies that a User CR applied for an already-existing +// Redpanda user is adopted (managedUser becomes true) instead of being left +// unmanaged. This is the fix for +// https://github.com/redpanda-data/redpanda-operator/issues/1354.
+func TestUserAdoptExisting(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) + defer cancel() + + timeoutOption := kgo.RetryTimeout(1 * time.Millisecond) + environment := InitializeResourceReconcilerTest(t, ctx, &UserReconciler{ + extraOptions: []kgo.Opt{timeoutOption}, + }) + + k8sClient, err := environment.Factory.GetClient(ctx, mcmanager.LocalCluster) + require.NoError(t, err) + + userName := "adopt-user-" + strconv.Itoa(int(time.Now().UnixNano())) + + // Step 1: Pre-create the user directly in Redpanda via the Kafka admin + // API, simulating a user that existed before the operator was deployed. + kafkaClient, err := kgo.NewClient(kgo.SeedBrokers(environment.KafkaURL), timeoutOption, kgo.SASL(scram.Auth{ + User: "superuser", + Pass: "password", + }.AsSha256Mechanism())) + require.NoError(t, err) + defer kafkaClient.Close() + + adminClient := kadm.NewClient(kafkaClient) + _, err = adminClient.AlterUserSCRAMs(ctx, nil, []kadm.UpsertSCRAM{{ + User: userName, + Password: "original-password", + Mechanism: kadm.ScramSha512, + Iterations: 4096, + }}) + require.NoError(t, err) + + // Step 2: Apply a User CR for this pre-existing user.
+ user := &redpandav1alpha2.User{ + ObjectMeta: metav1.ObjectMeta{ + Name: userName, + Namespace: metav1.NamespaceDefault, + }, + Spec: redpandav1alpha2.UserSpec{ + ClusterSource: environment.ClusterSourceValid, + Authentication: &redpandav1alpha2.UserAuthenticationSpec{ + Password: redpandav1alpha2.Password{ + Value: "adopted-password", + ValueFrom: &redpandav1alpha2.PasswordSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: userName + "-password", + }, + }, + }, + }, + }, + }, + } + + key := client.ObjectKeyFromObject(user) + req := mcreconcile.Request{Request: ctrl.Request{NamespacedName: key}, ClusterName: mcmanager.LocalCluster} + + require.NoError(t, k8sClient.Create(ctx, user)) + + // Reconcile once; the assertions below expect this single pass to have synced the user. + _, err = environment.Reconciler.Reconcile(ctx, req) + require.NoError(t, err) + + require.NoError(t, k8sClient.Get(ctx, key, user)) + require.True(t, user.Status.ManagedUser, "expected managedUser=true after adopting existing user") + require.Equal(t, metav1.ConditionTrue, user.Status.Conditions[0].Status) + + // Verify we can authenticate with the new password. + verifyClient, err := kgo.NewClient(kgo.SeedBrokers(environment.KafkaURL), timeoutOption, kgo.SASL(scram.Auth{ + User: userName, + Pass: "adopted-password", + }.AsSha512Mechanism())) + require.NoError(t, err) + defer verifyClient.Close() + verifyAdmin := kadm.NewClient(verifyClient) + _, err = verifyAdmin.BrokerMetadata(ctx) + require.NoError(t, err) + + // Cleanup. + require.NoError(t, k8sClient.Delete(ctx, user)) + _, err = environment.Reconciler.Reconcile(ctx, req) + require.NoError(t, err) +} + +// TestUserCredentialSync verifies that when syncCredentials is enabled, updating +// the password Secret causes the operator to push the new password to Redpanda +// on the next reconciliation cycle.
+func TestUserCredentialSync(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) + defer cancel() + + timeoutOption := kgo.RetryTimeout(1 * time.Millisecond) + environment := InitializeResourceReconcilerTest(t, ctx, &UserReconciler{ + extraOptions: []kgo.Opt{timeoutOption}, + }) + + k8sClient, err := environment.Factory.GetClient(ctx, mcmanager.LocalCluster) + require.NoError(t, err) + + userName := "sync-user-" + strconv.Itoa(int(time.Now().UnixNano())) + secretName := userName + "-password" + + // Step 1: Create the password Secret (simulating ESO-managed secret). + passwordSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: secretName, + Namespace: metav1.NamespaceDefault, + }, + Data: map[string][]byte{ + "password": []byte("initial-password"), + }, + } + require.NoError(t, k8sClient.Create(ctx, passwordSecret)) + + // Step 2: Create User CR with syncCredentials enabled. + user := &redpandav1alpha2.User{ + ObjectMeta: metav1.ObjectMeta{ + Name: userName, + Namespace: metav1.NamespaceDefault, + }, + Spec: redpandav1alpha2.UserSpec{ + ClusterSource: environment.ClusterSourceValid, + Authentication: &redpandav1alpha2.UserAuthenticationSpec{ + Password: redpandav1alpha2.Password{ + ValueFrom: &redpandav1alpha2.PasswordSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName, + }, + Key: "password", + }, + }, + NoGenerate: true, + }, + SyncCredentials: true, + }, + }, + } + + key := client.ObjectKeyFromObject(user) + req := mcreconcile.Request{Request: ctrl.Request{NamespacedName: key}, ClusterName: mcmanager.LocalCluster} + + require.NoError(t, k8sClient.Create(ctx, user)) + _, err = environment.Reconciler.Reconcile(ctx, req) + require.NoError(t, err) + + require.NoError(t, k8sClient.Get(ctx, key, user)) + require.True(t, user.Status.ManagedUser) + + // Verify the initial password authenticates over SCRAM-SHA-512 before rotating.
+ verifyAuth := func(password string) { + t.Helper() + c, err := kgo.NewClient(kgo.SeedBrokers(environment.KafkaURL), timeoutOption, kgo.SASL(scram.Auth{ + User: userName, + Pass: password, + }.AsSha512Mechanism())) + require.NoError(t, err) + defer c.Close() + _, err = kadm.NewClient(c).BrokerMetadata(ctx) + require.NoError(t, err) + } + verifyAuth("initial-password") + + // Step 3: Simulate ESO rotating the password. + require.NoError(t, k8sClient.Get(ctx, client.ObjectKeyFromObject(passwordSecret), passwordSecret)) + passwordSecret.Data["password"] = []byte("rotated-password") + require.NoError(t, k8sClient.Update(ctx, passwordSecret)) + + // Step 4: Reconcile again — syncCredentials should push the new password. + require.NoError(t, k8sClient.Get(ctx, key, user)) + _, err = environment.Reconciler.Reconcile(ctx, req) + require.NoError(t, err) + + // Verify rotated password now works. + verifyAuth("rotated-password") + + // Cleanup: delete the User CR, then reconcile once more. + require.NoError(t, k8sClient.Delete(ctx, user)) + _, err = environment.Reconciler.Reconcile(ctx, req) + require.NoError(t, err) +} + func TestUserReconcile(t *testing.T) { // nolint:funlen // These tests have clear subtests. ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) defer cancel() @@ -115,6 +302,14 @@ func TestUserReconcile(t *testing.T) { // nolint:funlen // These tests have clea expectedCondition: environment.SyncedCondition, onlyCheckDeletion: true, }, + "success - adopt existing user": { + mutate: func(user *redpandav1alpha2.User) { + // Authorization is left nil so we only check user + // management, not ACLs.
+ user.Spec.Authorization = nil + }, + expectedCondition: environment.SyncedCondition, + }, "error - invalid cluster ref": { mutate: func(user *redpandav1alpha2.User) { user.Spec.ClusterSource = environment.ClusterSourceInvalidRef diff --git a/operator/pkg/client/users/client.go b/operator/pkg/client/users/client.go index 0798d05b6..7d33e770e 100644 --- a/operator/pkg/client/users/client.go +++ b/operator/pkg/client/users/client.go @@ -92,6 +92,25 @@ func (c *Client) Create(ctx context.Context, user *redpandav1alpha2.User) error return c.create(ctx, user.Name, password, sasl) } +// Update re-reads the password from the referenced Secret and upserts the +// user's SCRAM credentials in Redpanda. Unlike Create, it never generates +// or stores a new password — it only reads the current value from the +// existing Secret. This is used for ongoing credential sync when +// syncCredentials is enabled. user.Spec.Authentication must be non-nil: it is dereferenced unconditionally. +func (c *Client) Update(ctx context.Context, user *redpandav1alpha2.User) error { + password, err := user.Spec.Authentication.Password.Fetch(ctx, c.client, user.Namespace) + if err != nil { + return err + } + + sasl, err := user.Spec.Authentication.Type.ScramToKafka() + if err != nil { + return err + } + + return c.create(ctx, user.Name, password, sasl) +} + // Has returns whether or not the Redpanda cluster already contains the given user. func (c *Client) Has(ctx context.Context, user *redpandav1alpha2.User) (bool, error) { return c.has(ctx, user.Name)