|
41 | 41 | import org.apache.kafka.common.header.internals.RecordHeaders; |
42 | 42 | import org.assertj.core.api.InstanceOfAssertFactories; |
43 | 43 | import org.junit.jupiter.api.Test; |
| 44 | +import org.junitpioneer.jupiter.RetryingTest; |
44 | 45 |
|
45 | 46 | import org.springframework.core.retry.RetryListener; |
46 | 47 | import org.springframework.core.retry.RetryPolicy; |
@@ -399,12 +400,12 @@ protected boolean doSend(Message<?> message, long timeout) { |
399 | 400 | pf.reset(); |
400 | 401 | } |
401 | 402 |
|
402 | | - @Test |
| 403 | + @RetryingTest(10) |
403 | 404 | void testInboundBatch(EmbeddedKafkaBroker embeddedKafka) throws Exception { |
404 | 405 | Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "test2", true); |
405 | 406 | props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); |
406 | | - props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 68); |
407 | | - props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000); |
| 407 | + props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50); |
| 408 | + props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000); |
408 | 409 |
|
409 | 410 | DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props); |
410 | 411 | ContainerProperties containerProps = new ContainerProperties(topic2); |
@@ -478,9 +479,9 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a |
478 | 479 | }); |
479 | 480 | PollableChannel errors = new QueueChannel(); |
480 | 481 | adapter.setErrorChannel(errors); |
481 | | - template.sendDefault(1, "foo"); |
482 | | - template.sendDefault(1, "bar"); |
483 | | - Message<?> error = errors.receive(10000); |
| 482 | + template.sendDefault(0, 1487694048607L, 1, "foo"); |
| 483 | + template.sendDefault(0, 1487694048608L, 1, "bar"); |
| 484 | + Message<?> error = errors.receive(30000); |
484 | 485 | assertThat(error).isNotNull(); |
485 | 486 | assertThat(error.getPayload()).isInstanceOf(ConversionException.class); |
486 | 487 | assertThat(((ConversionException) error.getPayload()).getMessage()) |
|
0 commit comments