Skip to content

Commit 43ba290

Browse files
kkondaka authored and simonelbaz committed
Make CWL retry indefinitely for retryable errors when no DLQ configured (opensearch-project#6355)
* Make CWL retry indefinitely for retryable errors when no DLQ configured Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Added tests Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
1 parent cc6e25e commit 43ba290

5 files changed

Lines changed: 113 additions & 22 deletions

File tree

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting,
9494
.dropIfDlqNotConfigured(true)
9595
.logGroup(cloudWatchLogsSinkConfig.getLogGroup())
9696
.logStream(cloudWatchLogsSinkConfig.getLogStream())
97-
.retryCount(cloudWatchLogsSinkConfig.getMaxRetries())
97+
.retryCount(dlqPushHandler == null ? Integer.MAX_VALUE : cloudWatchLogsSinkConfig.getMaxRetries())
9898
.executor(executor)
9999
.build();
100100

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public void dispatchLogs(List<InputLogEvent> inputLogEvents, List<EventHandle> e
107107
@Builder
108108
protected static class Uploader implements Runnable {
109109
static final long INITIAL_DELAY_MS = 50;
110+
static final int MULTIPLE_FAILURES_METRIC_COUNT = 5;
110111
static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis();
111112
private final CloudWatchLogsClient cloudWatchLogsClient;
112113
private final CloudWatchLogsMetrics cloudWatchLogsMetrics;
@@ -141,7 +142,9 @@ public void upload() {
141142
failureMessage = e.getMessage();
142143
LOG.error(NOISY, "Failed to push logs with error: {}", e.getMessage());
143144
cloudWatchLogsMetrics.increaseRequestFailCounter(1);
144-
failCount++;
145+
if (++failCount % MULTIPLE_FAILURES_METRIC_COUNT == 0) {
146+
cloudWatchLogsMetrics.increaseRequestMultiFailCounter(1);
147+
}
145148
final long delayMillis = backoff.nextDelayMillis(failCount);
146149
if (delayMillis > 0) {
147150
Thread.sleep(delayMillis);

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,23 @@ public class CloudWatchLogsMetrics {
1919
public static final String CLOUDWATCH_LOGS_EVENTS_SUCCEEDED = "cloudWatchLogsEventsSucceeded";
2020
public static final String CLOUDWATCH_LOGS_EVENTS_FAILED = "cloudWatchLogsEventsFailed";
2121
public static final String CLOUDWATCH_LOGS_REQUESTS_FAILED = "cloudWatchLogsRequestsFailed";
22+
public static final String CLOUDWATCH_LOGS_REQUEST_MULTI_FAILED = "cloudWatchLogsRequestMultipleFailures";
2223
public static final String CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED = "cloudWatchLogsLargeEventsDropped";
2324
public static final String CLOUDWATCH_LOGS_LOG_SIZE = "cloudWatchLogsLogSize";
2425
public static final String CLOUDWATCH_LOGS_REQUEST_SIZE = "cloudWatchLogsRequestSize";
2526
private final Counter logEventSuccessCounter;
2627
private final Counter logEventFailCounter;
2728
private final Counter requestSuccessCount;
2829
private final Counter requestFailCount;
30+
private final Counter requestMultiFailCount;
2931
private final Counter logLargeEventsDroppedCounter;
3032
private final DistributionSummary logSizeMetric;
3133
private final DistributionSummary requestSizeMetric;
3234

3335
public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) {
3436
this.logEventSuccessCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_SUCCEEDED);
3537
this.requestFailCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_FAILED);
38+
this.requestMultiFailCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUEST_MULTI_FAILED);
3639
this.logEventFailCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED);
3740
this.requestSuccessCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED);
3841
this.logLargeEventsDroppedCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED);
@@ -56,6 +59,10 @@ public void increaseRequestFailCounter(int value) {
5659
requestFailCount.increment(value);
5760
}
5861

62+
/**
 * Adds {@code value} to the counter registered under
 * {@link #CLOUDWATCH_LOGS_REQUEST_MULTI_FAILED}.
 *
 * <p>In this change the dispatcher's uploader calls it with {@code 1} each time the
 * failure count of a single upload reaches a multiple of its
 * {@code MULTIPLE_FAILURES_METRIC_COUNT}.
 *
 * @param value amount to add to the counter
 */
public void increaseRequestMultiFailCounter(int value) {
63+
requestMultiFailCount.increment(value);
64+
}
65+
5966
public void increaseLogLargeEventsDroppedCounter(int value) {
6067
logLargeEventsDroppedCounter.increment(value);
6168
}

data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java

Lines changed: 74 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,34 @@
88
import io.micrometer.core.instrument.DistributionSummary;
99
import org.junit.jupiter.api.BeforeEach;
1010
import org.junit.jupiter.api.Test;
11+
import org.mockito.MockedConstruction;
1112
import org.mockito.MockedStatic;
1213
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
1314
import org.opensearch.dataprepper.metrics.PluginMetrics;
15+
import org.opensearch.dataprepper.model.configuration.PluginModel;
1416
import org.opensearch.dataprepper.model.plugin.PluginFactory;
1517
import org.opensearch.dataprepper.model.configuration.PluginSetting;
1618
import org.opensearch.dataprepper.model.event.Event;
1719
import org.opensearch.dataprepper.model.event.JacksonEvent;
1820
import org.opensearch.dataprepper.model.record.Record;
21+
import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler;
1922
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsClientFactory;
23+
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsDispatcher;
2024
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsMetrics;
2125
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig;
2226
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig;
2327
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig;
2428
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
29+
import software.amazon.awssdk.regions.Region;
30+
2531

2632
import java.util.ArrayList;
2733
import java.util.Collection;
2834
import java.util.HashMap;
2935
import java.util.Map;
3036

37+
import static org.hamcrest.MatcherAssert.assertThat;
38+
import static org.hamcrest.Matchers.equalTo;
3139
import static org.junit.jupiter.api.Assertions.assertTrue;
3240
import static org.junit.jupiter.api.Assertions.assertThrows;
3341
import static org.mockito.ArgumentMatchers.any;
@@ -36,12 +44,14 @@
3644
import static org.mockito.Mockito.atLeast;
3745
import static org.mockito.Mockito.mock;
3846
import static org.mockito.Mockito.mockStatic;
47+
import static org.mockito.Mockito.mockConstruction;
3948
import static org.mockito.Mockito.spy;
4049
import static org.mockito.Mockito.times;
4150
import static org.mockito.Mockito.verify;
4251
import static org.mockito.Mockito.when;
4352

4453
class CloudWatchLogsSinkTest {
54+
private static int TEST_MAX_RETRIES = 3;
4555
private PluginSetting mockPluginSetting;
4656
private PluginMetrics mockPluginMetrics;
4757
private PluginFactory mockPluginFactory;
@@ -57,6 +67,7 @@ class CloudWatchLogsSinkTest {
5767
private static final String TEST_PLUGIN_NAME = "testPluginName";
5868
private static final String TEST_PIPELINE_NAME = "testPipelineName";
5969
private static final String TEST_BUFFER_TYPE = "in_memory";
70+
private int numRetries;
6071
@BeforeEach
6172
void setUp() {
6273
mockPluginSetting = mock(PluginSetting.class);
@@ -73,12 +84,13 @@ void setUp() {
7384
DistributionSummary summary = mock(DistributionSummary.class);
7485
when(mockPluginMetrics.summary(anyString())).thenReturn(summary);
7586

87+
when(mockCloudWatchLogsSinkConfig.getDlq()).thenReturn(null);
7688
when(mockCloudWatchLogsSinkConfig.getAwsConfig()).thenReturn(mockAwsConfig);
7789
when(mockCloudWatchLogsSinkConfig.getThresholdConfig()).thenReturn(thresholdConfig);
7890
when(mockCloudWatchLogsSinkConfig.getHeaderOverrides()).thenReturn(new HashMap<>());
7991
when(mockCloudWatchLogsSinkConfig.getLogGroup()).thenReturn(TEST_LOG_GROUP);
8092
when(mockCloudWatchLogsSinkConfig.getLogStream()).thenReturn(TEST_LOG_STREAM);
81-
when(mockCloudWatchLogsSinkConfig.getMaxRetries()).thenReturn(3);
93+
when(mockCloudWatchLogsSinkConfig.getMaxRetries()).thenReturn(TEST_MAX_RETRIES);
8294
when(mockCloudWatchLogsSinkConfig.getWorkers()).thenReturn(10);
8395

8496
when(mockPluginSetting.getName()).thenReturn(TEST_PLUGIN_NAME);
@@ -167,17 +179,17 @@ void WHEN_given_sample_empty_records_THEN_records_are_not_processed() {
167179
void WHEN_header_overrides_is_empty_THEN_empty_map_is_passed_to_client_factory() {
168180
Map<String, String> emptyHeaders = new HashMap<>();
169181
when(mockCloudWatchLogsSinkConfig.getHeaderOverrides()).thenReturn(emptyHeaders);
170-
182+
171183
try(MockedStatic<CloudWatchLogsClientFactory> mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) {
172184
mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class),
173185
any(AwsCredentialsSupplier.class), any(), any()))
174186
.thenReturn(mockClient);
175187

176188
CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink();
177-
189+
178190
mockedStatic.verify(() -> CloudWatchLogsClientFactory.createCwlClient(
179-
eq(mockAwsConfig),
180-
eq(mockCredentialSupplier),
191+
eq(mockAwsConfig),
192+
eq(mockCredentialSupplier),
181193
eq(emptyHeaders),
182194
any()));
183195
}
@@ -186,17 +198,17 @@ void WHEN_header_overrides_is_empty_THEN_empty_map_is_passed_to_client_factory()
186198
@Test
187199
void WHEN_header_overrides_is_provided_THEN_headers_are_passed_to_client_factory() {
188200
when(mockCloudWatchLogsSinkConfig.getHeaderOverrides()).thenReturn(mockHeaderOverrides);
189-
201+
190202
try(MockedStatic<CloudWatchLogsClientFactory> mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) {
191203
mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class),
192204
any(AwsCredentialsSupplier.class), any(), any()))
193205
.thenReturn(mockClient);
194206

195207
CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink();
196-
208+
197209
mockedStatic.verify(() -> CloudWatchLogsClientFactory.createCwlClient(
198-
eq(mockAwsConfig),
199-
eq(mockCredentialSupplier),
210+
eq(mockAwsConfig),
211+
eq(mockCredentialSupplier),
200212
eq(mockHeaderOverrides),
201213
any()));
202214
}
@@ -205,16 +217,67 @@ void WHEN_header_overrides_is_provided_THEN_headers_are_passed_to_client_factory
205217
@Test
206218
void WHEN_sink_initialization_with_header_overrides_THEN_sink_is_ready() {
207219
when(mockCloudWatchLogsSinkConfig.getHeaderOverrides()).thenReturn(mockHeaderOverrides);
208-
220+
209221
try(MockedStatic<CloudWatchLogsClientFactory> mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) {
210222
mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class),
211223
any(AwsCredentialsSupplier.class), any(), any()))
212224
.thenReturn(mockClient);
213225

214226
CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink();
215227
testCloudWatchSink.doInitialize();
216-
228+
217229
assertTrue(testCloudWatchSink.isReady());
218230
}
219231
}
232+
233+
@Test
234+
void WHEN_sink_has_no_dlq_config_THEN_retries_set_to_maxint() {
235+
when(mockCloudWatchLogsSinkConfig.getHeaderOverrides()).thenReturn(mockHeaderOverrides);
236+
237+
try(MockedStatic<CloudWatchLogsClientFactory> mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) {
238+
final MockedConstruction<CloudWatchLogsDispatcher> dispatcherMock =
239+
mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> {
240+
numRetries = (int)context.arguments().get(7);
241+
});
242+
243+
mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class),
244+
any(AwsCredentialsSupplier.class), any(), any()))
245+
.thenReturn(mockClient);
246+
247+
CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink();
248+
testCloudWatchSink.doInitialize();
249+
dispatcherMock.close();
250+
251+
}
252+
assertThat(numRetries, equalTo(Integer.MAX_VALUE));
253+
}
254+
255+
@Test
256+
void WHEN_sink_has_dlq_config_THEN_retries_set_to_user_configured_value() {
257+
PluginModel dlqConfig = mock(PluginModel.class);
258+
when(mockCloudWatchLogsSinkConfig.getDlq()).thenReturn(dlqConfig);
259+
when(mockCloudWatchLogsSinkConfig.getHeaderOverrides()).thenReturn(mockHeaderOverrides);
260+
when(mockAwsConfig.getAwsRegion()).thenReturn(Region.of("us-west-2"));
261+
when(mockAwsConfig.getAwsStsRoleArn()).thenReturn("role");
262+
263+
try(MockedStatic<CloudWatchLogsClientFactory> mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) {
264+
final MockedConstruction<CloudWatchLogsDispatcher> dispatcherMock =
265+
mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> {
266+
numRetries = (int)context.arguments().get(7);
267+
});
268+
final MockedConstruction<DlqPushHandler> dlqMock =
269+
mockConstruction(DlqPushHandler.class, (mock, context) -> {
270+
});
271+
272+
mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class),
273+
any(AwsCredentialsSupplier.class), any(), any()))
274+
.thenReturn(mockClient);
275+
276+
CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink();
277+
testCloudWatchSink.doInitialize();
278+
dispatcherMock.close();
279+
}
280+
assertThat(numRetries, equalTo(TEST_MAX_RETRIES));
281+
}
282+
220283
}

data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,14 @@ List<EventHandle> getSampleEventHandles() {
6868
return eventHandles;
6969
}
7070

71-
CloudWatchLogsDispatcher getCloudWatchLogsDispatcher() {
71+
CloudWatchLogsDispatcher getCloudWatchLogsDispatcher(int retryCount) {
7272
return CloudWatchLogsDispatcher.builder()
7373
.cloudWatchLogsClient(mockCloudWatchLogsClient)
7474
.cloudWatchLogsMetrics(mockCloudWatchLogsMetrics)
7575
.executor(mockExecutor)
7676
.logGroup(LOG_GROUP)
7777
.logStream(LOG_STREAM)
78-
.retryCount(RETRY_COUNT)
78+
.retryCount(retryCount)
7979
.dropIfDlqNotConfigured(true)
8080
.build();
8181
}
@@ -88,7 +88,7 @@ private void executeDispatcherRunnable() {
8888

8989
@Test
9090
void GIVEN_valid_input_log_events_SHOULD_call_executor() {
91-
cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher();
91+
cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher(RETRY_COUNT);
9292

9393
final List<EventHandle> eventHandles = getSampleEventHandles();
9494
final PutLogEventsResponse response = mock(PutLogEventsResponse.class);
@@ -105,7 +105,7 @@ void GIVEN_valid_input_log_events_SHOULD_call_executor() {
105105

106106
@Test
107107
void GIVEN_too_old_events_SHOULD_not_release_old_events() {
108-
cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher();
108+
cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher(RETRY_COUNT);
109109

110110
final List<EventHandle> eventHandles = getSampleEventHandles();
111111
final PutLogEventsResponse response = mock(PutLogEventsResponse.class);
@@ -134,7 +134,7 @@ void GIVEN_too_old_events_SHOULD_not_release_old_events() {
134134

135135
@Test
136136
void GIVEN_too_new_events_SHOULD_not_release_new_events() {
137-
cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher();
137+
cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher(RETRY_COUNT);
138138

139139
final List<EventHandle> eventHandles = getSampleEventHandles();
140140
final PutLogEventsResponse response = mock(PutLogEventsResponse.class);
@@ -164,7 +164,7 @@ void GIVEN_too_new_events_SHOULD_not_release_new_events() {
164164

165165
@Test
166166
void GIVEN_both_old_and_new_rejected_events_SHOULD_only_release_valid_events() {
167-
cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher();
167+
cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher(RETRY_COUNT);
168168

169169
final List<EventHandle> eventHandles = getSampleEventHandles();
170170
final PutLogEventsResponse response = mock(PutLogEventsResponse.class);
@@ -197,7 +197,7 @@ void GIVEN_both_old_and_new_rejected_events_SHOULD_only_release_valid_events() {
197197

198198
@Test
199199
void GIVEN_client_exception_SHOULD_retry() {
200-
cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher();
200+
cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher(RETRY_COUNT);
201201

202202
final List<EventHandle> eventHandles = getSampleEventHandles();
203203
when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class)))
@@ -213,9 +213,27 @@ void GIVEN_client_exception_SHOULD_retry() {
213213
verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1);
214214
}
215215

216+
@Test
217+
void GIVEN_cloudwatch_exception_SHOULD_retry_forever() {
218+
final int TEST_RETRY_COUNT = CloudWatchLogsDispatcher.Uploader.MULTIPLE_FAILURES_METRIC_COUNT+1;
219+
cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher(TEST_RETRY_COUNT);
220+
221+
final List<EventHandle> eventHandles = getSampleEventHandles();
222+
when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class)))
223+
.thenThrow(CloudWatchLogsException.class);
224+
List<InputLogEvent> inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData());
225+
cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles);
226+
227+
executeDispatcherRunnable();
228+
229+
verify(mockCloudWatchLogsMetrics, times(TEST_RETRY_COUNT)).increaseRequestFailCounter(1);
230+
verify(mockCloudWatchLogsMetrics, times(0)).increaseRequestSuccessCounter(1);
231+
verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestMultiFailCounter(1);
232+
}
233+
216234
@Test
217235
void GIVEN_cloudwatch_exception_SHOULD_retry() {
218-
cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher();
236+
cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher(RETRY_COUNT);
219237

220238
final List<EventHandle> eventHandles = getSampleEventHandles();
221239
when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class)))
@@ -233,7 +251,7 @@ void GIVEN_cloudwatch_exception_SHOULD_retry() {
233251

234252
@Test
235253
void GIVEN_max_retries_exceeded_SHOULD_not_release_events() {
236-
cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher();
254+
cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher(RETRY_COUNT);
237255

238256
final List<EventHandle> eventHandles = getSampleEventHandles();
239257
when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class)))

0 commit comments

Comments
 (0)