Skip to content

Commit 76c0da0

Browse files
committed
Add codec to http source
Signed-off-by: Hai Yan <oeyh@amazon.com>
1 parent 49ef09d commit 76c0da0

5 files changed

Lines changed: 98 additions & 36 deletions

File tree

data-prepper-plugins/http-source/src/jmh/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceMeasure.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void setUp() throws IOException {
5151
when(buffer.getOptimalRequestSize()).thenReturn(Optional.of(256 * 1024));
5252

5353
serviceRequestContext = mock(ServiceRequestContext.class);
54-
logHTTPService = new LogHTTPService((int) Duration.ofSeconds(10).toMillis(), buffer, PluginMetrics.fromPrefix("testing"));
54+
logHTTPService = new LogHTTPService((int) Duration.ofSeconds(10).toMillis(), buffer, PluginMetrics.fromPrefix("testing"), null);
5555

5656
requestHeaders = RequestHeaders.builder()
5757
.method(HttpMethod.POST)

data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
1616
import org.opensearch.dataprepper.model.buffer.Buffer;
1717
import org.opensearch.dataprepper.model.codec.ByteDecoder;
18+
import org.opensearch.dataprepper.model.codec.InputCodec;
1819
import org.opensearch.dataprepper.model.codec.JsonDecoder;
1920
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
2021
import org.opensearch.dataprepper.model.configuration.PluginModel;
@@ -48,6 +49,7 @@ public class HTTPSource implements Source<Record<Log>> {
4849
private final PluginMetrics pluginMetrics;
4950
private static final String HTTP_HEALTH_CHECK_PATH = "/health";
5051
private ByteDecoder byteDecoder;
52+
private final InputCodec codec;
5153

5254
@DataPrepperPluginConstructor
5355
public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics pluginMetrics, final PluginFactory pluginFactory,
@@ -75,6 +77,13 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi
7577
authenticationPluginSetting.setPipelineName(pipelineName);
7678
authenticationProvider = pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, authenticationPluginSetting);
7779
httpRequestExceptionHandler = new HttpRequestExceptionHandler(pluginMetrics);
80+
final PluginModel codecConfiguration = sourceConfig.getCodec();
81+
if (codecConfiguration == null) {
82+
codec = null;
83+
} else {
84+
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
85+
codec = pluginFactory.loadPlugin(InputCodec.class, codecPluginSettings);
86+
}
7887
}
7988

8089
@Override
@@ -85,7 +94,7 @@ public void start(final Buffer<Record<Log>> buffer) {
8594
if (server == null) {
8695
ServerConfiguration serverConfiguration = ConvertConfiguration.convertConfiguration(sourceConfig);
8796
CreateServer createServer = new CreateServer(serverConfiguration, LOG, pluginMetrics, PLUGIN_NAME, pipelineName);
88-
final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics);
97+
final LogHTTPService logHTTPService = new LogHTTPService(serverConfiguration.getBufferTimeoutInMillis(), buffer, pluginMetrics, codec);
8998
server = createServer.createHTTPServer(buffer, certificateProviderFactory, authenticationProvider, httpRequestExceptionHandler, logHTTPService);
9099
pluginMetrics.gauge(SERVER_CONNECTIONS, server, Server::numConnections);
91100
}

data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55

66
package org.opensearch.dataprepper.plugins.source.loghttp;
77

8+
import com.fasterxml.jackson.annotation.JsonProperty;
89
import org.opensearch.dataprepper.http.BaseHttpServerConfig;
10+
import org.opensearch.dataprepper.model.configuration.PluginModel;
911

1012
public class HTTPSourceConfig extends BaseHttpServerConfig {
1113

@@ -21,4 +23,11 @@ public int getDefaultPort() {
2123
public String getDefaultPath() {
2224
return DEFAULT_LOG_INGEST_URI;
2325
}
26+
27+
@JsonProperty("codec")
28+
private PluginModel codec;
29+
30+
public PluginModel getCodec() {
31+
return codec;
32+
}
2433
}

data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPService.java

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,28 @@
55

66
package org.opensearch.dataprepper.plugins.source.loghttp;
77

8-
import com.linecorp.armeria.server.ServiceRequestContext;
9-
import org.opensearch.dataprepper.metrics.PluginMetrics;
10-
import org.opensearch.dataprepper.model.buffer.Buffer;
11-
import org.opensearch.dataprepper.model.log.JacksonLog;
12-
import org.opensearch.dataprepper.model.log.Log;
13-
import org.opensearch.dataprepper.model.record.Record;
148
import com.linecorp.armeria.common.AggregatedHttpRequest;
159
import com.linecorp.armeria.common.HttpData;
1610
import com.linecorp.armeria.common.HttpResponse;
1711
import com.linecorp.armeria.common.HttpStatus;
12+
import com.linecorp.armeria.server.ServiceRequestContext;
1813
import com.linecorp.armeria.server.annotation.Blocking;
1914
import com.linecorp.armeria.server.annotation.Post;
2015
import io.micrometer.core.instrument.Counter;
2116
import io.micrometer.core.instrument.DistributionSummary;
2217
import io.micrometer.core.instrument.Timer;
2318
import org.opensearch.dataprepper.http.codec.JsonCodec;
19+
import org.opensearch.dataprepper.metrics.PluginMetrics;
20+
import org.opensearch.dataprepper.model.buffer.Buffer;
21+
import org.opensearch.dataprepper.model.codec.InputCodec;
22+
import org.opensearch.dataprepper.model.log.JacksonLog;
23+
import org.opensearch.dataprepper.model.log.Log;
24+
import org.opensearch.dataprepper.model.record.Record;
2425
import org.slf4j.Logger;
2526
import org.slf4j.LoggerFactory;
2627

2728
import java.io.IOException;
29+
import java.util.ArrayList;
2830
import java.util.List;
2931
import java.util.UUID;
3032
import java.util.stream.Collectors;
@@ -48,6 +50,7 @@ public class LogHTTPService {
4850
// TODO: support other data-types as request body, e.g. json_lines, msgpack
4951
private final JsonCodec jsonCodec = new JsonCodec();
5052
private final Buffer<Record<Log>> buffer;
53+
private final InputCodec codec;
5154
private final int bufferWriteTimeoutInMillis;
5255
private final Counter requestsReceivedCounter;
5356
private final Counter successRequestsCounter;
@@ -60,11 +63,13 @@ public class LogHTTPService {
6063

6164
public LogHTTPService(final int bufferWriteTimeoutInMillis,
6265
final Buffer<Record<Log>> buffer,
63-
final PluginMetrics pluginMetrics) {
66+
final PluginMetrics pluginMetrics,
67+
final InputCodec codec) {
6468
this.buffer = buffer;
6569
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
6670
this.bufferMaxRequestLength = buffer.getMaxRequestSize().isPresent() ? buffer.getMaxRequestSize().get(): null;
6771
this.bufferOptimalRequestLength = buffer.getOptimalRequestSize().isPresent() ? buffer.getOptimalRequestSize().get(): null;
72+
this.codec = codec;
6873
requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
6974
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
7075
requestsOverOptimalSizeCounter = pluginMetrics.counter(REQUESTS_OVER_OPTIMAL_SIZE);
@@ -108,17 +113,32 @@ HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) t
108113
}
109114
} else {
110115
final List<String> jsonList;
116+
final List<Record<Log>> records = new ArrayList<>();
111117

112-
try {
113-
jsonList = jsonCodec.parse(content);
114-
} catch (IOException e) {
115-
LOG.error("Failed to parse the request of size {} due to: {}", content.length(), e.getMessage());
116-
throw new IOException("Bad request data format. Needs to be json array.", e.getCause());
117-
}
118+
if (codec != null) {
119+
try {
120+
codec.parse(content.toInputStream(), record -> {
121+
records.add(new Record<>((Log) record.getData()));
122+
});
123+
} catch (IOException e) {
124+
LOG.error("Failed to parse the request of size {} using specified input codec {} due to: {}", content.length(), codec.getClass(), e.getMessage());
125+
throw new IOException("Bad request data format. ", e.getCause());
126+
}
127+
} else {
128+
129+
try {
130+
jsonList = jsonCodec.parse(content);
131+
} catch (IOException e) {
132+
LOG.error("Failed to parse the request of size {} due to: {}", content.length(), e.getMessage());
133+
throw new IOException("Bad request data format. Needs to be json array.", e.getCause());
134+
}
118135

119-
final List<Record<Log>> records = jsonList.stream()
120-
.map(this::buildRecordLog)
121-
.collect(Collectors.toList());
136+
records.addAll(
137+
jsonList.stream()
138+
.map(this::buildRecordLog)
139+
.collect(Collectors.toList())
140+
);
141+
}
122142

123143
try {
124144
buffer.writeAll(records, bufferWriteTimeoutInMillis);

data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/LogHTTPServiceTest.java

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,6 @@
55

66
package org.opensearch.dataprepper.plugins.source.loghttp;
77

8-
import com.linecorp.armeria.server.ServiceRequestContext;
9-
import org.junit.jupiter.api.Nested;
10-
import org.opensearch.dataprepper.metrics.PluginMetrics;
11-
import org.opensearch.dataprepper.model.buffer.Buffer;
12-
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
13-
import org.opensearch.dataprepper.model.log.Log;
14-
import org.opensearch.dataprepper.model.record.Record;
158
import com.fasterxml.jackson.core.JsonProcessingException;
169
import com.fasterxml.jackson.databind.ObjectMapper;
1710
import com.linecorp.armeria.common.AggregatedHttpRequest;
@@ -23,20 +16,31 @@
2316
import com.linecorp.armeria.common.HttpStatus;
2417
import com.linecorp.armeria.common.MediaType;
2518
import com.linecorp.armeria.common.RequestHeaders;
19+
import com.linecorp.armeria.server.ServiceRequestContext;
2620
import io.micrometer.core.instrument.Counter;
2721
import io.micrometer.core.instrument.DistributionSummary;
2822
import io.micrometer.core.instrument.Timer;
2923
import org.junit.jupiter.api.BeforeEach;
24+
import org.junit.jupiter.api.Nested;
3025
import org.junit.jupiter.api.Test;
3126
import org.junit.jupiter.api.extension.ExtendWith;
3227
import org.mockito.ArgumentCaptor;
3328
import org.mockito.ArgumentMatchers;
3429
import org.mockito.Mock;
3530
import org.mockito.junit.jupiter.MockitoExtension;
3631
import org.mockito.stubbing.Answer;
32+
import org.opensearch.dataprepper.metrics.PluginMetrics;
33+
import org.opensearch.dataprepper.model.buffer.Buffer;
34+
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
35+
import org.opensearch.dataprepper.model.codec.InputCodec;
36+
import org.opensearch.dataprepper.model.log.Log;
37+
import org.opensearch.dataprepper.model.record.Record;
3738
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
3839

40+
import java.io.ByteArrayInputStream;
3941
import java.io.IOException;
42+
import java.io.InputStream;
43+
import java.nio.charset.StandardCharsets;
4044
import java.util.ArrayList;
4145
import java.util.Collections;
4246
import java.util.List;
@@ -46,25 +50,22 @@
4650
import java.util.concurrent.Callable;
4751
import java.util.concurrent.ExecutionException;
4852
import java.util.concurrent.TimeoutException;
49-
import java.nio.charset.StandardCharsets;
50-
import java.io.ByteArrayInputStream;
51-
import java.io.InputStream;
5253

5354
import static org.hamcrest.CoreMatchers.containsString;
5455
import static org.hamcrest.CoreMatchers.equalTo;
5556
import static org.hamcrest.MatcherAssert.assertThat;
5657
import static org.junit.jupiter.api.Assertions.assertEquals;
5758
import static org.junit.jupiter.api.Assertions.assertThrows;
59+
import static org.mockito.ArgumentMatchers.any;
60+
import static org.mockito.ArgumentMatchers.anyString;
5861
import static org.mockito.ArgumentMatchers.eq;
62+
import static org.mockito.ArgumentMatchers.isNull;
63+
import static org.mockito.Mockito.lenient;
64+
import static org.mockito.Mockito.mock;
5965
import static org.mockito.Mockito.never;
6066
import static org.mockito.Mockito.times;
6167
import static org.mockito.Mockito.verify;
6268
import static org.mockito.Mockito.when;
63-
import static org.mockito.Mockito.lenient;
64-
import static org.mockito.Mockito.mock;
65-
import static org.mockito.ArgumentMatchers.any;
66-
import static org.mockito.ArgumentMatchers.anyString;
67-
import static org.mockito.ArgumentMatchers.isNull;
6869

6970
@ExtendWith(MockitoExtension.class)
7071
class LogHTTPServiceTest {
@@ -95,6 +96,9 @@ class LogHTTPServiceTest {
9596
@Mock
9697
private Buffer<Record<Log>> byteBuffer;
9798

99+
@Mock
100+
private InputCodec codec;
101+
98102
@BeforeEach
99103
public void setUp() throws Exception {
100104
when(pluginMetrics.counter(LogHTTPService.REQUESTS_RECEIVED)).thenReturn(requestsReceivedCounter);
@@ -112,7 +116,7 @@ public void setUp() throws Exception {
112116
);
113117

114118
Buffer<Record<Log>> blockingBuffer = new BlockingBuffer<>(TEST_BUFFER_CAPACITY, 8, "test-pipeline");
115-
logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, blockingBuffer, pluginMetrics);
119+
logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, blockingBuffer, pluginMetrics, null);
116120
}
117121

118122
@Test
@@ -150,6 +154,26 @@ public void testHTTPRequestBadRequest() throws Exception {
150154
verify(requestProcessDuration, times(1)).recordCallable(ArgumentMatchers.<Callable<HttpResponse>>any());
151155
}
152156

157+
@Test
158+
public void testHTTPRequestSuccessWithCodec() throws Exception {
159+
// Prepare
160+
Buffer<Record<Log>> blockingBuffer = new BlockingBuffer<>(TEST_BUFFER_CAPACITY, 8, "test-pipeline");
161+
logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, blockingBuffer, pluginMetrics, codec);
162+
AggregatedHttpRequest testRequest = generateRandomValidHTTPRequest(2);
163+
164+
// When
165+
AggregatedHttpResponse postResponse = logHTTPService.doPost(serviceRequestContext, testRequest).aggregate().get();
166+
167+
// Then
168+
assertEquals(HttpStatus.OK, postResponse.status());
169+
verify(requestsReceivedCounter, times(1)).increment();
170+
verify(successRequestsCounter, times(1)).increment();
171+
final ArgumentCaptor<Double> payloadLengthCaptor = ArgumentCaptor.forClass(Double.class);
172+
verify(payloadSizeSummary, times(1)).record(payloadLengthCaptor.capture());
173+
assertEquals(testRequest.content().length(), Math.round(payloadLengthCaptor.getValue()));
174+
verify(requestProcessDuration, times(1)).recordCallable(ArgumentMatchers.<Callable<HttpResponse>>any());
175+
}
176+
153177
@Test
154178
public void testHTTPRequestEntityTooLarge() throws Exception {
155179
// Prepare
@@ -204,7 +228,7 @@ void setUp() {
204228
aggregatedHttpRequest = mock(AggregatedHttpRequest.class);
205229
httpData = mock(HttpData.class);
206230

207-
logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, byteBuffer, pluginMetrics);
231+
logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, byteBuffer, pluginMetrics, codec);
208232

209233
StringBuilder sb = new StringBuilder(1024 * 1024 + 10240);
210234
for (int i = 0; i < 12500; i++) {
@@ -303,7 +327,7 @@ void setUp() {
303327
aggregatedHttpRequest = mock(AggregatedHttpRequest.class);
304328
httpData = mock(HttpData.class);
305329

306-
logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, byteBuffer, pluginMetrics);
330+
logHTTPService = new LogHTTPService(TEST_TIMEOUT_IN_MILLIS, byteBuffer, pluginMetrics, codec);
307331

308332
StringBuilder sb = new StringBuilder(1024 * 1024 + 10240);
309333
for (int i = 0; i < 12500; i++) {

0 commit comments

Comments
 (0)