From 68b95f5d6f1811089186be4b6e90c8ff3fd9d33e Mon Sep 17 00:00:00 2001 From: George Gastaldi Date: Mon, 27 Apr 2026 03:37:35 -0300 Subject: [PATCH 1/3] Fix RabbitMQ incoming channel race condition during startup The incoming channel's connection and infrastructure setup (exchange, queue, bindings, consumer) was fully asynchronous, completing after the container reported "started". This caused race conditions where producers could publish to exchanges before they existed. Two fixes: - ClientHolder: move hasBeenConnected/log from onSubscription() to onItem() so they fire after the connection is actually established - IncomingRabbitMQChannel: eagerly await the connection and consumer creation during construction, ensuring all infrastructure is set up before getPublisher() returns --- .../messaging/rabbitmq/ClientHolder.java | 2 +- .../internals/IncomingRabbitMQChannel.java | 9 ++++-- .../messaging/rabbitmq/RabbitMQTest.java | 30 +++++++++++++++++++ 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java index eae70ea41..85bd8a858 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java @@ -30,7 +30,7 @@ public class ClientHolder { public ClientHolder(RabbitMQClient client) { this.client = client; this.connection = Uni.createFrom().deferred(() -> client.start() - .onSubscription().invoke(() -> { + .onItem().invoke(ignored -> { hasBeenConnected.set(true); log.connectionEstablished(String.join(", ", channels)); }) diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java index cfd37b71a..7d22071da 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java @@ -5,6 +5,7 @@ import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.parseArguments; import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.serverQueueName; +import java.time.Duration; import java.util.Map; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicReference; @@ -65,9 +66,13 @@ public IncomingRabbitMQChannel(RabbitMQConnector connector, final RabbitMQFailureHandler onNack = createFailureHandler(connector.failureHandlerFactories(), ic); final RabbitMQAckHandler onAck = createAckHandler(ic); - Multi> multi = createConsumer(connector, ic) + Uni> consumerUni = createConsumer(connector, ic) .invoke(tuple -> client = tuple.getItem1().client()) - // Translate all consumers into a merged stream of messages + .memoize().indefinitely(); + + consumerUni.await().atMost(Duration.ofMillis(ic.getConnectionTimeout())); + + Multi> multi = consumerUni .onItem().transformToMulti( tuple -> getStreamOfMessages(tuple.getItem2(), tuple.getItem1(), incomingContext, ic, onNack, onAck)); diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java index 8f081d66a..481ee1111 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java @@ -199,6 +199,36 @@ void testIncomingDeclarations() throws Exception { assertThat(binding2.getString("routing_key")).isEqualTo("urgent"); } + /** + * Verifies that incoming channel infrastructure (exchange, queue, bindings) is fully + * set up immediately after container initialization, without requiring any polling. + */ + @Test + void testIncomingChannelReadyImmediatelyAfterInitialization() throws Exception { + weld.addBeanClass(IncomingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + + // No await() - infrastructure must be ready synchronously after initialization + assertThat(isRabbitMQConnectorAvailable(container)).isTrue(); + assertThat(usage.getExchange(exchangeName)).isNotNull(); + assertThat(usage.getQueue(queueName)).isNotNull(); + } + @Test void testSharedConnectionIncomingAndOutgoingStartup() { final String routingKey = "shared"; From 9dfaeb66df00c0c2b31cd42bd6aa6e2279acc0e9 Mon Sep 17 00:00:00 2001 From: George Gastaldi Date: Mon, 27 Apr 2026 07:01:54 -0300 Subject: [PATCH 2/3] Drop unnecessary memoization of the consumer Uni Await the createConsumer() result directly and use it to build the Multi. Memoizing would pin the consumer to the initial connection, preventing proper reconnection behavior. --- .../rabbitmq/internals/IncomingRabbitMQChannel.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java index 7d22071da..dd7483169 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java @@ -66,15 +66,12 @@ public IncomingRabbitMQChannel(RabbitMQConnector connector, final RabbitMQFailureHandler onNack = createFailureHandler(connector.failureHandlerFactories(), ic); final RabbitMQAckHandler onAck = createAckHandler(ic); - Uni> consumerUni = createConsumer(connector, ic) - .invoke(tuple -> client = tuple.getItem1().client()) - .memoize().indefinitely(); + Tuple2 consumer = createConsumer(connector, ic) + .await().atMost(Duration.ofMillis(ic.getConnectionTimeout())); + client = consumer.getItem1().client(); - consumerUni.await().atMost(Duration.ofMillis(ic.getConnectionTimeout())); - - Multi> multi = consumerUni - .onItem().transformToMulti( - tuple -> getStreamOfMessages(tuple.getItem2(), tuple.getItem1(), incomingContext, ic, onNack, onAck)); + Multi> multi = getStreamOfMessages( + consumer.getItem2(), consumer.getItem1(), incomingContext, ic, onNack, onAck); if (ic.getBroadcast()) { multi = multi.broadcast().toAllSubscribers(); From d36ec3c0881d021ea0385b7ac1443b335f3c093c Mon Sep 17 00:00:00 2001 From: George Gastaldi Date: Thu, 30 Apr 2026 08:12:10 -0300 Subject: [PATCH 3/3] Address PR review: keep async consumer creation and reconnection support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Revert ClientHolder to use onSubscription() instead of onItem() to avoid health check race during connection establishment - Restore lazy Uni→Multi consumer pipeline in IncomingRabbitMQChannel to preserve reconnection behavior and avoid pinning consumers - Eagerly trigger getOrEstablishConnection() (fire-and-forget) to declare infrastructure without blocking the constructor or serializing shared connections - Set client reference eagerly from the holder - Update test to use Awaitility for async infrastructure readiness --- .../messaging/rabbitmq/ClientHolder.java | 2 +- .../internals/IncomingRabbitMQChannel.java | 59 +++++++++---------- .../messaging/rabbitmq/RabbitMQTest.java | 9 ++- 3 files changed, 32 insertions(+), 38 deletions(-) diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java index 85bd8a858..eae70ea41 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/ClientHolder.java @@ -30,7 +30,7 @@ public class ClientHolder { public ClientHolder(RabbitMQClient client) { this.client = client; this.connection = Uni.createFrom().deferred(() -> client.start() - .onItem().invoke(ignored -> { + .onSubscription().invoke(() -> { hasBeenConnected.set(true); log.connectionEstablished(String.join(", ", channels)); }) diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java index dd7483169..bdbb8fe9c 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java @@ -5,7 +5,6 @@ import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.parseArguments; import static io.smallrye.reactive.messaging.rabbitmq.internals.RabbitMQClientHelper.serverQueueName; -import java.time.Duration; import java.util.Map; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicReference; @@ -19,7 +18,6 @@ import io.opentelemetry.api.OpenTelemetry; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.tuples.Tuple2; import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.providers.helpers.CDIUtils; import io.smallrye.reactive.messaging.providers.helpers.VertxContext; @@ -66,12 +64,34 @@ public IncomingRabbitMQChannel(RabbitMQConnector connector, final RabbitMQFailureHandler onNack = createFailureHandler(connector.failureHandlerFactories(), ic); final RabbitMQAckHandler onAck = createAckHandler(ic); - Tuple2 consumer = createConsumer(connector, ic) - .await().atMost(Duration.ofMillis(ic.getConnectionTimeout())); - client = consumer.getItem1().client(); + ClientHolder holder = connector.getClientHolder(ic); + final RabbitMQClient rabbitClient = holder.client(); + this.client = rabbitClient; + + rabbitClient.getDelegate().addConnectionEstablishedCallback(promise -> { + Uni uni; + if (ic.getMaxOutstandingMessages().isPresent()) { + uni = rabbitClient.basicQos(ic.getMaxOutstandingMessages().get(), false); + } else { + uni = Uni.createFrom().nullItem(); + } - Multi> multi = getStreamOfMessages( - consumer.getItem2(), consumer.getItem1(), incomingContext, ic, onNack, onAck); + Instance> maps = connector.configMaps(); + uni + .call(() -> declareQueue(rabbitClient, ic, maps)) + .call(() -> RabbitMQClientHelper.configureDLQorDLX(rabbitClient, ic, maps)) + .subscribe().with(ignored -> promise.complete(), promise::fail); + }); + + // Eagerly trigger connection to declare infrastructure (exchange, queue, bindings) + holder.getOrEstablishConnection() + .subscribe().with(v -> { + }, log::unableToConnectToBroker); + + Multi> multi = holder.getOrEstablishConnection() + .flatMap(connection -> createConsumer(ic, connection)) + .onItem().transformToMulti( + consumer -> getStreamOfMessages(consumer, holder, incomingContext, ic, onNack, onAck)); if (ic.getBroadcast()) { multi = multi.broadcast().toAllSubscribers(); @@ -115,31 +135,6 @@ public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder return computeHealthReport(builder); } - private Uni> createConsumer(RabbitMQConnector connector, - RabbitMQConnectorIncomingConfiguration ic) { - ClientHolder holder = connector.getClientHolder(ic); - final RabbitMQClient client = holder.client(); - client.getDelegate().addConnectionEstablishedCallback(promise -> { - - Uni uni; - if (ic.getMaxOutstandingMessages().isPresent()) { - uni = client.basicQos(ic.getMaxOutstandingMessages().get(), false); - } else { - uni = Uni.createFrom().nullItem(); - } - - Instance> maps = connector.configMaps(); - // Ensure we create the queues (and exchanges) from which messages will be read - uni - .call(() -> declareQueue(client, ic, maps)) - .call(() -> RabbitMQClientHelper.configureDLQorDLX(client, ic, maps)) - .subscribe().with(ignored -> promise.complete(), promise::fail); - }); - - return holder.getOrEstablishConnection() - .flatMap(connection -> createConsumer(ic, connection).map(consumer -> Tuple2.of(holder, consumer))); - } - private RabbitMQFailureHandler createFailureHandler(Instance failureHandlerFactories, RabbitMQConnectorIncomingConfiguration config) { String strategy = config.getFailureStrategy(); diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java index 481ee1111..c05f65304 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java @@ -200,11 +200,11 @@ void testIncomingDeclarations() throws Exception { } /** - * Verifies that incoming channel infrastructure (exchange, queue, bindings) is fully - * set up immediately after container initialization, without requiring any polling. + * Verifies that incoming channel infrastructure (exchange, queue, bindings) is + * set up after container initialization via eager connection establishment. */ @Test - void testIncomingChannelReadyImmediatelyAfterInitialization() throws Exception { + void testIncomingChannelReadyAfterInitialization() throws Exception { weld.addBeanClass(IncomingBean.class); new MapBasedConfig() @@ -223,8 +223,7 @@ void testIncomingChannelReadyImmediatelyAfterInitialization() throws Exception { container = weld.initialize(); - // No await() - infrastructure must be ready synchronously after initialization - assertThat(isRabbitMQConnectorAvailable(container)).isTrue(); + await().until(() -> isRabbitMQConnectorAvailable(container)); assertThat(usage.getExchange(exchangeName)).isNotNull(); assertThat(usage.getQueue(queueName)).isNotNull(); }