Skip to content

Commit a6cddc2

Browse files
committed
add s3 enricher processor
Signed-off-by: Xun Zhang <xunzh@amazon.com>
1 parent 860e593 commit a6cddc2

25 files changed

Lines changed: 2409 additions & 0 deletions
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
2+
# S3 Enricher Processor
3+
4+
This plugin enables you to merge data from a S3 file with source data from your Data Prepper pipeline.
5+
6+
## Usage
7+
```aidl
8+
ml_merge-pipeline:
9+
...
10+
processor:
11+
- s3_enricher:
12+
# =============================================================================
13+
# S3 SOURCE BUCKET CONFIGURATION
14+
# Defines where to fetch the original/source data for enrichment
15+
# =============================================================================
16+
bucket:
17+
# The S3 bucket containing the source records to enrich from
18+
name: offlinebatch
19+
filter:
20+
# S3 prefix path where source files are located
21+
# The processor will look for source files under this prefix
22+
# Example: s3://offlinebatch/bedrockbatch/originsource/test_batch_50k.jsonl
23+
include_prefix: bedrockbatch/originsource/
24+
25+
# =============================================================================
26+
# DATA FORMAT CONFIGURATION
27+
# =============================================================================
28+
# Codec for parsing source S3 files
29+
# Options: ndjson, json, csv, etc.
30+
codec:
31+
ndjson:
32+
33+
# =============================================================================
34+
# AWS CONFIGURATION
35+
# =============================================================================
36+
# AWS account ID that owns the S3 bucket (for cross-account access)
37+
default_bucket_owner: 802041417063
38+
39+
aws:
40+
# AWS region where the S3 bucket is located
41+
region: us-east-1
42+
43+
# =============================================================================
44+
# S3 OBJECT SETTINGS
45+
# =============================================================================
46+
# Maximum size (in MB) of S3 source files to process
47+
# Files exceeding this limit will be skipped
48+
s3_object_size_limit_mb: 100
49+
50+
# JSON path in the incoming pipeline event that contains the S3 object key
51+
# Used to determine which source file to fetch for enrichment
52+
# Example event: {"s3": {"bucket": "...", "key": "output/file.jsonl.out"}}
53+
s3_key_path: "s3/key"
54+
55+
# =============================================================================
56+
# SOURCE FILE NAME EXTRACTION
57+
# =============================================================================
58+
# Regex pattern to extract the base filename from the output S3 key
59+
# The first capture group (.*?) extracts the original source filename
60+
#
61+
# Example:
62+
# Input: test_batch_50k-2025-11-06T21-19-15Z-1762463955825635000-uuid.jsonl.out
63+
# Match: Group 1 = "test_batch_50k"
64+
# Result: Looks for source file "test_batch_50k.jsonl" in include_prefix path
65+
#
66+
# Pattern breakdown:
67+
# ^(.*?) - Capture base filename (non-greedy)
68+
# -\d{4}-\d{2}-\d{2} - Match date: -YYYY-MM-DD
69+
# T\d{2}-\d{2}-\d{2}Z - Match time: THH-MM-SSZ
70+
# -.* - Match remaining (job ID, UUID, etc.)
71+
# \.jsonl\.out$ - Match file extension
72+
s3_object_name_pattern: ^(.*?)-\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}Z-.*\.jsonl\.out$
73+
74+
# =============================================================================
75+
# RECORD MATCHING & ENRICHMENT
76+
# =============================================================================
77+
# Field name used to correlate/match records between output and source files
78+
# Both the pipeline event and source records must contain this field
79+
# Records with matching correlation values will be merged
80+
correlation_key: "recordId"
81+
82+
# List of fields to copy from the source record into the pipeline event
83+
# Only these specified fields will be merged; all other source fields are ignored
84+
# If a field doesn't exist in source, it will be skipped
85+
keys_to_merge:
86+
- "field_A"
87+
- "field_B"
88+
- "field_C"
89+
90+
# =============================================================================
91+
# CONDITIONAL PROCESSING
92+
# =============================================================================
93+
# Data Prepper expression to conditionally apply enrichment
94+
# Only events matching this condition will be processed by the enricher
95+
# Events not matching will pass through unchanged
96+
enrich_when: /s3/key != null
97+
```
98+
`keys_to_merge` List of fields to copy from the source record into the pipeline event.
99+
`s3_object_name_pattern` as Regex pattern to extract the base filename from the output S3 key.
100+
`s3_key_path` as JSON path in the incoming pipeline event that contains the S3 object key
101+
`correlation_key` as the Field name used to correlate/match records between output and source files
102+
103+
## Metrics
104+
- 'numberOfRecordsEnrichedSuccessFromS3': Number of pipeline records successfully enriched from S3 source
105+
- 'numberOfRecordsEnrichedFailerFromS3': Number of pipeline records that failed enrichment from S3 source
106+
- 's3EnricherObjectsFailed': Number of S3 source objects successfully loaded for enrichment
107+
- 's3EnricherObjectsSucceeded': Number of S3 source objects that failed to load for enrichment
108+
109+
## Developer Guide
110+
111+
The integration tests for this plugin do not run as part of the Data Prepper build.
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
dependencies {
7+
implementation project(path: ':data-prepper-plugins:common')
8+
implementation project(':data-prepper-plugins:aws-plugin-api')
9+
implementation 'software.amazon.awssdk:sdk-core'
10+
implementation 'software.amazon.awssdk:sts'
11+
implementation 'io.micrometer:micrometer-core'
12+
implementation 'org.json:json'
13+
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
14+
implementation 'org.projectlombok:lombok:1.18.22'
15+
implementation libs.parquet.common
16+
implementation 'dev.failsafe:failsafe:3.3.2'
17+
implementation 'org.apache.httpcomponents:httpcore:4.4.16'
18+
implementation libs.caffeine
19+
annotationProcessor 'org.projectlombok:lombok:1.18.20'
20+
implementation 'software.amazon.awssdk:s3'
21+
testImplementation project(':data-prepper-test:test-event')
22+
testImplementation testLibs.slf4j.simple
23+
}
24+
25+
test {
26+
useJUnitPlatform()
27+
}
28+
29+
sourceSets {
30+
integrationTest {
31+
java {
32+
compileClasspath += main.output + test.output
33+
runtimeClasspath += main.output + test.output
34+
srcDir file('src/integrationTest/java')
35+
}
36+
resources.srcDir file('src/integrationTest/resources')
37+
}
38+
}
39+
40+
configurations {
41+
integrationTestImplementation.extendsFrom testImplementation
42+
integrationTestRuntime.extendsFrom testRuntime
43+
}
44+
45+
task integrationTest(type: Test) {
46+
group = 'verification'
47+
testClassesDirs = sourceSets.integrationTest.output.classesDirs
48+
49+
useJUnitPlatform()
50+
51+
classpath = sourceSets.integrationTest.runtimeClasspath
52+
53+
systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'
54+
55+
filter {
56+
includeTestsMatching '*IT'
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.s3_enricher.processor;
7+
8+
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
9+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
10+
import org.opensearch.dataprepper.plugins.s3_enricher.processor.configuration.AwsAuthenticationOptions;
11+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
12+
13+
public class AwsAuthenticationAdapter {
14+
private final AwsCredentialsSupplier awsCredentialsSupplier;
15+
private final S3EnricherProcessorConfig s3EnricherProcessorConfig;
16+
17+
18+
AwsAuthenticationAdapter(
19+
final AwsCredentialsSupplier awsCredentialsSupplier,
20+
final S3EnricherProcessorConfig s3EnricherProcessorConfig) {
21+
this.awsCredentialsSupplier = awsCredentialsSupplier;
22+
this.s3EnricherProcessorConfig = s3EnricherProcessorConfig;
23+
}
24+
25+
AwsCredentialsProvider getCredentialsProvider() {
26+
final AwsAuthenticationOptions awsAuthenticationOptions = s3EnricherProcessorConfig.getAwsAuthenticationOptions();
27+
28+
final AwsCredentialsOptions options = AwsCredentialsOptions.builder()
29+
.withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn())
30+
.withRegion(awsAuthenticationOptions.getAwsRegion())
31+
.withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides())
32+
.withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId())
33+
.build();
34+
35+
return awsCredentialsSupplier.getProvider(options);
36+
}
37+
}

0 commit comments

Comments
 (0)