From 1c7f6221ab7c8296bc5ded517aaa7dd3fb681f2e Mon Sep 17 00:00:00 2001 From: Nikhil Bagmar Date: Fri, 15 May 2026 15:19:18 -0700 Subject: [PATCH 1/5] feat(cloudwatch-logs): Add Entity config support to sink MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an optional 'entity' configuration block on the CloudWatch Logs sink that attaches CloudWatch Entity metadata (key_attributes + attributes) to every PutLogEvents request, enabling entity-based correlation in CloudWatch. When 'entity' is omitted the sink behaves identically to before. When configured, an SDK Entity is built and passed to the dispatcher via the builder; the dispatcher conditionally sets it on each PLE request. If CloudWatch rejects the entity, the request is still considered successful (events are released), a WARN log records the RejectedEntityInfo error type, and a new 'cloudWatchLogsEntityRejected' counter is incremented as the primary alarmable signal. Validation is intentionally minimal — @NotEmpty on key_attributes only. AWS-owned limits (max entries, allowed key names, length caps) are enforced server-side and surfaced via the rejection metric, avoiding client-side drift when the Entity API contract changes. Tests follow TDD: each test was written before the code that makes it pass. New unit tests cover EntityConfig defaults, deserialization, and @NotEmpty validation; @Valid cascade from CloudWatchLogsSinkConfig; the new counter; dispatcher entity-on-request, no-entity, and rejection paths; and sink wiring of EntityConfig -> Entity -> dispatcher. TestSinkOperationWithEntity integration test exercises the SDK end-to-end. Acceptance criteria from the spec are met: backward compatibility, entity attached when configured, non-fatal rejection with metric and log, all existing tests green, new tests cover new code paths, license headers on new files. Refs: data-prepper-plugins/cloudwatch-logs/specs/entity-config.md Signed-off-by: Nikhil Bagmar --- .../cloudwatch-logs/README.md | 16 ++ .../cloudwatch_logs/CloudWatchLogsIT.java | 46 ++++++ .../cloudwatch_logs/CloudWatchLogsSink.java | 9 ++ .../client/CloudWatchLogsDispatcher.java | 18 ++- .../client/CloudWatchLogsMetrics.java | 7 + .../config/CloudWatchLogsSinkConfig.java | 8 + .../cloudwatch_logs/config/EntityConfig.java | 30 ++++ .../CloudWatchLogsSinkTest.java | 59 +++++++- .../client/CloudWatchLogsDispatcherTest.java | 87 +++++++++++ .../client/CloudWatchLogsMetricsTest.java | 9 ++ .../config/CloudWatchLogsSinkConfigTest.java | 46 ++++++ .../config/EntityConfigTest.java | 141 ++++++++++++++++++ 12 files changed, 472 insertions(+), 4 deletions(-) create mode 100644 data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java create mode 100644 data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java diff --git a/data-prepper-plugins/cloudwatch-logs/README.md b/data-prepper-plugins/cloudwatch-logs/README.md index cf1a4b8fb9..d8405b9277 100644 --- a/data-prepper-plugins/cloudwatch-logs/README.md +++ b/data-prepper-plugins/cloudwatch-logs/README.md @@ -37,6 +37,12 @@ pipeline: endpoint: "https://logs.us-west-2.amazonaws.com" create_log_group: false create_log_stream: false + entity: + key_attributes: + Type: "RemoteService" + Name: "okta_auth0" + attributes: + AWS.ServiceNameSource: "UserConfiguration" ``` ## AWS Configuration @@ -77,6 +83,15 @@ pipeline: `false`. When set to `true`, the IAM principal must have `logs:CreateLogStream` permission. +## Entity Configuration + +- `entity` (Optional) : An object that attaches CloudWatch [Entity](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_Entity.html) metadata to every `PutLogEvents` request. When omitted, no entity is attached and behavior is unchanged. + + - `key_attributes` (Required when `entity` is configured) : A `Map` of key attributes that uniquely identify the entity (for example `Type`, `Name`). + - `attributes` (Optional) : A `Map` of additional attributes describing the entity. Defaults to an empty map. + + AWS-owned limits — maximum number of entries, allowed key names (such as `Type`, `ResourceType`, `Identifier`, `Name`, `Environment`), and key/value length caps — are enforced by CloudWatch at request time and are intentionally not mirrored here. Violations are surfaced via the `cloudWatchLogsEntityRejected` metric and a log warning. See the [AWS Entity API docs](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_Entity.html) for the current contract. + ## Buffer Type Configuration - `buffer_type` (Optional) : A string representing the type of buffer to use to hold onto events. Currently only supports `in_memory`. @@ -95,6 +110,7 @@ threshold parameters. * `cloudWatchLogsEventsFailed` - The number of log events failed while publishing to CloudWatch Logs. * `cloudWatchLogsRequestsSucceeded` - The number of log requests successfully made to CloudWatch Logs. * `cloudWatchLogsRequestsFailed` - The number of log requests failed to reach CloudWatch Logs. +* `cloudWatchLogsEntityRejected` - The number of `PutLogEvents` responses where CloudWatch rejected the configured entity. The request itself still succeeds and events are released; this counter is the primary signal for misconfigured `entity` attributes. ## Developer Guide diff --git a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java index 6234b13623..1d3fae0644 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java +++ b/data-prepper-plugins/cloudwatch-logs/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsIT.java @@ -22,9 +22,11 @@ import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.EntityConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsMetrics; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsClientFactory; +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsResponse; import software.amazon.awssdk.services.cloudwatchlogs.model.GetLogEventsRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.OutputLogEvent; @@ -335,6 +337,50 @@ void TestSinkOperationWithLogSendInterval() throws Exception { verify(eventHandle, times(NUM_RECORDS)).release(true); } + @Test + void TestSinkOperationWithEntity() throws Exception { + long startTime = Instant.now().toEpochMilli(); + when(thresholdConfig.getBatchSize()).thenReturn(10); + when(thresholdConfig.getFlushInterval()).thenReturn(10L); + when(thresholdConfig.getMaxRequestSizeBytes()).thenReturn(1000L); + + final EntityConfig entityConfig = new EntityConfig(); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, "keyAttributes", + Map.of("Type", "RemoteService", "Name", "data-prepper-cwl-it")); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, "attributes", + Map.of("AWS.ServiceNameSource", "UserConfiguration")); + when(cloudWatchLogsSinkConfig.getEntityConfig()).thenReturn(entityConfig); + + sink = createObjectUnderTest(); + Collection> records = getRecordList(NUM_RECORDS); + sink.doOutput(records); + await().atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> { + sink.doOutput(Collections.emptyList()); + long endTime = Instant.now().toEpochMilli(); + GetLogEventsRequest getRequest = GetLogEventsRequest + .builder() + .logGroupName(logGroupName) + .logStreamName(logStreamName) + .startTime(startTime) + .endTime(endTime) + .build(); + GetLogEventsResponse response = cloudWatchLogsClient.getLogEvents(getRequest); + List events = response.events(); + assertThat(events.size(), equalTo(NUM_RECORDS)); + for (int i = 0; i < events.size(); i++) { + String message = events.get(i).message(); + Map event = objectMapper.readValue(message, Map.class); + assertThat(event.get("name"), equalTo("Person" + i)); + assertThat(event.get("age"), equalTo(Integer.toString(i))); + } + }); + assertThat(eventsSuccessCount.get(), equalTo(NUM_RECORDS)); + assertThat(requestsSuccessCount.get(), equalTo(1)); + assertThat(dlqSuccessCount.get(), equalTo(0)); + verify(eventHandle, times(NUM_RECORDS)).release(true); + } + @Test void TestSinkOperationWithBatchSize() throws Exception { long startTime = Instant.now().toEpochMilli(); diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java index 62e576f194..b6d21e8797 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSink.java @@ -24,11 +24,13 @@ import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsClientFactory; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.EntityConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.exception.InvalidBufferTypeException; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.utils.CloudWatchLogsLimits; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; +import software.amazon.awssdk.services.cloudwatchlogs.model.Entity; import org.opensearch.dataprepper.plugins.dlq.DlqPushHandler; import org.opensearch.dataprepper.model.annotations.Experimental; import org.slf4j.Logger; @@ -87,6 +89,12 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting, Executor executor = Executors.newFixedThreadPool(cloudWatchLogsSinkConfig.getWorkers()); + final EntityConfig entityConfig = cloudWatchLogsSinkConfig.getEntityConfig(); + final Entity entity = entityConfig == null ? null : Entity.builder() + .keyAttributes(entityConfig.getKeyAttributes()) + .attributes(entityConfig.getAttributes()) + .build(); + CloudWatchLogsDispatcher cloudWatchLogsDispatcher = CloudWatchLogsDispatcher.builder() .cloudWatchLogsClient(cloudWatchLogsClient) .cloudWatchLogsMetrics(cloudWatchLogsMetrics) @@ -98,6 +106,7 @@ public CloudWatchLogsSink(final PluginSetting pluginSetting, .executor(executor) .createLogGroup(cloudWatchLogsSinkConfig.getCreateLogGroup()) .createLogStream(cloudWatchLogsSinkConfig.getCreateLogStream()) + .entity(entity) .build(); Buffer buffer; diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java index 3011076af9..d1cfacd87f 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java @@ -15,6 +15,7 @@ import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException; import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest; +import software.amazon.awssdk.services.cloudwatchlogs.model.Entity; import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse; @@ -46,6 +47,7 @@ public class CloudWatchLogsDispatcher { private int retryCount; private boolean createLogGroup; private boolean createLogStream; + private Entity entity; /** * Will read in a collection of log messages in byte form and transform them into a collection of InputLogEvents. @@ -74,11 +76,16 @@ public List prepareInputLogEvents(final Collection eventM } public void dispatchLogs(List inputLogEvents, List eventHandles) { - PutLogEventsRequest putLogEventsRequest = PutLogEventsRequest.builder() + final PutLogEventsRequest.Builder requestBuilder = PutLogEventsRequest.builder() .logEvents(inputLogEvents) .logGroupName(logGroup) - .logStreamName(logStream) - .build(); + .logStreamName(logStream); + + if (entity != null) { + requestBuilder.entity(entity); + } + + final PutLogEventsRequest putLogEventsRequest = requestBuilder.build(); executor.execute(Uploader.builder() .cloudWatchLogsClient(cloudWatchLogsClient) @@ -169,6 +176,11 @@ public void upload() { dlqObjects = getDlqObjectsFromResponse(putLogEventsResponse); } cloudWatchLogsMetrics.increaseLogEventSuccessCounter(totalEventCount - dlqObjects.size()); + if (putLogEventsResponse != null && putLogEventsResponse.rejectedEntityInfo() != null) { + cloudWatchLogsMetrics.increaseEntityRejectedCounter(1); + LOG.warn("Entity was rejected by CloudWatch: {}", + putLogEventsResponse.rejectedEntityInfo().errorTypeAsString()); + } releaseEventHandles(putLogEventsResponse); } CloudWatchLogsSinkUtils.handleDlqObjects(dlqObjects, dlqPushHandler); diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java index 63fc55cf4e..e8f5199604 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetrics.java @@ -23,12 +23,14 @@ public class CloudWatchLogsMetrics { public static final String CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED = "cloudWatchLogsLargeEventsDropped"; public static final String CLOUDWATCH_LOGS_LOG_SIZE = "cloudWatchLogsLogSize"; public static final String CLOUDWATCH_LOGS_REQUEST_SIZE = "cloudWatchLogsRequestSize"; + public static final String CLOUDWATCH_LOGS_ENTITY_REJECTED = "cloudWatchLogsEntityRejected"; private final Counter logEventSuccessCounter; private final Counter logEventFailCounter; private final Counter requestSuccessCount; private final Counter requestFailCount; private final Counter requestMultiFailCount; private final Counter logLargeEventsDroppedCounter; + private final Counter entityRejectedCounter; private final DistributionSummary logSizeMetric; private final DistributionSummary requestSizeMetric; @@ -39,6 +41,7 @@ public CloudWatchLogsMetrics(final PluginMetrics pluginMetrics) { this.logEventFailCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED); this.requestSuccessCount = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED); this.logLargeEventsDroppedCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_LARGE_EVENTS_DROPPED); + this.entityRejectedCounter = pluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_ENTITY_REJECTED); this.logSizeMetric = pluginMetrics.summary(CLOUDWATCH_LOGS_LOG_SIZE); this.requestSizeMetric = pluginMetrics.summary(CLOUDWATCH_LOGS_REQUEST_SIZE); } @@ -67,6 +70,10 @@ public void increaseLogLargeEventsDroppedCounter(int value) { logLargeEventsDroppedCounter.increment(value); } + public void increaseEntityRejectedCounter(int value) { + entityRejectedCounter.increment(value); + } + public void recordLogSize(int value) { logSizeMetric.record(value); } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java index 563cd1070f..afa9842a62 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfig.java @@ -67,6 +67,10 @@ public class CloudWatchLogsSinkConfig { @JsonProperty(value = "create_log_stream", defaultValue = "false") private boolean createLogStream = false; + @JsonProperty("entity") + @Valid + private EntityConfig entityConfig; + public AwsConfig getAwsConfig() { return awsConfig; } @@ -111,4 +115,8 @@ public boolean getCreateLogStream() { return createLogStream; } + public EntityConfig getEntityConfig() { + return entityConfig; + } + } diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java new file mode 100644 index 0000000000..02cfb4b3fe --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotEmpty; + +import java.util.HashMap; +import java.util.Map; + +public class EntityConfig { + + @JsonProperty("key_attributes") + @NotEmpty + private Map keyAttributes; + + @JsonProperty("attributes") + private Map attributes = new HashMap<>(); + + public Map getKeyAttributes() { + return keyAttributes; + } + + public Map getAttributes() { + return attributes; + } +} diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java index a4b2dfa5dc..42db6c78bc 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java @@ -24,8 +24,11 @@ import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.client.CloudWatchLogsMetrics; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.AwsConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.CloudWatchLogsSinkConfig; +import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.EntityConfig; import org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config.ThresholdConfig; +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; +import software.amazon.awssdk.services.cloudwatchlogs.model.Entity; import software.amazon.awssdk.regions.Region; @@ -36,6 +39,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -69,7 +73,7 @@ class CloudWatchLogsSinkTest { private static final String TEST_BUFFER_TYPE = "in_memory"; // Number of args Lombok @Builder passes to the all-args constructor of CloudWatchLogsDispatcher. // Bumping this is the signal that positional context.arguments().get(N) calls below need to be re-audited. - private static final int EXPECTED_DISPATCHER_ARITY = 10; + private static final int EXPECTED_DISPATCHER_ARITY = 11; private int numRetries; @BeforeEach void setUp() { @@ -291,6 +295,7 @@ void WHEN_sink_has_dlq_config_THEN_retries_set_to_user_configured_value() { assertThat(numRetries, equalTo(TEST_MAX_RETRIES)); } + @Test @Test void WHEN_create_log_group_and_stream_flags_are_set_THEN_flags_passed_to_dispatcher() { when(mockCloudWatchLogsSinkConfig.getCreateLogGroup()).thenReturn(true); @@ -349,4 +354,56 @@ void WHEN_create_log_group_is_false_and_create_log_stream_is_false_THEN_flags_pa assertThat(capturedCreateLogStream[0], equalTo(false)); } + @Test + void WHEN_entity_config_is_null_THEN_dispatcher_built_without_entity() { + final Entity[] capturedEntity = new Entity[1]; + + try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { + final MockedConstruction dispatcherMock = + mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> { + capturedEntity[0] = (Entity) context.arguments().get(10); + }); + + mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), + any(AwsCredentialsSupplier.class), any(), any())) + .thenReturn(mockClient); + + getTestCloudWatchSink(); + dispatcherMock.close(); + } + assertThat(capturedEntity[0], equalTo(null)); + } + + @Test + void WHEN_entity_config_is_provided_THEN_dispatcher_built_with_entity() throws Exception { + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "RemoteService"); + keyAttributes.put("Name", "okta_auth0"); + final Map attributes = new HashMap<>(); + attributes.put("AWS.ServiceNameSource", "UserConfiguration"); + + final EntityConfig entityConfig = new EntityConfig(); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, "keyAttributes", keyAttributes); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, "attributes", attributes); + when(mockCloudWatchLogsSinkConfig.getEntityConfig()).thenReturn(entityConfig); + + final Entity[] capturedEntity = new Entity[1]; + try(MockedStatic mockedStatic = mockStatic(CloudWatchLogsClientFactory.class)) { + final MockedConstruction dispatcherMock = + mockConstruction(CloudWatchLogsDispatcher.class, (mock, context) -> { + capturedEntity[0] = (Entity) context.arguments().get(10); + }); + + mockedStatic.when(() -> CloudWatchLogsClientFactory.createCwlClient(any(AwsConfig.class), + any(AwsCredentialsSupplier.class), any(), any())) + .thenReturn(mockClient); + + getTestCloudWatchSink(); + dispatcherMock.close(); + } + assertThat(capturedEntity[0], notNullValue()); + assertThat(capturedEntity[0].keyAttributes(), equalTo(keyAttributes)); + assertThat(capturedEntity[0].attributes(), equalTo(attributes)); + } + } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java index 01f67dc064..283b6778ba 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java @@ -15,6 +15,8 @@ import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogGroupResponse; import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.CreateLogStreamResponse; +import software.amazon.awssdk.services.cloudwatchlogs.model.Entity; +import software.amazon.awssdk.services.cloudwatchlogs.model.RejectedEntityInfo; import software.amazon.awssdk.services.cloudwatchlogs.model.InputLogEvent; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.PutLogEventsResponse; @@ -25,6 +27,8 @@ import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import java.util.Collection; import java.util.List; import java.util.concurrent.Executor; @@ -550,4 +554,87 @@ void GIVEN_create_flag_true_AND_creation_succeeds_BUT_put_log_events_still_throw verify(mockCloudWatchLogsMetrics, times(RETRY_COUNT)).increaseRequestFailCounter(1); verify(mockCloudWatchLogsMetrics, never()).increaseRequestSuccessCounter(1); } + + @Test + void GIVEN_entity_configured_WHEN_dispatch_logs_called_SHOULD_set_entity_on_put_log_events_request() { + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "RemoteService"); + keyAttributes.put("Name", "okta_auth0"); + final Map attributes = new HashMap<>(); + attributes.put("AWS.ServiceNameSource", "UserConfiguration"); + final Entity entity = Entity.builder().keyAttributes(keyAttributes).attributes(attributes).build(); + + cloudWatchLogsDispatcher = CloudWatchLogsDispatcher.builder() + .cloudWatchLogsClient(mockCloudWatchLogsClient) + .cloudWatchLogsMetrics(mockCloudWatchLogsMetrics) + .executor(mockExecutor) + .logGroup(LOG_GROUP) + .logStream(LOG_STREAM) + .retryCount(RETRY_COUNT) + .dropIfDlqNotConfigured(true) + .entity(entity) + .build(); + + final List eventHandles = getSampleEventHandles(); + final PutLogEventsResponse response = mock(PutLogEventsResponse.class); + when(response.rejectedLogEventsInfo()).thenReturn(null); + when(response.rejectedEntityInfo()).thenReturn(null); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))).thenReturn(response); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PutLogEventsRequest.class); + verify(mockCloudWatchLogsClient).putLogEvents(requestCaptor.capture()); + final PutLogEventsRequest captured = requestCaptor.getValue(); + + assertThat(captured.entity(), notNullValue()); + assertThat(captured.entity().keyAttributes(), equalTo(keyAttributes)); + assertThat(captured.entity().attributes(), equalTo(attributes)); + } + + @Test + void GIVEN_entity_not_configured_WHEN_dispatch_logs_called_SHOULD_not_set_entity_on_put_log_events_request() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher(RETRY_COUNT); + + final List eventHandles = getSampleEventHandles(); + final PutLogEventsResponse response = mock(PutLogEventsResponse.class); + when(response.rejectedLogEventsInfo()).thenReturn(null); + when(response.rejectedEntityInfo()).thenReturn(null); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))).thenReturn(response); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PutLogEventsRequest.class); + verify(mockCloudWatchLogsClient).putLogEvents(requestCaptor.capture()); + + assertThat(requestCaptor.getValue().entity(), nullValue()); + } + + @Test + void GIVEN_entity_rejected_WHEN_put_log_events_succeeds_SHOULD_increment_rejection_metric_and_release_events() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcher(RETRY_COUNT); + + final List eventHandles = getSampleEventHandles(); + final PutLogEventsResponse response = mock(PutLogEventsResponse.class); + final RejectedEntityInfo rejectedEntityInfo = mock(RejectedEntityInfo.class); + when(rejectedEntityInfo.errorTypeAsString()).thenReturn("InvalidEntity"); + when(response.rejectedLogEventsInfo()).thenReturn(null); + when(response.rejectedEntityInfo()).thenReturn(rejectedEntityInfo); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))).thenReturn(response); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + verify(mockCloudWatchLogsMetrics, times(1)).increaseEntityRejectedCounter(1); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + eventHandles.forEach(eventHandle -> verify(eventHandle).release(true)); + } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetricsTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetricsTest.java index 528034d373..7da149a391 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetricsTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsMetricsTest.java @@ -18,6 +18,7 @@ class CloudWatchLogsMetricsTest { private Counter mockSuccessRequestCounter; private Counter mockFailedEventCounter; private Counter mockFailedRequestCounter; + private Counter mockEntityRejectedCounter; @BeforeEach void setUp() { @@ -26,11 +27,13 @@ void setUp() { mockSuccessRequestCounter = mock(Counter.class); mockFailedEventCounter = mock(Counter.class); mockFailedRequestCounter = mock(Counter.class); + mockEntityRejectedCounter = mock(Counter.class); when(mockPluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_SUCCEEDED)).thenReturn(mockSuccessEventCounter); when(mockPluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_SUCCEEDED)).thenReturn(mockSuccessRequestCounter); when(mockPluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_EVENTS_FAILED)).thenReturn(mockFailedEventCounter); when(mockPluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_REQUESTS_FAILED)).thenReturn(mockFailedRequestCounter); + when(mockPluginMetrics.counter(CloudWatchLogsMetrics.CLOUDWATCH_LOGS_ENTITY_REJECTED)).thenReturn(mockEntityRejectedCounter); testCloudWatchLogsMetrics = new CloudWatchLogsMetrics(mockPluginMetrics); } @@ -63,4 +66,10 @@ void WHEN_increase_request_failed_counter_called_THEN_request_failed_counter_inc testCloudWatchLogsMetrics.increaseRequestFailCounter(1); verify(mockFailedRequestCounter, times(1)).increment(1); } + + @Test + void WHEN_increase_entity_rejected_counter_called_THEN_entity_rejected_counter_increase_method_should_be_called() { + testCloudWatchLogsMetrics.increaseEntityRejectedCounter(1); + verify(mockEntityRejectedCounter, times(1)).increment(1); + } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java index 6c9f24d2df..bdee899435 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java @@ -12,6 +12,10 @@ import org.opensearch.dataprepper.test.helper.ReflectivelySetField; import jakarta.validation.ConstraintValidatorContext; +import jakarta.validation.ConstraintViolation; +import jakarta.validation.Validation; +import jakarta.validation.Validator; +import org.hibernate.validator.messageinterpolation.ParameterMessageInterpolator; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.aMapWithSize; @@ -26,6 +30,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Random; +import java.util.Set; class CloudWatchLogsSinkConfigTest { private CloudWatchLogsSinkConfig cloudWatchLogsSinkConfig; @@ -323,6 +328,7 @@ void GIVEN_endpoint_configured_SHOULD_return_the_configured_value() throws NoSuc assertThat(cloudWatchLogsSinkConfig.getEndpoint(), equalTo(testEndpoint)); } + @Test @Test void GIVEN_new_sink_config_WHEN_get_create_log_group_called_SHOULD_return_false() { assertThat(new CloudWatchLogsSinkConfig().getCreateLogGroup(), equalTo(false)); @@ -348,4 +354,44 @@ void GIVEN_create_log_stream_set_true_WHEN_get_called_SHOULD_return_true() "createLogStream", true); assertThat(cloudWatchLogsSinkConfig.getCreateLogStream(), equalTo(true)); } + + @Test + void GIVEN_new_sink_config_WHEN_get_entity_config_called_SHOULD_return_null() { + assertThat(new CloudWatchLogsSinkConfig().getEntityConfig(), equalTo(null)); + } + + @Test + void GIVEN_entity_config_set_WHEN_get_entity_config_called_SHOULD_return_configured_value() + throws NoSuchFieldException, IllegalAccessException { + final EntityConfig entityConfig = new EntityConfig(); + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "RemoteService"); + keyAttributes.put("Name", "okta_auth0"); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, "keyAttributes", keyAttributes); + + ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, "entityConfig", entityConfig); + + assertThat(cloudWatchLogsSinkConfig.getEntityConfig(), equalTo(entityConfig)); + assertThat(cloudWatchLogsSinkConfig.getEntityConfig().getKeyAttributes(), equalTo(keyAttributes)); + } + + @Test + void GIVEN_invalid_entity_config_WHEN_sink_config_validated_SHOULD_propagate_validation_error() + throws NoSuchFieldException, IllegalAccessException { + final Validator beanValidator = Validation.byDefaultProvider() + .configure() + .messageInterpolator(new ParameterMessageInterpolator()) + .buildValidatorFactory() + .getValidator(); + + final EntityConfig invalidEntityConfig = new EntityConfig(); + ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, "logGroup", LOG_GROUP); + ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, "logStream", LOG_STREAM); + ReflectivelySetField.setField(cloudWatchLogsSinkConfig.getClass(), cloudWatchLogsSinkConfig, "entityConfig", invalidEntityConfig); + + final Set> violations = beanValidator.validate(cloudWatchLogsSinkConfig); + + assertThat(violations.stream() + .anyMatch(v -> v.getPropertyPath().toString().equals("entityConfig.keyAttributes")), equalTo(true)); + } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java new file mode 100644 index 0000000000..a9e8092c5f --- /dev/null +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java @@ -0,0 +1,141 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.validation.ConstraintViolation; +import jakarta.validation.Validation; +import jakarta.validation.Validator; +import org.hibernate.validator.messageinterpolation.ParameterMessageInterpolator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.test.helper.ReflectivelySetField; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +class EntityConfigTest { + private static final String KEY_ATTRIBUTES_FIELD = "keyAttributes"; + private static final String ATTRIBUTES_FIELD = "attributes"; + + private ObjectMapper objectMapper; + private Validator validator; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper(); + validator = Validation.byDefaultProvider() + .configure() + .messageInterpolator(new ParameterMessageInterpolator()) + .buildValidatorFactory() + .getValidator(); + } + + @Test + void GIVEN_new_entity_config_WHEN_get_key_attributes_called_SHOULD_return_null() { + final EntityConfig entityConfig = new EntityConfig(); + + assertThat(entityConfig.getKeyAttributes(), nullValue()); + } + + @Test + void GIVEN_new_entity_config_WHEN_get_attributes_called_SHOULD_return_empty_map() { + final EntityConfig entityConfig = new EntityConfig(); + + assertThat(entityConfig.getAttributes(), notNullValue()); + assertThat(entityConfig.getAttributes(), aMapWithSize(0)); + } + + @Test + void GIVEN_key_attributes_set_WHEN_get_key_attributes_called_SHOULD_return_configured_value() + throws NoSuchFieldException, IllegalAccessException { + final EntityConfig entityConfig = new EntityConfig(); + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "RemoteService"); + keyAttributes.put("Name", "okta_auth0"); + + ReflectivelySetField.setField(EntityConfig.class, entityConfig, KEY_ATTRIBUTES_FIELD, keyAttributes); + + assertThat(entityConfig.getKeyAttributes(), equalTo(keyAttributes)); + } + + @Test + void GIVEN_attributes_set_WHEN_get_attributes_called_SHOULD_return_configured_value() + throws NoSuchFieldException, IllegalAccessException { + final EntityConfig entityConfig = new EntityConfig(); + final Map attributes = new HashMap<>(); + attributes.put("AWS.ServiceNameSource", "UserConfiguration"); + + ReflectivelySetField.setField(EntityConfig.class, entityConfig, ATTRIBUTES_FIELD, attributes); + + assertThat(entityConfig.getAttributes(), equalTo(attributes)); + } + + @Test + void GIVEN_entity_config_with_key_attributes_WHEN_deserialized_from_map_SHOULD_populate_fields() { + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "RemoteService"); + keyAttributes.put("Name", "okta_auth0"); + final Map attributes = new HashMap<>(); + attributes.put("AWS.ServiceNameSource", "UserConfiguration"); + final Map jsonMap = new HashMap<>(); + jsonMap.put("key_attributes", keyAttributes); + jsonMap.put("attributes", attributes); + + final EntityConfig entityConfig = objectMapper.convertValue(jsonMap, EntityConfig.class); + + assertThat(entityConfig.getKeyAttributes(), equalTo(keyAttributes)); + assertThat(entityConfig.getAttributes(), equalTo(attributes)); + } + + @Test + void GIVEN_entity_config_with_only_key_attributes_in_yaml_WHEN_deserialized_SHOULD_default_attributes_to_empty_map() { + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "RemoteService"); + keyAttributes.put("Name", "okta_auth0"); + final Map jsonMap = new HashMap<>(); + jsonMap.put("key_attributes", keyAttributes); + + final EntityConfig entityConfig = objectMapper.convertValue(jsonMap, EntityConfig.class); + + assertThat(entityConfig.getKeyAttributes(), equalTo(keyAttributes)); + assertThat(entityConfig.getAttributes(), notNullValue()); + assertThat(entityConfig.getAttributes(), aMapWithSize(0)); + } + + @Test + void GIVEN_entity_config_with_empty_key_attributes_WHEN_validated_SHOULD_fail_NotEmpty_constraint() + throws NoSuchFieldException, IllegalAccessException { + final EntityConfig entityConfig = new EntityConfig(); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, KEY_ATTRIBUTES_FIELD, Collections.emptyMap()); + + final Set> violations = validator.validate(entityConfig); + + assertThat(violations, hasSize(1)); + final ConstraintViolation violation = violations.iterator().next(); + assertThat(violation.getPropertyPath().toString(), equalTo(KEY_ATTRIBUTES_FIELD)); + } + + @Test + void GIVEN_entity_config_with_null_key_attributes_WHEN_validated_SHOULD_fail_NotEmpty_constraint() { + final EntityConfig entityConfig = new EntityConfig(); + + final Set> violations = validator.validate(entityConfig); + + assertThat(violations, hasSize(1)); + final ConstraintViolation violation = violations.iterator().next(); + assertThat(violation.getPropertyPath().toString(), equalTo(KEY_ATTRIBUTES_FIELD)); + } +} From 36a3c348eca78fc96d407a547ba19ba2fe69f48e Mon Sep 17 00:00:00 2001 From: Nikhil Bagmar Date: Tue, 19 May 2026 12:14:59 -0700 Subject: [PATCH 2/5] Add full license headers to EntityConfig and EntityConfigTest Signed-off-by: Nikhil Bagmar --- .../plugins/sink/cloudwatch_logs/config/EntityConfig.java | 5 +++++ .../sink/cloudwatch_logs/config/EntityConfigTest.java | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java index 02cfb4b3fe..e2958c2180 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config; diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java index a9e8092c5f..02b0da219f 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java @@ -1,6 +1,11 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * */ package org.opensearch.dataprepper.plugins.sink.cloudwatch_logs.config; From 60dc0395ab226aa2d8bf5ff5e7f286af18f2ba8d Mon Sep 17 00:00:00 2001 From: Nikhil Bagmar Date: Thu, 21 May 2026 14:25:56 -0700 Subject: [PATCH 3/5] fix: resolve rebase conflicts with create_log_group_and_stream - Remove duplicate @Test annotations from auto-merge - Add Hamcrest imports for entity assertion matchers - Update EXPECTED_DISPATCHER_ARITY to 11 (entity field added) Signed-off-by: Nikhil Bagmar --- .../plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java | 1 - .../cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java | 4 ++++ .../cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java | 1 - 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java index 42db6c78bc..36bf042e29 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/CloudWatchLogsSinkTest.java @@ -295,7 +295,6 @@ void WHEN_sink_has_dlq_config_THEN_retries_set_to_user_configured_value() { assertThat(numRetries, equalTo(TEST_MAX_RETRIES)); } - @Test @Test void WHEN_create_log_group_and_stream_flags_are_set_THEN_flags_passed_to_dispatcher() { when(mockCloudWatchLogsSinkConfig.getCreateLogGroup()).thenReturn(true); diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java index 283b6778ba..130099eee9 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java @@ -33,6 +33,10 @@ import java.util.List; import java.util.concurrent.Executor; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java index bdee899435..92b5f06850 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/CloudWatchLogsSinkConfigTest.java @@ -328,7 +328,6 @@ void GIVEN_endpoint_configured_SHOULD_return_the_configured_value() throws NoSuc assertThat(cloudWatchLogsSinkConfig.getEndpoint(), equalTo(testEndpoint)); } - @Test @Test void GIVEN_new_sink_config_WHEN_get_create_log_group_called_SHOULD_return_false() { assertThat(new CloudWatchLogsSinkConfig().getCreateLogGroup(), equalTo(false)); From b5f09b4b8c29f56d1f8db941ec45e62c7785d19e Mon Sep 17 00:00:00 2001 From: Nikhil Bagmar Date: Thu, 21 May 2026 16:23:05 -0700 Subject: [PATCH 4/5] fix(cloudwatch-logs): make EntityConfig immutable and reorder entity rejection check - Default keyAttributes and attributes to Collections.emptyMap() - Return Collections.unmodifiableMap() from both getters - Move entity rejection metric before releaseEventHandles - Add unmodifiable map and createResources exception tests Signed-off-by: Nikhil Bagmar --- .../client/CloudWatchLogsDispatcher.java | 2 +- .../cloudwatch_logs/config/EntityConfig.java | 10 +++--- .../client/CloudWatchLogsDispatcherTest.java | 25 ++++++++++++++ .../config/EntityConfigTest.java | 33 ++++++++++++++++--- 4 files changed, 60 insertions(+), 10 deletions(-) diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java index d1cfacd87f..326d32f1c1 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java @@ -175,12 +175,12 @@ public void upload() { if (putLogEventsResponse != null) { dlqObjects = getDlqObjectsFromResponse(putLogEventsResponse); } - cloudWatchLogsMetrics.increaseLogEventSuccessCounter(totalEventCount - dlqObjects.size()); if (putLogEventsResponse != null && putLogEventsResponse.rejectedEntityInfo() != null) { cloudWatchLogsMetrics.increaseEntityRejectedCounter(1); LOG.warn("Entity was rejected by CloudWatch: {}", putLogEventsResponse.rejectedEntityInfo().errorTypeAsString()); } + cloudWatchLogsMetrics.increaseLogEventSuccessCounter(totalEventCount - dlqObjects.size()); releaseEventHandles(putLogEventsResponse); } CloudWatchLogsSinkUtils.handleDlqObjects(dlqObjects, dlqPushHandler); diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java index e2958c2180..c3acc756f9 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java @@ -13,23 +13,23 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.NotEmpty; -import java.util.HashMap; +import java.util.Collections; import java.util.Map; public class EntityConfig { @JsonProperty("key_attributes") @NotEmpty - private Map keyAttributes; + private Map keyAttributes = Collections.emptyMap(); @JsonProperty("attributes") - private Map attributes = new HashMap<>(); + private Map attributes = Collections.emptyMap(); public Map getKeyAttributes() { - return keyAttributes; + return Collections.unmodifiableMap(keyAttributes); } public Map getAttributes() { - return attributes; + return Collections.unmodifiableMap(attributes); } } diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java index 130099eee9..0fef5b031f 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcherTest.java @@ -559,6 +559,31 @@ void GIVEN_create_flag_true_AND_creation_succeeds_BUT_put_log_events_still_throw verify(mockCloudWatchLogsMetrics, never()).increaseRequestSuccessCounter(1); } + @Test + void GIVEN_resource_not_found_and_both_create_group_and_stream_throw_non_already_exists_SHOULD_not_kill_uploader() { + cloudWatchLogsDispatcher = getCloudWatchLogsDispatcherWithCreateFlag(RETRY_COUNT, true, true); + + final List eventHandles = getSampleEventHandles(); + when(mockCloudWatchLogsClient.putLogEvents(any(PutLogEventsRequest.class))) + .thenThrow(ResourceNotFoundException.builder().message("missing").build()) + .thenReturn(mock(PutLogEventsResponse.class)); + when(mockCloudWatchLogsClient.createLogGroup(any(CreateLogGroupRequest.class))) + .thenThrow(CloudWatchLogsException.builder().message("access denied").build()); + when(mockCloudWatchLogsClient.createLogStream(any(CreateLogStreamRequest.class))) + .thenThrow(CloudWatchLogsException.builder().message("access denied").build()); + + final List inputLogEventList = cloudWatchLogsDispatcher.prepareInputLogEvents(getSampleBufferedData()); + cloudWatchLogsDispatcher.dispatchLogs(inputLogEventList, eventHandles); + + executeDispatcherRunnable(); + + // Both creation calls failed but the helper swallowed both exceptions; PutLogEvents retry succeeded. + verify(mockCloudWatchLogsClient, times(1)).createLogGroup(any(CreateLogGroupRequest.class)); + verify(mockCloudWatchLogsClient, times(1)).createLogStream(any(CreateLogStreamRequest.class)); + verify(mockCloudWatchLogsMetrics, times(1)).increaseRequestSuccessCounter(1); + verify(mockCloudWatchLogsMetrics, never()).increaseRequestFailCounter(1); + } + @Test void GIVEN_entity_configured_WHEN_dispatch_logs_called_SHOULD_set_entity_on_put_log_events_request() { final Map keyAttributes = new HashMap<>(); diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java index 02b0da219f..f76d52a76a 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java @@ -29,7 +29,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; class EntityConfigTest { private static final String KEY_ATTRIBUTES_FIELD = "keyAttributes"; @@ -49,10 +49,11 @@ void setUp() { } @Test - void GIVEN_new_entity_config_WHEN_get_key_attributes_called_SHOULD_return_null() { + void GIVEN_new_entity_config_WHEN_get_key_attributes_called_SHOULD_return_empty_map() { final EntityConfig entityConfig = new EntityConfig(); - assertThat(entityConfig.getKeyAttributes(), nullValue()); + assertThat(entityConfig.getKeyAttributes(), notNullValue()); + assertThat(entityConfig.getKeyAttributes(), aMapWithSize(0)); } @Test @@ -134,7 +135,7 @@ void GIVEN_entity_config_with_empty_key_attributes_WHEN_validated_SHOULD_fail_No } @Test - void GIVEN_entity_config_with_null_key_attributes_WHEN_validated_SHOULD_fail_NotEmpty_constraint() { + void GIVEN_entity_config_with_default_key_attributes_WHEN_validated_SHOULD_fail_NotEmpty_constraint() { final EntityConfig entityConfig = new EntityConfig(); final Set> violations = validator.validate(entityConfig); @@ -143,4 +144,28 @@ void GIVEN_entity_config_with_null_key_attributes_WHEN_validated_SHOULD_fail_Not final ConstraintViolation violation = violations.iterator().next(); assertThat(violation.getPropertyPath().toString(), equalTo(KEY_ATTRIBUTES_FIELD)); } + + @Test + void GIVEN_key_attributes_set_WHEN_put_on_returned_map_SHOULD_throw_unsupported_operation() + throws NoSuchFieldException, IllegalAccessException { + final EntityConfig entityConfig = new EntityConfig(); + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "RemoteService"); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, KEY_ATTRIBUTES_FIELD, keyAttributes); + + assertThrows(UnsupportedOperationException.class, + () -> entityConfig.getKeyAttributes().put("new", "value")); + } + + @Test + void GIVEN_attributes_set_WHEN_put_on_returned_map_SHOULD_throw_unsupported_operation() + throws NoSuchFieldException, IllegalAccessException { + final EntityConfig entityConfig = new EntityConfig(); + final Map attributes = new HashMap<>(); + attributes.put("key", "value"); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, ATTRIBUTES_FIELD, attributes); + + assertThrows(UnsupportedOperationException.class, + () -> entityConfig.getAttributes().put("new", "value")); + } } From 73dba973366567cd3d189ad23cd86512be8156be Mon Sep 17 00:00:00 2001 From: Nikhil Bagmar Date: Tue, 26 May 2026 16:47:16 -0700 Subject: [PATCH 5/5] fix(cloudwatch-logs): add @NotNull guard, rate-limit entity rejection log, remove duplicate test - Add @NotNull on attributes field to prevent NPE from explicit null in YAML - Use NOISY marker on entity rejection WARN to rate-limit under load - Remove redundant empty-key-attributes test (default is already emptyMap) - Add test for @NotNull constraint violation on null attributes Signed-off-by: Nikhil Bagmar --- .../client/CloudWatchLogsDispatcher.java | 2 +- .../cloudwatch_logs/config/EntityConfig.java | 2 ++ .../cloudwatch_logs/config/EntityConfigTest.java | 16 +++++++++------- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java index 326d32f1c1..cabf8c3771 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/client/CloudWatchLogsDispatcher.java @@ -177,7 +177,7 @@ public void upload() { } if (putLogEventsResponse != null && putLogEventsResponse.rejectedEntityInfo() != null) { cloudWatchLogsMetrics.increaseEntityRejectedCounter(1); - LOG.warn("Entity was rejected by CloudWatch: {}", + LOG.warn(NOISY, "Entity was rejected by CloudWatch: {}", putLogEventsResponse.rejectedEntityInfo().errorTypeAsString()); } cloudWatchLogsMetrics.increaseLogEventSuccessCounter(totalEventCount - dlqObjects.size()); diff --git a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java index c3acc756f9..84d92c9e21 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java +++ b/data-prepper-plugins/cloudwatch-logs/src/main/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfig.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; import java.util.Collections; import java.util.Map; @@ -23,6 +24,7 @@ public class EntityConfig { private Map keyAttributes = Collections.emptyMap(); @JsonProperty("attributes") + @NotNull private Map attributes = Collections.emptyMap(); public Map getKeyAttributes() { diff --git a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java index f76d52a76a..1e8b41d48b 100644 --- a/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java +++ b/data-prepper-plugins/cloudwatch-logs/src/test/java/org/opensearch/dataprepper/plugins/sink/cloudwatch_logs/config/EntityConfigTest.java @@ -19,7 +19,6 @@ import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.test.helper.ReflectivelySetField; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -122,10 +121,8 @@ void GIVEN_entity_config_with_only_key_attributes_in_yaml_WHEN_deserialized_SHOU } @Test - void GIVEN_entity_config_with_empty_key_attributes_WHEN_validated_SHOULD_fail_NotEmpty_constraint() - throws NoSuchFieldException, IllegalAccessException { + void GIVEN_entity_config_with_default_key_attributes_WHEN_validated_SHOULD_fail_NotEmpty_constraint() { final EntityConfig entityConfig = new EntityConfig(); - ReflectivelySetField.setField(EntityConfig.class, entityConfig, KEY_ATTRIBUTES_FIELD, Collections.emptyMap()); final Set> violations = validator.validate(entityConfig); @@ -135,14 +132,19 @@ void GIVEN_entity_config_with_empty_key_attributes_WHEN_validated_SHOULD_fail_No } @Test - void GIVEN_entity_config_with_default_key_attributes_WHEN_validated_SHOULD_fail_NotEmpty_constraint() { + void GIVEN_null_attributes_WHEN_validated_SHOULD_fail_NotNull_constraint() + throws NoSuchFieldException, IllegalAccessException { final EntityConfig entityConfig = new EntityConfig(); + final Map keyAttributes = new HashMap<>(); + keyAttributes.put("Type", "Service"); + keyAttributes.put("Name", "my-app"); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, KEY_ATTRIBUTES_FIELD, keyAttributes); + ReflectivelySetField.setField(EntityConfig.class, entityConfig, ATTRIBUTES_FIELD, null); final Set> violations = validator.validate(entityConfig); assertThat(violations, hasSize(1)); - final ConstraintViolation violation = violations.iterator().next(); - assertThat(violation.getPropertyPath().toString(), equalTo(KEY_ATTRIBUTES_FIELD)); + assertThat(violations.iterator().next().getPropertyPath().toString(), equalTo(ATTRIBUTES_FIELD)); } @Test