Skip to content

Commit 17c3e29

Browse files
authored
Handle DynamoDB source leader exceptions correctly by attempting to reacquire partition (#6195)
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent ca7168c commit 17c3e29

3 files changed

Lines changed: 36 additions & 3 deletions

File tree

  • data-prepper-plugins

data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapper.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -183,8 +183,8 @@ private void tryUpdateItem(final DynamoDbSourcePartitionItem dynamoDbSourceParti
183183
} catch (final ConditionalCheckFailedException e) {
184184
final String message = String.format(
185185
"ConditionalCheckFailed while updating partition %s. This partition item was either deleted from the dynamo table, " +
186-
"or another instance of Data Prepper has modified it.",
187-
dynamoDbSourcePartitionItem.getSourcePartitionKey());
186+
"or another instance of Data Prepper has modified it. Expected version: %s",
187+
dynamoDbSourcePartitionItem.getSourcePartitionKey(), dynamoDbSourcePartitionItem.getVersion() - 1L);
188188
throw new PartitionUpdateException(message, e);
189189
} catch (final Exception e) {
190190
final String errorMessage = String.format("An exception occurred while attempting to update a DynamoDb partition item %s",

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -129,7 +129,8 @@ public void run() {
129129
try {
130130
coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES));
131131
} catch (final Exception e) {
132-
LOG.error("Failed to update ownership for leader partition. Retrying...");
132+
LOG.error("Failed to update ownership for leader partition. Will attempt to reacquire this partition...");
133+
leaderPartition = null;
133134
}
134135
}
135136
try {

data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderSchedulerTest.java

Lines changed: 32 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88
import org.mockito.junit.jupiter.MockitoExtension;
99
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
1010
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
11+
import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException;
1112
import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig;
1213
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.ExportConfig;
1314
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig;
@@ -16,6 +17,7 @@
1617
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.LeaderPartition;
1718
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition;
1819
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;
20+
import org.opensearch.dataprepper.test.helper.ReflectivelySetField;
1921
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
2022
import software.amazon.awssdk.services.dynamodb.model.ContinuousBackupsDescription;
2123
import software.amazon.awssdk.services.dynamodb.model.ContinuousBackupsStatus;
@@ -55,6 +57,7 @@
5557
import static org.mockito.ArgumentMatchers.isNull;
5658
import static org.mockito.BDDMockito.given;
5759
import static org.mockito.Mockito.atLeast;
60+
import static org.mockito.Mockito.doThrow;
5861
import static org.mockito.Mockito.lenient;
5962
import static org.mockito.Mockito.never;
6063
import static org.mockito.Mockito.times;
@@ -330,6 +333,35 @@ void run_without_acquiring_leader_partition_does_not_save_null_state() {
330333
verify(coordinator, never()).saveProgressStateForPartition(isNull(), any(Duration.class));
331334
}
332335

336+
@Test
337+
void test_shardDiscovery_with_failure_to_save_partition_state_reacquires_partition() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
338+
leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, List.of(tableConfig));
339+
leaderPartition = new LeaderPartition();
340+
leaderPartition.getProgressState().get().setInitialized(true);
341+
leaderPartition.getProgressState().get().setStreamArns(List.of(streamArn));
342+
given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));
343+
doThrow(PartitionUpdateException.class).when(coordinator).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
344+
345+
ReflectivelySetField.setField(LeaderScheduler.class, leaderScheduler, "leaseInterval", Duration.ofMillis(40));
346+
347+
ExecutorService executorService = Executors.newSingleThreadExecutor();
348+
executorService.submit(() -> leaderScheduler.run());
349+
350+
Thread.sleep(100);
351+
executorService.shutdownNow();
352+
// Already init
353+
verifyNoInteractions(dynamoDbClient);
354+
355+
// Should check the completed partitions
356+
verify(coordinator, atLeast(2)).queryCompletedPartitions(eq(StreamPartition.PARTITION_TYPE), any(Instant.class));
357+
358+
// Should create 3 stream partitions for child shards found
359+
verify(coordinator, atLeast(3)).createPartition(any(EnhancedSourcePartition.class));
360+
361+
verify(coordinator, atLeast(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
362+
verify(coordinator, atLeast(2)).acquireAvailablePartition(LeaderPartition.PARTITION_TYPE);
363+
}
364+
333365

334366
/**
335367
* Helper function to mock DescribeContinuousBackupsResponse

0 commit comments

Comments
 (0)