fix(provider/kafka): replace block() with toFuture().get() in listConsumerGroups #767

Open
jeroen92 wants to merge 1 commit into streamthoughts:main from jeroen92:fix-admin-service-reactor-race-condition

@jeroen92 jeroen92 commented May 12, 2026

Background
When fetching Kafka consumer groups from the Jikkou API, we often get HTTP 500 responses, e.g.:

{
  "message": "Internal Server Error",
  "errors": [
    {
      "status": 500,
      "error_code": "internal_server_error",
      "message": "Failed to describe consumer groups. Cause IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread default-nioEventLoopGroup-1-43."
    }
  ]
}

Calling Reactor's block() from a Netty event loop thread throws IllegalStateException when Kafka is slow to respond. Reactor's fast path, which skips the non-blocking thread check, is only taken when the Mono is already resolved. Under normal conditions the Kafka futures resolve quickly enough to hit that fast path, so the failure is intermittent and correlates with broker latency or availability.

Replace block() with toFuture().get() to bypass Reactor's non-blocking thread detection.

Also explicitly handle ExecutionException and InterruptedException. For the former, we now unwrap the exception's cause so the user-facing error message is more useful. When catching InterruptedException, we restore the thread's interrupt flag so that threads do not get stuck during shutdown.
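
For reviewers, here is a minimal sketch of the exception handling described above. It is not the code in this PR: a plain CompletableFuture stands in for the result of Mono.toFuture(), and the class name BlockingGetSketch, the helper await(), and the exception type DescribeGroupsException are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class BlockingGetSketch {

    /** Hypothetical exception type; the real provider may throw something else. */
    static class DescribeGroupsException extends RuntimeException {
        DescribeGroupsException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Waits for the future with get() instead of Reactor's block().
     * In the PR the future would come from Mono.toFuture(); here it is
     * passed in directly so the sketch needs only the JDK.
     */
    static <T> T await(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            // Unwrap the cause so the user-facing message names the real failure.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new DescribeGroupsException(
                    "Failed to describe consumer groups. Cause "
                            + cause.getClass().getSimpleName() + ": " + cause.getMessage(),
                    cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers (e.g. during shutdown) still see it.
            Thread.currentThread().interrupt();
            throw new DescribeGroupsException(
                    "Interrupted while describing consumer groups", e);
        }
    }

    public static void main(String[] args) {
        // Successful case: the value is returned as-is.
        System.out.println(await(CompletableFuture.completedFuture("group-a")));

        // Failed case: the original cause is surfaced in the message.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        try {
            await(failed);
        } catch (DescribeGroupsException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note that get() still blocks the calling thread; the sketch only illustrates the unwrapping and interrupt handling, not whether blocking an event loop thread is desirable.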

@jeroen92 jeroen92 requested a review from fhussonnois as a code owner May 12, 2026 12:06