diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java index cfd37b71a..bdbb8fe9c 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java @@ -18,7 +18,6 @@ import io.opentelemetry.api.OpenTelemetry; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.tuples.Tuple2; import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.providers.helpers.CDIUtils; import io.smallrye.reactive.messaging.providers.helpers.VertxContext; @@ -65,11 +64,34 @@ public IncomingRabbitMQChannel(RabbitMQConnector connector, final RabbitMQFailureHandler onNack = createFailureHandler(connector.failureHandlerFactories(), ic); final RabbitMQAckHandler onAck = createAckHandler(ic); - Multi> multi = createConsumer(connector, ic) - .invoke(tuple -> client = tuple.getItem1().client()) - // Translate all consumers into a merged stream of messages + ClientHolder holder = connector.getClientHolder(ic); + final RabbitMQClient rabbitClient = holder.client(); + this.client = rabbitClient; + + rabbitClient.getDelegate().addConnectionEstablishedCallback(promise -> { + Uni uni; + if (ic.getMaxOutstandingMessages().isPresent()) { + uni = rabbitClient.basicQos(ic.getMaxOutstandingMessages().get(), false); + } else { + uni = Uni.createFrom().nullItem(); + } + + Instance> maps = connector.configMaps(); + uni + .call(() -> declareQueue(rabbitClient, ic, maps)) + .call(() -> RabbitMQClientHelper.configureDLQorDLX(rabbitClient, ic, maps)) + .subscribe().with(ignored -> promise.complete(), promise::fail); + }); + + // Eagerly trigger connection to declare infrastructure (exchange, queue, bindings) + holder.getOrEstablishConnection() + .subscribe().with(v -> { + }, log::unableToConnectToBroker); + + Multi> multi = holder.getOrEstablishConnection() + .flatMap(connection -> createConsumer(ic, connection)) .onItem().transformToMulti( - tuple -> getStreamOfMessages(tuple.getItem2(), tuple.getItem1(), incomingContext, ic, onNack, onAck)); + consumer -> getStreamOfMessages(consumer, holder, incomingContext, ic, onNack, onAck)); if (ic.getBroadcast()) { multi = multi.broadcast().toAllSubscribers(); @@ -113,31 +135,6 @@ public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder return computeHealthReport(builder); } - private Uni> createConsumer(RabbitMQConnector connector, - RabbitMQConnectorIncomingConfiguration ic) { - ClientHolder holder = connector.getClientHolder(ic); - final RabbitMQClient client = holder.client(); - client.getDelegate().addConnectionEstablishedCallback(promise -> { - - Uni uni; - if (ic.getMaxOutstandingMessages().isPresent()) { - uni = client.basicQos(ic.getMaxOutstandingMessages().get(), false); - } else { - uni = Uni.createFrom().nullItem(); - } - - Instance> maps = connector.configMaps(); - // Ensure we create the queues (and exchanges) from which messages will be read - uni - .call(() -> declareQueue(client, ic, maps)) - .call(() -> RabbitMQClientHelper.configureDLQorDLX(client, ic, maps)) - .subscribe().with(ignored -> promise.complete(), promise::fail); - }); - - return holder.getOrEstablishConnection() - .flatMap(connection -> createConsumer(ic, connection).map(consumer -> Tuple2.of(holder, consumer))); - } - private RabbitMQFailureHandler createFailureHandler(Instance failureHandlerFactories, RabbitMQConnectorIncomingConfiguration config) { String strategy = config.getFailureStrategy(); diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java index 8f081d66a..c05f65304 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQTest.java @@ -199,6 +199,35 @@ void testIncomingDeclarations() throws Exception { assertThat(binding2.getString("routing_key")).isEqualTo("urgent"); } + /** + * Verifies that incoming channel infrastructure (exchange, queue, bindings) is + * set up after container initialization via eager connection establishment. + */ + @Test + void testIncomingChannelReadyAfterInitialization() throws Exception { + weld.addBeanClass(IncomingBean.class); + + new MapBasedConfig() + .put("mp.messaging.incoming.data.exchange.name", exchangeName) + .put("mp.messaging.incoming.data.exchange.declare", true) + .put("mp.messaging.incoming.data.queue.name", queueName) + .put("mp.messaging.incoming.data.queue.declare", true) + .put("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME) + .put("mp.messaging.incoming.data.host", host) + .put("mp.messaging.incoming.data.port", port) + .put("mp.messaging.incoming.data.tracing.enabled", false) + .put("rabbitmq-username", username) + .put("rabbitmq-password", password) + .put("rabbitmq-reconnect-attempts", 0) + .write(); + + container = weld.initialize(); + + await().until(() -> isRabbitMQConnectorAvailable(container)); + assertThat(usage.getExchange(exchangeName)).isNotNull(); + assertThat(usage.getQueue(queueName)).isNotNull(); + } + @Test void testSharedConnectionIncomingAndOutgoingStartup() { final String routingKey = "shared";