Skip to content

Commit 1ac64aa

Browse files
authored
Passing http request headers as metadata in the event for http source (#6671)
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
1 parent d950ece commit 1ac64aa

8 files changed

Lines changed: 440 additions & 10 deletions

File tree

data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.slf4j.LoggerFactory;
3131

3232
import java.util.Collections;
33+
import java.util.List;
3334
import java.util.concurrent.ExecutionException;
3435

3536
@DataPrepperPlugin(name = "http", pluginType = Source.class, pluginConfigurationType = HTTPSourceConfig.class)
@@ -50,6 +51,8 @@ public class HTTPSource implements Source<Record<Log>> {
5051
private static final String HTTP_HEALTH_CHECK_PATH = "/health";
5152
private ByteDecoder byteDecoder;
5253
private final InputCodec codec;
54+
private final List<String> metadataHeaders;
55+
private final HttpHeaderExtractor httpHeaderExtractor;
5356

5457
@DataPrepperPluginConstructor
5558
public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory,
@@ -59,6 +62,7 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi
5962
this.pipelineName = pipelineDescription.getPipelineName();
6063
this.byteDecoder = new JsonDecoder();
6164
this.certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
65+
this.metadataHeaders = sourceConfig.getMetadataHeaders();
6266
final PluginModel authenticationConfiguration = sourceConfig.getAuthentication();
6367
final PluginSetting authenticationPluginSetting;
6468

@@ -84,6 +88,7 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi
8488
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
8589
codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
8690
}
91+
httpHeaderExtractor = new HttpHeaderExtractor(metadataHeaders);
8792
}
8893

8994
@Override
@@ -94,7 +99,7 @@ public void start(final Buffer<Record<Log>> buffer) {
9499
if (server == null) {
95100
ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(sourceConfig);
96101
CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, PLUGIN_NAME, pipelineName);
97-
final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics, codec);
102+
final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics, codec, httpHeaderExtractor);
98103
server = createServer.createHTTPServer(buffer, certificateProviderFactory, authenticationProvider, httpRequestExceptionHandler, logHTTPService);
99104
pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections);
100105
}

data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
import org.opensearch.dataprepper.http.BaseHttpServerConfig;
1010
import org.opensearch.dataprepper.model.configuration.PluginModel;
1111

12+
import java.util.Collections;
13+
import java.util.List;
14+
1215
public class HTTPSourceConfig extends BaseHttpServerConfig {
1316

1417
static final String DEFAULT_LOG_INGEST_URI = "/log/ingest";
@@ -27,7 +30,15 @@ public String getDefaultPath() {
2730
@JsonProperty("codec")
2831
private PluginModel codec;
2932

33+
@JsonProperty("metadata_headers")
34+
private List<String> metadataHeaders = Collections.emptyList();
35+
3036
public PluginModel getCodec() {
3137
return codec;
3238
}
39+
40+
public List<String> getMetadataHeaders() {
41+
return metadataHeaders;
42+
}
43+
3344
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.loghttp;
12+
13+
import com.linecorp.armeria.common.AggregatedHttpRequest;
14+
15+
import javax.annotation.Nonnull;
16+
import java.util.Collection;
17+
import java.util.Collections;
18+
import java.util.HashMap;
19+
import java.util.LinkedHashSet;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.Set;
23+
import java.util.stream.Collectors;
24+
25+
public class HttpHeaderExtractor {
26+
27+
static final Set<String> SENSITIVE_HEADERS = Set.of(
28+
"authorization",
29+
"proxy-authorization",
30+
"cookie",
31+
"set-cookie",
32+
"www-authenticate",
33+
"proxy-authenticate",
34+
"x-api-key",
35+
"x-csrf-token",
36+
"x-xsrf-token",
37+
"x-auth-token",
38+
"x-amz-security-token",
39+
"x-amz-credential"
40+
);
41+
42+
private final Collection<String> metadataHeaders;
43+
44+
public HttpHeaderExtractor(@Nonnull final Collection<String> metadataHeaders) {
45+
this.metadataHeaders = metadataHeaders;
46+
}
47+
48+
public Map<String, Object> extractHeaders(final AggregatedHttpRequest aggregatedHttpRequest) {
49+
if (metadataHeaders.isEmpty()) {
50+
return Collections.emptyMap();
51+
}
52+
53+
final Set<String> headerNames = metadataHeaders.stream()
54+
.map(String::toLowerCase)
55+
.collect(Collectors.toCollection(LinkedHashSet::new));
56+
57+
final Map<String, Object> headers = new HashMap<>();
58+
for (String headerName : headerNames) {
59+
if (isSensitiveHeader(headerName)) {
60+
continue;
61+
}
62+
List<String> values = aggregatedHttpRequest.headers().getAll(headerName);
63+
if (!values.isEmpty()) {
64+
headers.put(headerName, values.size() == 1 ? values.get(0) : Collections.unmodifiableList(values));
65+
}
66+
}
67+
68+
return headers;
69+
}
70+
71+
static boolean isSensitiveHeader(final String headerName) {
72+
return SENSITIVE_HEADERS.contains(headerName.toLowerCase());
73+
}
74+
}

data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@
2828
import java.io.IOException;
2929
import java.util.ArrayList;
3030
import java.util.List;
31+
import java.util.Collections;
32+
import java.util.Map;
3133
import java.util.UUID;
3234
import java.util.stream.Collectors;
3335

34-
3536
/*
3637
* A HTTP service for log ingestion to be executed by BlockingTaskExecutor.
3738
*/
39+
3840
@Blocking
3941
public class LogHTTPService {
4042
private static final int SERIALIZATION_OVERHEAD = 1024;
@@ -60,16 +62,19 @@ public class LogHTTPService {
6062
private final Timer requestProcessDuration;
6163
private Integer bufferMaxRequestLength;
6264
private Integer bufferOptimalRequestLength;
65+
private final HttpHeaderExtractor httpHeaderExtractor;
6366

6467
public LogHTTPService(final int bufferWriteTimeoutInMillis,
6568
final Buffer<Record<Log>> buffer,
6669
final PluginMetrics pluginMetrics,
67-
final InputCodec codec) {
70+
final InputCodec codec,
71+
final HttpHeaderExtractor httpHeaderExtractor) {
6872
this.buffer = buffer;
6973
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
7074
this.bufferMaxRequestLength = buffer.getMaxRequestSize().isPresent() ? buffer.getMaxRequestSize().get(): null;
7175
this.bufferOptimalRequestLength = buffer.getOptimalRequestSize().isPresent() ? buffer.getOptimalRequestSize().get(): null;
7276
this.codec = codec;
77+
this.httpHeaderExtractor = httpHeaderExtractor;
7378
requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
7479
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
7580
requestsOverOptimalSizeCounter = pluginMetrics.counter(REQUESTS_OVER_OPTIMAL_SIZE);
@@ -78,6 +83,13 @@ public LogHTTPService(final int bufferWriteTimeoutInMillis,
7883
requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION);
7984
}
8085

86+
public LogHTTPService(final int bufferWriteTimeoutInMillis,
87+
final Buffer<Record<Log>> buffer,
88+
final PluginMetrics pluginMetrics,
89+
final InputCodec codec) {
90+
this(bufferWriteTimeoutInMillis, buffer, pluginMetrics, codec, new HttpHeaderExtractor(Collections.emptySet()));
91+
}
92+
8193
@Post
8294
public HttpResponse doPost(final ServiceRequestContext serviceRequestContext, final AggregatedHttpRequest aggregatedHttpRequest) throws Exception {
8395
requestsReceivedCounter.increment();
@@ -92,6 +104,7 @@ public HttpResponse doPost(final ServiceRequestContext serviceRequestContext, fi
92104

93105
HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) throws Exception {
94106
final HttpData content = aggregatedHttpRequest.content();
107+
final Map<String, Object> extractedHeaders = Collections.unmodifiableMap(httpHeaderExtractor.extractHeaders(aggregatedHttpRequest));
95108

96109
if (buffer.isByteBuffer()) {
97110
if (bufferMaxRequestLength != null && bufferOptimalRequestLength != null && content.array().length > bufferOptimalRequestLength) {
@@ -140,6 +153,12 @@ HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) t
140153
);
141154
}
142155

156+
if (!extractedHeaders.isEmpty()) {
157+
for (Record<Log> record : records) {
158+
record.getData().getMetadata().setAttribute("headers", extractedHeaders);
159+
}
160+
}
161+
143162
try {
144163
buffer.writeAll(records, bufferWriteTimeoutInMillis);
145164
} catch (Exception e) {
@@ -171,13 +190,10 @@ private void writeChunkedBody(final String chunk) {
171190
}
172191
}
173192

174-
private Record<Log> buildRecordLog(String json) {
175-
176-
final JacksonLog log = JacksonLog.builder()
193+
private Record<Log> buildRecordLog(final String json) {
194+
final JacksonLog.Builder builder = JacksonLog.builder()
177195
.withData(json)
178-
.getThis()
179-
.build();
180-
181-
return new Record<>(log);
196+
.getThis();
197+
return new Record<>(builder.build());
182198
}
183199
}

data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,18 @@
55

66
package org.opensearch.dataprepper.plugins.source.loghttp;
77

8+
import com.fasterxml.jackson.databind.ObjectMapper;
89
import org.junit.jupiter.api.Test;
910

11+
import java.util.Collections;
12+
import java.util.List;
13+
1014
import static org.junit.jupiter.api.Assertions.assertEquals;
1115

1216
public class HTTPSourceConfigTest {
1317

18+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
19+
1420
@Test
1521
void testDefault() {
1622
// Prepare
@@ -21,5 +27,14 @@ void testDefault() {
2127
assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getPath());
2228
assertEquals(HTTPSourceConfig.DEFAULT_PORT, sourceConfig.getDefaultPort());
2329
assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getDefaultPath());
30+
assertEquals(sourceConfig.getMetadataHeaders(), Collections.emptyList());
31+
}
32+
33+
@Test
34+
void testSetMetadataHeaders() throws Exception {
35+
final String json = "{\"metadata_headers\": [\"X-Tenant-Id\", \"X-Region\"]}";
36+
final HTTPSourceConfig sourceConfig = OBJECT_MAPPER.readValue(json, HTTPSourceConfig.class);
37+
38+
assertEquals(List.of("X-Tenant-Id", "X-Region"), sourceConfig.getMetadataHeaders());
2439
}
2540
}

data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import java.util.Map;
7676
import java.util.Random;
7777
import java.util.StringJoiner;
78+
import java.util.UUID;
7879
import java.util.concurrent.CompletableFuture;
7980
import java.util.concurrent.CompletionException;
8081
import java.util.concurrent.ExecutionException;
@@ -92,6 +93,7 @@
9293
import static org.junit.jupiter.api.Assertions.assertEquals;
9394
import static org.junit.jupiter.api.Assertions.assertFalse;
9495
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
96+
import static org.junit.jupiter.api.Assertions.assertNull;
9597
import static org.junit.jupiter.api.Assertions.assertThrows;
9698
import static org.junit.jupiter.api.Assertions.assertTrue;
9799
import static org.mockito.ArgumentMatchers.any;
@@ -1020,6 +1022,58 @@ public void testHTTPJsonCodec() throws IOException {
10201022
assertEquals(testPayloadSize, payloadSizeMax.getValue());
10211023
}
10221024

1025+
@Test
1026+
public void testHTTPJsonResponse200WithMetadataHeaders() throws JsonProcessingException {
1027+
final String tenantId = UUID.randomUUID().toString();
1028+
final String testData = "[{\"log\": \"somelog\"}]";
1029+
1030+
when(sourceConfig.getMetadataHeaders()).thenReturn(List.of("X-Tenant-Id"));
1031+
HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription);
1032+
testBuffer = getBuffer(1, 1);
1033+
HTTPSourceUnderTest.start(testBuffer);
1034+
1035+
WebClient.of().execute(RequestHeaders.builder()
1036+
.scheme(SessionProtocol.HTTP)
1037+
.authority("127.0.0.1:2021")
1038+
.method(HttpMethod.POST)
1039+
.path("/log/ingest")
1040+
.contentType(MediaType.JSON_UTF_8)
1041+
.add("X-Tenant-Id", tenantId)
1042+
.build(),
1043+
HttpData.ofUtf8(testData))
1044+
.aggregate()
1045+
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join();
1046+
1047+
final Map.Entry<Collection<Record<Log>>, CheckpointState> result = testBuffer.read(100);
1048+
List<Record<Log>> records = new ArrayList<>(result.getKey());
1049+
assertEquals(1, records.size());
1050+
assertEquals(tenantId, records.get(0).getData().getMetadata().getAttribute("headers/x-tenant-id"));
1051+
}
1052+
1053+
@Test
1054+
public void testHTTPJsonResponse200WithNoMetadataHeaders() {
1055+
final String testData = "[{\"log\": \"somelog\"}]";
1056+
1057+
HTTPSourceUnderTest.start(testBuffer);
1058+
1059+
WebClient.of().execute(RequestHeaders.builder()
1060+
.scheme(SessionProtocol.HTTP)
1061+
.authority("127.0.0.1:2021")
1062+
.method(HttpMethod.POST)
1063+
.path("/log/ingest")
1064+
.contentType(MediaType.JSON_UTF_8)
1065+
.add("X-Tenant-Id", UUID.randomUUID().toString())
1066+
.build(),
1067+
HttpData.ofUtf8(testData))
1068+
.aggregate()
1069+
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join();
1070+
1071+
final Map.Entry<Collection<Record<Log>>, CheckpointState> result = testBuffer.read(100);
1072+
List<Record<Log>> records = new ArrayList<>(result.getKey());
1073+
assertEquals(1, records.size());
1074+
assertNull(records.get(0).getData().getMetadata().getAttribute("headers/x-tenant-id"));
1075+
}
1076+
10231077
private void assertCommonFields(Record<Log> record) {
10241078
assertEquals("111111111111", record.getData().get("owner", String.class));
10251079
assertEquals("CloudTrail/logs", record.getData().get("logGroup", String.class));

0 commit comments

Comments
 (0)