1717import org .apache .kafka .clients .consumer .ConsumerRecords ;
1818import org .apache .kafka .clients .consumer .KafkaConsumer ;
1919import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
20- import org .apache .kafka .common .errors .RebalanceInProgressException ;
21- import org .apache .kafka .common .header .Header ;
22- import org .apache .kafka .common .header .Headers ;
2320import org .apache .kafka .common .TopicPartition ;
2421import org .apache .kafka .common .errors .AuthenticationException ;
22+ import org .apache .kafka .common .errors .RebalanceInProgressException ;
2523import org .apache .kafka .common .errors .RecordDeserializationException ;
26- import org .opensearch .dataprepper .buffer .common .BufferAccumulator ;
24+ import org .apache .kafka .common .header .Header ;
25+ import org .apache .kafka .common .header .Headers ;
2726import org .opensearch .dataprepper .model .acknowledgements .AcknowledgementSet ;
2827import org .opensearch .dataprepper .model .acknowledgements .AcknowledgementSetManager ;
2928import org .opensearch .dataprepper .model .buffer .Buffer ;
@@ -70,8 +69,8 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener
7069
7170 private static final Logger LOG = LoggerFactory .getLogger (KafkaCustomConsumer .class );
7271 private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L ;
73- private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1 ;
7472 private static final int RETRY_ON_EXCEPTION_SLEEP_MS = 1000 ;
73+ private static final int BUFFER_WRITE_TIMEOUT = 2000 ;
7574 static final String DEFAULT_KEY = "message" ;
7675
7776 private volatile long lastCommitTime ;
@@ -81,7 +80,6 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener
8180 private final TopicConsumerConfig topicConfig ;
8281 private MessageFormat schema ;
8382 private boolean paused ;
84- private final BufferAccumulator <Record <Event >> bufferAccumulator ;
8583 private final Buffer <Record <Event >> buffer ;
8684 private static final ObjectMapper objectMapper = new ObjectMapper ();
8785 private final JsonFactory jsonFactory = new JsonFactory ();
@@ -123,7 +121,7 @@ public KafkaCustomConsumer(final KafkaConsumer consumer,
123121 this .paused = false ;
124122 this .byteDecoder = byteDecoder ;
125123 this .topicMetrics = topicMetrics ;
126- this .maxRetriesOnException = topicConfig .getMaxPollInterval ().toMillis () / (2 * RETRY_ON_EXCEPTION_SLEEP_MS );
124+ this .maxRetriesOnException = topicConfig .getMaxPollInterval ().toMillis () / (2 * ( RETRY_ON_EXCEPTION_SLEEP_MS + BUFFER_WRITE_TIMEOUT ) );
127125 this .pauseConsumePredicate = pauseConsumePredicate ;
128126 this .topicMetrics .register (consumer );
129127 this .offsetsToCommit = new HashMap <>();
@@ -137,8 +135,6 @@ public KafkaCustomConsumer(final KafkaConsumer consumer,
137135 this .partitionCommitTrackerMap = new HashMap <>();
138136 this .partitionsToReset = Collections .synchronizedSet (new HashSet <>());
139137 this .schema = MessageFormat .getByMessageFormatByName (schemaType );
140- Duration bufferTimeout = Duration .ofSeconds (1 );
141- this .bufferAccumulator = BufferAccumulator .create (buffer , DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE , bufferTimeout );
142138 this .lastCommitTime = System .currentTimeMillis ();
143139 this .numberOfAcksPending = new AtomicInteger (0 );
144140 this .errLogRateLimiter = new LogRateLimiter (2 , System .currentTimeMillis ());
@@ -492,23 +488,19 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
492488 return new Record <Event >(event );
493489 }
494490
495- private void processRecord (final AcknowledgementSet acknowledgementSet , final Record <Event > record ) {
491+ private void processRecords (final AcknowledgementSet acknowledgementSet , final List < Record <Event >> eventRecords ) {
496492 // Always add record to acknowledgementSet before adding to
497493 // buffer because another thread may take and process
498494 // buffer contents before the event record is added
499495 // to acknowledgement set
500496 if (acknowledgementSet != null ) {
501- acknowledgementSet .add (record .getData ());
497+ eventRecords . forEach ( record -> acknowledgementSet .add (record .getData () ));
502498 }
503499 long numRetries = 0 ;
504500 while (true ) {
505501 LOG .debug ("In while loop for processing records, paused = {}" , paused );
506502 try {
507- if (numRetries == 0 ) {
508- bufferAccumulator .add (record );
509- } else {
510- bufferAccumulator .flush ();
511- }
503+ buffer .writeAll (eventRecords , BUFFER_WRITE_TIMEOUT );
512504 break ;
513505 } catch (Exception e ) {
514506 if (!paused && numRetries ++ > maxRetriesOnException ) {
@@ -559,6 +551,7 @@ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
559551 }
560552
561553 List <ConsumerRecord <String , T >> partitionRecords = records .records (topicPartition );
554+ final List <Record <Event >> eventRecords = new ArrayList <>();
562555 for (ConsumerRecord <String , T > consumerRecord : partitionRecords ) {
563556 if (schema == MessageFormat .BYTES ) {
564557 InputStream byteInputStream = new ByteArrayInputStream ((byte [])consumerRecord .value ());
@@ -567,24 +560,24 @@ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
567560 if (byteDecoder != null ) {
568561 final long receivedTimeStamp = getRecordTimeStamp (consumerRecord , Instant .now ().toEpochMilli ());
569562
570- byteDecoder .parse (decompressedInputStream , Instant .ofEpochMilli (receivedTimeStamp ), (record ) -> {
571- processRecord (acknowledgementSet , record );
572- });
563+ byteDecoder .parse (decompressedInputStream , Instant .ofEpochMilli (receivedTimeStamp ), eventRecords ::add );
573564 } else {
574565 JsonNode jsonNode = objectMapper .readValue (decompressedInputStream , JsonNode .class );
575566
576567 Event event = JacksonLog .builder ().withData (jsonNode ).build ();
577568 Record <Event > record = new Record <>(event );
578- processRecord ( acknowledgementSet , record );
569+ eventRecords . add ( record );
579570 }
580571 } else {
581572 Record <Event > record = getRecord (consumerRecord , topicPartition .partition ());
582573 if (record != null ) {
583- processRecord ( acknowledgementSet , record );
574+ eventRecords . add ( record );
584575 }
585576 }
586577 }
587578
579+ processRecords (acknowledgementSet , eventRecords );
580+
588581 long lastOffset = partitionRecords .get (partitionRecords .size () - 1 ).offset ();
589582 long firstOffset = partitionRecords .get (0 ).offset ();
590583 Range <Long > offsetRange = Range .between (firstOffset , lastOffset );
0 commit comments