Skip to content

Commit 7f8f337

Browse files
committed
Delay Rqueue startup for Boot web apps
1 parent f4a3715 commit 7f8f337

4 files changed

Lines changed: 125 additions & 2 deletions

File tree

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -517,10 +517,14 @@ private List<QueueDetail> getQueueDetail(String queue, MappingInformation mappin
517517

518518
@Override
519519
public void start() {
520-
log.info("Starting Rqueue Message container {}", RqueueConfig.getBrokerId());
521520
synchronized (lifecycleMgr) {
522-
running = true;
521+
if (running) {
522+
log.debug("Rqueue Message container {} is already running", RqueueConfig.getBrokerId());
523+
return;
524+
}
525+
log.info("Starting Rqueue Message container {}", RqueueConfig.getBrokerId());
523526
doStart();
527+
running = true;
524528
rqueueBeanProvider
525529
.getApplicationEventPublisher()
526530
.publishEvent(new RqueueBootstrapEvent(EVENT_SOURCE, true));
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (c) 2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.spring.boot;
18+
19+
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
20+
import java.util.Set;
21+
import java.util.concurrent.ConcurrentHashMap;
22+
import org.springframework.beans.BeansException;
23+
import org.springframework.beans.factory.config.BeanPostProcessor;
24+
import org.springframework.boot.context.event.ApplicationReadyEvent;
25+
import org.springframework.context.ApplicationListener;
26+
27+
/**
28+
* Delays Rqueue poller startup in Boot web applications until the servlet or reactive web server
29+
* is ready and Spring Boot has published {@link ApplicationReadyEvent}.
30+
*/
31+
public class RqueueAutoStartupLifecycle
32+
implements BeanPostProcessor, ApplicationListener<ApplicationReadyEvent> {
33+
34+
private final Set<RqueueMessageListenerContainer> delayedContainers =
35+
ConcurrentHashMap.newKeySet();
36+
37+
@Override
38+
public Object postProcessBeforeInitialization(Object bean, String beanName)
39+
throws BeansException {
40+
if (bean instanceof RqueueMessageListenerContainer) {
41+
RqueueMessageListenerContainer container = (RqueueMessageListenerContainer) bean;
42+
if (container.isAutoStartup()) {
43+
container.setAutoStartup(false);
44+
delayedContainers.add(container);
45+
}
46+
}
47+
return bean;
48+
}
49+
50+
@Override
51+
public void onApplicationEvent(ApplicationReadyEvent event) {
52+
for (RqueueMessageListenerContainer container : delayedContainers) {
53+
if (!container.isRunning()) {
54+
container.start();
55+
}
56+
}
57+
}
58+
}

rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueListenerAutoConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,18 @@
3333
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
3434
import com.github.sonus21.rqueue.utils.condition.ReactiveEnabled;
3535
import com.github.sonus21.rqueue.utils.condition.RqueueEnabled;
36+
import org.springframework.beans.factory.config.BeanDefinition;
3637
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
3738
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
39+
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
3840
import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration;
3941
import org.springframework.context.annotation.Bean;
4042
import org.springframework.context.annotation.ComponentScan;
4143
import org.springframework.context.annotation.Conditional;
4244
import org.springframework.context.annotation.Configuration;
4345
import org.springframework.context.annotation.DependsOn;
4446
import org.springframework.context.annotation.Import;
47+
import org.springframework.context.annotation.Role;
4548

4649
@Configuration
4750
@AutoConfigureAfter(DataRedisAutoConfiguration.class)
@@ -54,6 +57,13 @@
5457
@Import(RqueueRedisConfigImportSelector.class)
5558
public class RqueueListenerAutoConfig extends RqueueListenerBaseConfig {
5659

60+
@Bean
61+
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
62+
@ConditionalOnWebApplication
63+
public static RqueueAutoStartupLifecycle rqueueAutoStartupLifecycle() {
64+
return new RqueueAutoStartupLifecycle();
65+
}
66+
5767
@Bean
5868
@ConditionalOnMissingBean
5969
public RqueueMessageHandler rqueueMessageHandler(MessageBroker messageBroker) {

rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/unit/RqueueListenerAutoConfigTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.github.sonus21.rqueue.spring.boot.tests.unit;
1818

1919
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertFalse;
2021
import static org.junit.jupiter.api.Assertions.assertNotNull;
2122
import static org.junit.jupiter.api.Assertions.assertSame;
2223
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -35,6 +36,8 @@
3536
import com.github.sonus21.rqueue.core.spi.Capabilities;
3637
import com.github.sonus21.rqueue.core.spi.MessageBroker;
3738
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
39+
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
40+
import com.github.sonus21.rqueue.spring.boot.RqueueAutoStartupLifecycle;
3841
import com.github.sonus21.rqueue.spring.boot.RqueueListenerAutoConfig;
3942
import com.github.sonus21.rqueue.spring.boot.tests.SpringBootUnitTest;
4043
import org.apache.commons.lang3.reflect.FieldUtils;
@@ -125,6 +128,33 @@ void rqueueMessageListenerContainer()
125128
assertSame(messageBroker, factory.getMessageBroker());
126129
}
127130

131+
@Test
132+
void autoStartupLifecycleDelaysOnlyAutoStartupContainers() {
133+
RqueueAutoStartupLifecycle lifecycle = new RqueueAutoStartupLifecycle();
134+
TestRqueueMessageListenerContainer autoStartupContainer =
135+
new TestRqueueMessageListenerContainer();
136+
TestRqueueMessageListenerContainer disabledContainer =
137+
new TestRqueueMessageListenerContainer();
138+
TestRqueueMessageListenerContainer alreadyRunningContainer =
139+
new TestRqueueMessageListenerContainer();
140+
disabledContainer.setAutoStartup(false);
141+
142+
lifecycle.postProcessBeforeInitialization(autoStartupContainer, "autoStartupContainer");
143+
lifecycle.postProcessBeforeInitialization(disabledContainer, "disabledContainer");
144+
lifecycle.postProcessBeforeInitialization(alreadyRunningContainer, "alreadyRunningContainer");
145+
alreadyRunningContainer.running = true;
146+
147+
assertFalse(autoStartupContainer.isAutoStartup());
148+
assertFalse(disabledContainer.isAutoStartup());
149+
assertFalse(alreadyRunningContainer.isAutoStartup());
150+
151+
lifecycle.onApplicationEvent(null);
152+
153+
assertEquals(1, autoStartupContainer.startCount);
154+
assertEquals(0, disabledContainer.startCount);
155+
assertEquals(0, alreadyRunningContainer.startCount);
156+
}
157+
128158
@Test
129159
void rqueueMessageEnqueuerWiresBroker() throws IllegalAccessException {
130160
SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();
@@ -158,4 +188,25 @@ void rqueueMessageSenderUsesConfiguredMessageConverter() throws IllegalAccessExc
158188
MessageConverter converter = messageSender.getMessageConverter();
159189
assertTrue(converter.hashCode() == messageConverter.hashCode());
160190
}
191+
192+
private class TestRqueueMessageListenerContainer extends RqueueMessageListenerContainer {
193+
194+
private int startCount;
195+
private boolean running;
196+
197+
private TestRqueueMessageListenerContainer() {
198+
super(rqueueMessageHandler, messageTemplate);
199+
}
200+
201+
@Override
202+
public void start() {
203+
startCount++;
204+
running = true;
205+
}
206+
207+
@Override
208+
public boolean isRunning() {
209+
return running;
210+
}
211+
}
161212
}

0 commit comments

Comments
 (0)