Skip to content

Commit 03e239d

Browse files
authored
Fix unstable batch test in MessageDrivenAdapterTests
By starting the adapter after sending messages, ensure a single poll retrieves the full batch messages. Signed-off-by: Jiandong Ma <jiandong.ma.cn@gmail.com> **Auto-cherry-pick to `7.0.x` & `6.5.x`**
1 parent d90215b commit 03e239d

1 file changed

Lines changed: 14 additions & 2 deletions

File tree

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
* @author Cameron Mayfield
111111
* @author Urs Keller
112112
* @author Jooyoung Pyoung
113+
* @author Jiandong Ma
113114
*
114115
* @since 5.4
115116
*
@@ -432,8 +433,6 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
432433
}
433434

434435
});
435-
adapter.start();
436-
ContainerTestUtils.waitForAssignment(container, 1);
437436

438437
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
439438
senderProps.put(ProducerConfig.LINGER_MS_CONFIG, 0);
@@ -443,6 +442,9 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
443442
template.sendDefault(0, 1487694048607L, 1, "foo");
444443
template.sendDefault(0, 1487694048608L, 1, "bar");
445444

445+
adapter.start();
446+
ContainerTestUtils.waitForAssignment(container, 1);
447+
446448
Message<?> received = out.receive(10000);
447449
assertThat(received).isNotNull();
448450
Object payload = received.getPayload();
@@ -461,6 +463,11 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
461463

462464
assertThat(onPartitionsAssignedCalledLatch.await(10, TimeUnit.SECONDS)).isTrue();
463465

466+
adapter.stop();
467+
468+
final CountDownLatch restartAssignmentLatch = new CountDownLatch(1);
469+
adapter.setOnPartitionsAssignedSeekCallback((map, consumer) -> restartAssignmentLatch.countDown());
470+
464471
adapter.setMessageConverter(new BatchMessageConverter() {
465472

466473
@Override
@@ -480,12 +487,17 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
480487
adapter.setErrorChannel(errors);
481488
template.sendDefault(0, 1487694048607L, 1, "foo");
482489
template.sendDefault(0, 1487694048608L, 1, "bar");
490+
491+
adapter.start();
492+
ContainerTestUtils.waitForAssignment(container, 1);
493+
483494
Message<?> error = errors.receive(30000);
484495
assertThat(error).isNotNull();
485496
assertThat(error.getPayload()).isInstanceOf(ConversionException.class);
486497
assertThat(((ConversionException) error.getPayload()).getMessage())
487498
.contains("Failed to convert to message");
488499
assertThat(((ConversionException) error.getPayload()).getRecords()).hasSize(2);
500+
assertThat(restartAssignmentLatch.await(10, TimeUnit.SECONDS)).isTrue();
489501

490502
adapter.stop();
491503
pf.reset();

0 commit comments

Comments
 (0)