Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions data-prepper-plugins/cloudwatch-logs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ pipeline:
endpoint: "https://logs.us-west-2.amazonaws.com"
create_log_group: false
create_log_stream: false
entity:
key_attributes:
Type: "RemoteService"
Name: "okta_auth0"
attributes:
AWS.ServiceNameSource: "UserConfiguration"
```

## AWS Configuration
Expand Down Expand Up @@ -77,6 +83,15 @@ pipeline:
`false`. When set to `true`, the IAM principal must have `logs:CreateLogStream`
permission.

## Entity Configuration

- `entity` (Optional) : An object that attaches CloudWatch [Entity](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_Entity.html) metadata to every `PutLogEvents` request. When omitted, no entity is attached and behavior is unchanged.

- `key_attributes` (Required when `entity` is configured) : A `Map<String, String>` of key attributes that uniquely identify the entity (for example `Type`, `Name`).
- `attributes` (Optional) : A `Map<String, String>` of additional attributes describing the entity. Defaults to an empty map.

AWS-owned limits — maximum number of entries, allowed key names (such as `Type`, `ResourceType`, `Identifier`, `Name`, `Environment`), and key/value length caps — are enforced by CloudWatch at request time and are intentionally not mirrored here. Violations are surfaced via the `cloudWatchLogsEntityRejected` metric and a log warning. See the [AWS Entity API docs](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_Entity.html) for the current contract.

## Buffer Type Configuration

- `buffer_type` (Optional) : A string representing the type of buffer to use to hold onto events. Currently only supports `in_memory`.
Expand All @@ -95,6 +110,7 @@ threshold parameters.
* `cloudWatchLogsEventsFailed` - The number of log events failed while publishing to CloudWatch Logs.
* `cloudWatchLogsRequestsSucceeded` - The number of log requests successfully made to CloudWatch Logs.
* `cloudWatchLogsRequestsFailed` - The number of log requests failed to reach CloudWatch Logs.
* `cloudWatchLogsEntityRejected` - The number of `PutLogEvents` responses where CloudWatch rejected the configured entity. The request itself still succeeds and events are released; this counter is the primary signal for misconfigured `entity` attributes.

## Developer Guide

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.EntityConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsMetrics;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsClientFactory;
import org.opensearch.dataprepper.test.helper.ReflectivelySetField;
import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsResponse;
import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.OutputLogEvent;
Expand Down Expand Up @@ -335,6 +337,50 @@ void TestSinkOperationWithLogSendInterval() throws Exception {
verify(eventHandle, times(NUM_RECORDS)).release(true);
}

/**
 * Pushes {@code NUM_RECORDS} events through a sink whose configuration carries an entity
 * and checks that the same events can be read back from the log stream, that the success
 * counters match, and that every event handle was released successfully.
 */
@Test
void TestSinkOperationWithEntity() throws Exception {
    final long testStartMillis = Instant.now().toEpochMilli();
    when(thresholdConfig.getBatchSize()).thenReturn(10);
    when(thresholdConfig.getFlushInterval()).thenReturn(10L);
    when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L);

    // EntityConfig exposes only getters, so its fields are populated reflectively.
    final Map<String, String> entityKeyAttributes =
            Map.of("Type", "RemoteService", "Name", "data-prepper-cwl-it");
    final Map<String, String> entityAttributes =
            Map.of("AWS.ServiceNameSource", "UserConfiguration");
    final EntityConfig configuredEntity = new EntityConfig();
    ReflectivelySetField.setField(EntityConfig.class, configuredEntity, "keyAttributes", entityKeyAttributes);
    ReflectivelySetField.setField(EntityConfig.class, configuredEntity, "attributes", entityAttributes);
    when(cloudWatchLogsSinkConfig.getEntityConfig()).thenReturn(configuredEntity);

    sink = createObjectUnderTest();
    final Collection<Record<Event>> outgoingRecords = getRecordList(NUM_RECORDS);
    sink.doOutput(outgoingRecords);

    await().atMost(Duration.ofSeconds(30))
            .untilAsserted(() -> {
                // NOTE(review): the empty batch presumably nudges the sink to flush buffered events - confirm.
                sink.doOutput(Collections.emptyList());
                final long queryEndMillis = Instant.now().toEpochMilli();
                final GetLogEventsRequest readBackRequest = GetLogEventsRequest.builder()
                        .logGroupName(logGroupName)
                        .logStreamName(logStreamName)
                        .startTime(testStartMillis)
                        .endTime(queryEndMillis)
                        .build();
                final List<OutputLogEvent> storedEvents =
                        cloudWatchLogsClient.getLogEvents(readBackRequest).events();
                assertThat(storedEvents.size(), equalTo(NUM_RECORDS));

                // Events are expected back in the order they were sent: Person0, Person1, ...
                int position = 0;
                for (final OutputLogEvent storedEvent : storedEvents) {
                    Map<String, Object> parsedEvent = objectMapper.readValue(storedEvent.message(), Map.class);
                    assertThat(parsedEvent.get("name"), equalTo("Person" + position));
                    assertThat(parsedEvent.get("age"), equalTo(Integer.toString(position)));
                    position++;
                }
            });

    assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS));
    assertThat(requestsSuccessCount.get(), equalTo(1));
    assertThat(dlqSuccessCount.get(), equalTo(0));
    verify(eventHandle, times(NUM_RECORDS)).release(true);
}

@Test
void TestSinkOperationWithBatchSize() throws Exception {
long startTime = Instant.now().toEpochMilli();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsClientFactory;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.EntityConfig;

import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.exception.InvalidBufferTypeException;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsLimits;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.Entity;
import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler;
import org.opensearch.dataprepper.model.annotations.Experimental;
import org.slf4j.Logger;
Expand Down Expand Up @@ -87,6 +89,12 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting,

Executor executor = Executors.newFixedThreadPool(cloudWatchLogsSinkConfig.getWorkers());

final EntityConfig entityConfig = cloudWatchLogsSinkConfig.getEntityConfig();
final Entity entity = entityConfig == null ? null : Entity.builder()
Comment thread
bagmarnikhil marked this conversation as resolved.
.keyAttributes(entityConfig.getKeyAttributes())
.attributes(entityConfig.getAttributes())
.build();

CloudWatchLogsDispatcher cloudWatchLogsDispatcher = CloudWatchLogsDispatcher.builder()
.cloudWatchLogsClient(cloudWatchLogsClient)
.cloudWatchLogsMetrics(cloudWatchLogsMetrics)
Expand All @@ -98,6 +106,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting,
.executor(executor)
.createLogGroup(cloudWatchLogsSinkConfig.getCreateLogGroup())
.createLogStream(cloudWatchLogsSinkConfig.getCreateLogStream())
.entity(entity)
.build();

Buffer buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException;
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.Entity;
import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent;
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest;
import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse;
Expand Down Expand Up @@ -46,6 +47,7 @@ public class CloudWatchLogsDispatcher {
private int retryCount;
private boolean createLogGroup;
private boolean createLogStream;
private Entity entity;
Comment thread
bagmarnikhil marked this conversation as resolved.

/**
* Will read in a collection of log messages in byte form and transform them into a collection of InputLogEvents.
Expand Down Expand Up @@ -74,11 +76,16 @@ public List<InputLogEvent> prepareInputLogEvents(final Collection<byte[]> eventM
}

public void dispatchLogs(List<InputLogEvent> inputLogEvents, List<EventHandle> eventHandles) {
PutLogEventsRequest putLogEventsRequest = PutLogEventsRequest.builder()
final PutLogEventsRequest.Builder requestBuilder = PutLogEventsRequest.builder()
.logEvents(inputLogEvents)
.logGroupName(logGroup)
.logStreamName(logStream)
.build();
.logStreamName(logStream);

if (entity != null) {
requestBuilder.entity(entity);
}

final PutLogEventsRequest putLogEventsRequest = requestBuilder.build();

executor.execute(Uploader.builder()
.cloudWatchLogsClient(cloudWatchLogsClient)
Expand Down Expand Up @@ -168,6 +175,11 @@ public void upload() {
if (putLogEventsResponse != null) {
dlqObjects = getDlqObjectsFromResponse(putLogEventsResponse);
}
if (putLogEventsResponse != null && putLogEventsResponse.rejectedEntityInfo() != null) {
Comment thread
bagmarnikhil marked this conversation as resolved.
cloudWatchLogsMetrics.increaseEntityRejectedCounter(1);
Comment thread
bagmarnikhil marked this conversation as resolved.
LOG.warn(NOISY, "Entity was rejected by CloudWatch: {}",
putLogEventsResponse.rejectedEntityInfo().errorTypeAsString());
}
cloudWatchLogsMetrics.increaseLogEventSuccessCounter(totalEventCount - dlqObjects.size());
releaseEventHandles(putLogEventsResponse);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ public class CloudWatchLogsMetrics {
public static final String CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED = "cloudWatchLogsLargeEventsDropped";
public static final String CLOUDWATCH_LOGS_LOG_SIZE = "cloudWatchLogsLogSize";
public static final String CLOUDWATCH_LOGS_REQUEST_SIZE = "cloudWatchLogsRequestSize";
public static final String CLOUDWATCH_LOGS_ENTITY_REJECTED = "cloudWatchLogsEntityRejected";
private final Counter logEventSuccessCounter;
private final Counter logEventFailCounter;
private final Counter requestSuccessCount;
private final Counter requestFailCount;
private final Counter requestMultiFailCount;
private final Counter logLargeEventsDroppedCounter;
private final Counter entityRejectedCounter;
private final DistributionSummary logSizeMetric;
private final DistributionSummary requestSizeMetric;

Expand All @@ -39,6 +41,7 @@ public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) {
this.logEventFailCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED);
this.requestSuccessCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED);
this.logLargeEventsDroppedCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED);
this.entityRejectedCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_ENTITY_REJECTED);
this.logSizeMetric = pluginMetrics.summary(CLOUDWATCH_LOGS_LOG_SIZE);
this.requestSizeMetric = pluginMetrics.summary(CLOUDWATCH_LOGS_REQUEST_SIZE);
}
Expand Down Expand Up @@ -67,6 +70,10 @@ public void increaseLogLargeEventsDroppedCounter(int value) {
logLargeEventsDroppedCounter.increment(value);
}

/**
 * Adds {@code value} to the counter registered under
 * {@link #CLOUDWATCH_LOGS_ENTITY_REJECTED}.
 *
 * @param value amount to add to the entity-rejected counter
 */
public void increaseEntityRejectedCounter(int value) {
    this.entityRejectedCounter.increment(value);
}

/**
 * Records {@code value} in the distribution summary registered under
 * {@link #CLOUDWATCH_LOGS_LOG_SIZE}.
 *
 * @param value the log size sample to record
 */
public void recordLogSize(int value) {
    this.logSizeMetric.record(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public class CloudWatchLogsSinkConfig {
@JsonProperty(value = "create_log_stream", defaultValue = "false")
private boolean createLogStream = false;
Comment thread
bagmarnikhil marked this conversation as resolved.

@JsonProperty("entity")
@Valid
private EntityConfig entityConfig;

/**
 * Returns the AWS configuration held by this sink configuration.
 *
 * @return the configured {@link AwsConfig}
 */
public AwsConfig getAwsConfig() {
    return this.awsConfig;
}
Expand Down Expand Up @@ -111,4 +115,8 @@ public boolean getCreateLogStream() {
return createLogStream;
}

/**
 * Returns the configuration bound to the {@code entity} property.
 *
 * @return the {@link EntityConfig}; the field has no default, so this is {@code null}
 *         when nothing was bound to it
 */
public EntityConfig getEntityConfig() {
    return this.entityConfig;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

import java.util.Collections;
import java.util.Map;

public class EntityConfig {

@JsonProperty("key_attributes")
@NotEmpty
Comment thread
bagmarnikhil marked this conversation as resolved.
Comment thread
bagmarnikhil marked this conversation as resolved.
private Map<String, String> keyAttributes = Collections.emptyMap();

@JsonProperty("attributes")

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

attributes has no @NotNull. Explicit attributes: null in YAML deserializes to null and unmodifiableMap(null) throws NPE; either add @NotNull or null-guard the getter.

@NotNull
private Map<String, String> attributes = Collections.emptyMap();

/**
 * Exposes the configured {@code key_attributes} without allowing callers to modify them.
 *
 * @return an unmodifiable view of the key attributes map
 */
public Map<String, String> getKeyAttributes() {
    final Map<String, String> readOnlyKeyAttributes = Collections.unmodifiableMap(keyAttributes);
    return readOnlyKeyAttributes;
}

/**
 * Exposes the configured {@code attributes} without allowing callers to modify them.
 *
 * @return an unmodifiable view of the attributes map
 */
public Map<String, String> getAttributes() {
    final Map<String, String> readOnlyAttributes = Collections.unmodifiableMap(attributes);
    return readOnlyAttributes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsMetrics;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.EntityConfig;
import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig;
import org.opensearch.dataprepper.test.helper.ReflectivelySetField;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.Entity;
import software.amazon.awssdk.regions.Region;


Expand All @@ -36,6 +39,7 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -69,7 +73,7 @@ class CloudWatchLogsSinkTest {
private static final String TEST_BUFFER_TYPE = "in_memory";
// Number of args Lombok @Builder passes to the all-args constructor of CloudWatchLogsDispatcher.
// Bumping this is the signal that positional context.arguments().get(N) calls below need to be re-audited.
private static final int EXPECTED_DISPATCHER_ARITY = 10;
private static final int EXPECTED_DISPATCHER_ARITY = 11;
private int numRetries;
@BeforeEach
void setUp() {
Expand Down Expand Up @@ -349,4 +353,56 @@ void WHEN_create_log_group_is_false_and_create_log_stream_is_false_THEN_flags_pa
assertThat(capturedCreateLogStream[0], equalTo(false));
}

/**
 * Verifies that the sink passes a {@code null} entity to the dispatcher when the sink
 * configuration returns no {@link EntityConfig}.
 */
@Test
void WHEN_entity_config_is_null_THEN_dispatcher_built_without_entity() {
    final Entity[] capturedEntity = new Entity[1];

    // Both scoped mocks are declared as resources so they are released even if sink
    // construction throws. Resources close in reverse order, so the construction mock is
    // still closed before the static mock, as in the previous manual close.
    try (MockedStatic<CloudWatchLogsClientFactory> mockedStatic = mockStatic(CloudWatchLogsClientFactory.class);
         MockedConstruction<CloudWatchLogsDispatcher> dispatcherMock =
                 mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> {
                     // Index 10 is the entity slot of the dispatcher's all-args constructor.
                     capturedEntity[0] = (Entity) context.arguments().get(10);
                 })) {

        mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class),
                        any(AwsCredentialsSupplier.class), any(), any()))
                .thenReturn(mockClient);

        getTestCloudWatchSink();

        // The capture slot starts out null, so without this check the final assertion
        // would also pass if the dispatcher had never been constructed.
        assertThat(dispatcherMock.constructed().isEmpty(), equalTo(false));
    }
    assertThat(capturedEntity[0], equalTo(null));
}

/**
 * Verifies that a configured {@link EntityConfig} is converted into an SDK {@link Entity}
 * carrying the same key attributes and attributes, and handed to the dispatcher.
 */
@Test
void WHEN_entity_config_is_provided_THEN_dispatcher_built_with_entity() throws Exception {
    final Map<String, String> keyAttributes = new HashMap<>();
    keyAttributes.put("Type", "RemoteService");
    keyAttributes.put("Name", "okta_auth0");
    final Map<String, String> attributes = new HashMap<>();
    attributes.put("AWS.ServiceNameSource", "UserConfiguration");

    // EntityConfig exposes only getters, so its fields are populated reflectively.
    final EntityConfig entityConfig = new EntityConfig();
    ReflectivelySetField.setField(EntityConfig.class, entityConfig, "keyAttributes", keyAttributes);
    ReflectivelySetField.setField(EntityConfig.class, entityConfig, "attributes", attributes);
    when(mockCloudWatchLogsSinkConfig.getEntityConfig()).thenReturn(entityConfig);

    final Entity[] capturedEntity = new Entity[1];

    // Both scoped mocks are declared as resources so they are released even if sink
    // construction throws. Resources close in reverse order, so the construction mock is
    // still closed before the static mock, as in the previous manual close.
    try (MockedStatic<CloudWatchLogsClientFactory> mockedStatic = mockStatic(CloudWatchLogsClientFactory.class);
         MockedConstruction<CloudWatchLogsDispatcher> dispatcherMock =
                 mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> {
                     // Index 10 is the entity slot of the dispatcher's all-args constructor.
                     capturedEntity[0] = (Entity) context.arguments().get(10);
                 })) {

        mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class),
                        any(AwsCredentialsSupplier.class), any(), any()))
                .thenReturn(mockClient);

        getTestCloudWatchSink();
    }
    assertThat(capturedEntity[0], notNullValue());
    assertThat(capturedEntity[0].keyAttributes(), equalTo(keyAttributes));
    assertThat(capturedEntity[0].attributes(), equalTo(attributes));
}

}
Loading
Loading