diff --git a/changelog/unreleased/issue-14460.toml b/changelog/unreleased/issue-14460.toml new file mode 100644 index 000000000000..bcfab9919b4c --- /dev/null +++ b/changelog/unreleased/issue-14460.toml @@ -0,0 +1,5 @@ +type = "a" +message = "Add HTTP Async proxy support for AWS Kinesis/Cloudwatch input." + +issues = ["14460"] +pulls = ["26404"] diff --git a/graylog2-server/pom.xml b/graylog2-server/pom.xml index 9768d01d58b1..07e1a4e16e02 100644 --- a/graylog2-server/pom.xml +++ b/graylog2-server/pom.xml @@ -729,6 +729,12 @@ + + + software.amazon.awssdk + netty-nio-client + software.amazon.awssdk diff --git a/graylog2-server/src/main/java/org/graylog/aws/AWSAsyncProxyConfigurationProvider.java b/graylog2-server/src/main/java/org/graylog/aws/AWSAsyncProxyConfigurationProvider.java new file mode 100644 index 000000000000..902e867604d7 --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog/aws/AWSAsyncProxyConfigurationProvider.java @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog.aws; + +import com.google.common.base.Splitter; +import jakarta.annotation.Nullable; +import jakarta.inject.Inject; +import jakarta.inject.Named; +import jakarta.inject.Provider; +import jakarta.inject.Singleton; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.ProxyConfiguration; + +import java.net.URI; +import java.util.List; + +/** + * Provides a Netty-based async HTTP client builder configured with the optional Graylog HTTP proxy + * (the {@code http_proxy_uri} server configuration). + *

+ * The synchronous {@link AWSProxyConfigurationProvider} uses the Apache HTTP client, which cannot be used by the + * AWS async clients (DynamoDB, CloudWatch, Kinesis) that the Kinesis input relies on. The Kinesis Client Library + * additionally requires HTTP/2, which is only supported by the Netty async client. This provider therefore mirrors + * the proxy logic of {@link AWSProxyConfigurationProvider}, but produces a {@link NettyNioAsyncHttpClient.Builder}. + */ +@Singleton +public class AWSAsyncProxyConfigurationProvider implements Provider { + private static final Logger LOG = LoggerFactory.getLogger(AWSAsyncProxyConfigurationProvider.class); + private static final String HTTPS_SCHEME = "https"; + private static final int DEFAULT_HTTP_PORT = 80; + private static final int DEFAULT_HTTPS_PORT = 443; + + private final URI httpProxyUri; + + @Inject + public AWSAsyncProxyConfigurationProvider(@Named("http_proxy_uri") @Nullable URI httpProxyUri) { + this.httpProxyUri = httpProxyUri; + } + + @Override + public NettyNioAsyncHttpClient.Builder get() { + final NettyNioAsyncHttpClient.Builder httpClientBuilder = NettyNioAsyncHttpClient.builder(); + if (httpProxyUri == null) { + LOG.debug("AWS async proxy disabled: http_proxy_uri not set"); + return httpClientBuilder; + } + + httpClientBuilder.proxyConfiguration(buildProxyConfiguration(httpProxyUri)); + LOG.debug("AWS async proxy enabled: {}:{}", httpProxyUri.getHost(), httpProxyUri.getPort()); + return httpClientBuilder; + } + + static ProxyConfiguration buildProxyConfiguration(URI proxyUri) { + final String scheme = proxyUri.getScheme(); + final int port = proxyUri.getPort(); + + final ProxyConfiguration.Builder proxyConfigBuilder = ProxyConfiguration.builder() + .scheme(scheme) + .host(proxyUri.getHost()) + // The Netty proxy configuration requires an explicit port. Fall back to the scheme default when the + // proxy URI does not specify one. + .port(port >= 0 ? port : (HTTPS_SCHEME.equalsIgnoreCase(scheme) ? DEFAULT_HTTPS_PORT : DEFAULT_HTTP_PORT)); + + if (proxyUri.getUserInfo() != null && !proxyUri.getUserInfo().isEmpty()) { + final List credentials = Splitter.on(":") + .limit(2) + .splitToList(proxyUri.getUserInfo()); + + if (credentials.size() == 2) { + proxyConfigBuilder.username(credentials.get(0)); + proxyConfigBuilder.password(credentials.get(1)); + } + } + + return proxyConfigBuilder.build(); + } +} diff --git a/graylog2-server/src/main/java/org/graylog/integrations/aws/AWSClientBuilderUtil.java b/graylog2-server/src/main/java/org/graylog/integrations/aws/AWSClientBuilderUtil.java index 730284273297..910ce26bd7ac 100644 --- a/graylog2-server/src/main/java/org/graylog/integrations/aws/AWSClientBuilderUtil.java +++ b/graylog2-server/src/main/java/org/graylog/integrations/aws/AWSClientBuilderUtil.java @@ -17,8 +17,10 @@ package org.graylog.integrations.aws; import com.google.common.base.Preconditions; +import jakarta.inject.Inject; import jakarta.inject.Provider; import org.apache.commons.lang3.StringUtils; +import org.graylog.aws.AWSAsyncProxyConfigurationProvider; import org.graylog.aws.AWSProxyConfigurationProvider; import org.graylog.integrations.aws.resources.requests.AWSRequest; import org.graylog2.Configuration; @@ -26,7 +28,11 @@ import org.graylog2.security.encryption.EncryptedValueService; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder; +import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder; import software.amazon.awssdk.core.client.builder.SdkClientBuilder; +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.http.nio.netty.Http2Configuration; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClientBuilder; @@ -35,9 +41,8 @@ import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; -import jakarta.inject.Inject; - import java.net.URI; +import java.time.Duration; import java.util.Optional; /** @@ -46,18 +51,27 @@ */ public class AWSClientBuilderUtil { + // HTTP/2 tuning required by the Kinesis Client Library. These mirror the values applied by + // software.amazon.kinesis.common.KinesisClientUtil#adjustKinesisClientBuilder, which we cannot use directly + // because it would override our proxy-aware HTTP client builder. + private static final int KINESIS_HTTP2_INITIAL_WINDOW_SIZE_BYTES = 512 * 1024; // 512 KB + private static final long KINESIS_HTTP2_HEALTH_CHECK_PING_PERIOD_MILLIS = 60 * 1000; + private final Provider authFactoryProvider; private final EncryptedValueService encryptedValueService; private final Configuration configuration; private final AWSProxyConfigurationProvider proxyConfigurationProvider; + private final AWSAsyncProxyConfigurationProvider asyncProxyConfigurationProvider; @Inject public AWSClientBuilderUtil(Provider authFactoryProvider, EncryptedValueService encryptedValueService, - Configuration configuration, AWSProxyConfigurationProvider proxyConfigurationProvider) { + Configuration configuration, AWSProxyConfigurationProvider proxyConfigurationProvider, + AWSAsyncProxyConfigurationProvider asyncProxyConfigurationProvider) { this.authFactoryProvider = authFactoryProvider; this.encryptedValueService = encryptedValueService; this.configuration = configuration; this.proxyConfigurationProvider = proxyConfigurationProvider; + this.asyncProxyConfigurationProvider = asyncProxyConfigurationProvider; } public AwsCredentialsProvider createCredentialsProvider(AWSRequest request) { @@ -71,7 +85,6 @@ public AwsCredentialsProvider createCredentialsProvider(AWSRequest request) { /** * Creates an AWS credentials provider with proxy support on the STS client used for assume-role. - * Use this for inputs that support HTTP proxy (S3, Security Lake) but NOT for Kinesis. */ public AwsCredentialsProvider createCredentialsProviderWithStsProxy(AWSRequest request) { return authFactoryProvider.get().create( @@ -112,7 +125,8 @@ public CloudWatchLogsClient buildClient(CloudWatchLogsClientBuilder clientBuilde initializeBuilder(clientBuilder, request.cloudwatchEndpoint(), Region.of(request.region()), - createCredentialsProvider(request)); + createCredentialsProviderWithStsProxy(request)); + applySyncProxy(clientBuilder); return clientBuilder.build(); } @@ -128,7 +142,8 @@ public KinesisClient buildClient(KinesisClientBuilder clientBuilder, AWSRequest initializeBuilder(clientBuilder, request.kinesisEndpoint(), Region.of(request.region()), - createCredentialsProvider(request)); + createCredentialsProviderWithStsProxy(request)); + applySyncProxy(clientBuilder); return clientBuilder.build(); } @@ -149,10 +164,45 @@ public IamClient buildClient(IamClientBuilder clientBuilder, AWSRequest request) initializeBuilder(clientBuilder, request.iamEndpoint(), iamRegion, // Always specify the appropriate global region for the IAM client. - createCredentialsProvider(request)); + createCredentialsProviderWithStsProxy(request)); + applySyncProxy(clientBuilder); return clientBuilder.build(); } + /** + * Applies the proxy-aware Apache HTTP client builder to a synchronous AWS client builder. This is a no-op when + * no {@code http_proxy_uri} is configured. + */ + private void applySyncProxy(AwsSyncClientBuilder clientBuilder) { + clientBuilder.httpClientBuilder(proxyConfigurationProvider.get()); + } + + /** + * Returns a Netty async HTTP client builder configured with the Graylog HTTP proxy (when set). Use this for the + * DynamoDB and CloudWatch async clients required by the Kinesis input. These do not require HTTP/2. + */ + public NettyNioAsyncHttpClient.Builder asyncHttpClientBuilder() { + return asyncProxyConfigurationProvider.get(); + } + + /** + * Returns a Netty async HTTP client builder configured for HTTP/2 (required by the Kinesis Client Library) plus the + * Graylog HTTP proxy (when set). + *

+ * This replaces {@link software.amazon.kinesis.common.KinesisClientUtil#createKinesisAsyncClient}, which always + * installs its own HTTP client builder and would therefore discard our proxy configuration. The HTTP/2 settings + * mirror the values applied by that utility. + */ + public NettyNioAsyncHttpClient.Builder kinesisAsyncHttpClientBuilder() { + return asyncProxyConfigurationProvider.get() + .maxConcurrency(Integer.MAX_VALUE) + .http2Configuration(Http2Configuration.builder() + .initialWindowSize(KINESIS_HTTP2_INITIAL_WINDOW_SIZE_BYTES) + .healthCheckPingPeriod(Duration.ofMillis(KINESIS_HTTP2_HEALTH_CHECK_PING_PERIOD_MILLIS)) + .build()) + .protocol(Protocol.HTTP2); + } + private String decryptSecretAccessKey(EncryptedValue secretAccessKey) { return encryptedValueService.decrypt(Optional.ofNullable(secretAccessKey).orElse(EncryptedValue.createUnset())); } diff --git a/graylog2-server/src/main/java/org/graylog/integrations/aws/transports/KinesisConsumer.java b/graylog2-server/src/main/java/org/graylog/integrations/aws/transports/KinesisConsumer.java index cfb1a491a491..0fdd90b2f493 100644 --- a/graylog2-server/src/main/java/org/graylog/integrations/aws/transports/KinesisConsumer.java +++ b/graylog2-server/src/main/java/org/graylog/integrations/aws/transports/KinesisConsumer.java @@ -37,7 +37,6 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder; import software.amazon.kinesis.common.ConfigsBuilder; -import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.NoOpWorkerStateChangeListener; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.coordinator.WorkerStateChangeListener; @@ -107,77 +106,87 @@ public class KinesisConsumer implements Runnable { public void run() { LOG.debug("Starting the Kinesis Consumer."); - AwsCredentialsProvider credentialsProvider = awsClientBuilderUtil.createCredentialsProvider(request); + AwsCredentialsProvider credentialsProvider = awsClientBuilderUtil.createCredentialsProviderWithStsProxy(request); final Region region = Region.of(request.region()); final DynamoDbAsyncClientBuilder dynamoDbClientBuilder = DynamoDbAsyncClient.builder(); awsClientBuilderUtil.initializeBuilder(dynamoDbClientBuilder, request.dynamodbEndpoint(), region, credentialsProvider); - final DynamoDbAsyncClient dynamoClient = dynamoDbClientBuilder.build(); - + dynamoDbClientBuilder.httpClientBuilder(awsClientBuilderUtil.asyncHttpClientBuilder()); final CloudWatchAsyncClientBuilder cloudwatchClientBuilder = CloudWatchAsyncClient.builder(); awsClientBuilderUtil.initializeBuilder(cloudwatchClientBuilder, request.cloudwatchEndpoint(), region, credentialsProvider); - final CloudWatchAsyncClient cloudWatchClient = cloudwatchClientBuilder.build(); + cloudwatchClientBuilder.httpClientBuilder(awsClientBuilderUtil.asyncHttpClientBuilder()); + // The Kinesis Client Library normally configures the async client through + // KinesisClientUtil.createKinesisAsyncClient(), but that installs its own HTTP client builder and would discard + // our proxy configuration. We therefore apply the proxy-aware, HTTP/2-enabled builder ourselves. final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder(); awsClientBuilderUtil.initializeBuilder(kinesisAsyncClientBuilder, request.kinesisEndpoint(), region, credentialsProvider); - final KinesisAsyncClient kinesisAsyncClient = KinesisClientUtil.createKinesisAsyncClient(kinesisAsyncClientBuilder); + kinesisAsyncClientBuilder.httpClientBuilder(awsClientBuilderUtil.kinesisAsyncHttpClientBuilder()); - final String workerId = String.format(Locale.ENGLISH, "graylog-node-%s", nodeId.anonymize()); - LOG.debug("Using workerId [{}].", workerId); + // All three clients are kept open for the lifetime of the KCL Scheduler (kinesisScheduler.run() blocks). + // Try-with-resources ensures the Netty event loop groups and connection pools are released when the scheduler + // exits, whether normally or due to an exception. + try (DynamoDbAsyncClient dynamoClient = dynamoDbClientBuilder.build(); + CloudWatchAsyncClient cloudWatchClient = cloudwatchClientBuilder.build(); + KinesisAsyncClient kinesisAsyncClient = kinesisAsyncClientBuilder.build()) { - // The application name needs to be unique per input/consumer. - final String applicationName = String.format(Locale.ENGLISH, "graylog-aws-plugin-%s", kinesisStreamName); - LOG.debug("Using Kinesis applicationName [{}].", applicationName); + final String workerId = String.format(Locale.ENGLISH, "graylog-node-%s", nodeId.anonymize()); + LOG.debug("Using workerId [{}].", workerId); - // The KinesisShardProcessorFactory contains the message processing logic. - final KinesisShardProcessorFactory kinesisShardProcessorFactory = new KinesisShardProcessorFactory(objectMapper, transport, handleMessageCallback, kinesisStreamName, awsMessageType); + // The application name needs to be unique per input/consumer. + final String applicationName = String.format(Locale.ENGLISH, "graylog-aws-plugin-%s", kinesisStreamName); + LOG.debug("Using Kinesis applicationName [{}].", applicationName); - ConfigsBuilder configsBuilder = new ConfigsBuilder(kinesisStreamName, applicationName, - kinesisAsyncClient, dynamoClient, cloudWatchClient, - workerId, - kinesisShardProcessorFactory); + // The KinesisShardProcessorFactory contains the message processing logic. + final KinesisShardProcessorFactory kinesisShardProcessorFactory = new KinesisShardProcessorFactory(objectMapper, transport, handleMessageCallback, kinesisStreamName, awsMessageType); - final PollingConfig pollingConfig = new PollingConfig(kinesisStreamName, kinesisAsyncClient); + ConfigsBuilder configsBuilder = new ConfigsBuilder(kinesisStreamName, applicationName, + kinesisAsyncClient, dynamoClient, cloudWatchClient, + workerId, + kinesisShardProcessorFactory); - // Default max records per request is 10k. - if (recordBatchSize != null) { - LOG.debug("Using explicit batch size [{}]", recordBatchSize); - pollingConfig.maxRecords(recordBatchSize); - } - WorkerStateChangeListener workerStateChangeListener = new NoOpWorkerStateChangeListener() { - @Override - public void onAllInitializationAttemptsFailed(Throwable e) { - inputFailureRecorder.setFailing( - KinesisConsumer.class, - String.format(Locale.ROOT, "Initialization for Kinesis stream <%s> failed.", kinesisStreamName), e); + final PollingConfig pollingConfig = new PollingConfig(kinesisStreamName, kinesisAsyncClient); + + // Default max records per request is 10k. + if (recordBatchSize != null) { + LOG.debug("Using explicit batch size [{}]", recordBatchSize); + pollingConfig.maxRecords(recordBatchSize); } - }; - - TaskExecutionListener taskExecutionListener = new NoOpTaskExecutionListener() { - @Override - public void afterTaskExecution(TaskExecutionListenerInput input) { - if (TaskOutcome.FAILURE.equals(input.taskOutcome())) { - inputFailureRecorder.setFailing(KinesisConsumer.class, - String.format(Locale.ROOT, "Errors for Kinesis stream <%s>!", kinesisStreamName)); - } else if (TaskOutcome.SUCCESSFUL.equals(input.taskOutcome()) && TaskType.PROCESS.equals(input.taskType())) { - inputFailureRecorder.setRunning(); + WorkerStateChangeListener workerStateChangeListener = new NoOpWorkerStateChangeListener() { + @Override + public void onAllInitializationAttemptsFailed(Throwable e) { + inputFailureRecorder.setFailing( + KinesisConsumer.class, + String.format(Locale.ROOT, "Initialization for Kinesis stream <%s> failed.", kinesisStreamName), e); } - } - }; - - this.kinesisScheduler = new Scheduler( - configsBuilder.checkpointConfig(), - configsBuilder.coordinatorConfig().workerStateChangeListener(workerStateChangeListener), - configsBuilder.leaseManagementConfig(), - configsBuilder.lifecycleConfig().taskExecutionListener(taskExecutionListener), - configsBuilder.metricsConfig(), - configsBuilder.processorConfig(), - configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig)); - - LOG.debug("Starting Kinesis scheduler."); - kinesisScheduler.run(); - LOG.debug("After Kinesis scheduler stopped."); + }; + + TaskExecutionListener taskExecutionListener = new NoOpTaskExecutionListener() { + @Override + public void afterTaskExecution(TaskExecutionListenerInput input) { + if (TaskOutcome.FAILURE.equals(input.taskOutcome())) { + inputFailureRecorder.setFailing(KinesisConsumer.class, + String.format(Locale.ROOT, "Errors for Kinesis stream <%s>!", kinesisStreamName)); + } else if (TaskOutcome.SUCCESSFUL.equals(input.taskOutcome()) && TaskType.PROCESS.equals(input.taskType())) { + inputFailureRecorder.setRunning(); + } + } + }; + + this.kinesisScheduler = new Scheduler( + configsBuilder.checkpointConfig(), + configsBuilder.coordinatorConfig().workerStateChangeListener(workerStateChangeListener), + configsBuilder.leaseManagementConfig(), + configsBuilder.lifecycleConfig().taskExecutionListener(taskExecutionListener), + configsBuilder.metricsConfig(), + configsBuilder.processorConfig(), + configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig)); + + LOG.debug("Starting Kinesis scheduler."); + kinesisScheduler.run(); + LOG.debug("After Kinesis scheduler stopped."); + } } /** diff --git a/graylog2-server/src/test/java/org/graylog/aws/AWSAsyncProxyConfigurationProviderTest.java b/graylog2-server/src/test/java/org/graylog/aws/AWSAsyncProxyConfigurationProviderTest.java new file mode 100644 index 000000000000..1cca68bfbc10 --- /dev/null +++ b/graylog2-server/src/test/java/org/graylog/aws/AWSAsyncProxyConfigurationProviderTest.java @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog.aws; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.http.nio.netty.ProxyConfiguration; + +import java.net.URI; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for {@link AWSAsyncProxyConfigurationProvider} proxy configuration creation. + */ +public class AWSAsyncProxyConfigurationProviderTest { + + @Test + public void buildProxyConfigurationWithoutCredentials() { + final URI proxyUri = URI.create("http://proxy.example.com:8080"); + + final ProxyConfiguration proxyConfig = AWSAsyncProxyConfigurationProvider.buildProxyConfiguration(proxyUri); + + assertThat(proxyConfig.host()).isEqualTo("proxy.example.com"); + assertThat(proxyConfig.port()).isEqualTo(8080); + assertThat(proxyConfig.scheme()).isEqualTo("http"); + assertThat(proxyConfig.username()).isNull(); + assertThat(proxyConfig.password()).isNull(); + } + + @Test + public void buildProxyConfigurationWithCredentials() { + final URI proxyUri = URI.create("http://user:pass@proxy.example.com:8080"); + + final ProxyConfiguration proxyConfig = AWSAsyncProxyConfigurationProvider.buildProxyConfiguration(proxyUri); + + assertThat(proxyConfig.username()).isEqualTo("user"); + assertThat(proxyConfig.password()).isEqualTo("pass"); + assertThat(proxyConfig.host()).isEqualTo("proxy.example.com"); + assertThat(proxyConfig.port()).isEqualTo(8080); + } + + @Test + public void buildProxyConfigurationWithUsernameOnly() { + final URI proxyUri = URI.create("http://user@proxy.example.com:8080"); + + final ProxyConfiguration proxyConfig = AWSAsyncProxyConfigurationProvider.buildProxyConfiguration(proxyUri); + + assertThat(proxyConfig.host()).isEqualTo("proxy.example.com"); + assertThat(proxyConfig.port()).isEqualTo(8080); + assertThat(proxyConfig.username()).isNull(); + assertThat(proxyConfig.password()).isNull(); + } + + @Test + public void buildProxyConfigurationWithHttpsScheme() { + final URI proxyUri = URI.create("https://admin:secret@secure-proxy.example.com:443"); + + final ProxyConfiguration proxyConfig = AWSAsyncProxyConfigurationProvider.buildProxyConfiguration(proxyUri); + + assertThat(proxyConfig.username()).isEqualTo("admin"); + assertThat(proxyConfig.password()).isEqualTo("secret"); + assertThat(proxyConfig.scheme()).isEqualTo("https"); + assertThat(proxyConfig.host()).isEqualTo("secure-proxy.example.com"); + assertThat(proxyConfig.port()).isEqualTo(443); + } + + @Test + public void buildProxyConfigurationWithDefaultHttpPort() { + final URI proxyUri = URI.create("http://user:pass@proxy.example.com"); + + final ProxyConfiguration proxyConfig = AWSAsyncProxyConfigurationProvider.buildProxyConfiguration(proxyUri); + + assertThat(proxyConfig.username()).isEqualTo("user"); + assertThat(proxyConfig.password()).isEqualTo("pass"); + assertThat(proxyConfig.host()).isEqualTo("proxy.example.com"); + // The Netty proxy configuration requires an explicit port; the scheme default is used when none is provided. + assertThat(proxyConfig.port()).isEqualTo(80); + } + + @Test + public void buildProxyConfigurationWithDefaultHttpsPort() { + final URI proxyUri = URI.create("https://secure-proxy.example.com"); + + final ProxyConfiguration proxyConfig = AWSAsyncProxyConfigurationProvider.buildProxyConfiguration(proxyUri); + + assertThat(proxyConfig.host()).isEqualTo("secure-proxy.example.com"); + assertThat(proxyConfig.scheme()).isEqualTo("https"); + assertThat(proxyConfig.port()).isEqualTo(443); + } +} diff --git a/graylog2-server/src/test/java/org/graylog/integrations/aws/AwsClientBuilderUtilTest.java b/graylog2-server/src/test/java/org/graylog/integrations/aws/AwsClientBuilderUtilTest.java index 08e6e40fa7c2..bdc99e290925 100644 --- a/graylog2-server/src/test/java/org/graylog/integrations/aws/AwsClientBuilderUtilTest.java +++ b/graylog2-server/src/test/java/org/graylog/integrations/aws/AwsClientBuilderUtilTest.java @@ -16,6 +16,7 @@ */ package org.graylog.integrations.aws; +import org.graylog.aws.AWSAsyncProxyConfigurationProvider; import org.graylog.aws.AWSProxyConfigurationProvider; import org.graylog.integrations.aws.resources.requests.AWSRequest; import org.graylog2.Configuration; @@ -64,7 +65,8 @@ public class AwsClientBuilderUtilTest { @BeforeEach public void setUp() throws Exception { - awsClientBuilderUtil = new AWSClientBuilderUtil(AWSAuthFactory::new, encryptedValueService, mock(Configuration.class), mock(AWSProxyConfigurationProvider.class)); + awsClientBuilderUtil = new AWSClientBuilderUtil(AWSAuthFactory::new, encryptedValueService, mock(Configuration.class), + mock(AWSProxyConfigurationProvider.class), mock(AWSAsyncProxyConfigurationProvider.class)); } // Test Cases