Skip to content

Commit 5e5fc0c

Browse files
committed
Add top-level filters option for S3 source
Add a top-level filters configuration to the S3 source that applies include_prefix and exclude_suffix filtering in both SQS and scan modes. Previously, key path filters were only available under the scan bucket options, making it impossible to filter S3 objects when using SQS notifications. The new filters option uses the same bucket-name-keyed Map pattern as bucket_owners. Top-level filters and scan bucket-level filters cannot be used together, as the top-level filters are intended to eventually replace the scan bucket-level filters. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
1 parent d3288d4 commit 5e5fc0c

8 files changed

Lines changed: 265 additions & 5 deletions

File tree

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.s3;
12+
13+
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption;
14+
15+
import java.util.List;
16+
import java.util.Map;
17+
18+
/**
19+
* Helper class for applying top-level S3 object key filters (include_prefix and exclude_suffix).
20+
*/
21+
public class S3ObjectFilteringHelper {
22+
23+
private final Map<String, S3ScanKeyPathOption> filters;
24+
25+
public S3ObjectFilteringHelper(final Map<String, S3ScanKeyPathOption> filters) {
26+
this.filters = filters;
27+
}
28+
29+
/**
30+
* Returns true if the object key matches the filters for the given bucket,
31+
* or if no filters are configured for the bucket.
32+
*/
33+
public boolean isKeyMatchingFilters(final String bucketName, final String objectKey) {
34+
if (filters == null || filters.isEmpty()) {
35+
return true;
36+
}
37+
38+
final S3ScanKeyPathOption keyPathOption = filters.get(bucketName);
39+
if (keyPathOption == null) {
40+
return true;
41+
}
42+
43+
final List<String> includePrefixes = keyPathOption.getS3scanIncludePrefixOptions();
44+
if (includePrefixes != null && !includePrefixes.isEmpty()
45+
&& includePrefixes.stream().noneMatch(objectKey::startsWith)) {
46+
return false;
47+
}
48+
49+
final List<String> excludeSuffixes = keyPathOption.getS3ScanExcludeSuffixOptions();
50+
if (excludeSuffixes != null && !excludeSuffixes.isEmpty()
51+
&& excludeSuffixes.stream().anyMatch(objectKey::endsWith)) {
52+
return false;
53+
}
54+
55+
return true;
56+
}
57+
}

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplier.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,16 @@ public class S3ScanPartitionCreationSupplier implements Function<Map<String, Obj
5656

5757
private final SourceCoordinator<S3SourceProgressState> sourceCoordinator;
5858

59+
private final S3ObjectFilteringHelper objectFilteringHelper;
60+
5961
public S3ScanPartitionCreationSupplier(final S3Client s3Client,
6062
final BucketOwnerProvider bucketOwnerProvider,
6163
final List<ScanOptions> scanOptionsList,
6264
final S3ScanSchedulingOptions schedulingOptions,
6365
final FolderPartitioningOptions folderPartitioningOptions,
6466
final boolean deleteS3ObjectsOnRead,
65-
final SourceCoordinator<S3SourceProgressState> sourceCoordinator) {
67+
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
68+
final S3ObjectFilteringHelper objectFilteringHelper) {
6669

6770
this.s3Client = s3Client;
6871
this.bucketOwnerProvider = bucketOwnerProvider;
@@ -71,6 +74,7 @@ public S3ScanPartitionCreationSupplier(final S3Client s3Client,
7174
this.folderPartitioningOptions = folderPartitioningOptions;
7275
this.deleteS3ObjectsOnRead = deleteS3ObjectsOnRead;
7376
this.sourceCoordinator = sourceCoordinator;
77+
this.objectFilteringHelper = objectFilteringHelper;
7478
}
7579

7680
@Override
@@ -145,6 +149,7 @@ private void createFilteredS3ObjectPartitionsForBucket(final List<String> exclud
145149
.filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/"))
146150
.filter(keyTimestampPair -> excludeKeyPaths.stream()
147151
.noneMatch(excludeItem -> keyTimestampPair.left().endsWith(excludeItem)))
152+
.filter(keyTimestampPair -> objectFilteringHelper.isKeyMatchingFilters(bucket, keyTimestampPair.left()))
148153
.filter(keyTimestampPair -> isKeyMatchedBetweenTimeRange(keyTimestampPair.right(), startDateTime, endDateTime, isFirstScan))
149154
.map(Pair::left)
150155
.map(objectKey -> PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build())

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3SourceConfig.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3SelectOptions;
2323
import org.opensearch.dataprepper.plugins.source.s3.configuration.SqsOptions;
2424
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3DataSelection;
25+
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption;
2526

2627
import java.time.Duration;
28+
import java.util.Collections;
2729
import java.util.Map;
2830

2931
public class S3SourceConfig {
@@ -104,6 +106,10 @@ public class S3SourceConfig {
104106
@JsonProperty("data_selection")
105107
private S3DataSelection dataSelection = S3DataSelection.DATA_AND_METADATA;
106108

109+
@JsonProperty("filters")
110+
@Valid
111+
private Map<String, S3ScanKeyPathOption> filters;
112+
107113
@AssertTrue(message = "A codec is required for reading objects.")
108114
boolean isCodecProvidedWhenNeeded() {
109115
if(s3SelectOptions == null)
@@ -139,6 +145,16 @@ boolean isS3SelectNotUsingDeleteS3ObjectsOnRead() {
139145
return true;
140146
}
141147

148+
@AssertTrue(message = "Top-level filters cannot be used together with scan bucket-level filter. Use one or the other.")
149+
boolean isFiltersNotUsedWithScanBucketFilter() {
150+
if (filters != null && !filters.isEmpty() && s3ScanScanOptions != null && s3ScanScanOptions.getBuckets() != null) {
151+
return s3ScanScanOptions.getBuckets().stream()
152+
.noneMatch(bucket -> bucket.getS3ScanBucketOption() != null
153+
&& bucket.getS3ScanBucketOption().getS3ScanFilter() != null);
154+
}
155+
return true;
156+
}
157+
142158
public NotificationTypeOption getNotificationType() {
143159
return notificationType;
144160
}
@@ -222,4 +238,8 @@ public boolean isDeleteS3MetadataInEvent() {
222238
public S3DataSelection getDataSelection() {
223239
return dataSelection;
224240
}
241+
242+
public Map<String, S3ScanKeyPathOption> getFilters() {
243+
return filters != null ? filters : Collections.emptyMap();
244+
}
225245
}

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public ScanObjectWorker(final S3Client s3Client,
168168
this.folderPartitioningOptions = s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions();
169169
this.acknowledgmentSetTimeout = s3SourceConfig.getS3ScanScanOptions().getAcknowledgmentTimeout();
170170

171-
this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead(), sourceCoordinator);
171+
this.partitionCreationSupplier = new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsBuilderList, s3ScanSchedulingOptions, s3SourceConfig.getS3ScanScanOptions().getPartitioningOptions(), s3SourceConfig.isDeleteS3ObjectsOnRead(), sourceCoordinator, new S3ObjectFilteringHelper(s3SourceConfig.getFilters()));
172172
this.acknowledgmentsRemainingForPartitions = new ConcurrentHashMap<>();
173173
this.objectsToDeleteForAcknowledgmentSets = new ConcurrentHashMap<>();
174174
}

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,15 @@ public class SqsWorker implements Runnable {
6767
static final String SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME = "sqsMessagesAccessDenied";
6868
static final String SQS_MESSAGE_THROTTLED_METRIC_NAME = "sqsMessagesThrottled";
6969
static final String SQS_RESOURCE_NOT_FOUND_METRIC_NAME = "sqsResourceNotFound";
70+
static final String S3_OBJECTS_FILTERED_METRIC_NAME = "s3ObjectsFiltered";
7071

7172
private final S3SourceConfig s3SourceConfig;
7273
private final SqsClient sqsClient;
7374
private final S3Service s3Service;
7475
private final SqsOptions sqsOptions;
7576
private final S3EventFilter objectCreatedFilter;
7677
private final S3EventFilter evenBridgeObjectCreatedFilter;
78+
private final S3ObjectFilteringHelper objectFilteringHelper;
7779
private final Counter sqsMessagesReceivedCounter;
7880
private final Counter sqsReceiveMessagesFailedCounter;
7981
private final Counter sqsMessagesDeletedCounter;
@@ -86,6 +88,7 @@ public class SqsWorker implements Runnable {
8688
private final Counter sqsMessageAccessDeniedCounter;
8789
private final Counter sqsMessageThrottledCounter;
8890
private final Counter sqsResourceNotFoundCounter;
91+
private final Counter s3ObjectsFilteredCounter;
8992
private final Timer sqsMessageDelayTimer;
9093
private final Backoff standardBackoff;
9194
private final SqsMessageParser sqsMessageParser;
@@ -110,6 +113,7 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager,
110113
sqsOptions = s3SourceConfig.getSqsOptions();
111114
objectCreatedFilter = new S3ObjectCreatedFilter();
112115
evenBridgeObjectCreatedFilter = new EventBridgeObjectCreatedFilter();
116+
objectFilteringHelper = new S3ObjectFilteringHelper(s3SourceConfig.getFilters());
113117
sqsMessageParser = new SqsMessageParser(s3SourceConfig);
114118
failedAttemptCount = 0;
115119
parsedMessageVisibilityTimesMap = new HashMap<>();
@@ -126,6 +130,7 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager,
126130
sqsMessageAccessDeniedCounter = pluginMetrics.counter(SQS_MESSAGE_ACCESS_DENIED_METRIC_NAME);
127131
sqsMessageThrottledCounter = pluginMetrics.counter(SQS_MESSAGE_THROTTLED_METRIC_NAME);
128132
sqsResourceNotFoundCounter = pluginMetrics.counter(SQS_RESOURCE_NOT_FOUND_METRIC_NAME);
133+
s3ObjectsFilteredCounter = pluginMetrics.counter(S3_OBJECTS_FILTERED_METRIC_NAME);
129134
}
130135

131136
@Override
@@ -234,11 +239,11 @@ private List<DeleteMessageBatchRequestEntry> processS3EventNotificationRecords(f
234239
if (s3SourceConfig.getNotificationSource().equals(NotificationSourceOption.S3)
235240
&& !parsedMessage.isEmptyNotification()
236241
&& isS3EventNameCreated(parsedMessage)) {
237-
parsedMessagesToRead.add(parsedMessage);
242+
addParsedMessageByFilter(parsedMessage, parsedMessagesToRead, deleteMessageBatchRequestEntryCollection);
238243
}
239244
else if (s3SourceConfig.getNotificationSource().equals(NotificationSourceOption.EVENTBRIDGE)
240245
&& isEventBridgeEventTypeCreated(parsedMessage)) {
241-
parsedMessagesToRead.add(parsedMessage);
246+
addParsedMessageByFilter(parsedMessage, parsedMessagesToRead, deleteMessageBatchRequestEntryCollection);
242247
}
243248
else {
244249
// TODO: Delete these only if on_error is configured to delete_messages.
@@ -454,6 +459,19 @@ private boolean isEventBridgeEventTypeCreated(final ParsedMessage parsedMessage)
454459
return evenBridgeObjectCreatedFilter.filter(parsedMessage).isPresent();
455460
}
456461

462+
private void addParsedMessageByFilter(final ParsedMessage parsedMessage,
463+
final List<ParsedMessage> parsedMessagesToRead,
464+
final List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntries) {
465+
if (objectFilteringHelper.isKeyMatchingFilters(parsedMessage.getBucketName(), parsedMessage.getObjectKey())) {
466+
parsedMessagesToRead.add(parsedMessage);
467+
} else {
468+
LOG.debug("S3 object {} in bucket {} did not match configured filters. Deleting SQS message.",
469+
parsedMessage.getObjectKey(), parsedMessage.getBucketName());
470+
s3ObjectsFilteredCounter.increment();
471+
deleteMessageBatchRequestEntries.add(buildDeleteMessageBatchRequestEntry(parsedMessage.getMessage()));
472+
}
473+
}
474+
457475
private S3ObjectReference populateS3Reference(final String bucketName, final String objectKey) {
458476
return S3ObjectReference
459477
.bucketAndKey(bucketName, objectKey)
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.s3;
12+
13+
import org.junit.jupiter.api.Test;
14+
import org.opensearch.dataprepper.plugins.source.s3.configuration.S3ScanKeyPathOption;
15+
16+
import java.util.Collections;
17+
import java.util.List;
18+
import java.util.Map;
19+
20+
import static org.hamcrest.CoreMatchers.equalTo;
21+
import static org.hamcrest.MatcherAssert.assertThat;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.when;
24+
25+
class S3ObjectFilteringHelperTest {
26+
27+
@Test
28+
void isKeyMatchingFilters_returns_true_when_filters_is_empty() {
29+
final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Collections.emptyMap());
30+
assertThat(helper.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true));
31+
}
32+
33+
@Test
34+
void isKeyMatchingFilters_returns_true_when_filters_is_null() {
35+
final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(null);
36+
assertThat(helper.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true));
37+
}
38+
39+
@Test
40+
void isKeyMatchingFilters_returns_true_when_bucket_not_in_filters() {
41+
final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class);
42+
when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/"));
43+
final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("other-bucket", keyPathOption));
44+
45+
assertThat(helper.isKeyMatchingFilters("my-bucket", "any/key.json"), equalTo(true));
46+
}
47+
48+
@Test
49+
void isKeyMatchingFilters_returns_true_when_key_matches_include_prefix() {
50+
final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class);
51+
when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/"));
52+
when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(null);
53+
final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption));
54+
55+
assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/image.png"), equalTo(true));
56+
}
57+
58+
@Test
59+
void isKeyMatchingFilters_returns_false_when_key_does_not_match_include_prefix() {
60+
final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class);
61+
when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/"));
62+
when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(null);
63+
final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption));
64+
65+
assertThat(helper.isKeyMatchingFilters("my-bucket", "logs/app.log"), equalTo(false));
66+
}
67+
68+
@Test
69+
void isKeyMatchingFilters_returns_false_when_key_matches_exclude_suffix() {
70+
final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class);
71+
when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(null);
72+
when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg", ".xml"));
73+
final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption));
74+
75+
assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/photo.jpg"), equalTo(false));
76+
}
77+
78+
@Test
79+
void isKeyMatchingFilters_returns_true_when_key_does_not_match_exclude_suffix() {
80+
final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class);
81+
when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(null);
82+
when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg", ".xml"));
83+
final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption));
84+
85+
assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/data.json"), equalTo(true));
86+
}
87+
88+
@Test
89+
void isKeyMatchingFilters_applies_both_include_prefix_and_exclude_suffix() {
90+
final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class);
91+
when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/"));
92+
when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg"));
93+
final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption));
94+
95+
assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/data.json"), equalTo(true));
96+
assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/photo.jpg"), equalTo(false));
97+
assertThat(helper.isKeyMatchingFilters("my-bucket", "logs/app.log"), equalTo(false));
98+
}
99+
100+
@Test
101+
void isKeyMatchingFilters_returns_true_when_key_matches_any_include_prefix() {
102+
final S3ScanKeyPathOption keyPathOption = mock(S3ScanKeyPathOption.class);
103+
when(keyPathOption.getS3scanIncludePrefixOptions()).thenReturn(List.of("assets/", "data/"));
104+
final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption));
105+
106+
assertThat(helper.isKeyMatchingFilters("my-bucket", "data/file.csv"), equalTo(true));
107+
}
108+
}

data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanPartitionCreationSupplierTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ void setup() {
8686

8787

8888
private Function<Map<String, Object>, List<PartitionIdentifier>> createObjectUnderTest() {
89-
return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead, sourceCoordinator);
89+
return new S3ScanPartitionCreationSupplier(s3Client, bucketOwnerProvider, scanOptionsList, schedulingOptions, folderPartitioningOptions, isDeleteS3ObjectsOnRead, sourceCoordinator, new S3ObjectFilteringHelper(Collections.emptyMap()));
9090
}
9191

9292
@Test

0 commit comments

Comments
 (0)