Skip to content

Commit 8afcc32

Browse files
committed
fix(schemaregistry): fix subject mode
1 parent 95b559f commit 8afcc32

5 files changed

Lines changed: 12 additions & 13 deletions

File tree

providers/jikkou-provider-aiven/src/main/java/io/streamthoughts/jikkou/extension/aiven/api/AivenAsyncSchemaRegistryApi.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public Mono<ModeObject> getGlobalMode() {
184184
* {@inheritDoc}
185185
**/
186186
@Override
187-
public Mono<ModeObject> getSubjectMode(@NotNull String subject, boolean defaultToGlobal) {
187+
public Mono<ModeObject> getSubjectMode(@NotNull String subject) {
188188
throw new UnsupportedOperationException("Aiven schema registry does not support subject mode");
189189
}
190190

providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/AsyncSchemaRegistryApi.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,9 @@ Mono<CompatibilityCheck> testCompatibility(@NotNull String subject,
106106
* Gets mode level for the specified subject.
107107
*
108108
* @param subject the name of the subject.
109-
* @param defaultToGlobal flag to default to global mode.
110109
* @return the mode.
111110
*/
112-
Mono<ModeObject> getSubjectMode(@NotNull String subject, boolean defaultToGlobal);
111+
Mono<ModeObject> getSubjectMode(@NotNull String subject);
113112

114113
/**
115114
* Updates mode for the specified subject.

providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/DefaultAsyncSchemaRegistryApi.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ public Mono<CompatibilityObject> deleteSubjectCompatibilityLevel(@NotNull final
106106
}
107107

108108
@Override
109-
public Mono<ModeObject> getSubjectMode(@NotNull String subject, boolean defaultToGlobal) {
110-
return Mono.fromCallable(() -> api.getMode(subject, defaultToGlobal));
109+
public Mono<ModeObject> getSubjectMode(@NotNull String subject) {
110+
return Mono.fromCallable(() -> api.getMode(subject));
111111
}
112112

113113
@Override

providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/api/SchemaRegistryApi.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,7 @@ CompatibilityLevelObject getConfigCompatibility(@PathParam("subject") String sub
249249
@GET
250250
@Path("mode/{subject}")
251251
@Produces("application/vnd.schemaregistry.v1+json")
252-
ModeObject getMode(@PathParam("subject") String subject,
253-
@QueryParam("defaultToGlobal") @DefaultValue("false") boolean defaultToGlobal);
252+
ModeObject getMode(@PathParam("subject") String subject);
254253

255254
@PUT
256255
@Path("mode/{subject}")

providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollector.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package io.streamthoughts.jikkou.schema.registry.reconciler;
88

99
import com.google.common.base.Strings;
10+
import io.streamthoughts.jikkou.common.utils.Enums;
1011
import io.streamthoughts.jikkou.core.annotation.SupportedResource;
1112
import io.streamthoughts.jikkou.core.config.ConfigProperty;
1213
import io.streamthoughts.jikkou.core.config.Configuration;
@@ -124,16 +125,16 @@ private ResourceList<V1SchemaRegistrySubject> listAll(@NotNull Configuration con
124125
.onErrorResume(SchemaRegistrySubjectCollector::emptyOn404);
125126
// Get Schema Registry Subject Mode
126127
Mono<String> modeMono =
127-
api.getGlobalMode()
128+
api.getSubjectMode(subjectSchemaVersion.subject())
128129
.map(ModeObject::mode)
129130
.onErrorResume(SchemaRegistrySubjectCollector::emptyOn404);
130131

131132
return Mono.zip(compatibilityMono.defaultIfEmpty(EMPTY_STRING), modeMono.defaultIfEmpty(EMPTY_STRING))
132-
.map(tuple -> {
133-
CompatibilityLevels compatibilityLevel = Strings.isNullOrEmpty(tuple.getT1()) ? null : CompatibilityLevels.valueOf(tuple.getT1());
134-
Modes mode = Strings.isNullOrEmpty(tuple.getT1()) ? null : Modes.valueOf(tuple.getT2());
135-
return schemaRegistrySubjectFactory.createSchemaRegistrySubject(subjectSchemaVersion, compatibilityLevel, mode);
136-
});
133+
.map(tuple -> schemaRegistrySubjectFactory.createSchemaRegistrySubject(
134+
subjectSchemaVersion,
135+
Enums.safeValueOf(CompatibilityLevels.class, tuple.getT1()),
136+
Enums.safeValueOf(Modes.class, tuple.getT2())
137+
));
137138
});
138139
try {
139140
return new V1SchemaRegistrySubjectList.Builder().withItems(flux.collectList().block()).build();

0 commit comments

Comments
 (0)