From 292a9ac30e66eb1924764f0ff8b7351d22ac69f5 Mon Sep 17 00:00:00 2001 From: Nikhil Bagmar Date: Thu, 14 May 2026 14:56:03 -0700 Subject: [PATCH 1/3] Create log group and stream in CloudWatch Logs sink When enabled, the CloudWatch Logs sink creates the configured log group and log stream on first ResourceNotFoundException instead of failing to DLQ. This eliminates the need for manual pre-provisioning of CloudWatch Logs resources before running Data Prepper pipelines. - Reactive creation (on-failure), not eager (at-init) - Creation attempted at most once per Uploader invocation - Idempotent: ResourceAlreadyExistsException silently ignored - Requires logs:CreateLogGroup and logs:CreateLogStream IAM perms Resolves: #6861 Signed-off-by: Nikhil Bagmar --- .../cloudwatch-logs/README.md | 7 + .../cloudwatch_logs/CloudWatchLogsIT.java | 46 +++- .../cloudwatch_logs/CloudWatchLogsSink.java | 1 + .../client/CloudWatchLogsDispatcher.java | 103 ++++++-- .../config/CloudWatchLogsSinkConfig.java | 7 + .../CloudWatchLogsSinkTest.java | 63 +++++ .../client/CloudWatchLogsDispatcherTest.java | 237 ++++++++++++++++++ .../config/CloudWatchLogsSinkConfigTest.java | 13 + 8 files changed, 449 insertions(+), 28 deletions(-) diff --git a/data-prepper-plugins/cloudwatch-logs/README.md b/data-prepper-plugins/cloudwatch-logs/README.md index 068a390f5f..a9bb654514 100644 --- a/data-prepper-plugins/cloudwatch-logs/README.md +++ b/data-prepper-plugins/cloudwatch-logs/README.md @@ -35,6 +35,7 @@ pipeline: X-Request-ID: "request-123" X-Source: "dataprepper" endpoint: "https://logs.us-west-2.amazonaws.com" + create_log_group_and_stream: false ``` ## AWS Configuration @@ -65,6 +66,12 @@ pipeline: - `endpoint` (Optional) : A string representing a custom CloudWatch Logs endpoint URL to override the default service endpoint. +- `create_log_group_and_stream` (Optional): A boolean that controls whether + the sink will create the configured `log_group` and `log_stream` if they do + not already exist. Defaults to `false`. When set to `true`, the IAM principal + used by the sink must have `logs:CreateLogGroup` and `logs:CreateLogStream` + permissions in addition to `logs:PutLogEvents`. + ## Buffer Type Configuration - `buffer_type` (Optional) : A string representing the type of buffer to use to hold onto events. Currently only supports `in_memory`. diff --git a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java index 7278fca25d..79460f42c0 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java +++ b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java @@ -31,6 +31,7 @@ import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamResponse; import software.amazon.awssdk.services.cloudwatchlogs.model.DeleteLogStreamRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceNotFoundException; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; import software.amazon.awssdk.services.s3.S3Client; @@ -271,7 +272,12 @@ void tearDown() { .logGroupName(logGroupName) .logStreamName(logStreamName) .build(); - cloudWatchLogsClient.deleteLogStream(deleteRequest); + try { + cloudWatchLogsClient.deleteLogStream(deleteRequest); + } catch (ResourceNotFoundException e) { + // Sink-side stream-creation may have failed in the test under verification; don't + // mask the real test failure with a teardown error for an absent stream. + } deleteObjectsWithPrefix(bucket, DLQ_PREFIX); } @@ -574,6 +580,44 @@ public void testToVerifyLackOfCredentialsResultInFailure() throws Exception { assertThat(eventsSuccessCount.get(), equalTo(0)); } + @Test + void TestSinkOperationWithCreateLogStream() throws Exception { + // Delete the stream setUp() created so we don't leak it. We're about to mutate + // logStreamName so tearDown() won't see this one again. + cloudWatchLogsClient.deleteLogStream(DeleteLogStreamRequest.builder() + .logGroupName(logGroupName) + .logStreamName(logStreamName) + .build()); + + // Use a brand-new stream name to avoid the delete-then-recreate-same-name window + // where CloudWatch Logs may transiently throw OperationAbortedException. + logStreamName = "CloudWatchLogsIT_create_" + RandomStringUtils.randomAlphabetic(8); + when(cloudWatchLogsSinkConfig.getLogStream()).thenReturn(logStreamName); + when(cloudWatchLogsSinkConfig.getCreateLogGroupAndStream()).thenReturn(true); + when(thresholdConfig.getBatchSize()).thenReturn(10); + when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(1000L); + when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L); + when(thresholdConfig.getFlushInterval()).thenReturn(10L); + + final long startTime = Instant.now().toEpochMilli(); + sink = createObjectUnderTest(); + sink.doOutput(getRecordList(NUM_RECORDS)); + + await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + sink.doOutput(Collections.emptyList()); + GetLogEventsResponse resp = cloudWatchLogsClient.getLogEvents( + GetLogEventsRequest.builder() + .logGroupName(logGroupName) + .logStreamName(logStreamName) + .startTime(startTime) + .endTime(Instant.now().toEpochMilli()) + .build()); + assertThat(resp.events().size(), equalTo(NUM_RECORDS)); + }); + assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS)); + assertThat(dlqSuccessCount.get(), equalTo(0)); + } + private Collection> getRecordList(int numberOfRecords) { final Collection> recordList = new ArrayList<>(); List records = generateRecords(numberOfRecords); diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java index e9111c6231..464f17141b 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java @@ -96,6 +96,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting, .logStream(cloudWatchLogsSinkConfig.getLogStream()) .retryCount(dlqPushHandler == null ? Integer.MAX_VALUE : cloudWatchLogsSinkConfig.getMaxRetries()) .executor(executor) + .createLogGroupAndStream(cloudWatchLogsSinkConfig.getCreateLogGroupAndStream()) .build(); Buffer buffer; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java index 99483dad4d..1be905e59d 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java @@ -13,10 +13,14 @@ import com.linecorp.armeria.client.retry.Backoff; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse; import software.amazon.awssdk.services.cloudwatchlogs.model.RejectedLogEventsInfo; +import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException; +import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceNotFoundException; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsSinkUtils; import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler; import org.opensearch.dataprepper.model.failures.DlqObject; @@ -40,24 +44,7 @@ public class CloudWatchLogsDispatcher { private String logGroup; private String logStream; private int retryCount; - public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient, - final CloudWatchLogsMetrics cloudWatchLogsMetrics, - final DlqPushHandler dlqPushHandler, - final boolean dropIfDlqNotConfigured, - final Executor executor, - final String logGroup, - final String logStream, - final int retryCount) { - this.cloudWatchLogsClient = cloudWatchLogsClient; - this.cloudWatchLogsMetrics = cloudWatchLogsMetrics; - this.logGroup = logGroup; - this.logStream = logStream; - this.retryCount = retryCount; - this.dlqPushHandler = dlqPushHandler; - this.dropIfDlqNotConfigured = dropIfDlqNotConfigured; - - this.executor = executor; - } + private boolean createLogGroupAndStream; /** * Will read in a collection of log messages in byte form and transform them into a collection of InputLogEvents. @@ -101,6 +88,7 @@ public void dispatchLogs(List inputLogEvents, List e .dropIfDlqNotConfigured(dropIfDlqNotConfigured) .totalEventCount(inputLogEvents.size()) .retryCount(retryCount) + .createLogGroupAndStream(createLogGroupAndStream) .build()); } @@ -117,6 +105,7 @@ protected static class Uploader implements Runnable { private final int totalEventCount; private final int retryCount; private boolean dropIfDlqNotConfigured; + private final boolean createLogGroupAndStream; @Override public void run() { @@ -125,6 +114,7 @@ public void run() { public void upload() { boolean failedToTransmit = true; + boolean resourceCreationAttempted = false; int failCount = 0; String failureMessage = ""; PutLogEventsResponse putLogEventsResponse = null; @@ -138,17 +128,21 @@ public void upload() { cloudWatchLogsMetrics.increaseRequestSuccessCounter(1); failedToTransmit = false; + } catch (ResourceNotFoundException e) { + // Must be caught before CloudWatchLogsException since RNF extends CWLException. + if (createLogGroupAndStream && !resourceCreationAttempted) { + resourceCreationAttempted = true; + createLogGroupAndStream(); + // Loop continues; next iteration retries PLE without incrementing failCount. + // If PLE still throws RNF, the guard sends us to the else branch and + // normal retry/DLQ logic takes over. + } else { + failureMessage = e.getMessage(); + failCount = handlePutLogEventsFailure(e, failCount, backoff); + } } catch (CloudWatchLogsException | SdkClientException e) { failureMessage = e.getMessage(); - LOG.error(NOISY, "Failed to push logs with error: {}", e.getMessage()); - cloudWatchLogsMetrics.increaseRequestFailCounter(1); - if (++failCount % MULTIPLE_FAILURES_METRIC_COUNT == 0) { - cloudWatchLogsMetrics.increaseRequestMultiFailCounter(1); - } - final long delayMillis = backoff.nextDelayMillis(failCount); - if (delayMillis > 0) { - Thread.sleep(delayMillis); - } + failCount = handlePutLogEventsFailure(e, failCount, backoff); } } } catch (Exception e) { @@ -177,6 +171,61 @@ public void upload() { CloudWatchLogsSinkUtils.handleDlqObjects(dlqObjects, dlqPushHandler); } + /** + * Logs the failure, increments fail metrics, and sleeps using the backoff schedule. + * Returns the new fail count so the caller can update its local. Extracted so the + * RNF-fallback branch and the generic CWL/SDK catch don't drift apart. + */ + private int handlePutLogEventsFailure(final Exception e, final int currentFailCount, final Backoff backoff) + throws InterruptedException { + LOG.error(NOISY, "Failed to push logs with error: {}", e.getMessage()); + cloudWatchLogsMetrics.increaseRequestFailCounter(1); + final int newFailCount = currentFailCount + 1; + if (newFailCount % MULTIPLE_FAILURES_METRIC_COUNT == 0) { + cloudWatchLogsMetrics.increaseRequestMultiFailCounter(1); + } + final long delayMillis = backoff.nextDelayMillis(newFailCount); + if (delayMillis > 0) { + Thread.sleep(delayMillis); + } + return newFailCount; + } + + /** + * Attempts to create the configured log group and log stream. The helper never throws — + * all SDK exceptions are caught inside so that a recovery failure does not interrupt the + * Uploader. ResourceAlreadyExistsException is intentionally swallowed to make creation + * idempotent. If creation fails, the next PutLogEvents call will hit ResourceNotFoundException + * again and the guard in upload() will route it to the normal retry/DLQ path. + */ + private void createLogGroupAndStream() { + final String logGroupName = putLogEventsRequest.logGroupName(); + final String logStreamName = putLogEventsRequest.logStreamName(); + + try { + cloudWatchLogsClient.createLogGroup( + CreateLogGroupRequest.builder().logGroupName(logGroupName).build()); + LOG.info("Created log group: {}", logGroupName); + } catch (ResourceAlreadyExistsException e) { + LOG.debug("Log group already exists: {}", logGroupName); + } catch (CloudWatchLogsException | SdkClientException e) { + LOG.warn("Unable to create log group '{}': {}", logGroupName, e.getMessage()); + } + + try { + cloudWatchLogsClient.createLogStream( + CreateLogStreamRequest.builder() + .logGroupName(logGroupName) + .logStreamName(logStreamName) + .build()); + LOG.info("Created log stream: {}/{}", logGroupName, logStreamName); + } catch (ResourceAlreadyExistsException e) { + LOG.debug("Log stream already exists: {}/{}", logGroupName, logStreamName); + } catch (CloudWatchLogsException | SdkClientException e) { + LOG.warn("Unable to create log stream '{}/{}': {}", logGroupName, logStreamName, e.getMessage()); + } + } + List getDlqObjectsFromResponse(PutLogEventsResponse putLogEventsResponse) { List dlqObjects = new ArrayList<>(); RejectedLogEventsInfo rejectedLogEventsInfo = putLogEventsResponse.rejectedLogEventsInfo(); diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java index 8cddc032ab..437bb2281a 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java @@ -61,6 +61,9 @@ public class CloudWatchLogsSinkConfig { @JsonProperty("endpoint") private String endpoint; + @JsonProperty(value = "create_log_group_and_stream", defaultValue = "false") + private boolean createLogGroupAndStream = false; + public AwsConfig getAwsConfig() { return awsConfig; } @@ -97,4 +100,8 @@ public String getEndpoint() { return endpoint; } + public boolean getCreateLogGroupAndStream() { + return createLogGroupAndStream; + } + } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java index 3f19fe5ba8..3258f0a705 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java @@ -67,6 +67,9 @@ class CloudWatchLogsSinkTest { private static final String TEST_PLUGIN_NAME = "testPluginName"; private static final String TEST_PIPELINE_NAME = "testPipelineName"; private static final String TEST_BUFFER_TYPE = "in_memory"; + // Number of args Lombok @Builder passes to the all-args constructor of CloudWatchLogsDispatcher. + // Bumping this is the signal that positional context.arguments().get(N) calls below need to be re-audited. + private static final int EXPECTED_DISPATCHER_ARITY = 9; private int numRetries; @BeforeEach void setUp() { @@ -234,9 +237,11 @@ void WHEN_sink_initialization_with_header_overrides_THEN_sink_is_ready() { void WHEN_sink_has_no_dlq_config_THEN_retries_set_to_maxint() { when(mockCloudWatchLogsSinkConfig.getHeaderOverrides()).thenReturn(mockHeaderOverrides); + final int[] capturedArity = new int[1]; try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { final MockedConstruction dispatcherMock = mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> { + capturedArity[0] = context.arguments().size(); numRetries = (int)context.arguments().get(7); }); @@ -249,6 +254,8 @@ void WHEN_sink_has_no_dlq_config_THEN_retries_set_to_maxint() { dispatcherMock.close(); } + // Arity guard: positional reads above silently rot when fields are added/reordered. + assertThat(capturedArity[0], equalTo(EXPECTED_DISPATCHER_ARITY)); assertThat(numRetries, equalTo(Integer.MAX_VALUE)); } @@ -260,9 +267,11 @@ void WHEN_sink_has_dlq_config_THEN_retries_set_to_user_configured_value() { when(mockAwsConfig.getAwsRegion()).thenReturn(Region.of("us-west-2")); when(mockAwsConfig.getAwsStsRoleArn()).thenReturn("role"); + final int[] capturedArity = new int[1]; try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { final MockedConstruction dispatcherMock = mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> { + capturedArity[0] = context.arguments().size(); numRetries = (int)context.arguments().get(7); }); final MockedConstruction dlqMock = @@ -277,7 +286,61 @@ void WHEN_sink_has_dlq_config_THEN_retries_set_to_user_configured_value() { testCloudWatchSink.doInitialize(); dispatcherMock.close(); } + // Arity guard: positional reads above silently rot when fields are added/reordered. + assertThat(capturedArity[0], equalTo(EXPECTED_DISPATCHER_ARITY)); assertThat(numRetries, equalTo(TEST_MAX_RETRIES)); } + @Test + void WHEN_create_log_group_and_stream_is_true_THEN_flag_passed_to_dispatcher() { + when(mockCloudWatchLogsSinkConfig.getCreateLogGroupAndStream()).thenReturn(true); + + final boolean[] capturedFlag = new boolean[1]; + final int[] capturedArity = new int[1]; + try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { + final MockedConstruction dispatcherMock = + mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> { + capturedArity[0] = context.arguments().size(); + capturedFlag[0] = (boolean) context.arguments().get(8); + }); + + mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), + any(AwsCredentialsSupplier.class), any(), any())) + .thenReturn(mockClient); + + CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink(); + testCloudWatchSink.doInitialize(); + dispatcherMock.close(); + } + // Arity guard: positional reads above silently rot when fields are added/reordered. + assertThat(capturedArity[0], equalTo(EXPECTED_DISPATCHER_ARITY)); + assertThat(capturedFlag[0], equalTo(true)); + } + + @Test + void WHEN_create_log_group_and_stream_is_false_THEN_flag_passed_as_false_to_dispatcher() { + when(mockCloudWatchLogsSinkConfig.getCreateLogGroupAndStream()).thenReturn(false); + + final boolean[] capturedFlag = new boolean[]{true}; + final int[] capturedArity = new int[1]; + try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { + final MockedConstruction dispatcherMock = + mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> { + capturedArity[0] = context.arguments().size(); + capturedFlag[0] = (boolean) context.arguments().get(8); + }); + + mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), + any(AwsCredentialsSupplier.class), any(), any())) + .thenReturn(mockClient); + + CloudWatchLogsSink testCloudWatchSink = getTestCloudWatchSink(); + testCloudWatchSink.doInitialize(); + dispatcherMock.close(); + } + // Arity guard: positional reads above silently rot when fields are added/reordered. + assertThat(capturedArity[0], equalTo(EXPECTED_DISPATCHER_ARITY)); + assertThat(capturedFlag[0], equalTo(false)); + } + } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java index 04c73fc259..0f87924bf7 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java @@ -11,10 +11,16 @@ import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupResponse; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamResponse; import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse; import software.amazon.awssdk.services.cloudwatchlogs.model.RejectedLogEventsInfo; +import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException; +import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceNotFoundException; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException; @@ -80,6 +86,19 @@ CloudWatchLogsDispatcher getCloudWatchLogsDispatcher(int retryCount) { .build(); } + CloudWatchLogsDispatcher getCloudWatchLogsDispatcherWithCreateFlag(final int retryCount, final boolean createLogGroupAndStream) { + return CloudWatchLogsDispatcher.builder() + .cloudWatchLogsClient(mockCloudWatchLogsClient) + .cloudWatchLogsMetrics(mockCloudWatchLogsMetrics) + .executor(mockExecutor) + .logGroup(LOG_GROUP) + .logStream(LOG_STREAM) + .retryCount(retryCount) + .dropIfDlqNotConfigured(true) + .createLogGroupAndStream(createLogGroupAndStream) + .build(); + } + private void executeDispatcherRunnable() { ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(Runnable.class); verify(mockExecutor).execute(runnableCaptor.capture()); @@ -268,4 +287,222 @@ void GIVEN_max_retries_exceeded_SHOULD_not_release_events() { // No events should be released after max retries eventHandles.forEach(eventHandle -> verify(eventHandle).release(true)); } + + @Test + void GIVEN_resource_not_found_and_create_flag_true_WHEN_upload_SHOULD_create_group_and_stream_then_retry() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenReturn(mock(CreateLogGroupResponse.class)); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenReturn(mock(CreateLogStreamResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // Group and stream were created exactly once. + verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + // Recovery action does not count as a failure — fail counter is not incremented. + verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); + // PLE retry succeeded, so success counter is incremented. + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + } + + @Test + void GIVEN_resource_not_found_and_create_flag_false_WHEN_upload_SHOULD_follow_normal_retry_logic() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, false); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // No creation attempted when the flag is false. + verify(mockCloudWatchLogsClient, never()).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, never()).createLogStream(any(CreateLogStreamRequest.class)); + // RNF flows to the normal retry/DLQ path: fail counter incremented for every retry. + verify(mockCloudWatchLogsMetrics, times(RETRY_COUNT)).increaseRequestFailCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestSuccessCounter(1); + } + + @Test + void GIVEN_resource_not_found_and_create_succeeds_WHEN_retry_ple_SHOULD_succeed() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenReturn(mock(CreateLogGroupResponse.class)); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenReturn(mock(CreateLogStreamResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // PLE retried after successful creation and succeeded — events released. + verify(mockCloudWatchLogsClient, times(2)).putLogEvents(any(PutLogEventsRequest.class)); + eventHandles.forEach(eventHandle -> verify(eventHandle).release(true)); + } + + @Test + void GIVEN_resource_not_found_and_group_already_exists_WHEN_create_SHOULD_ignore_and_create_stream() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenThrow(ResourceAlreadyExistsException.builder().message("group exists").build()); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenReturn(mock(CreateLogStreamResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // RAEE is swallowed silently; createLogStream is still attempted; PLE retry succeeds. + verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); + } + + @Test + void GIVEN_resource_not_found_and_stream_already_exists_WHEN_create_SHOULD_ignore_and_retry_ple() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenReturn(mock(CreateLogGroupResponse.class)); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenThrow(ResourceAlreadyExistsException.builder().message("stream exists").build()); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // Stream RAEE is swallowed; PLE retry runs and succeeds. + verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); + } + + @Test + void GIVEN_resource_not_found_and_create_group_throws_access_denied_SHOULD_still_attempt_create_stream() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + // Simulate AccessDeniedException on createLogGroup (a CloudWatchLogsException subtype, surfacing as base in tests). + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenThrow(CloudWatchLogsException.builder().message("access denied").build()); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenReturn(mock(CreateLogStreamResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // createLogStream is still attempted even when createLogGroup fails — the helper does not short-circuit. + verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + } + + @Test + void GIVEN_resource_not_found_and_create_stream_throws_cwl_exception_SHOULD_not_kill_uploader() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + + final List eventHandles = getSampleEventHandles(); + // First PLE call throws RNF; second PLE call (after failed creation) succeeds. + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenReturn(mock(CreateLogGroupResponse.class)); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenThrow(CloudWatchLogsException.builder().message("creation failed").build()); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // Helper swallowed the exception; PLE retry ran and succeeded; uploader not interrupted. + verify(mockCloudWatchLogsClient, times(2)).putLogEvents(any(PutLogEventsRequest.class)); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); + } + + @Test + void GIVEN_resource_not_found_and_create_flag_true_SHOULD_only_attempt_creation_once_per_upload() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + + final List eventHandles = getSampleEventHandles(); + // Every PLE call throws RNF — creation should still only be attempted once. + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenReturn(mock(CreateLogGroupResponse.class)); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenReturn(mock(CreateLogStreamResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // Creation invoked exactly once per Uploader invocation, no matter how many RNFs follow. + verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + } + + @Test + void GIVEN_create_flag_true_AND_creation_succeeds_BUT_PLE_still_throws_RNF_SHOULD_only_create_once_and_FAIL_RETRIES() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenReturn(mock(CreateLogGroupResponse.class)); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenReturn(mock(CreateLogStreamResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // Creation attempted exactly once. + verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + // Subsequent RNFs flow to the normal retry/DLQ path: fail counter incremented for every retry. + verify(mockCloudWatchLogsMetrics, times(RETRY_COUNT)).increaseRequestFailCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestSuccessCounter(1); + } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java index abc5686b2e..ad4202021b 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java @@ -322,4 +322,17 @@ void GIVEN_endpoint_configured_SHOULD_return_the_configured_value() throws NoSuc ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, "endpoint", testEndpoint); assertThat(cloudWatchLogsSinkConfig.getEndpoint(), equalTo(testEndpoint)); } + + @Test + void GIVEN_new_sink_config_WHEN_get_create_log_group_and_stream_called_SHOULD_return_false() { + assertThat(new CloudWatchLogsSinkConfig().getCreateLogGroupAndStream(), equalTo(false)); + } + + @Test + void GIVEN_create_log_group_and_stream_set_true_WHEN_get_called_SHOULD_return_true() + throws NoSuchFieldException, IllegalAccessException { + ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, + "createLogGroupAndStream", true); + assertThat(cloudWatchLogsSinkConfig.getCreateLogGroupAndStream(), equalTo(true)); + } } From 5e0ff1c4b68ef11c6ebe54150357ec20f5c0f226 Mon Sep 17 00:00:00 2001 From: Nikhil Bagmar Date: Tue, 19 May 2026 13:31:52 -0700 Subject: [PATCH 2/3] Split resource creation into separate create_log_group and create_log_stream config options - create_log_group (default false): creates log group on ResourceNotFoundException - create_log_stream (default true): creates log stream on ResourceNotFoundException - Dispatcher only calls createLogGroup/createLogStream based on respective flags - Expand acronyms in comments and test method names for readability - Add tests for independent flag combinations (only group, only stream) Signed-off-by: Nikhil Bagmar --- .../cloudwatch-logs/README.md | 17 ++-- .../cloudwatch_logs/CloudWatchLogsIT.java | 3 +- .../cloudwatch_logs/CloudWatchLogsSink.java | 3 +- .../client/CloudWatchLogsDispatcher.java | 79 ++++++++------- .../config/CloudWatchLogsSinkConfig.java | 15 ++- .../CloudWatchLogsSinkTest.java | 32 ++++--- .../client/CloudWatchLogsDispatcherTest.java | 95 ++++++++++++++----- .../config/CloudWatchLogsSinkConfigTest.java | 23 ++++- 8 files changed, 177 insertions(+), 90 deletions(-) diff --git a/data-prepper-plugins/cloudwatch-logs/README.md b/data-prepper-plugins/cloudwatch-logs/README.md index a9bb654514..be63608bfe 100644 --- a/data-prepper-plugins/cloudwatch-logs/README.md +++ b/data-prepper-plugins/cloudwatch-logs/README.md @@ -35,7 +35,8 @@ pipeline: X-Request-ID: "request-123" X-Source: "dataprepper" endpoint: "https://logs.us-west-2.amazonaws.com" - create_log_group_and_stream: false + create_log_group: false + create_log_stream: true ``` ## AWS Configuration @@ -66,11 +67,15 @@ pipeline: - `endpoint` (Optional) : A string representing a custom CloudWatch Logs endpoint URL to override the default service endpoint. -- `create_log_group_and_stream` (Optional): A boolean that controls whether - the sink will create the configured `log_group` and `log_stream` if they do - not already exist. Defaults to `false`. When set to `true`, the IAM principal - used by the sink must have `logs:CreateLogGroup` and `logs:CreateLogStream` - permissions in addition to `logs:PutLogEvents`. +- `create_log_group` (Optional): A boolean that controls whether the sink will + create the configured `log_group` if it does not already exist. Defaults to + `false`. When set to `true`, the IAM principal must have `logs:CreateLogGroup` + permission. + +- `create_log_stream` (Optional): A boolean that controls whether the sink will + create the configured `log_stream` if it does not already exist. Defaults to + `true`. When set to `true`, the IAM principal must have `logs:CreateLogStream` + permission. ## Buffer Type Configuration diff --git a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java index 79460f42c0..6234b13623 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java +++ b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java @@ -593,7 +593,8 @@ void TestSinkOperationWithCreateLogStream() throws Exception { // where CloudWatch Logs may transiently throw OperationAbortedException. logStreamName = "CloudWatchLogsIT_create_" + RandomStringUtils.randomAlphabetic(8); when(cloudWatchLogsSinkConfig.getLogStream()).thenReturn(logStreamName); - when(cloudWatchLogsSinkConfig.getCreateLogGroupAndStream()).thenReturn(true); + when(cloudWatchLogsSinkConfig.getCreateLogGroup()).thenReturn(true); + when(cloudWatchLogsSinkConfig.getCreateLogStream()).thenReturn(true); when(thresholdConfig.getBatchSize()).thenReturn(10); when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(1000L); when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L); diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java index 464f17141b..62e576f194 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java @@ -96,7 +96,8 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting, .logStream(cloudWatchLogsSinkConfig.getLogStream()) .retryCount(dlqPushHandler == null ? Integer.MAX_VALUE : cloudWatchLogsSinkConfig.getMaxRetries()) .executor(executor) - .createLogGroupAndStream(cloudWatchLogsSinkConfig.getCreateLogGroupAndStream()) + .createLogGroup(cloudWatchLogsSinkConfig.getCreateLogGroup()) + .createLogStream(cloudWatchLogsSinkConfig.getCreateLogStream()) .build(); Buffer buffer; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java index 1be905e59d..3011076af9 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java @@ -44,7 +44,8 @@ public class CloudWatchLogsDispatcher { private String logGroup; private String logStream; private int retryCount; - private boolean createLogGroupAndStream; + private boolean createLogGroup; + private boolean createLogStream; /** * Will read in a collection of log messages in byte form and transform them into a collection of InputLogEvents. @@ -88,7 +89,8 @@ public void dispatchLogs(List inputLogEvents, List e .dropIfDlqNotConfigured(dropIfDlqNotConfigured) .totalEventCount(inputLogEvents.size()) .retryCount(retryCount) - .createLogGroupAndStream(createLogGroupAndStream) + .createLogGroup(createLogGroup) + .createLogStream(createLogStream) .build()); } @@ -105,7 +107,8 @@ protected static class Uploader implements Runnable { private final int totalEventCount; private final int retryCount; private boolean dropIfDlqNotConfigured; - private final boolean createLogGroupAndStream; + private final boolean createLogGroup; + private final boolean createLogStream; @Override public void run() { @@ -129,13 +132,13 @@ public void upload() { failedToTransmit = false; } catch (ResourceNotFoundException e) { - // Must be caught before CloudWatchLogsException since RNF extends CWLException. - if (createLogGroupAndStream && !resourceCreationAttempted) { + // Must be caught before CloudWatchLogsException since ResourceNotFoundException extends it. + if ((createLogGroup || createLogStream) && !resourceCreationAttempted) { resourceCreationAttempted = true; - createLogGroupAndStream(); - // Loop continues; next iteration retries PLE without incrementing failCount. - // If PLE still throws RNF, the guard sends us to the else branch and - // normal retry/DLQ logic takes over. + createResources(); + // Loop continues; next iteration retries PutLogEvents without incrementing failCount. + // If PutLogEvents still throws ResourceNotFoundException, the guard sends us to the + // else branch and normal retry/DLQ logic takes over. } else { failureMessage = e.getMessage(); failCount = handlePutLogEventsFailure(e, failCount, backoff); @@ -174,7 +177,8 @@ public void upload() { /** * Logs the failure, increments fail metrics, and sleeps using the backoff schedule. * Returns the new fail count so the caller can update its local. Extracted so the - * RNF-fallback branch and the generic CWL/SDK catch don't drift apart. + * ResourceNotFoundException-fallback branch and the generic CloudWatchLogsException/SDK + * catch don't drift apart. */ private int handlePutLogEventsFailure(final Exception e, final int currentFailCount, final Backoff backoff) throws InterruptedException { @@ -192,37 +196,42 @@ private int handlePutLogEventsFailure(final Exception e, final int currentFailCo } /** - * Attempts to create the configured log group and log stream. The helper never throws — - * all SDK exceptions are caught inside so that a recovery failure does not interrupt the - * Uploader. ResourceAlreadyExistsException is intentionally swallowed to make creation - * idempotent. If creation fails, the next PutLogEvents call will hit ResourceNotFoundException - * again and the guard in upload() will route it to the normal retry/DLQ path. + * Attempts to create the configured log group and/or log stream based on the flags. + * The helper never throws — all SDK exceptions are caught inside so that a recovery + * failure does not interrupt the Uploader. ResourceAlreadyExistsException is intentionally + * swallowed to make creation idempotent. If creation fails, the next PutLogEvents call + * will hit ResourceNotFoundException again and the guard in upload() will route it to + * the normal retry/DLQ path. */ - private void createLogGroupAndStream() { + private void createResources() { final String logGroupName = putLogEventsRequest.logGroupName(); final String logStreamName = putLogEventsRequest.logStreamName(); - try { - cloudWatchLogsClient.createLogGroup( - CreateLogGroupRequest.builder().logGroupName(logGroupName).build()); - LOG.info("Created log group: {}", logGroupName); - } catch (ResourceAlreadyExistsException e) { - LOG.debug("Log group already exists: {}", logGroupName); - } catch (CloudWatchLogsException | SdkClientException e) { - LOG.warn("Unable to create log group '{}': {}", logGroupName, e.getMessage()); + if (createLogGroup) { + try { + cloudWatchLogsClient.createLogGroup( + CreateLogGroupRequest.builder().logGroupName(logGroupName).build()); + LOG.info("Created log group: {}", logGroupName); + } catch (ResourceAlreadyExistsException e) { + LOG.debug("Log group already exists: {}", logGroupName); + } catch (CloudWatchLogsException | SdkClientException e) { + LOG.warn("Unable to create log group '{}': {}", logGroupName, e.getMessage()); + } } - try { - cloudWatchLogsClient.createLogStream( - CreateLogStreamRequest.builder() - .logGroupName(logGroupName) - .logStreamName(logStreamName) - .build()); - LOG.info("Created log stream: {}/{}", logGroupName, logStreamName); - } catch (ResourceAlreadyExistsException e) { - LOG.debug("Log stream already exists: {}/{}", logGroupName, logStreamName); - } catch (CloudWatchLogsException | SdkClientException e) { - LOG.warn("Unable to create log stream '{}/{}': {}", logGroupName, logStreamName, e.getMessage()); + if (createLogStream) { + try { + cloudWatchLogsClient.createLogStream( + CreateLogStreamRequest.builder() + .logGroupName(logGroupName) + .logStreamName(logStreamName) + .build()); + LOG.info("Created log stream: {}/{}", logGroupName, logStreamName); + } catch (ResourceAlreadyExistsException e) { + LOG.debug("Log stream already exists: {}/{}", logGroupName, logStreamName); + } catch (CloudWatchLogsException | SdkClientException e) { + LOG.warn("Unable to create log stream '{}/{}': {}", logGroupName, logStreamName, e.getMessage()); + } } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java index 437bb2281a..5adea4a211 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java @@ -61,8 +61,11 @@ public class CloudWatchLogsSinkConfig { @JsonProperty("endpoint") private String endpoint; - @JsonProperty(value = "create_log_group_and_stream", defaultValue = "false") - private boolean createLogGroupAndStream = false; + @JsonProperty(value = "create_log_group", defaultValue = "false") + private boolean createLogGroup = false; + + @JsonProperty(value = "create_log_stream", defaultValue = "true") + private boolean createLogStream = true; public AwsConfig getAwsConfig() { return awsConfig; @@ -100,8 +103,12 @@ public String getEndpoint() { return endpoint; } - public boolean getCreateLogGroupAndStream() { - return createLogGroupAndStream; + public boolean getCreateLogGroup() { + return createLogGroup; + } + + public boolean getCreateLogStream() { + return createLogStream; } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java index 3258f0a705..a4b2dfa5dc 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java @@ -69,7 +69,7 @@ class CloudWatchLogsSinkTest { private static final String TEST_BUFFER_TYPE = "in_memory"; // Number of args Lombok @Builder passes to the all-args constructor of CloudWatchLogsDispatcher. // Bumping this is the signal that positional context.arguments().get(N) calls below need to be re-audited. - private static final int EXPECTED_DISPATCHER_ARITY = 9; + private static final int EXPECTED_DISPATCHER_ARITY = 10; private int numRetries; @BeforeEach void setUp() { @@ -292,16 +292,19 @@ void WHEN_sink_has_dlq_config_THEN_retries_set_to_user_configured_value() { } @Test - void WHEN_create_log_group_and_stream_is_true_THEN_flag_passed_to_dispatcher() { - when(mockCloudWatchLogsSinkConfig.getCreateLogGroupAndStream()).thenReturn(true); + void WHEN_create_log_group_and_stream_flags_are_set_THEN_flags_passed_to_dispatcher() { + when(mockCloudWatchLogsSinkConfig.getCreateLogGroup()).thenReturn(true); + when(mockCloudWatchLogsSinkConfig.getCreateLogStream()).thenReturn(true); - final boolean[] capturedFlag = new boolean[1]; + final boolean[] capturedCreateLogGroup = new boolean[1]; + final boolean[] capturedCreateLogStream = new boolean[1]; final int[] capturedArity = new int[1]; try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { final MockedConstruction dispatcherMock = mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> { capturedArity[0] = context.arguments().size(); - capturedFlag[0] = (boolean) context.arguments().get(8); + capturedCreateLogGroup[0] = (boolean) context.arguments().get(8); + capturedCreateLogStream[0] = (boolean) context.arguments().get(9); }); mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), @@ -312,22 +315,25 @@ void WHEN_create_log_group_and_stream_is_true_THEN_flag_passed_to_dispatcher() { testCloudWatchSink.doInitialize(); dispatcherMock.close(); } - // Arity guard: positional reads above silently rot when fields are added/reordered. assertThat(capturedArity[0], equalTo(EXPECTED_DISPATCHER_ARITY)); - assertThat(capturedFlag[0], equalTo(true)); + assertThat(capturedCreateLogGroup[0], equalTo(true)); + assertThat(capturedCreateLogStream[0], equalTo(true)); } @Test - void WHEN_create_log_group_and_stream_is_false_THEN_flag_passed_as_false_to_dispatcher() { - when(mockCloudWatchLogsSinkConfig.getCreateLogGroupAndStream()).thenReturn(false); + void WHEN_create_log_group_is_false_and_create_log_stream_is_false_THEN_flags_passed_as_false_to_dispatcher() { + when(mockCloudWatchLogsSinkConfig.getCreateLogGroup()).thenReturn(false); + when(mockCloudWatchLogsSinkConfig.getCreateLogStream()).thenReturn(false); - final boolean[] capturedFlag = new boolean[]{true}; + final boolean[] capturedCreateLogGroup = new boolean[]{true}; + final boolean[] capturedCreateLogStream = new boolean[]{true}; final int[] capturedArity = new int[1]; try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { final MockedConstruction dispatcherMock = mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> { capturedArity[0] = context.arguments().size(); - capturedFlag[0] = (boolean) context.arguments().get(8); + capturedCreateLogGroup[0] = (boolean) context.arguments().get(8); + capturedCreateLogStream[0] = (boolean) context.arguments().get(9); }); mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), @@ -338,9 +344,9 @@ void WHEN_create_log_group_and_stream_is_false_THEN_flag_passed_as_false_to_disp testCloudWatchSink.doInitialize(); dispatcherMock.close(); } - // Arity guard: positional reads above silently rot when fields are added/reordered. assertThat(capturedArity[0], equalTo(EXPECTED_DISPATCHER_ARITY)); - assertThat(capturedFlag[0], equalTo(false)); + assertThat(capturedCreateLogGroup[0], equalTo(false)); + assertThat(capturedCreateLogStream[0], equalTo(false)); } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java index 0f87924bf7..01f67dc064 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java @@ -86,7 +86,7 @@ CloudWatchLogsDispatcher getCloudWatchLogsDispatcher(int retryCount) { .build(); } - CloudWatchLogsDispatcher getCloudWatchLogsDispatcherWithCreateFlag(final int retryCount, final boolean createLogGroupAndStream) { + CloudWatchLogsDispatcher getCloudWatchLogsDispatcherWithCreateFlag(final int retryCount, final boolean createLogGroup, final boolean createLogStream) { return CloudWatchLogsDispatcher.builder() .cloudWatchLogsClient(mockCloudWatchLogsClient) .cloudWatchLogsMetrics(mockCloudWatchLogsMetrics) @@ -95,7 +95,8 @@ CloudWatchLogsDispatcher getCloudWatchLogsDispatcherWithCreateFlag(final int ret .logStream(LOG_STREAM) .retryCount(retryCount) .dropIfDlqNotConfigured(true) - .createLogGroupAndStream(createLogGroupAndStream) + .createLogGroup(createLogGroup) + .createLogStream(createLogStream) .build(); } @@ -290,7 +291,7 @@ void GIVEN_max_retries_exceeded_SHOULD_not_release_events() { @Test void GIVEN_resource_not_found_and_create_flag_true_WHEN_upload_SHOULD_create_group_and_stream_then_retry() { - cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); final List eventHandles = getSampleEventHandles(); when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) @@ -311,13 +312,13 @@ void GIVEN_resource_not_found_and_create_flag_true_WHEN_upload_SHOULD_create_gro verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); // Recovery action does not count as a failure — fail counter is not incremented. verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); - // PLE retry succeeded, so success counter is incremented. + // PutLogEvents retry succeeded, so success counter is incremented. verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); } @Test void GIVEN_resource_not_found_and_create_flag_false_WHEN_upload_SHOULD_follow_normal_retry_logic() { - cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, false); + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, false, false); final List eventHandles = getSampleEventHandles(); when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) @@ -331,14 +332,58 @@ void GIVEN_resource_not_found_and_create_flag_false_WHEN_upload_SHOULD_follow_no // No creation attempted when the flag is false. verify(mockCloudWatchLogsClient, never()).createLogGroup(any(CreateLogGroupRequest.class)); verify(mockCloudWatchLogsClient, never()).createLogStream(any(CreateLogStreamRequest.class)); - // RNF flows to the normal retry/DLQ path: fail counter incremented for every retry. + // ResourceNotFoundException flows to the normal retry/DLQ path: fail counter incremented for every retry. verify(mockCloudWatchLogsMetrics, times(RETRY_COUNT)).increaseRequestFailCounter(1); verify(mockCloudWatchLogsMetrics, never()).increaseRequestSuccessCounter(1); } @Test - void GIVEN_resource_not_found_and_create_succeeds_WHEN_retry_ple_SHOULD_succeed() { - cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + void GIVEN_resource_not_found_and_only_create_log_stream_true_WHEN_upload_SHOULD_only_create_stream() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, false, true); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenReturn(mock(CreateLogStreamResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + verify(mockCloudWatchLogsClient, never()).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); + } + + @Test + void GIVEN_resource_not_found_and_only_create_log_group_true_WHEN_upload_SHOULD_only_create_group() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, false); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenReturn(mock(CreateLogGroupResponse.class)); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, never()).createLogStream(any(CreateLogStreamRequest.class)); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); + } + + @Test + void GIVEN_resource_not_found_and_create_succeeds_WHEN_retry_put_log_events_SHOULD_succeed() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); final List eventHandles = getSampleEventHandles(); when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) @@ -354,14 +399,14 @@ void GIVEN_resource_not_found_and_create_succeeds_WHEN_retry_ple_SHOULD_succeed( executeDispatcherRunnable(); - // PLE retried after successful creation and succeeded — events released. + // PutLogEvents retried after successful creation and succeeded — events released. verify(mockCloudWatchLogsClient, times(2)).putLogEvents(any(PutLogEventsRequest.class)); eventHandles.forEach(eventHandle -> verify(eventHandle).release(true)); } @Test void GIVEN_resource_not_found_and_group_already_exists_WHEN_create_SHOULD_ignore_and_create_stream() { - cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); final List eventHandles = getSampleEventHandles(); when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) @@ -377,7 +422,7 @@ void GIVEN_resource_not_found_and_group_already_exists_WHEN_create_SHOULD_ignore executeDispatcherRunnable(); - // RAEE is swallowed silently; createLogStream is still attempted; PLE retry succeeds. + // ResourceAlreadyExistsException is swallowed silently; createLogStream is still attempted; PutLogEvents retry succeeds. verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); @@ -385,8 +430,8 @@ void GIVEN_resource_not_found_and_group_already_exists_WHEN_create_SHOULD_ignore } @Test - void GIVEN_resource_not_found_and_stream_already_exists_WHEN_create_SHOULD_ignore_and_retry_ple() { - cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + void GIVEN_resource_not_found_and_stream_already_exists_WHEN_create_SHOULD_ignore_and_retry_put_log_events() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); final List eventHandles = getSampleEventHandles(); when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) @@ -402,7 +447,7 @@ void GIVEN_resource_not_found_and_stream_already_exists_WHEN_create_SHOULD_ignor executeDispatcherRunnable(); - // Stream RAEE is swallowed; PLE retry runs and succeeds. + // Stream ResourceAlreadyExistsException is swallowed; PutLogEvents retry runs and succeeds. verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); @@ -410,7 +455,7 @@ void GIVEN_resource_not_found_and_stream_already_exists_WHEN_create_SHOULD_ignor @Test void GIVEN_resource_not_found_and_create_group_throws_access_denied_SHOULD_still_attempt_create_stream() { - cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); final List eventHandles = getSampleEventHandles(); when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) @@ -434,11 +479,11 @@ void GIVEN_resource_not_found_and_create_group_throws_access_denied_SHOULD_still } @Test - void GIVEN_resource_not_found_and_create_stream_throws_cwl_exception_SHOULD_not_kill_uploader() { - cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + void GIVEN_resource_not_found_and_create_stream_throws_cloudwatch_logs_exception_SHOULD_not_kill_uploader() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); final List eventHandles = getSampleEventHandles(); - // First PLE call throws RNF; second PLE call (after failed creation) succeeds. + // First PutLogEvents call throws ResourceNotFoundException; second PutLogEvents call (after failed creation) succeeds. when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) .thenThrow(ResourceNotFoundException.builder().message("missing").build()) .thenReturn(mock(PutLogEventsResponse.class)); @@ -452,7 +497,7 @@ void GIVEN_resource_not_found_and_create_stream_throws_cwl_exception_SHOULD_not_ executeDispatcherRunnable(); - // Helper swallowed the exception; PLE retry ran and succeeded; uploader not interrupted. + // Helper swallowed the exception; PutLogEvents retry ran and succeeded; uploader not interrupted. verify(mockCloudWatchLogsClient, times(2)).putLogEvents(any(PutLogEventsRequest.class)); verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); @@ -460,10 +505,10 @@ void GIVEN_resource_not_found_and_create_stream_throws_cwl_exception_SHOULD_not_ @Test void GIVEN_resource_not_found_and_create_flag_true_SHOULD_only_attempt_creation_once_per_upload() { - cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); final List eventHandles = getSampleEventHandles(); - // Every PLE call throws RNF — creation should still only be attempted once. + // Every PutLogEvents call throws ResourceNotFoundException — creation should still only be attempted once. when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) .thenThrow(ResourceNotFoundException.builder().message("missing").build()); when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) @@ -476,14 +521,14 @@ void GIVEN_resource_not_found_and_create_flag_true_SHOULD_only_attempt_creation_ executeDispatcherRunnable(); - // Creation invoked exactly once per Uploader invocation, no matter how many RNFs follow. + // Creation invoked exactly once per Uploader invocation, no matter how many ResourceNotFoundExceptions follow. verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); } @Test - void GIVEN_create_flag_true_AND_creation_succeeds_BUT_PLE_still_throws_RNF_SHOULD_only_create_once_and_FAIL_RETRIES() { - cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true); + void GIVEN_create_flag_true_AND_creation_succeeds_BUT_put_log_events_still_throws_resource_not_found_SHOULD_only_create_once_and_FAIL_RETRIES() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); final List eventHandles = getSampleEventHandles(); when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) @@ -501,7 +546,7 @@ void GIVEN_create_flag_true_AND_creation_succeeds_BUT_PLE_still_throws_RNF_SHOUL // Creation attempted exactly once. verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); - // Subsequent RNFs flow to the normal retry/DLQ path: fail counter incremented for every retry. + // Subsequent ResourceNotFoundExceptions flow to the normal retry/DLQ path: fail counter incremented for every retry. verify(mockCloudWatchLogsMetrics, times(RETRY_COUNT)).increaseRequestFailCounter(1); verify(mockCloudWatchLogsMetrics, never()).increaseRequestSuccessCounter(1); } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java index ad4202021b..d75df13f1e 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java @@ -324,15 +324,28 @@ void GIVEN_endpoint_configured_SHOULD_return_the_configured_value() throws NoSuc } @Test - void GIVEN_new_sink_config_WHEN_get_create_log_group_and_stream_called_SHOULD_return_false() { - assertThat(new CloudWatchLogsSinkConfig().getCreateLogGroupAndStream(), equalTo(false)); + void GIVEN_new_sink_config_WHEN_get_create_log_group_called_SHOULD_return_false() { + assertThat(new CloudWatchLogsSinkConfig().getCreateLogGroup(), equalTo(false)); } @Test - void GIVEN_create_log_group_and_stream_set_true_WHEN_get_called_SHOULD_return_true() + void GIVEN_create_log_group_set_true_WHEN_get_called_SHOULD_return_true() throws NoSuchFieldException, IllegalAccessException { ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, - "createLogGroupAndStream", true); - assertThat(cloudWatchLogsSinkConfig.getCreateLogGroupAndStream(), equalTo(true)); + "createLogGroup", true); + assertThat(cloudWatchLogsSinkConfig.getCreateLogGroup(), equalTo(true)); + } + + @Test + void GIVEN_new_sink_config_WHEN_get_create_log_stream_called_SHOULD_return_true() { + assertThat(new CloudWatchLogsSinkConfig().getCreateLogStream(), equalTo(true)); + } + + @Test + void GIVEN_create_log_stream_set_false_WHEN_get_called_SHOULD_return_false() + throws NoSuchFieldException, IllegalAccessException { + ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, + "createLogStream", false); + assertThat(cloudWatchLogsSinkConfig.getCreateLogStream(), equalTo(false)); } } From f3f4262756de181f2aa268a82012588a131627a1 Mon Sep 17 00:00:00 2001 From: Nikhil Bagmar Date: Wed, 20 May 2026 12:43:38 -0700 Subject: [PATCH 3/3] Change create_log_stream default to false for backward compatibility The create_log_stream config option previously defaulted to true, which changes existing sink behavior by attempting stream creation on ResourceNotFoundException. Default both create_log_group and create_log_stream to false so existing pipelines are unaffected and users must explicitly opt in to auto-creation. Signed-off-by: Nikhil Bagmar --- data-prepper-plugins/cloudwatch-logs/README.md | 4 ++-- .../config/CloudWatchLogsSinkConfig.java | 4 ++-- .../config/CloudWatchLogsSinkConfigTest.java | 10 +++++----- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/data-prepper-plugins/cloudwatch-logs/README.md b/data-prepper-plugins/cloudwatch-logs/README.md index be63608bfe..cf1a4b8fb9 100644 --- a/data-prepper-plugins/cloudwatch-logs/README.md +++ b/data-prepper-plugins/cloudwatch-logs/README.md @@ -36,7 +36,7 @@ pipeline: X-Source: "dataprepper" endpoint: "https://logs.us-west-2.amazonaws.com" create_log_group: false - create_log_stream: true + create_log_stream: false ``` ## AWS Configuration @@ -74,7 +74,7 @@ pipeline: - `create_log_stream` (Optional): A boolean that controls whether the sink will create the configured `log_stream` if it does not already exist. Defaults to - `true`. When set to `true`, the IAM principal must have `logs:CreateLogStream` + `false`. When set to `true`, the IAM principal must have `logs:CreateLogStream` permission. ## Buffer Type Configuration diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java index 5adea4a211..563cd1070f 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java @@ -64,8 +64,8 @@ public class CloudWatchLogsSinkConfig { @JsonProperty(value = "create_log_group", defaultValue = "false") private boolean createLogGroup = false; - @JsonProperty(value = "create_log_stream", defaultValue = "true") - private boolean createLogStream = true; + @JsonProperty(value = "create_log_stream", defaultValue = "false") + private boolean createLogStream = false; public AwsConfig getAwsConfig() { return awsConfig; diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java index d75df13f1e..6c9f24d2df 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java @@ -337,15 +337,15 @@ void GIVEN_create_log_group_set_true_WHEN_get_called_SHOULD_return_true() } @Test - void GIVEN_new_sink_config_WHEN_get_create_log_stream_called_SHOULD_return_true() { - assertThat(new CloudWatchLogsSinkConfig().getCreateLogStream(), equalTo(true)); + void GIVEN_new_sink_config_WHEN_get_create_log_stream_called_SHOULD_return_false() { + assertThat(new CloudWatchLogsSinkConfig().getCreateLogStream(), equalTo(false)); } @Test - void GIVEN_create_log_stream_set_false_WHEN_get_called_SHOULD_return_false() + void GIVEN_create_log_stream_set_true_WHEN_get_called_SHOULD_return_true() throws NoSuchFieldException, IllegalAccessException { ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, - "createLogStream", false); - assertThat(cloudWatchLogsSinkConfig.getCreateLogStream(), equalTo(false)); + "createLogStream", true); + assertThat(cloudWatchLogsSinkConfig.getCreateLogStream(), equalTo(true)); } }