Skip to content

Commit f05b663

Browse files
fhussonnoisclaude
andcommitted
fix(schema-registry): perform soft delete before hard delete for permanent schema deletion (#674)
When permanent delete is requested, the Schema Registry API requires a soft delete first, then a hard delete. The handler now correctly performs both steps sequentially. This fixes permanent deletion for both the standard Schema Registry and Aiven providers. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 5b66f5b commit f05b663

5 files changed

Lines changed: 375 additions & 2 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright (c) The original authors
4+
*
5+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
6+
*/
7+
package io.streamthoughts.jikkou.extension.aiven.api;
8+
9+
import static org.mockito.ArgumentMatchers.eq;
10+
import static org.mockito.Mockito.mock;
11+
import static org.mockito.Mockito.times;
12+
import static org.mockito.Mockito.verify;
13+
import static org.mockito.Mockito.when;
14+
15+
import io.streamthoughts.jikkou.extension.aiven.api.data.MessageErrorsResponse;
16+
import java.util.Collections;
17+
import org.junit.jupiter.api.Test;
18+
19+
class AivenAsyncSchemaRegistryApiTest {
20+
21+
private static final String TEST_SUBJECT = "test-subject";
22+
23+
@Test
24+
void shouldCallDeleteWhenPermanentIsFalse() {
25+
// Given
26+
AivenApiClient apiClient = mock(AivenApiClient.class);
27+
when(apiClient.deleteSchemaRegistrySubject(eq(TEST_SUBJECT)))
28+
.thenReturn(new MessageErrorsResponse("", Collections.emptyList()));
29+
30+
AivenAsyncSchemaRegistryApi api = new AivenAsyncSchemaRegistryApi(apiClient);
31+
32+
// When
33+
api.deleteSubjectVersions(TEST_SUBJECT, false).block();
34+
35+
// Then
36+
verify(apiClient, times(1)).deleteSchemaRegistrySubject(TEST_SUBJECT);
37+
}
38+
39+
@Test
40+
void shouldCallDeleteWhenPermanentIsTrue() {
41+
// Given
42+
AivenApiClient apiClient = mock(AivenApiClient.class);
43+
when(apiClient.deleteSchemaRegistrySubject(eq(TEST_SUBJECT)))
44+
.thenReturn(new MessageErrorsResponse("", Collections.emptyList()));
45+
46+
AivenAsyncSchemaRegistryApi api = new AivenAsyncSchemaRegistryApi(apiClient);
47+
48+
// When
49+
api.deleteSubjectVersions(TEST_SUBJECT, true).block();
50+
51+
// Then
52+
verify(apiClient, times(1)).deleteSchemaRegistrySubject(TEST_SUBJECT);
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright (c) The original authors
4+
*
5+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
6+
*/
7+
package io.streamthoughts.jikkou.schema.registry.reconciler;
8+
9+
import static org.junit.jupiter.api.Assertions.assertEquals;
10+
import static org.junit.jupiter.api.Assertions.assertTrue;
11+
12+
import io.streamthoughts.jikkou.core.ReconciliationContext;
13+
import io.streamthoughts.jikkou.core.ReconciliationMode;
14+
import io.streamthoughts.jikkou.core.config.Configuration;
15+
import io.streamthoughts.jikkou.core.data.SchemaHandle;
16+
import io.streamthoughts.jikkou.core.data.SchemaType;
17+
import io.streamthoughts.jikkou.core.models.ApiChangeResultList;
18+
import io.streamthoughts.jikkou.core.models.CoreAnnotations;
19+
import io.streamthoughts.jikkou.core.models.ObjectMeta;
20+
import io.streamthoughts.jikkou.core.models.ResourceList;
21+
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
22+
import io.streamthoughts.jikkou.core.reconciler.ChangeResult;
23+
import io.streamthoughts.jikkou.core.reconciler.Operation;
24+
import io.streamthoughts.jikkou.core.selector.Selectors;
25+
import io.streamthoughts.jikkou.schema.registry.BaseExtensionProviderIT;
26+
import io.streamthoughts.jikkou.schema.registry.SchemaRegistryAnnotations;
27+
import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject;
28+
import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubjectSpec;
29+
import java.util.List;
30+
import java.util.Map;
31+
import org.junit.jupiter.api.Test;
32+
33+
class SchemaRegistrySubjectDeleteTest extends BaseExtensionProviderIT {
34+
35+
@Test
36+
void shouldSoftDeleteSchemaSubject() {
37+
// Given - register a schema first
38+
V1SchemaRegistrySubject resource = createSubjectResource(TEST_SUBJECT);
39+
api.reconcile(
40+
ResourceList.of(List.of(resource)),
41+
ReconciliationMode.CREATE,
42+
ReconciliationContext.builder().dryRun(false).build());
43+
44+
// When - delete the schema (soft delete only)
45+
V1SchemaRegistrySubject deleteResource = V1SchemaRegistrySubject.builder()
46+
.withMetadata(ObjectMeta.builder()
47+
.withName(TEST_SUBJECT)
48+
.withAnnotations(Map.of(
49+
CoreAnnotations.JIKKOU_IO_DELETE, true))
50+
.build())
51+
.withSpec(V1SchemaRegistrySubjectSpec.builder()
52+
.withSchemaType(SchemaType.AVRO)
53+
.withSchema(new SchemaHandle(AVRO_SCHEMA))
54+
.build())
55+
.build();
56+
57+
ApiChangeResultList result = api.reconcile(
58+
ResourceList.of(List.of(deleteResource)),
59+
ReconciliationMode.DELETE,
60+
ReconciliationContext.builder().dryRun(false).build());
61+
62+
// Then
63+
List<ChangeResult> results = result.results();
64+
assertEquals(1, results.size());
65+
ResourceChange change = results.getFirst().change();
66+
assertEquals(Operation.DELETE, change.getSpec().getOp());
67+
68+
// Verify subject is no longer listed
69+
ResourceList<V1SchemaRegistrySubject> subjects =
70+
api.listResources(V1SchemaRegistrySubject.class, Selectors.NO_SELECTOR, Configuration.empty());
71+
assertTrue(subjects.stream().noneMatch(s -> TEST_SUBJECT.equals(s.getMetadata().getName())));
72+
}
73+
74+
@Test
75+
void shouldPermanentlyDeleteSchemaSubject() {
76+
// Given - register a schema first
77+
V1SchemaRegistrySubject resource = createSubjectResource(TEST_SUBJECT);
78+
api.reconcile(
79+
ResourceList.of(List.of(resource)),
80+
ReconciliationMode.CREATE,
81+
ReconciliationContext.builder().dryRun(false).build());
82+
83+
// When - permanently delete the schema (soft + hard delete)
84+
V1SchemaRegistrySubject deleteResource = V1SchemaRegistrySubject.builder()
85+
.withMetadata(ObjectMeta.builder()
86+
.withName(TEST_SUBJECT)
87+
.withAnnotations(Map.of(
88+
CoreAnnotations.JIKKOU_IO_DELETE, true,
89+
SchemaRegistryAnnotations.SCHEMA_REGISTRY_PERMANANTE_DELETE, true))
90+
.build())
91+
.withSpec(V1SchemaRegistrySubjectSpec.builder()
92+
.withSchemaType(SchemaType.AVRO)
93+
.withSchema(new SchemaHandle(AVRO_SCHEMA))
94+
.build())
95+
.build();
96+
97+
ApiChangeResultList result = api.reconcile(
98+
ResourceList.of(List.of(deleteResource)),
99+
ReconciliationMode.DELETE,
100+
ReconciliationContext.builder().dryRun(false).build());
101+
102+
// Then
103+
List<ChangeResult> results = result.results();
104+
assertEquals(1, results.size());
105+
ResourceChange change = results.getFirst().change();
106+
assertEquals(Operation.DELETE, change.getSpec().getOp());
107+
108+
// Verify subject is no longer listed
109+
ResourceList<V1SchemaRegistrySubject> subjects =
110+
api.listResources(V1SchemaRegistrySubject.class, Selectors.NO_SELECTOR, Configuration.empty());
111+
assertTrue(subjects.stream().noneMatch(s -> TEST_SUBJECT.equals(s.getMetadata().getName())));
112+
113+
// Verify the subject can be re-registered (only possible after hard delete)
114+
V1SchemaRegistrySubject reRegister = createSubjectResource(TEST_SUBJECT);
115+
ApiChangeResultList reRegisterResult = api.reconcile(
116+
ResourceList.of(List.of(reRegister)),
117+
ReconciliationMode.CREATE,
118+
ReconciliationContext.builder().dryRun(false).build());
119+
assertEquals(1, reRegisterResult.results().size());
120+
assertEquals(Operation.CREATE, reRegisterResult.results().getFirst().change().getSpec().getOp());
121+
}
122+
123+
private V1SchemaRegistrySubject createSubjectResource(String subjectName) {
124+
return V1SchemaRegistrySubject.builder()
125+
.withMetadata(ObjectMeta.builder()
126+
.withName(subjectName)
127+
.build())
128+
.withSpec(V1SchemaRegistrySubjectSpec.builder()
129+
.withSchemaType(SchemaType.AVRO)
130+
.withSchema(new SchemaHandle(AVRO_SCHEMA))
131+
.build())
132+
.build();
133+
}
134+
}

providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/change/handler/DeleteSchemaSubjectChangeHandler.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,32 @@ public List<ChangeResponse> handleChanges(@NotNull List<ResourceChange> changes)
5151
for (ResourceChange change : changes) {
5252
final String subject = change.getMetadata().getName();
5353
SchemaSubjectChangeOptions options = getSchemaSubjectChangeOptions(change);
54+
// Always perform a soft delete first.
5455
Mono<Void> mono = api
55-
.deleteSubjectVersions(subject, options.permanentDelete())
56+
.deleteSubjectVersions(subject, false)
5657
.handle((versions, sink) -> {
5758
if (LOG.isInfoEnabled()) {
5859
LOG.info(
59-
"Deleted all versions for Schema Registry subject '{}': {}",
60+
"Soft-deleted all versions for Schema Registry subject '{}': {}",
6061
subject,
6162
versions
6263
);
6364
}
6465
});
66+
// If permanent delete is requested, follow with a hard delete.
67+
if (options.permanentDelete()) {
68+
mono = mono.then(api
69+
.deleteSubjectVersions(subject, true)
70+
.handle((versions, sink) -> {
71+
if (LOG.isInfoEnabled()) {
72+
LOG.info(
73+
"Hard-deleted all versions for Schema Registry subject '{}': {}",
74+
subject,
75+
versions
76+
);
77+
}
78+
}));
79+
}
6580
results.add(toChangeResponse(change, mono.toFuture()));
6681
}
6782
return results;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright (c) The original authors
4+
*
5+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
6+
*/
7+
package io.streamthoughts.jikkou.schema.registry.api;
8+
9+
import static org.mockito.ArgumentMatchers.eq;
10+
import static org.mockito.Mockito.mock;
11+
import static org.mockito.Mockito.verify;
12+
import static org.mockito.Mockito.when;
13+
14+
import java.util.List;
15+
import org.junit.jupiter.api.Test;
16+
17+
class DefaultAsyncSchemaRegistryApiTest {
18+
19+
private static final String TEST_SUBJECT = "test-subject";
20+
21+
@Test
22+
void shouldDelegateSoftDeleteToApi() {
23+
// Given
24+
SchemaRegistryApi schemaRegistryApi = mock(SchemaRegistryApi.class);
25+
when(schemaRegistryApi.deleteSubjectVersions(eq(TEST_SUBJECT), eq(false)))
26+
.thenReturn(List.of(1, 2));
27+
28+
DefaultAsyncSchemaRegistryApi asyncApi = new DefaultAsyncSchemaRegistryApi(schemaRegistryApi);
29+
30+
// When
31+
asyncApi.deleteSubjectVersions(TEST_SUBJECT, false).block();
32+
33+
// Then
34+
verify(schemaRegistryApi).deleteSubjectVersions(TEST_SUBJECT, false);
35+
}
36+
37+
@Test
38+
void shouldDelegateHardDeleteToApi() {
39+
// Given
40+
SchemaRegistryApi schemaRegistryApi = mock(SchemaRegistryApi.class);
41+
when(schemaRegistryApi.deleteSubjectVersions(eq(TEST_SUBJECT), eq(true)))
42+
.thenReturn(List.of(1, 2));
43+
44+
DefaultAsyncSchemaRegistryApi asyncApi = new DefaultAsyncSchemaRegistryApi(schemaRegistryApi);
45+
46+
// When
47+
asyncApi.deleteSubjectVersions(TEST_SUBJECT, true).block();
48+
49+
// Then
50+
verify(schemaRegistryApi).deleteSubjectVersions(TEST_SUBJECT, true);
51+
}
52+
}

0 commit comments

Comments
 (0)