Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,6 @@ public void run() {
LOG.warn("Processing for shard {} was interrupted by a shutdown signal, giving up shard", shardId);
throw new RuntimeException("Consuming shard was interrupted from shutdown");
}

if (waitForExport) {
waitForExport();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.BUFFER_TIMEOUT;
Expand Down Expand Up @@ -486,11 +487,13 @@ void test_shard_has_no_records_null_iterator() throws Exception {
}

@Test
void test_shard_has_no_records_null_with_last_shard_iterator_paginates_through_shard() throws Exception {
void test_shard_has_no_records_null_with_last_shard_iterator_paginates_through_shard_and_exits_without_waiting_on_export() throws Exception {
final AcknowledgementSet finalAcknowledgementSet = mock(AcknowledgementSet.class);
when(shardAcknowledgementManager.createAcknowledgmentSet(any(StreamPartition.class), any(String.class), any(Boolean.class)))
.thenReturn(finalAcknowledgementSet);

final StreamCheckpointer streamCheckpointer = mock(StreamCheckpointer.class);

final String lastShardIterator = UUID.randomUUID().toString();

// Set up response with null nextShardIterator to trigger end of shard
Expand Down Expand Up @@ -519,7 +522,8 @@ void test_shard_has_no_records_null_with_last_shard_iterator_paginates_through_s
.streamPartition(streamPartition)
.tableInfo(tableInfo)
.startTime(null)
.waitForExport(false)
.checkpointer(streamCheckpointer)
.waitForExport(true)
.build();
}

Expand All @@ -535,6 +539,8 @@ void test_shard_has_no_records_null_with_last_shard_iterator_paginates_through_s
assertThat(requestWithLastShardIterator.size(), equalTo(2));
assertThat(requestWithLastShardIterator.get(0).shardIterator(), equalTo(lastShardIterator));
assertThat(requestWithLastShardIterator.get(1).shardIterator(), equalTo(shardIterator));

verifyNoInteractions(streamCheckpointer);
}
}
private List<Record> buildRecords(int count) {
Expand Down
Loading