From fe544ad7933cf2c8ac39bb7bb22e2a2b364ce68a Mon Sep 17 00:00:00 2001 From: Hai Yan Date: Wed, 14 May 2025 13:34:18 -0500 Subject: [PATCH 1/2] Add codec to http source Signed-off-by: Hai Yan --- .../source/loghttp/LogHTTPServiceMeasure.java | 2 +- .../plugins/source/loghttp/HTTPSource.java | 11 +++- .../source/loghttp/HTTPSourceConfig.java | 9 +++ .../source/loghttp/LogHTTPService.java | 52 +++++++++++----- .../source/loghttp/LogHTTPServiceTest.java | 60 +++++++++++++------ 5 files changed, 98 insertions(+), 36 deletions(-) diff --git a/data-prepper-plugins/http-source/src/jmh/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceMeasure.java b/data-prepper-plugins/http-source/src/jmh/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceMeasure.java index 40d5b7c3e2..6fcea29ae5 100644 --- a/data-prepper-plugins/http-source/src/jmh/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceMeasure.java +++ b/data-prepper-plugins/http-source/src/jmh/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceMeasure.java @@ -51,7 +51,7 @@ public void setUp() throws IOException { when(buffer.getOptimalRequestSize()).thenReturn(Optional.of(256 * 1024)); serviceRequestContext = mock(ServiceRequestContext.class); - logHTTPService = new LogHTTPService((int) Duration.ofSeconds(10).toMillis(), buffer, PluginMetrics.fromPrefix("testing")); + logHTTPService = new LogHTTPService((int) Duration.ofSeconds(10).toMillis(), buffer, PluginMetrics.fromPrefix("testing"), null); requestHeaders = RequestHeaders.builder() .method(HttpMethod.POST) diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java index 737a8046d7..31769782d2 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java @@ -15,6 +15,7 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.codec.ByteDecoder; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.codec.JsonDecoder; import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginModel; @@ -48,6 +49,7 @@ public class HTTPSource implements Source> { private final PluginMetrics pluginMetrics; private static final String HTTP_HEALTH_CHECK_PATH = "/health"; private ByteDecoder byteDecoder; + private final InputCodec codec; @DataPrepperPluginConstructor public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory, @@ -75,6 +77,13 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi authenticationPluginSetting.setPipelineName(pipelineName); authenticationProvider = pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, authenticationPluginSetting); httpRequestExceptionHandler = new HttpRequestExceptionHandler(pluginMetrics); + final PluginModel codecConfiguration = sourceConfig.getCodec(); + if (codecConfiguration == null) { + codec = null; + } else { + final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); + codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings); + } } @Override @@ -85,7 +94,7 @@ public void start(final Buffer> buffer) { if (server == null) { ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(sourceConfig); CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, PLUGIN_NAME, pipelineName); - final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics); + final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics, codec); server = createServer.createHTTPServer(buffer, certificateProviderFactory, authenticationProvider, httpRequestExceptionHandler, logHTTPService); pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections); } diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java index 9ab52afce0..fc83d59a2b 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java @@ -5,7 +5,9 @@ package org.opensearch.dataprepper.plugins.source.loghttp; +import com.fasterxml.jackson.annotation.JsonProperty; import org.opensearch.dataprepper.http.BaseHttpServerConfig; +import org.opensearch.dataprepper.model.configuration.PluginModel; public class HTTPSourceConfig extends BaseHttpServerConfig { @@ -21,4 +23,11 @@ public int getDefaultPort() { public String getDefaultPath() { return DEFAULT_LOG_INGEST_URI; } + + @JsonProperty("codec") + private PluginModel codec; + + public PluginModel getCodec() { + return codec; + } } diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java index c2bd344fc5..4125bdb1da 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java @@ -5,26 +5,28 @@ package org.opensearch.dataprepper.plugins.source.loghttp; -import com.linecorp.armeria.server.ServiceRequestContext; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.log.JacksonLog; -import org.opensearch.dataprepper.model.log.Log; -import org.opensearch.dataprepper.model.record.Record; import com.linecorp.armeria.common.AggregatedHttpRequest; import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.server.annotation.Blocking; import com.linecorp.armeria.server.annotation.Post; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; import org.opensearch.dataprepper.http.codec.JsonCodec; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.log.JacksonLog; +import org.opensearch.dataprepper.model.log.Log; +import org.opensearch.dataprepper.model.record.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.stream.Collectors; @@ -48,6 +50,7 @@ public class LogHTTPService { // TODO: support other data-types as request body, e.g. json_lines, msgpack private final JsonCodec jsonCodec = new JsonCodec(); private final Buffer> buffer; + private final InputCodec codec; private final int bufferWriteTimeoutInMillis; private final Counter requestsReceivedCounter; private final Counter successRequestsCounter; @@ -60,11 +63,13 @@ public class LogHTTPService { public LogHTTPService(final int bufferWriteTimeoutInMillis, final Buffer> buffer, - final PluginMetrics pluginMetrics) { + final PluginMetrics pluginMetrics, + final InputCodec codec) { this.buffer = buffer; this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis; this.bufferMaxRequestLength = buffer.getMaxRequestSize().isPresent() ? buffer.getMaxRequestSize().get(): null; this.bufferOptimalRequestLength = buffer.getOptimalRequestSize().isPresent() ? buffer.getOptimalRequestSize().get(): null; + this.codec = codec; requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED); successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS); requestsOverOptimalSizeCounter = pluginMetrics.counter(REQUESTS_OVER_OPTIMAL_SIZE); @@ -108,17 +113,32 @@ HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) t } } else { final List jsonList; + final List> records = new ArrayList<>(); - try { - jsonList = jsonCodec.parse(content); - } catch (IOException e) { - LOG.error("Failed to parse the request of size {} due to: {}", content.length(), e.getMessage()); - throw new IOException("Bad request data format. Needs to be json array.", e.getCause()); - } + if (codec != null) { + try { + codec.parse(content.toInputStream(), record -> { + records.add(new Record<>((Log) record.getData())); + }); + } catch (IOException e) { + LOG.error("Failed to parse the request of size {} using specified input codec {} due to: {}", content.length(), codec.getClass(), e.getMessage()); + throw new IOException("Bad request data format. ", e.getCause()); + } + } else { + + try { + jsonList = jsonCodec.parse(content); + } catch (IOException e) { + LOG.error("Failed to parse the request of size {} due to: {}", content.length(), e.getMessage()); + throw new IOException("Bad request data format. Needs to be json array.", e.getCause()); + } - final List> records = jsonList.stream() - .map(this::buildRecordLog) - .collect(Collectors.toList()); + records.addAll( + jsonList.stream() + .map(this::buildRecordLog) + .collect(Collectors.toList()) + ); + } try { buffer.writeAll(records, bufferWriteTimeoutInMillis); diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java index 5e2ffe2c7d..4c9e9691c8 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java +++ b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java @@ -5,13 +5,6 @@ package org.opensearch.dataprepper.plugins.source.loghttp; -import com.linecorp.armeria.server.ServiceRequestContext; -import org.junit.jupiter.api.Nested; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.buffer.SizeOverflowException; -import org.opensearch.dataprepper.model.log.Log; -import org.opensearch.dataprepper.model.record.Record; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.linecorp.armeria.common.AggregatedHttpRequest; @@ -23,10 +16,12 @@ import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.server.ServiceRequestContext; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; @@ -34,9 +29,18 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.Answer; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.buffer.SizeOverflowException; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.log.Log; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -46,25 +50,22 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -import java.nio.charset.StandardCharsets; -import java.io.ByteArrayInputStream; -import java.io.InputStream; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.mock; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.isNull; @ExtendWith(MockitoExtension.class) class LogHTTPServiceTest { @@ -95,6 +96,9 @@ class LogHTTPServiceTest { @Mock private Buffer> byteBuffer; + @Mock + private InputCodec codec; + @BeforeEach public void setUp() throws Exception { when(pluginMetrics.counter(LogHTTPService.REQUESTS_RECEIVED)).thenReturn(requestsReceivedCounter); @@ -112,7 +116,7 @@ public void setUp() throws Exception { ); Buffer> blockingBuffer = new BlockingBuffer<>(TEST_BUFFER_CAPACITY, 8, "test-pipeline"); - logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, blockingBuffer, pluginMetrics); + logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, blockingBuffer, pluginMetrics, null); } @Test @@ -150,6 +154,26 @@ public void testHTTPRequestBadRequest() throws Exception { verify(requestProcessDuration, times(1)).recordCallable(ArgumentMatchers.>any()); } + @Test + public void testHTTPRequestSuccessWithCodec() throws Exception { + // Prepare + Buffer> blockingBuffer = new BlockingBuffer<>(TEST_BUFFER_CAPACITY, 8, "test-pipeline"); + logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, blockingBuffer, pluginMetrics, codec); + AggregatedHttpRequest testRequest = generateRandomValidHTTPRequest(2); + + // When + AggregatedHttpResponse postResponse = logHTTPService.doPost(serviceRequestContext, testRequest).aggregate().get(); + + // Then + assertEquals(HttpStatus.OK, postResponse.status()); + verify(requestsReceivedCounter, times(1)).increment(); + verify(successRequestsCounter, times(1)).increment(); + final ArgumentCaptor payloadLengthCaptor = ArgumentCaptor.forClass(Double.class); + verify(payloadSizeSummary, times(1)).record(payloadLengthCaptor.capture()); + assertEquals(testRequest.content().length(), Math.round(payloadLengthCaptor.getValue())); + verify(requestProcessDuration, times(1)).recordCallable(ArgumentMatchers.>any()); + } + @Test public void testHTTPRequestEntityTooLarge() throws Exception { // Prepare @@ -204,7 +228,7 @@ void setUp() { aggregatedHttpRequest = mock(AggregatedHttpRequest.class); httpData = mock(HttpData.class); - logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, byteBuffer, pluginMetrics); + logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, byteBuffer, pluginMetrics, codec); StringBuilder sb = new StringBuilder(1024 * 1024 + 10240); for (int i = 0; i < 12500; i++) { @@ -303,7 +327,7 @@ void setUp() { aggregatedHttpRequest = mock(AggregatedHttpRequest.class); httpData = mock(HttpData.class); - logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, byteBuffer, pluginMetrics); + logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, byteBuffer, pluginMetrics, codec); StringBuilder sb = new StringBuilder(1024 * 1024 + 10240); for (int i = 0; i < 12500; i++) { From 719c8bd1ef9cff82b0da96bf4175cd751b30e062 Mon Sep 17 00:00:00 2001 From: Hai Yan Date: Tue, 20 May 2025 12:24:58 -0500 Subject: [PATCH 2/2] Add a test with sample data Signed-off-by: Hai Yan --- data-prepper-plugins/http-source/build.gradle | 1 + .../source/loghttp/HTTPSourceTest.java | 104 ++++++++++++++++-- .../resources/cloudwatch-logs-sample.json | 26 +++++ 3 files changed, 124 insertions(+), 7 deletions(-) create mode 100644 data-prepper-plugins/http-source/src/test/resources/cloudwatch-logs-sample.json diff --git a/data-prepper-plugins/http-source/build.gradle b/data-prepper-plugins/http-source/build.gradle index 5c6173b0ab..cc6e7970cb 100644 --- a/data-prepper-plugins/http-source/build.gradle +++ b/data-prepper-plugins/http-source/build.gradle @@ -23,6 +23,7 @@ dependencies { testImplementation 'org.assertj:assertj-core:3.27.0' testImplementation project(':data-prepper-api').sourceSets.test.output testImplementation project(':data-prepper-test-common') + testImplementation project(':data-prepper-plugins:parse-json-processor') } jacocoTestCoverageVerification { diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java index ef0c22c771..7f073d2311 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java +++ b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java @@ -43,6 +43,7 @@ import org.opensearch.dataprepper.metrics.MetricsTestUtil; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.model.codec.InputCodec; import org.opensearch.dataprepper.model.configuration.PipelineDescription; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -56,6 +57,8 @@ import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; import org.opensearch.dataprepper.plugins.certificate.model.Certificate; import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.plugins.codec.json.JsonInputCodec; +import org.opensearch.dataprepper.plugins.codec.json.JsonInputCodecConfig; import java.io.ByteArrayOutputStream; import java.io.File; @@ -115,6 +118,7 @@ class HTTPSourceTest { private final String TEST_PIPELINE_NAME = "test_pipeline"; private final String TEST_SSL_CERTIFICATE_FILE = getClass().getClassLoader().getResource("test_cert.crt").getFile(); private final String TEST_SSL_KEY_FILE = getClass().getClassLoader().getResource("test_decrypted_key.key").getFile(); + private static final String CLOUDWATCH_LOGS_SAMPLE = "/cloudwatch-logs-sample.json"; @Mock private ServerBuilder serverBuilder; @@ -152,10 +156,10 @@ class HTTPSourceTest { private PluginMetrics pluginMetrics; private PluginFactory pluginFactory; - private BlockingBuffer> getBuffer() throws JsonProcessingException { + private BlockingBuffer> getBuffer(final int bufferSize, final int batchSize) throws JsonProcessingException { final HashMap integerHashMap = new HashMap<>(); - integerHashMap.put("buffer_size", 1); - integerHashMap.put("batch_size", 1); + integerHashMap.put("buffer_size", bufferSize); + integerHashMap.put("batch_size", batchSize); ObjectMapper objectMapper = new ObjectMapper(); String json = objectMapper.writeValueAsString(integerHashMap); BlockingBufferConfig blockingBufferConfig = objectMapper.readValue(json, BlockingBufferConfig.class); @@ -233,7 +237,7 @@ public void setUp() throws JsonProcessingException { when(pluginFactory.loadPlugin(eq(ArmeriaHttpAuthenticationProvider.class), any(PluginSetting.class))) .thenReturn(authenticationProvider); - testBuffer = getBuffer(); + testBuffer = getBuffer(1, 1); when(pipelineDescription.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); } @@ -710,7 +714,7 @@ void testHTTPSJsonResponse() throws JsonProcessingException { when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE); HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - testBuffer = getBuffer(); + testBuffer = getBuffer(1, 1); HTTPSourceUnderTest.start(testBuffer); WebClient.builder().factory(ClientFactory.insecure()).build().execute(RequestHeaders.builder() @@ -740,7 +744,7 @@ void testHTTPRequestWhenSSLRequiredNoResponse() throws JsonProcessingException { when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE); HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - testBuffer = getBuffer(); + testBuffer = getBuffer(1, 1); HTTPSourceUnderTest.start(testBuffer); CompletableFuture future = WebClient.builder() @@ -777,7 +781,7 @@ void testHTTPSJsonResponse_with_custom_path_along_with_placeholder() throws Json when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE); HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); - testBuffer = getBuffer(); + testBuffer = getBuffer(1, 1); HTTPSourceUnderTest.start(testBuffer); final String path = "/" + TEST_PIPELINE_NAME + "/test"; @@ -935,4 +939,90 @@ public void request_that_exceeds_maxRequestLength_returns_413() { assertTrue(testBuffer.isEmpty()); } + @Test + public void testHTTPJsonCodec() throws IOException { + // Prepare + final String testData; + try (InputStream inputStream = this.getClass().getResourceAsStream(CLOUDWATCH_LOGS_SAMPLE)) { + testData = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); + } + final int testPayloadSize = testData.getBytes().length; + + when(sourceConfig.getCodec()).thenReturn(mock(PluginModel.class)); + when(sourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(2048000)); + + ObjectMapper mapper = new ObjectMapper(); + final String configString = "{\"key_name\": \"logEvents\",\"include_keys\":[\"owner\",\"logGroup\",\"logStream\"]}"; + final JsonInputCodecConfig codecConfig = mapper.readValue(configString, JsonInputCodecConfig.class); + + final InputCodec codec = new JsonInputCodec(codecConfig); + + when(pluginFactory.loadPlugin(eq(InputCodec.class), any(PluginSetting.class))).thenReturn(codec); + + HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + testBuffer = getBuffer(5, 5); + HTTPSourceUnderTest.start(testBuffer); + refreshMeasurements(); + + // When + WebClient.of().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority("127.0.0.1:2021") + .method(HttpMethod.POST) + .path("/log/ingest") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.ofUtf8(testData)) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); + + // Then + assertFalse(testBuffer.isEmpty()); + + final Map.Entry>, CheckpointState> result = testBuffer.read(100); + List> records = new ArrayList<>(result.getKey()); + assertEquals(3, records.size()); + + // Verify content + final Record record = records.get(0); + assertCommonFields(record); + assertEquals("31953106606966983378809025079804211143289615424298221568", record.getData().get("id", String.class)); + assertEquals(1432826855000L, record.getData().get("timestamp", Long.class)); + assertEquals("{\"eventVersion\":\"1.01\",\"userIdentity\":{\"type\":\"Root\"}", record.getData().get("message", String.class)); + + final Record record2 = records.get(1); + assertCommonFields(record2); + assertEquals("31953106606966983378809025079804211143289615424298221569", record2.getData().get("id", String.class)); + assertEquals(1432826855001L, record2.getData().get("timestamp", Long.class)); + assertEquals("{\"eventVersion\":\"1.02\",\"userIdentity\":{\"type\":\"Root\"}", record2.getData().get("message", String.class)); + + final Record record3 = records.get(2); + assertCommonFields(record3); + assertEquals("31953106606966983378809025079804211143289615424298221570", record3.getData().get("id", String.class)); + assertEquals(1432826855002L, record3.getData().get("timestamp", Long.class)); + assertEquals("{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}", record3.getData().get("message", String.class)); + + // Verify metrics + final Measurement requestReceivedCount = MetricsTestUtil.getMeasurementFromList( + requestsReceivedMeasurements, Statistic.COUNT); + assertEquals(1.0, requestReceivedCount.getValue()); + final Measurement successRequestsCount = MetricsTestUtil.getMeasurementFromList( + successRequestsMeasurements, Statistic.COUNT); + assertEquals(1.0, successRequestsCount.getValue()); + final Measurement requestProcessDurationCount = MetricsTestUtil.getMeasurementFromList( + requestProcessDurationMeasurements, Statistic.COUNT); + assertEquals(1.0, requestProcessDurationCount.getValue()); + final Measurement requestProcessDurationMax = MetricsTestUtil.getMeasurementFromList( + requestProcessDurationMeasurements, Statistic.MAX); + assertTrue(requestProcessDurationMax.getValue() > 0); + final Measurement payloadSizeMax = MetricsTestUtil.getMeasurementFromList( + payloadSizeSummaryMeasurements, Statistic.MAX); + assertEquals(testPayloadSize, payloadSizeMax.getValue()); + } + + private void assertCommonFields(Record record) { + assertEquals("111111111111", record.getData().get("owner", String.class)); + assertEquals("CloudTrail/logs", record.getData().get("logGroup", String.class)); + assertEquals("111111111111_CloudTrail/logs_us-east-1", record.getData().get("logStream", String.class)); + } } diff --git a/data-prepper-plugins/http-source/src/test/resources/cloudwatch-logs-sample.json b/data-prepper-plugins/http-source/src/test/resources/cloudwatch-logs-sample.json new file mode 100644 index 0000000000..7ab16b2bb7 --- /dev/null +++ b/data-prepper-plugins/http-source/src/test/resources/cloudwatch-logs-sample.json @@ -0,0 +1,26 @@ +{ + "owner": "111111111111", + "logGroup": "CloudTrail/logs", + "logStream": "111111111111_CloudTrail/logs_us-east-1", + "subscriptionFilters": [ + "Destination" + ], + "messageType": "DATA_MESSAGE", + "logEvents": [ + { + "id": "31953106606966983378809025079804211143289615424298221568", + "timestamp": 1432826855000, + "message": "{\"eventVersion\":\"1.01\",\"userIdentity\":{\"type\":\"Root\"}" + }, + { + "id": "31953106606966983378809025079804211143289615424298221569", + "timestamp": 1432826855001, + "message": "{\"eventVersion\":\"1.02\",\"userIdentity\":{\"type\":\"Root\"}" + }, + { + "id": "31953106606966983378809025079804211143289615424298221570", + "timestamp": 1432826855002, + "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}" + } + ] +} \ No newline at end of file