Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/issue-14460.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type = "a"
message = "Add HTTP Async proxy support for AWS Kinesis/Cloudwatch input."

issues = ["14460"]
pulls = ["26404"]
6 changes: 6 additions & 0 deletions graylog2-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,12 @@
</exclusion>
</exclusions>
</dependency>
<!-- Netty async HTTP client, used by the AWS async clients (DynamoDB, CloudWatch, Kinesis) for the Kinesis
input. Required for HTTP/2 (Kinesis Client Library) and proxy support on the async clients. -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
</dependency>
<!-- additional integrations dependencies -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.aws;

import com.google.common.base.Splitter;
import jakarta.annotation.Nullable;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Provider;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.ProxyConfiguration;

import java.net.URI;
import java.util.List;

/**
* Provides a Netty-based async HTTP client builder configured with the optional Graylog HTTP proxy
* (the {@code http_proxy_uri} server configuration).
* <p>
* The synchronous {@link AWSProxyConfigurationProvider} uses the Apache HTTP client, which cannot be used by the
* AWS async clients (DynamoDB, CloudWatch, Kinesis) that the Kinesis input relies on. The Kinesis Client Library
* additionally requires HTTP/2, which is only supported by the Netty async client. This provider therefore mirrors
* the proxy logic of {@link AWSProxyConfigurationProvider}, but produces a {@link NettyNioAsyncHttpClient.Builder}.
*/
@Singleton
public class AWSAsyncProxyConfigurationProvider implements Provider<NettyNioAsyncHttpClient.Builder> {
private static final Logger LOG = LoggerFactory.getLogger(AWSAsyncProxyConfigurationProvider.class);
private static final String HTTPS_SCHEME = "https";
private static final int DEFAULT_HTTP_PORT = 80;
private static final int DEFAULT_HTTPS_PORT = 443;

private final URI httpProxyUri;

@Inject
public AWSAsyncProxyConfigurationProvider(@Named("http_proxy_uri") @Nullable URI httpProxyUri) {
this.httpProxyUri = httpProxyUri;
}

@Override
public NettyNioAsyncHttpClient.Builder get() {
final NettyNioAsyncHttpClient.Builder httpClientBuilder = NettyNioAsyncHttpClient.builder();
if (httpProxyUri == null) {
LOG.debug("AWS async proxy disabled: http_proxy_uri not set");
return httpClientBuilder;
}

httpClientBuilder.proxyConfiguration(buildProxyConfiguration(httpProxyUri));
LOG.debug("AWS async proxy enabled: {}:{}", httpProxyUri.getHost(), httpProxyUri.getPort());
return httpClientBuilder;
}

static ProxyConfiguration buildProxyConfiguration(URI proxyUri) {
final String scheme = proxyUri.getScheme();
final int port = proxyUri.getPort();

final ProxyConfiguration.Builder proxyConfigBuilder = ProxyConfiguration.builder()
.scheme(scheme)
.host(proxyUri.getHost())
// The Netty proxy configuration requires an explicit port. Fall back to the scheme default when the
// proxy URI does not specify one.
.port(port >= 0 ? port : (HTTPS_SCHEME.equalsIgnoreCase(scheme) ? DEFAULT_HTTPS_PORT : DEFAULT_HTTP_PORT));

if (proxyUri.getUserInfo() != null && !proxyUri.getUserInfo().isEmpty()) {
final List<String> credentials = Splitter.on(":")
.limit(2)
.splitToList(proxyUri.getUserInfo());

if (credentials.size() == 2) {
proxyConfigBuilder.username(credentials.get(0));
proxyConfigBuilder.password(credentials.get(1));
}
}

return proxyConfigBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,22 @@
package org.graylog.integrations.aws;

import com.google.common.base.Preconditions;
import jakarta.inject.Inject;
import jakarta.inject.Provider;
import org.apache.commons.lang3.StringUtils;
import org.graylog.aws.AWSAsyncProxyConfigurationProvider;
import org.graylog.aws.AWSProxyConfigurationProvider;
import org.graylog.integrations.aws.resources.requests.AWSRequest;
import org.graylog2.Configuration;
import org.graylog2.security.encryption.EncryptedValue;
import org.graylog2.security.encryption.EncryptedValueService;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
import software.amazon.awssdk.core.client.builder.SdkClientBuilder;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.nio.netty.Http2Configuration;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClientBuilder;
Expand All @@ -35,9 +41,8 @@
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;

import jakarta.inject.Inject;

import java.net.URI;
import java.time.Duration;
import java.util.Optional;

/**
Expand All @@ -46,18 +51,27 @@
*/
public class AWSClientBuilderUtil {

// HTTP/2 tuning required by the Kinesis Client Library. These mirror the values applied by
// software.amazon.kinesis.common.KinesisClientUtil#adjustKinesisClientBuilder, which we cannot use directly
// because it would override our proxy-aware HTTP client builder.
private static final int KINESIS_HTTP2_INITIAL_WINDOW_SIZE_BYTES = 512 * 1024; // 512 KB
private static final long KINESIS_HTTP2_HEALTH_CHECK_PING_PERIOD_MILLIS = 60 * 1000;

private final Provider<AWSAuthFactory> authFactoryProvider;
private final EncryptedValueService encryptedValueService;
private final Configuration configuration;
private final AWSProxyConfigurationProvider proxyConfigurationProvider;
private final AWSAsyncProxyConfigurationProvider asyncProxyConfigurationProvider;

@Inject
public AWSClientBuilderUtil(Provider<AWSAuthFactory> authFactoryProvider, EncryptedValueService encryptedValueService,
Configuration configuration, AWSProxyConfigurationProvider proxyConfigurationProvider) {
Configuration configuration, AWSProxyConfigurationProvider proxyConfigurationProvider,
AWSAsyncProxyConfigurationProvider asyncProxyConfigurationProvider) {
this.authFactoryProvider = authFactoryProvider;
this.encryptedValueService = encryptedValueService;
this.configuration = configuration;
this.proxyConfigurationProvider = proxyConfigurationProvider;
this.asyncProxyConfigurationProvider = asyncProxyConfigurationProvider;
}

public AwsCredentialsProvider createCredentialsProvider(AWSRequest request) {
Expand All @@ -71,7 +85,6 @@ public AwsCredentialsProvider createCredentialsProvider(AWSRequest request) {

/**
* Creates an AWS credentials provider with proxy support on the STS client used for assume-role.
* Use this for inputs that support HTTP proxy (S3, Security Lake) but NOT for Kinesis.
*/
public AwsCredentialsProvider createCredentialsProviderWithStsProxy(AWSRequest request) {
return authFactoryProvider.get().create(
Expand Down Expand Up @@ -112,7 +125,8 @@ public CloudWatchLogsClient buildClient(CloudWatchLogsClientBuilder clientBuilde
initializeBuilder(clientBuilder,
request.cloudwatchEndpoint(),
Region.of(request.region()),
createCredentialsProvider(request));
createCredentialsProviderWithStsProxy(request));
applySyncProxy(clientBuilder);

return clientBuilder.build();
}
Expand All @@ -128,7 +142,8 @@ public KinesisClient buildClient(KinesisClientBuilder clientBuilder, AWSRequest
initializeBuilder(clientBuilder,
request.kinesisEndpoint(),
Region.of(request.region()),
createCredentialsProvider(request));
createCredentialsProviderWithStsProxy(request));
applySyncProxy(clientBuilder);

return clientBuilder.build();
}
Expand All @@ -149,10 +164,45 @@ public IamClient buildClient(IamClientBuilder clientBuilder, AWSRequest request)
initializeBuilder(clientBuilder,
request.iamEndpoint(),
iamRegion, // Always specify the appropriate global region for the IAM client.
createCredentialsProvider(request));
createCredentialsProviderWithStsProxy(request));
applySyncProxy(clientBuilder);
return clientBuilder.build();
}

/**
* Applies the proxy-aware Apache HTTP client builder to a synchronous AWS client builder. This is a no-op when
* no {@code http_proxy_uri} is configured.
*/
private void applySyncProxy(AwsSyncClientBuilder<?, ?> clientBuilder) {
clientBuilder.httpClientBuilder(proxyConfigurationProvider.get());
}

/**
* Returns a Netty async HTTP client builder configured with the Graylog HTTP proxy (when set). Use this for the
* DynamoDB and CloudWatch async clients required by the Kinesis input. These do not require HTTP/2.
*/
public NettyNioAsyncHttpClient.Builder asyncHttpClientBuilder() {
return asyncProxyConfigurationProvider.get();
}

/**
* Returns a Netty async HTTP client builder configured for HTTP/2 (required by the Kinesis Client Library) plus the
* Graylog HTTP proxy (when set).
* <p>
* This replaces {@link software.amazon.kinesis.common.KinesisClientUtil#createKinesisAsyncClient}, which always
* installs its own HTTP client builder and would therefore discard our proxy configuration. The HTTP/2 settings
* mirror the values applied by that utility.
*/
public NettyNioAsyncHttpClient.Builder kinesisAsyncHttpClientBuilder() {
return asyncProxyConfigurationProvider.get()
.maxConcurrency(Integer.MAX_VALUE)
.http2Configuration(Http2Configuration.builder()
.initialWindowSize(KINESIS_HTTP2_INITIAL_WINDOW_SIZE_BYTES)
.healthCheckPingPeriod(Duration.ofMillis(KINESIS_HTTP2_HEALTH_CHECK_PING_PERIOD_MILLIS))
.build())
.protocol(Protocol.HTTP2);
}

private String decryptSecretAccessKey(EncryptedValue secretAccessKey) {
return encryptedValueService.decrypt(Optional.ofNullable(secretAccessKey).orElse(EncryptedValue.createUnset()));
}
Expand Down
Loading
Loading