Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data-prepper-plugins/http-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies {
testImplementation 'org.assertj:assertj-core:3.27.0'
testImplementation project(':data-prepper-api').sourceSets.test.output
testImplementation project(':data-prepper-test-common')
testImplementation project(':data-prepper-plugins:parse-json-processor')
}

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void setUp() throws IOException {
when(buffer.getOptimalRequestSize()).thenReturn(Optional.of(256 * 1024));

serviceRequestContext = mock(ServiceRequestContext.class);
logHTTPService = new LogHTTPService((int) Duration.ofSeconds(10).toMillis(), buffer, PluginMetrics.fromPrefix("testing"));
logHTTPService = new LogHTTPService((int) Duration.ofSeconds(10).toMillis(), buffer, PluginMetrics.fromPrefix("testing"), null);

requestHeaders = RequestHeaders.builder()
.method(HttpMethod.POST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.codec.JsonDecoder;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginModel;
Expand Down Expand Up @@ -48,6 +49,7 @@ public class HTTPSource implements Source<Record<Log>> {
private final PluginMetrics pluginMetrics;
private static final String HTTP_HEALTH_CHECK_PATH = "/health";
private ByteDecoder byteDecoder;
private final InputCodec codec;

@DataPrepperPluginConstructor
public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory,
Expand Down Expand Up @@ -75,6 +77,13 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi
authenticationPluginSetting.setPipelineName(pipelineName);
authenticationProvider = pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, authenticationPluginSetting);
httpRequestExceptionHandler = new HttpRequestExceptionHandler(pluginMetrics);
final PluginModel codecConfiguration = sourceConfig.getCodec();
if (codecConfiguration == null) {
codec = null;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we initialize the json codec here itself.

@oeyh oeyh May 20, 2025

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow here. Can you clarify a bit more?

Do you mean you want it to be a JsonInputCodec and disallow other type of input codecs? If so, we can add a validation on the sourceConfig to prevent the use of other codecs that don't fit here.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sync'ed up and resolved offline.

} else {
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
}
}

@Override
Expand All @@ -85,7 +94,7 @@ public void start(final Buffer<Record<Log>> buffer) {
if (server == null) {
ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(sourceConfig);
CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, PLUGIN_NAME, pipelineName);
final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics);
final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics, codec);
server = createServer.createHTTPServer(buffer, certificateProviderFactory, authenticationProvider, httpRequestExceptionHandler, logHTTPService);
pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package org.opensearch.dataprepper.plugins.source.loghttp;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.http.BaseHttpServerConfig;
import org.opensearch.dataprepper.model.configuration.PluginModel;

public class HTTPSourceConfig extends BaseHttpServerConfig {

Expand All @@ -21,4 +23,11 @@ public int getDefaultPort() {
public String getDefaultPath() {
return DEFAULT_LOG_INGEST_URI;
}

@JsonProperty("codec")
private PluginModel codec;

public PluginModel getCodec() {
return codec;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,28 @@

package org.opensearch.dataprepper.plugins.source.loghttp;

import com.linecorp.armeria.server.ServiceRequestContext;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.log.Log;
import org.opensearch.dataprepper.model.record.Record;
import com.linecorp.armeria.common.AggregatedHttpRequest;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.Blocking;
import com.linecorp.armeria.server.annotation.Post;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.http.codec.JsonCodec;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.log.Log;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
Expand All @@ -48,6 +50,7 @@ public class LogHTTPService {
// TODO: support other data-types as request body, e.g. json_lines, msgpack
private final JsonCodec jsonCodec = new JsonCodec();
private final Buffer<Record<Log>> buffer;
private final InputCodec codec;
private final int bufferWriteTimeoutInMillis;
private final Counter requestsReceivedCounter;
private final Counter successRequestsCounter;
Expand All @@ -60,11 +63,13 @@ public class LogHTTPService {

public LogHTTPService(final int bufferWriteTimeoutInMillis,
final Buffer<Record<Log>> buffer,
final PluginMetrics pluginMetrics) {
final PluginMetrics pluginMetrics,
final InputCodec codec) {
this.buffer = buffer;
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
this.bufferMaxRequestLength = buffer.getMaxRequestSize().isPresent() ? buffer.getMaxRequestSize().get(): null;
this.bufferOptimalRequestLength = buffer.getOptimalRequestSize().isPresent() ? buffer.getOptimalRequestSize().get(): null;
this.codec = codec;
requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
requestsOverOptimalSizeCounter = pluginMetrics.counter(REQUESTS_OVER_OPTIMAL_SIZE);
Expand Down Expand Up @@ -108,17 +113,32 @@ HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) t
}
} else {
final List<String> jsonList;
final List<Record<Log>> records = new ArrayList<>();

try {
jsonList = jsonCodec.parse(content);
} catch (IOException e) {
LOG.error("Failed to parse the request of size {} due to: {}", content.length(), e.getMessage());
throw new IOException("Bad request data format. Needs to be json array.", e.getCause());
}
if (codec != null) {
try {
codec.parse(content.toInputStream(), record -> {
records.add(new Record<>((Log) record.getData()));
});
} catch (IOException e) {
LOG.error("Failed to parse the request of size {} using specified input codec {} due to: {}", content.length(), codec.getClass(), e.getMessage());
throw new IOException("Bad request data format. ", e.getCause());
}
} else {

try {
jsonList = jsonCodec.parse(content);
} catch (IOException e) {
LOG.error("Failed to parse the request of size {} due to: {}", content.length(), e.getMessage());
throw new IOException("Bad request data format. Needs to be json array.", e.getCause());
}

final List<Record<Log>> records = jsonList.stream()
.map(this::buildRecordLog)
.collect(Collectors.toList());
records.addAll(
jsonList.stream()
.map(this::buildRecordLog)
.collect(Collectors.toList())
);
}

try {
buffer.writeAll(records, bufferWriteTimeoutInMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.codec.InputCodec;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
Expand All @@ -56,6 +57,8 @@
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
import org.opensearch.dataprepper.plugins.certificate.model.Certificate;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.codec.json.JsonInputCodec;
import org.opensearch.dataprepper.plugins.codec.json.JsonInputCodecConfig;

import java.io.ByteArrayOutputStream;
import java.io.File;
Expand Down Expand Up @@ -115,6 +118,7 @@ class HTTPSourceTest {
private final String TEST_PIPELINE_NAME = "test_pipeline";
private final String TEST_SSL_CERTIFICATE_FILE = getClass().getClassLoader().getResource("test_cert.crt").getFile();
private final String TEST_SSL_KEY_FILE = getClass().getClassLoader().getResource("test_decrypted_key.key").getFile();
private static final String CLOUDWATCH_LOGS_SAMPLE = "/cloudwatch-logs-sample.json";

@Mock
private ServerBuilder serverBuilder;
Expand Down Expand Up @@ -152,10 +156,10 @@ class HTTPSourceTest {
private PluginMetrics pluginMetrics;
private PluginFactory pluginFactory;

private BlockingBuffer<Record<Log>> getBuffer() throws JsonProcessingException {
private BlockingBuffer<Record<Log>> getBuffer(final int bufferSize, final int batchSize) throws JsonProcessingException {
final HashMap<String, Object> integerHashMap = new HashMap<>();
integerHashMap.put("buffer_size", 1);
integerHashMap.put("batch_size", 1);
integerHashMap.put("buffer_size", bufferSize);
integerHashMap.put("batch_size", batchSize);
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(integerHashMap);
BlockingBufferConfig blockingBufferConfig = objectMapper.readValue(json, BlockingBufferConfig.class);
Expand Down Expand Up @@ -233,7 +237,7 @@ public void setUp() throws JsonProcessingException {
when(pluginFactory.loadPlugin(eq(ArmeriaHttpAuthenticationProvider.class), any(PluginSetting.class)))
.thenReturn(authenticationProvider);

testBuffer = getBuffer();
testBuffer = getBuffer(1, 1);
when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);
HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription);
}
Expand Down Expand Up @@ -710,7 +714,7 @@ void testHTTPSJsonResponse() throws JsonProcessingException {
when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE);
HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription);

testBuffer = getBuffer();
testBuffer = getBuffer(1, 1);
HTTPSourceUnderTest.start(testBuffer);

WebClient.builder().factory(ClientFactory.insecure()).build().execute(RequestHeaders.builder()
Expand Down Expand Up @@ -740,7 +744,7 @@ void testHTTPRequestWhenSSLRequiredNoResponse() throws JsonProcessingException {
when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE);
HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription);

testBuffer = getBuffer();
testBuffer = getBuffer(1, 1);
HTTPSourceUnderTest.start(testBuffer);

CompletableFuture<AggregatedHttpResponse> future = WebClient.builder()
Expand Down Expand Up @@ -777,7 +781,7 @@ void testHTTPSJsonResponse_with_custom_path_along_with_placeholder() throws Json
when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE);
HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription);

testBuffer = getBuffer();
testBuffer = getBuffer(1, 1);
HTTPSourceUnderTest.start(testBuffer);

final String path = "/" + TEST_PIPELINE_NAME + "/test";
Expand Down Expand Up @@ -935,4 +939,90 @@ public void request_that_exceeds_maxRequestLength_returns_413() {
assertTrue(testBuffer.isEmpty());
}

@Test
public void testHTTPJsonCodec() throws IOException {
// Prepare
final String testData;
try (InputStream inputStream = this.getClass().getResourceAsStream(CLOUDWATCH_LOGS_SAMPLE)) {
testData = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
}
final int testPayloadSize = testData.getBytes().length;

when(sourceConfig.getCodec()).thenReturn(mock(PluginModel.class));
when(sourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(2048000));

ObjectMapper mapper = new ObjectMapper();
final String configString = "{\"key_name\": \"logEvents\",\"include_keys\":[\"owner\",\"logGroup\",\"logStream\"]}";
final JsonInputCodecConfig codecConfig = mapper.readValue(configString, JsonInputCodecConfig.class);

final InputCodec codec = new JsonInputCodec(codecConfig);

when(pluginFactory.loadPlugin(eq(InputCodec.class), any(PluginSetting.class))).thenReturn(codec);

HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription);
testBuffer = getBuffer(5, 5);
HTTPSourceUnderTest.start(testBuffer);
refreshMeasurements();

// When
WebClient.of().execute(RequestHeaders.builder()
.scheme(SessionProtocol.HTTP)
.authority("127.0.0.1:2021")
.method(HttpMethod.POST)
.path("/log/ingest")
.contentType(MediaType.JSON_UTF_8)
.build(),
HttpData.ofUtf8(testData))
.aggregate()
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join();

// Then
assertFalse(testBuffer.isEmpty());

final Map.Entry<Collection<Record<Log>>, CheckpointState> result = testBuffer.read(100);
List<Record<Log>> records = new ArrayList<>(result.getKey());
assertEquals(3, records.size());

// Verify content
final Record<Log> record = records.get(0);
assertCommonFields(record);
assertEquals("31953106606966983378809025079804211143289615424298221568", record.getData().get("id", String.class));
assertEquals(1432826855000L, record.getData().get("timestamp", Long.class));
assertEquals("{\"eventVersion\":\"1.01\",\"userIdentity\":{\"type\":\"Root\"}", record.getData().get("message", String.class));

final Record<Log> record2 = records.get(1);
assertCommonFields(record2);
assertEquals("31953106606966983378809025079804211143289615424298221569", record2.getData().get("id", String.class));
assertEquals(1432826855001L, record2.getData().get("timestamp", Long.class));
assertEquals("{\"eventVersion\":\"1.02\",\"userIdentity\":{\"type\":\"Root\"}", record2.getData().get("message", String.class));

final Record<Log> record3 = records.get(2);
assertCommonFields(record3);
assertEquals("31953106606966983378809025079804211143289615424298221570", record3.getData().get("id", String.class));
assertEquals(1432826855002L, record3.getData().get("timestamp", Long.class));
assertEquals("{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}", record3.getData().get("message", String.class));

// Verify metrics
final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList(
requestsReceivedMeasurements, Statistic.COUNT);
assertEquals(1.0, requestReceivedCount.getValue());
final Measurement successRequestsCount = MetricsTestUtil.getMeasurementFromList(
successRequestsMeasurements, Statistic.COUNT);
assertEquals(1.0, successRequestsCount.getValue());
final Measurement requestProcessDurationCount = MetricsTestUtil.getMeasurementFromList(
requestProcessDurationMeasurements, Statistic.COUNT);
assertEquals(1.0, requestProcessDurationCount.getValue());
final Measurement requestProcessDurationMax = MetricsTestUtil.getMeasurementFromList(
requestProcessDurationMeasurements, Statistic.MAX);
assertTrue(requestProcessDurationMax.getValue() > 0);
final Measurement payloadSizeMax = MetricsTestUtil.getMeasurementFromList(
payloadSizeSummaryMeasurements, Statistic.MAX);
assertEquals(testPayloadSize, payloadSizeMax.getValue());
}

private void assertCommonFields(Record<Log> record) {
assertEquals("111111111111", record.getData().get("owner", String.class));
assertEquals("CloudTrail/logs", record.getData().get("logGroup", String.class));
assertEquals("111111111111_CloudTrail/logs_us-east-1", record.getData().get("logStream", String.class));
}
}
Loading
Loading