Skip to content

Commit 9efb841

Browse files
mjd507artembilan
authored andcommitted
Fix unstable batch test in MessageDrivenAdapterTests
By starting the adapter after sending messages, ensure a single poll retrieves the full batch messages. Signed-off-by: Jiandong Ma <jiandong.ma.cn@gmail.com> (cherry picked from commit 03e239d) # Conflicts: # spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java
1 parent fc69f7a commit 9efb841

1 file changed

Lines changed: 17 additions & 5 deletions

File tree

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
* @author Biju Kunjummen
107107
* @author Cameron Mayfield
108108
* @author Urs Keller
109+
* @author Jiandong Ma
109110
*
110111
* @since 5.4
111112
*
@@ -426,8 +427,6 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
426427
}
427428

428429
});
429-
adapter.start();
430-
ContainerTestUtils.waitForAssignment(container, 1);
431430

432431
Map<String, Object> senderProps = KafkaTestUtils.producerProps(EMBEDDED_BROKERS);
433432
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
@@ -436,6 +435,9 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
436435
template.sendDefault(0, 1487694048607L, 1, "foo");
437436
template.sendDefault(0, 1487694048608L, 1, "bar");
438437

438+
adapter.start();
439+
ContainerTestUtils.waitForAssignment(container, 1);
440+
439441
Message<?> received = out.receive(10000);
440442
assertThat(received).isNotNull();
441443
Object payload = received.getPayload();
@@ -456,6 +458,11 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
456458

457459
assertThat(onPartitionsAssignedCalledLatch.await(10, TimeUnit.SECONDS)).isTrue();
458460

461+
adapter.stop();
462+
463+
final CountDownLatch restartAssignmentLatch = new CountDownLatch(1);
464+
adapter.setOnPartitionsAssignedSeekCallback((map, consumer) -> restartAssignmentLatch.countDown());
465+
459466
adapter.setMessageConverter(new BatchMessageConverter() {
460467

461468
@Override
@@ -473,14 +480,19 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
473480
});
474481
PollableChannel errors = new QueueChannel();
475482
adapter.setErrorChannel(errors);
476-
template.sendDefault(1, "foo");
477-
template.sendDefault(1, "bar");
478-
Message<?> error = errors.receive(10000);
483+
template.sendDefault(0, 1487694048607L, 1, "foo");
484+
template.sendDefault(0, 1487694048608L, 1, "bar");
485+
486+
adapter.start();
487+
ContainerTestUtils.waitForAssignment(container, 1);
488+
489+
Message<?> error = errors.receive(30000);
479490
assertThat(error).isNotNull();
480491
assertThat(error.getPayload()).isInstanceOf(ConversionException.class);
481492
assertThat(((ConversionException) error.getPayload()).getMessage())
482493
.contains("Failed to convert to message");
483494
assertThat(((ConversionException) error.getPayload()).getRecords()).hasSize(2);
495+
assertThat(restartAssignmentLatch.await(10, TimeUnit.SECONDS)).isTrue();
484496

485497
adapter.stop();
486498
pf.reset();

0 commit comments

Comments
 (0)