Skip to content

Commit 3ddd6f3

Browse files
committed
Update the version of KCL from 2.6.0 to 2.7.2
Signed-off-by: Souvik Bose <souvbose@amazon.com>
1 parent 1789cf9 commit 3ddd6f3

5 files changed

Lines changed: 12 additions & 6 deletions

File tree

.github/workflows/kinesis-source-integration-tests.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,18 @@ on:
55
paths:
66
- 'data-prepper-plugins/kinesis-source/**'
77
- '*gradle*'
8-
pull_request:
8+
pull_request_target:
9+
types: [ opened, synchronize, reopened ]
910
paths:
1011
- 'data-prepper-plugins/kinesis-source/**'
1112
- '*gradle*'
1213

1314
workflow_dispatch:
1415

16+
permissions:
17+
id-token: write
18+
contents: read
19+
1520
jobs:
1621
build:
1722
strategy:

data-prepper-plugins/kinesis-source/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ dependencies {
3434
implementation libs.armeria.core
3535
implementation 'com.fasterxml.jackson.core:jackson-core'
3636
implementation 'io.micrometer:micrometer-core'
37-
implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0'
37+
implementation 'software.amazon.kinesis:amazon-kinesis-client:2.7.2'
3838
compileOnly 'org.projectlombok:lombok:1.18.20'
3939
annotationProcessor 'org.projectlombok:lombok:1.18.20'
4040

data-prepper-plugins/kinesis-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ public void testKinesisService() throws Exception {
249249
when(pluginFactory.loadPlugin(eq(InputCodec.class), any()))
250250
.thenReturn(new NdjsonInputCodec(new NdjsonInputConfig(), TestEventFactory.getTestEventFactory()));
251251

252-
final List<Record<?>> actualRecordsWritten = new ArrayList<>();
252+
final List<Record<?>> actualRecordsWritten = Collections.synchronizedList(new ArrayList<>());
253253
doAnswer(a -> actualRecordsWritten.addAll(a.getArgument(0, Collection.class)))
254254
.when(buffer).writeAll(anyCollection(), anyInt());
255255

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
package org.opensearch.dataprepper.plugins.kinesis.source;
1212

13-
import com.amazonaws.SdkClientException;
1413
import com.linecorp.armeria.client.retry.Backoff;
1514
import lombok.Getter;
1615
import lombok.Setter;
@@ -34,6 +33,7 @@
3433
import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisShardRecordProcessorFactory;
3534
import org.slf4j.Logger;
3635
import org.slf4j.LoggerFactory;
36+
import software.amazon.awssdk.core.exception.SdkClientException;
3737
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
3838
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
3939
import software.amazon.awssdk.services.dynamodb.model.BillingMode;

data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/apihandler/KinesisClientApiHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import lombok.extern.slf4j.Slf4j;
1515
import org.opensearch.dataprepper.plugins.kinesis.source.exceptions.KinesisConsumerNotFoundException;
1616
import software.amazon.awssdk.arns.Arn;
17+
import software.amazon.awssdk.core.exception.SdkClientException;
1718
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
1819
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest;
1920
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
@@ -111,7 +112,7 @@ private DescribeStreamSummaryResponse getStreamDescriptionSummary(
111112

112113
private void handleStreamSummaryException(CompletionException ex, String streamName) {
113114
Throwable cause = ex.getCause();
114-
if (cause instanceof KinesisException || cause instanceof com.amazonaws.SdkClientException) {
115+
if (cause instanceof KinesisException || cause instanceof SdkClientException) {
115116
log.error("AWS error while describing stream summary for stream {}: {}",
116117
streamName, ex.getMessage());
117118
} else {
@@ -136,7 +137,7 @@ private DescribeStreamConsumerResponse describeStreamConsumer(
136137

137138
private void handleConsumerException(CompletionException ex, String streamArn, int attempt) {
138139
Throwable cause = ex.getCause();
139-
if (cause instanceof KinesisException || cause instanceof com.amazonaws.SdkClientException) {
140+
if (cause instanceof KinesisException || cause instanceof SdkClientException) {
140141
log.error("AWS error while describing stream consumer for stream {}: {}. Attempt {}.",
141142
streamArn, ex.getMessage(), attempt + 1);
142143
} else {

0 commit comments

Comments
 (0)