Skip to content

Commit 2b443ce

Browse files
committed
(fix) adding support for considering table-name and table-arn independently just for ddblocal
1 parent 2eb6cc1 commit 2b443ce

2 files changed

Lines changed: 7 additions & 4 deletions

File tree

data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderScheduler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,10 @@ private void compareAndCreateChildrenPartitions(List<EnhancedSourcePartition> so
234234
* Once created, the info should not be changed.
235235
*/
236236
private TableInfo getTableInfo(final TableConfig tableConfig) {
237-
final String tableName = tableConfig.getTableArn();
237+
final String tableArn = tableConfig.getTableArn();
238+
final String tableName = tableArn.contains("/") ?
239+
tableArn.substring(tableArn.lastIndexOf('/') + 1) :
240+
tableArn;
238241
DescribeTableResponse describeTableResult;
239242
try {
240243
// Need to call describe table to get the Key schema for table

data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/LeaderSchedulerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ void test_should_init() throws InterruptedException {
227227
ArgumentCaptor<DescribeTableRequest> describeTableRequestArgumentCaptor = ArgumentCaptor.forClass(DescribeTableRequest.class);
228228
verify(dynamoDbClient).describeTable(describeTableRequestArgumentCaptor.capture());
229229
DescribeTableRequest actualDescribeTableRequest = describeTableRequestArgumentCaptor.getValue();
230-
assertThat(actualDescribeTableRequest.tableName(), equalTo(tableArn));
230+
assertThat(actualDescribeTableRequest.tableName(), equalTo(tableName));
231231
// Should check PITR enabled or not
232232
verify(dynamoDbClient).describeContinuousBackups(any(DescribeContinuousBackupsRequest.class));
233233
// Acquire the init partition
@@ -259,7 +259,7 @@ void test_PITR_not_enabled_init_should_failed() throws InterruptedException {
259259
ArgumentCaptor<DescribeTableRequest> describeTableRequestArgumentCaptor = ArgumentCaptor.forClass(DescribeTableRequest.class);
260260
verify(dynamoDbClient).describeTable(describeTableRequestArgumentCaptor.capture());
261261
DescribeTableRequest actualDescribeTableRequest = describeTableRequestArgumentCaptor.getValue();
262-
assertThat(actualDescribeTableRequest.tableName(), equalTo(tableArn));
262+
assertThat(actualDescribeTableRequest.tableName(), equalTo(tableName));
263263

264264
// Should check PITR enabled or not
265265
verify(dynamoDbClient).describeContinuousBackups(any(DescribeContinuousBackupsRequest.class));
@@ -288,7 +288,7 @@ void test_streaming_not_enabled_init_should_failed() throws InterruptedException
288288
ArgumentCaptor<DescribeTableRequest> describeTableRequestArgumentCaptor = ArgumentCaptor.forClass(DescribeTableRequest.class);
289289
verify(dynamoDbClient).describeTable(describeTableRequestArgumentCaptor.capture());
290290
DescribeTableRequest actualDescribeTableRequest = describeTableRequestArgumentCaptor.getValue();
291-
assertThat(actualDescribeTableRequest.tableName(), equalTo(tableArn));
291+
assertThat(actualDescribeTableRequest.tableName(), equalTo(tableName));
292292

293293
// Should check PITR enabled or not
294294
verify(dynamoDbClient).describeContinuousBackups(any(DescribeContinuousBackupsRequest.class));

0 commit comments

Comments
 (0)