Skip to content

Commit fa04f35

Browse files
artembilanspring-builds
authored andcommitted
GH-3419: Make compatible with RabbitMQ 4.3
Fixes: #3419 * The `queue-master-locator` is failing now. * Migrate `Queue` and `QueueBuilder` to use `x-queaue-leader-locator` property supported since RabbitMQ 3.13 * Deprecate (for removal) `LeaderLocator.minLeaders` and `random` in favor of newly added `balanced` for the `queaue-leader-locator` feature * Fix tests to not create non-durable-non-exclusive queues as this feature is failing now. Mostly creating `durable` queues instead (cherry picked from commit 303137f)
1 parent 8ae21a1 commit fa04f35

11 files changed

Lines changed: 59 additions & 44 deletions

spring-amqp/src/main/java/org/springframework/amqp/core/Queue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class Queue extends AbstractDeclarable implements Cloneable {
3838
* Argument key for the queue leader locator.
3939
* @since 2.1
4040
*/
41-
public static final String X_QUEUE_LEADER_LOCATOR = "x-queue-master-locator";
41+
public static final String X_QUEUE_LEADER_LOCATOR = "x-queaue-leader-locator";
4242

4343
private final String name;
4444

spring-amqp/src/main/java/org/springframework/amqp/core/QueueBuilder.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ public QueueBuilder lazy() {
232232
* @since 2.2
233233
*/
234234
public QueueBuilder leaderLocator(LeaderLocator locator) {
235-
return withArgument("x-queue-master-locator", locator.getValue());
235+
return withArgument(Queue.X_QUEUE_LEADER_LOCATOR, locator.getValue());
236236
}
237237

238238
/**
@@ -331,7 +331,11 @@ public enum LeaderLocator {
331331

332332
/**
333333
* Deploy on the node with the fewest queue leaders.
334+
/**
335+
* Deploy on a random node.
336+
* @deprecated in favor of {@link #balanced}
334337
*/
338+
@Deprecated(since = "3.2.11", forRemoval = true)
335339
minLeaders("min-masters"),
336340

337341
/**
@@ -341,8 +345,17 @@ public enum LeaderLocator {
341345

342346
/**
343347
* Deploy on a random node.
348+
* @deprecated in favor of {@link #balanced}
349+
*/
350+
@Deprecated(since = "3.2.11", forRemoval = true)
351+
random("random"),
352+
353+
/**
354+
* The leader is placed on the node that currently hosts
355+
* the minimum number of quorum queue leaders to achieve an even distribution.
356+
* @since 3.2.11
344357
*/
345-
random("random");
358+
balanced("balanced");
346359

347360
private final String value;
348361

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/ConsumerBatchingTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public Listener listener() {
213213

214214
@Bean
215215
public org.springframework.amqp.core.Queue batch3() {
216-
return QueueBuilder.nonDurable("c.batch.3")
216+
return QueueBuilder.durable("c.batch.3")
217217
.autoDelete()
218218
.deadLetterExchange("")
219219
.deadLetterRoutingKey("c.batch.3.dlq")
@@ -222,14 +222,14 @@ public org.springframework.amqp.core.Queue batch3() {
222222

223223
@Bean
224224
public org.springframework.amqp.core.Queue batch3Dlq() {
225-
return QueueBuilder.nonDurable("c.batch.3.dlq")
225+
return QueueBuilder.durable("c.batch.3.dlq")
226226
.autoDelete()
227227
.build();
228228
}
229229

230230
@Bean
231231
public org.springframework.amqp.core.Queue batch4() {
232-
return QueueBuilder.nonDurable("c.batch.4")
232+
return QueueBuilder.durable("c.batch.4")
233233
.autoDelete()
234234
.deadLetterExchange("")
235235
.deadLetterRoutingKey("c.batch.4.dlq")
@@ -238,14 +238,14 @@ public org.springframework.amqp.core.Queue batch4() {
238238

239239
@Bean
240240
public org.springframework.amqp.core.Queue batch4Dlq() {
241-
return QueueBuilder.nonDurable("c.batch.4.dlq")
241+
return QueueBuilder.durable("c.batch.4.dlq")
242242
.autoDelete()
243243
.build();
244244
}
245245

246246
@Bean
247247
public org.springframework.amqp.core.Queue batch5() {
248-
return QueueBuilder.nonDurable("c.batch.5")
248+
return QueueBuilder.durable("c.batch.5")
249249
.autoDelete()
250250
.deadLetterExchange("")
251251
.deadLetterRoutingKey("c.batch.5.dlq")
@@ -254,7 +254,7 @@ public org.springframework.amqp.core.Queue batch5() {
254254

255255
@Bean
256256
public org.springframework.amqp.core.Queue batch5Dlq() {
257-
return QueueBuilder.nonDurable("c.batch.5.dlq")
257+
return QueueBuilder.durable("c.batch.5.dlq")
258258
.autoDelete()
259259
.build();
260260
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2027,12 +2027,12 @@ public TxClassLevel txClassLevel() {
20272027

20282028
@Bean
20292029
public org.springframework.amqp.core.Queue sendToReplies() {
2030-
return new org.springframework.amqp.core.Queue(sendToRepliesBean(), false, false, false);
2030+
return new org.springframework.amqp.core.Queue(sendToRepliesBean(), true, false, false);
20312031
}
20322032

20332033
@Bean
20342034
public org.springframework.amqp.core.Queue sendToRepliesSpEL() {
2035-
return new org.springframework.amqp.core.Queue(sendToRepliesSpELBean(), false, false, true);
2035+
return new org.springframework.amqp.core.Queue(sendToRepliesSpELBean(), true, false, true);
20362036
}
20372037

20382038
@Bean

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/FixedReplyQueueDeadLetterTests.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ void testQueueArgs1() throws MalformedURLException, URISyntaxException, Interrup
9595
assertThat(arguments.get("x-dead-letter-routing-key")).isEqualTo("reply.dlrk");
9696
assertThat(arguments.get("x-max-priority")).isEqualTo(4);
9797
assertThat(arguments.get("x-queue-mode")).isEqualTo("lazy");
98-
assertThat(arguments.get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo(LeaderLocator.minLeaders.getValue());
98+
assertThat(arguments.get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo(LeaderLocator.balanced.getValue());
9999
assertThat(arguments.get("x-single-active-consumer")).isEqualTo(Boolean.TRUE);
100100
}
101101

@@ -128,7 +128,7 @@ void testQueueArgs3() throws URISyntaxException {
128128
assertThat(arguments.get("x-dead-letter-routing-key")).isEqualTo("reply.dlrk");
129129
assertThat(arguments.get("x-max-priority")).isEqualTo(4);
130130
assertThat(arguments.get("x-queue-mode")).isEqualTo("lazy");
131-
assertThat(arguments.get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo(LeaderLocator.random.getValue());
131+
assertThat(arguments.get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo(LeaderLocator.balanced.getValue());
132132

133133
Map<String, Object> exchange = exchangeInfo("dlx.test.requestEx");
134134
assertThat(arguments(exchange).get("alternate-exchange")).isEqualTo("alternate");
@@ -237,7 +237,7 @@ public DirectExchange dlx() {
237237
*/
238238
@Bean
239239
public Queue requestQueue() {
240-
return QueueBuilder.nonDurable("dlx.test.requestQ")
240+
return QueueBuilder.durable("dlx.test.requestQ")
241241
.autoDelete()
242242
.build();
243243
}
@@ -247,7 +247,7 @@ public Queue requestQueue() {
247247
*/
248248
@Bean
249249
public Queue replyQueue() {
250-
return QueueBuilder.nonDurable("dlx.test.replyQ")
250+
return QueueBuilder.durable("dlx.test.replyQ")
251251
.autoDelete()
252252
.withArgument("x-dead-letter-exchange", "reply.dlx")
253253
.build();
@@ -258,14 +258,14 @@ public Queue replyQueue() {
258258
*/
259259
@Bean
260260
public Queue dlq() {
261-
return QueueBuilder.nonDurable("dlx.test.DLQ")
261+
return QueueBuilder.durable("dlx.test.DLQ")
262262
.autoDelete()
263263
.build();
264264
}
265265

266266
@Bean
267267
public Queue allArgs1() {
268-
return QueueBuilder.nonDurable("all.args.1")
268+
return QueueBuilder.durable("all.args.1")
269269
.ttl(1000)
270270
.expires(200_000)
271271
.maxLength(42L)
@@ -275,14 +275,14 @@ public Queue allArgs1() {
275275
.deadLetterRoutingKey("reply.dlrk")
276276
.maxPriority(4)
277277
.lazy()
278-
.leaderLocator(LeaderLocator.minLeaders)
278+
.leaderLocator(LeaderLocator.balanced)
279279
.singleActiveConsumer()
280280
.build();
281281
}
282282

283283
@Bean
284284
public Queue allArgs2() {
285-
return QueueBuilder.nonDurable("all.args.2")
285+
return QueueBuilder.durable("all.args.2")
286286
.ttl(1000)
287287
.expires(200_000)
288288
.maxLength(42L)
@@ -298,7 +298,7 @@ public Queue allArgs2() {
298298

299299
@Bean
300300
public Queue allArgs3() {
301-
return QueueBuilder.nonDurable("all.args.3")
301+
return QueueBuilder.durable("all.args.3")
302302
.ttl(1000)
303303
.expires(200_000)
304304
.maxLength(42L)
@@ -308,7 +308,7 @@ public Queue allArgs3() {
308308
.deadLetterRoutingKey("reply.dlrk")
309309
.maxPriority(4)
310310
.lazy()
311-
.leaderLocator(LeaderLocator.random)
311+
.leaderLocator(LeaderLocator.balanced)
312312
.build();
313313
}
314314

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public void testDoubleDeclarationOfAutodeleteQueue() {
141141
CachingConnectionFactory connectionFactory2 = new CachingConnectionFactory();
142142
connectionFactory2.setHost("localhost");
143143
connectionFactory2.setPort(BrokerTestUtils.getPort());
144-
Queue queue = new Queue("test.queue", false, false, true);
144+
Queue queue = new Queue("test.queue", true, false, true);
145145
new RabbitAdmin(connectionFactory1).declareQueue(queue);
146146
new RabbitAdmin(connectionFactory2).declareQueue(queue);
147147
connectionFactory1.destroy();
@@ -176,7 +176,7 @@ public void testQueueWithAutoDelete() throws Exception {
176176
@Test
177177
public void testQueueWithoutAutoDelete() throws Exception {
178178

179-
final Queue queue = new Queue("test.queue", false, false, false);
179+
final Queue queue = new Queue("test.queue", true, false, false);
180180
context.getBeanFactory().registerSingleton("foo", queue);
181181
rabbitAdmin.afterPropertiesSet();
182182

@@ -269,7 +269,7 @@ public void testDeleteExchangeWithInternalOption() throws Exception {
269269
public void testDeclareBindingWithDefaultExchangeImplicitBinding() throws Exception {
270270
Exchange exchange = new DirectExchange(RabbitAdmin.DEFAULT_EXCHANGE_NAME);
271271
String queueName = "test.queue";
272-
final Queue queue = new Queue(queueName, false, false, false);
272+
final Queue queue = new Queue(queueName, true, false, false);
273273
rabbitAdmin.declareQueue(queue);
274274
Binding binding = new Binding(queueName, DestinationType.QUEUE, exchange.getName(), queueName, null);
275275

@@ -284,7 +284,7 @@ public void testSpringWithDefaultExchangeImplicitBinding() throws Exception {
284284
Exchange exchange = new DirectExchange(RabbitAdmin.DEFAULT_EXCHANGE_NAME);
285285
context.getBeanFactory().registerSingleton("foo", exchange);
286286
String queueName = "test.queue";
287-
final Queue queue = new Queue(queueName, false, false, false);
287+
final Queue queue = new Queue(queueName, true, false, false);
288288
context.getBeanFactory().registerSingleton("bar", queue);
289289
Binding binding = new Binding(queueName, DestinationType.QUEUE, exchange.getName(), queueName, null);
290290
context.getBeanFactory().registerSingleton("baz", binding);
@@ -299,7 +299,7 @@ public void testSpringWithDefaultExchangeImplicitBinding() throws Exception {
299299
@Test
300300
public void testRemoveBindingWithDefaultExchangeImplicitBinding() {
301301
String queueName = "test.queue";
302-
final Queue queue = new Queue(queueName, false, false, false);
302+
final Queue queue = new Queue(queueName, true, false, false);
303303
rabbitAdmin.declareQueue(queue);
304304
Binding binding = new Binding(queueName, DestinationType.QUEUE, RabbitAdmin.DEFAULT_EXCHANGE_NAME, queueName, null);
305305

@@ -312,7 +312,7 @@ public void testRemoveBindingWithDefaultExchangeImplicitBinding() {
312312
public void testDeclareBindingWithDefaultExchangeNonImplicitBinding() {
313313
Exchange exchange = new DirectExchange(RabbitAdmin.DEFAULT_EXCHANGE_NAME);
314314
String queueName = "test.queue";
315-
final Queue queue = new Queue(queueName, false, false, false);
315+
final Queue queue = new Queue(queueName, true, false, false);
316316
rabbitAdmin.declareQueue(queue);
317317
Binding binding = new Binding(queueName, DestinationType.QUEUE, exchange.getName(), "test.routingKey", null);
318318

@@ -336,7 +336,7 @@ public void testSpringWithDefaultExchangeNonImplicitBinding() {
336336
Exchange exchange = new DirectExchange(RabbitAdmin.DEFAULT_EXCHANGE_NAME);
337337
context.getBeanFactory().registerSingleton("foo", exchange);
338338
String queueName = "test.queue";
339-
final Queue queue = new Queue(queueName, false, false, false);
339+
final Queue queue = new Queue(queueName, true, false, false);
340340
context.getBeanFactory().registerSingleton("bar", queue);
341341
Binding binding = new Binding(queueName, DestinationType.QUEUE, exchange.getName(), "test.routingKey", null);
342342
context.getBeanFactory().registerSingleton("baz", binding);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,10 @@ public void testTemporaryLogs() {
173173
try {
174174
ApplicationContext ctx = mock();
175175
Map<String, Queue> queues = new HashMap<>();
176-
queues.put("nonDurQ", new Queue("testq.nonDur", false, false, false));
176+
queues.put("nonDurQ", new Queue("testq.nonDur", false, true, false));
177177
queues.put("adQ", new Queue("testq.ad", true, false, true));
178178
queues.put("exclQ", new Queue("testq.excl", true, true, false));
179-
queues.put("allQ", new Queue("testq.all", false, true, true));
179+
queues.put("allQ", new Queue("testq.all", true, true, true));
180180
given(ctx.getBeansOfType(Queue.class, false, false)).willReturn(queues);
181181
Map<String, Exchange> exchanges = new HashMap<>();
182182
exchanges.put("nonDurEx", new DirectExchange("testex.nonDur", false, false));
@@ -198,9 +198,9 @@ public void testTemporaryLogs() {
198198
assertThat(logs.get(1)).contains("(testex.all) durable:false, auto-delete:true");
199199
assertThat(logs.get(2)).contains("(testex.nonDur) durable:false, auto-delete:false");
200200
assertThat(logs.get(3)).contains("(testq.ad) durable:true, auto-delete:true, exclusive:false");
201-
assertThat(logs.get(4)).contains("(testq.all) durable:false, auto-delete:true, exclusive:true");
201+
assertThat(logs.get(4)).contains("(testq.all) durable:true, auto-delete:true, exclusive:true");
202202
assertThat(logs.get(5)).contains("(testq.excl) durable:true, auto-delete:false, exclusive:true");
203-
assertThat(logs.get(6)).contains("(testq.nonDur) durable:false, auto-delete:false, exclusive:false");
203+
assertThat(logs.get(6)).contains("(testq.nonDur) durable:false, auto-delete:false, exclusive:true");
204204
}
205205
finally {
206206
cleanQueuesAndExchanges(rabbitAdmin);
@@ -512,7 +512,7 @@ public DirectExchange e1() {
512512

513513
@Bean
514514
public Queue q1() {
515-
return new Queue("q1", false, false, true);
515+
return new Queue("q1", true, false, true);
516516
}
517517

518518
@Bean
@@ -530,14 +530,14 @@ public Declarables es() {
530530
@Bean
531531
public Declarables qs() {
532532
return new Declarables(
533-
new Queue("q2", false, false, true),
534-
new Queue("q3", false, false, true));
533+
new Queue("q2", true, false, true),
534+
new Queue("q3", true, false, true));
535535
}
536536

537537
@Bean
538538
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
539539
public Declarables prototypes() {
540-
return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
540+
return new Declarables(new Queue(this.prototypeQueueName, true, false, true));
541541
}
542542

543543
@Bean
@@ -551,7 +551,7 @@ public Declarables bs() {
551551
public Declarables ds() {
552552
return new Declarables(
553553
new DirectExchange("e4", false, true),
554-
new Queue("q4", false, false, true),
554+
new Queue("q4", true, false, true),
555555
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
556556
}
557557

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegration1Tests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,7 @@ public void testPublisherCallbackChannelImplCloseWithPending() throws Exception
875875
public void testWithFuture() throws Exception {
876876
RabbitAdmin admin = new RabbitAdmin(this.connectionFactory);
877877
Queue queue = QueueBuilder.nonDurable()
878+
.exclusive()
878879
.autoDelete()
879880
.maxLength(1L)
880881
.overflow(Overflow.rejectPublish)

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ContainerInitializationTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void testMismatchedQueueDuringRestart() throws Exception {
8888
RabbitAdmin admin = context.getBean(RabbitAdmin.class);
8989
admin.deleteQueue(TEST_MISMATCH);
9090
assertThat(latches[0].await(20, TimeUnit.SECONDS)).isTrue();
91-
admin.declareQueue(new Queue(TEST_MISMATCH, false, false, true));
91+
admin.declareQueue(new Queue(TEST_MISMATCH, true, false, true));
9292
latches[2].countDown(); // let container thread continue to enable restart
9393
assertThat(latches[1].await(20, TimeUnit.SECONDS)).isTrue();
9494
SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class);
@@ -103,7 +103,7 @@ public void testMismatchedQueueDuringRestartMultiQueue() throws Exception {
103103
RabbitAdmin admin = context.getBean(RabbitAdmin.class);
104104
admin.deleteQueue(TEST_MISMATCH);
105105
assertThat(latches[0].await(20, TimeUnit.SECONDS)).isTrue();
106-
admin.declareQueue(new Queue(TEST_MISMATCH, false, false, true));
106+
admin.declareQueue(new Queue(TEST_MISMATCH, true, false, true));
107107
latches[2].countDown(); // let container thread continue to enable restart
108108
assertThat(latches[1].await(20, TimeUnit.SECONDS)).isTrue();
109109
SimpleMessageListenerContainer container = context.getBean(SimpleMessageListenerContainer.class);
@@ -159,7 +159,7 @@ public void handleMessage(Message m) {
159159

160160
@Bean
161161
public Queue queue() {
162-
return new Queue(TEST_MISMATCH, false, false, true); // mismatched
162+
return new Queue(TEST_MISMATCH, true, false, true);
163163
}
164164

165165
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DlqExpiryTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,15 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
9595

9696
@Bean
9797
public Queue main() {
98-
return QueueBuilder.nonDurable("test.expiry.main").autoDelete()
98+
return QueueBuilder.durable("test.expiry.main").autoDelete()
9999
.withArgument("x-dead-letter-exchange", "")
100100
.withArgument("x-dead-letter-routing-key", "test.expiry.dlq")
101101
.build();
102102
}
103103

104104
@Bean
105105
public Queue dlq() {
106-
return QueueBuilder.nonDurable("test.expiry.dlq").autoDelete()
106+
return QueueBuilder.durable("test.expiry.dlq").autoDelete()
107107
.withArgument("x-dead-letter-exchange", "")
108108
.withArgument("x-dead-letter-routing-key", "test.expiry.main")
109109
.withArgument("x-message-ttl", 100)

0 commit comments

Comments
 (0)