Skip to content

Commit cdfaa34

Browse files
committed
GH-3419: Make compatible with RabbitMQ 4.3
Fixes: #3419 * The `queue-master-locator` is failing now. * Migrate `Queue` and `QueueBuilder` to use `x-queaue-leader-locator` property supported since RabbitMQ 3.13 * Deprecate (for removal) `LeaderLocator.minLeaders` and `random` in favor of newly added `balanced` for the `queaue-leader-locator` feature * Fix tests to not create non-durable-non-exclusive queues as this feature is failing now. Mostly creating `durable` queues instead (cherry picked from commit 303137f)
1 parent 20c5e4d commit cdfaa34

11 files changed

Lines changed: 59 additions & 44 deletions

spring-amqp/src/main/java/org/springframework/amqp/core/Queue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class Queue extends AbstractDeclarable implements Cloneable {
3636
* Argument key for the queue leader locator.
3737
* @since 2.1
3838
*/
39-
public static final String X_QUEUE_LEADER_LOCATOR = "x-queue-master-locator";
39+
public static final String X_QUEUE_LEADER_LOCATOR = "x-queaue-leader-locator";
4040

4141
private final String name;
4242

spring-amqp/src/main/java/org/springframework/amqp/core/QueueBuilder.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ public QueueBuilder lazy() {
232232
* @since 2.2
233233
*/
234234
public QueueBuilder leaderLocator(LeaderLocator locator) {
235-
return withArgument("x-queue-master-locator", locator.getValue());
235+
return withArgument(Queue.X_QUEUE_LEADER_LOCATOR, locator.getValue());
236236
}
237237

238238
/**
@@ -331,7 +331,11 @@ public enum LeaderLocator {
331331

332332
/**
333333
* Deploy on the node with the fewest queue leaders.
334+
/**
335+
* Deploy on a random node.
336+
* @deprecated in favor of {@link #balanced}
334337
*/
338+
@Deprecated(since = "3.2.11", forRemoval = true)
335339
minLeaders("min-masters"),
336340

337341
/**
@@ -341,8 +345,17 @@ public enum LeaderLocator {
341345

342346
/**
343347
* Deploy on a random node.
348+
* @deprecated in favor of {@link #balanced}
349+
*/
350+
@Deprecated(since = "3.2.11", forRemoval = true)
351+
random("random"),
352+
353+
/**
354+
* The leader is placed on the node that currently hosts
355+
* the minimum number of quorum queue leaders to achieve an even distribution.
356+
* @since 3.2.11
344357
*/
345-
random("random");
358+
balanced("balanced");
346359

347360
private final String value;
348361

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/ConsumerBatchingTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public Listener listener() {
213213

214214
@Bean
215215
public org.springframework.amqp.core.Queue batch3() {
216-
return QueueBuilder.nonDurable("c.batch.3")
216+
return QueueBuilder.durable("c.batch.3")
217217
.autoDelete()
218218
.deadLetterExchange("")
219219
.deadLetterRoutingKey("c.batch.3.dlq")
@@ -222,14 +222,14 @@ public org.springframework.amqp.core.Queue batch3() {
222222

223223
@Bean
224224
public org.springframework.amqp.core.Queue batch3Dlq() {
225-
return QueueBuilder.nonDurable("c.batch.3.dlq")
225+
return QueueBuilder.durable("c.batch.3.dlq")
226226
.autoDelete()
227227
.build();
228228
}
229229

230230
@Bean
231231
public org.springframework.amqp.core.Queue batch4() {
232-
return QueueBuilder.nonDurable("c.batch.4")
232+
return QueueBuilder.durable("c.batch.4")
233233
.autoDelete()
234234
.deadLetterExchange("")
235235
.deadLetterRoutingKey("c.batch.4.dlq")
@@ -238,14 +238,14 @@ public org.springframework.amqp.core.Queue batch4() {
238238

239239
@Bean
240240
public org.springframework.amqp.core.Queue batch4Dlq() {
241-
return QueueBuilder.nonDurable("c.batch.4.dlq")
241+
return QueueBuilder.durable("c.batch.4.dlq")
242242
.autoDelete()
243243
.build();
244244
}
245245

246246
@Bean
247247
public org.springframework.amqp.core.Queue batch5() {
248-
return QueueBuilder.nonDurable("c.batch.5")
248+
return QueueBuilder.durable("c.batch.5")
249249
.autoDelete()
250250
.deadLetterExchange("")
251251
.deadLetterRoutingKey("c.batch.5.dlq")
@@ -254,7 +254,7 @@ public org.springframework.amqp.core.Queue batch5() {
254254

255255
@Bean
256256
public org.springframework.amqp.core.Queue batch5Dlq() {
257-
return QueueBuilder.nonDurable("c.batch.5.dlq")
257+
return QueueBuilder.durable("c.batch.5.dlq")
258258
.autoDelete()
259259
.build();
260260
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2030,12 +2030,12 @@ public TxClassLevel txClassLevel() {
20302030

20312031
@Bean
20322032
public org.springframework.amqp.core.Queue sendToReplies() {
2033-
return new org.springframework.amqp.core.Queue(sendToRepliesBean(), false, false, false);
2033+
return new org.springframework.amqp.core.Queue(sendToRepliesBean(), true, false, false);
20342034
}
20352035

20362036
@Bean
20372037
public org.springframework.amqp.core.Queue sendToRepliesSpEL() {
2038-
return new org.springframework.amqp.core.Queue(sendToRepliesSpELBean(), false, false, true);
2038+
return new org.springframework.amqp.core.Queue(sendToRepliesSpELBean(), true, false, true);
20392039
}
20402040

20412041
@Bean

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/FixedReplyQueueDeadLetterTests.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ void testQueueArgs1() throws MalformedURLException, URISyntaxException, Interrup
9595
assertThat(arguments.get("x-dead-letter-routing-key")).isEqualTo("reply.dlrk");
9696
assertThat(arguments.get("x-max-priority")).isEqualTo(4);
9797
assertThat(arguments.get("x-queue-mode")).isEqualTo("lazy");
98-
assertThat(arguments.get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo(LeaderLocator.minLeaders.getValue());
98+
assertThat(arguments.get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo(LeaderLocator.balanced.getValue());
9999
assertThat(arguments.get("x-single-active-consumer")).isEqualTo(Boolean.TRUE);
100100
}
101101

@@ -128,7 +128,7 @@ void testQueueArgs3() throws URISyntaxException {
128128
assertThat(arguments.get("x-dead-letter-routing-key")).isEqualTo("reply.dlrk");
129129
assertThat(arguments.get("x-max-priority")).isEqualTo(4);
130130
assertThat(arguments.get("x-queue-mode")).isEqualTo("lazy");
131-
assertThat(arguments.get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo(LeaderLocator.random.getValue());
131+
assertThat(arguments.get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo(LeaderLocator.balanced.getValue());
132132

133133
Map<String, Object> exchange = exchangeInfo("dlx.test.requestEx");
134134
assertThat(arguments(exchange).get("alternate-exchange")).isEqualTo("alternate");
@@ -237,7 +237,7 @@ public DirectExchange dlx() {
237237
*/
238238
@Bean
239239
public Queue requestQueue() {
240-
return QueueBuilder.nonDurable("dlx.test.requestQ")
240+
return QueueBuilder.durable("dlx.test.requestQ")
241241
.autoDelete()
242242
.build();
243243
}
@@ -247,7 +247,7 @@ public Queue requestQueue() {
247247
*/
248248
@Bean
249249
public Queue replyQueue() {
250-
return QueueBuilder.nonDurable("dlx.test.replyQ")
250+
return QueueBuilder.durable("dlx.test.replyQ")
251251
.autoDelete()
252252
.withArgument("x-dead-letter-exchange", "reply.dlx")
253253
.build();
@@ -258,14 +258,14 @@ public Queue replyQueue() {
258258
*/
259259
@Bean
260260
public Queue dlq() {
261-
return QueueBuilder.nonDurable("dlx.test.DLQ")
261+
return QueueBuilder.durable("dlx.test.DLQ")
262262
.autoDelete()
263263
.build();
264264
}
265265

266266
@Bean
267267
public Queue allArgs1() {
268-
return QueueBuilder.nonDurable("all.args.1")
268+
return QueueBuilder.durable("all.args.1")
269269
.ttl(1000)
270270
.expires(200_000)
271271
.maxLength(42L)
@@ -275,14 +275,14 @@ public Queue allArgs1() {
275275
.deadLetterRoutingKey("reply.dlrk")
276276
.maxPriority(4)
277277
.lazy()
278-
.leaderLocator(LeaderLocator.minLeaders)
278+
.leaderLocator(LeaderLocator.balanced)
279279
.singleActiveConsumer()
280280
.build();
281281
}
282282

283283
@Bean
284284
public Queue allArgs2() {
285-
return QueueBuilder.nonDurable("all.args.2")
285+
return QueueBuilder.durable("all.args.2")
286286
.ttl(1000)
287287
.expires(200_000)
288288
.maxLength(42L)
@@ -298,7 +298,7 @@ public Queue allArgs2() {
298298

299299
@Bean
300300
public Queue allArgs3() {
301-
return QueueBuilder.nonDurable("all.args.3")
301+
return QueueBuilder.durable("all.args.3")
302302
.ttl(1000)
303303
.expires(200_000)
304304
.maxLength(42L)
@@ -308,7 +308,7 @@ public Queue allArgs3() {
308308
.deadLetterRoutingKey("reply.dlrk")
309309
.maxPriority(4)
310310
.lazy()
311-
.leaderLocator(LeaderLocator.random)
311+
.leaderLocator(LeaderLocator.balanced)
312312
.build();
313313
}
314314

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public void testDoubleDeclarationOfAutodeleteQueue() {
140140
CachingConnectionFactory connectionFactory2 = new CachingConnectionFactory();
141141
connectionFactory2.setHost("localhost");
142142
connectionFactory2.setPort(BrokerTestUtils.getPort());
143-
Queue queue = new Queue("test.queue", false, false, true);
143+
Queue queue = new Queue("test.queue", true, false, true);
144144
new RabbitAdmin(connectionFactory1).declareQueue(queue);
145145
new RabbitAdmin(connectionFactory2).declareQueue(queue);
146146
connectionFactory1.destroy();
@@ -175,7 +175,7 @@ public void testQueueWithAutoDelete() throws Exception {
175175
@Test
176176
public void testQueueWithoutAutoDelete() throws Exception {
177177

178-
final Queue queue = new Queue("test.queue", false, false, false);
178+
final Queue queue = new Queue("test.queue", true, false, false);
179179
context.getBeanFactory().registerSingleton("foo", queue);
180180
rabbitAdmin.afterPropertiesSet();
181181

@@ -268,7 +268,7 @@ public void testDeleteExchangeWithInternalOption() throws Exception {
268268
public void testDeclareBindingWithDefaultExchangeImplicitBinding() throws Exception {
269269
Exchange exchange = new DirectExchange(RabbitAdmin.DEFAULT_EXCHANGE_NAME);
270270
String queueName = "test.queue";
271-
final Queue queue = new Queue(queueName, false, false, false);
271+
final Queue queue = new Queue(queueName, true, false, false);
272272
rabbitAdmin.declareQueue(queue);
273273
Binding binding = new Binding(queueName, DestinationType.QUEUE, exchange.getName(), queueName, null);
274274

@@ -283,7 +283,7 @@ public void testSpringWithDefaultExchangeImplicitBinding() throws Exception {
283283
Exchange exchange = new DirectExchange(RabbitAdmin.DEFAULT_EXCHANGE_NAME);
284284
context.getBeanFactory().registerSingleton("foo", exchange);
285285
String queueName = "test.queue";
286-
final Queue queue = new Queue(queueName, false, false, false);
286+
final Queue queue = new Queue(queueName, true, false, false);
287287
context.getBeanFactory().registerSingleton("bar", queue);
288288
Binding binding = new Binding(queueName, DestinationType.QUEUE, exchange.getName(), queueName, null);
289289
context.getBeanFactory().registerSingleton("baz", binding);
@@ -298,7 +298,7 @@ public void testSpringWithDefaultExchangeImplicitBinding() throws Exception {
298298
@Test
299299
public void testRemoveBindingWithDefaultExchangeImplicitBinding() {
300300
String queueName = "test.queue";
301-
final Queue queue = new Queue(queueName, false, false, false);
301+
final Queue queue = new Queue(queueName, true, false, false);
302302
rabbitAdmin.declareQueue(queue);
303303
Binding binding = new Binding(queueName, DestinationType.QUEUE, RabbitAdmin.DEFAULT_EXCHANGE_NAME, queueName, null);
304304

@@ -311,7 +311,7 @@ public void testRemoveBindingWithDefaultExchangeImplicitBinding() {
311311
public void testDeclareBindingWithDefaultExchangeNonImplicitBinding() {
312312
Exchange exchange = new DirectExchange(RabbitAdmin.DEFAULT_EXCHANGE_NAME);
313313
String queueName = "test.queue";
314-
final Queue queue = new Queue(queueName, false, false, false);
314+
final Queue queue = new Queue(queueName, true, false, false);
315315
rabbitAdmin.declareQueue(queue);
316316
Binding binding = new Binding(queueName, DestinationType.QUEUE, exchange.getName(), "test.routingKey", null);
317317

@@ -335,7 +335,7 @@ public void testSpringWithDefaultExchangeNonImplicitBinding() {
335335
Exchange exchange = new DirectExchange(RabbitAdmin.DEFAULT_EXCHANGE_NAME);
336336
context.getBeanFactory().registerSingleton("foo", exchange);
337337
String queueName = "test.queue";
338-
final Queue queue = new Queue(queueName, false, false, false);
338+
final Queue queue = new Queue(queueName, true, false, false);
339339
context.getBeanFactory().registerSingleton("bar", queue);
340340
Binding binding = new Binding(queueName, DestinationType.QUEUE, exchange.getName(), "test.routingKey", null);
341341
context.getBeanFactory().registerSingleton("baz", binding);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,10 @@ public void testTemporaryLogs() {
176176
try {
177177
ApplicationContext ctx = mock();
178178
Map<String, Queue> queues = new HashMap<>();
179-
queues.put("nonDurQ", new Queue("testq.nonDur", false, false, false));
179+
queues.put("nonDurQ", new Queue("testq.nonDur", false, true, false));
180180
queues.put("adQ", new Queue("testq.ad", true, false, true));
181181
queues.put("exclQ", new Queue("testq.excl", true, true, false));
182-
queues.put("allQ", new Queue("testq.all", false, true, true));
182+
queues.put("allQ", new Queue("testq.all", true, true, true));
183183
given(ctx.getBeansOfType(Queue.class, false, false)).willReturn(queues);
184184
Map<String, Exchange> exchanges = new HashMap<>();
185185
exchanges.put("nonDurEx", new DirectExchange("testex.nonDur", false, false));
@@ -201,9 +201,9 @@ public void testTemporaryLogs() {
201201
assertThat(logs.get(1)).contains("(testex.all) durable:false, auto-delete:true");
202202
assertThat(logs.get(2)).contains("(testex.nonDur) durable:false, auto-delete:false");
203203
assertThat(logs.get(3)).contains("(testq.ad) durable:true, auto-delete:true, exclusive:false");
204-
assertThat(logs.get(4)).contains("(testq.all) durable:false, auto-delete:true, exclusive:true");
204+
assertThat(logs.get(4)).contains("(testq.all) durable:true, auto-delete:true, exclusive:true");
205205
assertThat(logs.get(5)).contains("(testq.excl) durable:true, auto-delete:false, exclusive:true");
206-
assertThat(logs.get(6)).contains("(testq.nonDur) durable:false, auto-delete:false, exclusive:false");
206+
assertThat(logs.get(6)).contains("(testq.nonDur) durable:false, auto-delete:false, exclusive:true");
207207
}
208208
finally {
209209
cleanQueuesAndExchanges(rabbitAdmin);
@@ -520,7 +520,7 @@ public DirectExchange e1() {
520520

521521
@Bean
522522
public Queue q1() {
523-
return new Queue("q1", false, false, true);
523+
return new Queue("q1", true, false, true);
524524
}
525525

526526
@Bean
@@ -538,14 +538,14 @@ public Declarables es() {
538538
@Bean
539539
public Declarables qs() {
540540
return new Declarables(
541-
new Queue("q2", false, false, true),
542-
new Queue("q3", false, false, true));
541+
new Queue("q2", true, false, true),
542+
new Queue("q3", true, false, true));
543543
}
544544

545545
@Bean
546546
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
547547
public Declarables prototypes() {
548-
return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
548+
return new Declarables(new Queue(this.prototypeQueueName, true, false, true));
549549
}
550550

551551
@Bean
@@ -559,7 +559,7 @@ public Declarables bs() {
559559
public Declarables ds() {
560560
return new Declarables(
561561
new DirectExchange("e4", false, true),
562-
new Queue("q4", false, false, true),
562+
new Queue("q4", true, false, true),
563563
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
564564
}
565565

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegration1Tests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,7 @@ public void testPublisherCallbackChannelImplCloseWithPending() throws Exception
871871
public void testWithFuture() throws Exception {
872872
RabbitAdmin admin = new RabbitAdmin(this.connectionFactory);
873873
Queue queue = QueueBuilder.nonDurable()
874+
.exclusive()
874875
.autoDelete()
875876
.maxLength(1L)
876877
.overflow(Overflow.rejectPublish)

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ContainerInitializationTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void testMismatchedQueueDuringRestart() throws Exception {
8787
RabbitAdmin admin = context.getBean(RabbitAdmin.class);
8888
admin.deleteQueue(TEST_MISMATCH);
8989
assertThat(latches[0].await(20, TimeUnit.SECONDS)).isTrue();
90-
admin.declareQueue(new Queue(TEST_MISMATCH, false, false, true));
90+
admin.declareQueue(new Queue(TEST_MISMATCH, true, false, true));
9191
latches[2].countDown(); // let container thread continue to enable restart
9292
assertThat(latches[1].await(20, TimeUnit.SECONDS)).isTrue();
9393
SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class);
@@ -102,7 +102,7 @@ public void testMismatchedQueueDuringRestartMultiQueue() throws Exception {
102102
RabbitAdmin admin = context.getBean(RabbitAdmin.class);
103103
admin.deleteQueue(TEST_MISMATCH);
104104
assertThat(latches[0].await(20, TimeUnit.SECONDS)).isTrue();
105-
admin.declareQueue(new Queue(TEST_MISMATCH, false, false, true));
105+
admin.declareQueue(new Queue(TEST_MISMATCH, true, false, true));
106106
latches[2].countDown(); // let container thread continue to enable restart
107107
assertThat(latches[1].await(20, TimeUnit.SECONDS)).isTrue();
108108
SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class);
@@ -158,7 +158,7 @@ public void handleMessage(Message m) {
158158

159159
@Bean
160160
public Queue queue() {
161-
return new Queue(TEST_MISMATCH, false, false, true); // mismatched
161+
return new Queue(TEST_MISMATCH, true, false, true);
162162
}
163163

164164
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DlqExpiryTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,15 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
9595

9696
@Bean
9797
public Queue main() {
98-
return QueueBuilder.nonDurable("test.expiry.main").autoDelete()
98+
return QueueBuilder.durable("test.expiry.main").autoDelete()
9999
.withArgument("x-dead-letter-exchange", "")
100100
.withArgument("x-dead-letter-routing-key", "test.expiry.dlq")
101101
.build();
102102
}
103103

104104
@Bean
105105
public Queue dlq() {
106-
return QueueBuilder.nonDurable("test.expiry.dlq").autoDelete()
106+
return QueueBuilder.durable("test.expiry.dlq").autoDelete()
107107
.withArgument("x-dead-letter-exchange", "")
108108
.withArgument("x-dead-letter-routing-key", "test.expiry.main")
109109
.withArgument("x-message-ttl", 100)

0 commit comments

Comments
 (0)