From 496230d324a8ffb509d7dfe9078321474dcb07cf Mon Sep 17 00:00:00 2001 From: RashmiRam Date: Thu, 18 Dec 2025 09:24:39 +0530 Subject: [PATCH 1/3] KCL metrics management through config 1.Added "metrics" config to disable/enable sending KCL metrics to cloudwatch 2.Default value is false. Signed-off-by: RashmiRam --- .../kinesis/source/KinesisService.java | 8 +++++- .../configuration/KinesisSourceConfig.java | 4 +++ .../kinesis/source/KinesisServiceTest.java | 27 +++++++++++++++++++ .../KinesisSourceConfigTest.java | 25 +++++++++++++++++ .../pipeline_with_metrics_enabled.yaml | 13 +++++++++ 5 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_metrics_enabled.yaml diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java index 152efb620a..23341d8595 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java @@ -42,6 +42,8 @@ import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException; import software.amazon.kinesis.exceptions.ThrottlingException; +import software.amazon.kinesis.metrics.MetricsConfig; +import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.retrieval.polling.PollingConfig; @@ -198,6 +200,10 @@ public Scheduler createScheduler(final Buffer> buffer) { kinesisSourceConfig.getPollingConfig().getIdleTimeBetweenReads().toMillis())); } + MetricsConfig metricsConfig = kinesisSourceConfig.isMetrics() + ? configsBuilder.metricsConfig() + : configsBuilder.metricsConfig().metricsFactory(new NullMetricsFactory()); + return new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig() @@ -205,7 +211,7 @@ public Scheduler createScheduler(final Buffer> buffer) { .maxInitializationAttempts(kinesisSourceConfig.getMaxInitializationAttempts()), configsBuilder.leaseManagementConfig().billingMode(BillingMode.PAY_PER_REQUEST), configsBuilder.lifecycleConfig(), - configsBuilder.metricsConfig(), + metricsConfig, configsBuilder.processorConfig(), retrievalConfig ); diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java index 64461fffc4..cdfbc2f202 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java @@ -79,6 +79,10 @@ public Duration getShardAcknowledgmentTimeout() { @Getter @JsonProperty("initialization_backoff_time") private Duration initializationBackoffTime = DEFAULT_INITIALIZATION_BACKOFF_TIME; + + @Getter + @JsonProperty("metrics") + private boolean metrics = false; } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java index 896faa4155..76c0357685 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java @@ -41,7 +41,9 @@ import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.coordinator.Scheduler; +import software.amazon.kinesis.metrics.CloudWatchMetricsFactory; import software.amazon.kinesis.metrics.MetricsLevel; +import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.retrieval.polling.PollingConfig; import java.time.Duration; @@ -58,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; @@ -257,6 +260,30 @@ void testCreateScheduler() { verify(workerIdentifierGenerator, times(1)).generate(); } + @Test + void testCreateSchedulerUsesNullMetricsFactoryWhenMetricsDisabled() { + when(kinesisSourceConfig.isMetrics()).thenReturn(false); + KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator); + Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); + + assertNotNull(schedulerObjectUnderTest); + assertNotNull(schedulerObjectUnderTest.metricsConfig()); + assertInstanceOf(NullMetricsFactory.class, schedulerObjectUnderTest.metricsConfig().metricsFactory()); + } + + @Test + void testCreateSchedulerUsesCloudWatchMetricsFactoryWhenMetricsEnabled() { + when(kinesisSourceConfig.isMetrics()).thenReturn(true); + KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator); + Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); + + assertNotNull(schedulerObjectUnderTest); + assertNotNull(schedulerObjectUnderTest.metricsConfig()); + assertInstanceOf(CloudWatchMetricsFactory.class, schedulerObjectUnderTest.metricsConfig().metricsFactory()); + } + @Test void testCreateSchedulerWithPollingStrategy() { when(kinesisSourceConfig.getConsumerStrategy()).thenReturn(ConsumerStrategy.POLLING); diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java index 63bb0ab854..4e5e52e02f 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java @@ -46,6 +46,7 @@ public class KinesisSourceConfigTest { private static final String PIPELINE_CONFIG_CHECKPOINT_ENABLED = "pipeline_with_checkpoint_enabled.yaml"; private static final String PIPELINE_CONFIG_STREAM_ARN_ENABLED = "pipeline_with_stream_arn_config.yaml"; private static final String PIPELINE_CONFIG_STREAM_ARN_CONSUMER_ARN_ENABLED = "pipeline_with_stream_arn_consumer_arn_config.yaml"; + private static final String PIPELINE_CONFIG_WITH_METRICS_ENABLED = "pipeline_with_metrics_enabled.yaml"; private static final Duration MINIMAL_CHECKPOINT_INTERVAL = Duration.ofMillis(2 * 60 * 1000); // 2 minute KinesisSourceConfig kinesisSourceConfig; @@ -81,6 +82,7 @@ void testSourceConfig() { assertEquals(KinesisSourceConfig.DEFAULT_MAX_INITIALIZATION_ATTEMPTS, kinesisSourceConfig.getMaxInitializationAttempts()); assertEquals(KinesisSourceConfig.DEFAULT_INITIALIZATION_BACKOFF_TIME, kinesisSourceConfig.getInitializationBackoffTime()); assertTrue(kinesisSourceConfig.isAcknowledgments()); + assertFalse(kinesisSourceConfig.isMetrics()); assertEquals(KinesisSourceConfig.DEFAULT_SHARD_ACKNOWLEDGEMENT_TIMEOUT, kinesisSourceConfig.getShardAcknowledgmentTimeout()); assertThat(kinesisSourceConfig.getAwsAuthenticationConfig(), notNullValue()); assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion(), Region.US_EAST_1); @@ -114,6 +116,7 @@ void testSourceConfigWithStreamCodec() { assertEquals(KinesisSourceConfig.DEFAULT_MAX_INITIALIZATION_ATTEMPTS, kinesisSourceConfig.getMaxInitializationAttempts()); assertEquals(KinesisSourceConfig.DEFAULT_INITIALIZATION_BACKOFF_TIME, kinesisSourceConfig.getInitializationBackoffTime()); assertFalse(kinesisSourceConfig.isAcknowledgments()); + assertFalse(kinesisSourceConfig.isMetrics()); assertEquals(KinesisSourceConfig.DEFAULT_SHARD_ACKNOWLEDGEMENT_TIMEOUT, kinesisSourceConfig.getShardAcknowledgmentTimeout()); assertThat(kinesisSourceConfig.getAwsAuthenticationConfig(), notNullValue()); assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion(), Region.US_EAST_1); @@ -234,4 +237,26 @@ void testSourceConfigWithStreamArnConsumerArn() { assertEquals(kinesisStreamConfig.getCheckPointInterval(), MINIMAL_CHECKPOINT_INTERVAL); } } + + @Test + @Tag(PIPELINE_CONFIG_WITH_METRICS_ENABLED) + void testSourceConfigWithMetricsEnabled() { + + assertThat(kinesisSourceConfig, notNullValue()); + assertTrue(kinesisSourceConfig.isMetrics()); + assertEquals(KinesisSourceConfig.DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, kinesisSourceConfig.getNumberOfRecordsToAccumulate()); + assertEquals(KinesisSourceConfig.DEFAULT_TIME_OUT_IN_MILLIS, kinesisSourceConfig.getBufferTimeout()); + assertFalse(kinesisSourceConfig.isAcknowledgments()); + assertThat(kinesisSourceConfig.getAwsAuthenticationConfig(), notNullValue()); + assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion(), Region.US_EAST_1); + assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsStsRoleArn(), "arn:aws:iam::123456789012:role/OSI-PipelineRole"); + + List streamConfigs = kinesisSourceConfig.getStreams(); + assertNotNull(kinesisSourceConfig.getCodec()); + assertEquals(streamConfigs.size(), 3); + + for (KinesisStreamConfig kinesisStreamConfig: streamConfigs) { + assertTrue(kinesisStreamConfig.getName().contains("stream")); + } + } } \ No newline at end of file diff --git a/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_metrics_enabled.yaml b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_metrics_enabled.yaml new file mode 100644 index 0000000000..7125edd423 --- /dev/null +++ b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_metrics_enabled.yaml @@ -0,0 +1,13 @@ +source: + kinesis: + streams: + - stream_name: "stream-1" + - stream_name: "stream-2" + - stream_name: "stream-3" + codec: + ndjson: + aws: + sts_role_arn: "arn:aws:iam::123456789012:role/OSI-PipelineRole" + region: "us-east-1" + metrics: true + From 2a54f6bf82cde24b4b4b2691a1112ad406e7b571 Mon Sep 17 00:00:00 2001 From: RashmiRam Date: Tue, 20 Jan 2026 19:37:06 +0530 Subject: [PATCH 2/3] Rename kinesis metrics config to kcl_metrics_enabled and default to true Signed-off-by: RashmiRam --- .../dataprepper/plugins/kinesis/source/KinesisService.java | 2 +- .../kinesis/source/configuration/KinesisSourceConfig.java | 4 ++-- .../plugins/kinesis/source/KinesisServiceTest.java | 4 ++-- .../source/configuration/KinesisSourceConfigTest.java | 6 +++--- .../src/test/resources/pipeline_with_metrics_enabled.yaml | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java index 23341d8595..460bb4c019 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java @@ -200,7 +200,7 @@ public Scheduler createScheduler(final Buffer> buffer) { kinesisSourceConfig.getPollingConfig().getIdleTimeBetweenReads().toMillis())); } - MetricsConfig metricsConfig = kinesisSourceConfig.isMetrics() + MetricsConfig metricsConfig = kinesisSourceConfig.isKclMetricsEnabled() ? configsBuilder.metricsConfig() : configsBuilder.metricsConfig().metricsFactory(new NullMetricsFactory()); diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java index cdfbc2f202..455453dda0 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java @@ -81,8 +81,8 @@ public Duration getShardAcknowledgmentTimeout() { private Duration initializationBackoffTime = DEFAULT_INITIALIZATION_BACKOFF_TIME; @Getter - @JsonProperty("metrics") - private boolean metrics = false; + @JsonProperty("kcl_metrics_enabled") + private boolean kclMetricsEnabled = true; } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java index 76c0357685..994e114ffa 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java @@ -262,7 +262,7 @@ void testCreateScheduler() { @Test void testCreateSchedulerUsesNullMetricsFactoryWhenMetricsDisabled() { - when(kinesisSourceConfig.isMetrics()).thenReturn(false); + when(kinesisSourceConfig.isKclMetricsEnabled()).thenReturn(false); KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator); Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); @@ -274,7 +274,7 @@ void testCreateSchedulerUsesNullMetricsFactoryWhenMetricsDisabled() { @Test void testCreateSchedulerUsesCloudWatchMetricsFactoryWhenMetricsEnabled() { - when(kinesisSourceConfig.isMetrics()).thenReturn(true); + when(kinesisSourceConfig.isKclMetricsEnabled()).thenReturn(true); KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator); Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java index 4e5e52e02f..7eb0449cfb 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java @@ -82,7 +82,7 @@ void testSourceConfig() { assertEquals(KinesisSourceConfig.DEFAULT_MAX_INITIALIZATION_ATTEMPTS, kinesisSourceConfig.getMaxInitializationAttempts()); assertEquals(KinesisSourceConfig.DEFAULT_INITIALIZATION_BACKOFF_TIME, kinesisSourceConfig.getInitializationBackoffTime()); assertTrue(kinesisSourceConfig.isAcknowledgments()); - assertFalse(kinesisSourceConfig.isMetrics()); + assertTrue(kinesisSourceConfig.isKclMetricsEnabled()); assertEquals(KinesisSourceConfig.DEFAULT_SHARD_ACKNOWLEDGEMENT_TIMEOUT, kinesisSourceConfig.getShardAcknowledgmentTimeout()); assertThat(kinesisSourceConfig.getAwsAuthenticationConfig(), notNullValue()); assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion(), Region.US_EAST_1); @@ -116,7 +116,7 @@ void testSourceConfigWithStreamCodec() { assertEquals(KinesisSourceConfig.DEFAULT_MAX_INITIALIZATION_ATTEMPTS, kinesisSourceConfig.getMaxInitializationAttempts()); assertEquals(KinesisSourceConfig.DEFAULT_INITIALIZATION_BACKOFF_TIME, kinesisSourceConfig.getInitializationBackoffTime()); assertFalse(kinesisSourceConfig.isAcknowledgments()); - assertFalse(kinesisSourceConfig.isMetrics()); + assertTrue(kinesisSourceConfig.isKclMetricsEnabled()); assertEquals(KinesisSourceConfig.DEFAULT_SHARD_ACKNOWLEDGEMENT_TIMEOUT, kinesisSourceConfig.getShardAcknowledgmentTimeout()); assertThat(kinesisSourceConfig.getAwsAuthenticationConfig(), notNullValue()); assertEquals(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion(), Region.US_EAST_1); @@ -243,7 +243,7 @@ void testSourceConfigWithStreamArnConsumerArn() { void testSourceConfigWithMetricsEnabled() { assertThat(kinesisSourceConfig, notNullValue()); - assertTrue(kinesisSourceConfig.isMetrics()); + assertTrue(kinesisSourceConfig.isKclMetricsEnabled()); assertEquals(KinesisSourceConfig.DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, kinesisSourceConfig.getNumberOfRecordsToAccumulate()); assertEquals(KinesisSourceConfig.DEFAULT_TIME_OUT_IN_MILLIS, kinesisSourceConfig.getBufferTimeout()); assertFalse(kinesisSourceConfig.isAcknowledgments()); diff --git a/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_metrics_enabled.yaml b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_metrics_enabled.yaml index 7125edd423..0562c9b42a 100644 --- a/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_metrics_enabled.yaml +++ b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_metrics_enabled.yaml @@ -9,5 +9,5 @@ source: aws: sts_role_arn: "arn:aws:iam::123456789012:role/OSI-PipelineRole" region: "us-east-1" - metrics: true + kcl_metrics_enabled: true From 95bab6fbff77c6473cf67669ea09ca3f2c204fe4 Mon Sep 17 00:00:00 2001 From: RashmiRam Date: Sun, 25 Jan 2026 19:52:25 +0530 Subject: [PATCH 3/3] Add copyright header to pipeline_with_metrics_enabled.yaml Signed-off-by: RashmiRam --- .../src/test/resources/pipeline_with_metrics_enabled.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_metrics_enabled.yaml b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_metrics_enabled.yaml index 0562c9b42a..e5bbf9b138 100644 --- a/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_metrics_enabled.yaml +++ b/data-prepper-plugins/kinesis-source/src/test/resources/pipeline_with_metrics_enabled.yaml @@ -1,3 +1,10 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + source: kinesis: streams: