Skip to content

Commit 8a7dfb6

Browse files
graytaylor0 authored and simonelbaz committed
Do not clear offsets after failure to commit offsets due to rebalance exception (opensearch-project#6346)
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent f215248 commit 8a7dfb6

2 files changed

Lines changed: 91 additions & 1 deletion

File tree

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717
import org.apache.kafka.clients.consumer.ConsumerRecords;
1818
import org.apache.kafka.clients.consumer.KafkaConsumer;
1919
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
20+
import org.apache.kafka.common.errors.RebalanceInProgressException;
2021
import org.apache.kafka.common.header.Header;
2122
import org.apache.kafka.common.header.Headers;
2223
import org.apache.kafka.common.TopicPartition;
@@ -354,11 +355,15 @@ private void commitOffsets(boolean forceCommit) {
354355
offsetsToCommit.forEach(((partition, offset) -> updateCommitCountMetric(partition, offset)));
355356
try {
356357
consumer.commitSync(offsetsToCommit);
358+
lastCommitTime = currentTimeMillis;
359+
} catch (final RebalanceInProgressException ex) {
360+
LOG.error("Failed to commit offsets in topic {} due to rebalance in progress", topicName, ex);
361+
return;
357362
} catch (Exception e) {
358363
LOG.error("Failed to commit offsets in topic {}", topicName, e);
359364
}
365+
360366
offsetsToCommit.clear();
361-
lastCommitTime = currentTimeMillis;
362367
}
363368
}
364369

data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java

Lines changed: 85 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@
1616
import org.apache.kafka.clients.consumer.KafkaConsumer;
1717
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
1818
import org.apache.kafka.common.TopicPartition;
19+
import org.apache.kafka.common.errors.RebalanceInProgressException;
1920
import org.apache.kafka.common.errors.RecordDeserializationException;
2021
import org.junit.jupiter.api.Assertions;
2122
import org.junit.jupiter.api.BeforeEach;
@@ -60,6 +61,8 @@
6061
import static org.junit.jupiter.api.Assertions.assertTrue;
6162
import static org.mockito.ArgumentMatchers.any;
6263
import static org.mockito.ArgumentMatchers.anyInt;
64+
import static org.mockito.ArgumentMatchers.anyMap;
65+
import static org.mockito.Mockito.doThrow;
6366
import static org.mockito.Mockito.doAnswer;
6467
import static org.mockito.Mockito.mock;
6568
import static org.mockito.Mockito.when;
@@ -589,6 +592,88 @@ public void testAwsGlueErrorWithAcknowledgements() throws Exception {
589592
});
590593
}
591594

595+
@Test
596+
public void testCommitOffsets_RebalanceInProgressException_DoesNotClearOffsets() throws Exception {
597+
String topic = topicConfig.getName();
598+
TopicPartition topicPartition = new TopicPartition(topic, testPartition);
599+
600+
when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT);
601+
when(topicConfig.getAutoCommit()).thenReturn(false);
602+
when(topicConfig.getCommitInterval()).thenReturn(Duration.ofMillis(0));
603+
604+
consumer = createObjectUnderTest("plaintext", false);
605+
consumer.onPartitionsAssigned(List.of(topicPartition));
606+
607+
consumerRecords = createPlainTextRecords(topic, 100L);
608+
when(kafkaConsumer.poll(any(Duration.class))).thenReturn(consumerRecords);
609+
610+
doThrow(new RebalanceInProgressException("Rebalance in progress"))
611+
.when(kafkaConsumer).commitSync(anyMap());
612+
613+
consumer.consumeRecords();
614+
615+
Map<TopicPartition, OffsetAndMetadata> offsetsBeforeCommit = new HashMap<>(consumer.getOffsetsToCommit());
616+
Assertions.assertFalse(offsetsBeforeCommit.isEmpty(), "Offsets should be populated after consuming records");
617+
Assertions.assertEquals(102L, offsetsBeforeCommit.get(topicPartition).offset());
618+
619+
Thread testThread = new Thread(() -> {
620+
try {
621+
java.lang.reflect.Method method = consumer.getClass().getDeclaredMethod("commitOffsets", boolean.class);
622+
method.setAccessible(true);
623+
method.invoke(consumer, true);
624+
} catch (Exception e) {
625+
throw new RuntimeException(e);
626+
}
627+
});
628+
testThread.start();
629+
testThread.join(5000);
630+
631+
Map<TopicPartition, OffsetAndMetadata> offsetsAfterFailedCommit = consumer.getOffsetsToCommit();
632+
Assertions.assertFalse(offsetsAfterFailedCommit.isEmpty(),
633+
"Offsets should NOT be cleared after RebalanceInProgressException");
634+
Assertions.assertEquals(offsetsBeforeCommit.get(topicPartition).offset(),
635+
offsetsAfterFailedCommit.get(topicPartition).offset(),
636+
"Offset value should remain unchanged for retry after rebalance completes");
637+
}
638+
639+
@Test
640+
public void testCommitOffsets_OtherException_ClearsOffsets() throws Exception {
641+
String topic = topicConfig.getName();
642+
TopicPartition topicPartition = new TopicPartition(topic, testPartition);
643+
644+
when(topicConfig.getAutoCommit()).thenReturn(false);
645+
when(topicConfig.getCommitInterval()).thenReturn(Duration.ofMillis(0));
646+
647+
consumer = createObjectUnderTest("plaintext", false);
648+
consumer.onPartitionsAssigned(List.of(topicPartition));
649+
650+
consumerRecords = createPlainTextRecords(topic, 100L);
651+
when(kafkaConsumer.poll(any(Duration.class))).thenReturn(consumerRecords);
652+
653+
doThrow(new RuntimeException("Generic commit failure"))
654+
.when(kafkaConsumer).commitSync(anyMap());
655+
656+
consumer.consumeRecords();
657+
658+
Assertions.assertFalse(consumer.getOffsetsToCommit().isEmpty(),
659+
"Offsets should be populated after consuming records");
660+
661+
Thread testThread = new Thread(() -> {
662+
try {
663+
java.lang.reflect.Method method = consumer.getClass().getDeclaredMethod("commitOffsets", boolean.class);
664+
method.setAccessible(true);
665+
method.invoke(consumer, true);
666+
} catch (Exception e) {
667+
}
668+
});
669+
testThread.start();
670+
testThread.join(5000);
671+
672+
Map<TopicPartition, OffsetAndMetadata> offsetsAfterFailedCommit = consumer.getOffsetsToCommit();
673+
Assertions.assertTrue(offsetsAfterFailedCommit.isEmpty(),
674+
"Offsets should be cleared after non-rebalance exception");
675+
}
676+
592677
private ConsumerRecords createPlainTextRecords(String topic, final long startOffset) {
593678
Map<TopicPartition, List<ConsumerRecord>> records = new HashMap<>();
594679
ConsumerRecord<String, String> record1 = new ConsumerRecord<>(topic, testPartition, startOffset, testKey1, testValue1);

0 commit comments

Comments
 (0)