diff --git a/data-prepper-plugins/cloudwatch-logs/README.md b/data-prepper-plugins/cloudwatch-logs/README.md index 068a390f5f..cf1a4b8fb9 100644 --- a/data-prepper-plugins/cloudwatch-logs/README.md +++ b/data-prepper-plugins/cloudwatch-logs/README.md @@ -35,6 +35,8 @@ pipeline: X-Request-ID: "request-123" X-Source: "dataprepper" endpoint: "https://logs.us-west-2.amazonaws.com" + create_log_group: false + create_log_stream: false ``` ## AWS Configuration @@ -65,6 +67,16 @@ pipeline: - `endpoint` (Optional) : A string representing a custom CloudWatch Logs endpoint URL to override the default service endpoint. +- `create_log_group` (Optional): A boolean that controls whether the sink will + create the configured `log_group` if it does not already exist. Defaults to + `false`. When set to `true`, the IAM principal must have `logs:CreateLogGroup` + permission. Creation is attempted lazily, only after a `PutLogEvents` call fails with `ResourceNotFoundException`; this option does not create the log stream, so also enable `create_log_stream` if the stream may not exist. + +- `create_log_stream` (Optional): A boolean that controls whether the sink will + create the configured `log_stream` if it does not already exist. Defaults to + `false`. When set to `true`, the IAM principal must have `logs:CreateLogStream` + permission. Creation is attempted lazily, only after a `PutLogEvents` call fails with `ResourceNotFoundException`; this option does not create the log group, so also enable `create_log_group` if the group may not exist. + ## Buffer Type Configuration - `buffer_type` (Optional) : A string representing the type of buffer to use to hold onto events. Currently only supports `in_memory`. 
diff --git a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java index 7278fca25d..6234b13623 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java +++ b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java @@ -31,6 +31,7 @@ import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamResponse; import software.amazon.awssdk.services.cloudwatchlogs.model.DeleteLogStreamRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceNotFoundException; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; import software.amazon.awssdk.services.s3.S3Client; @@ -271,7 +272,12 @@ void tearDown() { .logGroupName(logGroupName) .logStreamName(logStreamName) .build(); - cloudWatchLogsClient.deleteLogStream(deleteRequest); + try { + cloudWatchLogsClient.deleteLogStream(deleteRequest); + } catch (ResourceNotFoundException e) { + // Sink-side stream-creation may have failed in the test under verification; don't + // mask the real test failure with a teardown error for an absent stream. + } deleteObjectsWithPrefix(bucket, DLQ_PREFIX); } @@ -574,6 +580,45 @@ public void testToVerifyLackOfCredentialsResultInFailure() throws Exception { assertThat(eventsSuccessCount.get(), equalTo(0)); } + @Test + void TestSinkOperationWithCreateLogStream() throws Exception { + // Delete the stream setUp() created so we don't leak it. We're about to mutate + // logStreamName so tearDown() won't see this one again. 
+ cloudWatchLogsClient.deleteLogStream(DeleteLogStreamRequest.builder() + .logGroupName(logGroupName) + .logStreamName(logStreamName) + .build()); + + // Use a brand-new stream name to avoid the delete-then-recreate-same-name window + // where CloudWatch Logs may transiently throw OperationAbortedException. + logStreamName = "CloudWatchLogsIT_create_" + RandomStringUtils.randomAlphabetic(8); + when(cloudWatchLogsSinkConfig.getLogStream()).thenReturn(logStreamName); + when(cloudWatchLogsSinkConfig.getCreateLogGroup()).thenReturn(true); + when(cloudWatchLogsSinkConfig.getCreateLogStream()).thenReturn(true); + when(thresholdConfig.getBatchSize()).thenReturn(10); + when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(1000L); + when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L); + when(thresholdConfig.getFlushInterval()).thenReturn(10L); + + final long startTime = Instant.now().toEpochMilli(); + sink = createObjectUnderTest(); + sink.doOutput(getRecordList(NUM_RECORDS)); + + await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + sink.doOutput(Collections.emptyList()); + GetLogEventsResponse resp = cloudWatchLogsClient.getLogEvents( + GetLogEventsRequest.builder() + .logGroupName(logGroupName) + .logStreamName(logStreamName) + .startTime(startTime) + .endTime(Instant.now().toEpochMilli()) + .build()); + assertThat(resp.events().size(), equalTo(NUM_RECORDS)); + }); + assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS)); + assertThat(dlqSuccessCount.get(), equalTo(0)); + } + private Collection> getRecordList(int numberOfRecords) { final Collection> recordList = new ArrayList<>(); List records = generateRecords(numberOfRecords); diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java index e9111c6231..62e576f194 100644 --- 
a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java @@ -96,6 +96,8 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting, .logStream(cloudWatchLogsSinkConfig.getLogStream()) .retryCount(dlqPushHandler == null ? Integer.MAX_VALUE : cloudWatchLogsSinkConfig.getMaxRetries()) .executor(executor) + .createLogGroup(cloudWatchLogsSinkConfig.getCreateLogGroup()) + .createLogStream(cloudWatchLogsSinkConfig.getCreateLogStream()) .build(); Buffer buffer; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java index 99483dad4d..3011076af9 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java @@ -13,10 +13,14 @@ import com.linecorp.armeria.client.retry.Backoff; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse; import 
software.amazon.awssdk.services.cloudwatchlogs.model.RejectedLogEventsInfo; +import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException; +import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceNotFoundException; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsSinkUtils; import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler; import org.opensearch.dataprepper.model.failures.DlqObject; @@ -40,24 +44,8 @@ public class CloudWatchLogsDispatcher { private String logGroup; private String logStream; private int retryCount; - public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient, - final CloudWatchLogsMetrics cloudWatchLogsMetrics, - final DlqPushHandler dlqPushHandler, - final boolean dropIfDlqNotConfigured, - final Executor executor, - final String logGroup, - final String logStream, - final int retryCount) { - this.cloudWatchLogsClient = cloudWatchLogsClient; - this.cloudWatchLogsMetrics = cloudWatchLogsMetrics; - this.logGroup = logGroup; - this.logStream = logStream; - this.retryCount = retryCount; - this.dlqPushHandler = dlqPushHandler; - this.dropIfDlqNotConfigured = dropIfDlqNotConfigured; - - this.executor = executor; - } + private boolean createLogGroup; + private boolean createLogStream; /** * Will read in a collection of log messages in byte form and transform them into a collection of InputLogEvents. 
@@ -101,6 +89,8 @@ public void dispatchLogs(List inputLogEvents, List e .dropIfDlqNotConfigured(dropIfDlqNotConfigured) .totalEventCount(inputLogEvents.size()) .retryCount(retryCount) + .createLogGroup(createLogGroup) + .createLogStream(createLogStream) .build()); } @@ -117,6 +107,8 @@ protected static class Uploader implements Runnable { private final int totalEventCount; private final int retryCount; private boolean dropIfDlqNotConfigured; + private final boolean createLogGroup; + private final boolean createLogStream; @Override public void run() { @@ -125,6 +117,7 @@ public void run() { public void upload() { boolean failedToTransmit = true; + boolean resourceCreationAttempted = false; int failCount = 0; String failureMessage = ""; PutLogEventsResponse putLogEventsResponse = null; @@ -138,17 +131,21 @@ public void upload() { cloudWatchLogsMetrics.increaseRequestSuccessCounter(1); failedToTransmit = false; + } catch (ResourceNotFoundException e) { + // Must be caught before CloudWatchLogsException since ResourceNotFoundException extends it. + if ((createLogGroup || createLogStream) && !resourceCreationAttempted) { + resourceCreationAttempted = true; + createResources(); + // Loop continues; next iteration retries PutLogEvents without incrementing failCount. + // If PutLogEvents still throws ResourceNotFoundException, the guard sends us to the + // else branch and normal retry/DLQ logic takes over. 
+ } else { + failureMessage = e.getMessage(); + failCount = handlePutLogEventsFailure(e, failCount, backoff); + } } catch (CloudWatchLogsException | SdkClientException e) { failureMessage = e.getMessage(); - LOG.error(NOISY, "Failed to push logs with error: {}", e.getMessage()); - cloudWatchLogsMetrics.increaseRequestFailCounter(1); - if (++failCount % MULTIPLE_FAILURES_METRIC_COUNT == 0) { - cloudWatchLogsMetrics.increaseRequestMultiFailCounter(1); - } - final long delayMillis = backoff.nextDelayMillis(failCount); - if (delayMillis > 0) { - Thread.sleep(delayMillis); - } + failCount = handlePutLogEventsFailure(e, failCount, backoff); } } } catch (Exception e) { @@ -177,6 +174,67 @@ public void upload() { CloudWatchLogsSinkUtils.handleDlqObjects(dlqObjects, dlqPushHandler); } + /** + * Logs the failure, increments fail metrics, and sleeps using the backoff schedule. + * Returns the new fail count so the caller can update its local. Extracted so the + * ResourceNotFoundException-fallback branch and the generic CloudWatchLogsException/SDK + * catch don't drift apart. + */ + private int handlePutLogEventsFailure(final Exception e, final int currentFailCount, final Backoff backoff) + throws InterruptedException { + LOG.error(NOISY, "Failed to push logs with error: {}", e.getMessage()); + cloudWatchLogsMetrics.increaseRequestFailCounter(1); + final int newFailCount = currentFailCount + 1; + if (newFailCount % MULTIPLE_FAILURES_METRIC_COUNT == 0) { + cloudWatchLogsMetrics.increaseRequestMultiFailCounter(1); + } + final long delayMillis = backoff.nextDelayMillis(newFailCount); + if (delayMillis > 0) { + Thread.sleep(delayMillis); + } + return newFailCount; + } + + /** + * Attempts to create the configured log group and/or log stream based on the flags. + * The helper never throws — all SDK exceptions are caught inside so that a recovery + * failure does not interrupt the Uploader. ResourceAlreadyExistsException is intentionally + * swallowed to make creation idempotent. 
If creation fails, the next PutLogEvents call + * will hit ResourceNotFoundException again and the guard in upload() will route it to + * the normal retry/DLQ path. + */ + private void createResources() { + final String logGroupName = putLogEventsRequest.logGroupName(); + final String logStreamName = putLogEventsRequest.logStreamName(); + + if (createLogGroup) { + try { + cloudWatchLogsClient.createLogGroup( + CreateLogGroupRequest.builder().logGroupName(logGroupName).build()); + LOG.info("Created log group: {}", logGroupName); + } catch (ResourceAlreadyExistsException e) { + LOG.debug("Log group already exists: {}", logGroupName); + } catch (CloudWatchLogsException | SdkClientException e) { + LOG.warn("Unable to create log group '{}': {}", logGroupName, e.getMessage()); + } + } + + if (createLogStream) { + try { + cloudWatchLogsClient.createLogStream( + CreateLogStreamRequest.builder() + .logGroupName(logGroupName) + .logStreamName(logStreamName) + .build()); + LOG.info("Created log stream: {}/{}", logGroupName, logStreamName); + } catch (ResourceAlreadyExistsException e) { + LOG.debug("Log stream already exists: {}/{}", logGroupName, logStreamName); + } catch (CloudWatchLogsException | SdkClientException e) { + LOG.warn("Unable to create log stream '{}/{}': {}", logGroupName, logStreamName, e.getMessage()); + } + } + } + List getDlqObjectsFromResponse(PutLogEventsResponse putLogEventsResponse) { List dlqObjects = new ArrayList<>(); RejectedLogEventsInfo rejectedLogEventsInfo = putLogEventsResponse.rejectedLogEventsInfo(); diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java index 8cddc032ab..563cd1070f 100644 --- 
a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java @@ -61,6 +61,12 @@ public class CloudWatchLogsSinkConfig { @JsonProperty("endpoint") private String endpoint; + @JsonProperty(value = "create_log_group", defaultValue = "false") + private boolean createLogGroup = false; + + @JsonProperty(value = "create_log_stream", defaultValue = "false") + private boolean createLogStream = false; + public AwsConfig getAwsConfig() { return awsConfig; } @@ -97,4 +103,12 @@ public String getEndpoint() { return endpoint; } + public boolean getCreateLogGroup() { + return createLogGroup; + } + + public boolean getCreateLogStream() { + return createLogStream; + } + } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java index 3f19fe5ba8..a4b2dfa5dc 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java @@ -67,6 +67,9 @@ class CloudWatchLogsSinkTest { private static final String TEST_PLUGIN_NAME = "testPluginName"; private static final String TEST_PIPELINE_NAME = "testPipelineName"; private static final String TEST_BUFFER_TYPE = "in_memory"; + // Number of args Lombok @Builder passes to the all-args constructor of CloudWatchLogsDispatcher. + // Bumping this is the signal that positional context.arguments().get(N) calls below need to be re-audited. 
+ private static final int EXPECTED_DISPATCHER_ARITY = 10; private int numRetries; @BeforeEach void setUp() { @@ -234,9 +237,11 @@ void WHEN_sink_initialization_with_header_overrides_THEN_sink_is_ready() { void WHEN_sink_has_no_dlq_config_THEN_retries_set_to_maxint() { when(mockCloudWatchLogsSinkConfig.getHeaderOverrides()).thenReturn(mockHeaderOverrides); + final int[] capturedArity = new int[1]; try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { final MockedConstruction dispatcherMock = mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> { + capturedArity[0] = context.arguments().size(); numRetries = (int)context.arguments().get(7); }); @@ -249,6 +254,8 @@ void WHEN_sink_has_no_dlq_config_THEN_retries_set_to_maxint() { dispatcherMock.close(); } + // Arity guard: positional reads above silently rot when fields are added/reordered. + assertThat(capturedArity[0], equalTo(EXPECTED_DISPATCHER_ARITY)); assertThat(numRetries, equalTo(Integer.MAX_VALUE)); } @@ -260,9 +267,11 @@ void WHEN_sink_has_dlq_config_THEN_retries_set_to_user_configured_value() { when(mockAwsConfig.getAwsRegion()).thenReturn(Region.of("us-west-2")); when(mockAwsConfig.getAwsStsRoleArn()).thenReturn("role"); + final int[] capturedArity = new int[1]; try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { final MockedConstruction dispatcherMock = mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> { + capturedArity[0] = context.arguments().size(); numRetries = (int)context.arguments().get(7); }); final MockedConstruction dlqMock = @@ -277,7 +286,67 @@ void WHEN_sink_has_dlq_config_THEN_retries_set_to_user_configured_value() { testCloudWatchSink.doInitialize(); dispatcherMock.close(); } + // Arity guard: positional reads above silently rot when fields are added/reordered. 
+ assertThat(capturedArity[0], equalTo(EXPECTED_DISPATCHER_ARITY)); assertThat(numRetries, equalTo(TEST_MAX_RETRIES)); } + @Test + void WHEN_create_log_group_and_stream_flags_are_set_THEN_flags_passed_to_dispatcher() { + when(mockCloudWatchLogsSinkConfig.getCreateLogGroup()).thenReturn(true); + when(mockCloudWatchLogsSinkConfig.getCreateLogStream()).thenReturn(true); + + final boolean[] capturedCreateLogGroup = new boolean[1]; + final boolean[] capturedCreateLogStream = new boolean[1]; + final int[] capturedArity = new int[1]; + try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { + final MockedConstruction dispatcherMock = + mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> { + capturedArity[0] = context.arguments().size(); + capturedCreateLogGroup[0] = (boolean) context.arguments().get(8); + capturedCreateLogStream[0] = (boolean) context.arguments().get(9); + }); + + mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), + any(AwsCredentialsSupplier.class), any(), any())) + .thenReturn(mockClient); + + CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink(); + testCloudWatchSink.doInitialize(); + dispatcherMock.close(); + } + assertThat(capturedArity[0], equalTo(EXPECTED_DISPATCHER_ARITY)); + assertThat(capturedCreateLogGroup[0], equalTo(true)); + assertThat(capturedCreateLogStream[0], equalTo(true)); + } + + @Test + void WHEN_create_log_group_is_false_and_create_log_stream_is_false_THEN_flags_passed_as_false_to_dispatcher() { + when(mockCloudWatchLogsSinkConfig.getCreateLogGroup()).thenReturn(false); + when(mockCloudWatchLogsSinkConfig.getCreateLogStream()).thenReturn(false); + + final boolean[] capturedCreateLogGroup = new boolean[]{true}; + final boolean[] capturedCreateLogStream = new boolean[]{true}; + final int[] capturedArity = new int[1]; + try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { + final MockedConstruction dispatcherMock = + 
mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> { + capturedArity[0] = context.arguments().size(); + capturedCreateLogGroup[0] = (boolean) context.arguments().get(8); + capturedCreateLogStream[0] = (boolean) context.arguments().get(9); + }); + + mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), + any(AwsCredentialsSupplier.class), any(), any())) + .thenReturn(mockClient); + + CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink(); + testCloudWatchSink.doInitialize(); + dispatcherMock.close(); + } + assertThat(capturedArity[0], equalTo(EXPECTED_DISPATCHER_ARITY)); + assertThat(capturedCreateLogGroup[0], equalTo(false)); + assertThat(capturedCreateLogStream[0], equalTo(false)); + } + } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java index 04c73fc259..01f67dc064 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java @@ -11,10 +11,16 @@ import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupResponse; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamResponse; 
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse; import software.amazon.awssdk.services.cloudwatchlogs.model.RejectedLogEventsInfo; +import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException; +import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceNotFoundException; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException; @@ -80,6 +86,20 @@ CloudWatchLogsDispatcher getCloudWatchLogsDispatcher(int retryCount) { .build(); } + CloudWatchLogsDispatcher getCloudWatchLogsDispatcherWithCreateFlag(final int retryCount, final boolean createLogGroup, final boolean createLogStream) { + return CloudWatchLogsDispatcher.builder() + .cloudWatchLogsClient(mockCloudWatchLogsClient) + .cloudWatchLogsMetrics(mockCloudWatchLogsMetrics) + .executor(mockExecutor) + .logGroup(LOG_GROUP) + .logStream(LOG_STREAM) + .retryCount(retryCount) + .dropIfDlqNotConfigured(true) + .createLogGroup(createLogGroup) + .createLogStream(createLogStream) + .build(); + } + private void executeDispatcherRunnable() { ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); verify(mockExecutor).execute(runnableCaptor.capture()); @@ -268,4 +288,266 @@ void GIVEN_max_retries_exceeded_SHOULD_not_release_events() { // No events should be released after max retries eventHandles.forEach(eventHandle -> verify(eventHandle).release(true)); } + + @Test + void GIVEN_resource_not_found_and_create_flag_true_WHEN_upload_SHOULD_create_group_and_stream_then_retry() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); + + final List eventHandles = getSampleEventHandles(); + 
when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenReturn(mock(CreateLogGroupResponse.class)); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenReturn(mock(CreateLogStreamResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // Group and stream were created exactly once. + verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + // Recovery action does not count as a failure — fail counter is not incremented. + verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); + // PutLogEvents retry succeeded, so success counter is incremented. + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + } + + @Test + void GIVEN_resource_not_found_and_create_flag_false_WHEN_upload_SHOULD_follow_normal_retry_logic() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, false, false); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // No creation attempted when the flag is false. 
+ verify(mockCloudWatchLogsClient, never()).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, never()).createLogStream(any(CreateLogStreamRequest.class)); + // ResourceNotFoundException flows to the normal retry/DLQ path: fail counter incremented for every retry. + verify(mockCloudWatchLogsMetrics, times(RETRY_COUNT)).increaseRequestFailCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestSuccessCounter(1); + } + + @Test + void GIVEN_resource_not_found_and_only_create_log_stream_true_WHEN_upload_SHOULD_only_create_stream() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, false, true); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenReturn(mock(CreateLogStreamResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + verify(mockCloudWatchLogsClient, never()).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); + } + + @Test + void GIVEN_resource_not_found_and_only_create_log_group_true_WHEN_upload_SHOULD_only_create_group() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, false); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + 
.thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenReturn(mock(CreateLogGroupResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, never()).createLogStream(any(CreateLogStreamRequest.class)); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); + } + + @Test + void GIVEN_resource_not_found_and_create_succeeds_WHEN_retry_put_log_events_SHOULD_succeed() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenReturn(mock(CreateLogGroupResponse.class)); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenReturn(mock(CreateLogStreamResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // PutLogEvents retried after successful creation and succeeded — events released. 
+ verify(mockCloudWatchLogsClient, times(2)).putLogEvents(any(PutLogEventsRequest.class)); + eventHandles.forEach(eventHandle -> verify(eventHandle).release(true)); + } + + /** Scenario: the first putLogEvents call throws ResourceNotFoundException and createLogGroup throws ResourceAlreadyExistsException, while createLogStream returns normally. Expects createLogGroup and createLogStream to each be called once and the success counter to be incremented once. The never() check on increaseRequestFailCounter(1) only rules out calls made with the argument 1. NOTE(review): the two boolean arguments passed to the dispatcher factory are presumably the create-log-group and create-log-stream flags; confirm against the helper. */ @Test + void GIVEN_resource_not_found_and_group_already_exists_WHEN_create_SHOULD_ignore_and_create_stream() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenThrow(ResourceAlreadyExistsException.builder().message("group exists").build()); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenReturn(mock(CreateLogStreamResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // ResourceAlreadyExistsException is swallowed silently; createLogStream is still attempted; PutLogEvents retry succeeds. 
+ verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); + } + + /** Scenario: the first putLogEvents call throws ResourceNotFoundException, createLogGroup returns normally and createLogStream throws ResourceAlreadyExistsException. Expects a single createLogStream call, one success-counter increment and no increaseRequestFailCounter(1) call. */ @Test + void GIVEN_resource_not_found_and_stream_already_exists_WHEN_create_SHOULD_ignore_and_retry_put_log_events() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenReturn(mock(CreateLogGroupResponse.class)); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenThrow(ResourceAlreadyExistsException.builder().message("stream exists").build()); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // Stream ResourceAlreadyExistsException is swallowed; PutLogEvents retry runs and succeeds. 
+ verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); + } + + /** Scenario: the first putLogEvents call throws ResourceNotFoundException and createLogGroup throws a base CloudWatchLogsException standing in for an access-denied error, while createLogStream returns normally. Expects createLogGroup and createLogStream to each be called once and the success counter to be incremented once. */ @Test + void GIVEN_resource_not_found_and_create_group_throws_access_denied_SHOULD_still_attempt_create_stream() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + // Simulate AccessDeniedException on createLogGroup (a CloudWatchLogsException subtype, surfacing as base in tests). + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenThrow(CloudWatchLogsException.builder().message("access denied").build()); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenReturn(mock(CreateLogStreamResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // createLogStream is still attempted even when createLogGroup fails — the helper does not short-circuit. 
+ verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + } + + /** Scenario: the first putLogEvents call throws ResourceNotFoundException, createLogGroup returns normally and createLogStream throws CloudWatchLogsException. Expects exactly two putLogEvents calls, one success-counter increment and no increaseRequestFailCounter(1) call. */ @Test + void GIVEN_resource_not_found_and_create_stream_throws_cloudwatch_logs_exception_SHOULD_not_kill_uploader() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); + + final List eventHandles = getSampleEventHandles(); + // First PutLogEvents call throws ResourceNotFoundException; second PutLogEvents call (after failed creation) succeeds. + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenReturn(mock(CreateLogGroupResponse.class)); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenThrow(CloudWatchLogsException.builder().message("creation failed").build()); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // Helper swallowed the exception; PutLogEvents retry ran and succeeded; uploader not interrupted. 
+ verify(mockCloudWatchLogsClient, times(2)).putLogEvents(any(PutLogEventsRequest.class)); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); + } + + /** Scenario: every putLogEvents call throws ResourceNotFoundException while createLogGroup and createLogStream both return normally. Expects createLogGroup and createLogStream to each be called exactly once. */ @Test + void GIVEN_resource_not_found_and_create_flag_true_SHOULD_only_attempt_creation_once_per_upload() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); + + final List eventHandles = getSampleEventHandles(); + // Every PutLogEvents call throws ResourceNotFoundException — creation should still only be attempted once. + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenReturn(mock(CreateLogGroupResponse.class)); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenReturn(mock(CreateLogStreamResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // Creation invoked exactly once per Uploader invocation, no matter how many ResourceNotFoundExceptions follow. 
+ verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + } + + /** Scenario: every putLogEvents call throws ResourceNotFoundException while createLogGroup and createLogStream both return normally. Expects one createLogGroup call, one createLogStream call, RETRY_COUNT calls to increaseRequestFailCounter(1) and no increaseRequestSuccessCounter(1) call. */ @Test + void GIVEN_create_flag_true_AND_creation_succeeds_BUT_put_log_events_still_throws_resource_not_found_SHOULD_only_create_once_and_FAIL_RETRIES() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenReturn(mock(CreateLogGroupResponse.class)); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenReturn(mock(CreateLogStreamResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // Creation attempted exactly once. + verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + // Subsequent ResourceNotFoundExceptions flow to the normal retry/DLQ path: fail counter incremented for every retry. 
+ verify(mockCloudWatchLogsMetrics, times(RETRY_COUNT)).increaseRequestFailCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestSuccessCounter(1); + } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java index abc5686b2e..6c9f24d2df 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java @@ -322,4 +322,30 @@ void GIVEN_endpoint_configured_SHOULD_return_the_configured_value() throws NoSuc ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, "endpoint", testEndpoint); assertThat(cloudWatchLogsSinkConfig.getEndpoint(), equalTo(testEndpoint)); } + + /** Checks that a freshly constructed CloudWatchLogsSinkConfig reports false from getCreateLogGroup(). */ @Test + void GIVEN_new_sink_config_WHEN_get_create_log_group_called_SHOULD_return_false() { + assertThat(new CloudWatchLogsSinkConfig().getCreateLogGroup(), equalTo(false)); + } + + /** Checks that getCreateLogGroup() returns true once the createLogGroup field has been set to true via reflection. */ @Test + void GIVEN_create_log_group_set_true_WHEN_get_called_SHOULD_return_true() + throws NoSuchFieldException, IllegalAccessException { + ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, + "createLogGroup", true); + assertThat(cloudWatchLogsSinkConfig.getCreateLogGroup(), equalTo(true)); + } + + /** Checks that a freshly constructed CloudWatchLogsSinkConfig reports false from getCreateLogStream(). */ @Test + void GIVEN_new_sink_config_WHEN_get_create_log_stream_called_SHOULD_return_false() { + assertThat(new CloudWatchLogsSinkConfig().getCreateLogStream(), equalTo(false)); + } + + /** Checks that getCreateLogStream() returns true once the createLogStream field has been set to true via reflection. */ @Test + void GIVEN_create_log_stream_set_true_WHEN_get_called_SHOULD_return_true() + throws NoSuchFieldException, IllegalAccessException { + 
ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, + "createLogStream", true); + assertThat(cloudWatchLogsSinkConfig.getCreateLogStream(), equalTo(true)); + } }