Skip to content

Commit ac807c2

Browse files
committed
fix(kafka): use ScramCredentialDeletion for KafkaUser DELETE operations (#733)
UserChangeHandler incorrectly called handleScramSha256/handleScramSha512 (which build UserScramCredentialUpsertion) for DELETE operations, silently re-creating the user with a random password instead of removing it. Add deleteScramSha256/deleteScramSha512 methods to KafkaUserService that construct UserScramCredentialDeletion, and wire them into the DELETE case.
1 parent 0a80e71 commit ac807c2

4 files changed

Lines changed: 418 additions & 2 deletions

File tree

providers/jikkou-provider-kafka/src/main/java/io/streamthoughts/jikkou/kafka/change/user/UserChangeHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,10 @@ yield switch (authentication) {
7676
yield switch (authentication) {
7777
// SCRAM_SHA_256
7878
case V1KafkaUserAuthentication.ScramSha256 auth ->
79-
KafkaUserService.handleScramSha256(userName, auth);
79+
KafkaUserService.deleteScramSha256(userName, auth);
8080
// SCRAM_SHA_512
8181
case V1KafkaUserAuthentication.ScramSha512 auth ->
82-
KafkaUserService.handleScramSha512(userName, auth);
82+
KafkaUserService.deleteScramSha512(userName, auth);
8383
};
8484
}
8585
};

providers/jikkou-provider-kafka/src/main/java/io/streamthoughts/jikkou/kafka/reconciler/service/KafkaUserService.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.kafka.clients.admin.ScramCredentialInfo;
2525
import org.apache.kafka.clients.admin.ScramMechanism;
2626
import org.apache.kafka.clients.admin.UserScramCredentialAlteration;
27+
import org.apache.kafka.clients.admin.UserScramCredentialDeletion;
2728
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;
2829
import org.apache.kafka.clients.admin.UserScramCredentialsDescription;
2930
import org.jetbrains.annotations.NotNull;
@@ -129,6 +130,18 @@ public static Pair<V1KafkaUserAuthentication, UserScramCredentialAlteration> han
129130
);
130131
}
131132

133+
public static Pair<V1KafkaUserAuthentication, UserScramCredentialAlteration> deleteScramSha512(String userName,
134+
V1KafkaUserAuthentication.ScramSha512 auth) {
135+
var alteration = new UserScramCredentialDeletion(userName, ScramMechanism.SCRAM_SHA_512);
136+
return Pair.of(auth, alteration);
137+
}
138+
139+
public static Pair<V1KafkaUserAuthentication, UserScramCredentialAlteration> deleteScramSha256(String userName,
140+
V1KafkaUserAuthentication.ScramSha256 auth) {
141+
var alteration = new UserScramCredentialDeletion(userName, ScramMechanism.SCRAM_SHA_256);
142+
return Pair.of(auth, alteration);
143+
}
144+
132145
private V1KafkaUserAuthentication map(final ScramCredentialInfo info) {
133146
ScramMechanism mechanism = info.mechanism();
134147
return switch (mechanism) {
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright (c) The original authors
4+
*
5+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
6+
*/
7+
package io.streamthoughts.jikkou.kafka.change.user;
8+
9+
import static org.junit.jupiter.api.Assertions.assertEquals;
10+
import static org.junit.jupiter.api.Assertions.assertNotNull;
11+
12+
import io.streamthoughts.jikkou.core.models.ObjectMeta;
13+
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
14+
import io.streamthoughts.jikkou.core.reconciler.Operation;
15+
import io.streamthoughts.jikkou.kafka.model.user.V1KafkaUser;
16+
import io.streamthoughts.jikkou.kafka.model.user.V1KafkaUserAuthentication;
17+
import io.streamthoughts.jikkou.kafka.model.user.V1KafkaUserSpec;
18+
import java.util.List;
19+
import org.junit.jupiter.api.Test;
20+
21+
class UserChangeComputerTest {
22+
23+
private final UserChangeComputer computer = new UserChangeComputer();
24+
private final UserChangeComputer.UserChangeFactory factory = new UserChangeComputer.UserChangeFactory();
25+
26+
@Test
27+
void shouldComputeCreateChangeForNewUser() {
28+
// Given
29+
V1KafkaUser after = buildUser("testuser", List.of(
30+
new V1KafkaUserAuthentication.ScramSha512("password", 8192, null)
31+
));
32+
33+
// When
34+
List<ResourceChange> changes = computer.computeChanges(List.of(), List.of(after));
35+
36+
// Then
37+
assertEquals(1, changes.size());
38+
ResourceChange change = changes.getFirst();
39+
assertEquals(Operation.CREATE, change.getSpec().getOp());
40+
assertEquals("testuser", change.getMetadata().getName());
41+
42+
var stateChanges = change.getSpec().getChanges().all();
43+
assertEquals(1, stateChanges.size());
44+
assertEquals(Operation.CREATE, stateChanges.getFirst().getOp());
45+
assertNotNull(stateChanges.getFirst().getAfter());
46+
}
47+
48+
@Test
49+
void shouldCreateDeleteChangeForScramSha512User() {
50+
// Given
51+
V1KafkaUser before = buildUser("testuser", List.of(
52+
new V1KafkaUserAuthentication.ScramSha512(null, 8192, null)
53+
));
54+
55+
// When
56+
ResourceChange change = factory.createChangeForDelete("testuser", before);
57+
58+
// Then
59+
assertEquals(Operation.DELETE, change.getSpec().getOp());
60+
assertEquals("testuser", change.getMetadata().getName());
61+
62+
var stateChanges = change.getSpec().getChanges().all();
63+
assertEquals(1, stateChanges.size());
64+
assertEquals(Operation.DELETE, stateChanges.getFirst().getOp());
65+
assertNotNull(stateChanges.getFirst().getBefore());
66+
67+
V1KafkaUserAuthentication.ScramSha512 beforeAuth =
68+
(V1KafkaUserAuthentication.ScramSha512) stateChanges.getFirst().getBefore();
69+
assertEquals(8192, beforeAuth.iterations());
70+
}
71+
72+
@Test
73+
void shouldCreateDeleteChangeForScramSha256User() {
74+
// Given
75+
V1KafkaUser before = buildUser("testuser", List.of(
76+
new V1KafkaUserAuthentication.ScramSha256(null, 4096, null)
77+
));
78+
79+
// When
80+
ResourceChange change = factory.createChangeForDelete("testuser", before);
81+
82+
// Then
83+
assertEquals(Operation.DELETE, change.getSpec().getOp());
84+
85+
var stateChanges = change.getSpec().getChanges().all();
86+
assertEquals(1, stateChanges.size());
87+
assertEquals(Operation.DELETE, stateChanges.getFirst().getOp());
88+
89+
V1KafkaUserAuthentication.ScramSha256 beforeAuth =
90+
(V1KafkaUserAuthentication.ScramSha256) stateChanges.getFirst().getBefore();
91+
assertEquals(4096, beforeAuth.iterations());
92+
}
93+
94+
@Test
95+
void shouldCreateDeleteChangeWithMultipleAuthentications() {
96+
// Given
97+
V1KafkaUser before = buildUser("testuser", List.of(
98+
new V1KafkaUserAuthentication.ScramSha512(null, 8192, null),
99+
new V1KafkaUserAuthentication.ScramSha256(null, 4096, null)
100+
));
101+
102+
// When
103+
ResourceChange change = factory.createChangeForDelete("testuser", before);
104+
105+
// Then
106+
assertEquals(Operation.DELETE, change.getSpec().getOp());
107+
108+
var stateChanges = change.getSpec().getChanges().all();
109+
assertEquals(2, stateChanges.size());
110+
stateChanges.forEach(sc -> assertEquals(Operation.DELETE, sc.getOp()));
111+
}
112+
113+
@Test
114+
void shouldComputeUpdateChangeForExistingUser() {
115+
// Given
116+
V1KafkaUser before = buildUser("testuser", List.of(
117+
new V1KafkaUserAuthentication.ScramSha512(null, 8192, null)
118+
));
119+
V1KafkaUser after = buildUser("testuser", List.of(
120+
new V1KafkaUserAuthentication.ScramSha256("newpassword", 4096, null)
121+
));
122+
123+
// When
124+
List<ResourceChange> changes = computer.computeChanges(List.of(before), List.of(after));
125+
126+
// Then
127+
assertEquals(1, changes.size());
128+
assertEquals("testuser", changes.getFirst().getMetadata().getName());
129+
}
130+
131+
@Test
132+
void shouldComputeNoChangeForIdenticalUser() {
133+
// Given
134+
V1KafkaUser before = buildUser("testuser", List.of(
135+
new V1KafkaUserAuthentication.ScramSha512(null, 8192, null)
136+
));
137+
V1KafkaUser after = buildUser("testuser", List.of(
138+
new V1KafkaUserAuthentication.ScramSha512(null, 8192, null)
139+
));
140+
141+
// When
142+
List<ResourceChange> changes = computer.computeChanges(List.of(before), List.of(after));
143+
144+
// Then
145+
assertEquals(1, changes.size());
146+
assertEquals(Operation.NONE, changes.getFirst().getSpec().getOp());
147+
}
148+
149+
private static V1KafkaUser buildUser(String name, List<V1KafkaUserAuthentication> authentications) {
150+
return V1KafkaUser.builder()
151+
.withMetadata(new ObjectMeta(name))
152+
.withSpec(V1KafkaUserSpec.builder()
153+
.withAuthentications(authentications)
154+
.build())
155+
.build();
156+
}
157+
}

0 commit comments

Comments
 (0)