Fix RabbitMQ incoming channel race condition during startup #3383

Open
gastaldi wants to merge 3 commits into smallrye:main from gastaldi:rabbit_race

Contributor

@gastaldi gastaldi commented Apr 27, 2026

The incoming channel's connection and infrastructure setup (exchange, queue, bindings, consumer) was fully asynchronous and could complete after the container reported "started".

This caused a race condition: producers could publish to exchanges before those exchanges existed.

Two fixes:

  • ClientHolder: move hasBeenConnected/log from onSubscription() to onItem() so they fire after the connection is actually established
  • IncomingRabbitMQChannel: eagerly await the connection and consumer creation during construction, ensuring all infrastructure is set up before getPublisher() returns
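
To make the startup ordering concrete, here is a minimal sketch of the race, not the connector's code. `FakeBroker`, `LazyChannel`, and `EagerChannel` are hypothetical stand-ins, and "declaring an exchange" is modelled as adding a name to an in-memory set.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class StartupRaceSketch {

    /** Hypothetical in-memory broker: publishing to an undeclared exchange is rejected. */
    static final class FakeBroker {
        private final Set<String> exchanges = ConcurrentHashMap.newKeySet();

        void declareExchange(String name) {
            exchanges.add(name);
        }

        /** Returns true only if the target exchange has already been declared. */
        boolean publish(String exchange, String payload) {
            return exchanges.contains(exchange);
        }
    }

    /** Lazy style: the exchange is declared only once something subscribes. */
    static final class LazyChannel {
        private final FakeBroker broker;
        private final String exchange;

        LazyChannel(FakeBroker broker, String exchange) {
            this.broker = broker;
            this.exchange = exchange;
        }

        void subscribe() {
            broker.declareExchange(exchange);
        }
    }

    /** Eager style: the exchange exists by the time the constructor returns. */
    static final class EagerChannel {
        EagerChannel(FakeBroker broker, String exchange) {
            broker.declareExchange(exchange);
        }
    }
}
```

With the lazy variant, a publish issued between construction and `subscribe()` is rejected; with the eager variant there is no such window, at the cost of doing the work inside the constructor.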

@gastaldi gastaldi requested a review from ozangunalp April 27, 2026 06:39
@gastaldi
Contributor Author

This fixes the quarkus-quickstart failures: https://github.com/quarkusio/quarkus-quickstarts/actions/runs/24974509581

@gastaldi gastaldi requested a review from cescoffier April 27, 2026 06:41
@ozangunalp
Collaborator

@gastaldi thanks for looking at this!

I am not sure if we need to memoize the created consumer. When there is a disconnection I think the created consumer will be stuck on the previous connection.

@ozangunalp
Collaborator

This is probably due to #3354

@ozangunalp
Collaborator

Hence, RabbitMQReconnectionTest is failing. I'll check the failing quickstart to reproduce the problem

Await the createConsumer() result directly and use it to build the
Multi. Memoizing would pin the consumer to the initial connection,
preventing proper reconnection behavior.
@ozangunalp
Collaborator

@gastaldi also, did you see this one: quarkusio/quarkus-quickstarts#1638 ?

@gastaldi
Contributor Author

@ozangunalp yeah, that fixes the race problem for that particular quickstart, but doesn't solve the issue in general, which is what this PR is trying to fix.

Anyway, feel free to close this if it doesn't make sense :)

// Translate all consumers into a merged stream of messages
.onItem().transformToMulti(
tuple -> getStreamOfMessages(tuple.getItem2(), tuple.getItem1(), incomingContext, ic, onNack, onAck));
Tuple2<ClientHolder, RabbitMQConsumer> consumer = createConsumer(connector, ic)
Contributor

I don't believe this is correct.
It resolves the consumer exactly once, during construction. After a disconnect, that consumer is stale and still pinned to the old connection.
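
The pinning concern can be sketched in isolation. This is an illustrative model only: `Consumer`, `resolveOnce`, and `resolvePerSubscription` are hypothetical names, and a connection is reduced to an integer id that increases on every reconnect.

```java
import java.util.function.Supplier;

final class PinnedConsumerSketch {

    /** Hypothetical consumer that remembers which connection it was created on. */
    static final class Consumer {
        final int connectionId;

        Consumer(int connectionId) {
            this.connectionId = connectionId;
        }
    }

    // Id of the connection that is currently live; bumped on every reconnect.
    private int currentConnectionId = 1;

    void reconnect() {
        currentConnectionId++;
    }

    /** Resolved once: the result keeps pointing at the connection that was live at this call. */
    Consumer resolveOnce() {
        return new Consumer(currentConnectionId);
    }

    /** Resolved per subscription: each get() binds to whichever connection is live at that time. */
    Supplier<Consumer> resolvePerSubscription() {
        return () -> new Consumer(currentConnectionId);
    }
}
```

After a `reconnect()`, the consumer from `resolveOnce()` still refers to connection 1, while a consumer obtained through the supplier refers to connection 2.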

- .onItem().transformToMulti(
-         tuple -> getStreamOfMessages(tuple.getItem2(), tuple.getItem1(), incomingContext, ic, onNack, onAck));
+ Tuple2<ClientHolder, RabbitMQConsumer> consumer = createConsumer(connector, ic)
+         .await().atMost(Duration.ofMillis(ic.getConnectionTimeout()));
Contributor

This should be async.

Contributor

In addition, with this change, shared connections with multiple incoming channels are now serialized: each channel blocks until its own consumer has been created before the next one can start.
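
A rough sketch of the serialization effect, using `CompletableFuture` from the JDK rather than the Mutiny types in the PR. The class, method names, and the `brokerReply` parameter are assumptions made for illustration; `events` simply records the order in which things happen.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class SharedConnectionSetupSketch {

    // Ordered log of what happened; synchronized because callbacks may run on pool threads.
    final List<String> events = Collections.synchronizedList(new ArrayList<>());

    /** Starts the (simulated) setup of one channel; it finishes when brokerReply completes. */
    CompletableFuture<Void> startSetup(String channel, CompletableFuture<Void> brokerReply) {
        events.add("start " + channel);
        return brokerReply.thenRun(() -> events.add("done " + channel));
    }

    /** Blocking style: the caller waits for this channel before it can start the next one. */
    void setUpBlocking(String channel, CompletableFuture<Void> brokerReply) {
        startSetup(channel, brokerReply).join();
    }
}
```

Calling `setUpBlocking` for channels `a` and `b` always yields `start a, done a, start b, done b`. Calling `startSetup` for both first lets the two setups overlap, so `b` can finish before `a`.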

  this.client = client;
  this.connection = Uni.createFrom().deferred(() -> client.start()
-         .onSubscription().invoke(() -> {
+         .onItem().invoke(ignored -> {
Contributor

That introduces a race condition. If the Uni from client.start() completes asynchronously, there's a window where subscription has happened but hasBeenConnected is still false. This could cause health checks to briefly report unhealthy even though the connection is in progress.
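
The window in question can be shown with a JDK-only analogy. `CompletableFuture` stands in for the Mutiny `Uni`, and the two flags are hypothetical: `markedOnStart` plays the role of a flag set in `onSubscription()`, `markedOnCompletion` that of a flag set in `onItem()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class ConnectionFlagSketch {

    final AtomicBoolean markedOnStart = new AtomicBoolean();
    final AtomicBoolean markedOnCompletion = new AtomicBoolean();

    /** Starts a simulated connection attempt that finishes when handshake completes. */
    CompletableFuture<String> connect(CompletableFuture<String> handshake) {
        // Analogous to onSubscription(): runs as soon as the attempt starts.
        markedOnStart.set(true);
        // Analogous to onItem(): runs only once the connection has been produced.
        return handshake.thenApply(connection -> {
            markedOnCompletion.set(true);
            return connection;
        });
    }
}
```

Between the call to `connect` and the completion of `handshake`, the first flag is already true and the second is still false. A health check that reads the second flag would report "not connected" during that interval.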

- Revert ClientHolder to use onSubscription() instead of onItem()
  to avoid health check race during connection establishment
- Restore lazy Uni→Multi consumer pipeline in IncomingRabbitMQChannel
  to preserve reconnection behavior and avoid pinning consumers
- Eagerly trigger getOrEstablishConnection() (fire-and-forget) to
  declare infrastructure without blocking the constructor or
  serializing shared connections
- Set client reference eagerly from the holder
- Update test to use Awaitility for async infrastructure readiness
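
As a sketch of the fire-and-forget approach in this commit message, assuming nothing about the real constructor: setup is triggered without waiting, and a caller that needs the infrastructure polls for readiness. `awaitCondition` is a hypothetical JDK-only stand-in for the kind of polling Awaitility provides in the test.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

final class FireAndForgetSketch {

    private final AtomicBoolean infrastructureReady = new AtomicBoolean();

    /** Triggers the simulated setup on another thread and returns without waiting for it. */
    FireAndForgetSketch() {
        CompletableFuture.runAsync(() -> infrastructureReady.set(true));
    }

    boolean isReady() {
        return infrastructureReady.get();
    }

    /** Polls condition every 10 ms until it holds or timeoutMillis elapses; returns whether it held. */
    static boolean awaitCondition(BooleanSupplier condition, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return true;
    }
}
```

A test would then call something like `awaitCondition(channel::isReady, 5000)` before publishing, instead of assuming the infrastructure exists as soon as the constructor returns.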
@codecov

codecov Bot commented Apr 30, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 77.81%. Comparing base (a96442f) to head (d36ec3c).
⚠️ Report is 1152 commits behind head on main.

@@             Coverage Diff              @@
##               main    #3383      +/-   ##
============================================
+ Coverage     77.47%   77.81%   +0.34%     
- Complexity     3778     5441    +1663     
============================================
  Files           306      465     +159     
  Lines         12673    18217    +5544     
  Branches       1648     2262     +614     
============================================
+ Hits           9818    14175    +4357     
- Misses         2116     2936     +820     
- Partials        739     1106     +367     

| Files with missing lines | Coverage Δ |
|---|---|
| ...ng/rabbitmq/internals/IncomingRabbitMQChannel.java | 92.17% <100.00%> (ø) |

... and 249 files with indirect coverage changes

3 participants