-
Notifications
You must be signed in to change notification settings - Fork 325
add S3 Enrich processor to merge ml batch job output with source inputs #5992
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
a70d6c3
add s3 enricher processor
Zhangxunmt 43f6507
rename from S3Enricher to S3Enrich
Zhangxunmt 478e15c
add unit tests for s3 enrich processor
Zhangxunmt b92c00c
Add selective merge to Event API and refactor S3EnrichProcessor
Zhangxunmt 6e9681a
add experimental to the S3 enrich processor
Zhangxunmt File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,111 @@ | ||
|
|
||
| # S3 Enricher Processor | ||
|
|
||
| This plugin enables you to merge data from a S3 file with source data from your Data Prepper pipeline. | ||
|
|
||
| ## Usage | ||
| ```aidl | ||
| ml_merge-pipeline: | ||
| ... | ||
| processor: | ||
| - s3_enrich: | ||
| # ============================================================================= | ||
| # S3 SOURCE BUCKET CONFIGURATION | ||
| # Defines where to fetch the original/source data for enrichment | ||
| # ============================================================================= | ||
| bucket: | ||
| # The S3 bucket containing the source records to enrich from | ||
| name: offlinebatch | ||
| filter: | ||
| # S3 prefix path where source files are located | ||
| # The processor will look for source files under this prefix | ||
| # Example: s3://offlinebatch/bedrockbatch/originsource/test_batch_50k.jsonl | ||
| include_prefix: bedrockbatch/originsource/ | ||
|
|
||
| # ============================================================================= | ||
| # DATA FORMAT CONFIGURATION | ||
| # ============================================================================= | ||
| # Codec for parsing source S3 files | ||
| # Options: ndjson, json, csv, etc. | ||
| codec: | ||
| ndjson: | ||
|
|
||
| # ============================================================================= | ||
| # AWS CONFIGURATION | ||
| # ============================================================================= | ||
| # AWS account ID that owns the S3 bucket (for cross-account access) | ||
| default_bucket_owner: 802041417063 | ||
|
|
||
| aws: | ||
| # AWS region where the S3 bucket is located | ||
| region: us-east-1 | ||
|
|
||
| # ============================================================================= | ||
| # S3 OBJECT SETTINGS | ||
| # ============================================================================= | ||
| # Maximum size (in MB) of S3 source files to process | ||
| # Files exceeding this limit will be skipped | ||
| s3_object_size_limit: 100mb | ||
|
|
||
| # JSON path in the incoming pipeline event that contains the S3 object key | ||
| # Used to determine which source file to fetch for enrichment | ||
| # Example event: {"s3": {"bucket": "...", "key": "output/file.jsonl.out"}} | ||
| s3_key_path: "s3/key" | ||
|
|
||
| # ============================================================================= | ||
| # SOURCE FILE NAME EXTRACTION | ||
| # ============================================================================= | ||
| # Regex pattern to extract the base filename from the output S3 key | ||
| # The first capture group (.*?) extracts the original source filename | ||
| # | ||
| # Example: | ||
| # Input: test_batch_50k-2025-11-06T21-19-15Z-1762463955825635000-uuid.jsonl.out | ||
| # Match: Group 1 = "test_batch_50k" | ||
| # Result: Looks for source file "test_batch_50k.jsonl" in include_prefix path | ||
| # | ||
| # Pattern breakdown: | ||
| # ^(.*?) - Capture base filename (non-greedy) | ||
| # -\d{4}-\d{2}-\d{2} - Match date: -YYYY-MM-DD | ||
| # T\d{2}-\d{2}-\d{2}Z - Match time: THH-MM-SSZ | ||
| # -.* - Match remaining (job ID, UUID, etc.) | ||
| # \.jsonl\.out$ - Match file extension | ||
| s3_object_name_pattern: ^(.*?)-\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}Z-.*\.jsonl\.out$ | ||
|
|
||
| # ============================================================================= | ||
| # RECORD MATCHING & ENRICHMENT | ||
| # ============================================================================= | ||
| # Field name used to correlate/match records between output and source files | ||
| # Both the pipeline event and source records must contain this field | ||
| # Records with matching correlation values will be merged | ||
| correlation_key: "recordId" | ||
|
|
||
| # List of fields to copy from the source record into the pipeline event | ||
| # Only these specified fields will be merged; all other source fields are ignored | ||
| # If a field doesn't exist in source, it will be skipped | ||
| keys_to_merge: | ||
| - "field_A" | ||
| - "field_B" | ||
| - "field_C" | ||
|
|
||
| # ============================================================================= | ||
| # CONDITIONAL PROCESSING | ||
| # ============================================================================= | ||
| # Data Prepper expression to conditionally apply enrichment | ||
| # Only events matching this condition will be processed by the enricher | ||
| # Events not matching will pass through unchanged | ||
| enrich_when: /s3/key != null | ||
| ``` | ||
| `keys_to_merge` List of fields to copy from the source record into the pipeline event. | ||
| `s3_object_name_pattern` as Regex pattern to extract the base filename from the output S3 key. | ||
| `s3_key_path` as JSON path in the incoming pipeline event that contains the S3 object key | ||
| `correlation_key` as the Field name used to correlate/match records between output and source files | ||
|
|
||
| ## Metrics | ||
| - 'numberOfRecordsEnrichedSuccessFromS3': Number of pipeline records successfully enriched from S3 source | ||
| - 'numberOfRecordsEnrichedFailerFromS3': Number of pipeline records that failed enrichment from S3 source | ||
| - 's3EnricherObjectsFailed': Number of S3 source objects successfully loaded for enrichment | ||
| - 's3EnricherObjectsSucceeded': Number of S3 source objects that failed to load for enrichment | ||
|
|
||
| ## Developer Guide | ||
|
|
||
| The integration tests for this plugin do not run as part of the Data Prepper build. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| * | ||
| */ | ||
|
|
||
| dependencies { | ||
| implementation project(path: ':data-prepper-plugins:common') | ||
| implementation project(':data-prepper-plugins:s3-common') | ||
| implementation project(':data-prepper-plugins:aws-plugin-api') | ||
| implementation 'software.amazon.awssdk:sdk-core' | ||
| implementation 'software.amazon.awssdk:sts' | ||
| implementation 'io.micrometer:micrometer-core' | ||
| implementation 'org.json:json' | ||
| implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' | ||
| implementation 'org.projectlombok:lombok:1.18.22' | ||
| implementation libs.parquet.common | ||
| implementation 'org.apache.httpcomponents:httpcore:4.4.16' | ||
| implementation libs.caffeine | ||
| implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final' | ||
| annotationProcessor 'org.projectlombok:lombok:1.18.20' | ||
| implementation 'software.amazon.awssdk:s3' | ||
| testImplementation project(':data-prepper-test:test-event') | ||
| testImplementation testLibs.slf4j.simple | ||
| } | ||
|
|
||
| sourceSets { | ||
| integrationTest { | ||
| java { | ||
| compileClasspath += main.output + test.output | ||
| runtimeClasspath += main.output + test.output | ||
| srcDir file('src/integrationTest/java') | ||
| } | ||
| resources.srcDir file('src/integrationTest/resources') | ||
| } | ||
| } | ||
|
|
||
| configurations { | ||
| integrationTestImplementation.extendsFrom testImplementation | ||
| integrationTestRuntime.extendsFrom testRuntime | ||
| } | ||
|
|
||
| task integrationTest(type: Test) { | ||
| group = 'verification' | ||
| testClassesDirs = sourceSets.integrationTest.output.classesDirs | ||
|
|
||
| useJUnitPlatform() | ||
|
|
||
| classpath = sourceSets.integrationTest.runtimeClasspath | ||
|
|
||
| systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties' | ||
|
|
||
| filter { | ||
| includeTestsMatching '*IT' | ||
| } | ||
| } |
42 changes: 42 additions & 0 deletions
42
...java/org/opensearch/dataprepper/plugins/s3_enrich/processor/AwsAuthenticationAdapter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| * | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.plugins.s3_enrich.processor; | ||
|
|
||
| import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; | ||
| import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; | ||
| import org.opensearch.dataprepper.plugins.s3.common.config.AwsAuthenticationOptions; | ||
| import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; | ||
|
|
||
| public class AwsAuthenticationAdapter { | ||
| private final AwsCredentialsSupplier awsCredentialsSupplier; | ||
| private final S3EnrichProcessorConfig s3EnrichProcessorConfig; | ||
|
|
||
|
|
||
| AwsAuthenticationAdapter( | ||
| final AwsCredentialsSupplier awsCredentialsSupplier, | ||
| final S3EnrichProcessorConfig s3EnricherProcessorConfig) { | ||
| this.awsCredentialsSupplier = awsCredentialsSupplier; | ||
| this.s3EnrichProcessorConfig = s3EnricherProcessorConfig; | ||
| } | ||
|
|
||
| AwsCredentialsProvider getCredentialsProvider() { | ||
| final AwsAuthenticationOptions awsAuthenticationOptions = s3EnrichProcessorConfig.getAwsAuthenticationOptions(); | ||
|
|
||
| final AwsCredentialsOptions options = AwsCredentialsOptions.builder() | ||
| .withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn()) | ||
| .withRegion(awsAuthenticationOptions.getAwsRegion()) | ||
| .withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides()) | ||
| .withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId()) | ||
| .build(); | ||
|
|
||
| return awsCredentialsSupplier.getProvider(options); | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the data has something like this { "key": null }, do you thing users will want it merged?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. For the S3 enrich use case(adding data from a lookup), skipping nulls makes sense — you don't want a missing/null enrichment field to overwrite a valid value in the target event. But in general, a user might explicitly want to merge a null to clear a field.
Given that the only current consumer is S3EnrichProcessor and the semantics are enrichment, I think we can keep skipping nulls. But I'll add a comment in the code in the next PR to make the decision explicit so in the future this can enhanced for other cases.