Skip to content

Commit 17790ff

Browse files
kkondaka and Krishna Kondaka authored
Add support for Kafka headers and timestamp in the Kafka Source (#4566)
* Add support for Kafka headers and timestamp in the Kafka Source

  Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Fix the typo

  Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Addressed review comments

  Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* addressed review comments

  Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* fixed checkstyle error

  Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
1 parent 81e4058 commit 17790ff

4 files changed

Lines changed: 106 additions & 3 deletions

File tree

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/DefaultEventMetadata.java

Lines changed: 13 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -100,8 +100,19 @@ public void setAttribute(final String key, final Object value) {
100100
public Object getAttribute(final String attributeKey) {
101101
String key = (attributeKey.charAt(0) == '/') ? attributeKey.substring(1) : attributeKey;
102102

103-
// Does not support recursive or inner-object lookups for now.
104-
return attributes.get(key);
103+
Map<String, Object> mapObject = attributes;
104+
if (key.contains("/")) {
105+
String[] keys = key.split("/");
106+
for (int i = 0; i < keys.length-1; i++) {
107+
Object value = mapObject.get(keys[i]);
108+
if (value == null || !(value instanceof Map)) {
109+
return null;
110+
}
111+
mapObject = (Map<String, Object>)value;
112+
key = keys[i+1];
113+
}
114+
}
115+
return mapObject.get(key);
105116
}
106117

107118
@Override

data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/DefaultEventMetadataTest.java

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -134,6 +134,25 @@ public void testSetAttribute(String key, final Object value) {
134134
assertThat(eventMetadata.getAttribute(key), equalTo(value));
135135
}
136136

137+
private static Stream<Arguments> getNestedAttributeTestInputs() {
138+
return Stream.of(Arguments.of(Map.of("k1", "v1", "k2", Map.of("k3", "v3")), "k1", "v1"),
139+
Arguments.of(Map.of("k1", "v1", "k2", Map.of("k3", "v3")), "k2/k3", "v3"),
140+
Arguments.of(Map.of("k1", "v1", "k2", Map.of("k3", Map.of("k4", 4))), "k2/k3/k4", 4),
141+
Arguments.of(Map.of("k1", "v1", "k2", Map.of("k3", 4)), "k2/k3/k4", null),
142+
Arguments.of(Map.of("k1","v1"),"k1", "v1"));
143+
}
144+
145+
@ParameterizedTest
146+
@MethodSource("getNestedAttributeTestInputs")
147+
public void testNestedGetAttribute(Map<String, Object> attributes, final String key, final Object expectedValue) {
148+
eventMetadata = DefaultEventMetadata.builder()
149+
.withEventType(testEventType)
150+
.withTimeReceived(testTimeReceived)
151+
.withAttributes(attributes)
152+
.build();
153+
assertThat(eventMetadata.getAttribute(key), equalTo(expectedValue));
154+
}
155+
137156
@Test
138157
public void test_with_ExternalOriginationTime() {
139158
Instant now = Instant.now();

data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceJsonTypeIT.java

Lines changed: 62 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,9 @@
1313
import org.apache.kafka.clients.producer.KafkaProducer;
1414
import org.apache.kafka.clients.producer.ProducerConfig;
1515
import org.apache.kafka.clients.producer.ProducerRecord;
16+
import org.apache.kafka.common.header.Header;
17+
import org.apache.kafka.common.header.internals.RecordHeader;
18+
import org.apache.kafka.common.header.internals.RecordHeaders;
1619
import org.junit.jupiter.api.AfterEach;
1720
import org.junit.jupiter.api.BeforeEach;
1821
import org.junit.jupiter.api.Test;
@@ -36,8 +39,10 @@
3639
import org.slf4j.Logger;
3740
import org.slf4j.LoggerFactory;
3841

42+
import java.nio.charset.StandardCharsets;
3943
import java.time.Duration;
4044
import java.util.ArrayList;
45+
import java.util.Arrays;
4146
import java.util.Collection;
4247
import java.util.Collections;
4348
import java.util.List;
@@ -49,13 +54,15 @@
4954

5055
import static org.awaitility.Awaitility.await;
5156
import static org.hamcrest.CoreMatchers.equalTo;
57+
import static org.hamcrest.CoreMatchers.not;
5258
import static org.hamcrest.MatcherAssert.assertThat;
5359
import static org.mockito.ArgumentMatchers.any;
5460
import static org.mockito.ArgumentMatchers.anyString;
5561
import static org.mockito.Mockito.doAnswer;
5662
import static org.mockito.Mockito.mock;
5763
import static org.mockito.Mockito.when;
5864

65+
5966
public class KafkaSourceJsonTypeIT {
6067
private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceJsonTypeIT.class);
6168
private static final int TEST_ID = 123456;
@@ -98,13 +105,21 @@ public class KafkaSourceJsonTypeIT {
98105
private String testKey;
99106
private String testTopic;
100107
private String testGroup;
108+
private String headerKey1;
109+
private byte[] headerValue1;
110+
private String headerKey2;
111+
private byte[] headerValue2;
101112

102113
public KafkaSource createObjectUnderTest() {
103114
return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, kafkaClusterConfigSupplier, pluginConfigObservable);
104115
}
105116

106117
@BeforeEach
107118
public void setup() throws Throwable {
119+
headerKey1 = RandomStringUtils.randomAlphabetic(6);
120+
headerValue1 = RandomStringUtils.randomAlphabetic(10).getBytes(StandardCharsets.UTF_8);
121+
headerKey2 = RandomStringUtils.randomAlphabetic(5);
122+
headerValue2 = RandomStringUtils.randomAlphabetic(15).getBytes(StandardCharsets.UTF_8);
108123
sourceConfig = mock(KafkaSourceConfig.class);
109124
pluginMetrics = mock(PluginMetrics.class);
110125
counter = mock(Counter.class);
@@ -209,6 +224,13 @@ public void TestJsonRecordsWithNullKey() throws Exception {
209224
assertThat(map.get("kafka_key"), equalTo(null));
210225
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
211226
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
227+
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
228+
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
229+
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
230+
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
231+
assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime"));
232+
assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1));
233+
assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2));
212234
}
213235
}
214236

@@ -240,6 +262,13 @@ public void TestJsonRecordsWithNegativeAcknowledgements() throws Exception {
240262
assertThat(map.get("status"), equalTo(true));
241263
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
242264
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
265+
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
266+
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
267+
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
268+
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
269+
assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime"));
270+
assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1));
271+
assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2));
243272
event.getEventHandle().release(false);
244273
}
245274
receivedRecords.clear();
@@ -258,6 +287,13 @@ public void TestJsonRecordsWithNegativeAcknowledgements() throws Exception {
258287
assertThat(map.get("status"), equalTo(true));
259288
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
260289
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
290+
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
291+
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
292+
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
293+
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
294+
assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime"));
295+
assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1));
296+
assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2));
261297
event.getEventHandle().release(true);
262298
}
263299
}
@@ -289,6 +325,13 @@ public void TestJsonRecordsWithKafkaKeyModeDiscard() throws Exception {
289325
assertThat(map.get("status"), equalTo(true));
290326
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
291327
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
328+
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
329+
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
330+
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
331+
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
332+
assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime"));
333+
assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1));
334+
assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2));
292335
}
293336
}
294337

@@ -320,6 +363,13 @@ public void TestJsonRecordsWithKafkaKeyModeAsField() throws Exception {
320363
assertThat(map.get("kafka_key"), equalTo(testKey));
321364
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
322365
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
366+
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
367+
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
368+
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
369+
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
370+
assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime"));
371+
assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1));
372+
assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2));
323373
}
324374
}
325375

@@ -351,6 +401,13 @@ public void TestJsonRecordsWithKafkaKeyModeAsMetadata() throws Exception {
351401
assertThat(metadata.getAttributes().get("kafka_key"), equalTo(testKey));
352402
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
353403
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
404+
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
405+
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
406+
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
407+
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
408+
assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime"));
409+
assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1));
410+
assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2));
354411
}
355412
}
356413

@@ -364,8 +421,12 @@ public void produceJsonRecords(final String servers, final String topicName, fin
364421
KafkaProducer producer = new KafkaProducer(props);
365422
for (int i = 0; i < numRecords; i++) {
366423
String value = "{\"name\":\"testName" + i + "\", \"id\":" + (TEST_ID + i) + ", \"status\":true}";
424+
List<Header> headers = Arrays.asList(
425+
new RecordHeader(headerKey1, headerValue1),
426+
new RecordHeader(headerKey2, headerValue2)
427+
);
367428
ProducerRecord<String, String> record =
368-
new ProducerRecord<>(topicName, testKey, value);
429+
new ProducerRecord<>(topicName, null, testKey, value, new RecordHeaders(headers));
369430
producer.send(record);
370431
try {
371432
Thread.sleep(100);

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,8 @@
1717
import org.apache.kafka.clients.consumer.ConsumerRecords;
1818
import org.apache.kafka.clients.consumer.KafkaConsumer;
1919
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
20+
import org.apache.kafka.common.header.Header;
21+
import org.apache.kafka.common.header.Headers;
2022
import org.apache.kafka.common.TopicPartition;
2123
import org.apache.kafka.common.errors.AuthenticationException;
2224
import org.apache.kafka.common.errors.RecordDeserializationException;
@@ -426,6 +428,16 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
426428
if (kafkaKeyMode == KafkaKeyMode.INCLUDE_AS_METADATA) {
427429
eventMetadata.setAttribute("kafka_key", key);
428430
}
431+
Headers headers = consumerRecord.headers();
432+
if (headers != null) {
433+
Map<String, byte[]> headerData = new HashMap<>();
434+
for (Header header: headers) {
435+
headerData.put(header.key(), header.value());
436+
}
437+
eventMetadata.setAttribute("kafka_headers", headerData);
438+
}
439+
eventMetadata.setAttribute("kafka_timestamp", consumerRecord.timestamp());
440+
eventMetadata.setAttribute("kafka_timestamp_type", consumerRecord.timestampType().toString());
429441
eventMetadata.setAttribute("kafka_topic", topicName);
430442
eventMetadata.setAttribute("kafka_partition", String.valueOf(partition));
431443
eventMetadata.setExternalOriginationTime(Instant.ofEpochMilli(consumerRecord.timestamp()));

0 commit comments

Comments
 (0)