
Add top-level filters option for S3 source #6735

Merged: dlvenable merged 2 commits into opensearch-project:main from lawofcycles:feature/s3-source-top-level-filters on Apr 21, 2026

@lawofcycles lawofcycles commented Apr 6, 2026

Contributor

Add a top-level filters configuration to the S3 source that applies include_prefix and exclude_suffix filtering for both SQS and scan modes.

Previously, key path filters were only available under scan bucket options, making it impossible to filter S3 objects when using SQS notifications. The new filters option uses the same bucket-name-keyed Map pattern as bucket_owners.

source:
  s3:
    filters:
      my-bucket:
        include_prefix:
          - assets/
        exclude_suffix:
          - .jpg
          - .xml

Top-level filters and scan bucket-level filters cannot be used together. A validation error is raised if both are configured.

Testing

In addition to unit tests, I verified this with a real S3 bucket and SQS queue (S3 ObjectCreated notifications forwarded to SQS). With include_prefix: ["assets/"] and exclude_suffix: [".jpg"] configured, the following results were observed.

| S3 object key | Expected | Actual |
| --- | --- | --- |
| assets/data1.json | Processed | Processed (read from S3, output to stdout) |
| assets/photo.jpg | Filtered out (exclude_suffix) | Filtered out (SQS message deleted) |
| logs/app.log | Filtered out (include_prefix mismatch) | Filtered out (SQS message deleted) |
| root-file.json | Filtered out (include_prefix mismatch) | Filtered out (SQS message deleted) |
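
For readers who want to see the matching rule in isolation, here is a minimal sketch of how per-bucket include_prefix and exclude_suffix filtering could behave. It is not the code merged in this PR: the class `S3KeyFilterSketch` and the nested `KeyFilter` holder are hypothetical names, and two behaviors are assumptions made for illustration only (a bucket with no entry in the map passes every key, and a missing or empty include_prefix list includes every key). The method name `isKeyMatchingFilters` is borrowed from the test snippets in this conversation.

```java
import java.util.List;
import java.util.Map;

public class S3KeyFilterSketch {

    /** Hypothetical holder for one bucket's filter lists; either list may be null. */
    public static final class KeyFilter {
        final List<String> includePrefix;
        final List<String> excludeSuffix;

        public KeyFilter(List<String> includePrefix, List<String> excludeSuffix) {
            this.includePrefix = includePrefix;
            this.excludeSuffix = excludeSuffix;
        }
    }

    // Filters keyed by bucket name, mirroring the shape of the YAML config.
    private final Map<String, KeyFilter> filtersByBucket;

    public S3KeyFilterSketch(Map<String, KeyFilter> filtersByBucket) {
        this.filtersByBucket = filtersByBucket;
    }

    /** Returns true when the object key should be processed. */
    public boolean isKeyMatchingFilters(String bucket, String key) {
        KeyFilter filter = filtersByBucket.get(bucket);
        if (filter == null) {
            return true; // assumption: buckets without a filter entry are not filtered
        }
        // Assumption: a null or empty include list means "include everything".
        boolean included = filter.includePrefix == null
                || filter.includePrefix.isEmpty()
                || filter.includePrefix.stream().anyMatch(key::startsWith);
        boolean excluded = filter.excludeSuffix != null
                && filter.excludeSuffix.stream().anyMatch(key::endsWith);
        return included && !excluded;
    }

    public static void main(String[] args) {
        S3KeyFilterSketch filter = new S3KeyFilterSketch(Map.of(
                "my-bucket", new KeyFilter(List.of("assets/"), List.of(".jpg"))));

        // The four keys from the manual test above.
        for (String key : List.of("assets/data1.json", "assets/photo.jpg",
                "logs/app.log", "root-file.json")) {
            String result = filter.isKeyMatchingFilters("my-bucket", key)
                    ? "processed" : "filtered out";
            System.out.println(key + " -> " + result);
        }
    }
}
```

Under these assumptions, only assets/data1.json is reported as processed, which agrees with the results listed above. Because the lookup is by bucket name, the same check can be applied to an SQS notification or to a scan result.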

Issues Resolved

Resolves #6386

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

github-actions Bot commented Apr 6, 2026

✅ License Header Check Passed

All newly added files have proper license headers. Great work! 🎉

Add a top-level filters configuration to the S3 source that applies
include_prefix and exclude_suffix filtering for both SQS and scan modes.

Previously, key path filters were only available under scan bucket
options, making it impossible to filter S3 objects when using SQS
notifications. The new filters option uses the same bucket name keyed
Map pattern as bucket_owners.

Top-level filters and scan bucket-level filters cannot be used together,
as the top-level filters are intended to eventually replace the scan
bucket-level filters.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>

@dlvenable dlvenable left a comment

Member

Thank you @lawofcycles for this contribution! This will be a great help to users of SQS/EventBridge.

/**
 * Helper class for applying top-level S3 object key filters (include_prefix and exclude_suffix).
 */
public class S3ObjectFilteringHelper {

Member

Thank you for splitting this logic into a class. Maybe rename it to S3ObjectKeyFilter or S3ObjectFilter.

Contributor Author

Renamed to S3ObjectKeyFilter. Thanks for the suggestion!

when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(null);
final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption));

assertThat(helper.isKeyMatchingFilters("my-bucket", "logs/app.log"), equalTo(false));

Member

This could be combined with the test above using @ParameterizedTest and @CsvSource.

@CsvSource({
  "assets/image.png, true",
  "logs/app.log, false"
})

Contributor Author

Done, combined with @ParameterizedTest and @CsvSource.

when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg", ".xml"));
final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption));

assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/data.json"), equalTo(true));

Member

Similar comment about combining tests.

Contributor Author

Done as well.

when(keyPathOption.getS3ScanExcludeSuffixOptions()).thenReturn(List.of(".jpg"));
final S3ObjectFilteringHelper helper = new S3ObjectFilteringHelper(Map.of("my-bucket", keyPathOption));

assertThat(helper.isKeyMatchingFilters("my-bucket", "assets/data.json"), equalTo(true));

Member

This is a good scenario for @CsvSource as well.

Contributor Author

Done as well.

return true;
}

@AssertTrue(message = "Top-level filters cannot be used together with scan bucket-level filter. Use one or the other.")

Member

Thank you for this assertion. It is the right approach in my opinion.

@Zhangxunmt Zhangxunmt Apr 10, 2026

Collaborator

May I ask why we don't want to keep filters at both levels? The case below defines different filters on two bucket entries; with only a top-level filter, it couldn't be supported when SQS is the notification source.

        buckets:
          - bucket:
              name: offlinebatch
              data_selection: metadata_only
              filter:
                include_prefix:
                  - bedrock-multisource/my_batch
                exclude_suffix:
                  - .out
          - bucket:
              name: offlinebatch
              data_selection: data_only
              filter:
                include_prefix:
                  - bedrock-multisource/output/
                exclude_suffix:
                  - manifest.json.out

Member

@Zhangxunmt, the top-level filters are still able to differentiate buckets even with SQS. SQS includes the bucket name in the message.

}

@AssertTrue(message = "Top-level filters cannot be used together with scan bucket-level filter. Use one or the other.")
boolean isFiltersNotUsedWithScanBucketFilter() {

Collaborator

nit: Use early returns (guard clauses) to reduce nesting and improve readability.

boolean isFiltersNotUsedWithScanBucketFilter() {
    if (filters == null || filters.isEmpty()) return true;   // top filter is null
    if (s3ScanScanOptions == null || s3ScanScanOptions.getBuckets() == null) return true;   // bucket scan is null

    return s3ScanScanOptions.getBuckets().stream()
            .map(bucket -> bucket.getS3ScanBucketOption())
            .noneMatch(option -> option != null && option.getS3ScanFilter() != null);    // bucket filter is null
}

Contributor Author

Refactored to use guard clauses. Thanks!

Rename S3ObjectFilteringHelper to S3ObjectKeyFilter for clarity.
Combine related test cases using @ParameterizedTest and @CsvSource.
Refactor isFiltersNotUsedWithScanBucketFilter to use guard clauses.

Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
@lawofcycles lawofcycles requested a review from dlvenable April 18, 2026 16:38
@dlvenable dlvenable merged commit 59d45a6 into opensearch-project:main Apr 21, 2026
69 of 72 checks passed

Development

Successfully merging this pull request may close these issues.

Filter options for SQS queue

3 participants