Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions data-prepper-plugins/cloudwatch-logs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pipeline:
X-Request-ID: "request-123"
X-Source: "dataprepper"
endpoint: "https://logs.us-west-2.amazonaws.com"
create_log_group: false
create_log_stream: false
```

## AWS Configuration
Expand Down Expand Up @@ -65,6 +67,16 @@ pipeline:

- `endpoint` (Optional) : A string representing a custom CloudWatch Logs endpoint URL to override the default service endpoint.

- `create_log_group` (Optional): A boolean that controls whether the sink will
create the configured `log_group` if it does not already exist. Defaults to
`false`. When set to `true`, the IAM principal must have `logs:CreateLogGroup`
permission.

- `create_log_stream` (Optional): A boolean that controls whether the sink will
create the configured `log_stream` if it does not already exist. Defaults to
`false`. When set to `true`, the IAM principal must have `logs:CreateLogStream`
permission.

## Buffer Type Configuration

- `buffer_type` (Optional) : A string representing the type of buffer to use to hold onto events. Currently only supports `in_memory`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamResponse;
import software.amazon.awssdk.services.cloudwatchlogs.model.DeleteLogStreamRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceNotFoundException;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;

import software.amazon.awssdk.services.s3.S3Client;
Expand Down Expand Up @@ -271,7 +272,12 @@ void tearDown() {
.logGroupName(logGroupName)
.logStreamName(logStreamName)
.build();
cloudWatchLogsClient.deleteLogStream(deleteRequest);
try {
cloudWatchLogsClient.deleteLogStream(deleteRequest);
} catch (ResourceNotFoundException e) {
// Sink-side stream-creation may have failed in the test under verification; don't
// mask the real test failure with a teardown error for an absent stream.
}
deleteObjectsWithPrefix(bucket, DLQ_PREFIX);
}

Expand Down Expand Up @@ -574,6 +580,45 @@ public void testToVerifyLackOfCredentialsResultInFailure() throws Exception {
assertThat(eventsSuccessCount.get(), equalTo(0));
}

/**
 * Verifies that, with {@code create_log_group} and {@code create_log_stream} enabled, the sink
 * delivers every record to a log stream that does not exist when the sink starts.
 *
 * <p>The stream created by setUp() is deleted first and a fresh, random stream name is used, so
 * the only way the events can arrive is for the sink to create the stream itself.
 *
 * @throws Exception if a CloudWatch Logs call or the sink fails unexpectedly
 */
@Test
void TestSinkOperationWithCreateLogStream() throws Exception {
    // Delete the stream setUp() created so we don't leak it. We're about to mutate
    // logStreamName so tearDown() won't see this one again.
    cloudWatchLogsClient.deleteLogStream(DeleteLogStreamRequest.builder()
            .logGroupName(logGroupName)
            .logStreamName(logStreamName)
            .build());

    // Use a brand-new stream name to avoid the delete-then-recreate-same-name window
    // where CloudWatch Logs may transiently throw OperationAbortedException.
    logStreamName = "CloudWatchLogsIT_create_" + RandomStringUtils.randomAlphabetic(8);
    when(cloudWatchLogsSinkConfig.getLogStream()).thenReturn(logStreamName);
    when(cloudWatchLogsSinkConfig.getCreateLogGroup()).thenReturn(true);
    when(cloudWatchLogsSinkConfig.getCreateLogStream()).thenReturn(true);
    when(thresholdConfig.getBatchSize()).thenReturn(10);
    when(thresholdConfig.getMaxEventSizeBytes()).thenReturn(1000L);
    when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L);
    when(thresholdConfig.getFlushInterval()).thenReturn(10L);

    final long startTime = Instant.now().toEpochMilli();
    sink = createObjectUnderTest();
    sink.doOutput(getRecordList(NUM_RECORDS));

    // Until the sink has created the stream, getLogEvents throws ResourceNotFoundException.
    // untilAsserted only retries on AssertionError, so that exception must be ignored
    // explicitly or an early poll would abort the wait instead of retrying.
    await().atMost(Duration.ofSeconds(30))
            .ignoreException(ResourceNotFoundException.class)
            .untilAsserted(() -> {
                // An empty batch gives the sink a chance to flush buffered events.
                sink.doOutput(Collections.emptyList());
                final GetLogEventsResponse resp = cloudWatchLogsClient.getLogEvents(
                        GetLogEventsRequest.builder()
                                .logGroupName(logGroupName)
                                .logStreamName(logStreamName)
                                .startTime(startTime)
                                .endTime(Instant.now().toEpochMilli())
                                .build());
                assertThat(resp.events().size(), equalTo(NUM_RECORDS));
            });
    assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS));
    assertThat(dlqSuccessCount.get(), equalTo(0));
}

private Collection<Record<Event>> getRecordList(int numberOfRecords) {
final Collection<Record<Event>> recordList = new ArrayList<>();
List<HashMap> records = generateRecords(numberOfRecords);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting,
.logStream(cloudWatchLogsSinkConfig.getLogStream())
.retryCount(dlqPushHandler == null ? Integer.MAX_VALUE : cloudWatchLogsSinkConfig.getMaxRetries())
.executor(executor)
.createLogGroup(cloudWatchLogsSinkConfig.getCreateLogGroup())
.createLogStream(cloudWatchLogsSinkConfig.getCreateLogStream())
.build();

Buffer buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@
import com.linecorp.armeria.client.retry.Backoff;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException;
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
import software.amazon.awssdk.services.cloudwatchlogs.model.RejectedLogEventsInfo;
import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceAlreadyExistsException;
import software.amazon.awssdk.services.cloudwatchlogs.model.ResourceNotFoundException;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsSinkUtils;
import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler;
import org.opensearch.dataprepper.model.failures.DlqObject;
Expand All @@ -40,24 +44,8 @@ public class CloudWatchLogsDispatcher {
private String logGroup;
private String logStream;
private int retryCount;
/**
 * Builds a dispatcher by storing the supplied client, metrics, DLQ settings, executor,
 * destination names and retry limit on the instance.
 *
 * @param cloudWatchLogsClient client stored for later CloudWatch Logs calls
 * @param cloudWatchLogsMetrics metrics holder stored on the dispatcher
 * @param dlqPushHandler handler stored for DLQ delivery
 * @param dropIfDlqNotConfigured flag stored as-is on the dispatcher
 * @param executor executor stored on the dispatcher
 * @param logGroup log group name stored on the dispatcher
 * @param logStream log stream name stored on the dispatcher
 * @param retryCount retry limit stored on the dispatcher
 */
public CloudWatchLogsDispatcher(final CloudWatchLogsClient cloudWatchLogsClient,
                                final CloudWatchLogsMetrics cloudWatchLogsMetrics,
                                final DlqPushHandler dlqPushHandler,
                                final boolean dropIfDlqNotConfigured,
                                final Executor executor,
                                final String logGroup,
                                final String logStream,
                                final int retryCount) {
    // Collaborators.
    this.cloudWatchLogsClient = cloudWatchLogsClient;
    this.cloudWatchLogsMetrics = cloudWatchLogsMetrics;
    this.executor = executor;

    // DLQ behaviour.
    this.dlqPushHandler = dlqPushHandler;
    this.dropIfDlqNotConfigured = dropIfDlqNotConfigured;

    // Destination and retry settings.
    this.logGroup = logGroup;
    this.logStream = logStream;
    this.retryCount = retryCount;
}
// Forwarded to each Uploader built in dispatchLogs(); enables log group creation there.
private boolean createLogGroup;
// Forwarded to each Uploader built in dispatchLogs(); enables log stream creation there.
private boolean createLogStream;

/**
* Will read in a collection of log messages in byte form and transform them into a collection of InputLogEvents.
Expand Down Expand Up @@ -101,6 +89,8 @@ public void dispatchLogs(List<InputLogEvent> inputLogEvents, List<EventHandle> e
.dropIfDlqNotConfigured(dropIfDlqNotConfigured)
.totalEventCount(inputLogEvents.size())
.retryCount(retryCount)
.createLogGroup(createLogGroup)
.createLogStream(createLogStream)
.build());
}

Expand All @@ -117,6 +107,8 @@ protected static class Uploader implements Runnable {
private final int totalEventCount;
private final int retryCount;
private boolean dropIfDlqNotConfigured;
// When true, createResources() attempts to create the request's log group.
private final boolean createLogGroup;
// When true, createResources() attempts to create the request's log stream.
private final boolean createLogStream;

@Override
public void run() {
Expand All @@ -125,6 +117,7 @@ public void run() {

public void upload() {
boolean failedToTransmit = true;
boolean resourceCreationAttempted = false;
int failCount = 0;
String failureMessage = "";
PutLogEventsResponse putLogEventsResponse = null;
Expand All @@ -138,17 +131,21 @@ public void upload() {
cloudWatchLogsMetrics.increaseRequestSuccessCounter(1);
failedToTransmit = false;

} catch (ResourceNotFoundException e) {
Comment thread
bagmarnikhil marked this conversation as resolved.
// Must be caught before CloudWatchLogsException since ResourceNotFoundException extends it.
if ((createLogGroup || createLogStream) && !resourceCreationAttempted) {
resourceCreationAttempted = true;
createResources();
// Loop continues; next iteration retries PutLogEvents without incrementing failCount.
// If PutLogEvents still throws ResourceNotFoundException, the guard sends us to the
// else branch and normal retry/DLQ logic takes over.
} else {
failureMessage = e.getMessage();
failCount = handlePutLogEventsFailure(e, failCount, backoff);
}
} catch (CloudWatchLogsException | SdkClientException e) {
failureMessage = e.getMessage();
LOG.error(NOISY, "Failed to push logs with error: {}", e.getMessage());
cloudWatchLogsMetrics.increaseRequestFailCounter(1);
if (++failCount % MULTIPLE_FAILURES_METRIC_COUNT == 0) {
cloudWatchLogsMetrics.increaseRequestMultiFailCounter(1);
}
final long delayMillis = backoff.nextDelayMillis(failCount);
if (delayMillis > 0) {
Thread.sleep(delayMillis);
}
failCount = handlePutLogEventsFailure(e, failCount, backoff);
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -177,6 +174,67 @@ public void upload() {
CloudWatchLogsSinkUtils.handleDlqObjects(dlqObjects, dlqPushHandler);
}

/**
 * Records one failed PutLogEvents attempt and waits before the caller retries.
 *
 * <p>The failure is logged, the request-fail counter is incremented, and the multi-fail
 * counter is incremented whenever the updated count is a multiple of
 * {@code MULTIPLE_FAILURES_METRIC_COUNT}. The thread then sleeps for the delay the backoff
 * reports for the updated count, if that delay is positive.
 *
 * @param e the exception raised by the failed attempt
 * @param currentFailCount the number of failures recorded before this one
 * @param backoff the backoff schedule queried for the next delay
 * @return {@code currentFailCount + 1}, for the caller to store as its new fail count
 * @throws InterruptedException if the thread is interrupted while sleeping
 */
private int handlePutLogEventsFailure(final Exception e, final int currentFailCount, final Backoff backoff)
        throws InterruptedException {
    final int updatedFailCount = currentFailCount + 1;

    LOG.error(NOISY, "Failed to push logs with error: {}", e.getMessage());
    cloudWatchLogsMetrics.increaseRequestFailCounter(1);

    final boolean reachedMultiFailureMark = updatedFailCount % MULTIPLE_FAILURES_METRIC_COUNT == 0;
    if (reachedMultiFailureMark) {
        cloudWatchLogsMetrics.increaseRequestMultiFailCounter(1);
    }

    final long pauseMillis = backoff.nextDelayMillis(updatedFailCount);
    if (pauseMillis > 0) {
        Thread.sleep(pauseMillis);
    }
    return updatedFailCount;
}

/**
 * Creates the log group and/or log stream named in the pending PutLogEvents request,
 * depending on which of the {@code createLogGroup} / {@code createLogStream} flags are set.
 *
 * <p>The group is attempted before the stream. SDK failures ({@code CloudWatchLogsException},
 * {@code SdkClientException}) are logged rather than propagated, and
 * {@code ResourceAlreadyExistsException} is treated as success, so repeating the call is
 * harmless. If creation fails, the caller's next PutLogEvents attempt is expected to hit
 * ResourceNotFoundException again and fall through to the regular retry/DLQ handling in
 * upload().
 */
private void createResources() {
    final String group = putLogEventsRequest.logGroupName();
    final String stream = putLogEventsRequest.logStreamName();

    if (createLogGroup) {
        tryCreateLogGroup(group);
    }
    if (createLogStream) {
        tryCreateLogStream(group, stream);
    }
}

/** Issues CreateLogGroup for {@code group}, logging instead of throwing on SDK failures. */
private void tryCreateLogGroup(final String group) {
    try {
        final CreateLogGroupRequest request = CreateLogGroupRequest.builder()
                .logGroupName(group)
                .build();
        cloudWatchLogsClient.createLogGroup(request);
        LOG.info("Created log group: {}", group);
    } catch (ResourceAlreadyExistsException e) {
        // Already present: nothing to do.
        LOG.debug("Log group already exists: {}", group);
    } catch (CloudWatchLogsException | SdkClientException e) {
        LOG.warn("Unable to create log group '{}': {}", group, e.getMessage());
    }
}

/** Issues CreateLogStream for {@code group}/{@code stream}, logging instead of throwing on SDK failures. */
private void tryCreateLogStream(final String group, final String stream) {
    try {
        final CreateLogStreamRequest request = CreateLogStreamRequest.builder()
                .logGroupName(group)
                .logStreamName(stream)
                .build();
        cloudWatchLogsClient.createLogStream(request);
        LOG.info("Created log stream: {}/{}", group, stream);
    } catch (ResourceAlreadyExistsException e) {
        // Already present: nothing to do.
        LOG.debug("Log stream already exists: {}/{}", group, stream);
    } catch (CloudWatchLogsException | SdkClientException e) {
        LOG.warn("Unable to create log stream '{}/{}': {}", group, stream, e.getMessage());
    }
}

List<DlqObject> getDlqObjectsFromResponse(PutLogEventsResponse putLogEventsResponse) {
List<DlqObject> dlqObjects = new ArrayList<>();
RejectedLogEventsInfo rejectedLogEventsInfo = putLogEventsResponse.rejectedLogEventsInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ public class CloudWatchLogsSinkConfig {
@JsonProperty("endpoint")
private String endpoint;

/**
 * Whether the sink creates the configured log group when it does not already exist.
 * Bound to the {@code create_log_group} option; initialised to {@code false}.
 */
@JsonProperty(value = "create_log_group", defaultValue = "false")
private boolean createLogGroup = false;

/**
 * Whether the sink creates the configured log stream when it does not already exist.
 * Bound to the {@code create_log_stream} option; initialised to {@code false}.
 */
@JsonProperty(value = "create_log_stream", defaultValue = "false")
private boolean createLogStream = false;

/**
 * @return the AWS configuration held by this sink configuration
 */
public AwsConfig getAwsConfig() {
    return this.awsConfig;
}
Expand Down Expand Up @@ -97,4 +103,12 @@ public String getEndpoint() {
return endpoint;
}

/**
 * @return the value of the {@code create_log_group} option
 */
public boolean getCreateLogGroup() {
    return this.createLogGroup;
}

/**
 * @return the value of the {@code create_log_stream} option
 */
public boolean getCreateLogStream() {
    return this.createLogStream;
}

}
Loading
Loading