From 5e5fc0ccbd077b169891522edd46e08f12ba9e18 Mon Sep 17 00:00:00 2001 From: Sotaro Hikita Date: Tue, 7 Apr 2026 06:02:50 +0900 Subject: [PATCH 1/2] Add top-level filters option for S3 source Add a top-level filters configuration to the S3 source that applies include_prefix and exclude_suffix filtering for both SQS and scan modes. Previously, key path filters were only available under scan bucket options, making it impossible to filter S3 objects when using SQS notifications. The new filters option uses the same bucket name keyed Map pattern as bucket_owners. Top-level filters and scan bucket-level filters cannot be used together, as the top-level filters are intended to eventually replace the scan bucket-level filters. Signed-off-by: Sotaro Hikita --- .../source/s3/S3ObjectFilteringHelper.java | 57 +++++++++ .../s3/S3ScanPartitionCreationSupplier.java | 7 +- .../plugins/source/s3/S3SourceConfig.java | 20 ++++ .../plugins/source/s3/ScanObjectWorker.java | 2 +- .../plugins/source/s3/SqsWorker.java | 22 +++- .../s3/S3ObjectFilteringHelperTest.java | 108 ++++++++++++++++++ .../S3ScanPartitionCreationSupplierTest.java | 2 +- .../plugins/source/s3/S3SourceConfigTest.java | 52 +++++++++ 8 files changed, 265 insertions(+), 5 deletions(-) create mode 100644 data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectFilteringHelper.java create mode 100644 data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectFilteringHelperTest.java diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectFilteringHelper.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectFilteringHelper.java new file mode 100644 index 0000000000..d2f99f0f81 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectFilteringHelper.java @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.s3; + +import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; + +import java.util.List; +import java.util.Map; + +/** + * Helper class for applying top-level S3 object key filters (include_prefix and exclude_suffix). + */ +public class S3ObjectFilteringHelper { + + private final Map filters; + + public S3ObjectFilteringHelper(final Map filters) { + this.filters = filters; + } + + /** + * Returns true if the object key matches the filters for the given bucket, + * or if no filters are configured for the bucket. + */ + public boolean isKeyMatchingFilters(final String bucketName, final String objectKey) { + if (filters == null || filters.isEmpty()) { + return true; + } + + final S3ScanKeyPathOption keyPathOption = filters.get(bucketName); + if (keyPathOption == null) { + return true; + } + + final List includePrefixes = keyPathOption.getS3scanIncludePrefixOptions(); + if (includePrefixes != null && !includePrefixes.isEmpty() + && includePrefixes.stream().noneMatch(objectKey::startsWith)) { + return false; + } + + final List excludeSuffixes = keyPathOption.getS3ScanExcludeSuffixOptions(); + if (excludeSuffixes != null && !excludeSuffixes.isEmpty() + && excludeSuffixes.stream().anyMatch(objectKey::endsWith)) { + return false; + } + + return true; + } +} diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java index b7745ae434..23e8c26fa8 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java @@ -56,13 +56,16 @@ public class S3ScanPartitionCreationSupplier implements Function sourceCoordinator; + private final S3ObjectFilteringHelper objectFilteringHelper; + public S3ScanPartitionCreationSupplier(final S3Client s3Client, final BucketOwnerProvider bucketOwnerProvider, final List scanOptionsList, final S3ScanSchedulingOptions schedulingOptions, final FolderPartitioningOptions folderPartitioningOptions, final boolean deleteS3ObjectsOnRead, - final SourceCoordinator sourceCoordinator) { + final SourceCoordinator sourceCoordinator, + final S3ObjectFilteringHelper objectFilteringHelper) { this.s3Client = s3Client; this.bucketOwnerProvider = bucketOwnerProvider; @@ -71,6 +74,7 @@ public S3ScanPartitionCreationSupplier(final S3Client s3Client, this.folderPartitioningOptions = folderPartitioningOptions; this.deleteS3ObjectsOnRead = deleteS3ObjectsOnRead; this.sourceCoordinator = sourceCoordinator; + this.objectFilteringHelper = objectFilteringHelper; } @Override @@ -145,6 +149,7 @@ private void createFilteredS3ObjectPartitionsForBucket(final List exclud .filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/")) .filter(keyTimestampPair -> excludeKeyPaths.stream() .noneMatch(excludeItem -> keyTimestampPair.left().endsWith(excludeItem))) + .filter(keyTimestampPair -> objectFilteringHelper.isKeyMatchingFilters(bucket, keyTimestampPair.left())) .filter(keyTimestampPair -> isKeyMatchedBetweenTimeRange(keyTimestampPair.right(), startDateTime, endDateTime, isFirstScan)) .map(Pair::left) .map(objectKey -> PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build()) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java index 1a11788aa0..91c7817a9f 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java @@ -22,8 +22,10 @@ import org.opensearch.dataprepper.plugins.source.s3.configuration.S3SelectOptions; import org.opensearch.dataprepper.plugins.source.s3.configuration.SqsOptions; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3DataSelection; +import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; import java.time.Duration; +import java.util.Collections; import java.util.Map; public class S3SourceConfig { @@ -104,6 +106,10 @@ public class S3SourceConfig { @JsonProperty("data_selection") private S3DataSelection dataSelection = S3DataSelection.DATA_AND_METADATA; + @JsonProperty("filters") + @Valid + private Map filters; + @AssertTrue(message = "A codec is required for reading objects.") boolean isCodecProvidedWhenNeeded() { if(s3SelectOptions == null) @@ -139,6 +145,16 @@ boolean isS3SelectNotUsingDeleteS3ObjectsOnRead() { return true; } + @AssertTrue(message = "Top-level filters cannot be used together with scan bucket-level filter. Use one or the other.") + boolean isFiltersNotUsedWithScanBucketFilter() { + if (filters != null && !filters.isEmpty() && s3ScanScanOptions != null && s3ScanScanOptions.getBuckets() != null) { + return s3ScanScanOptions.getBuckets().stream() + .noneMatch(bucket -> bucket.getS3ScanBucketOption() != null + && bucket.getS3ScanBucketOption().getS3ScanFilter() != null); + } + return true; + } + public NotificationTypeOption getNotificationType() { return notificationType; } @@ -222,4 +238,8 @@ public boolean isDeleteS3MetadataInEvent() { public S3DataSelection getDataSelection() { return dataSelection; } + + public Map getFilters() { + return filters != null ? filters : Collections.emptyMap(); + } } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java index c62e1a1c6e..6bf788d65a 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java @@ -168,7 +168,7 @@ public ScanObjectWorker(final S3Client s3Client, this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(); this.acknowledgmentSetTimeout = s3SourceConfig.getS3ScanScanOptions().getAcknowledgmentTimeout(); - this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead(), sourceCoordinator); + this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead(), sourceCoordinator, new S3ObjectFilteringHelper(s3SourceConfig.getFilters())); this.acknowledgmentsRemainingForPartitions = new ConcurrentHashMap<>(); this.objectsToDeleteForAcknowledgmentSets = new ConcurrentHashMap<>(); } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java index 2aabff0484..311e38a392 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java @@ -67,6 +67,7 @@ public class SqsWorker implements Runnable { static final String SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME = "sqsMessagesAccessDenied"; static final String SQS_MESSAGE_THROTTLED_METRIC_NAME = "sqsMessagesThrottled"; static final String SQS_RESOURCE_NOT_FOUND_METRIC_NAME = "sqsResourceNotFound"; + static final String S3_OBJECTS_FILTERED_METRIC_NAME = "s3ObjectsFiltered"; private final S3SourceConfig s3SourceConfig; private final SqsClient sqsClient; @@ -74,6 +75,7 @@ public class SqsWorker implements Runnable { private final SqsOptions sqsOptions; private final S3EventFilter objectCreatedFilter; private final S3EventFilter evenBridgeObjectCreatedFilter; + private final S3ObjectFilteringHelper objectFilteringHelper; private final Counter sqsMessagesReceivedCounter; private final Counter sqsReceiveMessagesFailedCounter; private final Counter sqsMessagesDeletedCounter; @@ -86,6 +88,7 @@ public class SqsWorker implements Runnable { private final Counter sqsMessageAccessDeniedCounter; private final Counter sqsMessageThrottledCounter; private final Counter sqsResourceNotFoundCounter; + private final Counter s3ObjectsFilteredCounter; private final Timer sqsMessageDelayTimer; private final Backoff standardBackoff; private final SqsMessageParser sqsMessageParser; @@ -110,6 +113,7 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager, sqsOptions = s3SourceConfig.getSqsOptions(); objectCreatedFilter = new S3ObjectCreatedFilter(); evenBridgeObjectCreatedFilter = new EventBridgeObjectCreatedFilter(); + objectFilteringHelper = new S3ObjectFilteringHelper(s3SourceConfig.getFilters()); sqsMessageParser = new SqsMessageParser(s3SourceConfig); failedAttemptCount = 0; parsedMessageVisibilityTimesMap = new HashMap<>(); @@ -126,6 +130,7 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager, sqsMessageAccessDeniedCounter = pluginMetrics.counter(SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME); sqsMessageThrottledCounter = pluginMetrics.counter(SQS_MESSAGE_THROTTLED_METRIC_NAME); sqsResourceNotFoundCounter = pluginMetrics.counter(SQS_RESOURCE_NOT_FOUND_METRIC_NAME); + s3ObjectsFilteredCounter = pluginMetrics.counter(S3_OBJECTS_FILTERED_METRIC_NAME); } @Override @@ -234,11 +239,11 @@ private List processS3EventNotificationRecords(f if (s3SourceConfig.getNotificationSource().equals(NotificationSourceOption.S3) && !parsedMessage.isEmptyNotification() && isS3EventNameCreated(parsedMessage)) { - parsedMessagesToRead.add(parsedMessage); + addParsedMessageByFilter(parsedMessage, parsedMessagesToRead, deleteMessageBatchRequestEntryCollection); } else if (s3SourceConfig.getNotificationSource().equals(NotificationSourceOption.EVENTBRIDGE) && isEventBridgeEventTypeCreated(parsedMessage)) { - parsedMessagesToRead.add(parsedMessage); + addParsedMessageByFilter(parsedMessage, parsedMessagesToRead, deleteMessageBatchRequestEntryCollection); } else { // TODO: Delete these only if on_error is configured to delete_messages. @@ -454,6 +459,19 @@ private boolean isEventBridgeEventTypeCreated(final ParsedMessage parsedMessage) return evenBridgeObjectCreatedFilter.filter(parsedMessage).isPresent(); } + private void addParsedMessageByFilter(final ParsedMessage parsedMessage, + final List parsedMessagesToRead, + final List deleteMessageBatchRequestEntries) { + if (objectFilteringHelper.isKeyMatchingFilters(parsedMessage.getBucketName(), parsedMessage.getObjectKey())) { + parsedMessagesToRead.add(parsedMessage); + } else { + LOG.debug("S3 object {} in bucket {} did not match configured filters. Deleting SQS message.", + parsedMessage.getObjectKey(), parsedMessage.getBucketName()); + s3ObjectsFilteredCounter.increment(); + deleteMessageBatchRequestEntries.add(buildDeleteMessageBatchRequestEntry(parsedMessage.getMessage())); + } + } + private S3ObjectReference populateS3Reference(final String bucketName, final String objectKey) { return S3ObjectReference .bucketAndKey(bucketName, objectKey) diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectFilteringHelperTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectFilteringHelperTest.java new file mode 100644 index 0000000000..8e4f78eec5 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectFilteringHelperTest.java @@ -0,0 +1,108 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.s3; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class S3ObjectFilteringHelperTest { + + @Test + void isKeyMatchingFilters_returns_true_when_filters_is_empty() { + final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Collections.emptyMap()); + assertThat(helper.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true)); + } + + @Test + void isKeyMatchingFilters_returns_true_when_filters_is_null() { + final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(null); + assertThat(helper.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true)); + } + + @Test + void isKeyMatchingFilters_returns_true_when_bucket_not_in_filters() { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/")); + final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("other-bucket", keyPathOption)); + + assertThat(helper.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true)); + } + + @Test + void isKeyMatchingFilters_returns_true_when_key_matches_include_prefix() { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/")); + when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(null); + final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption)); + + assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/image.png"), equalTo(true)); + } + + @Test + void isKeyMatchingFilters_returns_false_when_key_does_not_match_include_prefix() { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/")); + when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(null); + final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption)); + + assertThat(helper.isKeyMatchingFilters("my-bucket", "logs/app.log"), equalTo(false)); + } + + @Test + void isKeyMatchingFilters_returns_false_when_key_matches_exclude_suffix() { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(null); + when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg", ".xml")); + final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption)); + + assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/photo.jpg"), equalTo(false)); + } + + @Test + void isKeyMatchingFilters_returns_true_when_key_does_not_match_exclude_suffix() { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(null); + when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg", ".xml")); + final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption)); + + assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/data.json"), equalTo(true)); + } + + @Test + void isKeyMatchingFilters_applies_both_include_prefix_and_exclude_suffix() { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/")); + when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg")); + final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption)); + + assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/data.json"), equalTo(true)); + assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/photo.jpg"), equalTo(false)); + assertThat(helper.isKeyMatchingFilters("my-bucket", "logs/app.log"), equalTo(false)); + } + + @Test + void isKeyMatchingFilters_returns_true_when_key_matches_any_include_prefix() { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/", "data/")); + final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption)); + + assertThat(helper.isKeyMatchingFilters("my-bucket", "data/file.csv"), equalTo(true)); + } +} diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java index a49894c289..aeb3c40a39 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java @@ -86,7 +86,7 @@ void setup() { private Function, List> createObjectUnderTest() { - return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead, sourceCoordinator); + return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead, sourceCoordinator, new S3ObjectFilteringHelper(Collections.emptyMap())); } @Test diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfigTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfigTest.java index ae42a74478..5dce6638ff 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfigTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfigTest.java @@ -11,18 +11,25 @@ import org.opensearch.dataprepper.plugins.source.s3.configuration.NotificationSourceOption; import org.opensearch.dataprepper.plugins.source.s3.configuration.OnErrorOption; import org.opensearch.dataprepper.plugins.source.s3.configuration.FolderPartitioningOptions; +import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanBucketOption; +import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanBucketOptions; +import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanScanOptions; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3SelectOptions; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3DataSelection; import org.opensearch.dataprepper.test.helper.ReflectivelySetField; +import java.util.Collections; import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.s3.S3SourceConfig.DEFAULT_BUFFER_TIMEOUT; import static org.opensearch.dataprepper.plugins.source.s3.S3SourceConfig.DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE; @@ -139,4 +146,49 @@ void getDataSelection_returns_configured_value() throws NoSuchFieldException, Il ReflectivelySetField.setField(S3SourceConfig.class, s3SourceConfig, "dataSelection", S3DataSelection.METADATA_ONLY); assertThat(s3SourceConfig.getDataSelection(), equalTo(S3DataSelection.METADATA_ONLY)); } + + @Test + void getFilters_returns_empty_map_by_default() { + assertThat(new S3SourceConfig().getFilters(), equalTo(Collections.emptyMap())); + } + + @Test + void isFiltersNotUsedWithScanBucketFilter_returns_true_when_filters_not_set() { + assertTrue(new S3SourceConfig().isFiltersNotUsedWithScanBucketFilter()); + } + + @Test + void isFiltersNotUsedWithScanBucketFilter_returns_false_when_both_filters_and_scan_bucket_filter_set() throws Exception { + final S3SourceConfig s3SourceConfig = new S3SourceConfig(); + + final S3ScanKeyPathOption scanFilter = mock(S3ScanKeyPathOption.class); + final S3ScanBucketOption bucketOption = mock(S3ScanBucketOption.class); + when(bucketOption.getS3ScanFilter()).thenReturn(scanFilter); + final S3ScanBucketOptions bucketOptions = mock(S3ScanBucketOptions.class); + when(bucketOptions.getS3ScanBucketOption()).thenReturn(bucketOption); + final S3ScanScanOptions scanOptions = mock(S3ScanScanOptions.class); + when(scanOptions.getBuckets()).thenReturn(List.of(bucketOptions)); + + ReflectivelySetField.setField(S3SourceConfig.class, s3SourceConfig, "s3ScanScanOptions", scanOptions); + ReflectivelySetField.setField(S3SourceConfig.class, s3SourceConfig, "filters", Map.of("my-bucket", mock(S3ScanKeyPathOption.class))); + + assertFalse(s3SourceConfig.isFiltersNotUsedWithScanBucketFilter()); + } + + @Test + void isFiltersNotUsedWithScanBucketFilter_returns_true_when_filters_set_and_scan_bucket_filter_not_set() throws Exception { + final S3SourceConfig s3SourceConfig = new S3SourceConfig(); + + final S3ScanBucketOption bucketOption = mock(S3ScanBucketOption.class); + when(bucketOption.getS3ScanFilter()).thenReturn(null); + final S3ScanBucketOptions bucketOptions = mock(S3ScanBucketOptions.class); + when(bucketOptions.getS3ScanBucketOption()).thenReturn(bucketOption); + final S3ScanScanOptions scanOptions = mock(S3ScanScanOptions.class); + when(scanOptions.getBuckets()).thenReturn(List.of(bucketOptions)); + + ReflectivelySetField.setField(S3SourceConfig.class, s3SourceConfig, "s3ScanScanOptions", scanOptions); + ReflectivelySetField.setField(S3SourceConfig.class, s3SourceConfig, "filters", Map.of("my-bucket", mock(S3ScanKeyPathOption.class))); + + assertTrue(s3SourceConfig.isFiltersNotUsedWithScanBucketFilter()); + } } From fa1a577bb674635a546dd4e42a7684a32edb4bdc Mon Sep 17 00:00:00 2001 From: Sotaro Hikita Date: Sun, 19 Apr 2026 01:37:34 +0900 Subject: [PATCH 2/2] Address review feedback for S3 source top-level filters Rename S3ObjectFilteringHelper to S3ObjectKeyFilter for clarity. Combine related test cases using @ParameterizedTest and @CsvSource. Refactor isFiltersNotUsedWithScanBucketFilter to use guard clauses. Signed-off-by: Sotaro Hikita --- ...ringHelper.java => S3ObjectKeyFilter.java} | 6 +- .../s3/S3ScanPartitionCreationSupplier.java | 4 +- .../plugins/source/s3/S3SourceConfig.java | 13 ++- .../plugins/source/s3/ScanObjectWorker.java | 2 +- .../plugins/source/s3/SqsWorker.java | 4 +- .../s3/S3ObjectFilteringHelperTest.java | 108 ------------------ .../source/s3/S3ObjectKeyFilterTest.java | 101 ++++++++++++++++ .../S3ScanPartitionCreationSupplierTest.java | 2 +- 8 files changed, 118 insertions(+), 122 deletions(-) rename data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/{S3ObjectFilteringHelper.java => S3ObjectKeyFilter.java} (87%) delete mode 100644 data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectFilteringHelperTest.java create mode 100644 data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectKeyFilterTest.java diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectFilteringHelper.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectKeyFilter.java similarity index 87% rename from data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectFilteringHelper.java rename to data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectKeyFilter.java index d2f99f0f81..a9f1011e8a 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectFilteringHelper.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectKeyFilter.java @@ -16,13 +16,13 @@ import java.util.Map; /** - * Helper class for applying top-level S3 object key filters (include_prefix and exclude_suffix). + * Filters S3 objects by key using top-level include_prefix and exclude_suffix options. */ -public class S3ObjectFilteringHelper { +public class S3ObjectKeyFilter { private final Map filters; - public S3ObjectFilteringHelper(final Map filters) { + public S3ObjectKeyFilter(final Map filters) { this.filters = filters; } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java index 23e8c26fa8..f3bff59a51 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java @@ -56,7 +56,7 @@ public class S3ScanPartitionCreationSupplier implements Function sourceCoordinator; - private final S3ObjectFilteringHelper objectFilteringHelper; + private final S3ObjectKeyFilter objectFilteringHelper; public S3ScanPartitionCreationSupplier(final S3Client s3Client, final BucketOwnerProvider bucketOwnerProvider, @@ -65,7 +65,7 @@ public S3ScanPartitionCreationSupplier(final S3Client s3Client, final FolderPartitioningOptions folderPartitioningOptions, final boolean deleteS3ObjectsOnRead, final SourceCoordinator sourceCoordinator, - final S3ObjectFilteringHelper objectFilteringHelper) { + final S3ObjectKeyFilter objectFilteringHelper) { this.s3Client = s3Client; this.bucketOwnerProvider = bucketOwnerProvider; diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java index 91c7817a9f..0827bbe1ef 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java @@ -147,12 +147,15 @@ boolean isS3SelectNotUsingDeleteS3ObjectsOnRead() { @AssertTrue(message = "Top-level filters cannot be used together with scan bucket-level filter. Use one or the other.") boolean isFiltersNotUsedWithScanBucketFilter() { - if (filters != null && !filters.isEmpty() && s3ScanScanOptions != null && s3ScanScanOptions.getBuckets() != null) { - return s3ScanScanOptions.getBuckets().stream() - .noneMatch(bucket -> bucket.getS3ScanBucketOption() != null - && bucket.getS3ScanBucketOption().getS3ScanFilter() != null); + if (filters == null || filters.isEmpty()) { + return true; } - return true; + if (s3ScanScanOptions == null || s3ScanScanOptions.getBuckets() == null) { + return true; + } + return s3ScanScanOptions.getBuckets().stream() + .map(bucket -> bucket.getS3ScanBucketOption()) + .noneMatch(option -> option != null && option.getS3ScanFilter() != null); } public NotificationTypeOption getNotificationType() { diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java index 6bf788d65a..38fab46bfb 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java @@ -168,7 +168,7 @@ public ScanObjectWorker(final S3Client s3Client, this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(); this.acknowledgmentSetTimeout = s3SourceConfig.getS3ScanScanOptions().getAcknowledgmentTimeout(); - this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead(), sourceCoordinator, new S3ObjectFilteringHelper(s3SourceConfig.getFilters())); + this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead(), sourceCoordinator, new S3ObjectKeyFilter(s3SourceConfig.getFilters())); this.acknowledgmentsRemainingForPartitions = new ConcurrentHashMap<>(); this.objectsToDeleteForAcknowledgmentSets = new ConcurrentHashMap<>(); } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java index 311e38a392..674fb21e2f 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java @@ -75,7 +75,7 @@ public class SqsWorker implements Runnable { private final SqsOptions sqsOptions; private final S3EventFilter objectCreatedFilter; private final S3EventFilter evenBridgeObjectCreatedFilter; - private final S3ObjectFilteringHelper objectFilteringHelper; + private final S3ObjectKeyFilter objectFilteringHelper; private final Counter sqsMessagesReceivedCounter; private final Counter sqsReceiveMessagesFailedCounter; private final Counter sqsMessagesDeletedCounter; @@ -113,7 +113,7 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager, sqsOptions = s3SourceConfig.getSqsOptions(); objectCreatedFilter = new S3ObjectCreatedFilter(); evenBridgeObjectCreatedFilter = new EventBridgeObjectCreatedFilter(); - objectFilteringHelper = new S3ObjectFilteringHelper(s3SourceConfig.getFilters()); + objectFilteringHelper = new S3ObjectKeyFilter(s3SourceConfig.getFilters()); sqsMessageParser = new SqsMessageParser(s3SourceConfig); failedAttemptCount = 0; parsedMessageVisibilityTimesMap = new HashMap<>(); diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectFilteringHelperTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectFilteringHelperTest.java deleted file mode 100644 index 8e4f78eec5..0000000000 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectFilteringHelperTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - * - */ - -package org.opensearch.dataprepper.plugins.source.s3; - -import org.junit.jupiter.api.Test; -import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -class S3ObjectFilteringHelperTest { - - @Test - void isKeyMatchingFilters_returns_true_when_filters_is_empty() { - final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Collections.emptyMap()); - assertThat(helper.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true)); - } - - @Test - void isKeyMatchingFilters_returns_true_when_filters_is_null() { - final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(null); - assertThat(helper.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true)); - } - - @Test - void isKeyMatchingFilters_returns_true_when_bucket_not_in_filters() { - final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); - when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/")); - final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("other-bucket", keyPathOption)); - - assertThat(helper.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true)); - } - - @Test - void isKeyMatchingFilters_returns_true_when_key_matches_include_prefix() { - final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); - when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/")); - when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(null); - final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption)); - - assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/image.png"), equalTo(true)); - } - - @Test - void isKeyMatchingFilters_returns_false_when_key_does_not_match_include_prefix() { - final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); - when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/")); - when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(null); - final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption)); - - assertThat(helper.isKeyMatchingFilters("my-bucket", "logs/app.log"), equalTo(false)); - } - - @Test - void isKeyMatchingFilters_returns_false_when_key_matches_exclude_suffix() { - final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); - when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(null); - when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg", ".xml")); - final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption)); - - assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/photo.jpg"), equalTo(false)); - } - - @Test - void isKeyMatchingFilters_returns_true_when_key_does_not_match_exclude_suffix() { - final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); - when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(null); - when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg", ".xml")); - final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption)); - - assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/data.json"), equalTo(true)); - } - - @Test - void isKeyMatchingFilters_applies_both_include_prefix_and_exclude_suffix() { - final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); - when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/")); - when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg")); - final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption)); - - assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/data.json"), equalTo(true)); - assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/photo.jpg"), equalTo(false)); - assertThat(helper.isKeyMatchingFilters("my-bucket", "logs/app.log"), equalTo(false)); - } - - @Test - void isKeyMatchingFilters_returns_true_when_key_matches_any_include_prefix() { - final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); - when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/", "data/")); - final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption)); - - assertThat(helper.isKeyMatchingFilters("my-bucket", "data/file.csv"), equalTo(true)); - } -} diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectKeyFilterTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectKeyFilterTest.java new file mode 100644 index 0000000000..118d6eced4 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectKeyFilterTest.java @@ -0,0 +1,101 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.s3; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class S3ObjectKeyFilterTest { + + @Test + void isKeyMatchingFilters_returns_true_when_filters_is_empty() { + final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Collections.emptyMap()); + assertThat(filter.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true)); + } + + @Test + void isKeyMatchingFilters_returns_true_when_filters_is_null() { + final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(null); + assertThat(filter.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true)); + } + + @Test + void isKeyMatchingFilters_returns_true_when_bucket_not_in_filters() { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/")); + final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Map.of("other-bucket", keyPathOption)); + + assertThat(filter.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true)); + } + + @ParameterizedTest + @CsvSource({ + "assets/image.png, true", + "logs/app.log, false" + }) + void isKeyMatchingFilters_with_include_prefix(final String objectKey, final boolean expected) { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/")); + when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(null); + final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Map.of("my-bucket", keyPathOption)); + + assertThat(filter.isKeyMatchingFilters("my-bucket", objectKey), equalTo(expected)); + } + + @ParameterizedTest + @CsvSource({ + "assets/photo.jpg, false", + "assets/data.json, true" + }) + void isKeyMatchingFilters_with_exclude_suffix(final String objectKey, final boolean expected) { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(null); + when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg", ".xml")); + final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Map.of("my-bucket", keyPathOption)); + + assertThat(filter.isKeyMatchingFilters("my-bucket", objectKey), equalTo(expected)); + } + + @ParameterizedTest + @CsvSource({ + "assets/data.json, true", + "assets/photo.jpg, false", + "logs/app.log, false" + }) + void isKeyMatchingFilters_with_both_include_prefix_and_exclude_suffix(final String objectKey, final boolean expected) { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/")); + when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg")); + final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Map.of("my-bucket", keyPathOption)); + + assertThat(filter.isKeyMatchingFilters("my-bucket", objectKey), equalTo(expected)); + } + + @Test + void isKeyMatchingFilters_returns_true_when_key_matches_any_include_prefix() { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/", "data/")); + final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Map.of("my-bucket", keyPathOption)); + + assertThat(filter.isKeyMatchingFilters("my-bucket", "data/file.csv"), equalTo(true)); + } +} diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java index aeb3c40a39..5665d638ae 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java @@ -86,7 +86,7 @@ void setup() { private Function, List> createObjectUnderTest() { - return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead, sourceCoordinator, new S3ObjectFilteringHelper(Collections.emptyMap())); + return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead, sourceCoordinator, new S3ObjectKeyFilter(Collections.emptyMap())); } @Test