Skip to content

Commit 03a3bdc

Browse files
committed
remove usage of buffer accumulator from Kafka custom consumer
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 860e593 commit 03a3bdc

2 files changed

Lines changed: 26 additions & 19 deletions

File tree

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java

Lines changed: 10 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -70,8 +70,8 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener
7070

7171
private static final Logger LOG = LoggerFactory.getLogger(KafkaCustomConsumer.class);
7272
private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L;
73-
private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1;
7473
private static final int RETRY_ON_EXCEPTION_SLEEP_MS = 1000;
74+
private static final int BUFFER_WRITE_TIMEOUT = 2000;
7575
static final String DEFAULT_KEY = "message";
7676

7777
private volatile long lastCommitTime;
@@ -81,7 +81,6 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener
8181
private final TopicConsumerConfig topicConfig;
8282
private MessageFormat schema;
8383
private boolean paused;
84-
private final BufferAccumulator<Record<Event>> bufferAccumulator;
8584
private final Buffer<Record<Event>> buffer;
8685
private static final ObjectMapper objectMapper = new ObjectMapper();
8786
private final JsonFactory jsonFactory = new JsonFactory();
@@ -137,8 +136,6 @@ public KafkaCustomConsumer(final KafkaConsumer consumer,
137136
this.partitionCommitTrackerMap = new HashMap<>();
138137
this.partitionsToReset = Collections.synchronizedSet(new HashSet<>());
139138
this.schema = MessageFormat.getByMessageFormatByName(schemaType);
140-
Duration bufferTimeout = Duration.ofSeconds(1);
141-
this.bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, bufferTimeout);
142139
this.lastCommitTime = System.currentTimeMillis();
143140
this.numberOfAcksPending = new AtomicInteger(0);
144141
this.errLogRateLimiter = new LogRateLimiter(2, System.currentTimeMillis());
@@ -492,23 +489,19 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
492489
return new Record<Event>(event);
493490
}
494491

495-
private void processRecord(final AcknowledgementSet acknowledgementSet, final Record<Event> record) {
492+
private void processRecords(final AcknowledgementSet acknowledgementSet, final List<Record<Event>> eventRecords) {
496493
// Always add record to acknowledgementSet before adding to
497494
// buffer because another thread may take and process
498495
// buffer contents before the event record is added
499496
// to acknowledgement set
500497
if (acknowledgementSet != null) {
501-
acknowledgementSet.add(record.getData());
498+
eventRecords.forEach(record -> acknowledgementSet.add(record.getData()));
502499
}
503500
long numRetries = 0;
504501
while (true) {
505502
LOG.debug("In while loop for processing records, paused = {}", paused);
506503
try {
507-
if (numRetries == 0) {
508-
bufferAccumulator.add(record);
509-
} else {
510-
bufferAccumulator.flush();
511-
}
504+
buffer.writeAll(eventRecords, BUFFER_WRITE_TIMEOUT);
512505
break;
513506
} catch (Exception e) {
514507
if (!paused && numRetries++ > maxRetriesOnException) {
@@ -559,6 +552,7 @@ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
559552
}
560553

561554
List<ConsumerRecord<String, T>> partitionRecords = records.records(topicPartition);
555+
final List<Record<Event>> eventRecords = new ArrayList<>();
562556
for (ConsumerRecord<String, T> consumerRecord : partitionRecords) {
563557
if (schema == MessageFormat.BYTES) {
564558
InputStream byteInputStream = new ByteArrayInputStream((byte[])consumerRecord.value());
@@ -567,24 +561,24 @@ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
567561
if(byteDecoder != null) {
568562
final long receivedTimeStamp = getRecordTimeStamp(consumerRecord, Instant.now().toEpochMilli());
569563

570-
byteDecoder.parse(decompressedInputStream, Instant.ofEpochMilli(receivedTimeStamp), (record) -> {
571-
processRecord(acknowledgementSet, record);
572-
});
564+
byteDecoder.parse(decompressedInputStream, Instant.ofEpochMilli(receivedTimeStamp), eventRecords::add);
573565
} else {
574566
JsonNode jsonNode = objectMapper.readValue(decompressedInputStream, JsonNode.class);
575567

576568
Event event = JacksonLog.builder().withData(jsonNode).build();
577569
Record<Event> record = new Record<>(event);
578-
processRecord(acknowledgementSet, record);
570+
eventRecords.add(record);
579571
}
580572
} else {
581573
Record<Event> record = getRecord(consumerRecord, topicPartition.partition());
582574
if (record != null) {
583-
processRecord(acknowledgementSet, record);
575+
eventRecords.add(record);
584576
}
585577
}
586578
}
587579

580+
processRecords(acknowledgementSet, eventRecords);
581+
588582
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
589583
long firstOffset = partitionRecords.get(0).offset();
590584
Range<Long> offsetRange = Range.between(firstOffset, lastOffset);

data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.fasterxml.jackson.databind.JsonNode;
1212
import com.fasterxml.jackson.databind.ObjectMapper;
1313
import io.micrometer.core.instrument.Counter;
14+
import org.apache.commons.validator.Arg;
1415
import org.apache.kafka.clients.consumer.ConsumerRecord;
1516
import org.apache.kafka.clients.consumer.ConsumerRecords;
1617
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -22,6 +23,9 @@
2223
import org.junit.jupiter.api.BeforeEach;
2324
import org.junit.jupiter.api.Test;
2425
import org.junit.jupiter.api.extension.ExtendWith;
26+
import org.junit.jupiter.params.ParameterizedTest;
27+
import org.junit.jupiter.params.provider.Arguments;
28+
import org.junit.jupiter.params.provider.MethodSource;
2529
import org.mockito.Mock;
2630
import org.mockito.junit.jupiter.MockitoExtension;
2731
import org.mockito.junit.jupiter.MockitoSettings;
@@ -52,7 +56,9 @@
5256
import java.util.Map;
5357
import java.util.concurrent.Executors;
5458
import java.util.concurrent.ScheduledExecutorService;
59+
import java.util.concurrent.TimeoutException;
5560
import java.util.concurrent.atomic.AtomicBoolean;
61+
import java.util.stream.Stream;
5662

5763
import static org.awaitility.Awaitility.await;
5864
import static org.hamcrest.CoreMatchers.equalTo;
@@ -230,14 +236,15 @@ public void testGetRecordTimeStamp() {
230236
assertThat(consumer.getRecordTimeStamp(consumerRecord3, nowMs), equalTo(nowMs));
231237
}
232238

233-
@Test
234-
public void testBufferOverflowPauseResume() throws InterruptedException, Exception {
239+
@ParameterizedTest
240+
@MethodSource("provideExceptionsFromBufferWrite")
241+
public void testBufferOverflowPauseResume(final Exception bufferException) throws InterruptedException, Exception {
235242
when(topicConfig.getMaxPollInterval()).thenReturn(Duration.ofMillis(4000));
236243
String topic = topicConfig.getName();
237244
consumerRecords = createPlainTextRecords(topic, 0L);
238245
doAnswer((i)-> {
239246
if (!paused && !resumed)
240-
throw new SizeOverflowException("size overflow");
247+
throw bufferException;
241248
buffer.writeAll(i.getArgument(0), i.getArgument(1));
242249
return null;
243250
}).when(mockBuffer).writeAll(any(), anyInt());
@@ -690,6 +697,12 @@ private ConsumerRecords createJsonRecords(String topic) throws Exception {
690697
records.put(new TopicPartition(topic, testJsonPartition), Arrays.asList(record1, record2));
691698
return new ConsumerRecords(records);
692699
}
700+
701+
private static Stream<Arguments> provideExceptionsFromBufferWrite() {
702+
return Stream.of(
703+
Arguments.of(new SizeOverflowException("size overflow")),
704+
Arguments.of(new TimeoutException()));
705+
}
693706
}
694707

695708

0 commit comments

Comments
 (0)