Skip to content

Commit a833a5d

Browse files
committed
fix(schemaregistry): fix wrong recreations of schemas (#598)
Fixes: #598
1 parent cbe224e commit a833a5d

1 file changed

Lines changed: 3 additions & 7 deletions

File tree

providers/jikkou-provider-schema-registry/src/main/java/io/streamthoughts/jikkou/schema/registry/reconciler/SchemaRegistrySubjectCollector.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
*/
77
package io.streamthoughts.jikkou.schema.registry.reconciler;
88

9-
import com.google.common.base.Strings;
109
import io.streamthoughts.jikkou.common.utils.Enums;
1110
import io.streamthoughts.jikkou.core.annotation.SupportedResource;
1211
import io.streamthoughts.jikkou.core.config.ConfigProperty;
@@ -101,6 +100,7 @@ public ResourceList<V1SchemaRegistrySubject> listAll(@NotNull Configuration conf
101100
}
102101

103102
public ResourceList<V1SchemaRegistrySubject> listAll(@NotNull Configuration configuration, @NotNull List<String> subjects) {
103+
System.err.println(subjects);
104104
try (AsyncSchemaRegistryApi api = new DefaultAsyncSchemaRegistryApi(SchemaRegistryApiFactory.create(config))) {
105105
return listAll(configuration, Flux.fromIterable(subjects), api);
106106
}
@@ -112,15 +112,11 @@ private ResourceList<V1SchemaRegistrySubject> listAll(@NotNull Configuration con
112112
Flux<V1SchemaRegistrySubject> flux =
113113
subjects
114114
// Get Schema Registry Latest Subject Version
115-
.flatMap(api::getLatestSubjectSchema)
116-
.onErrorResume(SchemaRegistrySubjectCollector::emptyOn404)
115+
.flatMap(subject -> api.getLatestSubjectSchema(subject).onErrorResume(SchemaRegistrySubjectCollector::emptyOn404))
117116
.flatMap(subjectSchemaVersion -> {
118117
// Get Schema Registry Subject Compatibility
119118
Mono<String> compatibilityMono =
120-
api.getSubjectCompatibilityLevel(
121-
subjectSchemaVersion.subject(),
122-
Config.DEFAULT_GLOBAL_COMPATIBILITY_LEVEL.get(configuration)
123-
)
119+
api.getSubjectCompatibilityLevel(subjectSchemaVersion.subject(), Config.DEFAULT_GLOBAL_COMPATIBILITY_LEVEL.get(configuration))
124120
.map(CompatibilityLevelObject::compatibilityLevel)
125121
.onErrorResume(SchemaRegistrySubjectCollector::emptyOn404);
126122
// Get Schema Registry Subject Mode

0 commit comments

Comments
 (0)