diff --git a/.github/workflows/kinesis-source-integration-tests.yml b/.github/workflows/kinesis-source-integration-tests.yml index 709f94ca7d..e05a932808 100644 --- a/.github/workflows/kinesis-source-integration-tests.yml +++ b/.github/workflows/kinesis-source-integration-tests.yml @@ -5,13 +5,18 @@ on: paths: - 'data-prepper-plugins/kinesis-source/**' - '*gradle*' - pull_request: + pull_request_target: + types: [ opened, synchronize, reopened ] paths: - 'data-prepper-plugins/kinesis-source/**' - '*gradle*' workflow_dispatch: +permissions: + id-token: write + contents: read + jobs: build: strategy: diff --git a/data-prepper-plugins/kinesis-source/build.gradle b/data-prepper-plugins/kinesis-source/build.gradle index b3937bb7d1..fd899bd884 100644 --- a/data-prepper-plugins/kinesis-source/build.gradle +++ b/data-prepper-plugins/kinesis-source/build.gradle @@ -34,7 +34,7 @@ dependencies { implementation libs.armeria.core implementation 'com.fasterxml.jackson.core:jackson-core' implementation 'io.micrometer:micrometer-core' - implementation 'software.amazon.kinesis:amazon-kinesis-client:2.6.0' + implementation 'software.amazon.kinesis:amazon-kinesis-client:2.7.2' compileOnly 'org.projectlombok:lombok:1.18.20' annotationProcessor 'org.projectlombok:lombok:1.18.20' diff --git a/data-prepper-plugins/kinesis-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceIT.java b/data-prepper-plugins/kinesis-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceIT.java index 76081c5525..1c915d93cf 100644 --- a/data-prepper-plugins/kinesis-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceIT.java +++ b/data-prepper-plugins/kinesis-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceIT.java @@ -249,7 +249,7 @@ public void testKinesisService() throws Exception { when(pluginFactory.loadPlugin(eq(InputCodec.class), any())) .thenReturn(new NdjsonInputCodec(new NdjsonInputConfig(), TestEventFactory.getTestEventFactory())); - final List> actualRecordsWritten = new ArrayList<>(); + final List> actualRecordsWritten = Collections.synchronizedList(new ArrayList<>()); doAnswer(a -> actualRecordsWritten.addAll(a.getArgument(0, Collection.class))) .when(buffer).writeAll(anyCollection(), anyInt()); diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java index 152efb620a..fd14f76787 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java @@ -10,7 +10,6 @@ package org.opensearch.dataprepper.plugins.kinesis.source; -import com.amazonaws.SdkClientException; import com.linecorp.armeria.client.retry.Backoff; import lombok.Getter; import lombok.Setter; @@ -34,6 +33,7 @@ import org.opensearch.dataprepper.plugins.kinesis.source.processor.KinesisShardRecordProcessorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.BillingMode; diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/apihandler/KinesisClientApiHandler.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/apihandler/KinesisClientApiHandler.java index cb29f04fe1..fb54b88897 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/apihandler/KinesisClientApiHandler.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/apihandler/KinesisClientApiHandler.java @@ -14,6 +14,7 @@ import lombok.extern.slf4j.Slf4j; import org.opensearch.dataprepper.plugins.kinesis.source.exceptions.KinesisConsumerNotFoundException; import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse; @@ -111,7 +112,7 @@ private DescribeStreamSummaryResponse getStreamDescriptionSummary( private void handleStreamSummaryException(CompletionException ex, String streamName) { Throwable cause = ex.getCause(); - if (cause instanceof KinesisException || cause instanceof com.amazonaws.SdkClientException) { + if (cause instanceof KinesisException || cause instanceof SdkClientException) { log.error("AWS error while describing stream summary for stream {}: {}", streamName, ex.getMessage()); } else { @@ -136,7 +137,7 @@ private DescribeStreamConsumerResponse describeStreamConsumer( private void handleConsumerException(CompletionException ex, String streamArn, int attempt) { Throwable cause = ex.getCause(); - if (cause instanceof KinesisException || cause instanceof com.amazonaws.SdkClientException) { + if (cause instanceof KinesisException || cause instanceof SdkClientException) { log.error("AWS error while describing stream consumer for stream {}: {}. Attempt {}.", streamArn, ex.getMessage(), attempt + 1); } else {