diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java index 152efb620a..460bb4c019 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java @@ -42,6 +42,8 @@ import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException; import software.amazon.kinesis.exceptions.ThrottlingException; +import software.amazon.kinesis.metrics.MetricsConfig; +import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.polling.PollingConfig; @@ -198,6 +200,10 @@ public Scheduler createScheduler(final Buffer> buffer) { kinesisSourceConfig.getPollingConfig().getIdleTimeBetweenReads().toMillis())); } + MetricsConfig metricsConfig = kinesisSourceConfig.isKclMetricsEnabled() + ? configsBuilder.metricsConfig() + : configsBuilder.metricsConfig().metricsFactory(new NullMetricsFactory()); + return new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig() @@ -205,7 +211,7 @@ public Scheduler createScheduler(final Buffer> buffer) { .maxInitializationAttempts(kinesisSourceConfig.getMaxInitializationAttempts()), configsBuilder.leaseManagementConfig().billingMode(BillingMode.PAY_PER_REQUEST), configsBuilder.lifecycleConfig(), - configsBuilder.metricsConfig(), + metricsConfig, configsBuilder.processorConfig(), retrievalConfig ); diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java index 64461fffc4..455453dda0 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java @@ -79,6 +79,10 @@ public Duration getShardAcknowledgmentTimeout() { @Getter @JsonProperty("initialization_backoff_time") private Duration initializationBackoffTime = DEFAULT_INITIALIZATION_BACKOFF_TIME; + + @Getter + @JsonProperty("kcl_metrics_enabled") + private boolean kclMetricsEnabled = true; } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java index 896faa4155..994e114ffa 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java @@ -41,7 +41,9 @@ import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.coordinator.Scheduler; +import software.amazon.kinesis.metrics.CloudWatchMetricsFactory; import software.amazon.kinesis.metrics.MetricsLevel; +import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.retrieval.polling.PollingConfig; import java.time.Duration; @@ -58,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; @@ -257,6 +260,30 @@ void testCreateScheduler() { verify(workerIdentifierGenerator, times(1)).generate(); } + @Test + void testCreateSchedulerUsesNullMetricsFactoryWhenMetricsDisabled() { + when(kinesisSourceConfig.isKclMetricsEnabled()).thenReturn(false); + KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator); + Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); + + assertNotNull(schedulerObjectUnderTest); + assertNotNull(schedulerObjectUnderTest.metricsConfig()); + assertInstanceOf(NullMetricsFactory.class, schedulerObjectUnderTest.metricsConfig().metricsFactory()); + } + + @Test + void testCreateSchedulerUsesCloudWatchMetricsFactoryWhenMetricsEnabled() { + when(kinesisSourceConfig.isKclMetricsEnabled()).thenReturn(true); + KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator); + Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); + + assertNotNull(schedulerObjectUnderTest); + assertNotNull(schedulerObjectUnderTest.metricsConfig()); + assertInstanceOf(CloudWatchMetricsFactory.class, schedulerObjectUnderTest.metricsConfig().metricsFactory()); + } + @Test void testCreateSchedulerWithPollingStrategy() { when(kinesisSourceConfig.getConsumerStrategy()).thenReturn(ConsumerStrategy.POLLING); diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java index aeb140110f..642dcb51ab 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java @@ -48,6 +48,7 @@ public class KinesisSourceConfigTest { private static final String PIPELINE_CONFIG_CHECKPOINT_ENABLED = "pipeline_with_checkpoint_enabled.yaml"; private static final String PIPELINE_CONFIG_STREAM_ARN_ENABLED = "pipeline_with_stream_arn_config.yaml"; private static final String PIPELINE_CONFIG_STREAM_ARN_CONSUMER_ARN_ENABLED = "pipeline_with_stream_arn_consumer_arn_config.yaml"; + private static final String PIPELINE_CONFIG_WITH_METRICS_ENABLED = "pipeline_with_metrics_enabled.yaml"; private static final String PIPELINE_CONFIG_WITH_INITIAL_POSITION_AT_TIMESTAMP = "pipeline_with_initial_position_at_timestamp_config.yaml"; private static final Duration MINIMAL_CHECKPOINT_INTERVAL = Duration.ofMillis(2 * 60 * 1000); // 2 minute @@ -84,6 +85,7 @@ void testSourceConfig() { assertEquals(KinesisSourceConfig.DEFAULT_MAX_INITIALIZATION_ATTEMPTS, kinesisSourceConfig.getMaxInitializationAttempts()); assertEquals(KinesisSourceConfig.DEFAULT_INITIALIZATION_BACKOFF_TIME, kinesisSourceConfig.getInitializationBackoffTime()); assertTrue(kinesisSourceConfig.isAcknowledgments()); + assertTrue(kinesisSourceConfig.isKclMetricsEnabled()); assertEquals(KinesisSourceConfig.DEFAULT_SHARD_ACKNOWLEDGEMENT_TIMEOUT, kinesisSourceConfig.getShardAcknowledgmentTimeout()); assertThat(kinesisSourceConfig.getAwsAuthenticationConfig(), notNullValue()); assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion(), Region.US_EAST_1); @@ -117,6 +119,7 @@ void testSourceConfigWithStreamCodec() { assertEquals(KinesisSourceConfig.DEFAULT_MAX_INITIALIZATION_ATTEMPTS, kinesisSourceConfig.getMaxInitializationAttempts()); assertEquals(KinesisSourceConfig.DEFAULT_INITIALIZATION_BACKOFF_TIME, kinesisSourceConfig.getInitializationBackoffTime()); assertFalse(kinesisSourceConfig.isAcknowledgments()); + assertTrue(kinesisSourceConfig.isKclMetricsEnabled()); assertEquals(KinesisSourceConfig.DEFAULT_SHARD_ACKNOWLEDGEMENT_TIMEOUT, kinesisSourceConfig.getShardAcknowledgmentTimeout()); assertThat(kinesisSourceConfig.getAwsAuthenticationConfig(), notNullValue()); assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion(), Region.US_EAST_1); @@ -239,6 +242,26 @@ void testSourceConfigWithStreamArnConsumerArn() { } @Test + @Tag(PIPELINE_CONFIG_WITH_METRICS_ENABLED) + void testSourceConfigWithMetricsEnabled() { + + assertThat(kinesisSourceConfig, notNullValue()); + assertTrue(kinesisSourceConfig.isKclMetricsEnabled()); + assertEquals(KinesisSourceConfig.DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, kinesisSourceConfig.getNumberOfRecordsToAccumulate()); + assertEquals(KinesisSourceConfig.DEFAULT_TIME_OUT_IN_MILLIS, kinesisSourceConfig.getBufferTimeout()); + assertFalse(kinesisSourceConfig.isAcknowledgments()); + assertThat(kinesisSourceConfig.getAwsAuthenticationConfig(), notNullValue()); + assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion(), Region.US_EAST_1); + assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsStsRoleArn(), "arn:aws:iam::123456789012:role/OSI-PipelineRole"); + + List streamConfigs = kinesisSourceConfig.getStreams(); + assertNotNull(kinesisSourceConfig.getCodec()); + assertEquals(streamConfigs.size(), 3); + + for (KinesisStreamConfig kinesisStreamConfig: streamConfigs) { + assertTrue(kinesisStreamConfig.getName().contains("stream")); + } + } @Tag(PIPELINE_CONFIG_WITH_INITIAL_POSITION_AT_TIMESTAMP) void testSourceConfigWithInitialPositionAtTimestamp() { diff --git a/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_metrics_enabled.yaml b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_metrics_enabled.yaml new file mode 100644 index 0000000000..e5bbf9b138 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_metrics_enabled.yaml @@ -0,0 +1,20 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +source: + kinesis: + streams: + - stream_name: "stream-1" + - stream_name: "stream-2" + - stream_name: "stream-3" + codec: + ndjson: + aws: + sts_role_arn: "arn:aws:iam::123456789012:role/OSI-PipelineRole" + region: "us-east-1" + kcl_metrics_enabled: true +