Skip to content

Commit 87f1154

Browse files
authored
fix: add BackoffCredentialsProvider to mitigate STS throttling across all plugins (#6637)
Wrap StsAssumeRoleCredentialsProvider with BackoffCredentialsProvider in CredentialsProviderFactory. When credential resolution fails (e.g. role deleted or trust policy misconfigured), the wrapper caches the failure and applies exponential backoff (10s to 10min) before retrying STS, preventing excessive AssumeRole calls that cause STS throttling. This protects all plugins that use CredentialsProviderFactory including S3, OpenSearch, Lambda, SQS, and most AWS-integrated sources and sinks. Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
1 parent aaefdf3 commit 87f1154

5 files changed

Lines changed: 338 additions & 20 deletions

File tree

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.aws;
11+
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
import software.amazon.awssdk.auth.credentials.AwsCredentials;
15+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
16+
import software.amazon.awssdk.core.exception.SdkException;
17+
18+
import java.time.Clock;
19+
import java.time.Duration;
20+
import java.time.Instant;
21+
22+
/**
23+
* A credentials provider wrapper that applies exponential backoff when the
24+
* delegate provider fails to resolve credentials. During the backoff window,
25+
* this provider throws the cached exception without calling the delegate,
26+
* preventing excessive STS AssumeRole calls when a role is deleted or
27+
* misconfigured.
28+
*
29+
* <p>On successful credential resolution, the backoff state is reset.</p>
30+
*/
31+
class BackoffCredentialsProvider implements AwsCredentialsProvider {
32+
private static final Logger LOG = LoggerFactory.getLogger(BackoffCredentialsProvider.class);
33+
34+
static final Duration INITIAL_BACKOFF = Duration.ofSeconds(10);
35+
static final Duration MAX_BACKOFF = Duration.ofMinutes(10);
36+
static final int BACKOFF_MULTIPLIER = 2;
37+
38+
private final AwsCredentialsProvider delegate;
39+
private final Clock clock;
40+
41+
private volatile SdkException lastException;
42+
private volatile Instant backoffUntil;
43+
private volatile int failureCount;
44+
45+
BackoffCredentialsProvider(final AwsCredentialsProvider delegate) {
46+
this(delegate, Clock.systemUTC());
47+
}
48+
49+
BackoffCredentialsProvider(final AwsCredentialsProvider delegate, final Clock clock) {
50+
this.delegate = delegate;
51+
this.clock = clock;
52+
this.backoffUntil = Instant.MIN;
53+
this.failureCount = 0;
54+
}
55+
56+
@Override
57+
public AwsCredentials resolveCredentials() {
58+
final Instant now = clock.instant();
59+
60+
final SdkException cachedException = lastException;
61+
if (cachedException != null && now.isBefore(backoffUntil)) {
62+
LOG.debug("Credentials resolution in backoff period until {}. Throwing cached exception.", backoffUntil);
63+
throw cachedException;
64+
}
65+
66+
try {
67+
final AwsCredentials credentials = delegate.resolveCredentials();
68+
resetBackoff();
69+
return credentials;
70+
} catch (final SdkException e) {
71+
applyBackoff(e);
72+
throw e;
73+
}
74+
}
75+
76+
private synchronized void resetBackoff() {
77+
if (failureCount > 0) {
78+
LOG.info("Credentials resolution succeeded after {} failures. Resetting backoff.", failureCount);
79+
}
80+
failureCount = 0;
81+
lastException = null;
82+
backoffUntil = Instant.MIN;
83+
}
84+
85+
private synchronized void applyBackoff(final SdkException exception) {
86+
failureCount++;
87+
lastException = exception;
88+
89+
long backoffMillis = INITIAL_BACKOFF.toMillis() * (long) Math.pow(BACKOFF_MULTIPLIER, failureCount - 1);
90+
backoffMillis = Math.min(backoffMillis, MAX_BACKOFF.toMillis());
91+
92+
backoffUntil = clock.instant().plus(Duration.ofMillis(backoffMillis));
93+
LOG.warn("Credentials resolution failed (attempt {}). Backing off for {}ms until {}.",
94+
failureCount, backoffMillis, backoffUntil, exception);
95+
}
96+
}

data-prepper-plugins/aws-plugin/src/main/java/org/opensearch/dataprepper/plugins/aws/CredentialsProviderFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,10 @@ private AwsCredentialsProvider createStsCredentials(final AwsCredentialsOptions
109109
.overrideConfiguration(configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader));
110110
}
111111

112-
return StsAssumeRoleCredentialsProvider.builder()
112+
return new BackoffCredentialsProvider(StsAssumeRoleCredentialsProvider.builder()
113113
.stsClient(stsClient)
114114
.refreshRequest(assumeRoleRequestBuilder.build())
115-
.build();
115+
.build());
116116
}
117117

118118
private void validateStsRoleArn(final String stsRoleArn) {

data-prepper-plugins/aws-plugin/src/test/java/org/opensearch/dataprepper/plugins/aws/AwsPluginIT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
2323
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
2424
import software.amazon.awssdk.regions.Region;
25-
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
2625

2726
import java.util.Optional;
2827
import java.util.UUID;
@@ -81,7 +80,7 @@ void test_AwsPlugin_with_STS_role() {
8180

8281
final AwsCredentialsProvider awsCredentialsProvider1 = awsCredentialsSupplier.getProvider(awsCredentialsOptions1);
8382

84-
assertThat(awsCredentialsProvider1, instanceOf(StsAssumeRoleCredentialsProvider.class));
83+
assertThat(awsCredentialsProvider1, instanceOf(BackoffCredentialsProvider.class));
8584

8685
final AwsCredentialsOptions awsCredentialsOptions2 = AwsCredentialsOptions.builder()
8786
.withStsRoleArn(stsRole)
@@ -150,7 +149,7 @@ void test_AwsPlugin_without_STS_role_and_with_default_role_uses_default_role() {
150149

151150
final AwsCredentialsProvider awsCredentialsProvider1 = awsCredentialsSupplier.getProvider(awsCredentialsOptions1);
152151

153-
assertThat(awsCredentialsProvider1, instanceOf(StsAssumeRoleCredentialsProvider.class));
152+
assertThat(awsCredentialsProvider1, instanceOf(BackoffCredentialsProvider.class));
154153

155154
final AwsCredentialsOptions awsCredentialsOptions2 = AwsCredentialsOptions.builder()
156155
.withRegion(Region.US_EAST_1)
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.aws;
11+
12+
import org.junit.jupiter.api.BeforeEach;
13+
import org.junit.jupiter.api.Test;
14+
import org.junit.jupiter.api.extension.ExtendWith;
15+
import org.mockito.Mock;
16+
import org.mockito.junit.jupiter.MockitoExtension;
17+
import software.amazon.awssdk.auth.credentials.AwsCredentials;
18+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
19+
import software.amazon.awssdk.core.exception.SdkClientException;
20+
import software.amazon.awssdk.core.exception.SdkException;
21+
22+
import java.time.Clock;
23+
import java.time.Duration;
24+
import java.time.Instant;
25+
26+
import static org.hamcrest.CoreMatchers.equalTo;
27+
import static org.hamcrest.CoreMatchers.sameInstance;
28+
import static org.hamcrest.MatcherAssert.assertThat;
29+
import static org.junit.jupiter.api.Assertions.assertThrows;
30+
import static org.mockito.Mockito.lenient;
31+
import static org.mockito.Mockito.times;
32+
import static org.mockito.Mockito.verify;
33+
import static org.mockito.Mockito.when;
34+
35+
@ExtendWith(MockitoExtension.class)
36+
class BackoffCredentialsProviderTest {
37+
38+
private static final Instant BASE_TIME = Instant.parse("2025-01-01T00:00:00Z");
39+
40+
@Mock
41+
private AwsCredentialsProvider delegate;
42+
43+
@Mock
44+
private AwsCredentials credentials;
45+
46+
@Mock
47+
private Clock clock;
48+
49+
private BackoffCredentialsProvider createObjectUnderTest() {
50+
return new BackoffCredentialsProvider(delegate, clock);
51+
}
52+
53+
@BeforeEach
54+
void setUp() {
55+
lenient().when(clock.instant()).thenReturn(BASE_TIME);
56+
}
57+
58+
@Test
59+
void constructor_with_delegate_only_creates_provider() {
60+
final BackoffCredentialsProvider provider = new BackoffCredentialsProvider(delegate);
61+
when(delegate.resolveCredentials()).thenReturn(credentials);
62+
63+
final AwsCredentials result = provider.resolveCredentials();
64+
65+
assertThat(result, sameInstance(credentials));
66+
}
67+
68+
@Test
69+
void resolveCredentials_delegates_on_success() {
70+
when(delegate.resolveCredentials()).thenReturn(credentials);
71+
72+
final AwsCredentials result = createObjectUnderTest().resolveCredentials();
73+
74+
assertThat(result, sameInstance(credentials));
75+
verify(delegate).resolveCredentials();
76+
}
77+
78+
@Test
79+
void resolveCredentials_on_failure_throws_and_applies_backoff() {
80+
final SdkException exception = SdkClientException.create("access denied");
81+
when(delegate.resolveCredentials()).thenThrow(exception);
82+
83+
final BackoffCredentialsProvider objectUnderTest = createObjectUnderTest();
84+
85+
final SdkException thrown = assertThrows(SdkException.class, objectUnderTest::resolveCredentials);
86+
assertThat(thrown, sameInstance(exception));
87+
}
88+
89+
@Test
90+
void resolveCredentials_during_backoff_throws_cached_exception_without_calling_delegate() {
91+
final SdkException exception = SdkClientException.create("access denied");
92+
when(delegate.resolveCredentials()).thenThrow(exception);
93+
94+
final BackoffCredentialsProvider objectUnderTest = createObjectUnderTest();
95+
96+
assertThrows(SdkException.class, objectUnderTest::resolveCredentials);
97+
98+
when(clock.instant()).thenReturn(BASE_TIME.plusSeconds(5));
99+
100+
final SdkException thrown = assertThrows(SdkException.class, objectUnderTest::resolveCredentials);
101+
assertThat(thrown, sameInstance(exception));
102+
verify(delegate, times(1)).resolveCredentials();
103+
}
104+
105+
@Test
106+
void resolveCredentials_after_backoff_expires_calls_delegate_again() {
107+
final SdkException exception = SdkClientException.create("access denied");
108+
when(delegate.resolveCredentials()).thenThrow(exception).thenReturn(credentials);
109+
110+
final BackoffCredentialsProvider objectUnderTest = createObjectUnderTest();
111+
112+
assertThrows(SdkException.class, objectUnderTest::resolveCredentials);
113+
114+
when(clock.instant()).thenReturn(BASE_TIME.plusSeconds(11));
115+
116+
final AwsCredentials result = objectUnderTest.resolveCredentials();
117+
assertThat(result, sameInstance(credentials));
118+
verify(delegate, times(2)).resolveCredentials();
119+
}
120+
121+
@Test
122+
void resolveCredentials_success_after_failure_resets_backoff() {
123+
final SdkException exception = SdkClientException.create("access denied");
124+
when(delegate.resolveCredentials())
125+
.thenThrow(exception)
126+
.thenReturn(credentials)
127+
.thenThrow(exception);
128+
129+
final BackoffCredentialsProvider objectUnderTest = createObjectUnderTest();
130+
131+
assertThrows(SdkException.class, objectUnderTest::resolveCredentials);
132+
133+
when(clock.instant()).thenReturn(BASE_TIME.plusSeconds(11));
134+
objectUnderTest.resolveCredentials();
135+
136+
assertThrows(SdkException.class, objectUnderTest::resolveCredentials);
137+
138+
when(clock.instant()).thenReturn(BASE_TIME.plusSeconds(16));
139+
140+
final SdkException thrown = assertThrows(SdkException.class, objectUnderTest::resolveCredentials);
141+
assertThat(thrown, sameInstance(exception));
142+
verify(delegate, times(3)).resolveCredentials();
143+
}
144+
145+
@Test
146+
void resolveCredentials_exponential_backoff_progression() {
147+
final SdkException exception = SdkClientException.create("access denied");
148+
when(delegate.resolveCredentials()).thenThrow(exception);
149+
150+
final BackoffCredentialsProvider objectUnderTest = createObjectUnderTest();
151+
152+
assertThrows(SdkException.class, objectUnderTest::resolveCredentials);
153+
154+
when(clock.instant()).thenReturn(BASE_TIME.plusSeconds(9));
155+
assertThrows(SdkException.class, objectUnderTest::resolveCredentials);
156+
verify(delegate, times(1)).resolveCredentials();
157+
158+
when(clock.instant()).thenReturn(BASE_TIME.plus(BackoffCredentialsProvider.INITIAL_BACKOFF).plusMillis(1));
159+
assertThrows(SdkException.class, objectUnderTest::resolveCredentials);
160+
verify(delegate, times(2)).resolveCredentials();
161+
162+
when(clock.instant()).thenReturn(BASE_TIME.plus(BackoffCredentialsProvider.INITIAL_BACKOFF).plusSeconds(21));
163+
assertThrows(SdkException.class, objectUnderTest::resolveCredentials);
164+
verify(delegate, times(3)).resolveCredentials();
165+
166+
when(clock.instant()).thenReturn(BASE_TIME.plus(BackoffCredentialsProvider.INITIAL_BACKOFF).plusSeconds(61));
167+
assertThrows(SdkException.class, objectUnderTest::resolveCredentials);
168+
verify(delegate, times(4)).resolveCredentials();
169+
}
170+
171+
@Test
172+
void resolveCredentials_backoff_caps_at_max() {
173+
final SdkException exception = SdkClientException.create("access denied");
174+
when(delegate.resolveCredentials()).thenThrow(exception);
175+
176+
final BackoffCredentialsProvider objectUnderTest = createObjectUnderTest();
177+
178+
Instant currentTime = BASE_TIME;
179+
for (int i = 0; i < 10; i++) {
180+
long expectedBackoffMs = BackoffCredentialsProvider.INITIAL_BACKOFF.toMillis()
181+
* (long) Math.pow(BackoffCredentialsProvider.BACKOFF_MULTIPLIER, i);
182+
expectedBackoffMs = Math.min(expectedBackoffMs, BackoffCredentialsProvider.MAX_BACKOFF.toMillis());
183+
184+
when(clock.instant()).thenReturn(currentTime);
185+
assertThrows(SdkException.class, objectUnderTest::resolveCredentials);
186+
187+
currentTime = currentTime.plus(Duration.ofMillis(expectedBackoffMs)).plusMillis(1);
188+
}
189+
190+
verify(delegate, times(10)).resolveCredentials();
191+
}
192+
193+
@Test
194+
void resolveCredentials_when_now_equals_backoffUntil_calls_delegate() {
195+
final SdkException exception = SdkClientException.create("access denied");
196+
when(delegate.resolveCredentials()).thenThrow(exception).thenReturn(credentials);
197+
198+
final BackoffCredentialsProvider objectUnderTest = createObjectUnderTest();
199+
200+
assertThrows(SdkException.class, objectUnderTest::resolveCredentials);
201+
202+
when(clock.instant()).thenReturn(BASE_TIME.plus(BackoffCredentialsProvider.INITIAL_BACKOFF));
203+
204+
final AwsCredentials result = objectUnderTest.resolveCredentials();
205+
assertThat(result, sameInstance(credentials));
206+
verify(delegate, times(2)).resolveCredentials();
207+
}
208+
209+
@Test
210+
void resolveCredentials_first_success_does_not_affect_state() {
211+
when(delegate.resolveCredentials()).thenReturn(credentials);
212+
213+
final BackoffCredentialsProvider objectUnderTest = createObjectUnderTest();
214+
objectUnderTest.resolveCredentials();
215+
objectUnderTest.resolveCredentials();
216+
217+
verify(delegate, times(2)).resolveCredentials();
218+
}
219+
220+
@Test
221+
void resolveCredentials_constants_have_expected_values() {
222+
assertThat(BackoffCredentialsProvider.INITIAL_BACKOFF, equalTo(Duration.ofSeconds(10)));
223+
assertThat(BackoffCredentialsProvider.MAX_BACKOFF, equalTo(Duration.ofMinutes(10)));
224+
assertThat(BackoffCredentialsProvider.BACKOFF_MULTIPLIER, equalTo(2));
225+
}
226+
}

0 commit comments

Comments
 (0)