Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.dataprepper.plugins.source.rds.stream.ReplicationLogClientFactory;
import org.opensearch.dataprepper.plugins.source.rds.stream.StreamScheduler;
import org.opensearch.dataprepper.plugins.source.rds.utils.IdentifierShortener;
import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.rds.RdsClient;
Expand Down Expand Up @@ -65,6 +66,7 @@ public class RdsService {
private final RdsSourceConfig sourceConfig;
private final AcknowledgementSetManager acknowledgementSetManager;
private final PluginConfigObservable pluginConfigObservable;
private final RdsSourceAggregateMetrics rdsSourceAggregateMetrics;
private final PipelineDescription pipelineDescription;
private ExecutorService executor;
private LeaderScheduler leaderScheduler;
Expand All @@ -87,6 +89,7 @@ public RdsService(final EnhancedSourceCoordinator sourceCoordinator,
this.sourceConfig = sourceConfig;
this.acknowledgementSetManager = acknowledgementSetManager;
this.pluginConfigObservable = pluginConfigObservable;
this.rdsSourceAggregateMetrics = new RdsSourceAggregateMetrics();
this.pipelineDescription = pipelineDescription;

rdsClient = clientFactory.buildRdsClient();
Expand Down Expand Up @@ -118,7 +121,7 @@ public void start(Buffer<Record<Event>> buffer) {

if (sourceConfig.isExportEnabled()) {
final SnapshotManager snapshotManager = new SnapshotManager(rdsApiStrategy);
final ExportTaskManager exportTaskManager = new ExportTaskManager(rdsClient);
final ExportTaskManager exportTaskManager = new ExportTaskManager(rdsClient, rdsSourceAggregateMetrics);
exportScheduler = new ExportScheduler(
sourceCoordinator, snapshotManager, exportTaskManager, s3Client, pluginMetrics);
dataFileScheduler = new DataFileScheduler(
Expand All @@ -128,7 +131,8 @@ public void start(Buffer<Record<Event>> buffer) {
}

if (sourceConfig.isStreamEnabled()) {
ReplicationLogClientFactory replicationLogClientFactory = new ReplicationLogClientFactory(sourceConfig, rdsClient, dbMetadata);
ReplicationLogClientFactory replicationLogClientFactory = new ReplicationLogClientFactory(
sourceConfig, rdsClient, dbMetadata, rdsSourceAggregateMetrics);

if (sourceConfig.isTlsEnabled()) {
replicationLogClientFactory.setSSLMode(SSLMode.REQUIRED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

package org.opensearch.dataprepper.plugins.source.rds.export;

import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.rds.RdsClient;
import software.amazon.awssdk.services.rds.model.DescribeExportTasksRequest;
import software.amazon.awssdk.services.rds.model.DescribeExportTasksResponse;
Expand All @@ -25,9 +27,11 @@ public class ExportTaskManager {
private static final int EXPORT_TASK_ID_MAX_LENGTH = 60;

private final RdsClient rdsClient;
private final RdsSourceAggregateMetrics rdsSourceAggregateMetrics;

public ExportTaskManager(final RdsClient rdsClient) {
public ExportTaskManager(final RdsClient rdsClient, final RdsSourceAggregateMetrics rdsSourceAggregateMetrics) {
this.rdsClient = rdsClient;
this.rdsSourceAggregateMetrics = rdsSourceAggregateMetrics;
}

public String startExportTask(String snapshotArn, String iamRoleArn, String bucket, String prefix, String kmsKeyId, Collection<String> includeTables) {
Expand All @@ -45,12 +49,17 @@ public String startExportTask(String snapshotArn, String iamRoleArn, String buck
}

try {
rdsSourceAggregateMetrics.getExportApiInvocations().increment();
StartExportTaskResponse response = rdsClient.startExportTask(requestBuilder.build());
LOG.info("Export task submitted with id {} and status {}", exportTaskId, response.status());
return exportTaskId;

} catch (SdkException e) {
rdsSourceAggregateMetrics.getExport4xxErrors().increment();
LOG.error("Failed to start an export task with error: {}", e.getMessage());
return null;
} catch (Exception e) {
LOG.error("Failed to start an export task", e);
rdsSourceAggregateMetrics.getExport5xxErrors().increment();
LOG.error("Failed to start an export task with error: {}", e.getMessage());
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
package org.opensearch.dataprepper.plugins.source.rds.stream;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -20,15 +22,27 @@
public class BinlogClientWrapper implements ReplicationLogClient {

private static final Logger LOG = LoggerFactory.getLogger(BinlogClientWrapper.class);
static final String CONNECTION_REFUSED = "Connection refused";
static final String FAILED_TO_DETERMINE_BINLOG_FILENAME = "Failed to determine binlog filename";
static final String ACCESS_DENIED = "Access denied";
private final BinaryLogClient binlogClient;
private final RdsSourceAggregateMetrics rdsSourceAggregateMetrics;

public BinlogClientWrapper(final BinaryLogClient binlogClient) {
public BinlogClientWrapper(final BinaryLogClient binlogClient, final RdsSourceAggregateMetrics rdsSourceAggregateMetrics) {
this.binlogClient = binlogClient;
this.rdsSourceAggregateMetrics = rdsSourceAggregateMetrics;
}

@Override
public void connect() throws IOException {
binlogClient.connect();
try {
rdsSourceAggregateMetrics.getStreamApiInvocations().increment();
binlogClient.connect();
} catch (Exception e) {
LOG.error("Failed to connect to replication stream due to {}", e.getMessage());
categorizeError(e);
throw e;
}
}

@Override
Expand All @@ -54,4 +68,27 @@ public void disconnect() throws IOException {
public BinaryLogClient getBinlogClient() {
return binlogClient;
}

private void categorizeError(Exception e) {
if (e instanceof AuthenticationException) {
rdsSourceAggregateMetrics.getStream4xxErrors().increment();
rdsSourceAggregateMetrics.getStreamAuthErrors().increment();
LOG.error("Failed to connect to replication stream: Authentication failed. [{}]", e.getMessage());
} else if (e.getCause() != null && e.getCause().getMessage() != null && e.getCause().getMessage().contains(CONNECTION_REFUSED)) {
rdsSourceAggregateMetrics.getStream4xxErrors().increment();
rdsSourceAggregateMetrics.getStreamServerNotFoundErrors().increment();
LOG.error("Failed to connect to replication stream: Cannot connect to MySQL server. [{}]", e.getMessage());
} else if (e.getMessage() != null && e.getMessage().contains(FAILED_TO_DETERMINE_BINLOG_FILENAME)) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to check through exception type?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are mostly just generic IOException so I didn't use exception type except the AuthenticationException

rdsSourceAggregateMetrics.getStream4xxErrors().increment();
rdsSourceAggregateMetrics.getStreamReplicationNotEnabledErrors().increment();
LOG.error("Failed to connect to replication stream: Binary logging not enabled on the server. [{}]", e.getMessage());
} else if (e.getMessage() != null && e.getMessage().contains(ACCESS_DENIED)) {
rdsSourceAggregateMetrics.getStream4xxErrors().increment();
rdsSourceAggregateMetrics.getStreamAccessDeniedErrors().increment();
LOG.error("Failed to connect to replication stream: Insufficient privileges. [{}]", e.getMessage());
} else {
rdsSourceAggregateMetrics.getStream5xxErrors().increment();
LOG.error("Failed to connect to replication stream. ", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package org.opensearch.dataprepper.plugins.source.rds.stream;

import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager;
import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics;
import org.postgresql.PGConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
Expand All @@ -28,10 +29,16 @@ public class LogicalReplicationClient implements ReplicationLogClient {
static final String PROTO_VERSION_KEY = "proto_version";
static final String VERSION_ONE = "1";
static final String PUBLICATION_NAMES_KEY = "publication_names";
static final String AUTHENTICATION_FAILED = "authentication failed";
static final String CONNECTION_REFUSED = "Connection refused";
static final String REPLICATION_SLOT_DOES_NOT_EXIST = ".*replication slot .* does not exist.*";
static final String PERMISSION_DENIED = "permission denied";

private final ConnectionManager connectionManager;
private final String publicationName;
private final String replicationSlotName;
private final RdsSourceAggregateMetrics rdsSourceAggregateMetrics;

private LogSequenceNumber startLsn;
private LogicalReplicationEventProcessor eventProcessor;

Expand All @@ -40,14 +47,17 @@ public class LogicalReplicationClient implements ReplicationLogClient {

public LogicalReplicationClient(final ConnectionManager connectionManager,
final String publicationName,
final String replicationSlotName) {
final String replicationSlotName,
final RdsSourceAggregateMetrics rdsSourceAggregateMetrics) {
this.connectionManager = connectionManager;
this.publicationName = publicationName;
this.replicationSlotName = replicationSlotName;
this.rdsSourceAggregateMetrics = rdsSourceAggregateMetrics;
}

@Override
public void connect() {
rdsSourceAggregateMetrics.getStreamApiInvocations().increment();
LOG.debug("Start connecting logical replication stream. ");
try (Connection conn = connectionManager.getConnection()) {
PGConnection pgConnection = conn.unwrap(PGConnection.class);
Expand Down Expand Up @@ -95,7 +105,7 @@ public void connect() {

disconnectRequested = false;
} catch (Exception e) {
LOG.error("Exception while creating or processing Postgres replication stream. ", e);
categorizeError(e);
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -131,4 +141,27 @@ private void closeStream() {
}
}
}

private void categorizeError(Exception e) {
if (e.getMessage() != null && e.getMessage().contains(AUTHENTICATION_FAILED)) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added metrics for those specific errors.

rdsSourceAggregateMetrics.getStream4xxErrors().increment();
rdsSourceAggregateMetrics.getStreamAuthErrors().increment();
LOG.error("Failed to create or process PostgreSQL replication stream: Authentication failed. [{}]", e.getMessage());
} else if (e.getCause() != null && e.getCause().getMessage() != null && e.getCause().getMessage().contains(CONNECTION_REFUSED)) {
rdsSourceAggregateMetrics.getStream4xxErrors().increment();
rdsSourceAggregateMetrics.getStreamServerNotFoundErrors().increment();
LOG.error("Failed to create or process PostgreSQL replication stream: Cannot connect to PostgreSQL server. [{}]", e.getMessage());
} else if (e.getMessage() != null && e.getMessage().matches(REPLICATION_SLOT_DOES_NOT_EXIST)) {
rdsSourceAggregateMetrics.getStream4xxErrors().increment();
rdsSourceAggregateMetrics.getStreamReplicationNotEnabledErrors().increment();
LOG.error("Failed to create or process PostgreSQL replication stream: Replication slot does not exist. [{}]", e.getMessage());
} else if (e.getMessage() != null && e.getMessage().contains(PERMISSION_DENIED)) {
rdsSourceAggregateMetrics.getStream4xxErrors().increment();
rdsSourceAggregateMetrics.getStreamAccessDeniedErrors().increment();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you update the unit test to verify on these new metrics added ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

LOG.error("Failed to create or process PostgreSQL replication stream: Insufficient privileges. [{}]", e.getMessage());
} else {
rdsSourceAggregateMetrics.getStream5xxErrors().increment();
LOG.error("Failed to create or process PostgreSQL replication stream. ", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata;
import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManagerFactory;
import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics;
import software.amazon.awssdk.services.rds.RdsClient;

import java.util.NoSuchElementException;
Expand All @@ -27,24 +28,27 @@ public class ReplicationLogClientFactory {

private final RdsClient rdsClient;
private final DbMetadata dbMetadata;
private final RdsSourceAggregateMetrics rdsSourceAggregateMetrics;
private RdsSourceConfig sourceConfig;
private String username;
private String password;
private SSLMode sslMode = SSLMode.REQUIRED;

public ReplicationLogClientFactory(final RdsSourceConfig sourceConfig,
final RdsClient rdsClient,
final DbMetadata dbMetadata) {
final DbMetadata dbMetadata,
final RdsSourceAggregateMetrics rdsSourceAggregateMetrics) {
this.sourceConfig = sourceConfig;
this.rdsClient = rdsClient;
this.dbMetadata = dbMetadata;
this.rdsSourceAggregateMetrics = rdsSourceAggregateMetrics;
username = sourceConfig.getAuthenticationConfig().getUsername();
password = sourceConfig.getAuthenticationConfig().getPassword();
}

public ReplicationLogClient create(StreamPartition streamPartition) {
if (sourceConfig.getEngine().isMySql()) {
return new BinlogClientWrapper(createBinaryLogClient());
return new BinlogClientWrapper(createBinaryLogClient(), rdsSourceAggregateMetrics);
} else { // Postgres
return createLogicalReplicationClient(streamPartition);
}
Expand Down Expand Up @@ -74,7 +78,7 @@ private LogicalReplicationClient createLogicalReplicationClient(StreamPartition
}
final ConnectionManagerFactory connectionManagerFactory = new ConnectionManagerFactory(sourceConfig, dbMetadata);
final ConnectionManager connectionManager = connectionManagerFactory.getConnectionManager();
return new LogicalReplicationClient(connectionManager, publicationName, replicationSlotName);
return new LogicalReplicationClient(connectionManager, publicationName, replicationSlotName, rdsSourceAggregateMetrics);
}

public void setSSLMode(SSLMode sslMode) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.rds.utils;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;

public class RdsSourceAggregateMetrics {
private static final String RDS_SOURCE = "rds";

private static final String RDS_SOURCE_STREAM_5XX_EXCEPTIONS = "stream5xxErrors";
private static final String RDS_SOURCE_STREAM_4XX_EXCEPTIONS = "stream4xxErrors";
private static final String RDS_SOURCE_STREAM_API_INVOCATIONS = "streamApiInvocations";
private static final String RDS_SOURCE_EXPORT_5XX_ERRORS = "export5xxErrors";
private static final String RDS_SOURCE_EXPORT_4XX_ERRORS = "export4xxErrors";
private static final String RDS_SOURCE_EXPORT_API_INVOCATIONS = "exportApiInvocations";
private static final String RDS_SOURCE_EXPORT_PARTITION_QUERY_COUNT = "exportPartitionQueryCount";
private static final String RDS_SOURCE_STREAM_AUTH_ERRORS = "streamAuthErrors";
private static final String RDS_SOURCE_STREAM_SERVER_NOT_FOUND_ERRORS = "streamServerNotFoundErrors";
private static final String RDS_SOURCE_STREAM_REPLICATION_NOT_ENABLED_ERRORS = "streamReplicationNotEnabledErrors";
private static final String RDS_SOURCE_STREAM_ACCESS_DENIED_ERRORS = "streamAccessDeniedErrors";

private final PluginMetrics pluginMetrics;
private final Counter stream5xxErrors;
private final Counter stream4xxErrors;
private final Counter streamApiInvocations;
private final Counter export5xxErrors;
private final Counter export4xxErrors;
private final Counter exportApiInvocations;
private final Counter exportPartitionQueryCount;
private final Counter streamAuthErrors;
private final Counter streamServerNotFoundErrors;
private final Counter streamReplicationNotEnabledErrors;
private final Counter streamAccessDeniedErrors;

public RdsSourceAggregateMetrics() {
this.pluginMetrics = PluginMetrics.fromPrefix(RDS_SOURCE);
this.stream5xxErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_5XX_EXCEPTIONS);
this.stream4xxErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_4XX_EXCEPTIONS);
this.streamApiInvocations = pluginMetrics.counter(RDS_SOURCE_STREAM_API_INVOCATIONS);
this.export5xxErrors = pluginMetrics.counter(RDS_SOURCE_EXPORT_5XX_ERRORS);
this.export4xxErrors = pluginMetrics.counter(RDS_SOURCE_EXPORT_4XX_ERRORS);
this.exportApiInvocations = pluginMetrics.counter(RDS_SOURCE_EXPORT_API_INVOCATIONS);
this.exportPartitionQueryCount = pluginMetrics.counter(RDS_SOURCE_EXPORT_PARTITION_QUERY_COUNT);

// More granular error metrics
this.streamAuthErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_AUTH_ERRORS);
this.streamServerNotFoundErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_SERVER_NOT_FOUND_ERRORS);
this.streamReplicationNotEnabledErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_REPLICATION_NOT_ENABLED_ERRORS);
this.streamAccessDeniedErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_ACCESS_DENIED_ERRORS);
}

public Counter getStream5xxErrors() {
return stream5xxErrors;
}

public Counter getStream4xxErrors() {
return stream4xxErrors;
}

public Counter getStreamApiInvocations() {
return streamApiInvocations;
}

public Counter getExport5xxErrors() {
return export5xxErrors;
}

public Counter getExport4xxErrors() {
return export4xxErrors;
}

public Counter getExportApiInvocations() {
return exportApiInvocations;
}

public Counter getExportPartitionQueryCount() {
return exportPartitionQueryCount;
}

public Counter getStreamAuthErrors() {
return streamAuthErrors;
}

public Counter getStreamServerNotFoundErrors() {
return streamServerNotFoundErrors;
}

public Counter getStreamReplicationNotEnabledErrors() {
return streamReplicationNotEnabledErrors;
}

public Counter getStreamAccessDeniedErrors() {
return streamAccessDeniedErrors;
}
}
Loading
Loading