Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data-prepper-plugins/opensearch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies {
testImplementation 'net.bytebuddy:byte-buddy:1.15.11'
testImplementation 'net.bytebuddy:byte-buddy-agent:1.15.11'
testImplementation testLibs.slf4j.simple
testImplementation 'org.wiremock:wiremock:3.10.0'
}

sourceSets {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public void testOpenSearchConnection() throws IOException {
builder.withUsername(user);
builder.withPassword(password);
}
builder.withInsecure(true);
final AwsCredentialsSupplier awsCredentialsSupplier = mock(AwsCredentialsSupplier.class);
final RestHighLevelClient client = builder.build().createClient(awsCredentialsSupplier);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1691,6 +1691,7 @@ private Map<String, Object> initializeConfigurationMetadata(final String indexTy
metadata.put(IndexConfiguration.INDEX_ALIAS, indexAlias);
metadata.put(IndexConfiguration.TEMPLATE_FILE, templateFilePath);
metadata.put(IndexConfiguration.FLUSH_TIMEOUT, -1);
metadata.put("insecure", true);
final String user = System.getProperty("tests.opensearch.user");
final String password = System.getProperty("tests.opensearch.password");
if (user != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,18 @@ private void checkProxyPort(final int port) {
}

private void attachSSLContext(final HttpAsyncClientBuilder httpClientBuilder) {
final SSLContext sslContext = certPath != null ? getCAStrategy(certPath) : getTrustAllStrategy();
httpClientBuilder.setSSLContext(sslContext);
final SSLContext sslContext;
if(certPath != null) {
sslContext = getCAStrategy(certPath);
} else if(this.insecure) {
sslContext = getTrustAllStrategy();
} else {
sslContext = null;
}
if(sslContext != null) {
httpClientBuilder.setSSLContext(sslContext);
}

if (this.insecure) {
httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
}
Expand Down Expand Up @@ -439,7 +449,7 @@ private OpenSearchTransport createOpenSearchTransport(final RestHighLevelClient
transportOptions.setRequestCompressionSize(Integer.MAX_VALUE);
}

return new AwsSdk2Transport(createSdkHttpClient(), HttpHost.create(hosts.get(0)).getHostName(),
return new AwsSdk2Transport(createSdkHttpClient(), HttpHost.create(hosts.get(0)).toHostString(),
serviceName, Region.of(awsRegion), transportOptions.build());
} else {
return new RestClientTransport(
Expand All @@ -461,11 +471,13 @@ private SdkHttpClient createSdkHttpClient() {
}

private void attachSSLContext(final ApacheHttpClient.Builder apacheHttpClientBuilder) {
TrustManager[] trustManagers = createTrustManagers(certPath);
apacheHttpClientBuilder.tlsTrustManagersProvider(() -> trustManagers);
TrustManager[] trustManagers = createTrustManagers(certPath, insecure);
if(trustManagers.length > 0) {
apacheHttpClientBuilder.tlsTrustManagersProvider(() -> trustManagers);
}
}

private static TrustManager[] createTrustManagers(final Path certPath) {
private static TrustManager[] createTrustManagers(final Path certPath, final boolean insecure) {
if (certPath != null) {
LOG.info("Using the cert provided in the config.");
try (InputStream certificateInputStream = Files.newInputStream(certPath)) {
Expand All @@ -481,8 +493,11 @@ private static TrustManager[] createTrustManagers(final Path certPath) {
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
} else {
} else if(insecure) {
LOG.info("Using the trust all strategy");
return new TrustManager[] { new X509TrustAllManager() };
} else {
return new TrustManager[0];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ private void setConnectAndSocketTimeout(final org.elasticsearch.client.RestClien

private void attachSSLContext(final NettyNioAsyncHttpClient.Builder asyncClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
TrustManager[] trustManagers = createTrustManagers(openSearchSourceConfiguration.getConnectionConfiguration());
asyncClientBuilder.tlsTrustManagersProvider(() -> trustManagers);
if (trustManagers.length > 0) {
asyncClientBuilder.tlsTrustManagersProvider(() -> trustManagers);
}
}

private void attachSSLContext(final HttpAsyncClientBuilder httpClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
Expand All @@ -287,31 +289,37 @@ private void attachSSLContext(final HttpAsyncClientBuilder httpClientBuilder, fi

private TrustManager[] createTrustManagers(final ConnectionConfiguration connectionConfiguration) {
final Path certPath = connectionConfiguration.getCertPath();
if (Objects.nonNull(certPath)) {
final String certificate = connectionConfiguration.getCertificate();
if (certPath != null) {
return TrustStoreProvider.createTrustManager(certPath);
} else if (Objects.nonNull(connectionConfiguration.getCertificate())) {
if (PemObjectValidator.isPemObject(connectionConfiguration.getCertificate())) {
return TrustStoreProvider.createTrustManager(connectionConfiguration.getCertificate());
} else if (certificate != null) {
if (PemObjectValidator.isPemObject(certificate)) {
return TrustStoreProvider.createTrustManager(certificate);
} else {
return TrustStoreProvider.createTrustManager(Path.of(connectionConfiguration.getCertificate()));
}
} else {
return TrustStoreProvider.createTrustManager(Path.of(certificate));}
} else if (connectionConfiguration.isInsecure()) {
return TrustStoreProvider.createTrustAllManager();

} else {
return new TrustManager[0];
}
}

private SSLContext getCAStrategy(final ConnectionConfiguration connectionConfiguration) {
final Path certPath = connectionConfiguration.getCertPath();
if (Objects.nonNull(certPath)) {
final String certificate = connectionConfiguration.getCertificate();
if (certPath != null) {
return TrustStoreProvider.createSSLContext(certPath);
} else if (Objects.nonNull(connectionConfiguration.getCertificate())) {
if (PemObjectValidator.isPemObject(connectionConfiguration.getCertificate())) {
return TrustStoreProvider.createSSLContext(connectionConfiguration.getCertificate());
} else if (certificate != null) {
if (PemObjectValidator.isPemObject(certificate)) {
return TrustStoreProvider.createSSLContext(certificate);
} else {
return TrustStoreProvider.createSSLContext(Path.of(connectionConfiguration.getCertificate()));
}
} else if (connectionConfiguration.isInsecure()) {
return TrustStoreProvider.createSSLContextWithTrustAllStrategy();
} else {
return TrustStoreProvider.createSSLContextWithTrustAllStrategy();
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ void testCreateOpenSearchClientAwsServerlessDefault() throws IOException {
when(awsCredentialsSupplier.getProvider(any())).thenReturn(awsCredentialsProvider);

final RestHighLevelClient client = connectionConfiguration.createClient(awsCredentialsSupplier);
when(apacheHttpClientBuilder.tlsTrustManagersProvider(any())).thenReturn(apacheHttpClientBuilder);
when(apacheHttpClientBuilder.build()).thenReturn(apacheHttpClient);
final OpenSearchClient openSearchClient;
try (final MockedStatic<ApacheHttpClient> apacheHttpClientMockedStatic = mockStatic(ApacheHttpClient.class)) {
Expand All @@ -160,7 +159,6 @@ void testCreateOpenSearchClientAwsServerlessDefault() throws IOException {
assertNotNull(openSearchClient);
assertThat(openSearchClient._transport(), instanceOf(AwsSdk2Transport.class));
assertThat(openSearchClient._transport().jsonpMapper(), instanceOf(PreSerializedJsonpMapper.class));
verify(apacheHttpClientBuilder).tlsTrustManagersProvider(any());
verify(apacheHttpClientBuilder).build();
openSearchClient.shutdown();
client.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.opensearch;

import com.github.tomakehurst.wiremock.WireMockServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.core.MainResponse;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.core.InfoResponse;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;

import javax.net.ssl.SSLHandshakeException;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;

import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.jsonResponse;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class ConnectionConfiguration_ServerTest {
private static WireMockServer wireMockServer;

@Mock
private AwsCredentialsSupplier awsCredentialsSupplier;

private String host;

private String clusterUuid;

@BeforeAll
static void setUpAll() {
wireMockServer = new WireMockServer(options()
.httpDisabled(true)
.dynamicHttpsPort()
.keystorePath("src/test/resources/test_keystore.jks")
.keystorePassword("password")
.keyManagerPassword("password")
);

wireMockServer.start();
}

@AfterAll
static void tearDownAll() {
wireMockServer.stop();
}

@BeforeEach
void setUp() {
host = "https://localhost:" + wireMockServer.httpsPort();

clusterUuid = UUID.randomUUID().toString();
final Map<String, Object> responseBody = Map.of(
"name", "opensearch",
"cluster_name", "opensearch",
"cluster_uuid", clusterUuid,
"version", Map.of(
"number", "2.10.0",
"build_hash", "abcdefg",
"build_date", "20241212",
"build_type", "testing",
"distribution", "datapreppertesting",
"build_snapshot", "false",
"lucene_version", "8",
"minimum_wire_compatibility_version", "2.10.0",
"minimum_index_compatibility_version", "2.10.0"
),
"tagline", "You Know, for Search"
);
wireMockServer.stubFor(get("/").willReturn(jsonResponse(responseBody, 200)));
}

@Nested
class DefaultConfiguration {
private ConnectionConfiguration createObjectUnderTest() {
return new ConnectionConfiguration.Builder(Collections.singletonList(host))
.build();
}

@Test
void createClient_will_not_trust_self_signed_certificates_by_default() {
final RestHighLevelClient client = createObjectUnderTest().createClient(awsCredentialsSupplier);
assertThat(client, notNullValue());

assertThrows(SSLHandshakeException.class, () -> client.info(RequestOptions.DEFAULT));
}

@Test
void createOpenSearchClient_will_not_trust_self_signed_certificates_by_default() {
final ConnectionConfiguration objectUnderTest = createObjectUnderTest();
final OpenSearchClient openSearchClient = objectUnderTest.createOpenSearchClient(objectUnderTest.createClient(awsCredentialsSupplier), awsCredentialsSupplier);
assertThat(openSearchClient, notNullValue());

assertThrows(SSLHandshakeException.class, openSearchClient::info);
}
}

@Nested
class DefaultSigV4Configuration {
@BeforeEach
void setUp() {
when(awsCredentialsSupplier.getProvider(any())).thenReturn(AnonymousCredentialsProvider.create());
}

private ConnectionConfiguration createObjectUnderTest() {
return new ConnectionConfiguration.Builder(Collections.singletonList(host))
.withAwsSigv4(true)
.withAwsRegion("us-east-1")
.build();
}

@Test
void createClient_will_not_trust_self_signed_certificates_by_default() {
final RestHighLevelClient client = createObjectUnderTest().createClient(awsCredentialsSupplier);
assertThat(client, notNullValue());

assertThrows(SSLHandshakeException.class, () -> client.info(RequestOptions.DEFAULT));
}

@Test
void createOpenSearchClient_will_not_trust_self_signed_certificates_by_default() {
final ConnectionConfiguration objectUnderTest = createObjectUnderTest();
final OpenSearchClient openSearchClient = objectUnderTest.createOpenSearchClient(objectUnderTest.createClient(awsCredentialsSupplier), awsCredentialsSupplier);
assertThat(openSearchClient, notNullValue());

assertThrows(SSLHandshakeException.class, openSearchClient::info);
}
}

@Nested
class InsecureConfiguration {
private ConnectionConfiguration createObjectUnderTest() {
return new ConnectionConfiguration.Builder(Collections.singletonList(host))
.withInsecure(true)
.build();
}

@Test
void createClient_will_trust_self_signed_certificates_if_insecure() throws IOException {
final RestHighLevelClient client = createObjectUnderTest().createClient(awsCredentialsSupplier);
assertThat(client, notNullValue());

final MainResponse infoResponse = client.info(RequestOptions.DEFAULT);

assertThat(infoResponse, notNullValue());
assertThat(infoResponse.getClusterName(), equalTo("opensearch"));
assertThat(infoResponse.getClusterUuid(), equalTo(clusterUuid));
}


@Test
void createOpenSearchClient_will_trust_self_signed_certificates_if_insecure() throws IOException {
final ConnectionConfiguration objectUnderTest = createObjectUnderTest();
final OpenSearchClient openSearchClient = objectUnderTest.createOpenSearchClient(objectUnderTest.createClient(awsCredentialsSupplier), awsCredentialsSupplier);
assertThat(openSearchClient, notNullValue());

final InfoResponse infoResponse = openSearchClient.info();

assertThat(infoResponse, notNullValue());
assertThat(infoResponse.clusterName(), equalTo("opensearch"));
assertThat(infoResponse.clusterUuid(), equalTo(clusterUuid));
}
}
}
Loading
Loading