+ * The synchronous {@link AWSProxyConfigurationProvider} uses the Apache HTTP client, which cannot be used by the
+ * AWS async clients (DynamoDB, CloudWatch, Kinesis) that the Kinesis input relies on. The Kinesis Client Library
+ * additionally requires HTTP/2, which is only supported by the Netty async client. This provider therefore mirrors
+ * the proxy logic of {@link AWSProxyConfigurationProvider}, but produces a {@link NettyNioAsyncHttpClient.Builder}.
+ */
+@Singleton
+public class AWSAsyncProxyConfigurationProvider implements Provider
+ * This replaces {@link software.amazon.kinesis.common.KinesisClientUtil#createKinesisAsyncClient}, which always
+ * installs its own HTTP client builder and would therefore discard our proxy configuration. The HTTP/2 settings
+ * mirror the values applied by that utility.
+ */
+ public NettyNioAsyncHttpClient.Builder kinesisAsyncHttpClientBuilder() {
+ return asyncProxyConfigurationProvider.get()
+ .maxConcurrency(Integer.MAX_VALUE)
+ .http2Configuration(Http2Configuration.builder()
+ .initialWindowSize(KINESIS_HTTP2_INITIAL_WINDOW_SIZE_BYTES)
+ .healthCheckPingPeriod(Duration.ofMillis(KINESIS_HTTP2_HEALTH_CHECK_PING_PERIOD_MILLIS))
+ .build())
+ .protocol(Protocol.HTTP2);
+ }
+
private String decryptSecretAccessKey(EncryptedValue secretAccessKey) {
return encryptedValueService.decrypt(Optional.ofNullable(secretAccessKey).orElse(EncryptedValue.createUnset()));
}
diff --git a/graylog2-server/src/main/java/org/graylog/integrations/aws/transports/KinesisConsumer.java b/graylog2-server/src/main/java/org/graylog/integrations/aws/transports/KinesisConsumer.java
index cfb1a491a491..0fdd90b2f493 100644
--- a/graylog2-server/src/main/java/org/graylog/integrations/aws/transports/KinesisConsumer.java
+++ b/graylog2-server/src/main/java/org/graylog/integrations/aws/transports/KinesisConsumer.java
@@ -37,7 +37,6 @@
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.kinesis.common.ConfigsBuilder;
-import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.NoOpWorkerStateChangeListener;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.coordinator.WorkerStateChangeListener;
@@ -107,77 +106,87 @@ public class KinesisConsumer implements Runnable {
public void run() {
LOG.debug("Starting the Kinesis Consumer.");
- AwsCredentialsProvider credentialsProvider = awsClientBuilderUtil.createCredentialsProvider(request);
+ AwsCredentialsProvider credentialsProvider = awsClientBuilderUtil.createCredentialsProviderWithStsProxy(request);
final Region region = Region.of(request.region());
final DynamoDbAsyncClientBuilder dynamoDbClientBuilder = DynamoDbAsyncClient.builder();
awsClientBuilderUtil.initializeBuilder(dynamoDbClientBuilder, request.dynamodbEndpoint(), region, credentialsProvider);
- final DynamoDbAsyncClient dynamoClient = dynamoDbClientBuilder.build();
-
+ dynamoDbClientBuilder.httpClientBuilder(awsClientBuilderUtil.asyncHttpClientBuilder());
final CloudWatchAsyncClientBuilder cloudwatchClientBuilder = CloudWatchAsyncClient.builder();
awsClientBuilderUtil.initializeBuilder(cloudwatchClientBuilder, request.cloudwatchEndpoint(), region, credentialsProvider);
- final CloudWatchAsyncClient cloudWatchClient = cloudwatchClientBuilder.build();
+ cloudwatchClientBuilder.httpClientBuilder(awsClientBuilderUtil.asyncHttpClientBuilder());
+ // The Kinesis Client Library normally configures the async client through
+ // KinesisClientUtil.createKinesisAsyncClient(), but that installs its own HTTP client builder and would discard
+ // our proxy configuration. We therefore apply the proxy-aware, HTTP/2-enabled builder ourselves.
final KinesisAsyncClientBuilder kinesisAsyncClientBuilder = KinesisAsyncClient.builder();
awsClientBuilderUtil.initializeBuilder(kinesisAsyncClientBuilder, request.kinesisEndpoint(), region, credentialsProvider);
- final KinesisAsyncClient kinesisAsyncClient = KinesisClientUtil.createKinesisAsyncClient(kinesisAsyncClientBuilder);
+ kinesisAsyncClientBuilder.httpClientBuilder(awsClientBuilderUtil.kinesisAsyncHttpClientBuilder());
- final String workerId = String.format(Locale.ENGLISH, "graylog-node-%s", nodeId.anonymize());
- LOG.debug("Using workerId [{}].", workerId);
+ // All three clients are kept open for the lifetime of the KCL Scheduler (kinesisScheduler.run() blocks).
+ // Try-with-resources ensures the Netty event loop groups and connection pools are released when the scheduler
+ // exits, whether normally or due to an exception.
+ try (DynamoDbAsyncClient dynamoClient = dynamoDbClientBuilder.build();
+ CloudWatchAsyncClient cloudWatchClient = cloudwatchClientBuilder.build();
+ KinesisAsyncClient kinesisAsyncClient = kinesisAsyncClientBuilder.build()) {
- // The application name needs to be unique per input/consumer.
- final String applicationName = String.format(Locale.ENGLISH, "graylog-aws-plugin-%s", kinesisStreamName);
- LOG.debug("Using Kinesis applicationName [{}].", applicationName);
+ final String workerId = String.format(Locale.ENGLISH, "graylog-node-%s", nodeId.anonymize());
+ LOG.debug("Using workerId [{}].", workerId);
- // The KinesisShardProcessorFactory contains the message processing logic.
- final KinesisShardProcessorFactory kinesisShardProcessorFactory = new KinesisShardProcessorFactory(objectMapper, transport, handleMessageCallback, kinesisStreamName, awsMessageType);
+ // The application name needs to be unique per input/consumer.
+ final String applicationName = String.format(Locale.ENGLISH, "graylog-aws-plugin-%s", kinesisStreamName);
+ LOG.debug("Using Kinesis applicationName [{}].", applicationName);
- ConfigsBuilder configsBuilder = new ConfigsBuilder(kinesisStreamName, applicationName,
- kinesisAsyncClient, dynamoClient, cloudWatchClient,
- workerId,
- kinesisShardProcessorFactory);
+ // The KinesisShardProcessorFactory contains the message processing logic.
+ final KinesisShardProcessorFactory kinesisShardProcessorFactory = new KinesisShardProcessorFactory(objectMapper, transport, handleMessageCallback, kinesisStreamName, awsMessageType);
- final PollingConfig pollingConfig = new PollingConfig(kinesisStreamName, kinesisAsyncClient);
+ ConfigsBuilder configsBuilder = new ConfigsBuilder(kinesisStreamName, applicationName,
+ kinesisAsyncClient, dynamoClient, cloudWatchClient,
+ workerId,
+ kinesisShardProcessorFactory);
- // Default max records per request is 10k.
- if (recordBatchSize != null) {
- LOG.debug("Using explicit batch size [{}]", recordBatchSize);
- pollingConfig.maxRecords(recordBatchSize);
- }
- WorkerStateChangeListener workerStateChangeListener = new NoOpWorkerStateChangeListener() {
- @Override
- public void onAllInitializationAttemptsFailed(Throwable e) {
- inputFailureRecorder.setFailing(
- KinesisConsumer.class,
- String.format(Locale.ROOT, "Initialization for Kinesis stream <%s> failed.", kinesisStreamName), e);
+ final PollingConfig pollingConfig = new PollingConfig(kinesisStreamName, kinesisAsyncClient);
+
+ // Default max records per request is 10k.
+ if (recordBatchSize != null) {
+ LOG.debug("Using explicit batch size [{}]", recordBatchSize);
+ pollingConfig.maxRecords(recordBatchSize);
}
- };
-
- TaskExecutionListener taskExecutionListener = new NoOpTaskExecutionListener() {
- @Override
- public void afterTaskExecution(TaskExecutionListenerInput input) {
- if (TaskOutcome.FAILURE.equals(input.taskOutcome())) {
- inputFailureRecorder.setFailing(KinesisConsumer.class,
- String.format(Locale.ROOT, "Errors for Kinesis stream <%s>!", kinesisStreamName));
- } else if (TaskOutcome.SUCCESSFUL.equals(input.taskOutcome()) && TaskType.PROCESS.equals(input.taskType())) {
- inputFailureRecorder.setRunning();
+ WorkerStateChangeListener workerStateChangeListener = new NoOpWorkerStateChangeListener() {
+ @Override
+ public void onAllInitializationAttemptsFailed(Throwable e) {
+ inputFailureRecorder.setFailing(
+ KinesisConsumer.class,
+ String.format(Locale.ROOT, "Initialization for Kinesis stream <%s> failed.", kinesisStreamName), e);
}
- }
- };
-
- this.kinesisScheduler = new Scheduler(
- configsBuilder.checkpointConfig(),
- configsBuilder.coordinatorConfig().workerStateChangeListener(workerStateChangeListener),
- configsBuilder.leaseManagementConfig(),
- configsBuilder.lifecycleConfig().taskExecutionListener(taskExecutionListener),
- configsBuilder.metricsConfig(),
- configsBuilder.processorConfig(),
- configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig));
-
- LOG.debug("Starting Kinesis scheduler.");
- kinesisScheduler.run();
- LOG.debug("After Kinesis scheduler stopped.");
+ };
+
+ TaskExecutionListener taskExecutionListener = new NoOpTaskExecutionListener() {
+ @Override
+ public void afterTaskExecution(TaskExecutionListenerInput input) {
+ if (TaskOutcome.FAILURE.equals(input.taskOutcome())) {
+ inputFailureRecorder.setFailing(KinesisConsumer.class,
+ String.format(Locale.ROOT, "Errors for Kinesis stream <%s>!", kinesisStreamName));
+ } else if (TaskOutcome.SUCCESSFUL.equals(input.taskOutcome()) && TaskType.PROCESS.equals(input.taskType())) {
+ inputFailureRecorder.setRunning();
+ }
+ }
+ };
+
+ this.kinesisScheduler = new Scheduler(
+ configsBuilder.checkpointConfig(),
+ configsBuilder.coordinatorConfig().workerStateChangeListener(workerStateChangeListener),
+ configsBuilder.leaseManagementConfig(),
+ configsBuilder.lifecycleConfig().taskExecutionListener(taskExecutionListener),
+ configsBuilder.metricsConfig(),
+ configsBuilder.processorConfig(),
+ configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig));
+
+ LOG.debug("Starting Kinesis scheduler.");
+ kinesisScheduler.run();
+ LOG.debug("After Kinesis scheduler stopped.");
+ }
}
/**
diff --git a/graylog2-server/src/test/java/org/graylog/aws/AWSAsyncProxyConfigurationProviderTest.java b/graylog2-server/src/test/java/org/graylog/aws/AWSAsyncProxyConfigurationProviderTest.java
new file mode 100644
index 000000000000..1cca68bfbc10
--- /dev/null
+++ b/graylog2-server/src/test/java/org/graylog/aws/AWSAsyncProxyConfigurationProviderTest.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright (C) 2020 Graylog, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ *