From b5883c384381b075e24e40788ce3c0e1881176ca Mon Sep 17 00:00:00 2001 From: Jeroen Schutrup Date: Tue, 12 May 2026 13:54:56 +0200 Subject: [PATCH] fix(provider/kafka): replace block() with toFuture().get() in listConsumerGroups Calling Reactor's block() from a Netty event loop thread throws IllegalStateException when Kafka is slow to respond, because Reactor's fast-path (which skips the non-blocking thread check) is only taken when the Mono is already resolved. Under normal conditions the Kafka futures resolve quickly enough to hit that fast-path, making the failure intermittent and correlated with broker latency or availability. Replace block() with toFuture().get() to bypass Reactor's non-blocking thread detection. Also explicitly handle ExecutionException and InterruptException. In the former we now unwrap the exception cause so the user-facing error message will be more useful. When catching the InterruptException we re-set the thread's interrupt flag, to prevent having stuck threads during shutdown. --- .../reconciler/service/KafkaAdminService.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/KafkaAdminService.java b/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/KafkaAdminService.java index 45a1852f0..d7d0801fe 100644 --- a/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/KafkaAdminService.java +++ b/providers/jikkou-provider-kafka/src/main/java/io/jikkou/kafka/reconciler/service/KafkaAdminService.java @@ -21,6 +21,7 @@ import io.jikkou.kafka.reconciler.service.KafkaOffsetSpec.ToTimestamp; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.kafka.clients.admin.*; @@ -255,9 +256,22 @@ public V1KafkaConsumerGroupList listConsumerGroups(@NotNull List groups, return Mono.just(group); }) .collectList() - .block(); + .toFuture() + .get(); return new V1KafkaConsumerGroupList.Builder().withItems(items).build(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Failed to describe consumer groups.", e); + throw new JikkouRuntimeException(String.format( + "Failed to describe consumer groups. Cause %s: %s.", e.getClass().getSimpleName(), e.getLocalizedMessage()) + ); + } catch (ExecutionException e) { + Throwable cause = e.getCause() != null ? e.getCause() : e; + LOG.error("Failed to describe consumer groups.", cause); + throw new JikkouRuntimeException(String.format( + "Failed to describe consumer groups. Cause %s: %s.", cause.getClass().getSimpleName(), cause.getLocalizedMessage()) + ); } catch (Exception e) { LOG.error("Failed to describe consumer groups.", e); throw new JikkouRuntimeException(String.format(