Skip to content

Commit fc37ac1

Browse files
committed
add s3 enricher processor
Signed-off-by: Xun Zhang <xunzh@amazon.com>
1 parent 860e593 commit fc37ac1

25 files changed

Lines changed: 2333 additions & 0 deletions
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
2+
# S3 Enricher Processor
3+
4+
This plugin enables you to merge data from an S3 file with source data from your Data Prepper pipeline.
5+
6+
## Usage
7+
```yaml
8+
ml_merge-pipeline:
9+
...
10+
processor:
11+
- s3_enricher:
12+
fields:
13+
- to_key: field_B
14+
from_key: /modelInput/inputText
15+
merge_all_fields: true # Whether to merge all fields from input source
16+
retry_on_failure: 3 # Retry attempts for failed requests
17+
tags_on_failure: ["lookup_failed"] # Tags for failed events
18+
# S3 source configuration
19+
source_path_prefix: "s3://bucket/input/" # Base path for source/input files
20+
result_file_suffix: ".out" # Suffix to identify result files
21+
correlation_field: "recordId" # Field used to match related records
22+
```
23+
`fields`: the fields to be merged into the pipeline.
24+
`merge_all_fields`: flag that controls whether all fields from the input source are merged into the pipeline.
25+
`source_path_prefix`: the base path for source/input files.
26+
`correlation_field`: the name of the field used to match related records.
27+
28+
## Metrics
29+
30+
### Counter
31+
32+
## Developer Guide
33+
34+
The integration tests for this plugin do not run as part of the Data Prepper build.
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
// Compile, runtime and test dependencies of the S3 enricher processor plugin.
dependencies {
    implementation project(path: ':data-prepper-plugins:common')
    implementation project(':data-prepper-plugins:aws-plugin-api')
    implementation 'software.amazon.awssdk:sdk-core'
    implementation 'software.amazon.awssdk:sts'
    implementation 'io.micrometer:micrometer-core'
    implementation 'org.json:json'
    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
    implementation 'org.projectlombok:lombok:1.18.22'
    implementation libs.parquet.common
    implementation 'dev.failsafe:failsafe:3.3.2'
    implementation 'org.apache.httpcomponents:httpcore:4.4.16'
    implementation libs.caffeine
    // The annotation processor must be the same Lombok release as the library
    // declared above; it was previously pinned to 1.18.20 while the library
    // was 1.18.22.
    annotationProcessor 'org.projectlombok:lombok:1.18.22'
    implementation 'software.amazon.awssdk:s3'
    testImplementation project(':data-prepper-test:test-event')
    testImplementation testLibs.slf4j.simple
}
24+
25+
// Run unit tests on the JUnit Platform (JUnit 5).
test.useJUnitPlatform()
28+
29+
// Dedicated source set for integration tests. It compiles and runs against
// the outputs of both the main and the unit-test source sets.
sourceSets {
    integrationTest {
        compileClasspath += sourceSets.main.output + sourceSets.test.output
        runtimeClasspath += sourceSets.main.output + sourceSets.test.output
        java.srcDir file('src/integrationTest/java')
        resources.srcDir file('src/integrationTest/resources')
    }
}
39+
40+
// Let the integration-test source set inherit the unit-test dependencies.
configurations {
    integrationTestImplementation.extendsFrom testImplementation
    // Use the *RuntimeOnly configurations: the legacy `testRuntime` /
    // `integrationTestRuntime` names no longer exist in Gradle 7+, and inside
    // this block an unknown name is silently created as a new, empty
    // configuration, so the previous line had no effect on the classpath.
    integrationTestRuntimeOnly.extendsFrom testRuntimeOnly
}
44+
45+
// Runs the integration tests (classes whose names end in "IT") from the
// integrationTest source set on the JUnit Platform.
task integrationTest(type: Test) {
    group = 'verification'

    classpath = sourceSets.integrationTest.runtimeClasspath
    testClassesDirs = sourceSets.integrationTest.output.classesDirs

    useJUnitPlatform()
    filter.includeTestsMatching('*IT')

    systemProperty('log4j.configurationFile', 'src/test/resources/log4j2.properties')
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.s3_enricher.processor;
7+
8+
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
9+
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
10+
import org.opensearch.dataprepper.plugins.s3_enricher.processor.configuration.AwsAuthenticationOptions;
11+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
12+
13+
public class AwsAuthenticationAdapter {
14+
private final AwsCredentialsSupplier awsCredentialsSupplier;
15+
private final S3EnricherProcessorConfig s3EnricherProcessorConfig;
16+
17+
18+
AwsAuthenticationAdapter(
19+
final AwsCredentialsSupplier awsCredentialsSupplier,
20+
final S3EnricherProcessorConfig s3EnricherProcessorConfig) {
21+
this.awsCredentialsSupplier = awsCredentialsSupplier;
22+
this.s3EnricherProcessorConfig = s3EnricherProcessorConfig;
23+
}
24+
25+
AwsCredentialsProvider getCredentialsProvider() {
26+
final AwsAuthenticationOptions awsAuthenticationOptions = s3EnricherProcessorConfig.getAwsAuthenticationOptions();
27+
28+
final AwsCredentialsOptions options = AwsCredentialsOptions.builder()
29+
.withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn())
30+
.withRegion(awsAuthenticationOptions.getAwsRegion())
31+
.withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides())
32+
.withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId())
33+
.build();
34+
35+
return awsCredentialsSupplier.getProvider(options);
36+
}
37+
}

0 commit comments

Comments
 (0)