Skip to content

Commit 59db55c

Browse files
Merge remote-tracking branch 'upstream/main' into file-tail-source-6782
2 parents d99e734 + 506a04f commit 59db55c

34 files changed

Lines changed: 1065 additions & 232 deletions

File tree

.github/workflows/opensearch-sink-opensearch-integration-tests.yml

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,14 +146,22 @@ jobs:
146146
run: |
147147
docker cp ca-cert.pem opensearch-mtls:/usr/share/opensearch/config/
148148
docker exec opensearch-mtls bash -c "
149-
if grep -q 'clientauth_mode' /usr/share/opensearch/config/opensearch.yml; then
150-
sed -i 's/clientauth_mode:.*/clientauth_mode: REQUIRE/' /usr/share/opensearch/config/opensearch.yml
149+
CONFIG=/usr/share/opensearch/config/opensearch.yml
150+
if grep -q 'plugins.security' \$CONFIG; then
151+
PREFIX=plugins.security
151152
else
152-
echo 'plugins.security.ssl.http.clientauth_mode: REQUIRE' >> /usr/share/opensearch/config/opensearch.yml
153+
PREFIX=opendistro_security
154+
fi
155+
if grep -q 'ssl.http.clientauth_mode' \$CONFIG; then
156+
sed -i 's/ssl.http.clientauth_mode:.*/ssl.http.clientauth_mode: REQUIRE/' \$CONFIG
157+
else
158+
echo \"\${PREFIX}.ssl.http.clientauth_mode: REQUIRE\" >> \$CONFIG
159+
fi
160+
if grep -q 'ssl.http.pemtrustedcas_filepath' \$CONFIG; then
161+
sed -i \"s|ssl.http.pemtrustedcas_filepath:.*|ssl.http.pemtrustedcas_filepath: /usr/share/opensearch/config/ca-cert.pem|\" \$CONFIG
162+
else
163+
echo \"\${PREFIX}.ssl.http.pemtrustedcas_filepath: /usr/share/opensearch/config/ca-cert.pem\" >> \$CONFIG
153164
fi
154-
"
155-
docker exec opensearch-mtls bash -c "
156-
sed -i '/http.*pemtrustedcas_filepath/s|:.*|: ca-cert.pem|' /usr/share/opensearch/config/opensearch.yml
157165
"
158166
docker restart opensearch-mtls
159167
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.core.peerforwarder.exception;
11+
12+
/**
13+
* This exception is thrown when the peer forwarder receives a request
14+
* with a destination pipeline or plugin that is not registered.
15+
*/
16+
public class NoPeerForwarderTargetException extends RuntimeException {
17+
18+
public NoPeerForwarderTargetException(final String errorMessage) {
19+
super(errorMessage);
20+
}
21+
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/server/PeerForwarderHttpService.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderProvider;
2121
import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderReceiveBuffer;
2222
import org.opensearch.dataprepper.core.peerforwarder.codec.PeerForwarderCodec;
23+
import org.opensearch.dataprepper.core.peerforwarder.exception.NoPeerForwarderTargetException;
2324
import org.opensearch.dataprepper.core.peerforwarder.model.PeerForwardingEvents;
2425
import org.opensearch.dataprepper.metrics.PluginMetrics;
2526
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -126,7 +127,21 @@ private PeerForwarderReceiveBuffer<Record<Event>> getPeerForwarderBuffer(final S
126127
final Map<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> pipelinePeerForwarderReceiveBufferMap =
127128
peerForwarderProvider.getPipelinePeerForwarderReceiveBufferMap();
128129

129-
return pipelinePeerForwarderReceiveBufferMap
130-
.get(destinationPipelineName).get(destinationPluginId);
130+
final Map<String, PeerForwarderReceiveBuffer<Record<Event>>> pluginBufferMap =
131+
pipelinePeerForwarderReceiveBufferMap.get(destinationPipelineName);
132+
if (pluginBufferMap == null) {
133+
throw new NoPeerForwarderTargetException(
134+
"Unable to find a peer-forwarder target with destinationPluginId='" + destinationPluginId +
135+
"' and destinationPipelineName='" + destinationPipelineName + "'");
136+
}
137+
138+
final PeerForwarderReceiveBuffer<Record<Event>> buffer = pluginBufferMap.get(destinationPluginId);
139+
if (buffer == null) {
140+
throw new NoPeerForwarderTargetException(
141+
"Unable to find a peer-forwarder target with destinationPluginId='" + destinationPluginId +
142+
"' and destinationPipelineName='" + destinationPipelineName + "'");
143+
}
144+
145+
return buffer;
131146
}
132147
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/server/ResponseHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.linecorp.armeria.common.HttpStatus;
1414
import com.linecorp.armeria.common.MediaType;
1515
import io.micrometer.core.instrument.Counter;
16+
import org.opensearch.dataprepper.core.peerforwarder.exception.NoPeerForwarderTargetException;
1617
import org.opensearch.dataprepper.metrics.PluginMetrics;
1718
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
1819

@@ -55,6 +56,11 @@ public HttpResponse handleException(final Exception e, final String message) {
5556
return HttpResponse.of(HttpStatus.REQUEST_TIMEOUT, MediaType.ANY_TYPE, message);
5657
}
5758

59+
if (e instanceof NoPeerForwarderTargetException) {
60+
badRequestsCounter.increment();
61+
return HttpResponse.of(HttpStatus.BAD_REQUEST, MediaType.ANY_TYPE, e.getMessage());
62+
}
63+
5864
if (e instanceof NullPointerException) {
5965
requestsUnprocessableCounter.increment();
6066
return HttpResponse.of(HttpStatus.UNPROCESSABLE_ENTITY, MediaType.ANY_TYPE, message);

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwarder_ClientServerIT.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,15 @@
1111

1212
import com.fasterxml.jackson.databind.ObjectMapper;
1313
import com.linecorp.armeria.client.UnprocessedRequestException;
14+
import com.linecorp.armeria.client.WebClient;
1415
import com.linecorp.armeria.common.AggregatedHttpResponse;
1516
import com.linecorp.armeria.common.ClosedSessionException;
17+
import com.linecorp.armeria.common.HttpData;
18+
import com.linecorp.armeria.common.HttpMethod;
19+
import com.linecorp.armeria.common.HttpRequest;
1620
import com.linecorp.armeria.common.HttpStatus;
21+
import com.linecorp.armeria.common.MediaType;
22+
import com.linecorp.armeria.common.RequestHeaders;
1723
import com.linecorp.armeria.server.Server;
1824
import io.netty.handler.ssl.NotSslRecordException;
1925
import org.junit.jupiter.api.AfterEach;
@@ -27,6 +33,7 @@
2733
import org.opensearch.dataprepper.core.peerforwarder.codec.JavaPeerForwarderCodec;
2834
import org.opensearch.dataprepper.core.peerforwarder.codec.PeerForwarderCodec;
2935
import org.opensearch.dataprepper.core.peerforwarder.codec.PeerForwarderCodecAppConfig;
36+
import org.opensearch.dataprepper.core.peerforwarder.model.PeerForwardingEvents;
3037
import org.opensearch.dataprepper.core.peerforwarder.discovery.DiscoveryMode;
3138
import org.opensearch.dataprepper.core.peerforwarder.server.PeerForwarderHttpServerProvider;
3239
import org.opensearch.dataprepper.core.peerforwarder.server.PeerForwarderHttpService;
@@ -340,6 +347,65 @@ void send_Events_to_server_when_expecting_SSL_should_throw(final boolean binaryC
340347
assertThat(receivedRecords, notNullValue());
341348
assertThat(receivedRecords, is(empty()));
342349
}
350+
351+
@ParameterizedTest
352+
@ValueSource(booleans = {true, false})
353+
void send_invalid_content_type_with_valid_body_returns_ok(final boolean binaryCodec) throws Exception {
354+
setUpServer(binaryCodec);
355+
final PeerForwarderCodec codec = applicationContext.getBean(PeerForwarderCodec.class);
356+
final List<Event> eventList = outgoingRecords.stream().map(Record::getData).collect(Collectors.toList());
357+
final PeerForwardingEvents peerForwardingEvents = new PeerForwardingEvents(eventList, pluginId, pipelineName);
358+
final byte[] serializedBody = codec.serialize(peerForwardingEvents);
359+
360+
final WebClient client = WebClient.of("http://" + LOCALHOST + ":4994");
361+
362+
final AggregatedHttpResponse response = client.execute(
363+
HttpRequest.of(
364+
RequestHeaders.builder()
365+
.method(HttpMethod.POST)
366+
.path(PeerForwarderConfiguration.DEFAULT_PEER_FORWARDING_URI)
367+
.contentType(MediaType.parse("application/x-www-form-urlencoded"))
368+
.build(),
369+
HttpData.wrap(serializedBody)
370+
)
371+
).aggregate().join();
372+
373+
assertThat(response.status(), equalTo(HttpStatus.OK));
374+
}
375+
376+
@ParameterizedTest
377+
@ValueSource(booleans = {true, false})
378+
void send_malformed_body_returns_bad_request(final boolean binaryCodec) {
379+
setUpServer(binaryCodec);
380+
final String body = "this is not valid json or yaml";
381+
382+
final WebClient client = WebClient.of("http://" + LOCALHOST + ":4994");
383+
384+
final AggregatedHttpResponse response = client.execute(
385+
HttpRequest.of(
386+
RequestHeaders.builder()
387+
.method(HttpMethod.POST)
388+
.path(PeerForwarderConfiguration.DEFAULT_PEER_FORWARDING_URI)
389+
.build(),
390+
HttpData.ofUtf8(body)
391+
)
392+
).aggregate().join();
393+
394+
assertThat(response.status(), equalTo(HttpStatus.BAD_REQUEST));
395+
}
396+
397+
@ParameterizedTest
398+
@ValueSource(booleans = {true, false})
399+
void send_Events_to_unknown_destination_returns_bad_request(final boolean binaryCodec) throws ExecutionException, InterruptedException {
400+
setUpServer(binaryCodec);
401+
final PeerForwarderClient client = createClient(peerForwarderConfiguration);
402+
403+
final CompletableFuture<AggregatedHttpResponse> httpResponseFuture =
404+
client.serializeRecordsAndSendHttpRequest(outgoingRecords, LOCALHOST, UUID.randomUUID().toString(), UUID.randomUUID().toString());
405+
final AggregatedHttpResponse httpResponse = httpResponseFuture.get();
406+
407+
assertThat(httpResponse.status(), equalTo(HttpStatus.BAD_REQUEST));
408+
}
343409
}
344410

345411
@Nested

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/server/PeerForwarderHttpServiceTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderProvider;
3030
import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderReceiveBuffer;
3131
import org.opensearch.dataprepper.core.peerforwarder.codec.PeerForwarderCodec;
32+
import org.opensearch.dataprepper.core.peerforwarder.exception.NoPeerForwarderTargetException;
3233
import org.opensearch.dataprepper.core.peerforwarder.model.PeerForwardingEvents;
3334
import org.opensearch.dataprepper.metrics.PluginMetrics;
3435
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -167,6 +168,35 @@ void test_doPost_with_HTTP_request_size_greater_than_buffer_size_should_return_R
167168
assertThat(aggregatedHttpResponse.status(), equalTo(HttpStatus.REQUEST_ENTITY_TOO_LARGE));
168169
}
169170

171+
@Test
172+
void test_doPost_with_unknown_pipeline_should_return_BAD_REQUEST() throws Exception {
173+
lenient().when(peerForwardingEvents.getDestinationPipelineName()).thenReturn("unknown_pipeline");
174+
when(responseHandler.handleException(any(NoPeerForwarderTargetException.class), anyString())).thenReturn(HttpResponse.of(HttpStatus.BAD_REQUEST));
175+
final HashMap<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> pipelinePeerForwarderReceiveBufferMap = new HashMap<>();
176+
when(peerForwarderProvider.getPipelinePeerForwarderReceiveBufferMap()).thenReturn(pipelinePeerForwarderReceiveBufferMap);
177+
178+
final PeerForwarderHttpService objectUnderTest = createObjectUnderTest();
179+
180+
final AggregatedHttpResponse aggregatedHttpResponse = objectUnderTest.doPost(aggregatedHttpRequest).aggregate().get();
181+
182+
assertThat(aggregatedHttpResponse.status(), equalTo(HttpStatus.BAD_REQUEST));
183+
}
184+
185+
@Test
186+
void test_doPost_with_unknown_plugin_should_return_BAD_REQUEST() throws Exception {
187+
lenient().when(peerForwardingEvents.getDestinationPluginId()).thenReturn("unknown_plugin");
188+
when(responseHandler.handleException(any(NoPeerForwarderTargetException.class), anyString())).thenReturn(HttpResponse.of(HttpStatus.BAD_REQUEST));
189+
final HashMap<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> pipelinePeerForwarderReceiveBufferMap = new HashMap<>();
190+
pipelinePeerForwarderReceiveBufferMap.put(PIPELINE_NAME, new HashMap<>());
191+
when(peerForwarderProvider.getPipelinePeerForwarderReceiveBufferMap()).thenReturn(pipelinePeerForwarderReceiveBufferMap);
192+
193+
final PeerForwarderHttpService objectUnderTest = createObjectUnderTest();
194+
195+
final AggregatedHttpResponse aggregatedHttpResponse = objectUnderTest.doPost(aggregatedHttpRequest).aggregate().get();
196+
197+
assertThat(aggregatedHttpResponse.status(), equalTo(HttpStatus.BAD_REQUEST));
198+
}
199+
170200
private List<Event> generateEvents(final int numEvents) {
171201
final List<Event> events = new ArrayList<>();
172202
for (int i = 0; i < numEvents; i++) {

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/server/ResponseHandlerTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.junit.jupiter.api.extension.ExtendWith;
2121
import org.mockito.Mock;
2222
import org.mockito.junit.jupiter.MockitoExtension;
23+
import org.opensearch.dataprepper.core.peerforwarder.exception.NoPeerForwarderTargetException;
2324
import org.opensearch.dataprepper.metrics.PluginMetrics;
2425
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
2526

@@ -134,6 +135,23 @@ void test_UnknownException() throws ExecutionException, InterruptedException {
134135
verify(badRequestsCounter).increment();
135136
}
136137

138+
@Test
139+
void test_NoPeerForwarderTargetException() throws ExecutionException, InterruptedException {
140+
final ResponseHandler objectUnderTest = createObjectUnderTest();
141+
final String exceptionMessage = "Unable to find a peer-forwarder target with destinationPluginId='myPlugin' and destinationPipelineName='myPipeline'";
142+
final NoPeerForwarderTargetException noPeerForwarderTargetException = new NoPeerForwarderTargetException(exceptionMessage);
143+
144+
final String testMessage = "test exception message";
145+
146+
final HttpResponse httpResponse = objectUnderTest.handleException(noPeerForwarderTargetException, testMessage);
147+
final AggregatedHttpResponse aggregatedHttpResponse = httpResponse.aggregate().get();
148+
149+
assertEquals(HttpStatus.BAD_REQUEST, aggregatedHttpResponse.status());
150+
assertEquals(exceptionMessage, aggregatedHttpResponse.contentUtf8());
151+
152+
verify(badRequestsCounter).increment();
153+
}
154+
137155
@Test
138156
void test_NullPointerException() throws ExecutionException, InterruptedException {
139157
final ResponseHandler objectUnderTest = createObjectUnderTest();

data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientCertIT.java

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@
1212

1313
import com.fasterxml.jackson.core.JsonProcessingException;
1414
import com.fasterxml.jackson.databind.ObjectMapper;
15+
import org.apache.http.HttpHost;
16+
import org.apache.http.auth.AuthScope;
17+
import org.apache.http.auth.UsernamePasswordCredentials;
18+
import org.apache.http.conn.ssl.NoopHostnameVerifier;
19+
import org.apache.http.impl.client.BasicCredentialsProvider;
20+
import org.apache.http.ssl.SSLContextBuilder;
1521
import org.apache.http.util.EntityUtils;
1622
import org.junit.jupiter.api.AfterEach;
1723
import org.junit.jupiter.api.BeforeEach;
@@ -20,6 +26,7 @@
2026
import org.opensearch.client.Request;
2127
import org.opensearch.client.Response;
2228
import org.opensearch.client.RestClient;
29+
import org.opensearch.client.RestClientBuilder;
2330
import org.opensearch.common.xcontent.XContentType;
2431
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
2532
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
@@ -35,8 +42,20 @@
3542
import org.opensearch.dataprepper.plugins.sink.opensearch.configuration.OpenSearchSinkConfig;
3643
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;
3744

45+
import javax.net.ssl.SSLContext;
3846
import javax.ws.rs.HttpMethod;
3947
import java.io.IOException;
48+
import java.io.InputStream;
49+
import java.nio.file.Files;
50+
import java.nio.file.Paths;
51+
import java.security.KeyFactory;
52+
import java.security.KeyStore;
53+
import java.security.PrivateKey;
54+
import java.security.cert.Certificate;
55+
import java.security.cert.CertificateFactory;
56+
import java.security.spec.PKCS8EncodedKeySpec;
57+
import java.util.Base64;
58+
import java.util.Collection;
4059
import java.util.Collections;
4160
import java.util.HashMap;
4261
import java.util.List;
@@ -51,7 +70,6 @@
5170
import static org.mockito.Mockito.mock;
5271
import static org.mockito.Mockito.when;
5372
import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser;
54-
import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createOpenSearchClient;
5573
import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.getHosts;
5674

5775
@EnabledIfSystemProperty(named = "tests.mtls.client.cert", matches = ".+")
@@ -73,7 +91,7 @@ class OpenSearchClientCertIT {
7391
private OpenSearchSink sink;
7492

7593
@BeforeEach
76-
void setUp() throws IOException {
94+
void setUp() throws Exception {
7795
MetricsTestUtil.initMetrics();
7896

7997
objectMapper = new ObjectMapper();
@@ -88,7 +106,7 @@ void setUp() throws IOException {
88106
when(pluginSetting.getPipelineName()).thenReturn(PIPELINE_NAME);
89107
when(pluginSetting.getName()).thenReturn(PLUGIN_NAME);
90108

91-
client = createOpenSearchClient();
109+
client = createClientWithClientCert();
92110
}
93111

94112
@AfterEach
@@ -178,4 +196,51 @@ private void deleteIndex(final String index) throws IOException {
178196
final Request request = new Request(HttpMethod.DELETE, index);
179197
client.performRequest(request);
180198
}
199+
200+
private RestClient createClientWithClientCert() throws Exception {
201+
final String certPath = System.getProperty("tests.mtls.client.cert");
202+
final String keyPath = System.getProperty("tests.mtls.client.key");
203+
final String userName = System.getProperty("tests.opensearch.user", "admin");
204+
final String password = System.getProperty("tests.opensearch.password", "admin");
205+
206+
final CertificateFactory factory = CertificateFactory.getInstance("X.509");
207+
final Collection<? extends Certificate> certs;
208+
try (InputStream is = Files.newInputStream(Paths.get(certPath))) {
209+
certs = factory.generateCertificates(is);
210+
}
211+
212+
final String keyPem = new String(Files.readAllBytes(Paths.get(keyPath)));
213+
final String keyBase64 = keyPem
214+
.replace("-----BEGIN PRIVATE KEY-----", "")
215+
.replace("-----END PRIVATE KEY-----", "")
216+
.replaceAll("\\s", "");
217+
final byte[] keyBytes = Base64.getDecoder().decode(keyBase64);
218+
final PrivateKey privateKey = KeyFactory.getInstance("RSA")
219+
.generatePrivate(new PKCS8EncodedKeySpec(keyBytes));
220+
221+
final KeyStore keyStore = KeyStore.getInstance("PKCS12");
222+
keyStore.load(null, null);
223+
keyStore.setKeyEntry("client", privateKey, "".toCharArray(), certs.toArray(new Certificate[0]));
224+
225+
final SSLContext sslContext = SSLContextBuilder.create()
226+
.loadTrustMaterial(null, (chains, authType) -> true)
227+
.loadKeyMaterial(keyStore, "".toCharArray())
228+
.build();
229+
230+
final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
231+
credentialsProvider.setCredentials(AuthScope.ANY,
232+
new UsernamePasswordCredentials(userName, password));
233+
234+
final List<String> hosts = getHosts();
235+
final HttpHost[] httpHosts = hosts.stream()
236+
.map(HttpHost::create)
237+
.toArray(HttpHost[]::new);
238+
239+
final RestClientBuilder builder = RestClient.builder(httpHosts);
240+
builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
241+
.setDefaultCredentialsProvider(credentialsProvider)
242+
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
243+
.setSSLContext(sslContext));
244+
return builder.build();
245+
}
181246
}

0 commit comments

Comments
 (0)