diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java index 61f3968109..ca99808f30 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java @@ -35,6 +35,7 @@ import org.opensearch.dataprepper.plugins.source.rds.stream.ReplicationLogClientFactory; import org.opensearch.dataprepper.plugins.source.rds.stream.StreamScheduler; import org.opensearch.dataprepper.plugins.source.rds.utils.IdentifierShortener; +import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.rds.RdsClient; @@ -65,6 +66,7 @@ public class RdsService { private final RdsSourceConfig sourceConfig; private final AcknowledgementSetManager acknowledgementSetManager; private final PluginConfigObservable pluginConfigObservable; + private final RdsSourceAggregateMetrics rdsSourceAggregateMetrics; private final PipelineDescription pipelineDescription; private ExecutorService executor; private LeaderScheduler leaderScheduler; @@ -87,6 +89,7 @@ public RdsService(final EnhancedSourceCoordinator sourceCoordinator, this.sourceConfig = sourceConfig; this.acknowledgementSetManager = acknowledgementSetManager; this.pluginConfigObservable = pluginConfigObservable; + this.rdsSourceAggregateMetrics = new RdsSourceAggregateMetrics(); this.pipelineDescription = pipelineDescription; rdsClient = clientFactory.buildRdsClient(); @@ -118,7 +121,7 @@ public void start(Buffer> buffer) { if (sourceConfig.isExportEnabled()) { final SnapshotManager snapshotManager = new SnapshotManager(rdsApiStrategy); - final ExportTaskManager exportTaskManager = new ExportTaskManager(rdsClient); + final ExportTaskManager exportTaskManager = new ExportTaskManager(rdsClient, rdsSourceAggregateMetrics); exportScheduler = new ExportScheduler( sourceCoordinator, snapshotManager, exportTaskManager, s3Client, pluginMetrics); dataFileScheduler = new DataFileScheduler( @@ -128,7 +131,8 @@ public void start(Buffer> buffer) { } if (sourceConfig.isStreamEnabled()) { - ReplicationLogClientFactory replicationLogClientFactory = new ReplicationLogClientFactory(sourceConfig, rdsClient, dbMetadata); + ReplicationLogClientFactory replicationLogClientFactory = new ReplicationLogClientFactory( + sourceConfig, rdsClient, dbMetadata, rdsSourceAggregateMetrics); if (sourceConfig.isTlsEnabled()) { replicationLogClientFactory.setSSLMode(SSLMode.REQUIRED); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportTaskManager.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportTaskManager.java index dc447c2f42..38e2ebea45 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportTaskManager.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportTaskManager.java @@ -5,9 +5,11 @@ package org.opensearch.dataprepper.plugins.source.rds.export; +import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.rds.RdsClient; import software.amazon.awssdk.services.rds.model.DescribeExportTasksRequest; import software.amazon.awssdk.services.rds.model.DescribeExportTasksResponse; @@ -25,9 +27,11 @@ public class ExportTaskManager { private static final int EXPORT_TASK_ID_MAX_LENGTH = 60; private final RdsClient rdsClient; + private final RdsSourceAggregateMetrics rdsSourceAggregateMetrics; - public ExportTaskManager(final RdsClient rdsClient) { + public ExportTaskManager(final RdsClient rdsClient, final RdsSourceAggregateMetrics rdsSourceAggregateMetrics) { this.rdsClient = rdsClient; + this.rdsSourceAggregateMetrics = rdsSourceAggregateMetrics; } public String startExportTask(String snapshotArn, String iamRoleArn, String bucket, String prefix, String kmsKeyId, Collection includeTables) { @@ -45,12 +49,17 @@ public String startExportTask(String snapshotArn, String iamRoleArn, String buck } try { + rdsSourceAggregateMetrics.getExportApiInvocations().increment(); StartExportTaskResponse response = rdsClient.startExportTask(requestBuilder.build()); LOG.info("Export task submitted with id {} and status {}", exportTaskId, response.status()); return exportTaskId; - + } catch (SdkException e) { + rdsSourceAggregateMetrics.getExport4xxErrors().increment(); + LOG.error("Failed to start an export task with error: {}", e.getMessage()); + return null; } catch (Exception e) { - LOG.error("Failed to start an export task", e); + rdsSourceAggregateMetrics.getExport5xxErrors().increment(); + LOG.error("Failed to start an export task with error: {}", e.getMessage()); return null; } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientWrapper.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientWrapper.java index 7c4fe35ae4..38448014a9 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientWrapper.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientWrapper.java @@ -11,6 +11,8 @@ package org.opensearch.dataprepper.plugins.source.rds.stream; import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.network.AuthenticationException; +import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,15 +22,27 @@ public class BinlogClientWrapper implements ReplicationLogClient { private static final Logger LOG = LoggerFactory.getLogger(BinlogClientWrapper.class); + static final String CONNECTION_REFUSED = "Connection refused"; + static final String FAILED_TO_DETERMINE_BINLOG_FILENAME = "Failed to determine binlog filename"; + static final String ACCESS_DENIED = "Access denied"; private final BinaryLogClient binlogClient; + private final RdsSourceAggregateMetrics rdsSourceAggregateMetrics; - public BinlogClientWrapper(final BinaryLogClient binlogClient) { + public BinlogClientWrapper(final BinaryLogClient binlogClient, final RdsSourceAggregateMetrics rdsSourceAggregateMetrics) { this.binlogClient = binlogClient; + this.rdsSourceAggregateMetrics = rdsSourceAggregateMetrics; } @Override public void connect() throws IOException { - binlogClient.connect(); + try { + rdsSourceAggregateMetrics.getStreamApiInvocations().increment(); + binlogClient.connect(); + } catch (Exception e) { + LOG.error("Failed to connect to replication stream due to {}", e.getMessage()); + categorizeError(e); + throw e; + } } @Override @@ -54,4 +68,27 @@ public void disconnect() throws IOException { public BinaryLogClient getBinlogClient() { return binlogClient; } + + private void categorizeError(Exception e) { + if (e instanceof AuthenticationException) { + rdsSourceAggregateMetrics.getStream4xxErrors().increment(); + rdsSourceAggregateMetrics.getStreamAuthErrors().increment(); + LOG.error("Failed to connect to replication stream: Authentication failed. [{}]", e.getMessage()); + } else if (e.getCause() != null && e.getCause().getMessage() != null && e.getCause().getMessage().contains(CONNECTION_REFUSED)) { + rdsSourceAggregateMetrics.getStream4xxErrors().increment(); + rdsSourceAggregateMetrics.getStreamServerNotFoundErrors().increment(); + LOG.error("Failed to connect to replication stream: Cannot connect to MySQL server. [{}]", e.getMessage()); + } else if (e.getMessage() != null && e.getMessage().contains(FAILED_TO_DETERMINE_BINLOG_FILENAME)) { + rdsSourceAggregateMetrics.getStream4xxErrors().increment(); + rdsSourceAggregateMetrics.getStreamReplicationNotEnabledErrors().increment(); + LOG.error("Failed to connect to replication stream: Binary logging not enabled on the server. [{}]", e.getMessage()); + } else if (e.getMessage() != null && e.getMessage().contains(ACCESS_DENIED)) { + rdsSourceAggregateMetrics.getStream4xxErrors().increment(); + rdsSourceAggregateMetrics.getStreamAccessDeniedErrors().increment(); + LOG.error("Failed to connect to replication stream: Insufficient privileges. [{}]", e.getMessage()); + } else { + rdsSourceAggregateMetrics.getStream5xxErrors().increment(); + LOG.error("Failed to connect to replication stream. ", e); + } + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java index 1fcc0b95e8..e8ca979763 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java @@ -11,6 +11,7 @@ package org.opensearch.dataprepper.plugins.source.rds.stream; import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager; +import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics; import org.postgresql.PGConnection; import org.postgresql.replication.LogSequenceNumber; import org.postgresql.replication.PGReplicationStream; @@ -28,10 +29,16 @@ public class LogicalReplicationClient implements ReplicationLogClient { static final String PROTO_VERSION_KEY = "proto_version"; static final String VERSION_ONE = "1"; static final String PUBLICATION_NAMES_KEY = "publication_names"; + static final String AUTHENTICATION_FAILED = "authentication failed"; + static final String CONNECTION_REFUSED = "Connection refused"; + static final String REPLICATION_SLOT_DOES_NOT_EXIST = ".*replication slot .* does not exist.*"; + static final String PERMISSION_DENIED = "permission denied"; private final ConnectionManager connectionManager; private final String publicationName; private final String replicationSlotName; + private final RdsSourceAggregateMetrics rdsSourceAggregateMetrics; + private LogSequenceNumber startLsn; private LogicalReplicationEventProcessor eventProcessor; @@ -40,14 +47,17 @@ public class LogicalReplicationClient implements ReplicationLogClient { public LogicalReplicationClient(final ConnectionManager connectionManager, final String publicationName, - final String replicationSlotName) { + final String replicationSlotName, + final RdsSourceAggregateMetrics rdsSourceAggregateMetrics) { this.connectionManager = connectionManager; this.publicationName = publicationName; this.replicationSlotName = replicationSlotName; + this.rdsSourceAggregateMetrics = rdsSourceAggregateMetrics; } @Override public void connect() { + rdsSourceAggregateMetrics.getStreamApiInvocations().increment(); LOG.debug("Start connecting logical replication stream. "); try (Connection conn = connectionManager.getConnection()) { PGConnection pgConnection = conn.unwrap(PGConnection.class); @@ -95,7 +105,7 @@ public void connect() { disconnectRequested = false; } catch (Exception e) { - LOG.error("Exception while creating or processing Postgres replication stream. ", e); + categorizeError(e); throw new RuntimeException(e); } } @@ -131,4 +141,27 @@ private void closeStream() { } } } + + private void categorizeError(Exception e) { + if (e.getMessage() != null && e.getMessage().contains(AUTHENTICATION_FAILED)) { + rdsSourceAggregateMetrics.getStream4xxErrors().increment(); + rdsSourceAggregateMetrics.getStreamAuthErrors().increment(); + LOG.error("Failed to create or process PostgreSQL replication stream: Authentication failed. [{}]", e.getMessage()); + } else if (e.getCause() != null && e.getCause().getMessage() != null && e.getCause().getMessage().contains(CONNECTION_REFUSED)) { + rdsSourceAggregateMetrics.getStream4xxErrors().increment(); + rdsSourceAggregateMetrics.getStreamServerNotFoundErrors().increment(); + LOG.error("Failed to create or process PostgreSQL replication stream: Cannot connect to PostgreSQL server. [{}]", e.getMessage()); + } else if (e.getMessage() != null && e.getMessage().matches(REPLICATION_SLOT_DOES_NOT_EXIST)) { + rdsSourceAggregateMetrics.getStream4xxErrors().increment(); + rdsSourceAggregateMetrics.getStreamReplicationNotEnabledErrors().increment(); + LOG.error("Failed to create or process PostgreSQL replication stream: Replication slot does not exist. [{}]", e.getMessage()); + } else if (e.getMessage() != null && e.getMessage().contains(PERMISSION_DENIED)) { + rdsSourceAggregateMetrics.getStream4xxErrors().increment(); + rdsSourceAggregateMetrics.getStreamAccessDeniedErrors().increment(); + LOG.error("Failed to create or process PostgreSQL replication stream: Insufficient privileges. [{}]", e.getMessage()); + } else { + rdsSourceAggregateMetrics.getStream5xxErrors().increment(); + LOG.error("Failed to create or process PostgreSQL replication stream. ", e); + } + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactory.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactory.java index e0ed0a07c9..82fdcb85e6 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactory.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactory.java @@ -19,6 +19,7 @@ import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata; import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager; import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManagerFactory; +import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics; import software.amazon.awssdk.services.rds.RdsClient; import java.util.NoSuchElementException; @@ -27,6 +28,7 @@ public class ReplicationLogClientFactory { private final RdsClient rdsClient; private final DbMetadata dbMetadata; + private final RdsSourceAggregateMetrics rdsSourceAggregateMetrics; private RdsSourceConfig sourceConfig; private String username; private String password; @@ -34,17 +36,19 @@ public class ReplicationLogClientFactory { public ReplicationLogClientFactory(final RdsSourceConfig sourceConfig, final RdsClient rdsClient, - final DbMetadata dbMetadata) { + final DbMetadata dbMetadata, + final RdsSourceAggregateMetrics rdsSourceAggregateMetrics) { this.sourceConfig = sourceConfig; this.rdsClient = rdsClient; this.dbMetadata = dbMetadata; + this.rdsSourceAggregateMetrics = rdsSourceAggregateMetrics; username = sourceConfig.getAuthenticationConfig().getUsername(); password = sourceConfig.getAuthenticationConfig().getPassword(); } public ReplicationLogClient create(StreamPartition streamPartition) { if (sourceConfig.getEngine().isMySql()) { - return new BinlogClientWrapper(createBinaryLogClient()); + return new BinlogClientWrapper(createBinaryLogClient(), rdsSourceAggregateMetrics); } else { // Postgres return createLogicalReplicationClient(streamPartition); } @@ -74,7 +78,7 @@ private LogicalReplicationClient createLogicalReplicationClient(StreamPartition } final ConnectionManagerFactory connectionManagerFactory = new ConnectionManagerFactory(sourceConfig, dbMetadata); final ConnectionManager connectionManager = connectionManagerFactory.getConnectionManager(); - return new LogicalReplicationClient(connectionManager, publicationName, replicationSlotName); + return new LogicalReplicationClient(connectionManager, publicationName, replicationSlotName, rdsSourceAggregateMetrics); } public void setSSLMode(SSLMode sslMode) { diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/utils/RdsSourceAggregateMetrics.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/utils/RdsSourceAggregateMetrics.java new file mode 100644 index 0000000000..a83252065a --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/utils/RdsSourceAggregateMetrics.java @@ -0,0 +1,103 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.source.rds.utils; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; + +public class RdsSourceAggregateMetrics { + private static final String RDS_SOURCE = "rds"; + + private static final String RDS_SOURCE_STREAM_5XX_EXCEPTIONS = "stream5xxErrors"; + private static final String RDS_SOURCE_STREAM_4XX_EXCEPTIONS = "stream4xxErrors"; + private static final String RDS_SOURCE_STREAM_API_INVOCATIONS = "streamApiInvocations"; + private static final String RDS_SOURCE_EXPORT_5XX_ERRORS = "export5xxErrors"; + private static final String RDS_SOURCE_EXPORT_4XX_ERRORS = "export4xxErrors"; + private static final String RDS_SOURCE_EXPORT_API_INVOCATIONS = "exportApiInvocations"; + private static final String RDS_SOURCE_EXPORT_PARTITION_QUERY_COUNT = "exportPartitionQueryCount"; + private static final String RDS_SOURCE_STREAM_AUTH_ERRORS = "streamAuthErrors"; + private static final String RDS_SOURCE_STREAM_SERVER_NOT_FOUND_ERRORS = "streamServerNotFoundErrors"; + private static final String RDS_SOURCE_STREAM_REPLICATION_NOT_ENABLED_ERRORS = "streamReplicationNotEnabledErrors"; + private static final String RDS_SOURCE_STREAM_ACCESS_DENIED_ERRORS = "streamAccessDeniedErrors"; + + private final PluginMetrics pluginMetrics; + private final Counter stream5xxErrors; + private final Counter stream4xxErrors; + private final Counter streamApiInvocations; + private final Counter export5xxErrors; + private final Counter export4xxErrors; + private final Counter exportApiInvocations; + private final Counter exportPartitionQueryCount; + private final Counter streamAuthErrors; + private final Counter streamServerNotFoundErrors; + private final Counter streamReplicationNotEnabledErrors; + private final Counter streamAccessDeniedErrors; + + public RdsSourceAggregateMetrics() { + this.pluginMetrics = PluginMetrics.fromPrefix(RDS_SOURCE); + this.stream5xxErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_5XX_EXCEPTIONS); + this.stream4xxErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_4XX_EXCEPTIONS); + this.streamApiInvocations = pluginMetrics.counter(RDS_SOURCE_STREAM_API_INVOCATIONS); + this.export5xxErrors = pluginMetrics.counter(RDS_SOURCE_EXPORT_5XX_ERRORS); + this.export4xxErrors = pluginMetrics.counter(RDS_SOURCE_EXPORT_4XX_ERRORS); + this.exportApiInvocations = pluginMetrics.counter(RDS_SOURCE_EXPORT_API_INVOCATIONS); + this.exportPartitionQueryCount = pluginMetrics.counter(RDS_SOURCE_EXPORT_PARTITION_QUERY_COUNT); + + // More granular error metrics + this.streamAuthErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_AUTH_ERRORS); + this.streamServerNotFoundErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_SERVER_NOT_FOUND_ERRORS); + this.streamReplicationNotEnabledErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_REPLICATION_NOT_ENABLED_ERRORS); + this.streamAccessDeniedErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_ACCESS_DENIED_ERRORS); + } + + public Counter getStream5xxErrors() { + return stream5xxErrors; + } + + public Counter getStream4xxErrors() { + return stream4xxErrors; + } + + public Counter getStreamApiInvocations() { + return streamApiInvocations; + } + + public Counter getExport5xxErrors() { + return export5xxErrors; + } + + public Counter getExport4xxErrors() { + return export4xxErrors; + } + + public Counter getExportApiInvocations() { + return exportApiInvocations; + } + + public Counter getExportPartitionQueryCount() { + return exportPartitionQueryCount; + } + + public Counter getStreamAuthErrors() { + return streamAuthErrors; + } + + public Counter getStreamServerNotFoundErrors() { + return streamServerNotFoundErrors; + } + + public Counter getStreamReplicationNotEnabledErrors() { + return streamReplicationNotEnabledErrors; + } + + public Counter getStreamAccessDeniedErrors() { + return streamAccessDeniedErrors; + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportTaskManagerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportTaskManagerTest.java index 15a23277c7..ea2ea83164 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportTaskManagerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/export/ExportTaskManagerTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.rds.export; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -14,11 +15,14 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.rds.RdsClient; import software.amazon.awssdk.services.rds.model.DescribeExportTasksRequest; import software.amazon.awssdk.services.rds.model.DescribeExportTasksResponse; import software.amazon.awssdk.services.rds.model.ExportTask; import software.amazon.awssdk.services.rds.model.StartExportTaskRequest; +import software.amazon.awssdk.services.rds.model.StartExportTaskResponse; import java.util.List; import java.util.UUID; @@ -26,7 +30,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -38,28 +44,49 @@ class ExportTaskManagerTest { @Mock private RdsClient rdsClient; + @Mock + private RdsSourceAggregateMetrics rdsSourceAggregateMetrics; + + @Mock + private Counter exportApiInvocations; + + @Mock + private Counter export4xxErrors; + + @Mock + private Counter export5xxErrors; + private ExportTaskManager exportTaskManager; @BeforeEach void setUp() { exportTaskManager = createObjectUnderTest(); + lenient().when(rdsSourceAggregateMetrics.getExportApiInvocations()).thenReturn(exportApiInvocations); + lenient().when(rdsSourceAggregateMetrics.getExport4xxErrors()).thenReturn(export4xxErrors); + lenient().when(rdsSourceAggregateMetrics.getExport5xxErrors()).thenReturn(export5xxErrors); } @ParameterizedTest @MethodSource("provideStartExportTaskTestParameters") void test_start_export_task(List exportOnly) { - final String snapshotArn = "arn:aws:rds:us-east-1:123456789012:snapshot:" + UUID.randomUUID(); + final String snapshotId = UUID.randomUUID().toString().substring(0, 5); + final String snapshotArn = "arn:aws:rds:us-east-1:123456789012:snapshot:" + snapshotId; final String iamRoleArn = "arn:aws:iam:us-east-1:123456789012:role:" + UUID.randomUUID(); final String bucket = UUID.randomUUID().toString(); final String prefix = UUID.randomUUID().toString(); final String kmsKey = UUID.randomUUID().toString(); + final StartExportTaskResponse response = mock(StartExportTaskResponse.class); + when(rdsClient.startExportTask(any(StartExportTaskRequest.class))).thenReturn(response); + when(response.status()).thenReturn(UUID.randomUUID().toString()); - exportTaskManager.startExportTask(snapshotArn, iamRoleArn, bucket, prefix, kmsKey, exportOnly); + final String exportTaskId = exportTaskManager.startExportTask(snapshotArn, iamRoleArn, bucket, prefix, kmsKey, exportOnly); final ArgumentCaptor exportTaskRequestArgumentCaptor = ArgumentCaptor.forClass(StartExportTaskRequest.class); + assertThat(exportTaskId, startsWith(snapshotId)); verify(rdsClient).startExportTask(exportTaskRequestArgumentCaptor.capture()); + verify(exportApiInvocations).increment(); final StartExportTaskRequest actualRequest = exportTaskRequestArgumentCaptor.getValue(); assertThat(actualRequest.sourceArn(), equalTo(snapshotArn)); @@ -70,6 +97,40 @@ void test_start_export_task(List exportOnly) { assertThat(actualRequest.exportOnly(), equalTo(exportOnly)); } + @Test + void test_start_export_task_with_sdkexception_returns_null() { + final String snapshotArn = "arn:aws:rds:us-east-1:123456789012:snapshot:" + UUID.randomUUID(); + final String iamRoleArn = "arn:aws:iam:us-east-1:123456789012:role:" + UUID.randomUUID(); + final String bucket = UUID.randomUUID().toString(); + final String prefix = UUID.randomUUID().toString(); + final String kmsKey = UUID.randomUUID().toString(); + + when(rdsClient.startExportTask(any(StartExportTaskRequest.class))).thenThrow(SdkException.class); + + final String exportTaskId = exportTaskManager.startExportTask(snapshotArn, iamRoleArn, bucket, prefix, kmsKey, List.of()); + + assertThat(exportTaskId, equalTo(null)); + verify(exportApiInvocations).increment(); + verify(export4xxErrors).increment(); + } + + @Test + void test_start_export_task_with_other_exception_returns_null() { + final String snapshotArn = "arn:aws:rds:us-east-1:123456789012:snapshot:" + UUID.randomUUID(); + final String iamRoleArn = "arn:aws:iam:us-east-1:123456789012:role:" + UUID.randomUUID(); + final String bucket = UUID.randomUUID().toString(); + final String prefix = UUID.randomUUID().toString(); + final String kmsKey = UUID.randomUUID().toString(); + + when(rdsClient.startExportTask(any(StartExportTaskRequest.class))).thenThrow(RuntimeException.class); + + final String exportTaskId = exportTaskManager.startExportTask(snapshotArn, iamRoleArn, bucket, prefix, kmsKey, List.of()); + + assertThat(exportTaskId, equalTo(null)); + verify(exportApiInvocations).increment(); + verify(export5xxErrors).increment(); + } + @Test void test_check_export_status() { final String exportTaskId = UUID.randomUUID().toString(); @@ -99,6 +160,6 @@ private static Stream provideStartExportTaskTestParameters() { } private ExportTaskManager createObjectUnderTest() { - return new ExportTaskManager(rdsClient); + return new ExportTaskManager(rdsClient, rdsSourceAggregateMetrics); } } \ No newline at end of file diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientWrapperTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientWrapperTest.java new file mode 100644 index 0000000000..d30124b078 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientWrapperTest.java @@ -0,0 +1,176 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.source.rds.stream; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.network.AuthenticationException; +import io.micrometer.core.instrument.Counter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics; + +import java.io.IOException; +import java.util.List; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientWrapper.ACCESS_DENIED; +import static org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientWrapper.CONNECTION_REFUSED; +import static org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientWrapper.FAILED_TO_DETERMINE_BINLOG_FILENAME; + + +@ExtendWith(MockitoExtension.class) +class BinlogClientWrapperTest { + @Mock + private BinaryLogClient binaryLogClient; + + @Mock + private RdsSourceAggregateMetrics rdsSourceAggregateMetrics; + + @Mock + private Counter streamApiInvocations; + + @Mock + private Counter stream4xxErrors; + + @Mock + private Counter stream5xxErrors; + + @Mock + private Counter streamAuthErrors; + + @Mock + private Counter streamServerNotFoundErrors; + + @Mock + private Counter streamReplicationNotEnabledErrors; + + @Mock + private Counter streamAccessDeniedErrors; + + private BinlogClientWrapper binlogClientWrapper; + + @BeforeEach + void setUp() { + binlogClientWrapper = createObjectUnderTest(); + lenient().when(rdsSourceAggregateMetrics.getStreamApiInvocations()).thenReturn(streamApiInvocations); + lenient().when(rdsSourceAggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors); + lenient().when(rdsSourceAggregateMetrics.getStream5xxErrors()).thenReturn(stream5xxErrors); + lenient().when(rdsSourceAggregateMetrics.getStreamAuthErrors()).thenReturn(streamAuthErrors); + lenient().when(rdsSourceAggregateMetrics.getStreamServerNotFoundErrors()).thenReturn(streamServerNotFoundErrors); + lenient().when(rdsSourceAggregateMetrics.getStreamReplicationNotEnabledErrors()).thenReturn(streamReplicationNotEnabledErrors); + lenient().when(rdsSourceAggregateMetrics.getStreamAccessDeniedErrors()).thenReturn(streamAccessDeniedErrors); + } + + @Test + void test_connect_calls_client_connect() throws IOException { + binlogClientWrapper.connect(); + verify(binaryLogClient).connect(); + verify(streamApiInvocations).increment(); + } + + @Test + void test_connect_with_4xx_auth_exception() throws IOException { + doThrow(AuthenticationException.class).when(binaryLogClient).connect(); + + try { + binlogClientWrapper.connect(); + } catch (Exception e) { + verify(streamApiInvocations).increment(); + verify(stream4xxErrors).increment(); + verify(streamAuthErrors).increment(); + } + } + + @Test + void test_connect_with_4xx_server_not_found_exception() throws IOException { + Exception connectionRefusedException = new IOException( + "Failed to connect to MySQL server", + new Exception(CONNECTION_REFUSED) + ); + doThrow(connectionRefusedException).when(binaryLogClient).connect(); + + try { + binlogClientWrapper.connect(); + } catch (Exception e) { + verify(streamApiInvocations).increment(); + verify(stream4xxErrors).increment(); + verify(streamServerNotFoundErrors).increment(); + } + } + + @Test + void test_connect_with_4xx_binlog_exception() throws IOException { + final Exception binlogNotEnabledException = new IOException(FAILED_TO_DETERMINE_BINLOG_FILENAME); + doThrow(binlogNotEnabledException).when(binaryLogClient).connect(); + + try { + binlogClientWrapper.connect(); + } catch (Exception e) { + verify(streamApiInvocations).increment(); + verify(stream4xxErrors).increment(); + verify(streamReplicationNotEnabledErrors).increment(); + } + } + + @Test + void test_connect_with_4xx_access_exception() throws IOException { + final Exception accessDeniedException = new IOException(ACCESS_DENIED); + doThrow(accessDeniedException).when(binaryLogClient).connect(); + + try { + binlogClientWrapper.connect(); + } catch (Exception e) { + verify(streamApiInvocations).increment(); + verify(stream4xxErrors).increment(); + verify(streamAccessDeniedErrors).increment(); + } + } + + @Test + void test_connect_with_5xx_exception() throws IOException { + doThrow(RuntimeException.class).when(binaryLogClient).connect(); + + try { + binlogClientWrapper.connect(); + } catch (Exception e) { + verify(streamApiInvocations).increment(); + verify(stream5xxErrors).increment(); + } + } + + @Test + void test_disconnect_calls_client_disconnect() throws IOException { + BinlogEventListener eventListener = mock(BinlogEventListener.class); + when(binaryLogClient.getEventListeners()).thenReturn(List.of(eventListener)); + BinaryLogClient.LifecycleListener lifecycleListener = mock(BinaryLogClient.LifecycleListener.class); + when(binaryLogClient.getLifecycleListeners()).thenReturn(List.of(lifecycleListener)); + + binlogClientWrapper.disconnect(); + + InOrder inOrder = inOrder(binaryLogClient, eventListener); + inOrder.verify(eventListener).stopCheckpointManager(); + inOrder.verify(binaryLogClient).unregisterEventListener(eventListener); + inOrder.verify(binaryLogClient).unregisterLifecycleListener(lifecycleListener); + inOrder.verify(binaryLogClient).disconnect(); + } + + private BinlogClientWrapper createObjectUnderTest() { + return new BinlogClientWrapper(binaryLogClient, rdsSourceAggregateMetrics); + } +} diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java index 04f39c69a2..87d325d291 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClientTest.java @@ -10,12 +10,14 @@ package org.opensearch.dataprepper.plugins.source.rds.stream; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.plugins.source.rds.schema.PostgresConnectionManager; +import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics; import org.postgresql.PGConnection; import org.postgresql.replication.LogSequenceNumber; import org.postgresql.replication.PGReplicationStream; @@ -33,11 +35,16 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.source.rds.stream.LogicalReplicationClient.AUTHENTICATION_FAILED; +import static org.opensearch.dataprepper.plugins.source.rds.stream.LogicalReplicationClient.CONNECTION_REFUSED; +import static org.opensearch.dataprepper.plugins.source.rds.stream.LogicalReplicationClient.PERMISSION_DENIED; +import static org.opensearch.dataprepper.plugins.source.rds.stream.LogicalReplicationClient.REPLICATION_SLOT_DOES_NOT_EXIST; @ExtendWith(MockitoExtension.class) class LogicalReplicationClientTest { @@ -48,6 +55,30 @@ class LogicalReplicationClientTest { @Mock private LogicalReplicationEventProcessor eventProcessor; + @Mock + private RdsSourceAggregateMetrics rdsSourceAggregateMetrics; + + @Mock + private Counter streamApiInvocations; + + @Mock + private Counter stream4xxErrors; + + @Mock + private Counter stream5xxErrors; + + @Mock + private Counter streamAuthErrors; + + @Mock + private Counter streamServerNotFoundErrors; + + @Mock + private Counter streamReplicationNotEnabledErrors; + + @Mock + private Counter streamAccessDeniedErrors; + private String publicationName; private String replicationSlotName; private LogicalReplicationClient logicalReplicationClient; @@ -58,6 +89,13 @@ void setUp() { replicationSlotName = UUID.randomUUID().toString(); logicalReplicationClient = createObjectUnderTest(); logicalReplicationClient.setEventProcessor(eventProcessor); + lenient().when(rdsSourceAggregateMetrics.getStreamApiInvocations()).thenReturn(streamApiInvocations); + lenient().when(rdsSourceAggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors); + lenient().when(rdsSourceAggregateMetrics.getStream5xxErrors()).thenReturn(stream5xxErrors); + lenient().when(rdsSourceAggregateMetrics.getStreamAuthErrors()).thenReturn(streamAuthErrors); + lenient().when(rdsSourceAggregateMetrics.getStreamServerNotFoundErrors()).thenReturn(streamServerNotFoundErrors); + lenient().when(rdsSourceAggregateMetrics.getStreamReplicationNotEnabledErrors()).thenReturn(streamReplicationNotEnabledErrors); + lenient().when(rdsSourceAggregateMetrics.getStreamAccessDeniedErrors()).thenReturn(streamAccessDeniedErrors); } @Test @@ -88,18 +126,60 @@ void test_connect() throws SQLException, InterruptedException { verify(stream).setAppliedLSN(lsn); verify(stream).setFlushedLSN(lsn); + verify(streamApiInvocations).increment(); } @Test - void test_connect_exception_should_throw() throws SQLException { - when(connectionManager.getConnection()).thenThrow(RuntimeException.class); + void test_connect_auth_exception_should_throw() throws SQLException { + when(connectionManager.getConnection()).thenThrow(new RuntimeException(AUTHENTICATION_FAILED)); - final ExecutorService executorService = Executors.newSingleThreadExecutor(); - executorService.submit(() -> logicalReplicationClient.connect()); + assertThrows(RuntimeException.class, () -> logicalReplicationClient.connect()); + verify(streamApiInvocations).increment(); + verify(stream4xxErrors).increment(); + verify(streamAuthErrors).increment(); + } + + @Test + void test_connect_server_exception_should_throw() throws SQLException { + Exception connectionRefusedException = new RuntimeException( + "Failed to establish connection", + new Exception(CONNECTION_REFUSED) + ); + when(connectionManager.getConnection()).thenThrow(connectionRefusedException); assertThrows(RuntimeException.class, () -> logicalReplicationClient.connect()); + verify(streamApiInvocations).increment(); + verify(stream4xxErrors).increment(); + verify(streamServerNotFoundErrors).increment(); + } - executorService.shutdownNow(); + @Test + void test_connect_log_exception_should_throw() throws SQLException { + when(connectionManager.getConnection()).thenThrow(new RuntimeException(REPLICATION_SLOT_DOES_NOT_EXIST)); + + assertThrows(RuntimeException.class, () -> logicalReplicationClient.connect()); + verify(streamApiInvocations).increment(); + verify(stream4xxErrors).increment(); + verify(streamReplicationNotEnabledErrors).increment(); + } + + @Test + void test_connect_access_exception_should_throw() throws SQLException { + when(connectionManager.getConnection()).thenThrow(new RuntimeException(PERMISSION_DENIED)); + + assertThrows(RuntimeException.class, () -> logicalReplicationClient.connect()); + verify(streamApiInvocations).increment(); + verify(stream4xxErrors).increment(); + verify(streamAccessDeniedErrors).increment(); + } + + @Test + void test_connect_unknown_exception_should_throw() throws SQLException { + when(connectionManager.getConnection()).thenThrow(RuntimeException.class); + + assertThrows(RuntimeException.class, () -> logicalReplicationClient.connect()); + verify(streamApiInvocations).increment(); + verify(stream5xxErrors).increment(); } @Test @@ -195,6 +275,6 @@ void test_connect_disconnect_cycles() throws SQLException, InterruptedException } private LogicalReplicationClient createObjectUnderTest() { - return new LogicalReplicationClient(connectionManager, replicationSlotName, publicationName); + return new LogicalReplicationClient(connectionManager, replicationSlotName, publicationName, rdsSourceAggregateMetrics); } } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactoryTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactoryTest.java index 6cf3e7858b..150adde5c1 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactoryTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactoryTest.java @@ -21,6 +21,7 @@ import org.opensearch.dataprepper.plugins.source.rds.coordination.state.PostgresStreamState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata; +import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics; import software.amazon.awssdk.services.rds.RdsClient; import java.util.List; @@ -48,6 +49,9 @@ class ReplicationLogClientFactoryTest { @Mock private StreamPartition streamPartition; + @Mock + private RdsSourceAggregateMetrics rdsSourceAggregateMetrics; + private ReplicationLogClientFactory replicationLogClientFactory; @Test @@ -93,6 +97,6 @@ void test_create_logical_replication_client() { } private ReplicationLogClientFactory createObjectUnderTest() { - return new ReplicationLogClientFactory(sourceConfig, rdsClient, dbMetadata); + return new ReplicationLogClientFactory(sourceConfig, rdsClient, dbMetadata, rdsSourceAggregateMetrics); } }