Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting,
dlqPushHandler = new DlqPushHandler(pluginFactory, pluginSetting, pluginMetrics, cloudWatchLogsSinkConfig.getDlq(), region, role, "cloudWatchLogs");
}

Executor executor = Executors.newCachedThreadPool();
Executor executor = Executors.newFixedThreadPool(cloudWatchLogsSinkConfig.getWorkers());

CloudWatchLogsDispatcher cloudWatchLogsDispatcher = CloudWatchLogsDispatcher.builder()
.cloudWatchLogsClient(cloudWatchLogsClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

public class CloudWatchLogsSinkConfig {
public static final int DEFAULT_RETRY_COUNT = 5;
public static final int DEFAULT_NUM_WORKERS = 10;

@JsonProperty("aws")
@Valid
Expand All @@ -43,6 +44,11 @@ public class CloudWatchLogsSinkConfig {
@Max(15)
private int maxRetries = DEFAULT_RETRY_COUNT;

@JsonProperty(value = "workers", defaultValue = "10")
@Min(1)
@Max(50)
private int workers = DEFAULT_NUM_WORKERS;

public AwsConfig getAwsConfig() {
return awsConfig;
}
Expand All @@ -67,4 +73,8 @@ public int getMaxRetries() {
return maxRetries;
}

public int getWorkers() {
return workers;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ void setUp() {
when(mockCloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig);
when(mockCloudWatchLogsSinkConfig.getLogGroup()).thenReturn(TEST_LOG_GROUP);
when(mockCloudWatchLogsSinkConfig.getLogStream()).thenReturn(TEST_LOG_STREAM);
when(mockCloudWatchLogsSinkConfig.getMaxRetries()).thenReturn(3);
when(mockCloudWatchLogsSinkConfig.getWorkers()).thenReturn(10);

when(mockPluginSetting.getName()).thenReturn(TEST_PLUGIN_NAME);
when(mockPluginSetting.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;

import java.util.Random;

class CloudWatchLogsSinkConfigTest {
private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig;
private AwsConfig awsConfig;
Expand Down Expand Up @@ -42,11 +44,23 @@ void GIVEN_new_sink_config_WHEN_get_log_group_called_SHOULD_return_null() {
assertThat(new CloudWatchLogsSinkConfig().getLogGroup(), equalTo(null));
}

@Test
void GIVEN_new_sink_config_WHEN_get_num_threads_called_SHOULD_return_default_value() {
assertThat(new CloudWatchLogsSinkConfig().getWorkers(), equalTo(CloudWatchLogsSinkConfig.DEFAULT_NUM_WORKERS));
}

@Test
void GIVEN_new_sink_config_WHEN_get_log_stream_called_SHOULD_return_null() {
assertThat(new CloudWatchLogsSinkConfig().getLogStream(), equalTo(null));
}

@Test
void GIVEN_num_threads_configured_SHOULD_return_the_configured_value() throws NoSuchFieldException, IllegalAccessException {
int testValue = (new Random()).nextInt();
ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, "workers", testValue);
assertThat(cloudWatchLogsSinkConfig.getWorkers(), equalTo(testValue));
}

@Test
void GIVEN_empty_sink_config_WHEN_deserialized_from_json_SHOULD_return_valid_log_group_and_log_stream() throws NoSuchFieldException, IllegalAccessException {
ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, "logGroup", LOG_GROUP);
Expand Down
Loading