Skip to content

Commit 1a5e0c9

Browse files
sb2k16sbose2k21
andauthored
Attempt to fix the integration test for Kinesis source (opensearch-project#6886)
Signed-off-by: Souvik Bose <souvbose@amazon.com> Co-authored-by: Souvik Bose <souvbose@amazon.com>
1 parent cb47f5c commit 1a5e0c9

4 files changed

Lines changed: 10 additions & 7 deletions

File tree

data-prepper-plugins/kinesis-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ void setup() {
194194
when(kinesisSourceConfig.getNumberOfRecordsToAccumulate()).thenReturn(NUMBER_OF_RECORDS_TO_ACCUMULATE);
195195
when(kinesisSourceConfig.getBufferTimeout()).thenReturn(BUFFER_TIMEOUT);
196196
when(kinesisSourceConfig.getMaxInitializationAttempts()).thenReturn(MAX_INITIALIZATION_ATTEMPTS);
197+
when(kinesisSourceConfig.getInitializationBackoffTime()).thenReturn(Duration.ofMillis(1000));
197198

198199
kinesisClientFactory = mock(KinesisClientFactory.class);
199200
when(kinesisClientFactory.buildDynamoDBClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(DynamoDbAsyncClient.builder()

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/apihandler/KinesisClientApiRetryHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public <T> T executeWithRetry(String operationName, Supplier<T> operation,
5151
}
5252

5353
private void applyBackoff(int attempt) {
54-
final long delayMillis = backoff.nextDelayMillis(attempt);
54+
final long delayMillis = backoff.nextDelayMillis(attempt + 1);
5555
if (delayMillis < 0) {
5656
throw new KinesisRetriesExhaustedException(
5757
"Retries exhausted. Make sure that configuration is valid and required permissions are present.");

data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/apihandler/KinesisClientApiRetryHandlerTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ void executeWithRetry_successOnFirstAttempt_returnsResult() {
8181

8282
@Test
8383
void executeWithRetry_successOnSecondAttempt_returnsResult() {
84-
when(backoff.nextDelayMillis(0)).thenReturn(DELAY_MILLIS);
84+
when(backoff.nextDelayMillis(1)).thenReturn(DELAY_MILLIS);
8585
AtomicInteger attempts = new AtomicInteger(0);
8686
String expectedResult = "success";
8787

@@ -97,7 +97,7 @@ void executeWithRetry_successOnSecondAttempt_returnsResult() {
9797
);
9898

9999
assertEquals(expectedResult, result);
100-
verify(backoff, times(1)).nextDelayMillis(0);
100+
verify(backoff, times(1)).nextDelayMillis(1);
101101
verify(exceptionHandler, times(1)).handle(any(), eq(0));
102102
}
103103

@@ -125,7 +125,7 @@ void executeWithRetry_allAttemptsFail_throwsException() {
125125

126126
@Test
127127
void executeWithRetry_negativeBackoffDelay_throwsException() {
128-
when(backoff.nextDelayMillis(0)).thenReturn(-1L);
128+
when(backoff.nextDelayMillis(1)).thenReturn(-1L);
129129
RuntimeException testException = new RuntimeException("Test failure");
130130

131131
KinesisRetriesExhaustedException exception = assertThrows(
@@ -141,13 +141,13 @@ void executeWithRetry_negativeBackoffDelay_throwsException() {
141141
"Retries exhausted. Make sure that configuration is valid and required permissions are present.",
142142
exception.getMessage()
143143
);
144-
verify(backoff, times(1)).nextDelayMillis(0);
144+
verify(backoff, times(1)).nextDelayMillis(1);
145145
verify(exceptionHandler, times(1)).handle(any(), eq(0));
146146
}
147147

148148
@Test
149149
void executeWithRetry_interruptedDuringSleep_throwsException() {
150-
when(backoff.nextDelayMillis(0)).thenReturn(DELAY_MILLIS);
150+
when(backoff.nextDelayMillis(1)).thenReturn(DELAY_MILLIS);
151151
RuntimeException testException = new RuntimeException("Test failure");
152152
Thread.currentThread().interrupt();
153153

@@ -160,7 +160,7 @@ void executeWithRetry_interruptedDuringSleep_throwsException() {
160160
)
161161
);
162162

163-
verify(backoff, times(1)).nextDelayMillis(0);
163+
verify(backoff, times(1)).nextDelayMillis(1);
164164
verify(exceptionHandler, times(1)).handle(any(), eq(0));
165165
}
166166

testing/aws-testing-cdk/lib/kinesis/KinesisSourceStack.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export class KinesisSourceStack extends Stack {
2727
'kinesis:ListStreamConsumers',
2828
'kinesis:ListShards',
2929
'kinesis:DescribeStream',
30+
'kinesis:DescribeStreamSummary',
3031
'kinesis:GetRecords',
3132
'kinesis:GetResourcePolicy',
3233
'kinesis:SubscribeToShard',
@@ -53,6 +54,7 @@ export class KinesisSourceStack extends Stack {
5354
'dynamodb:DescribeTimeToLive',
5455
'dynamodb:CreateTable',
5556
'dynamodb:DescribeTable',
57+
'dynamodb:DeleteTable',
5658
'dynamodb:GetItem',
5759
'dynamodb:PutItem',
5860
'dynamodb:UpdateItem',

0 commit comments

Comments
 (0)