diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectKeyFilter.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectKeyFilter.java new file mode 100644 index 0000000000..a9f1011e8a --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectKeyFilter.java @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.s3; + +import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; + +import java.util.List; +import java.util.Map; + +/** + * Filters S3 objects by key using top-level include_prefix and exclude_suffix options. + */ +public class S3ObjectKeyFilter { + + private final Map filters; + + public S3ObjectKeyFilter(final Map filters) { + this.filters = filters; + } + + /** + * Returns true if the object key matches the filters for the given bucket, + * or if no filters are configured for the bucket. + */ + public boolean isKeyMatchingFilters(final String bucketName, final String objectKey) { + if (filters == null || filters.isEmpty()) { + return true; + } + + final S3ScanKeyPathOption keyPathOption = filters.get(bucketName); + if (keyPathOption == null) { + return true; + } + + final List includePrefixes = keyPathOption.getS3scanIncludePrefixOptions(); + if (includePrefixes != null && !includePrefixes.isEmpty() + && includePrefixes.stream().noneMatch(objectKey::startsWith)) { + return false; + } + + final List excludeSuffixes = keyPathOption.getS3ScanExcludeSuffixOptions(); + if (excludeSuffixes != null && !excludeSuffixes.isEmpty() + && excludeSuffixes.stream().anyMatch(objectKey::endsWith)) { + return false; + } + + return true; + } +} diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java index b7745ae434..f3bff59a51 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java @@ -56,13 +56,16 @@ public class S3ScanPartitionCreationSupplier implements Function sourceCoordinator; + private final S3ObjectKeyFilter objectFilteringHelper; + public S3ScanPartitionCreationSupplier(final S3Client s3Client, final BucketOwnerProvider bucketOwnerProvider, final List scanOptionsList, final S3ScanSchedulingOptions schedulingOptions, final FolderPartitioningOptions folderPartitioningOptions, final boolean deleteS3ObjectsOnRead, - final SourceCoordinator sourceCoordinator) { + final SourceCoordinator sourceCoordinator, + final S3ObjectKeyFilter objectFilteringHelper) { this.s3Client = s3Client; this.bucketOwnerProvider = bucketOwnerProvider; @@ -71,6 +74,7 @@ public S3ScanPartitionCreationSupplier(final S3Client s3Client, this.folderPartitioningOptions = folderPartitioningOptions; this.deleteS3ObjectsOnRead = deleteS3ObjectsOnRead; this.sourceCoordinator = sourceCoordinator; + this.objectFilteringHelper = objectFilteringHelper; } @Override @@ -145,6 +149,7 @@ private void createFilteredS3ObjectPartitionsForBucket(final List exclud .filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/")) .filter(keyTimestampPair -> excludeKeyPaths.stream() .noneMatch(excludeItem -> keyTimestampPair.left().endsWith(excludeItem))) + .filter(keyTimestampPair -> objectFilteringHelper.isKeyMatchingFilters(bucket, keyTimestampPair.left())) .filter(keyTimestampPair -> isKeyMatchedBetweenTimeRange(keyTimestampPair.right(), startDateTime, endDateTime, isFirstScan)) .map(Pair::left) .map(objectKey -> PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build()) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java index 1a11788aa0..0827bbe1ef 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java @@ -22,8 +22,10 @@ import org.opensearch.dataprepper.plugins.source.s3.configuration.S3SelectOptions; import org.opensearch.dataprepper.plugins.source.s3.configuration.SqsOptions; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3DataSelection; +import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; import java.time.Duration; +import java.util.Collections; import java.util.Map; public class S3SourceConfig { @@ -104,6 +106,10 @@ public class S3SourceConfig { @JsonProperty("data_selection") private S3DataSelection dataSelection = S3DataSelection.DATA_AND_METADATA; + @JsonProperty("filters") + @Valid + private Map filters; + @AssertTrue(message = "A codec is required for reading objects.") boolean isCodecProvidedWhenNeeded() { if(s3SelectOptions == null) @@ -139,6 +145,19 @@ boolean isS3SelectNotUsingDeleteS3ObjectsOnRead() { return true; } + @AssertTrue(message = "Top-level filters cannot be used together with scan bucket-level filter. Use one or the other.") + boolean isFiltersNotUsedWithScanBucketFilter() { + if (filters == null || filters.isEmpty()) { + return true; + } + if (s3ScanScanOptions == null || s3ScanScanOptions.getBuckets() == null) { + return true; + } + return s3ScanScanOptions.getBuckets().stream() + .map(bucket -> bucket.getS3ScanBucketOption()) + .noneMatch(option -> option != null && option.getS3ScanFilter() != null); + } + public NotificationTypeOption getNotificationType() { return notificationType; } @@ -222,4 +241,8 @@ public boolean isDeleteS3MetadataInEvent() { public S3DataSelection getDataSelection() { return dataSelection; } + + public Map getFilters() { + return filters != null ? filters : Collections.emptyMap(); + } } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java index c62e1a1c6e..38fab46bfb 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java @@ -168,7 +168,7 @@ public ScanObjectWorker(final S3Client s3Client, this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(); this.acknowledgmentSetTimeout = s3SourceConfig.getS3ScanScanOptions().getAcknowledgmentTimeout(); - this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead(), sourceCoordinator); + this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead(), sourceCoordinator, new S3ObjectKeyFilter(s3SourceConfig.getFilters())); this.acknowledgmentsRemainingForPartitions = new ConcurrentHashMap<>(); this.objectsToDeleteForAcknowledgmentSets = new ConcurrentHashMap<>(); } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java index 2aabff0484..674fb21e2f 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java @@ -67,6 +67,7 @@ public class SqsWorker implements Runnable { static final String SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME = "sqsMessagesAccessDenied"; static final String SQS_MESSAGE_THROTTLED_METRIC_NAME = "sqsMessagesThrottled"; static final String SQS_RESOURCE_NOT_FOUND_METRIC_NAME = "sqsResourceNotFound"; + static final String S3_OBJECTS_FILTERED_METRIC_NAME = "s3ObjectsFiltered"; private final S3SourceConfig s3SourceConfig; private final SqsClient sqsClient; @@ -74,6 +75,7 @@ public class SqsWorker implements Runnable { private final SqsOptions sqsOptions; private final S3EventFilter objectCreatedFilter; private final S3EventFilter evenBridgeObjectCreatedFilter; + private final S3ObjectKeyFilter objectFilteringHelper; private final Counter sqsMessagesReceivedCounter; private final Counter sqsReceiveMessagesFailedCounter; private final Counter sqsMessagesDeletedCounter; @@ -86,6 +88,7 @@ public class SqsWorker implements Runnable { private final Counter sqsMessageAccessDeniedCounter; private final Counter sqsMessageThrottledCounter; private final Counter sqsResourceNotFoundCounter; + private final Counter s3ObjectsFilteredCounter; private final Timer sqsMessageDelayTimer; private final Backoff standardBackoff; private final SqsMessageParser sqsMessageParser; @@ -110,6 +113,7 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager, sqsOptions = s3SourceConfig.getSqsOptions(); objectCreatedFilter = new S3ObjectCreatedFilter(); evenBridgeObjectCreatedFilter = new EventBridgeObjectCreatedFilter(); + objectFilteringHelper = new S3ObjectKeyFilter(s3SourceConfig.getFilters()); sqsMessageParser = new SqsMessageParser(s3SourceConfig); failedAttemptCount = 0; parsedMessageVisibilityTimesMap = new HashMap<>(); @@ -126,6 +130,7 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager, sqsMessageAccessDeniedCounter = pluginMetrics.counter(SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME); sqsMessageThrottledCounter = pluginMetrics.counter(SQS_MESSAGE_THROTTLED_METRIC_NAME); sqsResourceNotFoundCounter = pluginMetrics.counter(SQS_RESOURCE_NOT_FOUND_METRIC_NAME); + s3ObjectsFilteredCounter = pluginMetrics.counter(S3_OBJECTS_FILTERED_METRIC_NAME); } @Override @@ -234,11 +239,11 @@ private List processS3EventNotificationRecords(f if (s3SourceConfig.getNotificationSource().equals(NotificationSourceOption.S3) && !parsedMessage.isEmptyNotification() && isS3EventNameCreated(parsedMessage)) { - parsedMessagesToRead.add(parsedMessage); + addParsedMessageByFilter(parsedMessage, parsedMessagesToRead, deleteMessageBatchRequestEntryCollection); } else if (s3SourceConfig.getNotificationSource().equals(NotificationSourceOption.EVENTBRIDGE) && isEventBridgeEventTypeCreated(parsedMessage)) { - parsedMessagesToRead.add(parsedMessage); + addParsedMessageByFilter(parsedMessage, parsedMessagesToRead, deleteMessageBatchRequestEntryCollection); } else { // TODO: Delete these only if on_error is configured to delete_messages. @@ -454,6 +459,19 @@ private boolean isEventBridgeEventTypeCreated(final ParsedMessage parsedMessage) return evenBridgeObjectCreatedFilter.filter(parsedMessage).isPresent(); } + private void addParsedMessageByFilter(final ParsedMessage parsedMessage, + final List parsedMessagesToRead, + final List deleteMessageBatchRequestEntries) { + if (objectFilteringHelper.isKeyMatchingFilters(parsedMessage.getBucketName(), parsedMessage.getObjectKey())) { + parsedMessagesToRead.add(parsedMessage); + } else { + LOG.debug("S3 object {} in bucket {} did not match configured filters. Deleting SQS message.", + parsedMessage.getObjectKey(), parsedMessage.getBucketName()); + s3ObjectsFilteredCounter.increment(); + deleteMessageBatchRequestEntries.add(buildDeleteMessageBatchRequestEntry(parsedMessage.getMessage())); + } + } + private S3ObjectReference populateS3Reference(final String bucketName, final String objectKey) { return S3ObjectReference .bucketAndKey(bucketName, objectKey) diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectKeyFilterTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectKeyFilterTest.java new file mode 100644 index 0000000000..118d6eced4 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectKeyFilterTest.java @@ -0,0 +1,101 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.s3; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class S3ObjectKeyFilterTest { + + @Test + void isKeyMatchingFilters_returns_true_when_filters_is_empty() { + final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Collections.emptyMap()); + assertThat(filter.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true)); + } + + @Test + void isKeyMatchingFilters_returns_true_when_filters_is_null() { + final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(null); + assertThat(filter.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true)); + } + + @Test + void isKeyMatchingFilters_returns_true_when_bucket_not_in_filters() { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/")); + final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Map.of("other-bucket", keyPathOption)); + + assertThat(filter.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true)); + } + + @ParameterizedTest + @CsvSource({ + "assets/image.png, true", + "logs/app.log, false" + }) + void isKeyMatchingFilters_with_include_prefix(final String objectKey, final boolean expected) { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/")); + when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(null); + final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Map.of("my-bucket", keyPathOption)); + + assertThat(filter.isKeyMatchingFilters("my-bucket", objectKey), equalTo(expected)); + } + + @ParameterizedTest + @CsvSource({ + "assets/photo.jpg, false", + "assets/data.json, true" + }) + void isKeyMatchingFilters_with_exclude_suffix(final String objectKey, final boolean expected) { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(null); + when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg", ".xml")); + final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Map.of("my-bucket", keyPathOption)); + + assertThat(filter.isKeyMatchingFilters("my-bucket", objectKey), equalTo(expected)); + } + + @ParameterizedTest + @CsvSource({ + "assets/data.json, true", + "assets/photo.jpg, false", + "logs/app.log, false" + }) + void isKeyMatchingFilters_with_both_include_prefix_and_exclude_suffix(final String objectKey, final boolean expected) { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/")); + when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg")); + final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Map.of("my-bucket", keyPathOption)); + + assertThat(filter.isKeyMatchingFilters("my-bucket", objectKey), equalTo(expected)); + } + + @Test + void isKeyMatchingFilters_returns_true_when_key_matches_any_include_prefix() { + final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class); + when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/", "data/")); + final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Map.of("my-bucket", keyPathOption)); + + assertThat(filter.isKeyMatchingFilters("my-bucket", "data/file.csv"), equalTo(true)); + } +} diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java index a49894c289..5665d638ae 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java @@ -86,7 +86,7 @@ void setup() { private Function, List> createObjectUnderTest() { - return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead, sourceCoordinator); + return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead, sourceCoordinator, new S3ObjectKeyFilter(Collections.emptyMap())); } @Test diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfigTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfigTest.java index ae42a74478..5dce6638ff 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfigTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfigTest.java @@ -11,18 +11,25 @@ import org.opensearch.dataprepper.plugins.source.s3.configuration.NotificationSourceOption; import org.opensearch.dataprepper.plugins.source.s3.configuration.OnErrorOption; import org.opensearch.dataprepper.plugins.source.s3.configuration.FolderPartitioningOptions; +import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanBucketOption; +import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanBucketOptions; +import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanScanOptions; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3SelectOptions; import org.opensearch.dataprepper.plugins.source.s3.configuration.S3DataSelection; import org.opensearch.dataprepper.test.helper.ReflectivelySetField; +import java.util.Collections; import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.s3.S3SourceConfig.DEFAULT_BUFFER_TIMEOUT; import static org.opensearch.dataprepper.plugins.source.s3.S3SourceConfig.DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE; @@ -139,4 +146,49 @@ void getDataSelection_returns_configured_value() throws NoSuchFieldException, Il ReflectivelySetField.setField(S3SourceConfig.class, s3SourceConfig, "dataSelection", S3DataSelection.METADATA_ONLY); assertThat(s3SourceConfig.getDataSelection(), equalTo(S3DataSelection.METADATA_ONLY)); } + + @Test + void getFilters_returns_empty_map_by_default() { + assertThat(new S3SourceConfig().getFilters(), equalTo(Collections.emptyMap())); + } + + @Test + void isFiltersNotUsedWithScanBucketFilter_returns_true_when_filters_not_set() { + assertTrue(new S3SourceConfig().isFiltersNotUsedWithScanBucketFilter()); + } + + @Test + void isFiltersNotUsedWithScanBucketFilter_returns_false_when_both_filters_and_scan_bucket_filter_set() throws Exception { + final S3SourceConfig s3SourceConfig = new S3SourceConfig(); + + final S3ScanKeyPathOption scanFilter = mock(S3ScanKeyPathOption.class); + final S3ScanBucketOption bucketOption = mock(S3ScanBucketOption.class); + when(bucketOption.getS3ScanFilter()).thenReturn(scanFilter); + final S3ScanBucketOptions bucketOptions = mock(S3ScanBucketOptions.class); + when(bucketOptions.getS3ScanBucketOption()).thenReturn(bucketOption); + final S3ScanScanOptions scanOptions = mock(S3ScanScanOptions.class); + when(scanOptions.getBuckets()).thenReturn(List.of(bucketOptions)); + + ReflectivelySetField.setField(S3SourceConfig.class, s3SourceConfig, "s3ScanScanOptions", scanOptions); + ReflectivelySetField.setField(S3SourceConfig.class, s3SourceConfig, "filters", Map.of("my-bucket", mock(S3ScanKeyPathOption.class))); + + assertFalse(s3SourceConfig.isFiltersNotUsedWithScanBucketFilter()); + } + + @Test + void isFiltersNotUsedWithScanBucketFilter_returns_true_when_filters_set_and_scan_bucket_filter_not_set() throws Exception { + final S3SourceConfig s3SourceConfig = new S3SourceConfig(); + + final S3ScanBucketOption bucketOption = mock(S3ScanBucketOption.class); + when(bucketOption.getS3ScanFilter()).thenReturn(null); + final S3ScanBucketOptions bucketOptions = mock(S3ScanBucketOptions.class); + when(bucketOptions.getS3ScanBucketOption()).thenReturn(bucketOption); + final S3ScanScanOptions scanOptions = mock(S3ScanScanOptions.class); + when(scanOptions.getBuckets()).thenReturn(List.of(bucketOptions)); + + ReflectivelySetField.setField(S3SourceConfig.class, s3SourceConfig, "s3ScanScanOptions", scanOptions); + ReflectivelySetField.setField(S3SourceConfig.class, s3SourceConfig, "filters", Map.of("my-bucket", mock(S3ScanKeyPathOption.class))); + + assertTrue(s3SourceConfig.isFiltersNotUsedWithScanBucketFilter()); + } }