Skip to content

Commit 6617862

Browse files
authored
Create log group and stream in CloudWatch Logs sink (#6863)
Create log group and stream in CloudWatch Logs sink

When enabled, the CloudWatch Logs sink creates the configured log group and log stream on the first ResourceNotFoundException instead of failing to DLQ. This eliminates the need for manual pre-provisioning of CloudWatch Logs resources before running Data Prepper pipelines.

- Reactive creation (on-failure), not eager (at-init)
- Creation attempted at most once per Uploader invocation
- Idempotent: ResourceAlreadyExistsException silently ignored
- Requires logs:CreateLogGroup and logs:CreateLogStream IAM permissions

Split resource creation into separate create_log_group and create_log_stream config options

- create_log_group (default false): creates log group on ResourceNotFoundException
- create_log_stream (default true): creates log stream on ResourceNotFoundException
- Dispatcher only calls createLogGroup/createLogStream based on respective flags
- Expand acronyms in comments and test method names for readability
- Add tests for independent flag combinations (only group, only stream)

Change create_log_stream default to false for backward compatibility

The create_log_stream config option previously defaulted to true, which changes existing sink behavior by attempting stream creation on ResourceNotFoundException. Default both create_log_group and create_log_stream to false so existing pipelines are unaffected and users must explicitly opt in to auto-creation.

Resolves: #6861

---------

Signed-off-by: Nikhil Bagmar <nikhilbagmar73@gmail.com>
1 parent b85efd3 commit 6617862

8 files changed

Lines changed: 536 additions & 28 deletions

File tree

data-prepper-plugins/cloudwatch-logs/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ pipeline:
3535
X-Request-ID: "request-123"
3636
X-Source: "dataprepper"
3737
endpoint: "https://logs.us-west-2.amazonaws.com"
38+
create_log_group: false
39+
create_log_stream: false
3840
```
3941

4042
## AWS Configuration
@@ -65,6 +67,16 @@ pipeline:
6567

6668
- `endpoint` (Optional) : A string representing a custom CloudWatch Logs endpoint URL to override the default service endpoint.
6769

70+
- `create_log_group` (Optional): A boolean that controls whether the sink will create the configured `log_group` if it does not already exist. Creation is attempted reactively, the first time a write fails because the resource is missing, rather than at startup. Defaults to `false`. When set to `true`, the IAM principal must have the `logs:CreateLogGroup` permission.
74+
75+
- `create_log_stream` (Optional): A boolean that controls whether the sink will create the configured `log_stream` if it does not already exist. Creation is attempted reactively, the first time a write fails because the resource is missing, rather than at startup. Defaults to `false`. When set to `true`, the IAM principal must have the `logs:CreateLogStream` permission.
79+
6880
## Buffer Type Configuration
6981

7082
- `buffer_type` (Optional) : A string representing the type of buffer to use to hold onto events. Currently only supports `in_memory`.

data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
3232
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamResponse;
3333
import software.amazon.awssdk.services.cloudwatchlogs.model.DeleteLogStreamRequest;
34+
import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceNotFoundException;
3435
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
3536

3637
import software.amazon.awssdk.services.s3.S3Client;
@@ -271,7 +272,12 @@ void tearDown() {
271272
.logGroupName(logGroupName)
272273
.logStreamName(logStreamName)
273274
.build();
274-
cloudWatchLogsClient.deleteLogStream(deleteRequest);
275+
try {
276+
cloudWatchLogsClient.deleteLogStream(deleteRequest);
277+
} catch (ResourceNotFoundException e) {
278+
// Sink-side stream-creation may have failed in the test under verification; don't
279+
// mask the real test failure with a teardown error for an absent stream.
280+
}
275281
deleteObjectsWithPrefix(bucket, DLQ_PREFIX);
276282
}
277283

@@ -574,6 +580,45 @@ public void testToVerifyLackOfCredentialsResultInFailure() throws Exception {
574580
assertThat(eventsSuccessCount.get(), equalTo(0));
575581
}
576582

583+
@Test
584+
void TestSinkOperationWithCreateLogStream() throws Exception {
585+
// Delete the stream setUp() created so we don't leak it. We're about to mutate
586+
// logStreamName so tearDown() won't see this one again.
587+
cloudWatchLogsClient.deleteLogStream(DeleteLogStreamRequest.builder()
588+
.logGroupName(logGroupName)
589+
.logStreamName(logStreamName)
590+
.build());
591+
592+
// Use a brand-new stream name to avoid the delete-then-recreate-same-name window
593+
// where CloudWatch Logs may transiently throw OperationAbortedException.
594+
logStreamName = "CloudWatchLogsIT_create_" + RandomStringUtils.randomAlphabetic(8);
595+
when(cloudWatchLogsSinkConfig.getLogStream()).thenReturn(logStreamName);
596+
when(cloudWatchLogsSinkConfig.getCreateLogGroup()).thenReturn(true);
597+
when(cloudWatchLogsSinkConfig.getCreateLogStream()).thenReturn(true);
598+
when(thresholdConfig.getBatchSize()).thenReturn(10);
599+
when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(1000L);
600+
when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L);
601+
when(thresholdConfig.getFlushInterval()).thenReturn(10L);
602+
603+
final long startTime = Instant.now().toEpochMilli();
604+
sink = createObjectUnderTest();
605+
sink.doOutput(getRecordList(NUM_RECORDS));
606+
607+
await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
608+
sink.doOutput(Collections.emptyList());
609+
GetLogEventsResponse resp = cloudWatchLogsClient.getLogEvents(
610+
GetLogEventsRequest.builder()
611+
.logGroupName(logGroupName)
612+
.logStreamName(logStreamName)
613+
.startTime(startTime)
614+
.endTime(Instant.now().toEpochMilli())
615+
.build());
616+
assertThat(resp.events().size(), equalTo(NUM_RECORDS));
617+
});
618+
assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS));
619+
assertThat(dlqSuccessCount.get(), equalTo(0));
620+
}
621+
577622
private Collection<Record<Event>> getRecordList(int numberOfRecords) {
578623
final Collection<Record<Event>> recordList = new ArrayList<>();
579624
List<HashMap> records = generateRecords(numberOfRecords);

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting,
9696
.logStream(cloudWatchLogsSinkConfig.getLogStream())
9797
.retryCount(dlqPushHandler == null ? Integer.MAX_VALUE : cloudWatchLogsSinkConfig.getMaxRetries())
9898
.executor(executor)
99+
.createLogGroup(cloudWatchLogsSinkConfig.getCreateLogGroup())
100+
.createLogStream(cloudWatchLogsSinkConfig.getCreateLogStream())
99101
.build();
100102

101103
Buffer buffer;

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java

Lines changed: 85 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,14 @@
1313
import com.linecorp.armeria.client.retry.Backoff;
1414
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
1515
import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException;
16+
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
17+
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
1618
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
1719
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
1820
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
1921
import software.amazon.awssdk.services.cloudwatchlogs.model.RejectedLogEventsInfo;
22+
import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
23+
import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceNotFoundException;
2024
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsSinkUtils;
2125
import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler;
2226
import org.opensearch.dataprepper.model.failures.DlqObject;
@@ -40,24 +44,8 @@ public class CloudWatchLogsDispatcher {
4044
private String logGroup;
4145
private String logStream;
4246
private int retryCount;
43-
public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient,
44-
final CloudWatchLogsMetrics cloudWatchLogsMetrics,
45-
final DlqPushHandler dlqPushHandler,
46-
final boolean dropIfDlqNotConfigured,
47-
final Executor executor,
48-
final String logGroup,
49-
final String logStream,
50-
final int retryCount) {
51-
this.cloudWatchLogsClient = cloudWatchLogsClient;
52-
this.cloudWatchLogsMetrics = cloudWatchLogsMetrics;
53-
this.logGroup = logGroup;
54-
this.logStream = logStream;
55-
this.retryCount = retryCount;
56-
this.dlqPushHandler = dlqPushHandler;
57-
this.dropIfDlqNotConfigured = dropIfDlqNotConfigured;
58-
59-
this.executor = executor;
60-
}
47+
private boolean createLogGroup;
48+
private boolean createLogStream;
6149

6250
/**
6351
* Will read in a collection of log messages in byte form and transform them into a collection of InputLogEvents.
@@ -101,6 +89,8 @@ public void dispatchLogs(List<InputLogEvent> inputLogEvents, List<EventHandle> e
10189
.dropIfDlqNotConfigured(dropIfDlqNotConfigured)
10290
.totalEventCount(inputLogEvents.size())
10391
.retryCount(retryCount)
92+
.createLogGroup(createLogGroup)
93+
.createLogStream(createLogStream)
10494
.build());
10595
}
10696

@@ -117,6 +107,8 @@ protected static class Uploader implements Runnable {
117107
private final int totalEventCount;
118108
private final int retryCount;
119109
private boolean dropIfDlqNotConfigured;
110+
private final boolean createLogGroup;
111+
private final boolean createLogStream;
120112

121113
@Override
122114
public void run() {
@@ -125,6 +117,7 @@ public void run() {
125117

126118
public void upload() {
127119
boolean failedToTransmit = true;
120+
boolean resourceCreationAttempted = false;
128121
int failCount = 0;
129122
String failureMessage = "";
130123
PutLogEventsResponse putLogEventsResponse = null;
@@ -138,17 +131,21 @@ public void upload() {
138131
cloudWatchLogsMetrics.increaseRequestSuccessCounter(1);
139132
failedToTransmit = false;
140133

134+
} catch (ResourceNotFoundException e) {
135+
// Must be caught before CloudWatchLogsException since ResourceNotFoundException extends it.
136+
if ((createLogGroup || createLogStream) && !resourceCreationAttempted) {
137+
resourceCreationAttempted = true;
138+
createResources();
139+
// Loop continues; next iteration retries PutLogEvents without incrementing failCount.
140+
// If PutLogEvents still throws ResourceNotFoundException, the guard sends us to the
141+
// else branch and normal retry/DLQ logic takes over.
142+
} else {
143+
failureMessage = e.getMessage();
144+
failCount = handlePutLogEventsFailure(e, failCount, backoff);
145+
}
141146
} catch (CloudWatchLogsException | SdkClientException e) {
142147
failureMessage = e.getMessage();
143-
LOG.error(NOISY, "Failed to push logs with error: {}", e.getMessage());
144-
cloudWatchLogsMetrics.increaseRequestFailCounter(1);
145-
if (++failCount % MULTIPLE_FAILURES_METRIC_COUNT == 0) {
146-
cloudWatchLogsMetrics.increaseRequestMultiFailCounter(1);
147-
}
148-
final long delayMillis = backoff.nextDelayMillis(failCount);
149-
if (delayMillis > 0) {
150-
Thread.sleep(delayMillis);
151-
}
148+
failCount = handlePutLogEventsFailure(e, failCount, backoff);
152149
}
153150
}
154151
} catch (Exception e) {
@@ -177,6 +174,67 @@ public void upload() {
177174
CloudWatchLogsSinkUtils.handleDlqObjects(dlqObjects, dlqPushHandler);
178175
}
179176

177+
/**
178+
* Logs the failure, increments fail metrics, and sleeps using the backoff schedule.
179+
* Returns the new fail count so the caller can update its local. Extracted so the
180+
* ResourceNotFoundException-fallback branch and the generic CloudWatchLogsException/SDK
181+
* catch don't drift apart.
182+
*/
183+
private int handlePutLogEventsFailure(final Exception e, final int currentFailCount, final Backoff backoff)
184+
throws InterruptedException {
185+
LOG.error(NOISY, "Failed to push logs with error: {}", e.getMessage());
186+
cloudWatchLogsMetrics.increaseRequestFailCounter(1);
187+
final int newFailCount = currentFailCount + 1;
188+
if (newFailCount % MULTIPLE_FAILURES_METRIC_COUNT == 0) {
189+
cloudWatchLogsMetrics.increaseRequestMultiFailCounter(1);
190+
}
191+
final long delayMillis = backoff.nextDelayMillis(newFailCount);
192+
if (delayMillis > 0) {
193+
Thread.sleep(delayMillis);
194+
}
195+
return newFailCount;
196+
}
197+
198+
/**
199+
* Attempts to create the configured log group and/or log stream based on the flags.
200+
* The helper never throws — all SDK exceptions are caught inside so that a recovery
201+
* failure does not interrupt the Uploader. ResourceAlreadyExistsException is intentionally
202+
* swallowed to make creation idempotent. If creation fails, the next PutLogEvents call
203+
* will hit ResourceNotFoundException again and the guard in upload() will route it to
204+
* the normal retry/DLQ path.
205+
*/
206+
private void createResources() {
207+
final String logGroupName = putLogEventsRequest.logGroupName();
208+
final String logStreamName = putLogEventsRequest.logStreamName();
209+
210+
if (createLogGroup) {
211+
try {
212+
cloudWatchLogsClient.createLogGroup(
213+
CreateLogGroupRequest.builder().logGroupName(logGroupName).build());
214+
LOG.info("Created log group: {}", logGroupName);
215+
} catch (ResourceAlreadyExistsException e) {
216+
LOG.debug("Log group already exists: {}", logGroupName);
217+
} catch (CloudWatchLogsException | SdkClientException e) {
218+
LOG.warn("Unable to create log group '{}': {}", logGroupName, e.getMessage());
219+
}
220+
}
221+
222+
if (createLogStream) {
223+
try {
224+
cloudWatchLogsClient.createLogStream(
225+
CreateLogStreamRequest.builder()
226+
.logGroupName(logGroupName)
227+
.logStreamName(logStreamName)
228+
.build());
229+
LOG.info("Created log stream: {}/{}", logGroupName, logStreamName);
230+
} catch (ResourceAlreadyExistsException e) {
231+
LOG.debug("Log stream already exists: {}/{}", logGroupName, logStreamName);
232+
} catch (CloudWatchLogsException | SdkClientException e) {
233+
LOG.warn("Unable to create log stream '{}/{}': {}", logGroupName, logStreamName, e.getMessage());
234+
}
235+
}
236+
}
237+
180238
List<DlqObject> getDlqObjectsFromResponse(PutLogEventsResponse putLogEventsResponse) {
181239
List<DlqObject> dlqObjects = new ArrayList<>();
182240
RejectedLogEventsInfo rejectedLogEventsInfo = putLogEventsResponse.rejectedLogEventsInfo();

data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ public class CloudWatchLogsSinkConfig {
6161
@JsonProperty("endpoint")
6262
private String endpoint;
6363

64+
@JsonProperty(value = "create_log_group", defaultValue = "false")
65+
private boolean createLogGroup = false;
66+
67+
@JsonProperty(value = "create_log_stream", defaultValue = "false")
68+
private boolean createLogStream = false;
69+
6470
public AwsConfig getAwsConfig() {
6571
return awsConfig;
6672
}
@@ -97,4 +103,12 @@ public String getEndpoint() {
97103
return endpoint;
98104
}
99105

106+
public boolean getCreateLogGroup() {
107+
return createLogGroup;
108+
}
109+
110+
public boolean getCreateLogStream() {
111+
return createLogStream;
112+
}
113+
100114
}

0 commit comments

Comments
 (0)