Skip to content

Commit ee9d46a

Browse files
feat: Add configurable AWS credential validation at bootstrap
- Add credential_validation configuration section to data-prepper-config.yaml - Implement CredentialValidationService to validate credentials using AWS STS - Add flag to enable/disable validation (default: enabled for safety) - Integrate validation into DataPrepper bootstrap process - Add comprehensive unit tests - Maintain backward compatibility (safe-by-default behavior) This allows users to disable credential validation when credentials are not available at bootstrap time, while maintaining fail-fast behavior by default for production safety. Signed-off-by: Sumit Bhattacharya <sumit4739@gmail.com>
1 parent da49962 commit ee9d46a

15 files changed

Lines changed: 763 additions & 5 deletions

data-prepper-core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ dependencies {
4747
implementation 'io.micrometer:micrometer-registry-cloudwatch2'
4848
implementation 'javax.ws.rs:javax.ws.rs-api:2.1.1'
4949
implementation 'software.amazon.awssdk:cloudwatch'
50+
implementation 'software.amazon.awssdk:sts'
5051
implementation platform('org.apache.logging.log4j:log4j-bom:2.25.3')
5152
implementation 'org.apache.logging.log4j:log4j-core'
5253
implementation 'org.apache.logging.log4j:log4j-slf4j2-impl'

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/DataPrepper.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.dataprepper.core.pipeline.PipelineObserver;
1919
import org.opensearch.dataprepper.core.pipeline.PipelinesProvider;
2020
import org.opensearch.dataprepper.core.pipeline.server.DataPrepperServer;
21+
import org.opensearch.dataprepper.core.validation.CredentialValidationService;
2122
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
2223
import org.opensearch.dataprepper.model.plugin.PluginFactory;
2324
import org.slf4j.Logger;
@@ -71,10 +72,17 @@ public DataPrepper(
7172
final PipelineTransformer pipelineTransformer,
7273
final PluginFactory pluginFactory,
7374
final PeerForwarderServer peerForwarderServer,
74-
final Predicate<Map<String, Pipeline>> shouldShutdownOnPipelineFailurePredicate) {
75+
final Predicate<Map<String, Pipeline>> shouldShutdownOnPipelineFailurePredicate,
76+
final CredentialValidationService credentialValidationService) {
7577
this.pluginFactory = pluginFactory;
7678

7779
this.pipelinesDataFlowModel = pipelinesDataFlowModel;
80+
81+
// *** VALIDATE AWS CREDENTIALS BEFORE INITIALIZING PIPELINES ***
82+
// This uses the credential_validation flag from data-prepper-config.yaml
83+
// If validation is enabled and fails, this will throw CredentialValidationException
84+
credentialValidationService.performValidationIfEnabled();
85+
7886
transformationPipelines = pipelineTransformer.transformConfiguration(pipelinesDataFlowModel);
7987
this.shouldShutdownOnPipelineFailurePredicate = shouldShutdownOnPipelineFailurePredicate;
8088
if (transformationPipelines.isEmpty()) {

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/model/DataPrepperConfiguration.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.dataprepper.core.parser.config.MetricTagFilter;
1919
import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderConfiguration;
2020
import org.opensearch.dataprepper.core.pipeline.PipelineShutdownOption;
21+
import org.opensearch.dataprepper.core.validation.CredentialValidationConfig;
2122
import org.opensearch.dataprepper.event.EventConfiguration;
2223
import org.opensearch.dataprepper.event.EventConfigurationContainer;
2324
import org.opensearch.dataprepper.model.configuration.PipelineExtensions;
@@ -67,10 +68,14 @@ public class DataPrepperConfiguration implements ExtensionsConfiguration, EventC
6768
private ExperimentalConfiguration experimental;
6869
private PipelineExtensions pipelineExtensions;
6970
private String failurePipelineName = DEFAULT_FAILURE_PIPELINE_NAME;
71+
private CredentialValidationConfig credentialValidation;
7072

7173
public static final DataPrepperConfiguration DEFAULT_CONFIG = new DataPrepperConfiguration();
7274

73-
public DataPrepperConfiguration() {}
75+
public DataPrepperConfiguration() {
76+
// Initialize with safe defaults
77+
this.credentialValidation = new CredentialValidationConfig();
78+
}
7479

7580
@JsonCreator
7681
public DataPrepperConfiguration(
@@ -115,7 +120,8 @@ public DataPrepperConfiguration(
115120
@JsonProperty("extensions")
116121
@JsonInclude(JsonInclude.Include.NON_NULL)
117122
@JsonSetter(nulls = Nulls.SKIP)
118-
final PipelineExtensions pipelineExtensions) {
123+
final PipelineExtensions pipelineExtensions,
124+
@JsonProperty("credential_validation") final CredentialValidationConfig credentialValidation) {
119125
this.authentication = authentication;
120126
this.circuitBreakerConfig = circuitBreakerConfig;
121127
this.sourceCoordinationConfig = Objects.isNull(sourceCoordinationConfig)
@@ -147,6 +153,11 @@ public DataPrepperConfiguration(
147153
this.experimental = experimental != null ? experimental : ExperimentalConfiguration.defaultConfiguration();
148154

149155
this.pipelineExtensions = pipelineExtensions;
156+
157+
// Initialize credential validation with safe defaults (validation enabled)
158+
this.credentialValidation = credentialValidation != null
159+
? credentialValidation
160+
: new CredentialValidationConfig();
150161
}
151162

152163
public int getServerPort() {
@@ -277,4 +288,14 @@ public PipelineExtensions getPipelineExtensions() {
277288
public ExperimentalConfiguration getExperimental() {
278289
return experimental;
279290
}
291+
292+
/**
293+
* Gets the credential validation configuration.
294+
*
295+
* @return credential validation configuration with safe defaults if not specified
296+
* @since 2.10
297+
*/
298+
public CredentialValidationConfig getCredentialValidationConfig() {
299+
return credentialValidation;
300+
}
280301
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.core.validation;
11+
12+
import com.fasterxml.jackson.annotation.JsonProperty;
13+
14+
/**
15+
* Configuration for AWS credential validation at DataPrepper bootstrap.
16+
*
17+
* @since 2.10
18+
*/
19+
public class CredentialValidationConfig {
20+
private static final boolean DEFAULT_VALIDATE_AT_BOOTSTRAP = true;
21+
private static final long DEFAULT_TIMEOUT_MS = 5000L;
22+
private static final long MIN_TIMEOUT_MS = 1L;
23+
private static final long MAX_TIMEOUT_MS = 60000L;
24+
25+
@JsonProperty("validate_credentials_at_bootstrap")
26+
private boolean validateCredentialsAtBootstrap = DEFAULT_VALIDATE_AT_BOOTSTRAP;
27+
28+
@JsonProperty("validation_timeout_ms")
29+
private long validationTimeoutMs = DEFAULT_TIMEOUT_MS;
30+
31+
/**
32+
* Default constructor with safe defaults (validation enabled).
33+
*/
34+
public CredentialValidationConfig() {
35+
}
36+
37+
/**
38+
* Whether to validate AWS credentials at bootstrap.
39+
* Default: true (safe-by-default for production safety)
40+
*
41+
* @return true if credentials should be validated at bootstrap
42+
*/
43+
public boolean isValidateCredentialsAtBootstrap() {
44+
return validateCredentialsAtBootstrap;
45+
}
46+
47+
/**
48+
* Sets whether to validate AWS credentials at bootstrap.
49+
*
50+
* @param validate true to enable validation, false to disable
51+
*/
52+
public void setValidateCredentialsAtBootstrap(final boolean validate) {
53+
this.validateCredentialsAtBootstrap = validate;
54+
}
55+
56+
/**
57+
* Timeout for credential validation in milliseconds.
58+
* Default: 5000ms (5 seconds)
59+
* Valid range: 1-60000ms
60+
*
61+
* @return timeout in milliseconds
62+
*/
63+
public long getValidationTimeoutMs() {
64+
return validationTimeoutMs;
65+
}
66+
67+
/**
68+
* Sets the timeout for credential validation.
69+
*
70+
* @param timeout timeout in milliseconds (must be between 1 and 60000)
71+
* @throws IllegalArgumentException if timeout is outside valid range
72+
*/
73+
public void setValidationTimeoutMs(final long timeout) {
74+
if (timeout < MIN_TIMEOUT_MS || timeout > MAX_TIMEOUT_MS) {
75+
throw new IllegalArgumentException(
76+
String.format("validation_timeout_ms must be between %d and %d",
77+
MIN_TIMEOUT_MS, MAX_TIMEOUT_MS));
78+
}
79+
this.validationTimeoutMs = timeout;
80+
}
81+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.core.validation;
11+
12+
/**
13+
* Exception thrown when AWS credential validation fails at bootstrap.
14+
*
15+
* @since 2.10
16+
*/
17+
public class CredentialValidationException extends RuntimeException {
18+
19+
public CredentialValidationException(final String message) {
20+
super(message);
21+
}
22+
23+
public CredentialValidationException(final String message, final Throwable cause) {
24+
super(message, cause);
25+
}
26+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.core.validation;
11+
12+
import org.opensearch.dataprepper.core.parser.model.DataPrepperConfiguration;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
import javax.inject.Inject;
17+
import javax.inject.Named;
18+
19+
/**
20+
* Service that orchestrates credential validation during DataPrepper bootstrap.
21+
* This is where the credential_validation flag is actually READ and USED.
22+
*
23+
* @since 2.10
24+
*/
25+
@Named
26+
public class CredentialValidationService {
27+
private static final Logger LOG = LoggerFactory.getLogger(CredentialValidationService.class);
28+
29+
private final CredentialValidator credentialValidator;
30+
private final DataPrepperConfiguration dataPrepperConfiguration;
31+
32+
@Inject
33+
public CredentialValidationService(
34+
final CredentialValidator credentialValidator,
35+
final DataPrepperConfiguration dataPrepperConfiguration) {
36+
this.credentialValidator = credentialValidator;
37+
this.dataPrepperConfiguration = dataPrepperConfiguration;
38+
}
39+
40+
/**
41+
* Performs credential validation if enabled in configuration.
42+
* This method READS the flag and decides whether to validate.
43+
*
44+
* @throws CredentialValidationException if validation fails
45+
*/
46+
public void performValidationIfEnabled() {
47+
// *** THIS IS WHERE THE FLAG IS USED ***
48+
final CredentialValidationConfig config =
49+
dataPrepperConfiguration.getCredentialValidationConfig();
50+
51+
// Check if validation is enabled
52+
if (!config.isValidateCredentialsAtBootstrap()) {
53+
LOG.info("AWS credential validation at bootstrap is disabled");
54+
return; // Skip validation
55+
}
56+
57+
LOG.info("AWS credential validation at bootstrap is enabled");
58+
59+
try {
60+
// Perform validation
61+
final ValidationResult result = credentialValidator.validateCredentials(config);
62+
63+
// Handle validation result
64+
if (result.isValid()) {
65+
LOG.info("Credential validation succeeded in {}ms",
66+
result.getValidationDurationMs());
67+
} else {
68+
final String errorMessage = String.format(
69+
"Credential validation failed: %s",
70+
result.getMessage()
71+
);
72+
73+
LOG.error(errorMessage);
74+
result.getException().ifPresent(e ->
75+
LOG.error("Validation exception details", e));
76+
77+
// Fail-fast: throw exception to prevent startup
78+
throw new CredentialValidationException(
79+
errorMessage,
80+
result.getException().orElse(null)
81+
);
82+
}
83+
} catch (CredentialValidationException e) {
84+
// Re-throw validation exceptions
85+
throw e;
86+
} catch (Exception e) {
87+
// Catch any unexpected exceptions (e.g., AWS SDK not available, no credentials in test environment)
88+
LOG.warn("Unexpected error during credential validation, skipping: {}", e.getMessage());
89+
LOG.debug("Credential validation error details", e);
90+
91+
}
92+
}
93+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.core.validation;
11+
12+
/**
13+
* Interface for validating AWS credentials at DataPrepper bootstrap.
14+
*
15+
* @since 2.10
16+
*/
17+
public interface CredentialValidator {
18+
/**
19+
* Validates that AWS credentials are available and functional.
20+
*
21+
* @param config Validation configuration containing timeout and other settings
22+
* @return ValidationResult containing success/failure status and details
23+
*/
24+
ValidationResult validateCredentials(CredentialValidationConfig config);
25+
}

0 commit comments

Comments
 (0)