Skip to content

Commit 6ce36ff

Browse files
kkondakasimonelbaz
authored andcommitted
Convert Kafka Header values to Strings (#6507)
* Convert Kafka Header values to Strings Signed-off-by: Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Kondaka <krishkdk@amazon.com> Signed-off-by: Kondaka <krishkdk@amazon.com> * Fixed header value parsing logic Signed-off-by: Kondaka <krishkdk@amazon.com> * Addressed review comments. Created a new class for header extraction Signed-off-by: Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Kondaka <krishkdk@amazon.com> * Fixed license header failures Signed-off-by: Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent 1b21576 commit 6ce36ff

4 files changed

Lines changed: 271 additions & 88 deletions

File tree

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
import org.apache.kafka.common.errors.RebalanceInProgressException;
2828
import org.apache.kafka.common.utils.ExponentialBackoff;
2929
import org.apache.kafka.common.errors.RecordDeserializationException;
30-
import org.apache.kafka.common.header.Header;
31-
import org.apache.kafka.common.header.Headers;
3230
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
3331
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
3432
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -504,12 +502,8 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
504502
if (kafkaKeyMode == KafkaKeyMode.INCLUDE_AS_METADATA) {
505503
eventMetadata.setAttribute("kafka_key", key);
506504
}
507-
Headers headers = consumerRecord.headers();
508-
if (headers != null) {
509-
Map<String, byte[]> headerData = new HashMap<>();
510-
for (Header header: headers) {
511-
headerData.put(header.key(), header.value());
512-
}
505+
Map<String, Object> headerData = KafkaHeadersExtractor.extractMessageHeaders(consumerRecord.headers());
506+
if (headerData != null) {
513507
eventMetadata.setAttribute("kafka_headers", headerData);
514508
}
515509
final long receivedTimeStamp = getRecordTimeStamp(consumerRecord, now.toEpochMilli());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.kafka.consumer;
11+
12+
import org.apache.kafka.common.header.Header;
13+
import org.apache.kafka.common.header.Headers;
14+
15+
import java.nio.charset.StandardCharsets;
16+
import java.util.Arrays;
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
20+
public class KafkaHeadersExtractor {
21+
public static Map<String, Object> extractMessageHeaders(Headers headers) {
22+
if (headers == null) {
23+
return null;
24+
}
25+
Map<String, Object> headerData = new HashMap<>();
26+
for (Header header : headers) {
27+
byte[] headerValue = header.value();
28+
if (headerValue == null) {
29+
headerData.put(header.key(), null);
30+
continue;
31+
}
32+
String strValue = new String(headerValue, StandardCharsets.UTF_8);
33+
if (Arrays.equals(headerValue, strValue.getBytes(StandardCharsets.UTF_8))
34+
&& isPrintableString(strValue)) {
35+
headerData.put(header.key(), strValue);
36+
} else {
37+
headerData.put(header.key(), headerValue);
38+
}
39+
}
40+
return headerData;
41+
}
42+
43+
private static boolean isPrintableString(String value) {
44+
for (int i = 0; i < value.length(); i++) {
45+
char c = value.charAt(i);
46+
if (Character.isISOControl(c) && c != '\t' && c != '\n' && c != '\r') {
47+
return false;
48+
}
49+
}
50+
return true;
51+
}
52+
}

0 commit comments

Comments
 (0)