Skip to content

Commit d3288d4

Browse files
authored
Fix MongoTasksRefresher to force executor restart on MongoSecurityException (#6716)
* Fix MongoTasksRefresher to force executor restart on MongoSecurityException When DocumentDB revokes old credentials after secret rotation, the pipeline enters a permanent auth failure state because basicAuthChanged() returns false (the secret value in Secrets Manager hasn't changed). Add forceRefresh() to MongoTasksRefresher with exponential backoff (30s, 60s, 120s, max 3 attempts) that restarts the executor with the current config. StreamScheduler now walks the exception cause chain for MongoSecurityException and calls forceRefresh() when detected. If all 3 attempts fail, falls back to the normal hourly scheduled credential refresh. Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> * Remove unused imports in mongodb plugin tests Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --------- Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
1 parent d08fcfd commit d3288d4

4 files changed

Lines changed: 195 additions & 4 deletions

File tree

data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/MongoTasksRefresher.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,13 @@ public class MongoTasksRefresher implements PluginConfigObserver<MongoDBSourceCo
4444
private final Counter executorRefreshErrorsCounter;
4545
private final String s3PathPrefix;
4646
private final DocumentDBSourceAggregateMetrics documentDBAggregateMetrics;
47+
private static final int MAX_FORCE_REFRESH_ATTEMPTS = 3;
48+
private static final long BASE_BACKOFF_MS = 30_000;
4749
private MongoDBExportPartitionSupplier currentMongoDBExportPartitionSupplier;
4850
private MongoDBSourceConfig currentMongoDBSourceConfig;
4951
private ExecutorService currentExecutor;
52+
private volatile int forceRefreshAttempts = 0;
53+
private volatile long lastForceRefreshTime = 0;
5054

5155
public MongoTasksRefresher(final Buffer<Record<Event>> buffer,
5256
final EnhancedSourceCoordinator sourceCoordinator,
@@ -81,13 +85,40 @@ public void update(MongoDBSourceConfig pluginConfig) {
8185
currentExecutor.shutdownNow();
8286
refreshJobs(pluginConfig);
8387
currentMongoDBSourceConfig = pluginConfig;
88+
forceRefreshAttempts = 0;
89+
lastForceRefreshTime = 0;
8490
} catch (Exception e) {
8591
executorRefreshErrorsCounter.increment();
8692
LOG.error("Refreshing executor failed.", e);
8793
}
8894
}
8995
}
9096

97+
public void forceRefresh() {
98+
if (forceRefreshAttempts >= MAX_FORCE_REFRESH_ATTEMPTS) {
99+
LOG.warn("Max force refresh attempts ({}) reached. Waiting for next scheduled credential refresh.",
100+
MAX_FORCE_REFRESH_ATTEMPTS);
101+
return;
102+
}
103+
final long now = System.currentTimeMillis();
104+
final long backoff = BASE_BACKOFF_MS * (1L << forceRefreshAttempts);
105+
if (now - lastForceRefreshTime < backoff) {
106+
return;
107+
}
108+
lastForceRefreshTime = now;
109+
forceRefreshAttempts++;
110+
LOG.info("Forcing credential refresh due to authentication failure (attempt {}/{})",
111+
forceRefreshAttempts, MAX_FORCE_REFRESH_ATTEMPTS);
112+
try {
113+
currentExecutor.shutdownNow();
114+
refreshJobs(currentMongoDBSourceConfig);
115+
credentialsChangeCounter.increment();
116+
} catch (final Exception e) {
117+
executorRefreshErrorsCounter.increment();
118+
LOG.error("Forced refresh failed.", e);
119+
}
120+
}
121+
91122
private void refreshJobs(MongoDBSourceConfig pluginConfig) {
92123
final List<Runnable> runnables = new ArrayList<>();
93124
if (pluginConfig.getCollections().stream().anyMatch(CollectionConfig::isExport)) {
@@ -98,7 +129,7 @@ private void refreshJobs(MongoDBSourceConfig pluginConfig) {
98129
}
99130
if (pluginConfig.getCollections().stream().anyMatch(CollectionConfig::isStream)) {
100131
runnables.add(new StreamScheduler(
101-
sourceCoordinator, buffer, acknowledgementSetManager, pluginConfig, s3PathPrefix, pluginMetrics, documentDBAggregateMetrics));
132+
sourceCoordinator, buffer, acknowledgementSetManager, pluginConfig, s3PathPrefix, pluginMetrics, documentDBAggregateMetrics, this));
102133
}
103134
this.currentExecutor = executorServiceFunction.apply(runnables.size());
104135
runnables.forEach(currentExecutor::submit);

data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.opensearch.dataprepper.plugins.mongo.stream;
22

3+
import com.mongodb.MongoSecurityException;
34
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
45
import org.opensearch.dataprepper.metrics.PluginMetrics;
56
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -13,6 +14,7 @@
1314
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
1415
import org.opensearch.dataprepper.plugins.mongo.converter.PartitionKeyRecordConverter;
1516
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.StreamPartition;
17+
import org.opensearch.dataprepper.plugins.mongo.documentdb.MongoTasksRefresher;
1618
import org.opensearch.dataprepper.plugins.mongo.utils.DocumentDBSourceAggregateMetrics;
1719
import org.slf4j.Logger;
1820
import org.slf4j.LoggerFactory;
@@ -46,13 +48,15 @@ public class StreamScheduler implements Runnable {
4648
private final String s3PathPrefix;
4749
private final PluginMetrics pluginMetrics;
4850
private final DocumentDBSourceAggregateMetrics documentDBAggregateMetrics;
51+
private final MongoTasksRefresher mongoTasksRefresher;
4952
public StreamScheduler(final EnhancedSourceCoordinator sourceCoordinator,
5053
final Buffer<Record<Event>> buffer,
5154
final AcknowledgementSetManager acknowledgementSetManager,
5255
final MongoDBSourceConfig sourceConfig,
5356
final String s3PathPrefix,
5457
final PluginMetrics pluginMetrics,
55-
final DocumentDBSourceAggregateMetrics documentDBAggregateMetrics) {
58+
final DocumentDBSourceAggregateMetrics documentDBAggregateMetrics,
59+
final MongoTasksRefresher mongoTasksRefresher) {
5660
this.sourceCoordinator = sourceCoordinator;
5761
final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
5862
recordBufferWriter = RecordBufferWriter.create(bufferAccumulator, pluginMetrics);
@@ -62,6 +66,7 @@ public StreamScheduler(final EnhancedSourceCoordinator sourceCoordinator,
6266
this.s3PathPrefix = s3PathPrefix;
6367
this.pluginMetrics = pluginMetrics;
6468
this.documentDBAggregateMetrics = documentDBAggregateMetrics;
69+
this.mongoTasksRefresher = mongoTasksRefresher;
6570
}
6671

6772
@Override
@@ -89,6 +94,9 @@ public void run() {
8994
}
9095
} catch (final Exception e) {
9196
LOG.error("Received an exception during stream processing from DocumentDB, backing off and retrying", e);
97+
if (isCausedByMongoSecurityException(e) && mongoTasksRefresher != null) {
98+
mongoTasksRefresher.forceRefresh();
99+
}
92100
if (streamPartition != null) {
93101
if (sourceConfig.isDisableS3ReadForLeader()) {
94102
System.clearProperty(STOP_S3_SCAN_PROCESSING_PROPERTY);
@@ -131,4 +139,15 @@ private PartitionKeyRecordConverter getPartitionKeyRecordConverter(final StreamP
131139
return new PartitionKeyRecordConverter(streamPartition.getCollection(),
132140
StreamPartition.PARTITION_TYPE, s3Prefix);
133141
}
142+
143+
private boolean isCausedByMongoSecurityException(final Throwable throwable) {
144+
Throwable cause = throwable;
145+
while (cause != null) {
146+
if (cause instanceof MongoSecurityException) {
147+
return true;
148+
}
149+
cause = cause.getCause();
150+
}
151+
return false;
152+
}
134153
}

data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/documentdb/MongoTasksRefresherTest.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.opensearch.dataprepper.plugins.mongo.stream.StreamScheduler;
2525
import org.opensearch.dataprepper.plugins.mongo.utils.DocumentDBSourceAggregateMetrics;
2626

27+
import java.lang.reflect.Field;
2728
import java.util.List;
2829
import java.util.UUID;
2930
import java.util.concurrent.ExecutorService;
@@ -266,6 +267,81 @@ void testTaskRefreshShutdown() {
266267
verify(executorService).submit(any(StreamScheduler.class));
267268
verify(executorService).shutdownNow();
268269
verifyNoMoreInteractions(executorServiceFunction);
270+
}
271+
272+
@Test
273+
void testForceRefreshRestartsExecutor() {
274+
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
275+
final MongoTasksRefresher objectUnderTest = createObjectUnderTest();
276+
objectUnderTest.initialize(sourceConfig);
277+
final ExecutorService newExecutorService = mock(ExecutorService.class);
278+
when(executorServiceFunction.apply(anyInt())).thenReturn(newExecutorService);
279+
objectUnderTest.forceRefresh();
280+
verify(executorService).shutdownNow();
281+
verify(credentialsChangeCounter).increment();
282+
verify(executorServiceFunction, times(2)).apply(eq(3));
283+
}
284+
285+
@Test
286+
void testForceRefreshStopsAfterMaxAttempts() throws Exception {
287+
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
288+
final MongoTasksRefresher objectUnderTest = createObjectUnderTest();
289+
objectUnderTest.initialize(sourceConfig);
290+
when(executorServiceFunction.apply(anyInt())).thenReturn(mock(ExecutorService.class));
291+
objectUnderTest.forceRefresh();
292+
resetLastForceRefreshTime(objectUnderTest);
293+
objectUnderTest.forceRefresh();
294+
resetLastForceRefreshTime(objectUnderTest);
295+
objectUnderTest.forceRefresh();
296+
resetLastForceRefreshTime(objectUnderTest);
297+
// 4th attempt should be ignored (max reached)
298+
objectUnderTest.forceRefresh();
299+
verify(credentialsChangeCounter, times(3)).increment();
300+
}
301+
302+
@Test
303+
void testForceRefreshCounterResetsOnCredentialChange() throws Exception {
304+
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
305+
final MongoTasksRefresher objectUnderTest = createObjectUnderTest();
306+
objectUnderTest.initialize(sourceConfig);
307+
when(executorServiceFunction.apply(anyInt())).thenReturn(mock(ExecutorService.class));
308+
objectUnderTest.forceRefresh();
309+
resetLastForceRefreshTime(objectUnderTest);
310+
objectUnderTest.forceRefresh();
311+
resetLastForceRefreshTime(objectUnderTest);
312+
objectUnderTest.forceRefresh();
313+
// Simulate credential change via update()
314+
when(sourceConfig.getAuthenticationConfig()).thenReturn(credentialsConfig);
315+
when(credentialsConfig.getUsername()).thenReturn(TEST_USERNAME);
316+
when(credentialsConfig.getPassword()).thenReturn(TEST_PASSWORD);
317+
final MongoDBSourceConfig newSourceConfig = mock(MongoDBSourceConfig.class);
318+
when(newSourceConfig.getCollections()).thenReturn(List.of(collectionConfig));
319+
final MongoDBSourceConfig.AuthenticationConfig newCredentialsConfig = mock(
320+
MongoDBSourceConfig.AuthenticationConfig.class);
321+
when(newSourceConfig.getAuthenticationConfig()).thenReturn(newCredentialsConfig);
322+
when(newCredentialsConfig.getUsername()).thenReturn(TEST_USERNAME);
323+
when(newCredentialsConfig.getPassword()).thenReturn(TEST_PASSWORD + "_changed");
324+
objectUnderTest.update(newSourceConfig);
325+
// Force refresh should work again after counter reset
326+
objectUnderTest.forceRefresh();
327+
// 3 from forceRefresh + 1 from update + 1 from forceRefresh after reset
328+
verify(credentialsChangeCounter, times(5)).increment();
329+
}
330+
331+
@Test
332+
void testForceRefreshHandlesException() {
333+
when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter);
334+
when(pluginMetrics.counter(EXECUTOR_REFRESH_ERRORS)).thenReturn(executorRefreshErrorsCounter);
335+
final MongoTasksRefresher objectUnderTest = createObjectUnderTest();
336+
objectUnderTest.initialize(sourceConfig);
337+
doThrow(RuntimeException.class).when(executorService).shutdownNow();
338+
objectUnderTest.forceRefresh();
339+
verify(executorRefreshErrorsCounter).increment();
340+
}
269341

342+
private void resetLastForceRefreshTime(final MongoTasksRefresher refresher) throws Exception {
343+
final Field field = MongoTasksRefresher.class.getDeclaredField("lastForceRefreshTime");
344+
field.setAccessible(true);
345+
field.setLong(refresher, 0);
270346
}
271347
}

data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamSchedulerTest.java

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.opensearch.dataprepper.plugins.mongo.stream;
22

3+
import com.mongodb.MongoSecurityException;
34
import org.junit.jupiter.api.BeforeEach;
45
import org.junit.jupiter.api.Test;
56
import org.junit.jupiter.api.extension.ExtendWith;
@@ -17,6 +18,7 @@
1718
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
1819
import org.opensearch.dataprepper.plugins.mongo.converter.PartitionKeyRecordConverter;
1920
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.StreamPartition;
21+
import org.opensearch.dataprepper.plugins.mongo.documentdb.MongoTasksRefresher;
2022
import org.opensearch.dataprepper.plugins.mongo.utils.DocumentDBSourceAggregateMetrics;
2123

2224
import java.time.Duration;
@@ -71,13 +73,16 @@ public class StreamSchedulerTest {
7173
@Mock
7274
private StreamWorker streamWorker;
7375

76+
@Mock
77+
private MongoTasksRefresher mongoTasksRefresher;
78+
7479

7580
private StreamScheduler streamScheduler;
7681

7782
@BeforeEach
7883
void setup() {
7984
lenient().when(sourceConfig.getCollections()).thenReturn(List.of(collectionConfig));
80-
streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, S3_PATH_PREFIX, pluginMetrics, documentDBSourceAggregateMetrics);
85+
streamScheduler = new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, S3_PATH_PREFIX, pluginMetrics, documentDBSourceAggregateMetrics, mongoTasksRefresher);
8186
}
8287

8388

@@ -205,6 +210,66 @@ void test_stream_sourceCoordinatorThrowsException() {
205210

206211
@Test
207212
void test_stream_withNullS3PathPrefix() {
208-
assertThrows(IllegalArgumentException.class, () -> new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, null, pluginMetrics, documentDBSourceAggregateMetrics));
213+
assertThrows(IllegalArgumentException.class, () -> new StreamScheduler(sourceCoordinator, buffer, acknowledgementSetManager, sourceConfig, null, pluginMetrics, documentDBSourceAggregateMetrics, mongoTasksRefresher));
214+
}
215+
216+
@Test
217+
void test_stream_mongoSecurityException_triggersForceRefresh() {
218+
final String collection = UUID.randomUUID().toString();
219+
final StreamPartition streamPartition = new StreamPartition(collection, null);
220+
given(sourceCoordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willReturn(Optional.of(streamPartition));
221+
given(collectionConfig.getCollection()).willReturn(collection);
222+
final int streamBatchSize = 1000;
223+
given(collectionConfig.getStreamBatchSize()).willReturn(streamBatchSize);
224+
225+
final ExecutorService executorService = Executors.newSingleThreadExecutor();
226+
final Future<?> future = executorService.submit(() -> {
227+
try (MockedStatic<StreamWorker> streamWorkerMockedStatic = mockStatic(StreamWorker.class)) {
228+
final MongoSecurityException securityException = new MongoSecurityException(
229+
null, "auth failed", new RuntimeException("credential revoked"));
230+
streamWorkerMockedStatic.when(() -> StreamWorker.create(any(RecordBufferWriter.class), any(PartitionKeyRecordConverter.class), eq(sourceConfig),
231+
any(StreamAcknowledgementManager.class), any(DataStreamPartitionCheckpoint.class), eq(pluginMetrics), eq(DEFAULT_RECORD_FLUSH_BATCH_SIZE),
232+
eq(DEFAULT_CHECKPOINT_INTERVAL_MILLS), eq(DEFAULT_BUFFER_WRITE_INTERVAL_MILLS), eq(streamBatchSize), any(DocumentDBSourceAggregateMetrics.class)))
233+
.thenThrow(new RuntimeException(securityException));
234+
streamScheduler.run();
235+
}
236+
});
237+
238+
await()
239+
.atMost(Duration.ofSeconds(5))
240+
.untilAsserted(() -> verify(mongoTasksRefresher).forceRefresh());
241+
242+
future.cancel(true);
243+
executorService.shutdownNow();
244+
}
245+
246+
@Test
247+
void test_stream_nonSecurityException_doesNotTriggerForceRefresh() {
248+
final String collection = UUID.randomUUID().toString();
249+
final StreamPartition streamPartition = new StreamPartition(collection, null);
250+
given(sourceCoordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willReturn(Optional.of(streamPartition));
251+
given(collectionConfig.getCollection()).willReturn(collection);
252+
final int streamBatchSize = 1000;
253+
given(collectionConfig.getStreamBatchSize()).willReturn(streamBatchSize);
254+
255+
final ExecutorService executorService = Executors.newSingleThreadExecutor();
256+
final Future<?> future = executorService.submit(() -> {
257+
try (MockedStatic<StreamWorker> streamWorkerMockedStatic = mockStatic(StreamWorker.class)) {
258+
streamWorkerMockedStatic.when(() -> StreamWorker.create(any(RecordBufferWriter.class), any(PartitionKeyRecordConverter.class), eq(sourceConfig),
259+
any(StreamAcknowledgementManager.class), any(DataStreamPartitionCheckpoint.class), eq(pluginMetrics), eq(DEFAULT_RECORD_FLUSH_BATCH_SIZE),
260+
eq(DEFAULT_CHECKPOINT_INTERVAL_MILLS), eq(DEFAULT_BUFFER_WRITE_INTERVAL_MILLS), eq(streamBatchSize), any(DocumentDBSourceAggregateMetrics.class)))
261+
.thenThrow(RuntimeException.class);
262+
streamScheduler.run();
263+
}
264+
});
265+
266+
await()
267+
.atMost(Duration.ofSeconds(5))
268+
.untilAsserted(() -> verify(sourceCoordinator).giveUpPartition(streamPartition));
269+
270+
verify(mongoTasksRefresher, never()).forceRefresh();
271+
272+
future.cancel(true);
273+
executorService.shutdownNow();
209274
}
210275
}

0 commit comments

Comments
 (0)