Fix RabbitMQ incoming channel race condition during startup#3383
Fix RabbitMQ incoming channel race condition during startup#3383gastaldi wants to merge 3 commits intosmallrye:mainfrom
Conversation
The incoming channel's connection and infrastructure setup (exchange, queue, bindings, consumer) was fully asynchronous, completing after the container reported "started". This caused race conditions where producers could publish to exchanges before they existed. Two fixes: - ClientHolder: move hasBeenConnected/log from onSubscription() to onItem() so they fire after the connection is actually established - IncomingRabbitMQChannel: eagerly await the connection and consumer creation during construction, ensuring all infrastructure is set up before getPublisher() returns
|
This fixes the quarkus-quickstart failures: https://github.com/quarkusio/quarkus-quickstarts/actions/runs/24974509581 |
|
@gastaldi thanks for looking at this! I am not sure if we need to memoize the created consumer. When there is a disconnection I think the created consumer will be stuck on the previous connection. |
|
This is probably due to #3354 |
|
Hence, RabbitMQReconnectionTest is failing. I'll check the failing quickstart to reproduce the problem |
Await the createConsumer() result directly and use it to build the Multi. Memoizing would pin the consumer to the initial connection, preventing proper reconnection behavior.
|
@gastaldi also did you see this one : quarkusio/quarkus-quickstarts#1638 ? |
|
@ozangunalp yeah, that fixes the race problem for that particular quickstart, but doesn't solve the issue in general, which is what this PR is trying to fix. Anyway, feel free to close this if it doesn't make sense :) |
| // Translate all consumers into a merged stream of messages | ||
| .onItem().transformToMulti( | ||
| tuple -> getStreamOfMessages(tuple.getItem2(), tuple.getItem1(), incomingContext, ic, onNack, onAck)); | ||
| Tuple2<ClientHolder, RabbitMQConsumer> consumer = createConsumer(connector, ic) |
There was a problem hiding this comment.
I don't believe this is correct.
It resolves the consumer exactly once during construction. But, after a disconnect, the consumer is stale and pinned to the old connection.
| .onItem().transformToMulti( | ||
| tuple -> getStreamOfMessages(tuple.getItem2(), tuple.getItem1(), incomingContext, ic, onNack, onAck)); | ||
| Tuple2<ClientHolder, RabbitMQConsumer> consumer = createConsumer(connector, ic) | ||
| .await().atMost(Duration.ofMillis(ic.getConnectionTimeout())); |
There was a problem hiding this comment.
In addition, with this change, shared connections with multiple incoming channels are now serialized.
| this.client = client; | ||
| this.connection = Uni.createFrom().deferred(() -> client.start() | ||
| .onSubscription().invoke(() -> { | ||
| .onItem().invoke(ignored -> { |
There was a problem hiding this comment.
That introduces a race condition. If the Uni from client.start() completes asynchronously, there's a window where subscription has happened but hasBeenConnected is still false. This could cause health checks to briefly report unhealthy even though connection is in progress.
- Revert ClientHolder to use onSubscription() instead of onItem() to avoid health check race during connection establishment - Restore lazy Uni→Multi consumer pipeline in IncomingRabbitMQChannel to preserve reconnection behavior and avoid pinning consumers - Eagerly trigger getOrEstablishConnection() (fire-and-forget) to declare infrastructure without blocking the constructor or serializing shared connections - Set client reference eagerly from the holder - Update test to use Awaitility for async infrastructure readiness
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #3383 +/- ##
============================================
+ Coverage 77.47% 77.81% +0.34%
- Complexity 3778 5441 +1663
============================================
Files 306 465 +159
Lines 12673 18217 +5544
Branches 1648 2262 +614
============================================
+ Hits 9818 14175 +4357
- Misses 2116 2936 +820
- Partials 739 1106 +367
🚀 New features to boost your workflow:
|
The incoming channel's connection and infrastructure setup (exchange, queue, bindings, consumer) was fully asynchronous, completing after the container reported "started".
This caused race conditions where producers could publish to exchanges before they existed.
Two fixes:
ClientHolder: movehasBeenConnected/logfromonSubscription()toonItem()so they fire after the connection is actually establishedIncomingRabbitMQChannel: eagerly await the connection and consumer creation during construction, ensuring all infrastructure is set up beforegetPublisher()returns