Skip to content

Commit a781b13

Browse files
committed
rename from S3Enricher to S3Enrich
Signed-off-by: Xun Zhang <xunzh@amazon.com>
1 parent 8a327dc commit a781b13

28 files changed

Lines changed: 374 additions & 1435 deletions

File tree

File renamed without changes.

data-prepper-plugins/s3-enricher-processor/build.gradle renamed to data-prepper-plugins/s3-enrich-processor/build.gradle

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
dependencies {
77
implementation project(path: ':data-prepper-plugins:common')
8+
implementation project(':data-prepper-plugins:s3-common')
89
implementation project(':data-prepper-plugins:aws-plugin-api')
910
implementation 'software.amazon.awssdk:sdk-core'
1011
implementation 'software.amazon.awssdk:sts'
@@ -22,10 +23,6 @@ dependencies {
2223
testImplementation testLibs.slf4j.simple
2324
}
2425

25-
test {
26-
useJUnitPlatform()
27-
}
28-
2926
sourceSets {
3027
integrationTest {
3128
java {

data-prepper-plugins/s3-enricher-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enricher/processor/AwsAuthenticationAdapter.java renamed to data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/AwsAuthenticationAdapter.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,34 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
510

6-
package org.opensearch.dataprepper.plugins.s3_enricher.processor;
11+
package org.opensearch.dataprepper.plugins.s3_enrich.processor;
712

813
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
914
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
10-
import org.opensearch.dataprepper.plugins.s3_enricher.processor.configuration.AwsAuthenticationOptions;
15+
import org.opensearch.dataprepper.plugins.s3.common.config.AwsAuthenticationOptions;
1116
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
1217

1318
public class AwsAuthenticationAdapter {
1419
private final AwsCredentialsSupplier awsCredentialsSupplier;
15-
private final S3EnricherProcessorConfig s3EnricherProcessorConfig;
20+
private final S3EnrichProcessorConfig s3EnrichProcessorConfig;
1621

1722

1823
AwsAuthenticationAdapter(
1924
final AwsCredentialsSupplier awsCredentialsSupplier,
20-
final S3EnricherProcessorConfig s3EnricherProcessorConfig) {
25+
final S3EnrichProcessorConfig s3EnricherProcessorConfig) {
2126
this.awsCredentialsSupplier = awsCredentialsSupplier;
22-
this.s3EnricherProcessorConfig = s3EnricherProcessorConfig;
27+
this.s3EnrichProcessorConfig = s3EnricherProcessorConfig;
2328
}
2429

2530
AwsCredentialsProvider getCredentialsProvider() {
26-
final AwsAuthenticationOptions awsAuthenticationOptions = s3EnricherProcessorConfig.getAwsAuthenticationOptions();
31+
final AwsAuthenticationOptions awsAuthenticationOptions = s3EnrichProcessorConfig.getAwsAuthenticationOptions();
2732

2833
final AwsCredentialsOptions options = AwsCredentialsOptions.builder()
2934
.withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn())

data-prepper-plugins/s3-enricher-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enricher/processor/S3EnricherProcessor.java renamed to data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/S3EnrichProcessor.java

Lines changed: 47 additions & 160 deletions
Large diffs are not rendered by default.

data-prepper-plugins/s3-enricher-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enricher/processor/S3EnricherProcessorConfig.java renamed to data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/S3EnrichProcessorConfig.java

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
510

6-
package org.opensearch.dataprepper.plugins.s3_enricher.processor;
11+
package org.opensearch.dataprepper.plugins.s3_enrich.processor;
712

813
import com.fasterxml.jackson.annotation.JsonClassDescription;
914
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -13,30 +18,37 @@
1318
import jakarta.validation.constraints.Max;
1419
import jakarta.validation.constraints.Min;
1520
import jakarta.validation.constraints.NotNull;
21+
import jakarta.validation.constraints.Size;
1622
import lombok.Getter;
1723
import org.opensearch.dataprepper.aws.validator.AwsAccountId;
1824
import org.opensearch.dataprepper.model.annotations.ExampleValues;
1925
import org.opensearch.dataprepper.model.configuration.PluginModel;
26+
import org.opensearch.dataprepper.model.constraints.ByteCountMax;
27+
import org.opensearch.dataprepper.model.constraints.ByteCountMin;
2028
import org.opensearch.dataprepper.model.event.EventKey;
29+
import org.opensearch.dataprepper.model.types.ByteCount;
2130
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
22-
import org.opensearch.dataprepper.plugins.s3_enricher.processor.configuration.AwsAuthenticationOptions;
23-
import org.opensearch.dataprepper.plugins.s3_enricher.processor.configuration.S3EnricherBucketOption;
31+
import org.opensearch.dataprepper.plugins.s3.common.config.AwsAuthenticationOptions;
32+
import org.opensearch.dataprepper.plugins.s3_enrich.processor.configuration.S3EnrichBucketOption;
33+
import org.opensearch.dataprepper.plugins.s3_enrich.processor.configuration.S3EnrichKeyPathOption;
2434

35+
import java.time.Duration;
2536
import java.util.Collections;
2637
import java.util.List;
2738
import java.util.Map;
39+
import java.util.Optional;
2840

2941
@Getter
3042
@JsonPropertyOrder
3143
@JsonClassDescription("The <code>s3_enricher</code> processor enriches your data from a S3 source")
32-
public class S3EnricherProcessorConfig {
33-
private static final int DEFAULT_ENRICHER_SIZE_LIMIT = 100;
44+
public class S3EnrichProcessorConfig {
45+
private static final String DEFAULT_ENRICHER_SIZE_LIMIT = "100mb";
3446
private static final int DEFAULT_CACHE_SIZE_LIMIT = 100000;
35-
private static final int DEFAULT_CACHE_EXPIRE_AFTER_ACCESS_LIMIT = 10;
47+
private static final Duration DEFAULT_CACHE_TTL = Duration.ofMinutes(10);
3648

3749
@JsonProperty("bucket")
3850
@Valid
39-
private S3EnricherBucketOption s3EnricherBucketOption;
51+
private S3EnrichBucketOption s3EnrichBucketOption;
4052

4153
@JsonProperty("aws")
4254
@NotNull
@@ -60,32 +72,32 @@ public class S3EnricherProcessorConfig {
6072
@JsonProperty("compression")
6173
private CompressionOption compression = CompressionOption.NONE;
6274

63-
@JsonProperty(value = "s3_object_size_limit_mb", defaultValue="100")
64-
@Min(0)
65-
@Max(300)
66-
private int enricherSizeLimit = DEFAULT_ENRICHER_SIZE_LIMIT;
75+
@JsonProperty(value = "s3_object_size_limit", defaultValue = DEFAULT_ENRICHER_SIZE_LIMIT)
76+
@ByteCountMin(DEFAULT_ENRICHER_SIZE_LIMIT)
77+
@ByteCountMax("300mb")
78+
private ByteCount enricherSizeLimit = ByteCount.parse(DEFAULT_ENRICHER_SIZE_LIMIT);
6779

6880
@JsonProperty(value = "cache_max_size", defaultValue="100000")
6981
@Min(0)
7082
@Max(300000)
7183
private int cacheSizeLimit = DEFAULT_CACHE_SIZE_LIMIT;
7284

73-
@JsonProperty(value = "cache_ttl_minutes", defaultValue="10")
74-
@Min(0)
75-
@Max(30)
76-
private int cacheExpirationMinutes = DEFAULT_CACHE_EXPIRE_AFTER_ACCESS_LIMIT;
85+
@JsonProperty(value = "cache_ttl", defaultValue = "PT10M")
86+
@JsonPropertyDescription("The TTL for cache entries. Accepts ISO-8601 duration format (e.g., PT10M for 10 minutes, PT1H for 1 hour).")
87+
private Duration cacheTtl = DEFAULT_CACHE_TTL;
7788

7889
@JsonPropertyDescription("defines the key that defines the s3 enricher object base name")
7990
@JsonProperty("s3_key_path")
80-
private String enricherKeyField;
91+
private String enricherKeyPath;
8192

8293
@JsonPropertyDescription("defines the key ")
8394
@JsonProperty("s3_object_name_pattern")
8495
private String enricherNamePattern;
8596

8697
@JsonPropertyDescription("defines the unique key identifier in the events from the pipeline to match the events from S3 enricher source")
87-
@JsonProperty("correlation_key")
88-
private String correlationKey;
98+
@JsonProperty("correlation_keys")
99+
@Size(min = 1, max = 1)
100+
private List<String> correlationKeys;
89101

90102
@JsonProperty("keys_to_merge")
91103
@JsonPropertyDescription("A list of keys of the fields to be merged.")
@@ -105,4 +117,16 @@ public class S3EnricherProcessorConfig {
105117
"or exception occurs. This tag may be used in conditional expressions in " +
106118
"other parts of the configuration.")
107119
private List<String> tagsOnFailure = Collections.emptyList();
120+
121+
/**
122+
* Safely retrieves the S3 scan include prefix from the configuration chain.
123+
*
124+
* @return Optional containing the prefix if present, empty Optional otherwise
125+
*/
126+
public Optional<String> getS3IncludePrefix() {
127+
return Optional.ofNullable(s3EnrichBucketOption)
128+
.map(S3EnrichBucketOption::getS3SourceFilter)
129+
.map(S3EnrichKeyPathOption::getS3scanIncludePrefixOption)
130+
.filter(prefix -> !prefix.isBlank());
131+
}
108132
}

data-prepper-plugins/s3-enricher-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enricher/processor/cache/CacheFactory.java renamed to data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/cache/CacheFactory.java

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,29 @@
1-
package org.opensearch.dataprepper.plugins.s3_enricher.processor.cache;
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.s3_enrich.processor.cache;
212

313
import com.github.benmanes.caffeine.cache.Cache;
414
import com.github.benmanes.caffeine.cache.Caffeine;
515
import lombok.Getter;
616
import org.opensearch.dataprepper.model.event.Event;
7-
import org.opensearch.dataprepper.plugins.s3_enricher.processor.S3EnricherProcessorConfig;
17+
import org.opensearch.dataprepper.plugins.s3_enrich.processor.S3EnrichProcessorConfig;
818
import org.slf4j.Logger;
919
import org.slf4j.LoggerFactory;
10-
import java.util.concurrent.TimeUnit;
20+
21+
import java.time.Duration;
1122

1223
public class CacheFactory {
1324
private static final Logger LOG = LoggerFactory.getLogger(CacheFactory.class);
1425

15-
private final S3EnricherProcessorConfig config;
26+
private final S3EnrichProcessorConfig config;
1627

1728
/** SINGLETON cache: S3 URL -> (recordId -> Event cache)
1829
* -- GETTER --
@@ -21,42 +32,42 @@ public class CacheFactory {
2132
@Getter
2233
private final Cache<String, Cache<String, Event>> s3Cache;
2334

24-
public CacheFactory(S3EnricherProcessorConfig config) {
35+
public CacheFactory(S3EnrichProcessorConfig config) {
2536
this.config = config;
2637
this.s3Cache = buildS3Cache(); // created ONCE
2738
}
2839

2940
/** Build outer cache once */
3041
private Cache<String, Cache<String, Event>> buildS3Cache() {
31-
int ttlMinutes = config.getCacheExpirationMinutes();
42+
Duration cacheTtl = config.getCacheTtl();
3243
int outerMaxSize = 100;
3344

3445
LOG.info("Initializing SINGLETON S3 URL Cache: maxSize={} ttl={}m",
35-
outerMaxSize, ttlMinutes);
46+
outerMaxSize, cacheTtl.toMinutes());
3647

3748
return Caffeine.newBuilder()
3849
.maximumSize(outerMaxSize)
39-
.expireAfterAccess(ttlMinutes, TimeUnit.MINUTES)
50+
.expireAfterAccess(cacheTtl)
4051
.recordStats()
4152
.removalListener((key, value, cause) ->
42-
LOG.debug("[Outer S3 URL Eviction] key={} cause={}", key, cause))
53+
LOG.trace("[Outer S3 URL Eviction] key={} cause={}", key, cause))
4354
.build();
4455
}
4556

4657
/** Inner cache builder (1 per S3 URL) */
4758
public Cache<String, Event> createEventsCache(String s3Url) {
4859
int maxSize = config.getCacheSizeLimit();
49-
int ttlMinutes = config.getCacheExpirationMinutes();
60+
Duration cacheTtl = config.getCacheTtl();
5061

5162
LOG.info("Creating Inner Events Cache for {}: maxSize={} ttl={}m",
52-
s3Url, maxSize, ttlMinutes);
63+
s3Url, maxSize, cacheTtl.toMinutes());
5364

5465
return Caffeine.newBuilder()
5566
.maximumSize(maxSize)
56-
.expireAfterWrite(ttlMinutes, TimeUnit.MINUTES)
67+
.expireAfterWrite(cacheTtl)
5768
.recordStats()
5869
.removalListener((recordId, event, cause) ->
59-
LOG.debug("[Inner Event Eviction] s3Url={} recordId={} cause={}",
70+
LOG.trace("[Inner Event Eviction] s3Url={} recordId={} cause={}",
6071
s3Url, recordId, cause))
6172
.build();
6273
}

data-prepper-plugins/s3-enricher-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enricher/processor/cache/S3EnricherCacheService.java renamed to data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/cache/S3EnricherCacheService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
510

6-
package org.opensearch.dataprepper.plugins.s3_enricher.processor.cache;
11+
package org.opensearch.dataprepper.plugins.s3_enrich.processor.cache;
712

813
import com.github.benmanes.caffeine.cache.Cache;
914
import lombok.Getter;

data-prepper-plugins/s3-enricher-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enricher/processor/client/S3ClientBuilderFactory.java renamed to data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/client/S3ClientBuilderFactory.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
5-
package org.opensearch.dataprepper.plugins.s3_enricher.processor.client;
610

7-
import org.opensearch.dataprepper.plugins.s3_enricher.processor.S3EnricherProcessorConfig;
11+
package org.opensearch.dataprepper.plugins.s3_enrich.processor.client;
12+
13+
import org.opensearch.dataprepper.plugins.s3_enrich.processor.S3EnrichProcessorConfig;
814
import org.slf4j.Logger;
915
import org.slf4j.LoggerFactory;
1016
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -16,10 +22,10 @@
1622
*/
1723
public class S3ClientBuilderFactory {
1824
private static final Logger LOG = LoggerFactory.getLogger(S3ClientBuilderFactory.class);
19-
private final S3EnricherProcessorConfig s3SourceConfig;
25+
private final S3EnrichProcessorConfig s3SourceConfig;
2026
private final AwsCredentialsProvider credentialsProvider;
2127
private final S3Client s3Client;
22-
public S3ClientBuilderFactory(final S3EnricherProcessorConfig s3SourceConfig, AwsCredentialsProvider credentialsProvider){
28+
public S3ClientBuilderFactory(final S3EnrichProcessorConfig s3SourceConfig, AwsCredentialsProvider credentialsProvider){
2329
this.s3SourceConfig = s3SourceConfig;
2430
this.credentialsProvider = credentialsProvider;
2531
this.s3Client = createS3Client();

data-prepper-plugins/s3-enricher-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enricher/processor/client/S3ClientFactory.java renamed to data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/client/S3ClientFactory.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,30 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
510

6-
package org.opensearch.dataprepper.plugins.s3_enricher.processor.client;
11+
package org.opensearch.dataprepper.plugins.s3_enrich.processor.client;
712

813
import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
914
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
10-
import org.opensearch.dataprepper.plugins.s3_enricher.processor.S3EnricherProcessorConfig;
11-
import org.opensearch.dataprepper.plugins.s3_enricher.processor.configuration.AwsAuthenticationOptions;
15+
import org.opensearch.dataprepper.plugins.s3.common.config.AwsAuthenticationOptions;
16+
import org.opensearch.dataprepper.plugins.s3_enrich.processor.S3EnrichProcessorConfig;
1217
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
1318
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
1419
import software.amazon.awssdk.regions.Region;
1520
import software.amazon.awssdk.services.s3.S3Client;
1621

1722
public class S3ClientFactory {
1823

19-
public static S3Client createS3Client(final S3EnricherProcessorConfig s3EnricherProcessorConfig, final AwsCredentialsSupplier awsCredentialsSupplier) {
24+
public static S3Client createS3Client(final S3EnrichProcessorConfig s3EnrichProcessorConfig, final AwsCredentialsSupplier awsCredentialsSupplier) {
2025
final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(
21-
s3EnricherProcessorConfig.getAwsAuthenticationOptions());
22-
final Region region = s3EnricherProcessorConfig.getAwsAuthenticationOptions().getAwsRegion();
26+
s3EnrichProcessorConfig.getAwsAuthenticationOptions());
27+
final Region region = s3EnrichProcessorConfig.getAwsAuthenticationOptions().getAwsRegion();
2328
final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(
2429
awsCredentialsOptions);
2530

data-prepper-plugins/s3-enricher-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enricher/processor/configuration/S3EnricherBucketOption.java renamed to data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/configuration/S3EnrichBucketOption.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
5-
package org.opensearch.dataprepper.plugins.s3_enricher.processor.configuration;
10+
11+
package org.opensearch.dataprepper.plugins.s3_enrich.processor.configuration;
612

713
import com.fasterxml.jackson.annotation.JsonProperty;
814
import jakarta.validation.constraints.NotEmpty;
@@ -13,17 +19,17 @@
1319
* Class consists the bucket related configuration properties.
1420
*/
1521
@Getter
16-
public class S3EnricherBucketOption {
22+
public class S3EnrichBucketOption {
1723

1824
@JsonProperty("name")
1925
@NotEmpty
2026
@Size(min = 3, max = 500, message = "bucket length should be at least 3 characters")
2127
private String name;
2228

2329
@JsonProperty("filter")
24-
private S3EnricherKeyPathOption s3SourceFilter;
30+
private S3EnrichKeyPathOption s3SourceFilter;
2531

26-
public S3EnricherKeyPathOption getS3SourceFilter() {
32+
public S3EnrichKeyPathOption getS3SourceFilter() {
2733
return s3SourceFilter;
2834
}
2935
}

0 commit comments

Comments
 (0)