Skip to content

Commit ae56f0e

Browse files
committed
Passing http request headers as metadata in the event for http source
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
1 parent 2f8c836 commit ae56f0e

6 files changed

Lines changed: 333 additions & 14 deletions

File tree

data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.slf4j.LoggerFactory;
3131

3232
import java.util.Collections;
33+
import java.util.List;
3334
import java.util.concurrent.ExecutionException;
3435

3536
@DataPrepperPlugin(name = "http", pluginType = Source.class, pluginConfigurationType = HTTPSourceConfig.class)
@@ -50,6 +51,7 @@ public class HTTPSource implements Source<Record<Log>> {
5051
private static final String HTTP_HEALTH_CHECK_PATH = "/health";
5152
private ByteDecoder byteDecoder;
5253
private final InputCodec codec;
54+
private final List<String> metadataHeaders;
5355

5456
@DataPrepperPluginConstructor
5557
public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory,
@@ -59,6 +61,7 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi
5961
this.pipelineName = pipelineDescription.getPipelineName();
6062
this.byteDecoder = new JsonDecoder();
6163
this.certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
64+
this.metadataHeaders = sourceConfig.getMetadataHeaders();
6265
final PluginModel authenticationConfiguration = sourceConfig.getAuthentication();
6366
final PluginSetting authenticationPluginSetting;
6467

@@ -94,7 +97,7 @@ public void start(final Buffer<Record<Log>> buffer) {
9497
if (server == null) {
9598
ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(sourceConfig);
9699
CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, PLUGIN_NAME, pipelineName);
97-
final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics, codec);
100+
final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics, codec, metadataHeaders);
98101
server = createServer.createHTTPServer(buffer, certificateProviderFactory, authenticationProvider, httpRequestExceptionHandler, logHTTPService);
99102
pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections);
100103
}

data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import org.opensearch.dataprepper.http.BaseHttpServerConfig;
1010
import org.opensearch.dataprepper.model.configuration.PluginModel;
1111

12+
import java.util.List;
13+
1214
public class HTTPSourceConfig extends BaseHttpServerConfig {
1315

1416
static final String DEFAULT_LOG_INGEST_URI = "/log/ingest";
@@ -27,7 +29,14 @@ public String getDefaultPath() {
2729
@JsonProperty("codec")
2830
private PluginModel codec;
2931

32+
@JsonProperty("metadata_headers")
33+
private List<String> metadataHeaders;
34+
3035
/**
 * Returns the plugin model bound to the {@code codec} configuration property.
 *
 * @return the configured codec model; presumably {@code null} when no codec is configured — TODO confirm
 */
public PluginModel getCodec() {
3136
return codec;
3237
}
38+
39+
/**
 * Returns the header names listed under the {@code metadata_headers} configuration property.
 *
 * @return the configured header names exactly as written in the configuration,
 *         or {@code null} when the property was not set
 */
public List<String> getMetadataHeaders() {
40+
return metadataHeaders;
41+
}
3342
}

data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,15 @@
2727

2828
import java.io.IOException;
2929
import java.util.ArrayList;
30+
import java.util.Collections;
31+
import java.util.HashMap;
32+
import java.util.LinkedHashSet;
3033
import java.util.List;
34+
import java.util.Map;
35+
import java.util.Set;
3136
import java.util.UUID;
3237
import java.util.stream.Collectors;
3338

34-
35-
/*
36-
* A HTTP service for log ingestion to be executed by BlockingTaskExecutor.
37-
*/
3839
@Blocking
3940
public class LogHTTPService {
4041
private static final int SERIALIZATION_OVERHEAD = 1024;
@@ -45,6 +46,21 @@ public class LogHTTPService {
4546
public static final String PAYLOAD_SIZE = "payloadSize";
4647
public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration";
4748

49+
/**
 * Header names that are never copied into event metadata.
 *
 * Every entry must be written in lower case: {@code isSensitiveHeader} lower-cases the
 * candidate name before testing membership in this set, so a mixed-case entry would
 * never match.
 */
static final Set<String> SENSITIVE_HEADERS = Set.of(
50+
"authorization",
51+
"proxy-authorization",
52+
"cookie",
53+
"set-cookie",
54+
"www-authenticate",
55+
"proxy-authenticate",
56+
"x-api-key",
57+
"x-csrf-token",
58+
"x-xsrf-token",
59+
"x-auth-token",
60+
"x-amz-security-token",
61+
"x-amz-credential"
62+
);
63+
4864
private static final Logger LOG = LoggerFactory.getLogger(LogHTTPService.class);
4965

5066
// TODO: support other data-types as request body, e.g. json_lines, msgpack
@@ -60,16 +76,19 @@ public class LogHTTPService {
6076
private final Timer requestProcessDuration;
6177
private Integer bufferMaxRequestLength;
6278
private Integer bufferOptimalRequestLength;
79+
private final List<String> metadataHeaders;
6380

6481
public LogHTTPService(final int bufferWriteTimeoutInMillis,
6582
final Buffer<Record<Log>> buffer,
6683
final PluginMetrics pluginMetrics,
67-
final InputCodec codec) {
84+
final InputCodec codec,
85+
final List<String> metadataHeaders) {
6886
this.buffer = buffer;
6987
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
7088
this.bufferMaxRequestLength = buffer.getMaxRequestSize().isPresent() ? buffer.getMaxRequestSize().get(): null;
7189
this.bufferOptimalRequestLength = buffer.getOptimalRequestSize().isPresent() ? buffer.getOptimalRequestSize().get(): null;
7290
this.codec = codec;
91+
this.metadataHeaders = metadataHeaders;
7392
requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
7493
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
7594
requestsOverOptimalSizeCounter = pluginMetrics.counter(REQUESTS_OVER_OPTIMAL_SIZE);
@@ -78,6 +97,13 @@ public LogHTTPService(final int bufferWriteTimeoutInMillis,
7897
requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION);
7998
}
8099

100+
/**
 * Creates a service that does not copy any request headers into event metadata.
 *
 * <p>Delegates to the five-argument constructor with an empty header list rather than
 * {@code null}, so the field never has to be null-checked by new code; header extraction
 * treats an empty list as "nothing configured".
 *
 * @param bufferWriteTimeoutInMillis timeout, in milliseconds, used when writing to the buffer
 * @param buffer                     the buffer that receives the parsed log records
 * @param pluginMetrics              the metrics registry used to create counters and timers
 * @param codec                      the input codec handed to the delegated constructor
 */
public LogHTTPService(final int bufferWriteTimeoutInMillis,
                      final Buffer<Record<Log>> buffer,
                      final PluginMetrics pluginMetrics,
                      final InputCodec codec) {
    this(bufferWriteTimeoutInMillis, buffer, pluginMetrics, codec, Collections.emptyList());
}
106+
81107
@Post
82108
public HttpResponse doPost(final ServiceRequestContext serviceRequestContext, final AggregatedHttpRequest aggregatedHttpRequest) throws Exception {
83109
requestsReceivedCounter.increment();
@@ -92,6 +118,7 @@ public HttpResponse doPost(final ServiceRequestContext serviceRequestContext, fi
92118

93119
HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) throws Exception {
94120
final HttpData content = aggregatedHttpRequest.content();
121+
final Map<String, Object> extractedHeaders = extractHeaders(aggregatedHttpRequest);
95122

96123
if (buffer.isByteBuffer()) {
97124
if (bufferMaxRequestLength != null && bufferOptimalRequestLength != null && content.array().length > bufferOptimalRequestLength) {
@@ -124,6 +151,13 @@ HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) t
124151
LOG.error("Failed to parse the request of size {} using specified input codec {} due to: {}", content.length(), codec.getClass(), e.getMessage());
125152
throw new IOException("Bad request data format. ", e.getCause());
126153
}
154+
if (!extractedHeaders.isEmpty()) {
155+
for (Record<Log> record : records) {
156+
for (Map.Entry<String, Object> entry : extractedHeaders.entrySet()) {
157+
record.getData().getMetadata().setAttribute(entry.getKey(), entry.getValue());
158+
}
159+
}
160+
}
127161
} else {
128162

129163
try {
@@ -135,7 +169,7 @@ HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) t
135169

136170
records.addAll(
137171
jsonList.stream()
138-
.map(this::buildRecordLog)
172+
.map(json -> buildRecordLog(json, extractedHeaders))
139173
.collect(Collectors.toList())
140174
);
141175
}
@@ -152,6 +186,35 @@ HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) t
152186
return HttpResponse.of(HttpStatus.OK);
153187
}
154188

189+
private Map<String, Object> extractHeaders(final AggregatedHttpRequest aggregatedHttpRequest) {
190+
if (metadataHeaders == null || metadataHeaders.isEmpty()) {
191+
return Collections.emptyMap();
192+
}
193+
194+
final boolean includeAll = metadataHeaders.size() == 1 && "*".equals(metadataHeaders.get(0));
195+
final Set<String> headerNames = includeAll
196+
? aggregatedHttpRequest.headers().names().stream().map(Object::toString).collect(Collectors.toSet())
197+
: metadataHeaders.stream().map(String::toLowerCase).collect(Collectors.toCollection(LinkedHashSet::new));
198+
199+
final Map<String, Object> headers = new HashMap<>();
200+
for (String headerName : headerNames) {
201+
if (isSensitiveHeader(headerName)) {
202+
LOG.warn("Skipping sensitive header '{}' from metadata_headers configuration", headerName);
203+
continue;
204+
}
205+
List<String> values = aggregatedHttpRequest.headers().getAll(headerName);
206+
if (!values.isEmpty()) {
207+
headers.put(headerName, values.size() == 1 ? values.get(0) : new ArrayList<>(values));
208+
}
209+
}
210+
211+
return headers;
212+
}
213+
214+
static boolean isSensitiveHeader(final String headerName) {
215+
return SENSITIVE_HEADERS.contains(headerName.toLowerCase());
216+
}
217+
155218
private void writeChunkedBody(final String chunk) {
156219
final byte[] chunkBytes = chunk.getBytes();
157220

@@ -171,13 +234,13 @@ private void writeChunkedBody(final String chunk) {
171234
}
172235
}
173236

174-
private Record<Log> buildRecordLog(String json) {
175-
176-
final JacksonLog log = JacksonLog.builder()
237+
private Record<Log> buildRecordLog(final String json, final Map<String, Object> headerAttributes) {
238+
final JacksonLog.Builder builder = JacksonLog.builder()
177239
.withData(json)
178-
.getThis()
179-
.build();
180-
181-
return new Record<>(log);
240+
.getThis();
241+
if (!headerAttributes.isEmpty()) {
242+
builder.withEventMetadataAttributes(headerAttributes);
243+
}
244+
return new Record<>(builder.build());
182245
}
183246
}

data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,18 @@
55

66
package org.opensearch.dataprepper.plugins.source.loghttp;
77

8+
import com.fasterxml.jackson.databind.ObjectMapper;
89
import org.junit.jupiter.api.Test;
910

11+
import java.util.List;
12+
1013
import static org.junit.jupiter.api.Assertions.assertEquals;
14+
import static org.junit.jupiter.api.Assertions.assertNull;
1115

1216
public class HTTPSourceConfigTest {
1317

18+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
19+
1420
@Test
1521
void testDefault() {
1622
// Prepare
@@ -21,5 +27,14 @@ void testDefault() {
2127
assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getPath());
2228
assertEquals(HTTPSourceConfig.DEFAULT_PORT, sourceConfig.getDefaultPort());
2329
assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getDefaultPath());
30+
assertNull(sourceConfig.getMetadataHeaders());
31+
}
32+
33+
@Test
34+
void testSetMetadataHeaders() throws Exception {
35+
final String json = "{\"metadata_headers\": [\"X-Tenant-Id\", \"X-Region\"]}";
36+
final HTTPSourceConfig sourceConfig = OBJECT_MAPPER.readValue(json, HTTPSourceConfig.class);
37+
38+
assertEquals(List.of("X-Tenant-Id", "X-Region"), sourceConfig.getMetadataHeaders());
2439
}
2540
}

data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import java.util.Map;
7676
import java.util.Random;
7777
import java.util.StringJoiner;
78+
import java.util.UUID;
7879
import java.util.concurrent.CompletableFuture;
7980
import java.util.concurrent.CompletionException;
8081
import java.util.concurrent.ExecutionException;
@@ -92,6 +93,7 @@
9293
import static org.junit.jupiter.api.Assertions.assertEquals;
9394
import static org.junit.jupiter.api.Assertions.assertFalse;
9495
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
96+
import static org.junit.jupiter.api.Assertions.assertNull;
9597
import static org.junit.jupiter.api.Assertions.assertThrows;
9698
import static org.junit.jupiter.api.Assertions.assertTrue;
9799
import static org.mockito.ArgumentMatchers.any;
@@ -1020,6 +1022,58 @@ public void testHTTPJsonCodec() throws IOException {
10201022
assertEquals(testPayloadSize, payloadSizeMax.getValue());
10211023
}
10221024

1025+
@Test
1026+
public void testHTTPJsonResponse200WithMetadataHeaders() throws JsonProcessingException {
1027+
final String tenantId = UUID.randomUUID().toString();
1028+
final String testData = "[{\"log\": \"somelog\"}]";
1029+
1030+
when(sourceConfig.getMetadataHeaders()).thenReturn(List.of("X-Tenant-Id"));
1031+
HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription);
1032+
testBuffer = getBuffer(1, 1);
1033+
HTTPSourceUnderTest.start(testBuffer);
1034+
1035+
WebClient.of().execute(RequestHeaders.builder()
1036+
.scheme(SessionProtocol.HTTP)
1037+
.authority("127.0.0.1:2021")
1038+
.method(HttpMethod.POST)
1039+
.path("/log/ingest")
1040+
.contentType(MediaType.JSON_UTF_8)
1041+
.add("X-Tenant-Id", tenantId)
1042+
.build(),
1043+
HttpData.ofUtf8(testData))
1044+
.aggregate()
1045+
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join();
1046+
1047+
final Map.Entry<Collection<Record<Log>>, CheckpointState> result = testBuffer.read(100);
1048+
List<Record<Log>> records = new ArrayList<>(result.getKey());
1049+
assertEquals(1, records.size());
1050+
assertEquals(tenantId, records.get(0).getData().getMetadata().getAttribute("x-tenant-id"));
1051+
}
1052+
1053+
@Test
1054+
public void testHTTPJsonResponse200WithNoMetadataHeaders() {
1055+
final String testData = "[{\"log\": \"somelog\"}]";
1056+
1057+
HTTPSourceUnderTest.start(testBuffer);
1058+
1059+
WebClient.of().execute(RequestHeaders.builder()
1060+
.scheme(SessionProtocol.HTTP)
1061+
.authority("127.0.0.1:2021")
1062+
.method(HttpMethod.POST)
1063+
.path("/log/ingest")
1064+
.contentType(MediaType.JSON_UTF_8)
1065+
.add("X-Tenant-Id", UUID.randomUUID().toString())
1066+
.build(),
1067+
HttpData.ofUtf8(testData))
1068+
.aggregate()
1069+
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join();
1070+
1071+
final Map.Entry<Collection<Record<Log>>, CheckpointState> result = testBuffer.read(100);
1072+
List<Record<Log>> records = new ArrayList<>(result.getKey());
1073+
assertEquals(1, records.size());
1074+
assertNull(records.get(0).getData().getMetadata().getAttribute("x-tenant-id"));
1075+
}
1076+
10231077
private void assertCommonFields(Record<Log> record) {
10241078
assertEquals("111111111111", record.getData().get("owner", String.class));
10251079
assertEquals("CloudTrail/logs", record.getData().get("logGroup", String.class));

0 commit comments

Comments
 (0)