diff --git a/data-prepper-plugins/cloudwatch-logs/README.md b/data-prepper-plugins/cloudwatch-logs/README.md index cf1a4b8fb9..d8405b9277 100644 --- a/data-prepper-plugins/cloudwatch-logs/README.md +++ b/data-prepper-plugins/cloudwatch-logs/README.md @@ -37,6 +37,12 @@ pipeline: endpoint: "https://logs.us-west-2.amazonaws.com" create_log_group: false create_log_stream: false + entity: + key_attributes: + Type: "RemoteService" + Name: "okta_auth0" + attributes: + AWS.ServiceNameSource: "UserConfiguration" ``` ## AWS Configuration @@ -77,6 +83,15 @@ pipeline: `false`. When set to `true`, the IAM principal must have `logs:CreateLogStream` permission. +## Entity Configuration + +- `entity` (Optional) : An object that attaches CloudWatch [Entity](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_Entity.html) metadata to every `PutLogEvents` request. When omitted, no entity is attached and behavior is unchanged. + + - `key_attributes` (Required when `entity` is configured) : A `Map` of key attributes that uniquely identify the entity (for example `Type`, `Name`). + - `attributes` (Optional) : A `Map` of additional attributes describing the entity. Defaults to an empty map. + + AWS-owned limits — maximum number of entries, allowed key names (such as `Type`, `ResourceType`, `Identifier`, `Name`, `Environment`), and key/value length caps — are enforced by CloudWatch at request time and are intentionally not mirrored here. Violations are surfaced via the `cloudWatchLogsEntityRejected` metric and a log warning. See the [AWS Entity API docs](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_Entity.html) for the current contract. + ## Buffer Type Configuration - `buffer_type` (Optional) : A string representing the type of buffer to use to hold onto events. Currently only supports `in_memory`. @@ -95,6 +110,7 @@ threshold parameters. * `cloudWatchLogsEventsFailed` - The number of log events failed while publishing to CloudWatch Logs. * `cloudWatchLogsRequestsSucceeded` - The number of log requests successfully made to CloudWatch Logs. * `cloudWatchLogsRequestsFailed` - The number of log requests failed to reach CloudWatch Logs. +* `cloudWatchLogsEntityRejected` - The number of `PutLogEvents` responses where CloudWatch rejected the configured entity. The request itself still succeeds and events are released; this counter is the primary signal for misconfigured `entity` attributes. ## Developer Guide diff --git a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java index 6234b13623..1d3fae0644 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java +++ b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java @@ -22,9 +22,11 @@ import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.EntityConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsMetrics; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsClientFactory; +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsResponse; import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.OutputLogEvent; @@ -335,6 +337,50 @@ void TestSinkOperationWithLogSendInterval() throws Exception { verify(eventHandle, times(NUM_RECORDS)).release(true); } + @Test + void TestSinkOperationWithEntity() throws Exception { + long startTime = Instant.now().toEpochMilli(); + when(thresholdConfig.getBatchSize()).thenReturn(10); + when(thresholdConfig.getFlushInterval()).thenReturn(10L); + when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L); + + final EntityConfig entityConfig = new EntityConfig(); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, "keyAttributes", + Map.of("Type", "RemoteService", "Name", "data-prepper-cwl-it")); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, "attributes", + Map.of("AWS.ServiceNameSource", "UserConfiguration")); + when(cloudWatchLogsSinkConfig.getEntityConfig()).thenReturn(entityConfig); + + sink = createObjectUnderTest(); + Collection> records = getRecordList(NUM_RECORDS); + sink.doOutput(records); + await().atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> { + sink.doOutput(Collections.emptyList()); + long endTime = Instant.now().toEpochMilli(); + GetLogEventsRequest getRequest = GetLogEventsRequest + .builder() + .logGroupName(logGroupName) + .logStreamName(logStreamName) + .startTime(startTime) + .endTime(endTime) + .build(); + GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest); + List events = response.events(); + assertThat(events.size(), equalTo(NUM_RECORDS)); + for (int i = 0; i < events.size(); i++) { + String message = events.get(i).message(); + Map event = objectMapper.readValue(message, Map.class); + assertThat(event.get("name"), equalTo("Person" + i)); + assertThat(event.get("age"), equalTo(Integer.toString(i))); + } + }); + assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS)); + assertThat(requestsSuccessCount.get(), equalTo(1)); + assertThat(dlqSuccessCount.get(), equalTo(0)); + verify(eventHandle, times(NUM_RECORDS)).release(true); + } + @Test void TestSinkOperationWithBatchSize() throws Exception { long startTime = Instant.now().toEpochMilli(); diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java index 62e576f194..b6d21e8797 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java @@ -24,11 +24,13 @@ import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsClientFactory; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.EntityConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.exception.InvalidBufferTypeException; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsLimits; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; +import software.amazon.awssdk.services.cloudwatchlogs.model.Entity; import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler; import org.opensearch.dataprepper.model.annotations.Experimental; import org.slf4j.Logger; @@ -87,6 +89,12 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting, Executor executor = Executors.newFixedThreadPool(cloudWatchLogsSinkConfig.getWorkers()); + final EntityConfig entityConfig = cloudWatchLogsSinkConfig.getEntityConfig(); + final Entity entity = entityConfig == null ? null : Entity.builder() + .keyAttributes(entityConfig.getKeyAttributes()) + .attributes(entityConfig.getAttributes()) + .build(); + CloudWatchLogsDispatcher cloudWatchLogsDispatcher = CloudWatchLogsDispatcher.builder() .cloudWatchLogsClient(cloudWatchLogsClient) .cloudWatchLogsMetrics(cloudWatchLogsMetrics) @@ -98,6 +106,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting, .executor(executor) .createLogGroup(cloudWatchLogsSinkConfig.getCreateLogGroup()) .createLogStream(cloudWatchLogsSinkConfig.getCreateLogStream()) + .entity(entity) .build(); Buffer buffer; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java index 3011076af9..cabf8c3771 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java @@ -15,6 +15,7 @@ import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException; import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.Entity; import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse; @@ -46,6 +47,7 @@ public class CloudWatchLogsDispatcher { private int retryCount; private boolean createLogGroup; private boolean createLogStream; + private Entity entity; /** * Will read in a collection of log messages in byte form and transform them into a collection of InputLogEvents. @@ -74,11 +76,16 @@ public List prepareInputLogEvents(final Collection eventM } public void dispatchLogs(List inputLogEvents, List eventHandles) { - PutLogEventsRequest putLogEventsRequest = PutLogEventsRequest.builder() + final PutLogEventsRequest.Builder requestBuilder = PutLogEventsRequest.builder() .logEvents(inputLogEvents) .logGroupName(logGroup) - .logStreamName(logStream) - .build(); + .logStreamName(logStream); + + if (entity != null) { + requestBuilder.entity(entity); + } + + final PutLogEventsRequest putLogEventsRequest = requestBuilder.build(); executor.execute(Uploader.builder() .cloudWatchLogsClient(cloudWatchLogsClient) @@ -168,6 +175,11 @@ public void upload() { if (putLogEventsResponse != null) { dlqObjects = getDlqObjectsFromResponse(putLogEventsResponse); } + if (putLogEventsResponse != null && putLogEventsResponse.rejectedEntityInfo() != null) { + cloudWatchLogsMetrics.increaseEntityRejectedCounter(1); + LOG.warn(NOISY, "Entity was rejected by CloudWatch: {}", + putLogEventsResponse.rejectedEntityInfo().errorTypeAsString()); + } cloudWatchLogsMetrics.increaseLogEventSuccessCounter(totalEventCount - dlqObjects.size()); releaseEventHandles(putLogEventsResponse); } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java index 63fc55cf4e..e8f5199604 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java @@ -23,12 +23,14 @@ public class CloudWatchLogsMetrics { public static final String CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED = "cloudWatchLogsLargeEventsDropped"; public static final String CLOUDWATCH_LOGS_LOG_SIZE = "cloudWatchLogsLogSize"; public static final String CLOUDWATCH_LOGS_REQUEST_SIZE = "cloudWatchLogsRequestSize"; + public static final String CLOUDWATCH_LOGS_ENTITY_REJECTED = "cloudWatchLogsEntityRejected"; private final Counter logEventSuccessCounter; private final Counter logEventFailCounter; private final Counter requestSuccessCount; private final Counter requestFailCount; private final Counter requestMultiFailCount; private final Counter logLargeEventsDroppedCounter; + private final Counter entityRejectedCounter; private final DistributionSummary logSizeMetric; private final DistributionSummary requestSizeMetric; @@ -39,6 +41,7 @@ public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) { this.logEventFailCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED); this.requestSuccessCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED); this.logLargeEventsDroppedCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED); + this.entityRejectedCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_ENTITY_REJECTED); this.logSizeMetric = pluginMetrics.summary(CLOUDWATCH_LOGS_LOG_SIZE); this.requestSizeMetric = pluginMetrics.summary(CLOUDWATCH_LOGS_REQUEST_SIZE); } @@ -67,6 +70,10 @@ public void increaseLogLargeEventsDroppedCounter(int value) { logLargeEventsDroppedCounter.increment(value); } + public void increaseEntityRejectedCounter(int value) { + entityRejectedCounter.increment(value); + } + public void recordLogSize(int value) { logSizeMetric.record(value); } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java index 563cd1070f..afa9842a62 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java @@ -67,6 +67,10 @@ public class CloudWatchLogsSinkConfig { @JsonProperty(value = "create_log_stream", defaultValue = "false") private boolean createLogStream = false; + @JsonProperty("entity") + @Valid + private EntityConfig entityConfig; + public AwsConfig getAwsConfig() { return awsConfig; } @@ -111,4 +115,8 @@ public boolean getCreateLogStream() { return createLogStream; } + public EntityConfig getEntityConfig() { + return entityConfig; + } + } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java new file mode 100644 index 0000000000..84d92c9e21 --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; + +import java.util.Collections; +import java.util.Map; + +public class EntityConfig { + + @JsonProperty("key_attributes") + @NotEmpty + private Map keyAttributes = Collections.emptyMap(); + + @JsonProperty("attributes") + @NotNull + private Map attributes = Collections.emptyMap(); + + public Map getKeyAttributes() { + return Collections.unmodifiableMap(keyAttributes); + } + + public Map getAttributes() { + return Collections.unmodifiableMap(attributes); + } +} diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java index a4b2dfa5dc..36bf042e29 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java @@ -24,8 +24,11 @@ import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsMetrics; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.EntityConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; +import software.amazon.awssdk.services.cloudwatchlogs.model.Entity; import software.amazon.awssdk.regions.Region; @@ -36,6 +39,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -69,7 +73,7 @@ class CloudWatchLogsSinkTest { private static final String TEST_BUFFER_TYPE = "in_memory"; // Number of args Lombok @Builder passes to the all-args constructor of CloudWatchLogsDispatcher. // Bumping this is the signal that positional context.arguments().get(N) calls below need to be re-audited. - private static final int EXPECTED_DISPATCHER_ARITY = 10; + private static final int EXPECTED_DISPATCHER_ARITY = 11; private int numRetries; @BeforeEach void setUp() { @@ -349,4 +353,56 @@ void WHEN_create_log_group_is_false_and_create_log_stream_is_false_THEN_flags_pa assertThat(capturedCreateLogStream[0], equalTo(false)); } + @Test + void WHEN_entity_config_is_null_THEN_dispatcher_built_without_entity() { + final Entity[] capturedEntity = new Entity[1]; + + try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { + final MockedConstruction dispatcherMock = + mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> { + capturedEntity[0] = (Entity) context.arguments().get(10); + }); + + mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), + any(AwsCredentialsSupplier.class), any(), any())) + .thenReturn(mockClient); + + getTestCloudWatchSink(); + dispatcherMock.close(); + } + assertThat(capturedEntity[0], equalTo(null)); + } + + @Test + void WHEN_entity_config_is_provided_THEN_dispatcher_built_with_entity() throws Exception { + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "RemoteService"); + keyAttributes.put("Name", "okta_auth0"); + final Map attributes = new HashMap<>(); + attributes.put("AWS.ServiceNameSource", "UserConfiguration"); + + final EntityConfig entityConfig = new EntityConfig(); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, "keyAttributes", keyAttributes); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, "attributes", attributes); + when(mockCloudWatchLogsSinkConfig.getEntityConfig()).thenReturn(entityConfig); + + final Entity[] capturedEntity = new Entity[1]; + try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { + final MockedConstruction dispatcherMock = + mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> { + capturedEntity[0] = (Entity) context.arguments().get(10); + }); + + mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), + any(AwsCredentialsSupplier.class), any(), any())) + .thenReturn(mockClient); + + getTestCloudWatchSink(); + dispatcherMock.close(); + } + assertThat(capturedEntity[0], notNullValue()); + assertThat(capturedEntity[0].keyAttributes(), equalTo(keyAttributes)); + assertThat(capturedEntity[0].attributes(), equalTo(attributes)); + } + } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java index 01f67dc064..0fef5b031f 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java @@ -15,6 +15,8 @@ import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupResponse; import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamResponse; +import software.amazon.awssdk.services.cloudwatchlogs.model.Entity; +import software.amazon.awssdk.services.cloudwatchlogs.model.RejectedEntityInfo; import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse; @@ -25,10 +27,16 @@ import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.Collection; import java.util.List; import java.util.concurrent.Executor; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -550,4 +558,112 @@ void GIVEN_create_flag_true_AND_creation_succeeds_BUT_put_log_events_still_throw verify(mockCloudWatchLogsMetrics, times(RETRY_COUNT)).increaseRequestFailCounter(1); verify(mockCloudWatchLogsMetrics, never()).increaseRequestSuccessCounter(1); } + + @Test + void GIVEN_resource_not_found_and_both_create_group_and_stream_throw_non_already_exists_SHOULD_not_kill_uploader() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenThrow(CloudWatchLogsException.builder().message("access denied").build()); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenThrow(CloudWatchLogsException.builder().message("access denied").build()); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // Both creation calls failed but the helper swallowed both exceptions; PutLogEvents retry succeeded. + verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); + } + + @Test + void GIVEN_entity_configured_WHEN_dispatch_logs_called_SHOULD_set_entity_on_put_log_events_request() { + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "RemoteService"); + keyAttributes.put("Name", "okta_auth0"); + final Map attributes = new HashMap<>(); + attributes.put("AWS.ServiceNameSource", "UserConfiguration"); + final Entity entity = Entity.builder().keyAttributes(keyAttributes).attributes(attributes).build(); + + cloudWatchLogsDispatcher = CloudWatchLogsDispatcher.builder() + .cloudWatchLogsClient(mockCloudWatchLogsClient) + .cloudWatchLogsMetrics(mockCloudWatchLogsMetrics) + .executor(mockExecutor) + .logGroup(LOG_GROUP) + .logStream(LOG_STREAM) + .retryCount(RETRY_COUNT) + .dropIfDlqNotConfigured(true) + .entity(entity) + .build(); + + final List eventHandles = getSampleEventHandles(); + final PutLogEventsResponse response = mock(PutLogEventsResponse.class); + when(response.rejectedLogEventsInfo()).thenReturn(null); + when(response.rejectedEntityInfo()).thenReturn(null); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))).thenReturn(response); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PutLogEventsRequest.class); + verify(mockCloudWatchLogsClient).putLogEvents(requestCaptor.capture()); + final PutLogEventsRequest captured = requestCaptor.getValue(); + + assertThat(captured.entity(), notNullValue()); + assertThat(captured.entity().keyAttributes(), equalTo(keyAttributes)); + assertThat(captured.entity().attributes(), equalTo(attributes)); + } + + @Test + void GIVEN_entity_not_configured_WHEN_dispatch_logs_called_SHOULD_not_set_entity_on_put_log_events_request() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher(RETRY_COUNT); + + final List eventHandles = getSampleEventHandles(); + final PutLogEventsResponse response = mock(PutLogEventsResponse.class); + when(response.rejectedLogEventsInfo()).thenReturn(null); + when(response.rejectedEntityInfo()).thenReturn(null); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))).thenReturn(response); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PutLogEventsRequest.class); + verify(mockCloudWatchLogsClient).putLogEvents(requestCaptor.capture()); + + assertThat(requestCaptor.getValue().entity(), nullValue()); + } + + @Test + void GIVEN_entity_rejected_WHEN_put_log_events_succeeds_SHOULD_increment_rejection_metric_and_release_events() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher(RETRY_COUNT); + + final List eventHandles = getSampleEventHandles(); + final PutLogEventsResponse response = mock(PutLogEventsResponse.class); + final RejectedEntityInfo rejectedEntityInfo = mock(RejectedEntityInfo.class); + when(rejectedEntityInfo.errorTypeAsString()).thenReturn("InvalidEntity"); + when(response.rejectedLogEventsInfo()).thenReturn(null); + when(response.rejectedEntityInfo()).thenReturn(rejectedEntityInfo); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))).thenReturn(response); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + verify(mockCloudWatchLogsMetrics, times(1)).increaseEntityRejectedCounter(1); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + eventHandles.forEach(eventHandle -> verify(eventHandle).release(true)); + } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetricsTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetricsTest.java index 528034d373..7da149a391 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetricsTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetricsTest.java @@ -18,6 +18,7 @@ class CloudWatchLogsMetricsTest { private Counter mockSuccessRequestCounter; private Counter mockFailedEventCounter; private Counter mockFailedRequestCounter; + private Counter mockEntityRejectedCounter; @BeforeEach void setUp() { @@ -26,11 +27,13 @@ void setUp() { mockSuccessRequestCounter = mock(Counter.class); mockFailedEventCounter = mock(Counter.class); mockFailedRequestCounter = mock(Counter.class); + mockEntityRejectedCounter = mock(Counter.class); when(mockPluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_SUCCEEDED)).thenReturn(mockSuccessEventCounter); when(mockPluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED)).thenReturn(mockSuccessRequestCounter); when(mockPluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED)).thenReturn(mockFailedEventCounter); when(mockPluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_FAILED)).thenReturn(mockFailedRequestCounter); + when(mockPluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_ENTITY_REJECTED)).thenReturn(mockEntityRejectedCounter); testCloudWatchLogsMetrics = new CloudWatchLogsMetrics(mockPluginMetrics); } @@ -63,4 +66,10 @@ void WHEN_increase_request_failed_counter_called_THEN_request_failed_counter_inc testCloudWatchLogsMetrics.increaseRequestFailCounter(1); verify(mockFailedRequestCounter, times(1)).increment(1); } + + @Test + void WHEN_increase_entity_rejected_counter_called_THEN_entity_rejected_counter_increase_method_should_be_called() { + testCloudWatchLogsMetrics.increaseEntityRejectedCounter(1); + verify(mockEntityRejectedCounter, times(1)).increment(1); + } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java index 6c9f24d2df..92b5f06850 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java @@ -12,6 +12,10 @@ import org.opensearch.dataprepper.test.helper.ReflectivelySetField; import jakarta.validation.ConstraintValidatorContext; +import jakarta.validation.ConstraintViolation; +import jakarta.validation.Validation; +import jakarta.validation.Validator; +import org.hibernate.validator.messageinterpolation.ParameterMessageInterpolator; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.aMapWithSize; @@ -26,6 +30,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Random; +import java.util.Set; class CloudWatchLogsSinkConfigTest { private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig; @@ -348,4 +353,44 @@ void GIVEN_create_log_stream_set_true_WHEN_get_called_SHOULD_return_true() "createLogStream", true); assertThat(cloudWatchLogsSinkConfig.getCreateLogStream(), equalTo(true)); } + + @Test + void GIVEN_new_sink_config_WHEN_get_entity_config_called_SHOULD_return_null() { + assertThat(new CloudWatchLogsSinkConfig().getEntityConfig(), equalTo(null)); + } + + @Test + void GIVEN_entity_config_set_WHEN_get_entity_config_called_SHOULD_return_configured_value() + throws NoSuchFieldException, IllegalAccessException { + final EntityConfig entityConfig = new EntityConfig(); + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "RemoteService"); + keyAttributes.put("Name", "okta_auth0"); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, "keyAttributes", keyAttributes); + + ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, "entityConfig", entityConfig); + + assertThat(cloudWatchLogsSinkConfig.getEntityConfig(), equalTo(entityConfig)); + assertThat(cloudWatchLogsSinkConfig.getEntityConfig().getKeyAttributes(), equalTo(keyAttributes)); + } + + @Test + void GIVEN_invalid_entity_config_WHEN_sink_config_validated_SHOULD_propagate_validation_error() + throws NoSuchFieldException, IllegalAccessException { + final Validator beanValidator = Validation.byDefaultProvider() + .configure() + .messageInterpolator(new ParameterMessageInterpolator()) + .buildValidatorFactory() + .getValidator(); + + final EntityConfig invalidEntityConfig = new EntityConfig(); + ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, "logGroup", LOG_GROUP); + ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, "logStream", LOG_STREAM); + ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, "entityConfig", invalidEntityConfig); + + final Set> violations = beanValidator.validate(cloudWatchLogsSinkConfig); + + assertThat(violations.stream() + .anyMatch(v -> v.getPropertyPath().toString().equals("entityConfig.keyAttributes")), equalTo(true)); + } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java new file mode 100644 index 0000000000..1e8b41d48b --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java @@ -0,0 +1,173 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.validation.ConstraintViolation; +import jakarta.validation.Validation; +import jakarta.validation.Validator; +import org.hibernate.validator.messageinterpolation.ParameterMessageInterpolator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class EntityConfigTest { + private static final String KEY_ATTRIBUTES_FIELD = "keyAttributes"; + private static final String ATTRIBUTES_FIELD = "attributes"; + + private ObjectMapper objectMapper; + private Validator validator; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper(); + validator = Validation.byDefaultProvider() + .configure() + .messageInterpolator(new ParameterMessageInterpolator()) + .buildValidatorFactory() + .getValidator(); + } + + @Test + void GIVEN_new_entity_config_WHEN_get_key_attributes_called_SHOULD_return_empty_map() { + final EntityConfig entityConfig = new EntityConfig(); + + assertThat(entityConfig.getKeyAttributes(), notNullValue()); + assertThat(entityConfig.getKeyAttributes(), aMapWithSize(0)); + } + + @Test + void GIVEN_new_entity_config_WHEN_get_attributes_called_SHOULD_return_empty_map() { + final EntityConfig entityConfig = new EntityConfig(); + + assertThat(entityConfig.getAttributes(), notNullValue()); + assertThat(entityConfig.getAttributes(), aMapWithSize(0)); + } + + @Test + void GIVEN_key_attributes_set_WHEN_get_key_attributes_called_SHOULD_return_configured_value() + throws NoSuchFieldException, IllegalAccessException { + final EntityConfig entityConfig = new EntityConfig(); + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "RemoteService"); + keyAttributes.put("Name", "okta_auth0"); + + ReflectivelySetField.setField(EntityConfig.class, entityConfig, KEY_ATTRIBUTES_FIELD, keyAttributes); + + assertThat(entityConfig.getKeyAttributes(), equalTo(keyAttributes)); + } + + @Test + void GIVEN_attributes_set_WHEN_get_attributes_called_SHOULD_return_configured_value() + throws NoSuchFieldException, IllegalAccessException { + final EntityConfig entityConfig = new EntityConfig(); + final Map attributes = new HashMap<>(); + attributes.put("AWS.ServiceNameSource", "UserConfiguration"); + + ReflectivelySetField.setField(EntityConfig.class, entityConfig, ATTRIBUTES_FIELD, attributes); + + assertThat(entityConfig.getAttributes(), equalTo(attributes)); + } + + @Test + void GIVEN_entity_config_with_key_attributes_WHEN_deserialized_from_map_SHOULD_populate_fields() { + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "RemoteService"); + keyAttributes.put("Name", "okta_auth0"); + final Map attributes = new HashMap<>(); + attributes.put("AWS.ServiceNameSource", "UserConfiguration"); + final Map jsonMap = new HashMap<>(); + jsonMap.put("key_attributes", keyAttributes); + jsonMap.put("attributes", attributes); + + final EntityConfig entityConfig = objectMapper.convertValue(jsonMap, EntityConfig.class); + + assertThat(entityConfig.getKeyAttributes(), equalTo(keyAttributes)); + assertThat(entityConfig.getAttributes(), equalTo(attributes)); + } + + @Test + void GIVEN_entity_config_with_only_key_attributes_in_yaml_WHEN_deserialized_SHOULD_default_attributes_to_empty_map() { + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "RemoteService"); + keyAttributes.put("Name", "okta_auth0"); + final Map jsonMap = new HashMap<>(); + jsonMap.put("key_attributes", keyAttributes); + + final EntityConfig entityConfig = objectMapper.convertValue(jsonMap, EntityConfig.class); + + assertThat(entityConfig.getKeyAttributes(), equalTo(keyAttributes)); + assertThat(entityConfig.getAttributes(), notNullValue()); + assertThat(entityConfig.getAttributes(), aMapWithSize(0)); + } + + @Test + void GIVEN_entity_config_with_default_key_attributes_WHEN_validated_SHOULD_fail_NotEmpty_constraint() { + final EntityConfig entityConfig = new EntityConfig(); + + final Set> violations = validator.validate(entityConfig); + + assertThat(violations, hasSize(1)); + final ConstraintViolation violation = violations.iterator().next(); + assertThat(violation.getPropertyPath().toString(), equalTo(KEY_ATTRIBUTES_FIELD)); + } + + @Test + void GIVEN_null_attributes_WHEN_validated_SHOULD_fail_NotNull_constraint() + throws NoSuchFieldException, IllegalAccessException { + final EntityConfig entityConfig = new EntityConfig(); + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "Service"); + keyAttributes.put("Name", "my-app"); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, KEY_ATTRIBUTES_FIELD, keyAttributes); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, ATTRIBUTES_FIELD, null); + + final Set> violations = validator.validate(entityConfig); + + assertThat(violations, hasSize(1)); + assertThat(violations.iterator().next().getPropertyPath().toString(), equalTo(ATTRIBUTES_FIELD)); + } + + @Test + void GIVEN_key_attributes_set_WHEN_put_on_returned_map_SHOULD_throw_unsupported_operation() + throws NoSuchFieldException, IllegalAccessException { + final EntityConfig entityConfig = new EntityConfig(); + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "RemoteService"); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, KEY_ATTRIBUTES_FIELD, keyAttributes); + + assertThrows(UnsupportedOperationException.class, + () -> entityConfig.getKeyAttributes().put("new", "value")); + } + + @Test + void GIVEN_attributes_set_WHEN_put_on_returned_map_SHOULD_throw_unsupported_operation() + throws NoSuchFieldException, IllegalAccessException { + final EntityConfig entityConfig = new EntityConfig(); + final Map attributes = new HashMap<>(); + attributes.put("key", "value"); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, ATTRIBUTES_FIELD, attributes); + + assertThrows(UnsupportedOperationException.class, + () -> entityConfig.getAttributes().put("new", "value")); + } +}