diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java index 88c7830c..5f5dfcae 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java @@ -77,6 +77,10 @@ protected ConcurrentPulsarMessageListenerContainer createContainerInstance(Pu containerProps.setTopicResolver(factoryProps.getTopicResolver()); containerProps.setSubscriptionType(factoryProps.getSubscriptionType()); containerProps.setSubscriptionName(factoryProps.getSubscriptionName()); + // Set the retry template before the policy as setting a non-null template + // implicitly switches the policy to RETRY + containerProps.setStartupFailureRetryTemplate(factoryProps.getStartupFailureRetryTemplate()); + containerProps.setStartupFailurePolicy(factoryProps.getStartupFailurePolicy()); var factoryTxnProps = factoryProps.transactions(); var containerTxnProps = containerProps.transactions(); containerTxnProps.setEnabled(factoryTxnProps.isEnabled()); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java index 436290cd..39f8c0af 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java @@ -24,6 +24,8 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.springframework.core.retry.RetryPolicy; +import org.springframework.core.retry.RetryTemplate; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.listener.PulsarContainerProperties; @@ -151,4 +153,58 @@ void factoryPropsUsedWhenSpecified() { } + @SuppressWarnings("unchecked") + @Nested + class StartupFailurePolicyFrom { + + @Test + void factoryPropsUsedWhenSpecified() { + var factoryProps = new PulsarContainerProperties(); + factoryProps.setStartupFailurePolicy(StartupFailurePolicy.CONTINUE); + var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getStartupFailurePolicy()) + .isEqualTo(StartupFailurePolicy.CONTINUE); + } + + @Test + void defaultUsedWhenNotSetOnFactoryProps() { + var factoryProps = new PulsarContainerProperties(); + var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getStartupFailurePolicy()) + .isEqualTo(StartupFailurePolicy.STOP); + } + + } + + @SuppressWarnings("unchecked") + @Nested + class StartupFailureRetryTemplateFrom { + + @Test + void factoryPropsUsedWhenSpecified() { + var factoryProps = new PulsarContainerProperties(); + var retryTemplate = new RetryTemplate(RetryPolicy.builder().maxRetries(3).build()); + factoryProps.setStartupFailureRetryTemplate(retryTemplate); + var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getStartupFailureRetryTemplate()) + .isSameAs(retryTemplate); + // Setting the retry template implicitly switches the policy to RETRY + assertThat(createdContainer.getContainerProperties().getStartupFailurePolicy()) + .isEqualTo(StartupFailurePolicy.RETRY); + } + + } + }