Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;

@DataPrepperPlugin(name = "http", pluginType = Source.class, pluginConfigurationType = HTTPSourceConfig.class)
Expand All @@ -50,6 +51,8 @@ public class HTTPSource implements Source<Record<Log>> {
private static final String HTTP_HEALTH_CHECK_PATH = "/health";
private ByteDecoder byteDecoder;
private final InputCodec codec;
private final List<String> metadataHeaders;
private final HttpHeaderExtractor httpHeaderExtractor;

@DataPrepperPluginConstructor
public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory,
Expand All @@ -59,6 +62,7 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi
this.pipelineName = pipelineDescription.getPipelineName();
this.byteDecoder = new JsonDecoder();
this.certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
this.metadataHeaders = sourceConfig.getMetadataHeaders();
final PluginModel authenticationConfiguration = sourceConfig.getAuthentication();
final PluginSetting authenticationPluginSetting;

Expand All @@ -84,6 +88,7 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
}
httpHeaderExtractor = new HttpHeaderExtractor(metadataHeaders);
}

@Override
Expand All @@ -94,7 +99,7 @@ public void start(final Buffer<Record<Log>> buffer) {
if (server == null) {
ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(sourceConfig);
CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, PLUGIN_NAME, pipelineName);
final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics, codec);
final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics, codec, httpHeaderExtractor);
server = createServer.createHTTPServer(buffer, certificateProviderFactory, authenticationProvider, httpRequestExceptionHandler, logHTTPService);
pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import org.opensearch.dataprepper.http.BaseHttpServerConfig;
import org.opensearch.dataprepper.model.configuration.PluginModel;

import java.util.Collections;
import java.util.List;

public class HTTPSourceConfig extends BaseHttpServerConfig {

static final String DEFAULT_LOG_INGEST_URI = "/log/ingest";
Expand All @@ -27,7 +30,15 @@ public String getDefaultPath() {
@JsonProperty("codec")
private PluginModel codec;

@JsonProperty("metadata_headers")
private List<String> metadataHeaders = Collections.emptyList();

public PluginModel getCodec() {
return codec;
}

public List<String> getMetadataHeaders() {
return metadataHeaders;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.loghttp;

import com.linecorp.armeria.common.AggregatedHttpRequest;

import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class HttpHeaderExtractor {

static final Set<String> SENSITIVE_HEADERS = Set.of(
"authorization",
"proxy-authorization",
"cookie",
"set-cookie",
"www-authenticate",
"proxy-authenticate",
"x-api-key",
"x-csrf-token",
"x-xsrf-token",
"x-auth-token",
"x-amz-security-token",
"x-amz-credential"
);

private final Collection<String> metadataHeaders;

public HttpHeaderExtractor(@Nonnull final Collection<String> metadataHeaders) {
this.metadataHeaders = metadataHeaders;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Require this to be non-null.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

public Map<String, Object> extractHeaders(final AggregatedHttpRequest aggregatedHttpRequest) {
if (metadataHeaders.isEmpty()) {
return Collections.emptyMap();
}

final Set<String> headerNames = metadataHeaders.stream()
.map(String::toLowerCase)
.collect(Collectors.toCollection(LinkedHashSet::new));

final Map<String, Object> headers = new HashMap<>();
for (String headerName : headerNames) {
if (isSensitiveHeader(headerName)) {
continue;
}
List<String> values = aggregatedHttpRequest.headers().getAll(headerName);
if (!values.isEmpty()) {
headers.put(headerName, values.size() == 1 ? values.get(0) : Collections.unmodifiableList(values));
}
}

return headers;
}

static boolean isSensitiveHeader(final String headerName) {
return SENSITIVE_HEADERS.contains(headerName.toLowerCase());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;


/*

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to remove this?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added it back.

* A HTTP service for log ingestion to be executed by BlockingTaskExecutor.
*/

@Blocking
public class LogHTTPService {
private static final int SERIALIZATION_OVERHEAD = 1024;
Expand All @@ -60,16 +62,19 @@ public class LogHTTPService {
private final Timer requestProcessDuration;
private Integer bufferMaxRequestLength;
private Integer bufferOptimalRequestLength;
private final HttpHeaderExtractor httpHeaderExtractor;

public LogHTTPService(final int bufferWriteTimeoutInMillis,
final Buffer<Record<Log>> buffer,
final PluginMetrics pluginMetrics,
final InputCodec codec) {
final InputCodec codec,
final HttpHeaderExtractor httpHeaderExtractor) {
this.buffer = buffer;
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
this.bufferMaxRequestLength = buffer.getMaxRequestSize().isPresent() ? buffer.getMaxRequestSize().get(): null;
this.bufferOptimalRequestLength = buffer.getOptimalRequestSize().isPresent() ? buffer.getOptimalRequestSize().get(): null;
this.codec = codec;
this.httpHeaderExtractor = httpHeaderExtractor;
requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
requestsOverOptimalSizeCounter = pluginMetrics.counter(REQUESTS_OVER_OPTIMAL_SIZE);
Expand All @@ -78,6 +83,13 @@ public LogHTTPService(final int bufferWriteTimeoutInMillis,
requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION);
}

public LogHTTPService(final int bufferWriteTimeoutInMillis,
final Buffer<Record<Log>> buffer,
final PluginMetrics pluginMetrics,
final InputCodec codec) {
this(bufferWriteTimeoutInMillis, buffer, pluginMetrics, codec, new HttpHeaderExtractor(Collections.emptySet()));
}

@Post
public HttpResponse doPost(final ServiceRequestContext serviceRequestContext, final AggregatedHttpRequest aggregatedHttpRequest) throws Exception {
requestsReceivedCounter.increment();
Expand All @@ -92,6 +104,7 @@ public HttpResponse doPost(final ServiceRequestContext serviceRequestContext, fi

HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) throws Exception {
final HttpData content = aggregatedHttpRequest.content();
final Map<String, Object> extractedHeaders = Collections.unmodifiableMap(httpHeaderExtractor.extractHeaders(aggregatedHttpRequest));

if (buffer.isByteBuffer()) {
if (bufferMaxRequestLength != null && bufferOptimalRequestLength != null && content.array().length > bufferOptimalRequestLength) {
Expand Down Expand Up @@ -140,6 +153,12 @@ HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) t
);
}

if (!extractedHeaders.isEmpty()) {
for (Record<Log> record : records) {
record.getData().getMetadata().setAttribute("headers", extractedHeaders);
}
}

try {
buffer.writeAll(records, bufferWriteTimeoutInMillis);
} catch (Exception e) {
Expand Down Expand Up @@ -171,13 +190,10 @@ private void writeChunkedBody(final String chunk) {
}
}

private Record<Log> buildRecordLog(String json) {

final JacksonLog log = JacksonLog.builder()
private Record<Log> buildRecordLog(final String json) {
final JacksonLog.Builder builder = JacksonLog.builder()
.withData(json)
.getThis()
.build();

return new Record<>(log);
.getThis();
return new Record<>(builder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@

package org.opensearch.dataprepper.plugins.source.loghttp;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class HTTPSourceConfigTest {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Test
void testDefault() {
// Prepare
Expand All @@ -21,5 +27,14 @@ void testDefault() {
assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getPath());
assertEquals(HTTPSourceConfig.DEFAULT_PORT, sourceConfig.getDefaultPort());
assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getDefaultPath());
assertEquals(sourceConfig.getMetadataHeaders(), Collections.emptyList());
}

@Test
void testSetMetadataHeaders() throws Exception {
final String json = "{\"metadata_headers\": [\"X-Tenant-Id\", \"X-Region\"]}";
final HTTPSourceConfig sourceConfig = OBJECT_MAPPER.readValue(json, HTTPSourceConfig.class);

assertEquals(List.of("X-Tenant-Id", "X-Region"), sourceConfig.getMetadataHeaders());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.util.Map;
import java.util.Random;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
Expand All @@ -92,6 +93,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -1020,6 +1022,58 @@ public void testHTTPJsonCodec() throws IOException {
assertEquals(testPayloadSize, payloadSizeMax.getValue());
}

@Test
public void testHTTPJsonResponse200WithMetadataHeaders() throws JsonProcessingException {
final String tenantId = UUID.randomUUID().toString();
final String testData = "[{\"log\": \"somelog\"}]";

when(sourceConfig.getMetadataHeaders()).thenReturn(List.of("X-Tenant-Id"));
HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription);
testBuffer = getBuffer(1, 1);
HTTPSourceUnderTest.start(testBuffer);

WebClient.of().execute(RequestHeaders.builder()
.scheme(SessionProtocol.HTTP)
.authority("127.0.0.1:2021")
.method(HttpMethod.POST)
.path("/log/ingest")
.contentType(MediaType.JSON_UTF_8)
.add("X-Tenant-Id", tenantId)
.build(),
HttpData.ofUtf8(testData))
.aggregate()
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join();

final Map.Entry<Collection<Record<Log>>, CheckpointState> result = testBuffer.read(100);
List<Record<Log>> records = new ArrayList<>(result.getKey());
assertEquals(1, records.size());
assertEquals(tenantId, records.get(0).getData().getMetadata().getAttribute("headers/x-tenant-id"));
}

@Test
public void testHTTPJsonResponse200WithNoMetadataHeaders() {
final String testData = "[{\"log\": \"somelog\"}]";

HTTPSourceUnderTest.start(testBuffer);

WebClient.of().execute(RequestHeaders.builder()
.scheme(SessionProtocol.HTTP)
.authority("127.0.0.1:2021")
.method(HttpMethod.POST)
.path("/log/ingest")
.contentType(MediaType.JSON_UTF_8)
.add("X-Tenant-Id", UUID.randomUUID().toString())
.build(),
HttpData.ofUtf8(testData))
.aggregate()
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join();

final Map.Entry<Collection<Record<Log>>, CheckpointState> result = testBuffer.read(100);
List<Record<Log>> records = new ArrayList<>(result.getKey());
assertEquals(1, records.size());
assertNull(records.get(0).getData().getMetadata().getAttribute("headers/x-tenant-id"));
}

private void assertCommonFields(Record<Log> record) {
assertEquals("111111111111", record.getData().get("owner", String.class));
assertEquals("CloudTrail/logs", record.getData().get("logGroup", String.class));
Expand Down
Loading
Loading