Skip to content

Commit 18ef7ab

Browse files
authored
Merge branch 'opensearch-project:main' into md5checksum-2
2 parents 80e81d6 + d316eda commit 18ef7ab

48 files changed

Lines changed: 3936 additions & 1195 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/CODEOWNERS

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
# This should match the owning team set up in https://github.com/orgs/opensearch-project/teams
2-
* @sb2k16 @engechas @san81 @srikanthjg @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh
2+
* @divbok @sb2k16 @san81 @srikanthjg @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh @Zhangxunmt

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ data-prepper-main/src/test/resources/logstash-conf/logstash-filter.yaml
2727
*.iml
2828
.kiro
2929
.vscode
30+
.project
31+
.classpath
32+
.settings/
3033

3134
# Manual testing files (should not be committed)
3235
MANUAL_TEST.md

MAINTAINERS.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
66

77
| Maintainer | GitHub ID | Affiliation |
88
| ---------------------- | --------------------------------------------------------- | ----------- |
9+
| Divyansh Bokadia | [divbok](https://github.com/divbok) | Amazon |
910
| Souvik Bose | [sb2k16](https://github.com/sb2k16) | Amazon |
10-
| Chase Engelbrecht | [engechas](https://github.com/engechas) | Amazon |
1111
| Santhosh Gandhe | [san81](https://github.com/san81) | Amazon |
1212
| Srikanth Govindarajan | [srikanthjg](https://github.com/srikanthjg) | Amazon |
1313
| Taylor Gray | [graytaylor0](https://github.com/graytaylor0) | Amazon |
@@ -16,6 +16,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
1616
| Karsten Schnitter | [KarstenSchnitter](https://github.com/KarstenSchnitter) | SAP |
1717
| David Venable | [dlvenable](https://github.com/dlvenable) | Amazon |
1818
| Hai Yan | [oeyh](https://github.com/oeyh) | Amazon |
19+
| Xun Zhang | [Zhangxunmt](https://github.com/Zhangxunmt) | Amazon |
1920

2021

2122
## Emeritus
@@ -24,6 +25,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
2425
| ---------------------- | ----------------------------------------------------- | ----------- |
2526
| Steven Bayer | [sbayer55](https://github.com/sbayer55) | Amazon |
2627
| Christopher Manning | [cmanning09](https://github.com/cmanning09) | Amazon |
28+
| Chase Engelbrecht | [engechas](https://github.com/engechas) | Amazon |
2729
| Asif Sohail Mohammed | [asifsmohammed](https://github.com/asifsmohammed) | Amazon |
2830
| David Powers | [dapowers87](https://github.com/dapowers87) | Amazon |
2931
| Shivani Shukla | [sshivanii](https://github.com/sshivanii) | Amazon |

TRIAGING.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ However, should we run out of time before your issue is discussed, you are alway
1616

1717
### How do I join the triage meeting?
1818

19-
Meetings are hosted regularly Tuesdays at 2:30 PM US Central Time (12:30 PM Pacific Time) and can be joined via the links posted on the [OpenSearch Meetup Group](https://www.meetup.com/opensearch/events/) list of events.
19+
Meetings are hosted regularly Tuesdays at 11:00 AM US Central Time (9:00 AM Pacific Time) and can be joined via the links posted on the [OpenSearch Meetup Group](https://www.meetup.com/opensearch/events/) list of events.
2020
The event will be titled `OpenSearch Data Prepper Triage Meeting`.
2121

2222
After joining the video meeting, you can enable your video / voice to join the discussion.

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/meter/EMFLoggingMeterRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@
2424
import io.micrometer.core.instrument.step.StepMeterRegistry;
2525
import io.micrometer.core.instrument.util.NamedThreadFactory;
2626
import io.micrometer.core.instrument.util.StringUtils;
27-
import io.micrometer.core.lang.Nullable;
2827
import io.micrometer.core.util.internal.logging.WarnThenDebugLogger;
2928
import software.amazon.cloudwatchlogs.emf.environment.Environment;
3029
import software.amazon.cloudwatchlogs.emf.environment.EnvironmentProvider;
3130
import software.amazon.cloudwatchlogs.emf.logger.MetricsLogger;
3231
import software.amazon.cloudwatchlogs.emf.model.DimensionSet;
3332
import software.amazon.cloudwatchlogs.emf.model.Unit;
3433

34+
import javax.annotation.Nullable;
3535
import java.time.Instant;
3636
import java.util.Collections;
3737
import java.util.HashMap;

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
import org.apache.kafka.common.errors.RebalanceInProgressException;
2828
import org.apache.kafka.common.utils.ExponentialBackoff;
2929
import org.apache.kafka.common.errors.RecordDeserializationException;
30-
import org.apache.kafka.common.header.Header;
31-
import org.apache.kafka.common.header.Headers;
3230
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
3331
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
3432
import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -504,12 +502,8 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
504502
if (kafkaKeyMode == KafkaKeyMode.INCLUDE_AS_METADATA) {
505503
eventMetadata.setAttribute("kafka_key", key);
506504
}
507-
Headers headers = consumerRecord.headers();
508-
if (headers != null) {
509-
Map<String, byte[]> headerData = new HashMap<>();
510-
for (Header header: headers) {
511-
headerData.put(header.key(), header.value());
512-
}
505+
Map<String, Object> headerData = KafkaHeadersExtractor.extractMessageHeaders(consumerRecord.headers());
506+
if (headerData != null) {
513507
eventMetadata.setAttribute("kafka_headers", headerData);
514508
}
515509
final long receivedTimeStamp = getRecordTimeStamp(consumerRecord, now.toEpochMilli());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.kafka.consumer;
11+
12+
import org.apache.kafka.common.header.Header;
13+
import org.apache.kafka.common.header.Headers;
14+
15+
import java.nio.charset.StandardCharsets;
16+
import java.util.Arrays;
17+
import java.util.HashMap;
18+
import java.util.Map;
19+
20+
public class KafkaHeadersExtractor {
21+
public static Map<String, Object> extractMessageHeaders(Headers headers) {
22+
if (headers == null) {
23+
return null;
24+
}
25+
Map<String, Object> headerData = new HashMap<>();
26+
for (Header header : headers) {
27+
byte[] headerValue = header.value();
28+
if (headerValue == null) {
29+
headerData.put(header.key(), null);
30+
continue;
31+
}
32+
String strValue = new String(headerValue, StandardCharsets.UTF_8);
33+
if (Arrays.equals(headerValue, strValue.getBytes(StandardCharsets.UTF_8))
34+
&& isPrintableString(strValue)) {
35+
headerData.put(header.key(), strValue);
36+
} else {
37+
headerData.put(header.key(), headerValue);
38+
}
39+
}
40+
return headerData;
41+
}
42+
43+
private static boolean isPrintableString(String value) {
44+
for (int i = 0; i < value.length(); i++) {
45+
char c = value.charAt(i);
46+
if (Character.isISOControl(c) && c != '\t' && c != '\n' && c != '\r') {
47+
return false;
48+
}
49+
}
50+
return true;
51+
}
52+
}

0 commit comments

Comments
 (0)