Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.s3;

import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption;

import java.util.List;
import java.util.Map;

/**
* Filters S3 objects by key using top-level include_prefix and exclude_suffix options.
*/
public class S3ObjectKeyFilter {

private final Map<String, S3ScanKeyPathOption> filters;

public S3ObjectKeyFilter(final Map<String, S3ScanKeyPathOption> filters) {
this.filters = filters;
}

/**
* Returns true if the object key matches the filters for the given bucket,
* or if no filters are configured for the bucket.
*/
public boolean isKeyMatchingFilters(final String bucketName, final String objectKey) {
if (filters == null || filters.isEmpty()) {
return true;
}

final S3ScanKeyPathOption keyPathOption = filters.get(bucketName);
if (keyPathOption == null) {
return true;
}

final List<String> includePrefixes = keyPathOption.getS3scanIncludePrefixOptions();
if (includePrefixes != null && !includePrefixes.isEmpty()
&& includePrefixes.stream().noneMatch(objectKey::startsWith)) {
return false;
}

final List<String> excludeSuffixes = keyPathOption.getS3ScanExcludeSuffixOptions();
if (excludeSuffixes != null && !excludeSuffixes.isEmpty()
&& excludeSuffixes.stream().anyMatch(objectKey::endsWith)) {
return false;
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@ public class S3ScanPartitionCreationSupplier implements Function<Map<String, Obj

private final SourceCoordinator<S3SourceProgressState> sourceCoordinator;

private final S3ObjectKeyFilter objectFilteringHelper;

public S3ScanPartitionCreationSupplier(final S3Client s3Client,
final BucketOwnerProvider bucketOwnerProvider,
final List<ScanOptions> scanOptionsList,
final S3ScanSchedulingOptions schedulingOptions,
final FolderPartitioningOptions folderPartitioningOptions,
final boolean deleteS3ObjectsOnRead,
final SourceCoordinator<S3SourceProgressState> sourceCoordinator) {
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
final S3ObjectKeyFilter objectFilteringHelper) {

this.s3Client = s3Client;
this.bucketOwnerProvider = bucketOwnerProvider;
Expand All @@ -71,6 +74,7 @@ public S3ScanPartitionCreationSupplier(final S3Client s3Client,
this.folderPartitioningOptions = folderPartitioningOptions;
this.deleteS3ObjectsOnRead = deleteS3ObjectsOnRead;
this.sourceCoordinator = sourceCoordinator;
this.objectFilteringHelper = objectFilteringHelper;
}

@Override
Expand Down Expand Up @@ -145,6 +149,7 @@ private void createFilteredS3ObjectPartitionsForBucket(final List<String> exclud
.filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/"))
.filter(keyTimestampPair -> excludeKeyPaths.stream()
.noneMatch(excludeItem -> keyTimestampPair.left().endsWith(excludeItem)))
.filter(keyTimestampPair -> objectFilteringHelper.isKeyMatchingFilters(bucket, keyTimestampPair.left()))
.filter(keyTimestampPair -> isKeyMatchedBetweenTimeRange(keyTimestampPair.right(), startDateTime, endDateTime, isFirstScan))
.map(Pair::left)
.map(objectKey -> PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3SelectOptions;
import org.opensearch.dataprepper.plugins.source.s3.configuration.SqsOptions;
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3DataSelection;
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;

public class S3SourceConfig {
Expand Down Expand Up @@ -104,6 +106,10 @@ public class S3SourceConfig {
@JsonProperty("data_selection")
private S3DataSelection dataSelection = S3DataSelection.DATA_AND_METADATA;

@JsonProperty("filters")
@Valid
private Map<String, S3ScanKeyPathOption> filters;

@AssertTrue(message = "A codec is required for reading objects.")
boolean isCodecProvidedWhenNeeded() {
if(s3SelectOptions == null)
Expand Down Expand Up @@ -139,6 +145,19 @@ boolean isS3SelectNotUsingDeleteS3ObjectsOnRead() {
return true;
}

@AssertTrue(message = "Top-level filters cannot be used together with scan bucket-level filter. Use one or the other.")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this assertion. It is the right approach in my opinion.

@Zhangxunmt Zhangxunmt Apr 10, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I ask why we don't want to keep filters at both levels. For the case below that includes difference filters at two bucket level, it couldn't support SQS as the notification with a top level filter.

        buckets:
          - bucket:
              name: offlinebatch
              data_selection: metadata_only
              filter:
                include_prefix:
                  - bedrock-multisource/my_batch
                exclude_suffix:
                  - .out
          - bucket:
              name: offlinebatch
              data_selection: data_only
              filter:
                include_prefix:
                  - bedrock-multisource/output/
                exclude_suffix:
                  - manifest.json.out

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Zhangxunmt , The top-level filters are still able to differentiate buckets even with SQS. SQS includes the bucket name in the message.

boolean isFiltersNotUsedWithScanBucketFilter() {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Early returns (guard clauses) to reduce nesting for better readability.

boolean isFiltersNotUsedWithScanBucketFilter() {
    if (filters == null || filters.isEmpty()) return true;   // top filter is null
    if (s3ScanScanOptions == null || s3ScanScanOptions.getBuckets() == null) return true;   // bucket scan is null

    return s3ScanScanOptions.getBuckets().stream()
            .map(bucket -> bucket.getS3ScanBucketOption())
            .noneMatch(option -> option != null && option.getS3ScanFilter() != null);    // bucket filter is null
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to use guard clauses. Thanks!

if (filters == null || filters.isEmpty()) {
return true;
}
if (s3ScanScanOptions == null || s3ScanScanOptions.getBuckets() == null) {
return true;
}
return s3ScanScanOptions.getBuckets().stream()
.map(bucket -> bucket.getS3ScanBucketOption())
.noneMatch(option -> option != null && option.getS3ScanFilter() != null);
}

public NotificationTypeOption getNotificationType() {
return notificationType;
}
Expand Down Expand Up @@ -222,4 +241,8 @@ public boolean isDeleteS3MetadataInEvent() {
public S3DataSelection getDataSelection() {
return dataSelection;
}

public Map<String, S3ScanKeyPathOption> getFilters() {
return filters != null ? filters : Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public ScanObjectWorker(final S3Client s3Client,
this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions();
this.acknowledgmentSetTimeout = s3SourceConfig.getS3ScanScanOptions().getAcknowledgmentTimeout();

this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead(), sourceCoordinator);
this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead(), sourceCoordinator, new S3ObjectKeyFilter(s3SourceConfig.getFilters()));
this.acknowledgmentsRemainingForPartitions = new ConcurrentHashMap<>();
this.objectsToDeleteForAcknowledgmentSets = new ConcurrentHashMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ public class SqsWorker implements Runnable {
static final String SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME = "sqsMessagesAccessDenied";
static final String SQS_MESSAGE_THROTTLED_METRIC_NAME = "sqsMessagesThrottled";
static final String SQS_RESOURCE_NOT_FOUND_METRIC_NAME = "sqsResourceNotFound";
static final String S3_OBJECTS_FILTERED_METRIC_NAME = "s3ObjectsFiltered";

private final S3SourceConfig s3SourceConfig;
private final SqsClient sqsClient;
private final S3Service s3Service;
private final SqsOptions sqsOptions;
private final S3EventFilter objectCreatedFilter;
private final S3EventFilter evenBridgeObjectCreatedFilter;
private final S3ObjectKeyFilter objectFilteringHelper;
private final Counter sqsMessagesReceivedCounter;
private final Counter sqsReceiveMessagesFailedCounter;
private final Counter sqsMessagesDeletedCounter;
Expand All @@ -86,6 +88,7 @@ public class SqsWorker implements Runnable {
private final Counter sqsMessageAccessDeniedCounter;
private final Counter sqsMessageThrottledCounter;
private final Counter sqsResourceNotFoundCounter;
private final Counter s3ObjectsFilteredCounter;
private final Timer sqsMessageDelayTimer;
private final Backoff standardBackoff;
private final SqsMessageParser sqsMessageParser;
Expand All @@ -110,6 +113,7 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager,
sqsOptions = s3SourceConfig.getSqsOptions();
objectCreatedFilter = new S3ObjectCreatedFilter();
evenBridgeObjectCreatedFilter = new EventBridgeObjectCreatedFilter();
objectFilteringHelper = new S3ObjectKeyFilter(s3SourceConfig.getFilters());
sqsMessageParser = new SqsMessageParser(s3SourceConfig);
failedAttemptCount = 0;
parsedMessageVisibilityTimesMap = new HashMap<>();
Expand All @@ -126,6 +130,7 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager,
sqsMessageAccessDeniedCounter = pluginMetrics.counter(SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME);
sqsMessageThrottledCounter = pluginMetrics.counter(SQS_MESSAGE_THROTTLED_METRIC_NAME);
sqsResourceNotFoundCounter = pluginMetrics.counter(SQS_RESOURCE_NOT_FOUND_METRIC_NAME);
s3ObjectsFilteredCounter = pluginMetrics.counter(S3_OBJECTS_FILTERED_METRIC_NAME);
}

@Override
Expand Down Expand Up @@ -234,11 +239,11 @@ private List<DeleteMessageBatchRequestEntry> processS3EventNotificationRecords(f
if (s3SourceConfig.getNotificationSource().equals(NotificationSourceOption.S3)
&& !parsedMessage.isEmptyNotification()
&& isS3EventNameCreated(parsedMessage)) {
parsedMessagesToRead.add(parsedMessage);
addParsedMessageByFilter(parsedMessage, parsedMessagesToRead, deleteMessageBatchRequestEntryCollection);
}
else if (s3SourceConfig.getNotificationSource().equals(NotificationSourceOption.EVENTBRIDGE)
&& isEventBridgeEventTypeCreated(parsedMessage)) {
parsedMessagesToRead.add(parsedMessage);
addParsedMessageByFilter(parsedMessage, parsedMessagesToRead, deleteMessageBatchRequestEntryCollection);
}
else {
// TODO: Delete these only if on_error is configured to delete_messages.
Expand Down Expand Up @@ -454,6 +459,19 @@ private boolean isEventBridgeEventTypeCreated(final ParsedMessage parsedMessage)
return evenBridgeObjectCreatedFilter.filter(parsedMessage).isPresent();
}

private void addParsedMessageByFilter(final ParsedMessage parsedMessage,
final List<ParsedMessage> parsedMessagesToRead,
final List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntries) {
if (objectFilteringHelper.isKeyMatchingFilters(parsedMessage.getBucketName(), parsedMessage.getObjectKey())) {
parsedMessagesToRead.add(parsedMessage);
} else {
LOG.debug("S3 object {} in bucket {} did not match configured filters. Deleting SQS message.",
parsedMessage.getObjectKey(), parsedMessage.getBucketName());
s3ObjectsFilteredCounter.increment();
deleteMessageBatchRequestEntries.add(buildDeleteMessageBatchRequestEntry(parsedMessage.getMessage()));
}
}

private S3ObjectReference populateS3Reference(final String bucketName, final String objectKey) {
return S3ObjectReference
.bucketAndKey(bucketName, objectKey)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.s3;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class S3ObjectKeyFilterTest {

@Test
void isKeyMatchingFilters_returns_true_when_filters_is_empty() {
final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Collections.emptyMap());
assertThat(filter.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true));
}

@Test
void isKeyMatchingFilters_returns_true_when_filters_is_null() {
final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(null);
assertThat(filter.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true));
}

@Test
void isKeyMatchingFilters_returns_true_when_bucket_not_in_filters() {
final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class);
when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/"));
final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Map.of("other-bucket", keyPathOption));

assertThat(filter.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true));
}

@ParameterizedTest
@CsvSource({
"assets/image.png, true",
"logs/app.log, false"
})
void isKeyMatchingFilters_with_include_prefix(final String objectKey, final boolean expected) {
final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class);
when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/"));
when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(null);
final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Map.of("my-bucket", keyPathOption));

assertThat(filter.isKeyMatchingFilters("my-bucket", objectKey), equalTo(expected));
}

@ParameterizedTest
@CsvSource({
"assets/photo.jpg, false",
"assets/data.json, true"
})
void isKeyMatchingFilters_with_exclude_suffix(final String objectKey, final boolean expected) {
final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class);
when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(null);
when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg", ".xml"));
final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Map.of("my-bucket", keyPathOption));

assertThat(filter.isKeyMatchingFilters("my-bucket", objectKey), equalTo(expected));
}

@ParameterizedTest
@CsvSource({
"assets/data.json, true",
"assets/photo.jpg, false",
"logs/app.log, false"
})
void isKeyMatchingFilters_with_both_include_prefix_and_exclude_suffix(final String objectKey, final boolean expected) {
final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class);
when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/"));
when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg"));
final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Map.of("my-bucket", keyPathOption));

assertThat(filter.isKeyMatchingFilters("my-bucket", objectKey), equalTo(expected));
}

@Test
void isKeyMatchingFilters_returns_true_when_key_matches_any_include_prefix() {
final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class);
when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/", "data/"));
final S3ObjectKeyFilter filter = new S3ObjectKeyFilter(Map.of("my-bucket", keyPathOption));

assertThat(filter.isKeyMatchingFilters("my-bucket", "data/file.csv"), equalTo(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void setup() {


private Function<Map<String, Object>, List<PartitionIdentifier>> createObjectUnderTest() {
return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead, sourceCoordinator);
return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead, sourceCoordinator, new S3ObjectKeyFilter(Collections.emptyMap()));
}

@Test
Expand Down
Loading
Loading