diff --git a/.gitignore b/.gitignore index 460ea738d2..5047a5cd1a 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,9 @@ data-prepper-main/src/test/resources/logstash-conf/logstash-filter.yaml # output folder created when we run test cases **/out/ +# Eclipse/IDE compiled output +**/bin/ + # Development tools .DS_Store .idea diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java index 7f94b2b6c2..6e2324e530 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/Event.java @@ -13,6 +13,7 @@ import com.fasterxml.jackson.databind.JsonNode; import java.io.Serializable; +import java.util.Collection; import java.util.List; import java.util.Map; @@ -146,6 +147,19 @@ public interface Event extends Serializable { */ void merge(Event other); + /** + * Merges only the specified keys from another Event into the current Event. + * Only the keys present in {@code keys} will be copied from {@code other} into this Event. + * Values from {@code other} will overwrite existing values in this Event for matching keys. + * + * @param other the other Event to merge from + * @param keys the list of keys to selectively merge + * @throws IllegalArgumentException if the input event is not compatible to merge. + * @throws UnsupportedOperationException if the current Event does not support merging. 
+ * @since 2.15 + */ + void merge(Event other, Collection keys); + /** * Generates a serialized Json string of the entire Event * diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 13c46653f3..e9059403a2 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -32,6 +32,7 @@ import java.io.ObjectInputStream; import java.time.Instant; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Deque; import java.util.HashMap; @@ -446,6 +447,26 @@ public void merge(final Event other) { ((ObjectNode) jsonNode).setAll(otherObjectNode); } + @Override + public void merge(final Event other, final Collection keys) { + if (keys == null || keys.isEmpty()) { + throw new IllegalArgumentException("Keys list must not be null or empty for selective merge."); + } + if (!(other instanceof JacksonEvent)) { + throw new IllegalArgumentException("Unable to merge the Event. The input Event must be a JacksonEvent."); + } + if (!(jsonNode instanceof ObjectNode)) { + throw new UnsupportedOperationException("Unable to merge the Event. 
The current Event must have object data."); + } + + for (final String key : keys) { + final Object value = other.get(key, Object.class); + if (value != null) { + put(key, value); + } + } + } + @Override public String toJsonString() { return jsonNode.toString(); diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java index ba8c5ddc8c..d72c284bf2 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java @@ -682,6 +682,71 @@ void merge_overrides_existing_values() { assertThat(event.get("info/ids/id", String.class), equalTo("idx")); } + @Test + void merge_with_keys_only_copies_specified_keys() { + final String jsonString = "{\"a\": \"alpha\", \"b\": \"beta\", \"c\": \"gamma\"}"; + Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build(); + event.merge(otherEvent, List.of("a", "b")); + + assertThat(event.get("a", Object.class), equalTo("alpha")); + assertThat(event.get("b", Object.class), equalTo("beta")); + assertThat(event.containsKey("c"), equalTo(false)); + } + + @Test + void merge_with_keys_overwrites_existing_values() { + event.put("a", "original"); + final String jsonString = "{\"a\": \"updated\", \"b\": \"beta\"}"; + Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build(); + event.merge(otherEvent, List.of("a")); + + assertThat(event.get("a", Object.class), equalTo("updated")); + assertThat(event.containsKey("b"), equalTo(false)); + } + + @Test + void merge_with_keys_skips_missing_keys_in_other() { + event.put("existing", "value"); + final String jsonString = "{\"a\": \"alpha\"}"; + Event otherEvent = 
JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData(jsonString).build(); + event.merge(otherEvent, List.of("a", "nonexistent")); + + assertThat(event.get("a", Object.class), equalTo("alpha")); + assertThat(event.get("existing", Object.class), equalTo("value")); + assertThat(event.containsKey("nonexistent"), equalTo(false)); + } + + @Test + void merge_with_keys_throws_when_keys_list_is_null() { + Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData("{}").build(); + assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent, null)); + } + + @Test + void merge_with_keys_throws_when_keys_list_is_empty() { + Event otherEvent = JacksonEvent.builder().withEventType(EventType.DOCUMENT.toString()).withData("{}").build(); + assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent, List.of())); + } + + @Test + void merge_with_keys_throws_when_other_is_not_JacksonEvent() { + final Event otherEvent = mock(Event.class); + assertThrows(IllegalArgumentException.class, () -> event.merge(otherEvent, List.of("a"))); + } + + @Test + void merge_with_keys_throws_when_current_event_has_array_data() { + final JacksonEvent arrayEvent = JacksonEvent.builder() + .withEventType(EventType.DOCUMENT.toString()) + .withData("[1, 2, 3]") + .build(); + final Event otherEvent = JacksonEvent.builder() + .withEventType(EventType.DOCUMENT.toString()) + .withData("{\"a\": \"alpha\"}") + .build(); + assertThrows(UnsupportedOperationException.class, () -> arrayEvent.merge(otherEvent, List.of("a"))); + } + @ParameterizedTest @ValueSource(strings = {"/", "foo", "/foo", "/foo/bar", "foo/bar", "foo/bar/", "/foo/bar/leaf/key"}) public void testDelete_withNonexistentKey(final String key) { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/discovery/AwsCloudMapPeerListProviderTest.java 
b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/discovery/AwsCloudMapPeerListProviderTest.java index 41ba4840e9..4cd68fce05 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/discovery/AwsCloudMapPeerListProviderTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/discovery/AwsCloudMapPeerListProviderTest.java @@ -347,7 +347,9 @@ void listener_gets_list_after_several_failed_attempts() { waitUntilPeerListPopulated(objectUnderTest); - assertThat(listenerEndpoints.size(), equalTo(knownIpPeers.size())); + await().atMost(5, TimeUnit.SECONDS) + .pollDelay(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> assertThat(listenerEndpoints.size(), equalTo(knownIpPeers.size()))); final Set observedIps = listenerEndpoints.stream() .map(Endpoint::ipAddr) diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputConfig.java index 2fb903873b..1ae483ec42 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputConfig.java @@ -18,7 +18,18 @@ public class NdjsonInputConfig { @JsonProperty("include_empty_objects") private boolean includeEmptyObjects = false; + /** + * Optional file extension used to identify enrichment source files. + * Defaults to "jsonl". 
+ */ + @JsonProperty("extension") + private String extension = "jsonl"; + public boolean isIncludeEmptyObjects() { return includeEmptyObjects; } + + public String getExtension() { + return extension; + } } diff --git a/data-prepper-plugins/s3-enrich-processor/README.md b/data-prepper-plugins/s3-enrich-processor/README.md new file mode 100644 index 0000000000..69072f2035 --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/README.md @@ -0,0 +1,111 @@ + +# S3 Enricher Processor + +This plugin enables you to merge data from an S3 file with source data from your Data Prepper pipeline. + +## Usage +```yaml +ml_merge-pipeline: +... + processor: + - s3_enrich: + # ============================================================================= + # S3 SOURCE BUCKET CONFIGURATION + # Defines where to fetch the original/source data for enrichment + # ============================================================================= + bucket: + # The S3 bucket containing the source records to enrich from + name: offlinebatch + filter: + # S3 prefix path where source files are located + # The processor will look for source files under this prefix + # Example: s3://offlinebatch/bedrockbatch/originsource/test_batch_50k.jsonl + include_prefix: bedrockbatch/originsource/ + + # ============================================================================= + # DATA FORMAT CONFIGURATION + # ============================================================================= + # Codec for parsing source S3 files + # Options: ndjson, json, csv, etc. 
+ codec: + ndjson: + + # ============================================================================= + # AWS CONFIGURATION + # ============================================================================= + # AWS account ID that owns the S3 bucket (for cross-account access) + default_bucket_owner: 802041417063 + + aws: + # AWS region where the S3 bucket is located + region: us-east-1 + + # ============================================================================= + # S3 OBJECT SETTINGS + # ============================================================================= + # Maximum size (in MB) of S3 source files to process + # Files exceeding this limit will be skipped + s3_object_size_limit: 100mb + + # JSON path in the incoming pipeline event that contains the S3 object key + # Used to determine which source file to fetch for enrichment + # Example event: {"s3": {"bucket": "...", "key": "output/file.jsonl.out"}} + s3_key_path: "s3/key" + + # ============================================================================= + # SOURCE FILE NAME EXTRACTION + # ============================================================================= + # Regex pattern to extract the base filename from the output S3 key + # The first capture group (.*?) extracts the original source filename + # + # Example: + # Input: test_batch_50k-2025-11-06T21-19-15Z-1762463955825635000-uuid.jsonl.out + # Match: Group 1 = "test_batch_50k" + # Result: Looks for source file "test_batch_50k.jsonl" in include_prefix path + # + # Pattern breakdown: + # ^(.*?) - Capture base filename (non-greedy) + # -\d{4}-\d{2}-\d{2} - Match date: -YYYY-MM-DD + # T\d{2}-\d{2}-\d{2}Z - Match time: THH-MM-SSZ + # -.* - Match remaining (job ID, UUID, etc.) 
+ # \.jsonl\.out$ - Match file extension + s3_object_name_pattern: ^(.*?)-\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}Z-.*\.jsonl\.out$ + + # ============================================================================= + # RECORD MATCHING & ENRICHMENT + # ============================================================================= + # Field name used to correlate/match records between output and source files (a list with exactly one key) + # Both the pipeline event and source records must contain this field + # Records with matching correlation values will be merged + correlation_keys: ["recordId"] + + # List of fields to copy from the source record into the pipeline event + # Only these specified fields will be merged; all other source fields are ignored + # If a field doesn't exist in source, it will be skipped + keys_to_merge: + - "field_A" + - "field_B" + - "field_C" + + # ============================================================================= + # CONDITIONAL PROCESSING + # ============================================================================= + # Data Prepper expression to conditionally apply enrichment + # Only events matching this condition will be processed by the enricher + # Events not matching will pass through unchanged + enrich_when: /s3/key != null +``` +`keys_to_merge` as the list of fields to copy from the source record into the pipeline event. +`s3_object_name_pattern` as the regex pattern used to extract the base filename from the output S3 key. 
+`s3_key_path` as the JSON path in the incoming pipeline event that contains the S3 object key. +`correlation_keys` as the field name (a single-element list) used to correlate/match records between output and source files. + +## Metrics +- 'numberOfRecordsEnrichedSuccessFromS3': Number of pipeline records successfully enriched from S3 source +- 'numberOfRecordsEnrichedFailerFromS3': Number of pipeline records that failed enrichment from S3 source +- 's3EnricherObjectsFailed': Number of S3 source objects that failed to load for enrichment +- 's3EnricherObjectsSucceeded': Number of S3 source objects successfully loaded for enrichment + +## Developer Guide + +The integration tests for this plugin do not run as part of the Data Prepper build. diff --git a/data-prepper-plugins/s3-enrich-processor/build.gradle b/data-prepper-plugins/s3-enrich-processor/build.gradle new file mode 100644 index 0000000000..c227326938 --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/build.gradle @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +dependencies { + implementation project(path: ':data-prepper-plugins:common') + implementation project(':data-prepper-plugins:s3-common') + implementation project(':data-prepper-plugins:aws-plugin-api') + implementation 'software.amazon.awssdk:sdk-core' + implementation 'software.amazon.awssdk:sts' + implementation 'io.micrometer:micrometer-core' + implementation 'org.json:json' + implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' + implementation 'org.projectlombok:lombok:1.18.22' + implementation libs.parquet.common + implementation 'org.apache.httpcomponents:httpcore:4.4.16' + implementation libs.caffeine + implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final' + annotationProcessor 'org.projectlombok:lombok:1.18.20' + implementation 'software.amazon.awssdk:s3' + testImplementation project(':data-prepper-test:test-event') + testImplementation testLibs.slf4j.simple +} + +sourceSets { + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integrationTest/java') + } + resources.srcDir file('src/integrationTest/resources') + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntime +} + +task integrationTest(type: Test) { + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + + useJUnitPlatform() + + classpath = sourceSets.integrationTest.runtimeClasspath + + systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties' + + filter { + includeTestsMatching '*IT' + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/AwsAuthenticationAdapter.java b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/AwsAuthenticationAdapter.java new file 
mode 100644 index 0000000000..5b0aa34b77 --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/AwsAuthenticationAdapter.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.s3_enrich.processor; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.s3.common.config.AwsAuthenticationOptions; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; + +public class AwsAuthenticationAdapter { + private final AwsCredentialsSupplier awsCredentialsSupplier; + private final S3EnrichProcessorConfig s3EnrichProcessorConfig; + + + AwsAuthenticationAdapter( + final AwsCredentialsSupplier awsCredentialsSupplier, + final S3EnrichProcessorConfig s3EnricherProcessorConfig) { + this.awsCredentialsSupplier = awsCredentialsSupplier; + this.s3EnrichProcessorConfig = s3EnricherProcessorConfig; + } + + AwsCredentialsProvider getCredentialsProvider() { + final AwsAuthenticationOptions awsAuthenticationOptions = s3EnrichProcessorConfig.getAwsAuthenticationOptions(); + + final AwsCredentialsOptions options = AwsCredentialsOptions.builder() + .withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn()) + .withRegion(awsAuthenticationOptions.getAwsRegion()) + .withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides()) + .withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId()) + .build(); + + return awsCredentialsSupplier.getProvider(options); + } +} diff --git a/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/S3EnrichProcessor.java 
b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/S3EnrichProcessor.java new file mode 100644 index 0000000000..1bea62afcb --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/S3EnrichProcessor.java @@ -0,0 +1,277 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.s3_enrich.processor; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.expression.ExpressionParsingException; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.annotations.Experimental; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventKey; +import org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.processor.AbstractProcessor; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.s3.common.ownership.BucketOwnerProvider; +import 
org.opensearch.dataprepper.plugins.s3.common.source.S3ObjectPluginMetrics; +import org.opensearch.dataprepper.plugins.s3.common.source.S3ObjectReference; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.cache.CacheFactory; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.cache.S3EnricherCacheService; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.client.S3ClientBuilderFactory; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.s3source.S3ObjectReferenceResolver; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.s3source.S3ObjectWorker; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.s3source.ownership.ConfigBucketOwnerProviderFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; + +@Experimental +@DataPrepperPlugin(name = "s3_enrich", pluginType = Processor.class, pluginConfigurationType = S3EnrichProcessorConfig.class) +public class S3EnrichProcessor extends AbstractProcessor, Record> { + private static final Logger LOG = LoggerFactory.getLogger(S3EnrichProcessor.class); + private static final long BYTES_IN_MB = 1024 * 1024; + + public static final String NUMBER_OF_RECORDS_ENRICHED_SUCCESS = "numberOfRecordsEnrichedSuccessFromS3"; + public static final String NUMBER_OF_RECORDS_ENRICHED_FAILED = "numberOfRecordsEnrichedFailerFromS3"; + + private final S3EnrichProcessorConfig s3EnrichProcessorConfig; + private final ExpressionEvaluator expressionEvaluator; + private final AwsCredentialsSupplier awsCredentialsSupplier; + private final PluginSetting codecPluginSettings; + private final PluginFactory pluginFactory; + private final InputCodec codec; + private final S3ObjectWorker s3ObjectWorker; + protected 
final List tagsOnFailure; + + // AWS & S3 components (thread-safe, reusable) + private final BucketOwnerProvider bucketOwnerProvider; + private final S3ObjectReferenceResolver s3ObjectReferenceResolver; + private S3ClientBuilderFactory s3ClientBuilderFactory; + + // Processing components + private final S3ObjectPluginMetrics s3EnrichObjectPluginMetrics; + private final S3EnricherCacheService cacheService; + + private final Counter numberOfRecordsSuccessCounter; + private final Counter numberOfRecordsFailedCounter; + + @DataPrepperPluginConstructor + public S3EnrichProcessor(final S3EnrichProcessorConfig s3EnrichProcessorConfig, + final PluginMetrics pluginMetrics, + final AwsCredentialsSupplier awsCredentialsSupplier, + final ExpressionEvaluator expressionEvaluator, + final PluginFactory pluginFactory ) { + super(pluginMetrics); + this.s3EnrichProcessorConfig = s3EnrichProcessorConfig; + this.expressionEvaluator = expressionEvaluator; + this.awsCredentialsSupplier = awsCredentialsSupplier; + this.pluginFactory = pluginFactory; + CacheFactory factory = new CacheFactory(s3EnrichProcessorConfig); + this.cacheService = new S3EnricherCacheService(factory); + this.tagsOnFailure = s3EnrichProcessorConfig.getTagsOnFailure(); + + final AwsAuthenticationAdapter awsAuthenticationAdapter = new AwsAuthenticationAdapter(awsCredentialsSupplier, s3EnrichProcessorConfig); + final AwsCredentialsProvider credentialsProvider = awsAuthenticationAdapter.getCredentialsProvider(); + final ConfigBucketOwnerProviderFactory configBucketOwnerProviderFactory = new ConfigBucketOwnerProviderFactory(credentialsProvider); + bucketOwnerProvider = configBucketOwnerProviderFactory.createBucketOwnerProvider(s3EnrichProcessorConfig); + final PluginModel codecConfiguration = s3EnrichProcessorConfig.getCodec(); + codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings()); + this.codec = pluginFactory.loadPlugin(InputCodec.class, 
codecPluginSettings); + this.s3ObjectReferenceResolver = new S3ObjectReferenceResolver(s3EnrichProcessorConfig); + + s3ClientBuilderFactory = new S3ClientBuilderFactory(s3EnrichProcessorConfig, credentialsProvider); + s3EnrichObjectPluginMetrics = new S3ObjectPluginMetrics(pluginMetrics); + this.s3ObjectWorker = new S3ObjectWorker(s3EnrichProcessorConfig, codec, bucketOwnerProvider, s3ClientBuilderFactory.getS3Client(), s3EnrichObjectPluginMetrics, cacheService); + + this.numberOfRecordsSuccessCounter = pluginMetrics.counter( + NUMBER_OF_RECORDS_ENRICHED_SUCCESS); + this.numberOfRecordsFailedCounter = pluginMetrics.counter( + NUMBER_OF_RECORDS_ENRICHED_FAILED); + + } + + @Override + public Collection> doExecute(Collection> records) { + List> resultRecords = new ArrayList<>(); + + // Process new records + String whenCondition = s3EnrichProcessorConfig.getWhenCondition(); + List> recordsToEnrich = records.stream() + .filter(record -> { + try { + boolean meetCondition = whenCondition == null || expressionEvaluator.evaluateConditional(whenCondition, record.getData()); + if (!meetCondition) { + resultRecords.add(record); + } + return meetCondition; // Include in recordsToEnrich if true + } catch (ExpressionParsingException e) { + LOG.warn("Expression parsing failed for record: {}. Error: {}", record, e.getMessage()); + resultRecords.add(record); + return false; // Skip the record on parsing failure + } catch (ClassCastException e) { + LOG.warn("Unexpected return type when evaluating condition for record: {}. 
Error: {}", record, e.getMessage()); + resultRecords.add(record); + return false; // Skip the record on type mismatch + } catch (Exception e) { + LOG.error("Failed to evaluate conditional expression for record: {}", record, e); + resultRecords.add(record); + return false; // Skip the record if evaluation fails + } + }) + .collect(Collectors.toList()); + + if (recordsToEnrich.isEmpty()) { + return records; + } + + try { + for (Record record : recordsToEnrich) { + try { + processRecord(record); + numberOfRecordsSuccessCounter.increment(); + } catch (Exception e) { + LOG.error(NOISY, "Error processing record", e); + addFailureTags(record); + numberOfRecordsFailedCounter.increment(); + } + resultRecords.add(record); + } + } catch (Exception e) { + LOG.error(NOISY, "Error while initializing the S3 Object Worker", e); + numberOfRecordsFailedCounter.increment(recordsToEnrich.size()); + addFailureTags(recordsToEnrich); + resultRecords.addAll(recordsToEnrich); + } + + return resultRecords; + } + + private void processRecord(Record record) { + Event event = record.getData(); + + // Resolve S3ObjectReference from the event + final S3ObjectReference s3ObjectReference = s3ObjectReferenceResolver.resolve(event); + final String s3Uri = s3ObjectReference.uri(); + + // Thread-safe load-if-absent pattern + cacheService.loadIfAbsent(s3Uri, () -> { + try { + s3ObjectWorker.processS3Object(s3ObjectReference); + } catch (Exception e) { + LOG.error("Failed to load S3 object: {}", s3Uri, e); + throw new RuntimeException("Failed to load S3 object to enrich: " + s3Uri, e); + } + }); + + // Get correlation value from the output event + String correlationKey = s3EnrichProcessorConfig.getCorrelationKeys().get(0); // only consider one key for now + String correlationValue = event.get(correlationKey, String.class); + + if (correlationValue == null || correlationValue.isBlank()) { + LOG.warn("No correlation value found for key '{}' in event", correlationKey); + throw new 
IllegalArgumentException("No correlation value found for key '" + correlationKey + "'"); + } + + // Lookup source event from cache + Event enrichFromEvent = cacheService.get(s3Uri, correlationValue); + if (enrichFromEvent != null) { + mergeData(event, enrichFromEvent); + LOG.debug("Successfully merged data for correlationValue: {}", correlationValue); + } else { + LOG.warn("No matching source record found for correlation value: {} in {}", correlationValue, s3Uri); + throw new RuntimeException("No matching source record found for correlation value: " + correlationValue); + } + } + + /** + * Merges specified fields from the source event into the target event + * based on the configured merge keys. + * + * @param targetEvent the event to enrich (output record) + * @param sourceEvent the event containing source data (from cache) + */ + private void mergeData(Event targetEvent, Event sourceEvent) { + List mergeKeys = s3EnrichProcessorConfig.getMergeKeys(); + + // If no merge keys specified, merge nothing from source + if (mergeKeys == null || mergeKeys.isEmpty()) { + LOG.debug("No merge keys configured, does not merge anything from source"); + throw new IllegalArgumentException("No merge keys configured"); + } + + final List keyPaths = mergeKeys.stream() + .map(EventKey::getKey) + .collect(Collectors.toList()); + + targetEvent.merge(sourceEvent, keyPaths); + } + + /** + * Add the failure tags to multiple records that aren't processed + */ + protected void addFailureTags(List> records) { + if (tagsOnFailure == null || tagsOnFailure.isEmpty()) { + return; + } + + if (records == null || records.isEmpty()) { + return; + } + + for (Record record : records) { + addFailureTags(record); + } + } + + /* + * Add the failure tags to the records that aren't processed + */ + protected void addFailureTags(Record record) { + if (tagsOnFailure == null || tagsOnFailure.isEmpty()) { + return; + } + // Add failure tags to each event + Event event = record.getData(); + EventMetadata 
metadata = event.getMetadata(); + if (metadata != null) { + metadata.addTags(tagsOnFailure); + } else { + LOG.warn("Event metadata is null, cannot add failure tags."); + } + } + + @Override + public void prepareForShutdown() {} + + @Override + public boolean isReadyForShutdown() { + return true; + } + + @Override + public void shutdown() { + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/S3EnrichProcessorConfig.java b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/S3EnrichProcessorConfig.java new file mode 100644 index 0000000000..965ffeb808 --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/S3EnrichProcessorConfig.java @@ -0,0 +1,148 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.s3_enrich.processor; + +import com.fasterxml.jackson.annotation.JsonClassDescription; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import jakarta.validation.Valid; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Size; +import lombok.Getter; +import org.hibernate.validator.constraints.time.DurationMax; +import org.hibernate.validator.constraints.time.DurationMin; +import org.opensearch.dataprepper.aws.validator.AwsAccountId; +import org.opensearch.dataprepper.model.annotations.ExampleValues; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.constraints.ByteCountMax; +import org.opensearch.dataprepper.model.constraints.ByteCountMin; +import org.opensearch.dataprepper.model.event.EventKey; +import org.opensearch.dataprepper.model.types.ByteCount; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.plugins.s3.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.configuration.S3EnrichBucketOption; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.configuration.S3EnrichKeyPathOption; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +@Getter +@JsonPropertyOrder +@JsonClassDescription("The s3_enricher processor enriches your data from a S3 source") +public class S3EnrichProcessorConfig { + private static final String DEFAULT_ENRICHER_SIZE_LIMIT = "100mb"; + private static final int DEFAULT_CACHE_SIZE_LIMIT = 100000; + private static final Duration DEFAULT_CACHE_TTL = Duration.ofMinutes(10); + + 
@JsonProperty("bucket") + @Valid + private S3EnrichBucketOption s3EnrichBucketOption; + + @JsonProperty("aws") + @NotNull + @Valid + private AwsAuthenticationOptions awsAuthenticationOptions; + + @JsonProperty("disable_bucket_ownership_validation") + private boolean disableBucketOwnershipValidation = false; + + @JsonProperty("bucket_owners") + private Map bucketOwners; + + @JsonProperty("default_bucket_owner") + @AwsAccountId + private String defaultBucketOwner; + + @JsonProperty("codec") + @NotNull + private PluginModel codec; + + @JsonProperty("compression") + private CompressionOption compression = CompressionOption.NONE; + + @JsonProperty(value = "s3_object_size_limit", defaultValue = DEFAULT_ENRICHER_SIZE_LIMIT) + @ByteCountMin(DEFAULT_ENRICHER_SIZE_LIMIT) + @ByteCountMax("300mb") + private ByteCount enricherSizeLimit = ByteCount.parse(DEFAULT_ENRICHER_SIZE_LIMIT); + + @JsonProperty(value = "cache_max_count", defaultValue = "100000") + @Min(0) + @Max(1000000) + private int cacheCountLimit = DEFAULT_CACHE_SIZE_LIMIT; + + @JsonProperty(value = "cache_ttl", defaultValue = "PT10M") + @DurationMin(minutes = 1) + @DurationMax(minutes = 120) + @JsonPropertyDescription("The TTL for cache entries. 
Accepts ISO-8601 duration format (e.g., PT10M for 10 minutes, PT1H for 1 hour).") + private Duration cacheTtl = DEFAULT_CACHE_TTL; + + @JsonPropertyDescription("defines the key that defines the s3 enricher object base name") + @JsonProperty("s3_key_path") + private String enricherKeyPath; + + @JsonPropertyDescription("Defines the regular expression matched against the file name taken from s3_key_path. The first capture group is used as the base name of the S3 enricher object.") + @JsonProperty("s3_object_name_pattern") + private String enricherNamePattern; + + @JsonPropertyDescription("defines the unique key identifier in the events from the pipeline to match the events from S3 enricher source") + @JsonProperty("correlation_keys") + @Size(min = 1, max = 1) + private List correlationKeys; + + @JsonProperty("keys_to_merge") + @JsonPropertyDescription("A list of keys of the fields to be merged.") + private List mergeKeys; + + @JsonPropertyDescription("Defines a condition for event to use this processor.") + @ExampleValues({ + @ExampleValues.Example(value = "/some_key == null", description = "The processor will only run on events where this condition evaluates to true.") + }) + @JsonProperty("enrich_when") + private String whenCondition; + + @JsonProperty("tags_on_failure") + @JsonPropertyDescription( + "A List of Strings that specifies the tags to be set in the event when this processor fails to merge " + + + "or exception occurs. This tag may be used in conditional expressions in " + + "other parts of the configuration.") + private List tagsOnFailure = Collections.emptyList(); + + /** + * Returns the file extension configured on the codec (e.g. "jsonl"). + * Reads the {@code extension} key from the codec plugin settings, falling back to {@code "jsonl"}. + */ + public String getCodecExtension() { + if (codec == null || codec.getPluginSettings() == null) { + return "jsonl"; + } + final Object ext = codec.getPluginSettings().get("extension"); + return (ext instanceof String && !((String) ext).isBlank()) ? 
(String) ext : "jsonl"; + } + + /** + * Safely retrieves the S3 scan include prefix from the configuration chain. + * + * @return Optional containing the prefix if present, empty Optional otherwise + */ + public Optional getS3IncludePrefix() { + return Optional.ofNullable(s3EnrichBucketOption) + .map(S3EnrichBucketOption::getS3SourceFilter) + .map(S3EnrichKeyPathOption::getS3scanIncludePrefixOption) + .filter(prefix -> !prefix.isBlank()); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/cache/CacheFactory.java b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/cache/CacheFactory.java new file mode 100644 index 0000000000..ca232fe376 --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/cache/CacheFactory.java @@ -0,0 +1,75 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.s3_enrich.processor.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import lombok.Getter; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.S3EnrichProcessorConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; + +public class CacheFactory { + private static final Logger LOG = LoggerFactory.getLogger(CacheFactory.class); + + private final S3EnrichProcessorConfig config; + + /** SINGLETON cache: S3 URL -> (recordId -> Event cache) + * -- GETTER -- + * Return singleton outer cache + */ + @Getter + private final Cache> s3Cache; + + public CacheFactory(S3EnrichProcessorConfig config) { + this.config = config; + this.s3Cache = buildS3Cache(); // created ONCE + } + + /** Build outer cache once */ + private Cache> buildS3Cache() { + Duration cacheTtl = config.getCacheTtl(); + int outerMaxSize = 100; + + LOG.info("Initializing singleton S3 URL Cache: maxCount={} ttl={}m", + outerMaxSize, cacheTtl.toMinutes()); + + return Caffeine.newBuilder() + .maximumSize(outerMaxSize) + .expireAfterAccess(cacheTtl) + .recordStats() + .removalListener((key, value, cause) -> + LOG.trace("[Outer S3 URL Eviction] key={} cause={}", key, cause)) + .build(); + } + + /** Inner cache builder (1 per S3 URL) */ + public Cache createEventsCache(String s3Url) { + int maxSize = config.getCacheCountLimit(); + Duration cacheTtl = config.getCacheTtl(); + + LOG.info("Creating Inner Events Cache for {}: maxSize={} ttl={}m", + s3Url, maxSize, cacheTtl.toMinutes()); + + return Caffeine.newBuilder() + .maximumSize(maxSize) + .expireAfterWrite(cacheTtl) + .recordStats() + .removalListener((recordId, event, cause) -> + LOG.trace("[Inner Event Eviction] s3Url={} recordId={} cause={}", + s3Url, recordId, cause)) + .build(); + } + +} diff --git 
a/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/cache/S3EnricherCacheService.java b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/cache/S3EnricherCacheService.java new file mode 100644 index 0000000000..6fbf3b9ce7 --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/cache/S3EnricherCacheService.java @@ -0,0 +1,147 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.s3_enrich.processor.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import lombok.Getter; +import org.opensearch.dataprepper.model.event.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +public class S3EnricherCacheService { + private static final Logger LOG = LoggerFactory.getLogger(S3EnricherCacheService.class); + + @Getter + private final Cache> s3Cache; + private final CacheFactory cacheFactory; + + // Locks for coordinating S3 object loading + private final ConcurrentHashMap loadingLocks = new ConcurrentHashMap<>(); + + public S3EnricherCacheService(CacheFactory cacheFactory) { + this.cacheFactory = cacheFactory; + this.s3Cache = cacheFactory.getS3Cache(); + } + + /** + * Get or create per-S3URL record cache. + * Use this when you INTEND to populate the cache. + */ + public Cache getOrCreateRecordCache(String s3Url) { + return s3Cache.get(s3Url, key -> cacheFactory.createEventsCache(s3Url)); + } + + /** + * Get existing record cache WITHOUT creating one. 
+ * Returns null if s3Url not in cache. + */ + public Cache getRecordCacheIfPresent(String s3Url) { + return s3Cache.getIfPresent(s3Url); + } + + /** + * Put event - creates inner cache if needed. + */ + public void put(String s3Url, String recordId, Event event) { + getOrCreateRecordCache(s3Url).put(recordId, event); + } + + /** + * Get event - returns null if s3Url or recordId not found. + * Does NOT create empty cache as side effect. + */ + public Event get(String s3Url, String recordId) { + Cache recordCache = s3Cache.getIfPresent(s3Url); + if (recordCache == null) { + return null; + } + return recordCache.getIfPresent(recordId); + } + + /** + * Check if s3Url exists in cache (without side effects). + */ + public boolean containsS3Uri(String s3Url) { + return s3Cache.getIfPresent(s3Url) != null; + } + + /** + * Thread-safe method to load S3 data if not already cached. + * Ensures only ONE thread loads a given S3 URI. + * + * @param s3Url the S3 URI to check/load + * @param loader the function to load data (called only if not cached) + */ + public void loadIfAbsent(String s3Url, Runnable loader) { + // Fast path: already cached + if (containsS3Uri(s3Url)) { + return; + } + + ReentrantLock lock = loadingLocks.computeIfAbsent(s3Url, k -> new ReentrantLock()); + + lock.lock(); + try { + // Double-check after acquiring lock + if (containsS3Uri(s3Url)) { + return; + } + loader.run(); + } finally { + lock.unlock(); + loadingLocks.remove(s3Url, lock); + } + } + + /** + * Alternative: Atomic compute pattern using Caffeine's built-in mechanism. + * Returns the record cache, loading from S3 if necessary. + */ + public Cache getOrLoadRecordCache(String s3Url, Supplier> loader) { + return s3Cache.get(s3Url, key -> { + LOG.debug("Loading S3 data for: {}", key); + return loader.get(); + }); + } + + /** + * Get count of records for a given s3Url. + */ + public long getRecordCount(String s3Url) { + Cache recordCache = s3Cache.getIfPresent(s3Url); + return recordCache != null ? 
recordCache.estimatedSize() : 0; + } + + /** + * Clear entire cache hierarchy. + */ + public void clearAll() { + s3Cache.asMap().values().forEach(Cache::invalidateAll); + s3Cache.invalidateAll(); + loadingLocks.clear(); + } + + /** + * Clear cache for specific s3Url. + */ + public void clearS3Uri(String s3Url) { + Cache recordCache = s3Cache.getIfPresent(s3Url); + if (recordCache != null) { + recordCache.invalidateAll(); + s3Cache.invalidate(s3Url); + } + loadingLocks.remove(s3Url); + } +} diff --git a/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/client/S3ClientBuilderFactory.java b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/client/S3ClientBuilderFactory.java new file mode 100644 index 0000000000..8f4ddaaecc --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/client/S3ClientBuilderFactory.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.s3_enrich.processor.client; + +import org.opensearch.dataprepper.plugins.s3_enrich.processor.S3EnrichProcessorConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.services.s3.S3Client; + +/** + * A Builder Factory for creating and fetching the S3Client and S3AsyncClient. 
+ */ +public class S3ClientBuilderFactory { + private static final Logger LOG = LoggerFactory.getLogger(S3ClientBuilderFactory.class); + private final S3EnrichProcessorConfig s3SourceConfig; + private final AwsCredentialsProvider credentialsProvider; + private final S3Client s3Client; + public S3ClientBuilderFactory(final S3EnrichProcessorConfig s3SourceConfig, AwsCredentialsProvider credentialsProvider){ + this.s3SourceConfig = s3SourceConfig; + this.credentialsProvider = credentialsProvider; + this.s3Client = createS3Client(); + } + /** + * Creates a new S3Client for downloading S3 objects. + * @return a new S3Client + */ + public S3Client createS3Client() { + LOG.info("Creating S3 client"); + return S3Client.builder() + .region(s3SourceConfig.getAwsAuthenticationOptions().getAwsRegion()) + .crossRegionAccessEnabled(true) + .credentialsProvider(credentialsProvider) + .overrideConfiguration(ClientOverrideConfiguration.builder() + .retryPolicy(retryPolicy -> retryPolicy.numRetries(5).build()) + .build()) + .build(); + } + /** + * Gets the S3Client that was created when this factory was constructed. + * @return the S3Client + */ + public S3Client getS3Client() { + return s3Client; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/client/S3ClientFactory.java b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/client/S3ClientFactory.java new file mode 100644 index 0000000000..2064a4eb8a --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/client/S3ClientFactory.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.s3_enrich.processor.client; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.s3.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.S3EnrichProcessorConfig; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +public class S3ClientFactory { + + public static S3Client createS3Client(final S3EnrichProcessorConfig s3EnrichProcessorConfig, final AwsCredentialsSupplier awsCredentialsSupplier) { + final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions( + s3EnrichProcessorConfig.getAwsAuthenticationOptions()); + final Region region = s3EnrichProcessorConfig.getAwsAuthenticationOptions().getAwsRegion(); + final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider( + awsCredentialsOptions); + + return S3Client.builder() + .region(region) + .credentialsProvider(awsCredentialsProvider) + .overrideConfiguration(ClientOverrideConfiguration.builder() + .retryPolicy(retryPolicy -> retryPolicy.numRetries(5).build()) + .build()) + .build(); + } + + public static AwsCredentialsOptions convertToCredentialsOptions( + final AwsAuthenticationOptions awsAuthenticationOptions) { + if (awsAuthenticationOptions == null || awsAuthenticationOptions.getAwsStsRoleArn() == null) { + return AwsCredentialsOptions.defaultOptionsWithDefaultCredentialsProvider(); + } + return AwsCredentialsOptions.builder() + .withRegion(awsAuthenticationOptions.getAwsRegion()) + .withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn()) + .withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId()) + 
.withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides()) + .build(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/configuration/S3EnrichBucketOption.java b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/configuration/S3EnrichBucketOption.java new file mode 100644 index 0000000000..efa355f049 --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/configuration/S3EnrichBucketOption.java @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.s3_enrich.processor.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.Size; +import lombok.Getter; + +/** + * Class consists the bucket related configuration properties. 
+ */ +@Getter +public class S3EnrichBucketOption { + + @JsonProperty("name") + @NotEmpty + @Size(min = 3, max = 500, message = "bucket length should be at least 3 characters") + private String name; + + @JsonProperty("filter") + private S3EnrichKeyPathOption s3SourceFilter; + + public S3EnrichKeyPathOption getS3SourceFilter() { + return s3SourceFilter; + } +} diff --git a/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/configuration/S3EnrichKeyPathOption.java b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/configuration/S3EnrichKeyPathOption.java new file mode 100644 index 0000000000..efc80f4def --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/configuration/S3EnrichKeyPathOption.java @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.s3_enrich.processor.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class S3EnrichKeyPathOption { + @JsonProperty("include_prefix") + private String s3scanIncludePrefixOption; + public String getS3scanIncludePrefixOption() { + return s3scanIncludePrefixOption; + } +} diff --git a/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/S3ObjectReferenceResolver.java b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/S3ObjectReferenceResolver.java new file mode 100644 index 0000000000..8e2adc17cf --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/S3ObjectReferenceResolver.java @@ -0,0 +1,112 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.s3_enrich.processor.s3source; + +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.s3.common.source.S3ObjectReference; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.S3EnrichProcessorConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Resolves an {@link S3ObjectReference} from a given {@link Event} + * based on the processor configuration. 
+ */ +public class S3ObjectReferenceResolver { + private static final Logger LOG = LoggerFactory.getLogger(S3ObjectReferenceResolver.class); + + private final S3EnrichProcessorConfig config; + private final Pattern baseNamePattern; + + public S3ObjectReferenceResolver(final S3EnrichProcessorConfig config) { + this.config = config; + this.baseNamePattern = Pattern.compile(config.getEnricherNamePattern()); + } + + /** + * Resolves an S3ObjectReference from the given event. + * + * @param event the event containing the S3 key path + * @return the resolved S3ObjectReference + * @throws IllegalArgumentException if bucket or key is missing/blank + */ + public S3ObjectReference resolve(final Event event) { + final String bucket = config.getS3EnrichBucketOption().getName(); + final String s3Key = event.get(config.getEnricherKeyPath(), String.class); + + if (bucket == null || bucket.isBlank() || s3Key == null || s3Key.isBlank()) { + LOG.warn("Missing bucket or key in event, skipping enrichment"); + throw new IllegalArgumentException("Missing bucket or key in event"); + } + + final String enrichFromObjectKeyName = getEnrichFromObjectKey(s3Key); + return S3ObjectReference.bucketAndKey(bucket, enrichFromObjectKeyName).build(); + } + + /** + * Constructs the enrichment source object key from the output key. 
+ */ + private String getEnrichFromObjectKey(final String s3Key) { + final String enrichFromFileName = extractFileName(s3Key); + if (enrichFromFileName == null) { + LOG.warn("Could not extract filename from s3Key: {}", s3Key); + throw new IllegalArgumentException("Could not extract filename from s3Key: " + s3Key); + } + + return config.getS3IncludePrefix() + .map(prefix -> prefix + enrichFromFileName) + .orElse(enrichFromFileName); + } + + private String extractFileName(final String outputKey) { + if (outputKey == null || outputKey.isBlank()) { + return null; + } + + final String fileName = extractFileNameFromS3Key(outputKey); + if (fileName == null || fileName.isBlank()) { + LOG.debug("Could not extract filename from path: {}", outputKey); + return null; + } + + if (baseNamePattern == null) { + LOG.warn("Base name pattern is not configured, returning original filename: {}", fileName); + return fileName; + } + + try { + final Matcher matcher = baseNamePattern.matcher(fileName); + if (matcher.matches() && matcher.groupCount() >= 1) { + final String baseName = matcher.group(1); + if (baseName != null && !baseName.isBlank()) { + return baseName + "." + config.getCodecExtension(); + } + } + } catch (Exception e) { + LOG.error("Error matching pattern against filename: {}", fileName, e); + return null; + } + + LOG.debug("Pattern did not match filename: {}, returning as-is", fileName); + return fileName; + } + + private String extractFileNameFromS3Key(final String s3Key) { + if (s3Key == null || s3Key.isBlank()) { + return null; + } + final String normalized = s3Key.endsWith("/") ? 
s3Key.substring(0, s3Key.length() - 1) : s3Key; + return normalized.substring(normalized.lastIndexOf('/') + 1); + } +} diff --git a/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/S3ObjectWorker.java b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/S3ObjectWorker.java new file mode 100644 index 0000000000..cc257f1a9b --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/S3ObjectWorker.java @@ -0,0 +1,114 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.s3_enrich.processor.s3source; + +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.plugins.s3.common.ownership.BucketOwnerProvider; +import org.opensearch.dataprepper.plugins.s3.common.source.S3InputFile; +import org.opensearch.dataprepper.plugins.s3.common.source.S3ObjectPluginMetrics; +import org.opensearch.dataprepper.plugins.s3.common.source.S3ObjectReference; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.S3EnrichProcessor; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.S3EnrichProcessorConfig; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.cache.S3EnricherCacheService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.s3.S3Client; + +import java.io.IOException; +import java.time.Instant; +import java.util.concurrent.Callable; 
+import java.util.function.Consumer; + +public class S3ObjectWorker { + private static final Logger LOG = LoggerFactory.getLogger(S3ObjectWorker.class); + private final CompressionOption compressionOption; + private final InputCodec codec; + private final BucketOwnerProvider bucketOwnerProvider; + private final S3ObjectPluginMetrics s3ObjectPluginMetrics; + private final S3Client s3Client; + private final S3EnrichProcessorConfig s3EnricherProcessorConfig; + private Instant lastModified; + private S3EnricherCacheService cacheService; + + public S3ObjectWorker(S3EnrichProcessorConfig s3EnrichProcessorConfig, InputCodec codec, BucketOwnerProvider bucketOwnerProvider, S3Client s3Client, S3ObjectPluginMetrics s3ObjectPluginMetrics, S3EnricherCacheService cacheService) { + this.compressionOption = s3EnrichProcessorConfig.getCompression(); + this.s3EnricherProcessorConfig = s3EnrichProcessorConfig; + this.codec = codec; + this.bucketOwnerProvider = bucketOwnerProvider; + this.s3Client = s3Client; + this.lastModified = Instant.now(); + this.s3ObjectPluginMetrics = s3ObjectPluginMetrics; + this.cacheService = cacheService; + } + + public void processS3Object(final S3ObjectReference s3ObjectReference) throws IOException { + try { + s3ObjectPluginMetrics.getS3ObjectReadTimer().recordCallable((Callable) () -> { + doProcessObject(s3ObjectReference); + return null; + }); + } catch (final IllegalArgumentException e) { + throw new IOException(e.getMessage(), e); + } catch (final IOException | RuntimeException e) { + throw e; + } catch (final Exception e) { + // doProcessObject is declared to throw Exception, and Callable has Exception as a checked + // exception on the interface. Anything that is not an IOException or RuntimeException is wrapped here. 
+ throw new RuntimeException(e); + } + s3ObjectPluginMetrics.getS3ObjectsSucceededCounter().increment(); + } + + public long consumeS3Object(final S3InputFile inputFile, final Consumer> consumer) throws Exception { + final S3ObjectReference s3ObjectReference = inputFile.getObjectReference(); + + final CompressionOption fileCompressionOption = compressionOption != CompressionOption.AUTOMATIC ? + compressionOption : CompressionOption.fromFileName(s3ObjectReference.getKey()); + + codec.parse(inputFile, fileCompressionOption.getDecompressionEngine(), consumer::accept); + return inputFile.getLength(); + } + + private void doProcessObject(final S3ObjectReference s3ObjectReference) throws Exception { + final long s3ObjectSize; + LOG.info("Read S3 object: {}", s3ObjectReference); + final S3InputFile inputFile = new S3InputFile(s3Client, s3ObjectReference, bucketOwnerProvider, s3ObjectPluginMetrics); + try { + final Instant lastModifiedTime = inputFile.getLastModified(); + final Instant now = Instant.now(); + final Instant originationTime = (lastModifiedTime == null || lastModifiedTime.isAfter(now)) ? now : lastModifiedTime; + s3ObjectSize = consumeS3Object(inputFile, (record) -> { + try { + Event event = record.getData(); + + String correlationValue = event.getJsonNode().get(s3EnricherProcessorConfig.getCorrelationKeys().get(0)).asText(); + + event.getMetadata().setExternalOriginationTime(originationTime); + event.getEventHandle().setExternalOriginationTime(originationTime); + cacheService.put(s3ObjectReference.uri(), correlationValue, event); + } catch (final Exception e) { + LOG.error("Failed writing S3 objects to buffer due to: {}", e.getMessage()); + } + }); + + } catch (final Exception ex) { + s3ObjectPluginMetrics.getS3ObjectsFailedCounter().increment(); + LOG.error("Error reading from S3 object: s3ObjectReference={}. 
{}", s3ObjectReference, ex.getMessage()); + throw ex; + } + + s3ObjectPluginMetrics.getS3ObjectSizeSummary().record(s3ObjectSize); + } + +} diff --git a/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/ownership/ConfigBucketOwnerProviderFactory.java b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/ownership/ConfigBucketOwnerProviderFactory.java new file mode 100644 index 0000000000..03f021ffbe --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/ownership/ConfigBucketOwnerProviderFactory.java @@ -0,0 +1,72 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.s3_enrich.processor.s3source.ownership; + +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.plugins.s3.common.ownership.BucketOwnerProvider; +import org.opensearch.dataprepper.plugins.s3.common.ownership.MappedBucketOwnerProvider; +import org.opensearch.dataprepper.plugins.s3.common.ownership.NoOwnershipBucketOwnerProvider; +import org.opensearch.dataprepper.plugins.s3.common.ownership.StaticBucketOwnerProvider; +import org.opensearch.dataprepper.plugins.s3.common.source.StsArnRole; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.S3EnrichProcessorConfig; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; + +/** + * Produces a {@link BucketOwnerProvider} from the S3 source configuration as + * provided in a {@link S3EnrichProcessorConfig}. 
+ */ +public class ConfigBucketOwnerProviderFactory { + private final AwsCredentialsProvider defaultAwsCredentialsProvider; + + + public ConfigBucketOwnerProviderFactory(final AwsCredentialsProvider defaultAwsCredentialsProvider) { + this.defaultAwsCredentialsProvider = defaultAwsCredentialsProvider; + } + + /** + * Creates the {@link BucketOwnerProvider} + * @param s3EnrichProcessorConfig The input {@link S3EnrichProcessorConfig} + * @return The {@link BucketOwnerProvider} + */ + public BucketOwnerProvider createBucketOwnerProvider(final S3EnrichProcessorConfig s3EnrichProcessorConfig) { + if(s3EnrichProcessorConfig.isDisableBucketOwnershipValidation()) + return new NoOwnershipBucketOwnerProvider(); + final StaticBucketOwnerProvider staticBucketOwnerProvider = getStaticBucketOwnerProvider(s3EnrichProcessorConfig); + + if(s3EnrichProcessorConfig.getBucketOwners() != null && !s3EnrichProcessorConfig.getBucketOwners().isEmpty()) { + return new MappedBucketOwnerProvider(s3EnrichProcessorConfig.getBucketOwners(), staticBucketOwnerProvider); + } else { + return staticBucketOwnerProvider; + } + } + + private StaticBucketOwnerProvider getStaticBucketOwnerProvider(final S3EnrichProcessorConfig s3EnrichProcessorConfig) { + final String accountId; + + if(s3EnrichProcessorConfig.getDefaultBucketOwner() != null) + accountId = s3EnrichProcessorConfig.getDefaultBucketOwner(); + else if(s3EnrichProcessorConfig.getAwsAuthenticationOptions() != null && s3EnrichProcessorConfig.getAwsAuthenticationOptions().getAwsStsRoleArn() != null) + accountId = extractStsRoleArnAccountId(s3EnrichProcessorConfig); + else { + accountId = defaultAwsCredentialsProvider.resolveCredentials().accountId() + .orElseThrow(() -> new InvalidPluginConfigurationException( + "The S3 Enricher is unable to determine a bucket owner. Configure the default_bucket_owner for the account Id that owns the bucket. You may also want to configure bucket_owners if you read from S3 buckets in different accounts." 
+ )); + } + + return new StaticBucketOwnerProvider(accountId); + } + + private String extractStsRoleArnAccountId(final S3EnrichProcessorConfig s3EnrichProcessorConfig) { + return StsArnRole.parse(s3EnrichProcessorConfig.getAwsAuthenticationOptions().getAwsStsRoleArn()) + .getAccountId(); + } +} diff --git a/data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/S3EnrichProcessorTest.java b/data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/S3EnrichProcessorTest.java new file mode 100644 index 0000000000..4d5d8c6b96 --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/S3EnrichProcessorTest.java @@ -0,0 +1,415 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.s3_enrich.processor; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Timer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.expression.ExpressionParsingException; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventKey; +import org.opensearch.dataprepper.model.event.EventMetadata; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.codec.CompressionOption; +import org.opensearch.dataprepper.plugins.s3.common.config.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.s3.common.source.S3ObjectReference; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.cache.S3EnricherCacheService; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.configuration.S3EnrichBucketOption; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.s3source.S3ObjectReferenceResolver; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.s3source.S3ObjectWorker; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.regions.Region; + +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.Collections; +import 
java.util.List; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class S3EnrichProcessorTest { + + private static final String BUCKET_NAME = "test-bucket"; + private static final String KEY_PATH = "s3_key"; + private static final String CORRELATION_KEY = "id"; + private static final String TAG_ON_FAILURE = "s3_enrich_failed"; + + @Mock + private S3EnrichProcessorConfig s3EnrichProcessorConfig; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + + @Mock + private ExpressionEvaluator expressionEvaluator; + + @Mock + private PluginFactory pluginFactory; + + @Mock + private S3EnricherCacheService cacheService; + + @Mock + private S3ObjectWorker s3ObjectWorker; + + @Mock + private S3ObjectReferenceResolver s3ObjectReferenceResolver; + + @Mock + private Counter successCounter; + + @Mock + private Counter failureCounter; + + private S3EnrichProcessor objectUnderTest; + + @BeforeEach + void setUp() throws Exception { + // --- PluginMetrics stubs --- + final Counter genericCounter = mock(Counter.class); + final Timer genericTimer = mock(Timer.class); + final DistributionSummary genericSummary = mock(DistributionSummary.class); + when(pluginMetrics.counter(anyString())).thenReturn(genericCounter); + when(pluginMetrics.timer(anyString())).thenReturn(genericTimer); + when(pluginMetrics.summary(anyString())).thenReturn(genericSummary); + // Return specific counters for the two we care about + when(pluginMetrics.counter(S3EnrichProcessor.NUMBER_OF_RECORDS_ENRICHED_SUCCESS)).thenReturn(successCounter); + 
when(pluginMetrics.counter(S3EnrichProcessor.NUMBER_OF_RECORDS_ENRICHED_FAILED)).thenReturn(failureCounter); + + // --- Config stubs --- + final AwsAuthenticationOptions awsAuthOptions = mock(AwsAuthenticationOptions.class); + when(awsAuthOptions.getAwsRegion()).thenReturn(Region.US_EAST_1); + when(awsAuthOptions.getAwsStsRoleArn()).thenReturn(null); + when(awsAuthOptions.getAwsStsHeaderOverrides()).thenReturn(Collections.emptyMap()); + when(awsAuthOptions.getAwsStsExternalId()).thenReturn(null); + + final S3EnrichBucketOption bucketOption = mock(S3EnrichBucketOption.class); + when(bucketOption.getName()).thenReturn(BUCKET_NAME); + when(bucketOption.getS3SourceFilter()).thenReturn(null); + + final PluginModel codec = mock(PluginModel.class); + when(codec.getPluginName()).thenReturn("newline_delimited"); + when(codec.getPluginSettings()).thenReturn(Collections.emptyMap()); + + when(s3EnrichProcessorConfig.getCacheTtl()).thenReturn(java.time.Duration.ofMinutes(10)); + when(s3EnrichProcessorConfig.getCacheCountLimit()).thenReturn(1000); + when(s3EnrichProcessorConfig.getEnricherNamePattern()).thenReturn("^(.*)_output\\.jsonl$"); + when(s3EnrichProcessorConfig.getTagsOnFailure()).thenReturn(List.of(TAG_ON_FAILURE)); + when(s3EnrichProcessorConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthOptions); + when(s3EnrichProcessorConfig.isDisableBucketOwnershipValidation()).thenReturn(true); + when(s3EnrichProcessorConfig.getCodec()).thenReturn(codec); + when(s3EnrichProcessorConfig.getEnricherKeyPath()).thenReturn(KEY_PATH); + when(s3EnrichProcessorConfig.getCorrelationKeys()).thenReturn(List.of(CORRELATION_KEY)); + when(s3EnrichProcessorConfig.getS3EnrichBucketOption()).thenReturn(bucketOption); + when(s3EnrichProcessorConfig.getCompression()).thenReturn(CompressionOption.NONE); + when(s3EnrichProcessorConfig.getWhenCondition()).thenReturn(null); + when(s3EnrichProcessorConfig.getMergeKeys()).thenReturn(Collections.emptyList()); + 
when(s3EnrichProcessorConfig.getS3IncludePrefix()).thenReturn(java.util.Optional.empty()); + + // --- AWS credential stubs --- + final AwsCredentialsProvider credentialsProvider = mock(AwsCredentialsProvider.class); + when(awsCredentialsSupplier.getProvider(any())).thenReturn(credentialsProvider); + + // --- Plugin factory stub --- + when(pluginFactory.loadPlugin(any(), any())).thenReturn(mock(org.opensearch.dataprepper.model.codec.InputCodec.class)); + + objectUnderTest = new S3EnrichProcessor( + s3EnrichProcessorConfig, + pluginMetrics, + awsCredentialsSupplier, + expressionEvaluator, + pluginFactory); + + // Inject mocked dependencies via reflection + injectField(objectUnderTest, "cacheService", cacheService); + injectField(objectUnderTest, "s3ObjectWorker", s3ObjectWorker); + injectField(objectUnderTest, "s3ObjectReferenceResolver", s3ObjectReferenceResolver); + injectField(objectUnderTest, "numberOfRecordsSuccessCounter", successCounter); + injectField(objectUnderTest, "numberOfRecordsFailedCounter", failureCounter); + } + + // ---- doExecute: when-condition filtering ---- + + @Test + void doExecute_returns_all_records_when_collection_is_empty() { + final Collection> result = objectUnderTest.doExecute(Collections.emptyList()); + + assertThat(result.size(), equalTo(0)); + } + + @Test + void doExecute_processes_all_records_when_no_when_condition_configured() throws Exception { + when(s3EnrichProcessorConfig.getWhenCondition()).thenReturn(null); + final Record record = mockRecord("s3://bucket/key", "123"); + final Event event = record.getData(); + final Event enrichEvent = mock(Event.class); + final S3ObjectReference ref = S3ObjectReference.bucketAndKey(BUCKET_NAME, "key").build(); + + when(s3ObjectReferenceResolver.resolve(event)).thenReturn(ref); + when(event.get(CORRELATION_KEY, String.class)).thenReturn("123"); + when(cacheService.get(ref.uri(), "123")).thenReturn(enrichEvent); + + final EventKey mergeKey = mock(EventKey.class); + 
when(mergeKey.getKey()).thenReturn("field1"); + when(s3EnrichProcessorConfig.getMergeKeys()).thenReturn(List.of(mergeKey)); + + final Collection> result = objectUnderTest.doExecute(List.of(record)); + + assertThat(result.size(), equalTo(1)); + verify(successCounter).increment(); + } + + @Test + void doExecute_passes_through_records_not_meeting_when_condition() { + when(s3EnrichProcessorConfig.getWhenCondition()).thenReturn("/some_key != null"); + final Record record = mockRecord("s3://bucket/key", "123"); + when(expressionEvaluator.evaluateConditional("/some_key != null", record.getData())).thenReturn(false); + + final Collection> result = objectUnderTest.doExecute(List.of(record)); + + assertThat(result.size(), equalTo(1)); + verify(successCounter, never()).increment(); + verify(failureCounter, never()).increment(); + } + + @Test + void doExecute_returns_record_unmodified_when_expression_parsing_fails() { + when(s3EnrichProcessorConfig.getWhenCondition()).thenReturn("/bad_expr"); + final Record record = mockRecord("s3://bucket/key", "123"); + when(expressionEvaluator.evaluateConditional(anyString(), any(Event.class))) + .thenThrow(new ExpressionParsingException("bad expression", new RuntimeException())); + + final Collection> result = objectUnderTest.doExecute(List.of(record)); + + assertThat(result.size(), equalTo(1)); + verify(s3ObjectReferenceResolver, never()).resolve(any()); + } + + @Test + void doExecute_returns_record_unmodified_when_class_cast_exception_during_condition_eval() { + when(s3EnrichProcessorConfig.getWhenCondition()).thenReturn("/some_key"); + final Record record = mockRecord("s3://bucket/key", "123"); + when(expressionEvaluator.evaluateConditional(anyString(), any(Event.class))) + .thenThrow(new ClassCastException("unexpected type")); + + final Collection> result = objectUnderTest.doExecute(List.of(record)); + + assertThat(result.size(), equalTo(1)); + verify(s3ObjectReferenceResolver, never()).resolve(any()); + } + + @Test + void 
doExecute_returns_record_unmodified_when_unexpected_exception_during_condition_eval() { + when(s3EnrichProcessorConfig.getWhenCondition()).thenReturn("/some_key"); + final Record record = mockRecord("s3://bucket/key", "123"); + when(expressionEvaluator.evaluateConditional(anyString(), any(Event.class))) + .thenThrow(new RuntimeException("unexpected")); + + final Collection> result = objectUnderTest.doExecute(List.of(record)); + + assertThat(result.size(), equalTo(1)); + verify(s3ObjectReferenceResolver, never()).resolve(any()); + } + + // ---- processRecord: correlation and merge ---- + + @Test + void doExecute_adds_failure_tag_and_increments_counter_when_process_record_throws() { + when(s3EnrichProcessorConfig.getWhenCondition()).thenReturn(null); + final Record record = mockRecord("s3://bucket/key", "123"); + final Event event = record.getData(); + final S3ObjectReference ref = S3ObjectReference.bucketAndKey(BUCKET_NAME, "key").build(); + when(s3ObjectReferenceResolver.resolve(event)).thenReturn(ref); + when(event.get(CORRELATION_KEY, String.class)).thenReturn(null); // triggers exception in processRecord + + final Collection> result = objectUnderTest.doExecute(List.of(record)); + + assertThat(result.size(), equalTo(1)); + verify(failureCounter).increment(); + verify(successCounter, never()).increment(); + final EventMetadata metadata = event.getMetadata(); + verify(metadata).addTags(List.of(TAG_ON_FAILURE)); + } + + @Test + void doExecute_increments_failure_counter_when_no_matching_enrich_event_in_cache() { + when(s3EnrichProcessorConfig.getWhenCondition()).thenReturn(null); + final Record record = mockRecord("s3://bucket/key", "123"); + final Event event = record.getData(); + final S3ObjectReference ref = S3ObjectReference.bucketAndKey(BUCKET_NAME, "key").build(); + when(s3ObjectReferenceResolver.resolve(event)).thenReturn(ref); + when(event.get(CORRELATION_KEY, String.class)).thenReturn("123"); + when(cacheService.get(ref.uri(), "123")).thenReturn(null); // no 
matching event + + final Collection> result = objectUnderTest.doExecute(List.of(record)); + + assertThat(result.size(), equalTo(1)); + verify(failureCounter).increment(); + } + + @Test + void doExecute_succeeds_and_merges_fields_when_enrich_event_found_in_cache() { + when(s3EnrichProcessorConfig.getWhenCondition()).thenReturn(null); + final Record record = mockRecord("s3://bucket/key", "123"); + final Event event = record.getData(); + final Event enrichEvent = mock(Event.class); + final S3ObjectReference ref = S3ObjectReference.bucketAndKey(BUCKET_NAME, "key").build(); + when(s3ObjectReferenceResolver.resolve(event)).thenReturn(ref); + when(event.get(CORRELATION_KEY, String.class)).thenReturn("123"); + when(cacheService.get(ref.uri(), "123")).thenReturn(enrichEvent); + + final EventKey mergeKey = mock(EventKey.class); + when(mergeKey.getKey()).thenReturn("city"); + when(s3EnrichProcessorConfig.getMergeKeys()).thenReturn(List.of(mergeKey)); + + final Collection> result = objectUnderTest.doExecute(List.of(record)); + + assertThat(result.size(), equalTo(1)); + verify(successCounter).increment(); + verify(event).merge(enrichEvent, List.of("city")); + } + + @Test + void doExecute_partial_merge_failure_logs_warning_but_does_not_increment_failure_counter() { + when(s3EnrichProcessorConfig.getWhenCondition()).thenReturn(null); + final Record record = mockRecord("s3://bucket/key", "123"); + final Event event = record.getData(); + final Event enrichEvent = mock(Event.class); + final S3ObjectReference ref = S3ObjectReference.bucketAndKey(BUCKET_NAME, "key").build(); + when(s3ObjectReferenceResolver.resolve(event)).thenReturn(ref); + when(event.get(CORRELATION_KEY, String.class)).thenReturn("123"); + when(cacheService.get(ref.uri(), "123")).thenReturn(enrichEvent); + + final EventKey goodKey = mock(EventKey.class); + when(goodKey.getKey()).thenReturn("good_field"); + final EventKey missingKey = mock(EventKey.class); + when(missingKey.getKey()).thenReturn("missing_field"); + 
when(s3EnrichProcessorConfig.getMergeKeys()).thenReturn(List.of(goodKey, missingKey)); + + final Collection> result = objectUnderTest.doExecute(List.of(record)); + + assertThat(result.size(), equalTo(1)); + verify(successCounter).increment(); + verify(failureCounter, never()).increment(); + verify(event).merge(enrichEvent, List.of("good_field", "missing_field")); + } + + // ---- addFailureTags ---- + + @Test + void addFailureTags_adds_configured_tags_to_event_metadata() { + final Record record = mockRecord("s3://bucket/key", "id-val"); + final Event event = record.getData(); + final EventMetadata metadata = event.getMetadata(); + + objectUnderTest.addFailureTags(record); + + verify(metadata).addTags(List.of(TAG_ON_FAILURE)); + } + + @Test + void addFailureTags_does_nothing_when_tags_on_failure_is_empty() throws Exception { + when(s3EnrichProcessorConfig.getTagsOnFailure()).thenReturn(Collections.emptyList()); + // re-inject tagsOnFailure via reflection + injectField(objectUnderTest, "tagsOnFailure", Collections.emptyList()); + final Record record = mockRecord("s3://bucket/key", "id-val"); + final Event event = record.getData(); + + objectUnderTest.addFailureTags(record); + + verify(event.getMetadata(), never()).addTags(any()); + } + + @Test + void addFailureTags_list_overload_adds_tags_to_all_records() { + final Record record1 = mockRecord("s3://bucket/key", "id-1"); + final Record record2 = mockRecord("s3://bucket/key", "id-2"); + + objectUnderTest.addFailureTags(List.of(record1, record2)); + + verify(record1.getData().getMetadata()).addTags(List.of(TAG_ON_FAILURE)); + verify(record2.getData().getMetadata()).addTags(List.of(TAG_ON_FAILURE)); + } + + @Test + void addFailureTags_list_overload_does_nothing_for_empty_list() { + objectUnderTest.addFailureTags(Collections.emptyList()); + // no exception, no interactions + } + + // ---- lifecycle ---- + + @Test + void isReadyForShutdown_returns_true() { + assertThat(objectUnderTest.isReadyForShutdown(), equalTo(true)); + 
} + + @Test + void prepareForShutdown_does_not_throw() { + objectUnderTest.prepareForShutdown(); + } + + @Test + void shutdown_does_not_throw() { + objectUnderTest.shutdown(); + } + + // ---- helpers ---- + + private Record mockRecord(final String s3Uri, final String eventId) { + final Event event = mock(Event.class); + final EventMetadata metadata = mock(EventMetadata.class); + when(event.getMetadata()).thenReturn(metadata); + @SuppressWarnings("unchecked") + final Record record = mock(Record.class); + when(record.getData()).thenReturn(event); + return record; + } + + private static void injectField(final Object target, final String fieldName, final Object value) throws Exception { + Class clazz = target.getClass(); + while (clazz != null) { + try { + final Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + return; + } catch (final NoSuchFieldException e) { + clazz = clazz.getSuperclass(); + } + } + throw new NoSuchFieldException("Field '" + fieldName + "' not found in class hierarchy of " + target.getClass().getName()); + } +} diff --git a/data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/cache/CacheFactoryTest.java b/data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/cache/CacheFactoryTest.java new file mode 100644 index 0000000000..11d3aa6e33 --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/cache/CacheFactoryTest.java @@ -0,0 +1,96 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.s3_enrich.processor.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.s3_enrich.processor.S3EnrichProcessorConfig; + +import java.time.Duration; + +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class CacheFactoryTest { + + @Mock + private S3EnrichProcessorConfig config; + + @BeforeEach + void setUp() { + when(config.getCacheTtl()).thenReturn(Duration.ofMinutes(10)); + when(config.getCacheCountLimit()).thenReturn(1000); + } + + private CacheFactory createObjectUnderTest() { + return new CacheFactory(config); + } + + @Test + void constructor_builds_s3Cache_on_initialization() { + final CacheFactory objectUnderTest = createObjectUnderTest(); + + assertThat(objectUnderTest.getS3Cache(), notNullValue()); + } + + @Test + void getS3Cache_returns_same_instance_across_calls() { + final CacheFactory objectUnderTest = createObjectUnderTest(); + + final Cache> first = objectUnderTest.getS3Cache(); + final Cache> second = objectUnderTest.getS3Cache(); + + assertThat(first, notNullValue()); + assertThat(first == second, org.hamcrest.CoreMatchers.equalTo(true)); + } + + @Test + void createEventsCache_returns_new_cache_per_call() { + final CacheFactory objectUnderTest = createObjectUnderTest(); + + final Cache first = objectUnderTest.createEventsCache("s3://bucket/key1"); + final Cache second = 
objectUnderTest.createEventsCache("s3://bucket/key2"); + + assertThat(first, notNullValue()); + assertThat(second, notNullValue()); + assertThat(first != second, org.hamcrest.CoreMatchers.equalTo(true)); + } + + @Test + void createEventsCache_accepts_configured_max_size() { + when(config.getCacheCountLimit()).thenReturn(500); + final CacheFactory objectUnderTest = createObjectUnderTest(); + + final Cache cache = objectUnderTest.createEventsCache("s3://bucket/key"); + + assertThat(cache, notNullValue()); + } + + @Test + void createEventsCache_with_short_ttl_still_creates_valid_cache() { + when(config.getCacheTtl()).thenReturn(Duration.ofSeconds(1)); + final CacheFactory objectUnderTest = createObjectUnderTest(); + + final Cache cache = objectUnderTest.createEventsCache("s3://bucket/key"); + + assertThat(cache, notNullValue()); + } +} diff --git a/data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/cache/S3EnricherCacheServiceTest.java b/data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/cache/S3EnricherCacheServiceTest.java new file mode 100644 index 0000000000..6206bdd4e2 --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/cache/S3EnricherCacheServiceTest.java @@ -0,0 +1,267 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.s3_enrich.processor.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.event.Event; + +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class S3EnricherCacheServiceTest { + + @Mock + private CacheFactory cacheFactory; + + private S3EnricherCacheService objectUnderTest; + + @BeforeEach + void setUp() { + final Cache> outerCache = Caffeine.newBuilder() + .maximumSize(100) + .expireAfterAccess(Duration.ofMinutes(10)) + .build(); + when(cacheFactory.getS3Cache()).thenReturn(outerCache); + objectUnderTest = new S3EnricherCacheService(cacheFactory); + } + + @Test + void getOrCreateRecordCache_creates_new_cache_for_unknown_s3Url() { + final String s3Url = "s3://bucket/key"; + final Cache newCache = Caffeine.newBuilder().maximumSize(100).build(); + when(cacheFactory.createEventsCache(s3Url)).thenReturn(newCache); + + final Cache result = objectUnderTest.getOrCreateRecordCache(s3Url); + + assertThat(result, notNullValue()); + verify(cacheFactory).createEventsCache(s3Url); + } + + @Test + void getOrCreateRecordCache_returns_existing_cache_for_known_s3Url() { + final String s3Url = "s3://bucket/key"; 
+ final Cache newCache = Caffeine.newBuilder().maximumSize(100).build(); + when(cacheFactory.createEventsCache(s3Url)).thenReturn(newCache); + + objectUnderTest.getOrCreateRecordCache(s3Url); + final Cache result = objectUnderTest.getOrCreateRecordCache(s3Url); + + assertThat(result, notNullValue()); + verify(cacheFactory, times(1)).createEventsCache(s3Url); // only created once + } + + @Test + void getRecordCacheIfPresent_returns_null_for_unknown_s3Url() { + final Cache result = objectUnderTest.getRecordCacheIfPresent("s3://bucket/unknown"); + + assertThat(result, nullValue()); + } + + @Test + void getRecordCacheIfPresent_returns_cache_for_known_s3Url() { + final String s3Url = "s3://bucket/key"; + final Cache newCache = Caffeine.newBuilder().maximumSize(100).build(); + when(cacheFactory.createEventsCache(s3Url)).thenReturn(newCache); + objectUnderTest.getOrCreateRecordCache(s3Url); + + final Cache result = objectUnderTest.getRecordCacheIfPresent(s3Url); + + assertThat(result, notNullValue()); + } + + @Test + void put_and_get_returns_stored_event() { + final String s3Url = "s3://bucket/key"; + final String recordId = UUID.randomUUID().toString(); + final Event event = mock(Event.class); + final Cache innerCache = Caffeine.newBuilder().maximumSize(100).build(); + when(cacheFactory.createEventsCache(s3Url)).thenReturn(innerCache); + + objectUnderTest.put(s3Url, recordId, event); + final Event result = objectUnderTest.get(s3Url, recordId); + + assertThat(result, equalTo(event)); + } + + @Test + void get_returns_null_for_unknown_s3Url() { + final Event result = objectUnderTest.get("s3://bucket/unknown", "record-id"); + + assertThat(result, nullValue()); + } + + @Test + void get_returns_null_for_unknown_recordId() { + final String s3Url = "s3://bucket/key"; + final Cache innerCache = Caffeine.newBuilder().maximumSize(100).build(); + when(cacheFactory.createEventsCache(s3Url)).thenReturn(innerCache); + objectUnderTest.put(s3Url, "record-id-1", mock(Event.class)); + + 
final Event result = objectUnderTest.get(s3Url, "record-id-unknown"); + + assertThat(result, nullValue()); + } + + @Test + void containsS3Uri_returns_false_for_unknown_uri() { + assertThat(objectUnderTest.containsS3Uri("s3://bucket/unknown"), equalTo(false)); + } + + @Test + void containsS3Uri_returns_true_after_put() { + final String s3Url = "s3://bucket/key"; + final Cache innerCache = Caffeine.newBuilder().maximumSize(100).build(); + when(cacheFactory.createEventsCache(s3Url)).thenReturn(innerCache); + objectUnderTest.put(s3Url, "record-id", mock(Event.class)); + + assertThat(objectUnderTest.containsS3Uri(s3Url), equalTo(true)); + } + + @Test + void loadIfAbsent_calls_loader_when_not_cached() { + final String s3Url = "s3://bucket/key"; + final AtomicInteger callCount = new AtomicInteger(0); + + objectUnderTest.loadIfAbsent(s3Url, callCount::incrementAndGet); + + assertThat(callCount.get(), equalTo(1)); + } + + @Test + void loadIfAbsent_does_not_call_loader_when_already_cached() { + final String s3Url = "s3://bucket/key"; + final Cache innerCache = Caffeine.newBuilder().maximumSize(100).build(); + when(cacheFactory.createEventsCache(s3Url)).thenReturn(innerCache); + objectUnderTest.put(s3Url, "record-id", mock(Event.class)); + final AtomicInteger callCount = new AtomicInteger(0); + + objectUnderTest.loadIfAbsent(s3Url, callCount::incrementAndGet); + + assertThat(callCount.get(), equalTo(0)); + } + + @Test + void loadIfAbsent_loader_is_called_exactly_once_even_if_called_multiple_times_for_same_uri() { + final String s3Url = "s3://bucket/key"; + final Cache innerCache = Caffeine.newBuilder().maximumSize(100).build(); + final AtomicInteger callCount = new AtomicInteger(0); + when(cacheFactory.createEventsCache(anyString())).thenReturn(innerCache); + + // First call - not cached, so loader runs + objectUnderTest.loadIfAbsent(s3Url, () -> { + callCount.incrementAndGet(); + objectUnderTest.put(s3Url, "record-id", mock(Event.class)); // populate cache in loader + }); + 
+ // Second call - now cached, loader should NOT run + objectUnderTest.loadIfAbsent(s3Url, callCount::incrementAndGet); + + assertThat(callCount.get(), equalTo(1)); + } + + @Test + void getOrLoadRecordCache_calls_loader_when_not_cached() { + final String s3Url = "s3://bucket/key"; + final Cache innerCache = Caffeine.newBuilder().maximumSize(100).build(); + + final Cache result = objectUnderTest.getOrLoadRecordCache(s3Url, () -> innerCache); + + assertThat(result, equalTo(innerCache)); + } + + @Test + void getOrLoadRecordCache_returns_existing_cache_when_already_cached() { + final String s3Url = "s3://bucket/key"; + final Cache firstCache = Caffeine.newBuilder().maximumSize(100).build(); + final Cache secondCache = Caffeine.newBuilder().maximumSize(100).build(); + + objectUnderTest.getOrLoadRecordCache(s3Url, () -> firstCache); + final Cache result = objectUnderTest.getOrLoadRecordCache(s3Url, () -> secondCache); + + assertThat(result, equalTo(firstCache)); + } + + @Test + void getRecordCount_returns_zero_for_unknown_s3Url() { + assertThat(objectUnderTest.getRecordCount("s3://bucket/unknown"), equalTo(0L)); + } + + @Test + void getRecordCount_returns_count_of_stored_records() { + final String s3Url = "s3://bucket/key"; + final Cache innerCache = Caffeine.newBuilder().maximumSize(100).build(); + when(cacheFactory.createEventsCache(s3Url)).thenReturn(innerCache); + objectUnderTest.put(s3Url, "record-1", mock(Event.class)); + objectUnderTest.put(s3Url, "record-2", mock(Event.class)); + + assertThat(objectUnderTest.getRecordCount(s3Url), equalTo(2L)); + } + + @Test + void clearAll_removes_all_cached_entries() { + final String s3Url1 = "s3://bucket/key1"; + final String s3Url2 = "s3://bucket/key2"; + final Cache innerCache1 = Caffeine.newBuilder().maximumSize(100).build(); + final Cache innerCache2 = Caffeine.newBuilder().maximumSize(100).build(); + when(cacheFactory.createEventsCache(s3Url1)).thenReturn(innerCache1); + 
when(cacheFactory.createEventsCache(s3Url2)).thenReturn(innerCache2); + + objectUnderTest.put(s3Url1, "record-1", mock(Event.class)); + objectUnderTest.put(s3Url2, "record-2", mock(Event.class)); + + objectUnderTest.clearAll(); + + assertThat(objectUnderTest.containsS3Uri(s3Url1), equalTo(false)); + assertThat(objectUnderTest.containsS3Uri(s3Url2), equalTo(false)); + } + + @Test + void clearS3Uri_removes_only_specified_s3Url() { + final String s3Url1 = "s3://bucket/key1"; + final String s3Url2 = "s3://bucket/key2"; + final Cache innerCache1 = Caffeine.newBuilder().maximumSize(100).build(); + final Cache innerCache2 = Caffeine.newBuilder().maximumSize(100).build(); + when(cacheFactory.createEventsCache(s3Url1)).thenReturn(innerCache1); + when(cacheFactory.createEventsCache(s3Url2)).thenReturn(innerCache2); + + objectUnderTest.put(s3Url1, "record-1", mock(Event.class)); + objectUnderTest.put(s3Url2, "record-2", mock(Event.class)); + + objectUnderTest.clearS3Uri(s3Url1); + + assertThat(objectUnderTest.containsS3Uri(s3Url1), equalTo(false)); + assertThat(objectUnderTest.containsS3Uri(s3Url2), equalTo(true)); + } + + @Test + void clearS3Uri_does_not_throw_for_unknown_uri() { + objectUnderTest.clearS3Uri("s3://bucket/nonexistent"); + // no exception expected + } +} diff --git a/data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/S3ObjectReferenceResolverTest.java b/data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/S3ObjectReferenceResolverTest.java new file mode 100644 index 0000000000..67ce300f9f --- /dev/null +++ b/data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/S3ObjectReferenceResolverTest.java @@ -0,0 +1,183 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * 
this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ */
+
+package org.opensearch.dataprepper.plugins.s3_enrich.processor.s3source;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.plugins.s3.common.source.S3ObjectReference;
+import org.opensearch.dataprepper.plugins.s3_enrich.processor.S3EnrichProcessorConfig;
+import org.opensearch.dataprepper.plugins.s3_enrich.processor.configuration.S3EnrichBucketOption;
+
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@code S3ObjectReferenceResolver.resolve}.
+ *
+ * <p>The processor configuration, its bucket option and the event are mocks. {@link #setUp()} stubs
+ * the values every test shares; each test adds only the stubs it needs (include prefix, codec
+ * extension, and the S3 key stored in the event under {@code keyPath}).
+ */
+@ExtendWith(MockitoExtension.class)
+class S3ObjectReferenceResolverTest {
+
+    /** Name pattern handed to the resolver; group 1 is the file name without the {@code _output.jsonl} suffix. */
+    private static final String NAME_PATTERN = "^(.*)_output\\.jsonl$";
+
+    private String bucketName;
+    private String keyPath;
+
+    @Mock
+    private S3EnrichProcessorConfig config;
+
+    @Mock
+    private S3EnrichBucketOption bucketOption;
+
+    @Mock
+    private Event event;
+
+    /** Generates a fresh bucket name and key path and wires them into the mocked configuration. */
+    @BeforeEach
+    void setUp() {
+        bucketName = UUID.randomUUID().toString();
+        keyPath = UUID.randomUUID().toString();
+
+        when(config.getEnricherNamePattern()).thenReturn(NAME_PATTERN);
+        when(config.getS3EnrichBucketOption()).thenReturn(bucketOption);
+        when(config.getEnricherKeyPath()).thenReturn(keyPath);
+        when(bucketOption.getName()).thenReturn(bucketName);
+    }
+
+    private S3ObjectReferenceResolver createObjectUnderTest() {
+        return new S3ObjectReferenceResolver(config);
+    }
+
+    /** The resolved reference carries the bucket name from the bucket option. */
+    @Test
+    void resolve_returns_S3ObjectReference_with_correct_bucket() {
+        when(config.getS3IncludePrefix()).thenReturn(Optional.empty());
+        when(config.getCodecExtension()).thenReturn("jsonl");
+        when(event.get(keyPath, String.class)).thenReturn("data/reports/daily_output.jsonl");
+
+        final S3ObjectReference result = createObjectUnderTest().resolve(event);
+
+        assertThat(result, notNullValue());
+        assertThat(result.getBucketName(), equalTo(bucketName));
+    }
+
+    /** A key matching the name pattern resolves to the captured base name plus the codec extension. */
+    @Test
+    void resolve_extracts_base_name_and_appends_jsonl_extension() {
+        when(config.getS3IncludePrefix()).thenReturn(Optional.empty());
+        when(config.getCodecExtension()).thenReturn("jsonl");
+        when(event.get(keyPath, String.class)).thenReturn("path/to/daily_output.jsonl");
+
+        final S3ObjectReference result = createObjectUnderTest().resolve(event);
+
+        assertThat(result.getKey(), equalTo("daily.jsonl"));
+    }
+
+    /** A configured include prefix is placed in front of the resolved key. */
+    @Test
+    void resolve_prepends_prefix_when_s3IncludePrefix_is_configured() {
+        when(config.getS3IncludePrefix()).thenReturn(Optional.of("enrichment/"));
+        when(config.getCodecExtension()).thenReturn("jsonl");
+        when(event.get(keyPath, String.class)).thenReturn("data/daily_output.jsonl");
+
+        final S3ObjectReference result = createObjectUnderTest().resolve(event);
+
+        assertThat(result.getKey(), equalTo("enrichment/daily.jsonl"));
+    }
+
+    /** A file name that does not match the pattern is used unchanged; no codec extension is stubbed here. */
+    @Test
+    void resolve_uses_filename_as_is_when_pattern_does_not_match() {
+        when(config.getS3IncludePrefix()).thenReturn(Optional.empty());
+        when(event.get(keyPath, String.class)).thenReturn("path/to/some_file.parquet");
+
+        final S3ObjectReference result = createObjectUnderTest().resolve(event);
+
+        assertThat(result.getKey(), equalTo("some_file.parquet"));
+    }
+
+    /** The reference URI has the form {@code s3://<bucket>/<key>}. */
+    @Test
+    void resolve_produces_correct_uri() {
+        when(config.getS3IncludePrefix()).thenReturn(Optional.empty());
+        when(config.getCodecExtension()).thenReturn("jsonl");
+        when(event.get(keyPath, String.class)).thenReturn("data/report_output.jsonl");
+
+        final S3ObjectReference result = createObjectUnderTest().resolve(event);
+
+        assertThat(result.uri(), equalTo("s3://" + bucketName + "/" + result.getKey()));
+    }
+
+    /** A missing S3 key in the event is rejected with {@link IllegalArgumentException}. */
+    @Test
+    void resolve_throws_when_s3_key_is_null() {
+        when(bucketOption.getName()).thenReturn(bucketName);
+        when(event.get(keyPath, String.class)).thenReturn(null);
+
+        assertThrows(IllegalArgumentException.class, () -> createObjectUnderTest().resolve(event));
+    }
+
+    /** A whitespace-only S3 key is rejected with {@link IllegalArgumentException}. */
+    @Test
+    void resolve_throws_when_s3_key_is_blank() {
+        when(bucketOption.getName()).thenReturn(bucketName);
+        when(event.get(keyPath, String.class)).thenReturn(" ");
+
+        assertThrows(IllegalArgumentException.class, () -> createObjectUnderTest().resolve(event));
+    }
+
+    /** An empty bucket name is rejected with {@link IllegalArgumentException}. */
+    @Test
+    void resolve_throws_when_bucket_name_is_blank() {
+        when(bucketOption.getName()).thenReturn("");
+        when(event.get(keyPath, String.class)).thenReturn("path/to/file_output.jsonl");
+
+        assertThrows(IllegalArgumentException.class, () -> createObjectUnderTest().resolve(event));
+    }
+
+    /** A key with no {@code /} at all is treated as the file name itself. */
+    @Test
+    void resolve_handles_s3_key_at_root_level_without_path_separator() {
+        when(config.getS3IncludePrefix()).thenReturn(Optional.empty());
+        when(config.getCodecExtension()).thenReturn("jsonl");
+        when(event.get(keyPath, String.class)).thenReturn("myfile_output.jsonl");
+
+        final S3ObjectReference result = createObjectUnderTest().resolve(event);
+
+        assertThat(result.getKey(), equalTo("myfile.jsonl"));
+    }
+
+    /**
+     * The directory depth of the incoming key must not affect the resolved key.
+     *
+     * @param s3Key an S3 key whose last path segment is always {@code report_output.jsonl}
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {
+            "data/subdir/report_output.jsonl",
+            "report_output.jsonl",
+            "a/b/c/d/report_output.jsonl"
+    })
+    void resolve_extracts_filename_correctly_from_various_path_depths(final String s3Key) {
+        when(config.getS3IncludePrefix()).thenReturn(Optional.empty());
+        when(config.getCodecExtension()).thenReturn("jsonl");
+        when(event.get(keyPath, String.class)).thenReturn(s3Key);
+
+        final S3ObjectReference result = createObjectUnderTest().resolve(event);
+
+        assertThat(result, notNullValue());
+        assertThat(result.getBucketName(), equalTo(bucketName));
+        // Every input shares the same file name, so the extracted key must be identical for all of them.
+        assertThat(result.getKey(), equalTo("report.jsonl"));
+    }
+
+    /** A key ending in {@code /} resolves to its last non-empty path segment. */
+    @Test
+    void resolve_handles_s3_key_with_trailing_slash_by_stripping_it() {
+        when(config.getS3IncludePrefix()).thenReturn(Optional.empty());
+        when(event.get(keyPath, String.class)).thenReturn("path/to/dir/");
+
+        // trailing slash is stripped; "dir" is extracted as filename; pattern doesn't match so returned as-is
+        final S3ObjectReference result = createObjectUnderTest().resolve(event);
+
+        assertThat(result, notNullValue());
+        assertThat(result.getKey(), equalTo("dir"));
+    }
+}
diff --git a/data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/ownership/ConfigBucketOwnerProviderFactoryTest.java b/data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/ownership/ConfigBucketOwnerProviderFactoryTest.java
new file mode 100644
index 0000000000..d7b7522c8c
--- /dev/null
+++ b/data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/ownership/ConfigBucketOwnerProviderFactoryTest.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ */
+
+package org.opensearch.dataprepper.plugins.s3_enrich.processor.s3source.ownership;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
+import org.opensearch.dataprepper.plugins.s3.common.config.AwsAuthenticationOptions;
+import org.opensearch.dataprepper.plugins.s3.common.ownership.BucketOwnerProvider;
+import org.opensearch.dataprepper.plugins.s3.common.ownership.MappedBucketOwnerProvider;
+import org.opensearch.dataprepper.plugins.s3.common.ownership.NoOwnershipBucketOwnerProvider;
+import org.opensearch.dataprepper.plugins.s3.common.ownership.StaticBucketOwnerProvider;
+import org.opensearch.dataprepper.plugins.s3_enrich.processor.S3EnrichProcessorConfig;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@code ConfigBucketOwnerProviderFactory.createBucketOwnerProvider}.
+ *
+ * <p>Covers the provider type returned for each configuration shape and the sources the tests
+ * exercise for the default owner: the configured default bucket owner, the account embedded in the
+ * STS role ARN, and the account reported by the default AWS credentials.
+ */
+@ExtendWith(MockitoExtension.class)
+class ConfigBucketOwnerProviderFactoryTest {
+
+    @Mock
+    private S3EnrichProcessorConfig s3EnrichProcessorConfig;
+
+    @Mock
+    private AwsCredentialsProvider defaultAwsCredentialsProvider;
+
+    @Mock
+    private AwsCredentials awsCredentials;
+
+    private String accountId;
+
+    @BeforeEach
+    void setUp() {
+        accountId = randomAccountId();
+    }
+
+    /**
+     * Builds a random 12-digit numeric string to stand in for an AWS account id.
+     *
+     * @return a value between {@code 100000000000} and {@code 100899999999}, as a string
+     */
+    private static String randomAccountId() {
+        return String.valueOf(100000000000L + new Random().nextInt(900000000));
+    }
+
+    private ConfigBucketOwnerProviderFactory createObjectUnderTest() {
+        return new ConfigBucketOwnerProviderFactory(defaultAwsCredentialsProvider);
+    }
+
+    /** With ownership validation disabled, the factory returns the no-ownership provider. */
+    @Test
+    void createBucketOwnerProvider_returns_NoOwnershipBucketOwnerProvider_when_validation_disabled() {
+        when(s3EnrichProcessorConfig.isDisableBucketOwnershipValidation()).thenReturn(true);
+
+        final BucketOwnerProvider result = createObjectUnderTest().createBucketOwnerProvider(s3EnrichProcessorConfig);
+
+        assertThat(result, instanceOf(NoOwnershipBucketOwnerProvider.class));
+    }
+
+    /** A configured default owner and no bucket-owner map yields a static provider. */
+    @Test
+    void createBucketOwnerProvider_returns_StaticBucketOwnerProvider_for_defaultBucketOwner() {
+        when(s3EnrichProcessorConfig.isDisableBucketOwnershipValidation()).thenReturn(false);
+        when(s3EnrichProcessorConfig.getDefaultBucketOwner()).thenReturn(accountId);
+
+        final BucketOwnerProvider result = createObjectUnderTest().createBucketOwnerProvider(s3EnrichProcessorConfig);
+
+        assertThat(result, instanceOf(StaticBucketOwnerProvider.class));
+    }
+
+    /** The configured default owner is reported for an arbitrary bucket name. */
+    @Test
+    void createBucketOwnerProvider_returns_correct_owner_for_defaultBucketOwner() {
+        when(s3EnrichProcessorConfig.isDisableBucketOwnershipValidation()).thenReturn(false);
+        when(s3EnrichProcessorConfig.getDefaultBucketOwner()).thenReturn(accountId);
+
+        final BucketOwnerProvider result = createObjectUnderTest().createBucketOwnerProvider(s3EnrichProcessorConfig);
+
+        final Optional<String> owner = result.getBucketOwner(UUID.randomUUID().toString());
+        assertThat(owner.isPresent(), equalTo(true));
+        assertThat(owner.get(), equalTo(accountId));
+    }
+
+    /** A non-empty bucket-owner map yields a mapped provider. */
+    @Test
+    void createBucketOwnerProvider_returns_MappedBucketOwnerProvider_when_bucketOwners_are_defined() {
+        when(s3EnrichProcessorConfig.isDisableBucketOwnershipValidation()).thenReturn(false);
+        when(s3EnrichProcessorConfig.getDefaultBucketOwner()).thenReturn(accountId);
+        when(s3EnrichProcessorConfig.getBucketOwners()).thenReturn(
+                Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
+
+        final BucketOwnerProvider result = createObjectUnderTest().createBucketOwnerProvider(s3EnrichProcessorConfig);
+
+        assertThat(result, instanceOf(MappedBucketOwnerProvider.class));
+    }
+
+    /** A bucket present in the map is reported with its mapped owner rather than the default. */
+    @Test
+    void createBucketOwnerProvider_returns_correct_mapped_owner_for_specific_bucket() {
+        final String specificBucket = UUID.randomUUID().toString();
+        final String specificOwner = randomAccountId();
+        when(s3EnrichProcessorConfig.isDisableBucketOwnershipValidation()).thenReturn(false);
+        when(s3EnrichProcessorConfig.getDefaultBucketOwner()).thenReturn(accountId);
+        when(s3EnrichProcessorConfig.getBucketOwners()).thenReturn(Map.of(specificBucket, specificOwner));
+
+        final BucketOwnerProvider result = createObjectUnderTest().createBucketOwnerProvider(s3EnrichProcessorConfig);
+
+        final Optional<String> owner = result.getBucketOwner(specificBucket);
+        assertThat(owner.isPresent(), equalTo(true));
+        assertThat(owner.get(), equalTo(specificOwner));
+    }
+
+    /** A bucket absent from the map is reported with the default owner. */
+    @Test
+    void createBucketOwnerProvider_falls_back_to_default_when_bucket_not_in_map() {
+        final String unmappedBucket = UUID.randomUUID().toString();
+        when(s3EnrichProcessorConfig.isDisableBucketOwnershipValidation()).thenReturn(false);
+        when(s3EnrichProcessorConfig.getDefaultBucketOwner()).thenReturn(accountId);
+        when(s3EnrichProcessorConfig.getBucketOwners()).thenReturn(
+                Map.of(UUID.randomUUID().toString(), randomAccountId()));
+
+        final BucketOwnerProvider result = createObjectUnderTest().createBucketOwnerProvider(s3EnrichProcessorConfig);
+
+        final Optional<String> owner = result.getBucketOwner(unmappedBucket);
+        assertThat(owner.isPresent(), equalTo(true));
+        assertThat(owner.get(), equalTo(accountId));
+    }
+
+    /** Without a configured default owner, the account id embedded in the STS role ARN is used. */
+    @Test
+    void createBucketOwnerProvider_extracts_account_from_sts_role_arn_when_no_defaultBucketOwner() {
+        final String stsRoleArn = String.format("arn:aws:iam::%s:role/SomeRole", accountId);
+        final AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class);
+        when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(stsRoleArn);
+
+        when(s3EnrichProcessorConfig.isDisableBucketOwnershipValidation()).thenReturn(false);
+        when(s3EnrichProcessorConfig.getDefaultBucketOwner()).thenReturn(null);
+        when(s3EnrichProcessorConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
+
+        final BucketOwnerProvider result = createObjectUnderTest().createBucketOwnerProvider(s3EnrichProcessorConfig);
+
+        assertThat(result, notNullValue());
+        final Optional<String> owner = result.getBucketOwner(UUID.randomUUID().toString());
+        assertThat(owner.isPresent(), equalTo(true));
+        assertThat(owner.get(), equalTo(accountId));
+    }
+
+    /** With neither a default owner nor authentication options, the default credentials' account id is used. */
+    @Test
+    void createBucketOwnerProvider_uses_default_credentials_when_no_defaultBucketOwner_and_no_stsRoleArn() {
+        when(defaultAwsCredentialsProvider.resolveCredentials()).thenReturn(awsCredentials);
+        when(awsCredentials.accountId()).thenReturn(Optional.of(accountId));
+
+        when(s3EnrichProcessorConfig.isDisableBucketOwnershipValidation()).thenReturn(false);
+        when(s3EnrichProcessorConfig.getDefaultBucketOwner()).thenReturn(null);
+        when(s3EnrichProcessorConfig.getAwsAuthenticationOptions()).thenReturn(null);
+
+        final BucketOwnerProvider result = createObjectUnderTest().createBucketOwnerProvider(s3EnrichProcessorConfig);
+
+        assertThat(result, notNullValue());
+        final Optional<String> owner = result.getBucketOwner(UUID.randomUUID().toString());
+        assertThat(owner.isPresent(), equalTo(true));
+        assertThat(owner.get(), equalTo(accountId));
+    }
+
+    /** When no source provides an account id, the failure message names {@code default_bucket_owner}. */
+    @Test
+    void createBucketOwnerProvider_throws_InvalidPluginConfigurationException_when_ownership_cannot_be_determined() {
+        when(defaultAwsCredentialsProvider.resolveCredentials()).thenReturn(awsCredentials);
+        when(awsCredentials.accountId()).thenReturn(Optional.empty());
+
+        when(s3EnrichProcessorConfig.isDisableBucketOwnershipValidation()).thenReturn(false);
+        when(s3EnrichProcessorConfig.getDefaultBucketOwner()).thenReturn(null);
+        when(s3EnrichProcessorConfig.getAwsAuthenticationOptions()).thenReturn(null);
+
+        final ConfigBucketOwnerProviderFactory objectUnderTest = createObjectUnderTest();
+        final InvalidPluginConfigurationException thrown = assertThrows(
+                InvalidPluginConfigurationException.class,
+                () -> objectUnderTest.createBucketOwnerProvider(s3EnrichProcessorConfig));
+
+        assertThat(thrown.getMessage(), containsString("default_bucket_owner"));
+    }
+}
diff --git a/settings.gradle b/settings.gradle
index 3824f3c9fe..a5eaa04bf2 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -198,6 +198,7 @@ include 'data-prepper-plugins:http-common'
 include 'data-prepper-plugins:aws-lambda'
 include 'data-prepper-plugins:ml-inference-processor'
 include 'data-prepper-plugins:s3-common'
+include 'data-prepper-plugins:s3-enrich-processor'
 //include 'data-prepper-plugins:dummy-plugin'
 include 'data-prepper-plugin-schema'
 include 'data-prepper-plugins:kinesis-source'