From 617255af88e245fe82fcdfe1480709410d6c5eb0 Mon Sep 17 00:00:00 2001 From: seonwoo_jung <79202163+seonwooj0810@users.noreply.github.com> Date: Fri, 5 Jun 2026 21:14:44 +0900 Subject: [PATCH] Propagate startup failure properties to created container ConcurrentPulsarListenerContainerFactory#createContainerInstance builds a fresh PulsarContainerProperties for each container and only copies a curated subset of the factory's container properties onto it. The startup failure policy and startup failure retry template were not among them, so a StartupFailurePolicy.RETRY (and a custom retry template) configured on the factory - for example via a PulsarContainerFactoryCustomizer, which is the documented way to opt into startup retries - was silently dropped and the container kept the default STOP policy. Copy the startup failure retry template and policy from the factory props to the container props. The retry template is set first because setting a non-null template implicitly switches the policy to RETRY; the policy is then set explicitly to mirror the factory exactly. Signed-off-by: seonwoo_jung <79202163+seonwooj0810@users.noreply.github.com> --- ...currentPulsarListenerContainerFactory.java | 4 ++ ...ntPulsarListenerContainerFactoryTests.java | 56 +++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java index 88c7830c..5f5dfcae 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java @@ -77,6 +77,10 @@ protected ConcurrentPulsarMessageListenerContainer createContainerInstance(Pu containerProps.setTopicResolver(factoryProps.getTopicResolver()); containerProps.setSubscriptionType(factoryProps.getSubscriptionType()); containerProps.setSubscriptionName(factoryProps.getSubscriptionName()); + // Set the retry template before the policy as setting a non-null template + // implicitly switches the policy to RETRY + containerProps.setStartupFailureRetryTemplate(factoryProps.getStartupFailureRetryTemplate()); + containerProps.setStartupFailurePolicy(factoryProps.getStartupFailurePolicy()); var factoryTxnProps = factoryProps.transactions(); var containerTxnProps = containerProps.transactions(); containerTxnProps.setEnabled(factoryTxnProps.isEnabled()); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java index 436290cd..39f8c0af 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java @@ -24,6 +24,8 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.springframework.core.retry.RetryPolicy; +import org.springframework.core.retry.RetryTemplate; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.listener.PulsarContainerProperties; @@ -151,4 +153,58 @@ void factoryPropsUsedWhenSpecified() { } + @SuppressWarnings("unchecked") + @Nested + class StartupFailurePolicyFrom { + + @Test + void factoryPropsUsedWhenSpecified() { + var factoryProps = new PulsarContainerProperties(); + factoryProps.setStartupFailurePolicy(StartupFailurePolicy.CONTINUE); + var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getStartupFailurePolicy()) + .isEqualTo(StartupFailurePolicy.CONTINUE); + } + + @Test + void defaultUsedWhenNotSetOnFactoryProps() { + var factoryProps = new PulsarContainerProperties(); + var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getStartupFailurePolicy()) + .isEqualTo(StartupFailurePolicy.STOP); + } + + } + + @SuppressWarnings("unchecked") + @Nested + class StartupFailureRetryTemplateFrom { + + @Test + void factoryPropsUsedWhenSpecified() { + var factoryProps = new PulsarContainerProperties(); + var retryTemplate = new RetryTemplate(RetryPolicy.builder().maxRetries(3).build()); + factoryProps.setStartupFailureRetryTemplate(retryTemplate); + var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createRegisteredContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getStartupFailureRetryTemplate()) + .isSameAs(retryTemplate); + // Setting the retry template implicitly switches the policy to RETRY + assertThat(createdContainer.getContainerProperties().getStartupFailurePolicy()) + .isEqualTo(StartupFailurePolicy.RETRY); + } + + } + }