diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java index 83ddd7c786..c20f7a9e09 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java @@ -74,7 +74,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting, dlqPushHandler = new DlqPushHandler(pluginFactory, pluginSetting, pluginMetrics, cloudWatchLogsSinkConfig.getDlq(), region, role, "cloudWatchLogs"); } - Executor executor = Executors.newCachedThreadPool(); + Executor executor = Executors.newFixedThreadPool(cloudWatchLogsSinkConfig.getWorkers()); CloudWatchLogsDispatcher cloudWatchLogsDispatcher = CloudWatchLogsDispatcher.builder() .cloudWatchLogsClient(cloudWatchLogsClient) diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java index bbb8825435..311a14b95c 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java @@ -17,6 +17,7 @@ public class CloudWatchLogsSinkConfig { public static final int DEFAULT_RETRY_COUNT = 5; + public static final int DEFAULT_NUM_WORKERS = 10; @JsonProperty("aws") @Valid @@ -43,6 +44,11 @@ public class CloudWatchLogsSinkConfig { @Max(15) private int maxRetries = DEFAULT_RETRY_COUNT; + @JsonProperty(value = "workers", defaultValue = "10") + @Min(1) + @Max(50) + private int workers = DEFAULT_NUM_WORKERS; + public AwsConfig getAwsConfig() { return awsConfig; } @@ -67,4 +73,8 @@ public int getMaxRetries() { return maxRetries; } + public int getWorkers() { + return workers; + } + } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java index 190f978332..f15517f575 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java @@ -67,6 +67,8 @@ void setUp() { when(mockCloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig); when(mockCloudWatchLogsSinkConfig.getLogGroup()).thenReturn(TEST_LOG_GROUP); when(mockCloudWatchLogsSinkConfig.getLogStream()).thenReturn(TEST_LOG_STREAM); + when(mockCloudWatchLogsSinkConfig.getMaxRetries()).thenReturn(3); + when(mockCloudWatchLogsSinkConfig.getWorkers()).thenReturn(10); when(mockPluginSetting.getName()).thenReturn(TEST_PLUGIN_NAME); when(mockPluginSetting.getPipelineName()).thenReturn(TEST_PIPELINE_NAME); diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java index 99ec5a9476..110fc90d93 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java @@ -13,6 +13,8 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; +import java.util.Random; + class CloudWatchLogsSinkConfigTest { private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig; private AwsConfig awsConfig; @@ -42,11 +44,23 @@ void GIVEN_new_sink_config_WHEN_get_log_group_called_SHOULD_return_null() { assertThat(new CloudWatchLogsSinkConfig().getLogGroup(), equalTo(null)); } + @Test + void GIVEN_new_sink_config_WHEN_get_num_threads_called_SHOULD_return_default_value() { + assertThat(new CloudWatchLogsSinkConfig().getWorkers(), equalTo(CloudWatchLogsSinkConfig.DEFAULT_NUM_WORKERS)); + } + @Test void GIVEN_new_sink_config_WHEN_get_log_stream_called_SHOULD_return_null() { assertThat(new CloudWatchLogsSinkConfig().getLogStream(), equalTo(null)); } + @Test + void GIVEN_num_threads_configured_SHOULD_return_the_configured_value() throws NoSuchFieldException, IllegalAccessException { + int testValue = (new Random()).nextInt(); + ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, "workers", testValue); + assertThat(cloudWatchLogsSinkConfig.getWorkers(), equalTo(testValue)); + } + @Test void GIVEN_empty_sink_config_WHEN_deserialized_from_json_SHOULD_return_valid_log_group_and_log_stream() throws NoSuchFieldException, IllegalAccessException { ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, "logGroup", LOG_GROUP);