Skip to content

Commit 087ced7

Browse files
authored
Fix cloudwatch logs config (#5623)
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
1 parent 9d1ba04 commit 087ced7

11 files changed

Lines changed: 38 additions & 86 deletions

File tree

data-prepper-plugins/cloudwatch-logs/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ dependencies {
2727
implementation project(':data-prepper-plugins:aws-plugin-api')
2828
implementation project(path: ':data-prepper-plugins:common')
2929
implementation project(path: ':data-prepper-plugins:failures-common')
30+
implementation libs.armeria.core
3031
implementation 'io.micrometer:micrometer-core'
3132
implementation 'com.fasterxml.jackson.core:jackson-core'
3233
implementation 'com.fasterxml.jackson.core:jackson-databind'

data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,12 +202,10 @@ void setUp() {
202202
when(cloudWatchLogsSinkConfig.getDlq()).thenReturn(null);
203203
when(cloudWatchLogsSinkConfig.getLogStream()).thenReturn(logStreamName);
204204
when(cloudWatchLogsSinkConfig.getAwsConfig()).thenReturn(awsConfig);
205-
when(cloudWatchLogsSinkConfig.getBufferType()).thenReturn(CloudWatchLogsSinkConfig.DEFAULT_BUFFER_TYPE);
205+
when(cloudWatchLogsSinkConfig.getMaxRetries()).thenReturn(3);
206206

207207
thresholdConfig = mock(ThresholdConfig.class);
208-
when(thresholdConfig.getBackOffTime()).thenReturn(500L);
209208
when(thresholdConfig.getLogSendInterval()).thenReturn(60L);
210-
when(thresholdConfig.getRetryCount()).thenReturn(10);
211209
when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(1000L);
212210
when(cloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig);
213211
}

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@
2929
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsLimits;
3030
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
3131
import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler;
32+
import org.opensearch.dataprepper.model.annotations.Experimental;
3233

3334
import java.util.Collection;
3435
import java.util.concurrent.Executor;
3536
import java.util.concurrent.Executors;
3637

38+
@Experimental
3739
@DataPrepperPlugin(name = "cloudwatch_logs", pluginType = Sink.class, pluginConfigurationType = CloudWatchLogsSinkConfig.class)
3840
public class CloudWatchLogsSink extends AbstractSink<Record<Event>> {
3941
private final CloudWatchLogsService cloudWatchLogsService;
@@ -64,9 +66,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting,
6466
}
6567

6668
BufferFactory bufferFactory = null;
67-
if (cloudWatchLogsSinkConfig.getBufferType().equals("in_memory")) {
68-
bufferFactory = new InMemoryBufferFactory();
69-
}
69+
bufferFactory = new InMemoryBufferFactory();
7070

7171
if (cloudWatchLogsSinkConfig.getDlq() != null) {
7272
String region = awsConfig.getAwsRegion().toString();
@@ -82,8 +82,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting,
8282
.dlqPushHandler(dlqPushHandler)
8383
.logGroup(cloudWatchLogsSinkConfig.getLogGroup())
8484
.logStream(cloudWatchLogsSinkConfig.getLogStream())
85-
.backOffTimeBase(thresholdConfig.getBackOffTime())
86-
.retryCount(thresholdConfig.getRetryCount())
85+
.retryCount(cloudWatchLogsSinkConfig.getMaxRetries())
8786
.executor(executor)
8887
.build();
8988

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.slf4j.Logger;
1111
import org.slf4j.LoggerFactory;
1212
import software.amazon.awssdk.core.exception.SdkClientException;
13+
import com.linecorp.armeria.client.retry.Backoff;
1314
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
1415
import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException;
1516
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
@@ -22,15 +23,14 @@
2223
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
2324

2425
import java.nio.charset.StandardCharsets;
26+
import java.time.Duration;
2527
import java.util.ArrayList;
2628
import java.util.Collection;
2729
import java.util.List;
2830
import java.util.concurrent.Executor;
2931

3032
@Builder
3133
public class CloudWatchLogsDispatcher {
32-
private static final long UPPER_RETRY_TIME_BOUND_MILLISECONDS = 2000;
33-
private static final float EXP_TIME_SCALE = 1.25F;
3434
private static final Logger LOG = LoggerFactory.getLogger(CloudWatchLogsDispatcher.class);
3535
private CloudWatchLogsClient cloudWatchLogsClient;
3636
private CloudWatchLogsMetrics cloudWatchLogsMetrics;
@@ -39,19 +39,18 @@ public class CloudWatchLogsDispatcher {
3939
private String logGroup;
4040
private String logStream;
4141
private int retryCount;
42-
private long backOffTimeBase;
4342
public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient,
4443
final CloudWatchLogsMetrics cloudWatchLogsMetrics,
4544
final DlqPushHandler dlqPushHandler,
4645
final Executor executor,
47-
final String logGroup, final String logStream,
48-
final int retryCount, final long backOffTimeBase) {
46+
final String logGroup,
47+
final String logStream,
48+
final int retryCount) {
4949
this.cloudWatchLogsClient = cloudWatchLogsClient;
5050
this.cloudWatchLogsMetrics = cloudWatchLogsMetrics;
5151
this.logGroup = logGroup;
5252
this.logStream = logStream;
5353
this.retryCount = retryCount;
54-
this.backOffTimeBase = backOffTimeBase;
5554
this.dlqPushHandler = dlqPushHandler;
5655

5756
this.executor = executor;
@@ -97,21 +96,21 @@ public void dispatchLogs(List<InputLogEvent> inputLogEvents, List<EventHandle> e
9796
.putLogEventsRequest(putLogEventsRequest)
9897
.eventHandles(eventHandles)
9998
.totalEventCount(inputLogEvents.size())
100-
.backOffTimeBase(backOffTimeBase)
10199
.retryCount(retryCount)
102100
.build());
103101
}
104102

105103
@Builder
106104
protected static class Uploader implements Runnable {
105+
static final long INITIAL_DELAY_MS = 50;
106+
static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis();
107107
private final CloudWatchLogsClient cloudWatchLogsClient;
108108
private final CloudWatchLogsMetrics cloudWatchLogsMetrics;
109109
private DlqPushHandler dlqPushHandler;
110110
private final PutLogEventsRequest putLogEventsRequest;
111111
private final List<EventHandle> eventHandles;
112112
private final int totalEventCount;
113113
private final int retryCount;
114-
private final long backOffTimeBase;
115114

116115
@Override
117116
public void run() {
@@ -124,6 +123,7 @@ public void upload() {
124123
String failureMessage = "";
125124
PutLogEventsResponse putLogEventsResponse = null;
126125
List<DlqObject> dlqObjects = new ArrayList<>();
126+
final Backoff backoff = Backoff.exponential(INITIAL_DELAY_MS, MAXIMUM_DELAY_MS).withMaxAttempts(retryCount);
127127

128128
try {
129129
while (failedToTransmit && (failCount < retryCount)) {
@@ -136,8 +136,11 @@ public void upload() {
136136
failureMessage = e.getMessage();
137137
LOG.error(NOISY, "Failed to push logs with error: {}", e.getMessage());
138138
cloudWatchLogsMetrics.increaseRequestFailCounter(1);
139-
Thread.sleep(calculateBackOffTime(backOffTimeBase, failCount));
140139
failCount++;
140+
final long delayMillis = backoff.nextDelayMillis(failCount);
141+
if (delayMillis > 0) {
142+
Thread.sleep(delayMillis);
143+
}
141144
}
142145
}
143146
} catch (Exception e) {
@@ -197,15 +200,5 @@ List<DlqObject> getDlqObjectsFromResponse(PutLogEventsResponse putLogEventsRespo
197200
return dlqObjects;
198201
}
199202

200-
private long calculateBackOffTime(final long backOffTimeBase, final int failCounter) {
201-
long scale = (long)Math.pow(EXP_TIME_SCALE, failCounter);
202-
203-
if (scale >= UPPER_RETRY_TIME_BOUND_MILLISECONDS) {
204-
return UPPER_RETRY_TIME_BOUND_MILLISECONDS;
205-
}
206-
207-
return scale * backOffTimeBase;
208-
}
209-
210203
}
211204
}

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config;
77

88
import com.fasterxml.jackson.annotation.JsonProperty;
9+
import jakarta.validation.constraints.Size;
910
import jakarta.validation.Valid;
1011
import jakarta.validation.constraints.NotEmpty;
1112
import jakarta.validation.constraints.NotNull;
@@ -14,7 +15,7 @@
1415

1516

1617
public class CloudWatchLogsSinkConfig {
17-
public static final String DEFAULT_BUFFER_TYPE = "in_memory";
18+
public static final int DEFAULT_RETRY_COUNT = 5;
1819

1920
@JsonProperty("aws")
2021
@Valid
@@ -26,9 +27,6 @@ public class CloudWatchLogsSinkConfig {
2627
@JsonProperty("threshold")
2728
private ThresholdConfig thresholdConfig = new ThresholdConfig();
2829

29-
@JsonProperty("buffer_type")
30-
private String bufferType = DEFAULT_BUFFER_TYPE;
31-
3230
@JsonProperty("log_group")
3331
@NotEmpty
3432
@NotNull
@@ -39,6 +37,10 @@ public class CloudWatchLogsSinkConfig {
3937
@NotNull
4038
private String logStream;
4139

40+
@JsonProperty("max_retries")
41+
@Size(min = 1, max = 15, message = "retry_count amount should be between 1 and 15")
42+
private int maxRetries = DEFAULT_RETRY_COUNT;
43+
4244
public AwsConfig getAwsConfig() {
4345
return awsConfig;
4446
}
@@ -51,15 +53,16 @@ public PluginModel getDlq() {
5153
return dlq;
5254
}
5355

54-
public String getBufferType() {
55-
return bufferType;
56-
}
57-
5856
public String getLogGroup() {
5957
return logGroup;
6058
}
6159

6260
public String getLogStream() {
6361
return logStream;
6462
}
63+
64+
public int getMaxRetries() {
65+
return maxRetries;
66+
}
67+
6568
}

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/ThresholdConfig.java

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.opensearch.dataprepper.model.types.ByteCount;
1313

1414
import java.time.Duration;
15-
import java.time.temporal.ChronoUnit;
1615

1716
/**
1817
* The threshold config holds the different configurations for
@@ -23,9 +22,7 @@ public class ThresholdConfig {
2322
public static final int DEFAULT_BATCH_SIZE = 25;
2423
public static final String DEFAULT_EVENT_SIZE = "256kb";
2524
public static final String DEFAULT_SIZE_OF_REQUEST = "1mb";
26-
public static final int DEFAULT_RETRY_COUNT = 5;
2725
public static final long DEFAULT_LOG_SEND_INTERVAL_TIME = 60;
28-
public static final long DEFAULT_BACKOFF_TIME = 500;
2926

3027
@JsonProperty("batch_size")
3128
@Size(min = 1, max = 10000, message = "batch_size amount should be between 1 to 10000")
@@ -38,20 +35,11 @@ public class ThresholdConfig {
3835
@JsonProperty("max_request_size")
3936
private String maxRequestSize = DEFAULT_SIZE_OF_REQUEST;
4037

41-
@JsonProperty("retry_count")
42-
@Size(min = 1, max = 15, message = "retry_count amount should be between 1 and 15")
43-
private int retryCount = DEFAULT_RETRY_COUNT;
44-
4538
@JsonProperty("log_send_interval")
4639
@DurationMin(seconds = 60)
4740
@DurationMax(seconds = 3600)
4841
private Duration logSendInterval = Duration.ofSeconds(DEFAULT_LOG_SEND_INTERVAL_TIME);
4942

50-
@JsonProperty("back_off_time")
51-
@DurationMin(millis = 500)
52-
@DurationMax(millis = 1000)
53-
private Duration backOffTime = Duration.ofMillis(DEFAULT_BACKOFF_TIME);
54-
5543
public int getBatchSize() {
5644
return batchSize;
5745
}
@@ -64,15 +52,8 @@ public long getMaxRequestSizeBytes() {
6452
return ByteCount.parse(maxRequestSize).getBytes();
6553
}
6654

67-
public int getRetryCount() {
68-
return retryCount;
69-
}
70-
7155
public long getLogSendInterval() {
7256
return logSendInterval.getSeconds();
7357
}
7458

75-
public long getBackOffTime() {
76-
return (backOffTime.get(ChronoUnit.NANOS) / 1000000) + (backOffTime.getSeconds() * 1000);
77-
}
78-
}
59+
}

data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ void setUp() {
6767
when(mockCloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig);
6868
when(mockCloudWatchLogsSinkConfig.getLogGroup()).thenReturn(TEST_LOG_GROUP);
6969
when(mockCloudWatchLogsSinkConfig.getLogStream()).thenReturn(TEST_LOG_STREAM);
70-
when(mockCloudWatchLogsSinkConfig.getBufferType()).thenReturn(TEST_BUFFER_TYPE);
7170

7271
when(mockPluginSetting.getName()).thenReturn(TEST_PLUGIN_NAME);
7372
when(mockPluginSetting.getPipelineName()).thenReturn(TEST_PIPELINE_NAME);

data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class CloudWatchLogsDispatcherTest {
2727
private CloudWatchLogsClient mockCloudWatchLogsClient;
2828
private CloudWatchLogsMetrics mockCloudWatchLogsMetrics;
2929
private Executor mockExecutor;
30+
private static final int RETRY_COUNT = 5;
3031
private static final String LOG_GROUP = "testGroup";
3132
private static final String LOG_STREAM = "testStream";
3233
private static final String TEST_STRING = "testMessage";
@@ -66,8 +67,7 @@ CloudWatchLogsDispatcher getCloudWatchLogsDispatcher() {
6667
.executor(mockExecutor)
6768
.logGroup(LOG_GROUP)
6869
.logStream(LOG_STREAM)
69-
.retryCount(ThresholdConfig.DEFAULT_RETRY_COUNT)
70-
.backOffTimeBase(ThresholdConfig.DEFAULT_BACKOFF_TIME)
70+
.retryCount(RETRY_COUNT)
7171
.build();
7272
}
7373

data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/UploaderTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.mockito.Mockito.when;
2121

2222
class UploaderTest {
23+
private static final int RETRY_COUNT = 5;
2324
private CloudWatchLogsClient mockCloudWatchLogsClient;
2425
private CloudWatchLogsMetrics mockCloudWatchLogsMetrics;
2526

@@ -50,8 +51,7 @@ CloudWatchLogsDispatcher.Uploader getUploader() {
5051
.putLogEventsRequest(getMockPutLogEventsRequest())
5152
.eventHandles(getTestEventHandles())
5253
.totalEventCount(ThresholdConfig.DEFAULT_BATCH_SIZE)
53-
.retryCount(ThresholdConfig.DEFAULT_RETRY_COUNT)
54-
.backOffTimeBase(ThresholdConfig.DEFAULT_BACKOFF_TIME)
54+
.retryCount(RETRY_COUNT)
5555
.build();
5656
}
5757

@@ -78,7 +78,7 @@ void GIVEN_valid_uploader_WHEN_run_throws_cloud_watch_logs_exception_SHOULD_upda
7878
CloudWatchLogsDispatcher.Uploader testUploader = getUploader();
7979
testUploader.run();
8080

81-
verify(mockCloudWatchLogsMetrics, times(ThresholdConfig.DEFAULT_RETRY_COUNT)).increaseRequestFailCounter(1);
81+
verify(mockCloudWatchLogsMetrics, times(RETRY_COUNT)).increaseRequestFailCounter(1);
8282
verify(mockCloudWatchLogsMetrics, atLeastOnce()).increaseLogEventFailCounter(ThresholdConfig.DEFAULT_BATCH_SIZE);
8383
}
8484

@@ -88,7 +88,7 @@ void GIVEN_valid_uploader_WHEN_run_throws_sdk_client_except_SHOULD_update_fail_c
8888
CloudWatchLogsDispatcher.Uploader testUploader = getUploader();
8989
testUploader.run();
9090

91-
verify(mockCloudWatchLogsMetrics, times(ThresholdConfig.DEFAULT_RETRY_COUNT)).increaseRequestFailCounter(1);
91+
verify(mockCloudWatchLogsMetrics, times(RETRY_COUNT)).increaseRequestFailCounter(1);
9292
verify(mockCloudWatchLogsMetrics, atLeastOnce()).increaseLogEventFailCounter(ThresholdConfig.DEFAULT_BATCH_SIZE);
9393
}
9494
}

data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,6 @@ void GIVEN_new_sink_config_WHEN_get_threshold_config_called_SHOULD_return_null()
3737
assertThat(new CloudWatchLogsSinkConfig().getThresholdConfig(), notNullValue());
3838
}
3939

40-
@Test
41-
void GIVEN_new_sink_config_WHEN_get_buffer_type_called_SHOULD_return_default_buffer_type() {
42-
assertThat(new CloudWatchLogsSinkConfig().getBufferType(), equalTo(CloudWatchLogsSinkConfig.DEFAULT_BUFFER_TYPE));
43-
}
44-
4540
@Test
4641
void GIVEN_new_sink_config_WHEN_get_log_group_called_SHOULD_return_null() {
4742
assertThat(new CloudWatchLogsSinkConfig().getLogGroup(), equalTo(null));
@@ -69,4 +64,4 @@ void GIVEN_empty_sink_config_WHEN_deserialized_from_json_SHOULD_return_valid_thr
6964
assertThat(cloudWatchLogsSinkConfig.getAwsConfig(), equalTo(awsConfig));
7065
assertThat(cloudWatchLogsSinkConfig.getThresholdConfig(), equalTo(thresholdConfig));
7166
}
72-
}
67+
}

0 commit comments

Comments
 (0)