diff --git a/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/KafkaAdminService.java b/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/KafkaAdminService.java index 45a1852f0..d7d0801fe 100644 --- a/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/KafkaAdminService.java +++ b/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/KafkaAdminService.java @@ -21,6 +21,7 @@ import io.jikkou.kafka.reconciler.service.KafkaOffsetSpec.ToTimestamp; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.kafka.clients.admin.*; @@ -255,9 +256,22 @@ public V1KafkaConsumerGroupList listConsumerGroups(@NotNull List groups, return Mono.just(group); }) .collectList() - .block(); + .toFuture() + .get(); return new V1KafkaConsumerGroupList.Builder().withItems(items).build(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Failed to describe consumer groups.", e); + throw new JikkouRuntimeException(String.format( + "Failed to describe consumer groups. Cause %s: %s.", e.getClass().getSimpleName(), e.getLocalizedMessage()) + ); + } catch (ExecutionException e) { + Throwable cause = e.getCause() != null ? e.getCause() : e; + LOG.error("Failed to describe consumer groups.", cause); + throw new JikkouRuntimeException(String.format( + "Failed to describe consumer groups. Cause %s: %s.", cause.getClass().getSimpleName(), cause.getLocalizedMessage()) + ); } catch (Exception e) { LOG.error("Failed to describe consumer groups.", e); throw new JikkouRuntimeException(String.format(