Skip to content

Commit b129198

Browse files
committed
Passing http request headers as metadata in the event for http source
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
1 parent 2f8c836 commit b129198

8 files changed

Lines changed: 449 additions & 14 deletions

File tree

data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.slf4j.LoggerFactory;
3131

3232
import java.util.Collections;
33+
import java.util.List;
3334
import java.util.concurrent.ExecutionException;
3435

3536
@DataPrepperPlugin(name = "http", pluginType = Source.class, pluginConfigurationType = HTTPSourceConfig.class)
@@ -50,6 +51,8 @@ public class HTTPSource implements Source<Record<Log>> {
5051
private static final String HTTP_HEALTH_CHECK_PATH = "/health";
5152
private ByteDecoder byteDecoder;
5253
private final InputCodec codec;
54+
private final List<String> metadataHeaders;
55+
private final HttpHeaderExtractor httpHeaderExtractor;
5356

5457
@DataPrepperPluginConstructor
5558
public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory,
@@ -59,6 +62,7 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi
5962
this.pipelineName = pipelineDescription.getPipelineName();
6063
this.byteDecoder = new JsonDecoder();
6164
this.certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
65+
this.metadataHeaders = sourceConfig.getMetadataHeaders();
6266
final PluginModel authenticationConfiguration = sourceConfig.getAuthentication();
6367
final PluginSetting authenticationPluginSetting;
6468

@@ -84,6 +88,7 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi
8488
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
8589
codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
8690
}
91+
httpHeaderExtractor = new HttpHeaderExtractor(metadataHeaders);
8792
}
8893

8994
@Override
@@ -94,7 +99,7 @@ public void start(final Buffer<Record<Log>> buffer) {
9499
if (server == null) {
95100
ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(sourceConfig);
96101
CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, PLUGIN_NAME, pipelineName);
97-
final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics, codec);
102+
final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics, codec, httpHeaderExtractor);
98103
server = createServer.createHTTPServer(buffer, certificateProviderFactory, authenticationProvider, httpRequestExceptionHandler, logHTTPService);
99104
pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections);
100105
}

data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
import org.opensearch.dataprepper.http.BaseHttpServerConfig;
1010
import org.opensearch.dataprepper.model.configuration.PluginModel;
1111

12+
import java.util.Collections;
13+
import java.util.List;
14+
1215
public class HTTPSourceConfig extends BaseHttpServerConfig {
1316

1417
static final String DEFAULT_LOG_INGEST_URI = "/log/ingest";
@@ -27,7 +30,15 @@ public String getDefaultPath() {
2730
@JsonProperty("codec")
2831
private PluginModel codec;
2932

33+
@JsonProperty("metadata_headers")
34+
private List<String> metadataHeaders = Collections.emptyList();
35+
3036
public PluginModel getCodec() {
3137
return codec;
3238
}
39+
40+
public List<String> getMetadataHeaders() {
41+
return metadataHeaders;
42+
}
43+
3344
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.loghttp;
12+
13+
import com.linecorp.armeria.common.AggregatedHttpRequest;
14+
15+
import java.util.ArrayList;
16+
import java.util.Collections;
17+
import java.util.HashMap;
18+
import java.util.LinkedHashSet;
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.Set;
22+
import java.util.stream.Collectors;
23+
24+
public class HttpHeaderExtractor {
25+
26+
static final Set<String> SENSITIVE_HEADERS = Set.of(
27+
"authorization",
28+
"proxy-authorization",
29+
"cookie",
30+
"set-cookie",
31+
"www-authenticate",
32+
"proxy-authenticate",
33+
"x-api-key",
34+
"x-csrf-token",
35+
"x-xsrf-token",
36+
"x-auth-token",
37+
"x-amz-security-token",
38+
"x-amz-credential"
39+
);
40+
41+
private final List<String> metadataHeaders;
42+
43+
public HttpHeaderExtractor(final List<String> metadataHeaders) {
44+
this.metadataHeaders = metadataHeaders;
45+
}
46+
47+
public Map<String, Object> extractHeaders(final AggregatedHttpRequest aggregatedHttpRequest) {
48+
if (metadataHeaders == null || metadataHeaders.isEmpty()) {
49+
return Collections.emptyMap();
50+
}
51+
52+
final Set<String> headerNames = metadataHeaders.stream()
53+
.map(String::toLowerCase)
54+
.collect(Collectors.toCollection(LinkedHashSet::new));
55+
56+
final Map<String, Object> headers = new HashMap<>();
57+
for (String headerName : headerNames) {
58+
if (isSensitiveHeader(headerName)) {
59+
continue;
60+
}
61+
List<String> values = aggregatedHttpRequest.headers().getAll(headerName);
62+
if (!values.isEmpty()) {
63+
headers.put(headerName, values.size() == 1 ? values.get(0) : new ArrayList<>(values));
64+
}
65+
}
66+
67+
return headers;
68+
}
69+
70+
static boolean isSensitiveHeader(final String headerName) {
71+
return SENSITIVE_HEADERS.contains(headerName.toLowerCase());
72+
}
73+
}

data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,11 @@
2828
import java.io.IOException;
2929
import java.util.ArrayList;
3030
import java.util.List;
31+
import java.util.Collections;
32+
import java.util.Map;
3133
import java.util.UUID;
3234
import java.util.stream.Collectors;
3335

34-
35-
/*
36-
* A HTTP service for log ingestion to be executed by BlockingTaskExecutor.
37-
*/
3836
@Blocking
3937
public class LogHTTPService {
4038
private static final int SERIALIZATION_OVERHEAD = 1024;
@@ -60,16 +58,19 @@ public class LogHTTPService {
6058
private final Timer requestProcessDuration;
6159
private Integer bufferMaxRequestLength;
6260
private Integer bufferOptimalRequestLength;
61+
private final HttpHeaderExtractor httpHeaderExtractor;
6362

6463
public LogHTTPService(final int bufferWriteTimeoutInMillis,
6564
final Buffer<Record<Log>> buffer,
6665
final PluginMetrics pluginMetrics,
67-
final InputCodec codec) {
66+
final InputCodec codec,
67+
final HttpHeaderExtractor httpHeaderExtractor) {
6868
this.buffer = buffer;
6969
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
7070
this.bufferMaxRequestLength = buffer.getMaxRequestSize().isPresent() ? buffer.getMaxRequestSize().get(): null;
7171
this.bufferOptimalRequestLength = buffer.getOptimalRequestSize().isPresent() ? buffer.getOptimalRequestSize().get(): null;
7272
this.codec = codec;
73+
this.httpHeaderExtractor = httpHeaderExtractor;
7374
requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
7475
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
7576
requestsOverOptimalSizeCounter = pluginMetrics.counter(REQUESTS_OVER_OPTIMAL_SIZE);
@@ -78,6 +79,13 @@ public LogHTTPService(final int bufferWriteTimeoutInMillis,
7879
requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION);
7980
}
8081

82+
/**
 * Creates a service that does not attach any request headers to event metadata.
 * Delegates to the five-argument constructor with a header extractor that has no header names configured.
 */
public LogHTTPService(final int bufferWriteTimeoutInMillis,
                      final Buffer<Record<Log>> buffer,
                      final PluginMetrics pluginMetrics,
                      final InputCodec codec) {
    // An explicit empty list (instead of null) keeps the extractor a no-op without relying on its null check.
    this(bufferWriteTimeoutInMillis, buffer, pluginMetrics, codec, new HttpHeaderExtractor(Collections.emptyList()));
}
88+
8189
@Post
8290
public HttpResponse doPost(final ServiceRequestContext serviceRequestContext, final AggregatedHttpRequest aggregatedHttpRequest) throws Exception {
8391
requestsReceivedCounter.increment();
@@ -92,6 +100,7 @@ public HttpResponse doPost(final ServiceRequestContext serviceRequestContext, fi
92100

93101
HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) throws Exception {
94102
final HttpData content = aggregatedHttpRequest.content();
103+
final Map<String, Object> extractedHeaders = Collections.unmodifiableMap(httpHeaderExtractor.extractHeaders(aggregatedHttpRequest));
95104

96105
if (buffer.isByteBuffer()) {
97106
if (bufferMaxRequestLength != null && bufferOptimalRequestLength != null && content.array().length > bufferOptimalRequestLength) {
@@ -124,6 +133,11 @@ HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) t
124133
LOG.error("Failed to parse the request of size {} using specified input codec {} due to: {}", content.length(), codec.getClass(), e.getMessage());
125134
throw new IOException("Bad request data format. ", e.getCause());
126135
}
136+
if (!extractedHeaders.isEmpty()) {
137+
for (Record<Log> record : records) {
138+
record.getData().getMetadata().setAttribute("headers", extractedHeaders);
139+
}
140+
}
127141
} else {
128142

129143
try {
@@ -135,7 +149,7 @@ HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) t
135149

136150
records.addAll(
137151
jsonList.stream()
138-
.map(this::buildRecordLog)
152+
.map(json -> buildRecordLog(json, extractedHeaders))
139153
.collect(Collectors.toList())
140154
);
141155
}
@@ -171,13 +185,13 @@ private void writeChunkedBody(final String chunk) {
171185
}
172186
}
173187

174-
private Record<Log> buildRecordLog(String json) {
175-
176-
final JacksonLog log = JacksonLog.builder()
188+
private Record<Log> buildRecordLog(final String json, final Map<String, Object> headerAttributes) {
189+
final JacksonLog.Builder builder = JacksonLog.builder()
177190
.withData(json)
178-
.getThis()
179-
.build();
180-
181-
return new Record<>(log);
191+
.getThis();
192+
if (!headerAttributes.isEmpty()) {
193+
builder.withEventMetadataAttributes(Map.of("headers", headerAttributes));
194+
}
195+
return new Record<>(builder.build());
182196
}
183197
}

data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,18 @@
55

66
package org.opensearch.dataprepper.plugins.source.loghttp;
77

8+
import com.fasterxml.jackson.databind.ObjectMapper;
89
import org.junit.jupiter.api.Test;
910

11+
import java.util.List;
12+
1013
import static org.junit.jupiter.api.Assertions.assertEquals;
14+
import static org.junit.jupiter.api.Assertions.assertNull;
1115

1216
public class HTTPSourceConfigTest {
1317

18+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
19+
1420
@Test
1521
void testDefault() {
1622
// Prepare
@@ -21,5 +27,14 @@ void testDefault() {
2127
assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getPath());
2228
assertEquals(HTTPSourceConfig.DEFAULT_PORT, sourceConfig.getDefaultPort());
2329
assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getDefaultPath());
30+
// metadata_headers defaults to an empty list (field initializer in HTTPSourceConfig), never null.
assertEquals(List.of(), sourceConfig.getMetadataHeaders());
31+
}
32+
33+
@Test
34+
void testSetMetadataHeaders() throws Exception {
35+
final String json = "{\"metadata_headers\": [\"X-Tenant-Id\", \"X-Region\"]}";
36+
final HTTPSourceConfig sourceConfig = OBJECT_MAPPER.readValue(json, HTTPSourceConfig.class);
37+
38+
assertEquals(List.of("X-Tenant-Id", "X-Region"), sourceConfig.getMetadataHeaders());
2439
}
2540
}

data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import java.util.Map;
7676
import java.util.Random;
7777
import java.util.StringJoiner;
78+
import java.util.UUID;
7879
import java.util.concurrent.CompletableFuture;
7980
import java.util.concurrent.CompletionException;
8081
import java.util.concurrent.ExecutionException;
@@ -92,6 +93,7 @@
9293
import static org.junit.jupiter.api.Assertions.assertEquals;
9394
import static org.junit.jupiter.api.Assertions.assertFalse;
9495
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
96+
import static org.junit.jupiter.api.Assertions.assertNull;
9597
import static org.junit.jupiter.api.Assertions.assertThrows;
9698
import static org.junit.jupiter.api.Assertions.assertTrue;
9799
import static org.mockito.ArgumentMatchers.any;
@@ -1020,6 +1022,58 @@ public void testHTTPJsonCodec() throws IOException {
10201022
assertEquals(testPayloadSize, payloadSizeMax.getValue());
10211023
}
10221024

1025+
@Test
1026+
public void testHTTPJsonResponse200WithMetadataHeaders() throws JsonProcessingException {
1027+
final String tenantId = UUID.randomUUID().toString();
1028+
final String testData = "[{\"log\": \"somelog\"}]";
1029+
1030+
when(sourceConfig.getMetadataHeaders()).thenReturn(List.of("X-Tenant-Id"));
1031+
HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription);
1032+
testBuffer = getBuffer(1, 1);
1033+
HTTPSourceUnderTest.start(testBuffer);
1034+
1035+
WebClient.of().execute(RequestHeaders.builder()
1036+
.scheme(SessionProtocol.HTTP)
1037+
.authority("127.0.0.1:2021")
1038+
.method(HttpMethod.POST)
1039+
.path("/log/ingest")
1040+
.contentType(MediaType.JSON_UTF_8)
1041+
.add("X-Tenant-Id", tenantId)
1042+
.build(),
1043+
HttpData.ofUtf8(testData))
1044+
.aggregate()
1045+
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join();
1046+
1047+
final Map.Entry<Collection<Record<Log>>, CheckpointState> result = testBuffer.read(100);
1048+
List<Record<Log>> records = new ArrayList<>(result.getKey());
1049+
assertEquals(1, records.size());
1050+
assertEquals(tenantId, records.get(0).getData().getMetadata().getAttribute("headers/x-tenant-id"));
1051+
}
1052+
1053+
@Test
1054+
public void testHTTPJsonResponse200WithNoMetadataHeaders() {
1055+
final String testData = "[{\"log\": \"somelog\"}]";
1056+
1057+
HTTPSourceUnderTest.start(testBuffer);
1058+
1059+
WebClient.of().execute(RequestHeaders.builder()
1060+
.scheme(SessionProtocol.HTTP)
1061+
.authority("127.0.0.1:2021")
1062+
.method(HttpMethod.POST)
1063+
.path("/log/ingest")
1064+
.contentType(MediaType.JSON_UTF_8)
1065+
.add("X-Tenant-Id", UUID.randomUUID().toString())
1066+
.build(),
1067+
HttpData.ofUtf8(testData))
1068+
.aggregate()
1069+
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join();
1070+
1071+
final Map.Entry<Collection<Record<Log>>, CheckpointState> result = testBuffer.read(100);
1072+
List<Record<Log>> records = new ArrayList<>(result.getKey());
1073+
assertEquals(1, records.size());
1074+
assertNull(records.get(0).getData().getMetadata().getAttribute("headers/x-tenant-id"));
1075+
}
1076+
10231077
private void assertCommonFields(Record<Log> record) {
10241078
assertEquals("111111111111", record.getData().get("owner", String.class));
10251079
assertEquals("CloudTrail/logs", record.getData().get("logGroup", String.class));

0 commit comments

Comments
 (0)