Skip to content

Commit 2491e15

Browse files
authored
Add Spring Cloud Stream retry support to ServiceBus Binder (#47149)
1 parent 9f04e26 commit 2491e15

9 files changed

Lines changed: 578 additions & 3 deletions

File tree

sdk/spring/CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,17 @@ This section includes changes in `spring-cloud-azure-autoconfigure` module.
99

1010
- Fixed JDBC/Azure Database and Redis passwordless connection scope defaulting using the wrong `azure.scopes` value for Azure China and Azure US Government when `spring.cloud.azure.profile.cloud-type` is set to `azure_china` or `azure_us_government`. The scopes are now correctly derived from the merged cloud type. ([#47096](https://github.com/Azure/azure-sdk-for-java/issues/47096))
1111

12+
### Spring Cloud Azure Stream Binder Service Bus
13+
This section includes changes in `spring-cloud-azure-stream-binder-servicebus` module.
14+
15+
#### Features Added
16+
17+
- Add support for Spring Cloud Stream consumer retry properties (`maxAttempts`, `backOffInitialInterval`,
18+
`backOffMaxInterval`, `backOffMultiplier`) to enable retry with exponential backoff for message processing
19+
failures. [#47135](https://github.com/Azure/azure-sdk-for-java/issues/47135).
20+
- Add support for injecting a custom `RetryTemplate` from Spring context for advanced retry scenarios.
21+
[#47135](https://github.com/Azure/azure-sdk-for-java/issues/47135).
22+
1223
## 7.2.0 (2026-04-17)
1324
- This release is compatible with Spring Boot 4.0.0-4.0.5. (Note: 4.0.x (x>5) should be supported, but they aren't tested with this release.)
1425
- This release is compatible with Spring Cloud 2025.1.0-2025.1.1. (Note: 2025.1.x (x>1) should be supported, but they aren't tested with this release.)

sdk/spring/spring-cloud-azure-stream-binder-servicebus/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@
5959
<version>4.0.5</version> <!-- {x-version-update;springboot4_org.springframework.boot:spring-boot-starter-actuator;external_dependency} -->
6060
<optional>true</optional>
6161
</dependency>
62+
<dependency>
63+
<groupId>org.springframework.retry</groupId>
64+
<artifactId>spring-retry</artifactId>
65+
<version>2.0.12</version> <!-- {x-version-update;springboot4_org.springframework.retry:spring-retry;external_dependency} -->
66+
</dependency>
6267
<!-- Added this dependency to include necessary annotations used by reactor core.
6368
Without this dependency, javadoc throws a warning as it cannot find enum When.MAYBE
6469
which is used in @Nullable annotation in reactor core classes -->
@@ -156,6 +161,7 @@
156161
<bannedDependencies>
157162
<includes>
158163
<include>org.springframework.boot:spring-boot-starter-actuator:[4.0.5]</include> <!-- {x-include-update;springboot4_org.springframework.boot:spring-boot-starter-actuator;external_dependency} -->
164+
<include>org.springframework.retry:spring-retry:[2.0.12]</include> <!-- {x-include-update;springboot4_org.springframework.retry:spring-retry;external_dependency} -->
159165
</includes>
160166
</bannedDependencies>
161167
</rules>

sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/ServiceBusMessageChannelBinder.java

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@
5353
import org.springframework.messaging.MessageChannel;
5454
import org.springframework.messaging.MessageHandler;
5555
import org.springframework.messaging.support.ErrorMessage;
56+
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
57+
import org.springframework.retry.policy.SimpleRetryPolicy;
58+
import org.springframework.retry.support.RetryTemplate;
5659
import org.springframework.util.Assert;
5760
import org.springframework.util.StringUtils;
5861

@@ -78,7 +81,6 @@ public class ServiceBusMessageChannelBinder extends
7881
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusMessageChannelBinder.class);
7982
private static final DefaultErrorMessageStrategy DEFAULT_ERROR_MESSAGE_STRATEGY = new DefaultErrorMessageStrategy();
8083
private static final String EXCEPTION_MESSAGE = "exception-message";
81-
8284
private ServiceBusExtendedBindingProperties bindingProperties = new ServiceBusExtendedBindingProperties();
8385
private NamespaceProperties namespaceProperties;
8486
private ServiceBusTemplate serviceBusTemplate;
@@ -91,6 +93,7 @@ public class ServiceBusMessageChannelBinder extends
9193

9294
private final List<ServiceBusProducerFactoryCustomizer> producerFactoryCustomizers = new ArrayList<>();
9395
private final List<ServiceBusProcessorFactoryCustomizer> processorFactoryCustomizers = new ArrayList<>();
96+
private RetryTemplate retryTemplate;
9497

9598
/**
9699
* Construct a {@link ServiceBusMessageChannelBinder} with the specified headersToEmbed and {@link ServiceBusChannelProvisioner}.
@@ -148,6 +151,16 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
148151
inboundAdapter.setInstrumentationId(instrumentationId);
149152
inboundAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
150153
inboundAdapter.setMessageConverter(messageConverter);
154+
155+
// Configure retry only when retry is enabled by the consumer properties.
156+
if (shouldConfigureRetry(properties)) {
157+
// Once retry is enabled, use the injected RetryTemplate if available; otherwise create one from properties.
158+
RetryTemplate retryTemplateToUse = this.retryTemplate != null
159+
? this.retryTemplate
160+
: createRetryTemplate(properties);
161+
inboundAdapter.setRetryTemplate(retryTemplateToUse);
162+
}
163+
151164
return inboundAdapter;
152165
}
153166

@@ -377,4 +390,63 @@ public void addProcessorFactoryCustomizer(ServiceBusProcessorFactoryCustomizer p
377390
}
378391
}
379392

393+
/**
394+
* Set a custom retry template for message processing retries.
395+
* If not set, a retry template will be created automatically based on consumer properties when maxAttempts > 1.
396+
*
397+
* @param retryTemplate the retry template to use
398+
*/
399+
public void setRetryTemplate(RetryTemplate retryTemplate) {
400+
this.retryTemplate = retryTemplate;
401+
}
402+
403+
/**
404+
* Get the retry template configured for this binder.
405+
*
406+
* @return the retry template, or {@code null} if none has been set
407+
*/
408+
public RetryTemplate getRetryTemplate() {
409+
return this.retryTemplate;
410+
}
411+
412+
private boolean shouldConfigureRetry(ExtendedConsumerProperties<ServiceBusConsumerProperties> properties) {
413+
return properties.getMaxAttempts() > 1;
414+
}
415+
416+
/**
417+
* Create a RetryTemplate based on the consumer properties.
418+
*
419+
* @param properties the extended consumer properties
420+
* @return the configured RetryTemplate
421+
*/
422+
private RetryTemplate createRetryTemplate(ExtendedConsumerProperties<ServiceBusConsumerProperties> properties) {
423+
RetryTemplate retryTemplate = new RetryTemplate();
424+
425+
// Configure retry policy
426+
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
427+
retryPolicy.setMaxAttempts(properties.getMaxAttempts());
428+
retryTemplate.setRetryPolicy(retryPolicy);
429+
430+
// Configure backoff policy
431+
retryTemplate.setBackOffPolicy(createExponentialBackOffPolicy(properties));
432+
433+
return retryTemplate;
434+
}
435+
436+
/**
437+
* Create an {@link ExponentialBackOffPolicy} from the consumer properties.
438+
* Package-private to allow direct verification in tests without reflective access to RetryTemplate internals.
439+
*
440+
* @param properties the extended consumer properties
441+
* @return the configured ExponentialBackOffPolicy
442+
*/
443+
ExponentialBackOffPolicy createExponentialBackOffPolicy(
444+
ExtendedConsumerProperties<ServiceBusConsumerProperties> properties) {
445+
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
446+
backOffPolicy.setInitialInterval(properties.getBackOffInitialInterval());
447+
backOffPolicy.setMultiplier(properties.getBackOffMultiplier());
448+
backOffPolicy.setMaxInterval(properties.getBackOffMaxInterval());
449+
return backOffPolicy;
450+
}
451+
380452
}

sdk/spring/spring-cloud-azure-stream-binder-servicebus/src/main/java/com/azure/spring/cloud/stream/binder/servicebus/implementation/config/ServiceBusBinderConfiguration.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.springframework.context.annotation.Bean;
3838
import org.springframework.context.annotation.Configuration;
3939
import org.springframework.context.annotation.Import;
40+
import org.springframework.retry.support.RetryTemplate;
4041

4142
import static com.azure.spring.cloud.autoconfigure.implementation.context.AzureContextUtils.DEFAULT_TOKEN_CREDENTIAL_BEAN_NAME;
4243

@@ -96,6 +97,7 @@ ServiceBusChannelProvisioner serviceBusChannelProvisioner() {
9697
* @param messageConverter the message converter.
9798
* @param producerFactoryCustomizers customizers to customize producer factories.
9899
* @param processorFactoryCustomizers customizers to customize processor factories.
100+
* @param retryTemplate optional custom retry template for message processing retries; must be a bean named {@code serviceBusRetryTemplate}.
99101
*
100102
* @return the {@link ServiceBusMessageChannelBinder} bean.
101103
*/
@@ -106,14 +108,16 @@ ServiceBusMessageChannelBinder serviceBusBinder(ServiceBusChannelProvisioner cha
106108
ObjectProvider<NamespaceProperties> namespaceProperties,
107109
ObjectProvider<ServiceBusMessageConverter> messageConverter,
108110
ObjectProvider<ServiceBusProducerFactoryCustomizer> producerFactoryCustomizers,
109-
ObjectProvider<ServiceBusProcessorFactoryCustomizer> processorFactoryCustomizers) {
111+
ObjectProvider<ServiceBusProcessorFactoryCustomizer> processorFactoryCustomizers,
112+
@Qualifier("serviceBusRetryTemplate") ObjectProvider<RetryTemplate> retryTemplate) {
110113

111114
ServiceBusMessageChannelBinder binder = new ServiceBusMessageChannelBinder(null, channelProvisioner);
112115
binder.setBindingProperties(bindingProperties);
113116
binder.setNamespaceProperties(namespaceProperties.getIfAvailable());
114117
binder.setMessageConverter(messageConverter.getIfAvailable());
115118
producerFactoryCustomizers.orderedStream().forEach(binder::addProducerFactoryCustomizer);
116119
processorFactoryCustomizers.orderedStream().forEach(binder::addProcessorFactoryCustomizer);
120+
retryTemplate.ifAvailable(binder::setRetryTemplate);
117121
return binder;
118122
}
119123

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.azure.spring.cloud.stream.binder.servicebus.implementation;
4+
5+
import com.azure.spring.cloud.service.servicebus.properties.ServiceBusEntityType;
6+
import com.azure.spring.cloud.stream.binder.servicebus.core.implementation.provisioning.ServiceBusChannelProvisioner;
7+
import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusBindingProperties;
8+
import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusConsumerProperties;
9+
import com.azure.spring.cloud.stream.binder.servicebus.core.properties.ServiceBusExtendedBindingProperties;
10+
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
11+
import org.junit.jupiter.api.AfterEach;
12+
import org.junit.jupiter.api.BeforeEach;
13+
import org.junit.jupiter.api.Test;
14+
import org.junit.jupiter.api.extension.ExtendWith;
15+
import org.mockito.Mock;
16+
import org.mockito.junit.jupiter.MockitoExtension;
17+
import org.springframework.cloud.stream.binder.BinderHeaders;
18+
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
19+
import org.springframework.cloud.stream.binder.HeaderMode;
20+
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
21+
import org.springframework.context.support.GenericApplicationContext;
22+
import org.springframework.integration.core.MessageProducer;
23+
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
24+
import org.springframework.retry.support.RetryTemplate;
25+
26+
import java.time.Duration;
27+
import java.util.HashMap;
28+
import java.util.Map;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
31+
import static org.assertj.core.api.Assertions.assertThat;
32+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
33+
import static org.mockito.Mockito.when;
34+
35+
/**
36+
* Tests for retry functionality in ServiceBusMessageChannelBinder.
37+
*/
38+
@ExtendWith(MockitoExtension.class)
39+
class ServiceBusRetryTest {
40+
41+
@Mock
42+
private ConsumerDestination consumerDestination;
43+
44+
private final ServiceBusExtendedBindingProperties extendedBindingProperties =
45+
new ServiceBusExtendedBindingProperties();
46+
47+
private ExtendedConsumerProperties<ServiceBusConsumerProperties> consumerProperties;
48+
49+
private final ServiceBusConsumerProperties serviceBusConsumerProperties = new ServiceBusConsumerProperties();
50+
51+
private ServiceBusMessageChannelTestBinder binder;
52+
53+
private GenericApplicationContext applicationContext;
54+
55+
private static final String ENTITY_NAME = "test-entity";
56+
private static final String GROUP = "test";
57+
private static final String NAMESPACE_NAME = "test-namespace";
58+
59+
@BeforeEach
60+
void init() {
61+
binder = new ServiceBusMessageChannelTestBinder(
62+
BinderHeaders.STANDARD_HEADERS, new ServiceBusChannelProvisioner());
63+
applicationContext = new GenericApplicationContext();
64+
applicationContext.refresh();
65+
binder.setApplicationContext(applicationContext);
66+
}
67+
68+
@AfterEach
69+
void tearDown() {
70+
if (applicationContext != null) {
71+
applicationContext.close();
72+
}
73+
}
74+
75+
@Test
76+
void testRetryTemplateConfiguredWhenMaxAttemptsGreaterThanOne() {
77+
// Arrange
78+
prepareConsumerProperties();
79+
consumerProperties.setMaxAttempts(3);
80+
consumerProperties.setBackOffInitialInterval(1000);
81+
consumerProperties.setBackOffMultiplier(2.0);
82+
consumerProperties.setBackOffMaxInterval(5000);
83+
when(consumerDestination.getName()).thenReturn(ENTITY_NAME);
84+
85+
// Act
86+
MessageProducer producer = binder.createConsumerEndpoint(consumerDestination, GROUP, consumerProperties);
87+
88+
// Assert
89+
assertThat(producer).isInstanceOf(ServiceBusInboundChannelAdapter.class);
90+
ServiceBusInboundChannelAdapter adapter = (ServiceBusInboundChannelAdapter) producer;
91+
RetryTemplate retryTemplate = adapter.getRetryTemplate();
92+
assertThat(retryTemplate).isNotNull();
93+
94+
// Verify maxAttempts=3 by executing the template and counting actual attempts
95+
AtomicInteger callCount = new AtomicInteger(0);
96+
assertThatThrownBy(() -> retryTemplate.execute(ctx -> {
97+
callCount.incrementAndGet();
98+
throw new RuntimeException("test");
99+
})).isInstanceOf(RuntimeException.class);
100+
assertThat(callCount.get()).isEqualTo(3);
101+
102+
// Verify backoff policy configuration via the binder's factory method (no reflection needed)
103+
ExponentialBackOffPolicy backOffPolicy = binder.createExponentialBackOffPolicy(consumerProperties);
104+
assertThat(backOffPolicy.getInitialInterval()).isEqualTo(1000L);
105+
assertThat(backOffPolicy.getMultiplier()).isEqualTo(2.0);
106+
assertThat(backOffPolicy.getMaxInterval()).isEqualTo(5000L);
107+
}
108+
109+
@Test
110+
void testRetryTemplateNotConfiguredWhenMaxAttemptsIsOne() {
111+
// Arrange
112+
prepareConsumerProperties();
113+
consumerProperties.setMaxAttempts(1);
114+
when(consumerDestination.getName()).thenReturn(ENTITY_NAME);
115+
116+
// Act
117+
MessageProducer producer = binder.createConsumerEndpoint(consumerDestination, GROUP, consumerProperties);
118+
119+
// Assert
120+
assertThat(producer).isInstanceOf(ServiceBusInboundChannelAdapter.class);
121+
assertThat(((ServiceBusInboundChannelAdapter) producer).getRetryTemplate()).isNull();
122+
}
123+
124+
@Test
125+
void testRetryTemplateConfiguredWithDefaultSettings() {
126+
// Arrange
127+
prepareConsumerProperties();
128+
// Spring Cloud Stream default maxAttempts is 3 (> 1), so a RetryTemplate should be created.
129+
when(consumerDestination.getName()).thenReturn(ENTITY_NAME);
130+
131+
// Act
132+
MessageProducer producer = binder.createConsumerEndpoint(consumerDestination, GROUP, consumerProperties);
133+
134+
// Assert
135+
assertThat(producer).isInstanceOf(ServiceBusInboundChannelAdapter.class);
136+
ServiceBusInboundChannelAdapter adapter = (ServiceBusInboundChannelAdapter) producer;
137+
RetryTemplate retryTemplate = adapter.getRetryTemplate();
138+
assertThat(retryTemplate).isNotNull();
139+
140+
// Verify maxAttempts matches Spring Cloud Stream's default via observable behavior
141+
int expectedMaxAttempts = new ExtendedConsumerProperties<>(new ServiceBusConsumerProperties()).getMaxAttempts();
142+
AtomicInteger callCount = new AtomicInteger(0);
143+
assertThatThrownBy(() -> retryTemplate.execute(ctx -> {
144+
callCount.incrementAndGet();
145+
throw new RuntimeException("test");
146+
})).isInstanceOf(RuntimeException.class);
147+
assertThat(callCount.get()).isEqualTo(expectedMaxAttempts);
148+
}
149+
150+
@Test
151+
void testCustomRetryTemplateIsUsed() {
152+
// Arrange
153+
prepareConsumerProperties();
154+
consumerProperties.setMaxAttempts(3);
155+
when(consumerDestination.getName()).thenReturn(ENTITY_NAME);
156+
157+
// Create a custom RetryTemplate
158+
RetryTemplate customRetryTemplate = new RetryTemplate();
159+
binder.setRetryTemplate(customRetryTemplate);
160+
161+
// Act
162+
MessageProducer producer = binder.createConsumerEndpoint(consumerDestination, GROUP, consumerProperties);
163+
164+
// Assert
165+
assertThat(producer).isInstanceOf(ServiceBusInboundChannelAdapter.class);
166+
ServiceBusInboundChannelAdapter adapter = (ServiceBusInboundChannelAdapter) producer;
167+
assertThat(adapter.getRetryTemplate()).isNotNull();
168+
assertThat(adapter.getRetryTemplate()).isSameAs(customRetryTemplate);
169+
}
170+
171+
@Test
172+
void testCustomRetryTemplateNotAppliedWhenMaxAttemptsIsOne() {
173+
// Arrange: maxAttempts=1 disables retry even when a custom RetryTemplate bean is injected
174+
prepareConsumerProperties();
175+
consumerProperties.setMaxAttempts(1);
176+
when(consumerDestination.getName()).thenReturn(ENTITY_NAME);
177+
178+
RetryTemplate customRetryTemplate = new RetryTemplate();
179+
binder.setRetryTemplate(customRetryTemplate);
180+
181+
// Act
182+
MessageProducer producer = binder.createConsumerEndpoint(consumerDestination, GROUP, consumerProperties);
183+
184+
// Assert
185+
assertThat(producer).isInstanceOf(ServiceBusInboundChannelAdapter.class);
186+
assertThat(((ServiceBusInboundChannelAdapter) producer).getRetryTemplate()).isNull();
187+
}
188+
189+
private void prepareConsumerProperties() {
190+
serviceBusConsumerProperties.setEntityName(ENTITY_NAME);
191+
serviceBusConsumerProperties.setSubscriptionName(GROUP);
192+
serviceBusConsumerProperties.setEntityType(ServiceBusEntityType.TOPIC);
193+
serviceBusConsumerProperties.setNamespace(NAMESPACE_NAME);
194+
serviceBusConsumerProperties.getRetry().setTryTimeout(Duration.ofMinutes(5));
195+
serviceBusConsumerProperties.setAutoComplete(false);
196+
ServiceBusBindingProperties bindingProperties = new ServiceBusBindingProperties();
197+
bindingProperties.setConsumer(serviceBusConsumerProperties);
198+
199+
Map<String, ServiceBusBindingProperties> bindings = new HashMap<>();
200+
bindings.put(ENTITY_NAME, bindingProperties);
201+
extendedBindingProperties.setBindings(bindings);
202+
binder.setBindingProperties(extendedBindingProperties);
203+
204+
consumerProperties = new ExtendedConsumerProperties<>(serviceBusConsumerProperties);
205+
consumerProperties.setHeaderMode(HeaderMode.embeddedHeaders);
206+
}
207+
}

0 commit comments

Comments
 (0)