-
Notifications
You must be signed in to change notification settings - Fork 330
Add aggregate metrics for rds source #5697
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ | |
| package org.opensearch.dataprepper.plugins.source.rds.stream; | ||
|
|
||
| import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager; | ||
| import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics; | ||
| import org.postgresql.PGConnection; | ||
| import org.postgresql.replication.LogSequenceNumber; | ||
| import org.postgresql.replication.PGReplicationStream; | ||
|
|
@@ -28,10 +29,16 @@ public class LogicalReplicationClient implements ReplicationLogClient { | |
| static final String PROTO_VERSION_KEY = "proto_version"; | ||
| static final String VERSION_ONE = "1"; | ||
| static final String PUBLICATION_NAMES_KEY = "publication_names"; | ||
| static final String AUTHENTICATION_FAILED = "authentication failed"; | ||
| static final String CONNECTION_REFUSED = "Connection refused"; | ||
| static final String REPLICATION_SLOT_DOES_NOT_EXIST = ".*replication slot .* does not exist.*"; | ||
| static final String PERMISSION_DENIED = "permission denied"; | ||
|
|
||
| private final ConnectionManager connectionManager; | ||
| private final String publicationName; | ||
| private final String replicationSlotName; | ||
| private final RdsSourceAggregateMetrics rdsSourceAggregateMetrics; | ||
|
|
||
| private LogSequenceNumber startLsn; | ||
| private LogicalReplicationEventProcessor eventProcessor; | ||
|
|
||
|
|
@@ -40,14 +47,17 @@ public class LogicalReplicationClient implements ReplicationLogClient { | |
|
|
||
| public LogicalReplicationClient(final ConnectionManager connectionManager, | ||
| final String publicationName, | ||
| final String replicationSlotName) { | ||
| final String replicationSlotName, | ||
| final RdsSourceAggregateMetrics rdsSourceAggregateMetrics) { | ||
| this.connectionManager = connectionManager; | ||
| this.publicationName = publicationName; | ||
| this.replicationSlotName = replicationSlotName; | ||
| this.rdsSourceAggregateMetrics = rdsSourceAggregateMetrics; | ||
| } | ||
|
|
||
| @Override | ||
| public void connect() { | ||
| rdsSourceAggregateMetrics.getStreamApiInvocations().increment(); | ||
| LOG.debug("Start connecting logical replication stream. "); | ||
| try (Connection conn = connectionManager.getConnection()) { | ||
| PGConnection pgConnection = conn.unwrap(PGConnection.class); | ||
|
|
@@ -95,7 +105,7 @@ public void connect() { | |
|
|
||
| disconnectRequested = false; | ||
| } catch (Exception e) { | ||
| LOG.error("Exception while creating or processing Postgres replication stream. ", e); | ||
| categorizeError(e); | ||
| throw new RuntimeException(e); | ||
| } | ||
| } | ||
|
|
@@ -131,4 +141,27 @@ private void closeStream() { | |
| } | ||
| } | ||
| } | ||
|
|
||
// Classifies a failure from connect() into an error category, increments the
// matching counters on rdsSourceAggregateMetrics, and logs a category-specific
// message. The checks run in order and the first one that matches wins; a failure
// that matches none of them is counted as a 5xx error.
//
// e: the exception caught in connect(). Its message is inspected for three of the
//    checks; the connection-refused check inspects the message of its cause instead.
//    A null message (or null cause) simply fails that check.
//
// NOTE(review): connect() already logs the exception with its stack trace before
// calling this method, so every failure is logged twice.
| private void categorizeError(Exception e) { | ||
// Authentication failure: message contains AUTHENTICATION_FAILED -> 4xx + auth counter.
| if (e.getMessage() != null && e.getMessage().contains(AUTHENTICATION_FAILED)) { | ||

Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment as above
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added metrics for those specific errors. |
||
| rdsSourceAggregateMetrics.getStream4xxErrors().increment(); | ||
| rdsSourceAggregateMetrics.getStreamAuthErrors().increment(); | ||
| LOG.error("Failed to create or process PostgreSQL replication stream: Authentication failed. [{}]", e.getMessage()); | ||
// Server unreachable: only the direct cause's message is checked for CONNECTION_REFUSED
// -> 4xx + server-not-found counter. The log line still prints the outer message.
| } else if (e.getCause() != null && e.getCause().getMessage() != null && e.getCause().getMessage().contains(CONNECTION_REFUSED)) { | ||
| rdsSourceAggregateMetrics.getStream4xxErrors().increment(); | ||
| rdsSourceAggregateMetrics.getStreamServerNotFoundErrors().increment(); | ||
| LOG.error("Failed to create or process PostgreSQL replication stream: Cannot connect to PostgreSQL server. [{}]", e.getMessage()); | ||
// Missing replication slot: the whole message must match the regex
// REPLICATION_SLOT_DOES_NOT_EXIST -> 4xx + replication-not-enabled counter.
// NOTE(review): String.matches anchors to the entire string and '.' does not match
// line terminators by default, so a message containing a newline would fall through
// to a later branch -- confirm the driver's message is always single-line.
| } else if (e.getMessage() != null && e.getMessage().matches(REPLICATION_SLOT_DOES_NOT_EXIST)) { | ||
| rdsSourceAggregateMetrics.getStream4xxErrors().increment(); | ||
| rdsSourceAggregateMetrics.getStreamReplicationNotEnabledErrors().increment(); | ||
| LOG.error("Failed to create or process PostgreSQL replication stream: Replication slot does not exist. [{}]", e.getMessage()); | ||
// Insufficient privileges: message contains PERMISSION_DENIED -> 4xx + access-denied counter.
| } else if (e.getMessage() != null && e.getMessage().contains(PERMISSION_DENIED)) { | ||
| rdsSourceAggregateMetrics.getStream4xxErrors().increment(); | ||
| rdsSourceAggregateMetrics.getStreamAccessDeniedErrors().increment(); | ||

Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you update the unit test to verify on these new metrics added ?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added. |
||
| LOG.error("Failed to create or process PostgreSQL replication stream: Insufficient privileges. [{}]", e.getMessage()); | ||
// Anything unrecognised is counted as a 5xx error and logged with the full exception.
| } else { | ||
| rdsSourceAggregateMetrics.getStream5xxErrors().increment(); | ||
| LOG.error("Failed to create or process PostgreSQL replication stream. ", e); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.plugins.source.rds.utils; | ||
|
|
||
| import io.micrometer.core.instrument.Counter; | ||
| import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
|
|
||
/**
 * Holder for the Micrometer {@link Counter}s used by the RDS source.
 *
 * <p>All counters are obtained once, in the constructor, from a
 * {@link PluginMetrics} created with the {@code "rds"} prefix, and are exposed
 * through plain getters. Callers increment the returned counters themselves;
 * this class contains no counting logic of its own.
 */
| public class RdsSourceAggregateMetrics { | ||
// Prefix passed to PluginMetrics.fromPrefix for every counter below.
| private static final String RDS_SOURCE = "rds"; | ||
|
|
||
// Counter names as passed to PluginMetrics.counter(...).
| private static final String RDS_SOURCE_STREAM_5XX_EXCEPTIONS = "stream5xxErrors"; | ||
| private static final String RDS_SOURCE_STREAM_4XX_EXCEPTIONS = "stream4xxErrors"; | ||
| private static final String RDS_SOURCE_STREAM_API_INVOCATIONS = "streamApiInvocations"; | ||
| private static final String RDS_SOURCE_EXPORT_5XX_ERRORS = "export5xxErrors"; | ||
| private static final String RDS_SOURCE_EXPORT_4XX_ERRORS = "export4xxErrors"; | ||
| private static final String RDS_SOURCE_EXPORT_API_INVOCATIONS = "exportApiInvocations"; | ||
| private static final String RDS_SOURCE_EXPORT_PARTITION_QUERY_COUNT = "exportPartitionQueryCount"; | ||
| private static final String RDS_SOURCE_STREAM_AUTH_ERRORS = "streamAuthErrors"; | ||
| private static final String RDS_SOURCE_STREAM_SERVER_NOT_FOUND_ERRORS = "streamServerNotFoundErrors"; | ||
| private static final String RDS_SOURCE_STREAM_REPLICATION_NOT_ENABLED_ERRORS = "streamReplicationNotEnabledErrors"; | ||
| private static final String RDS_SOURCE_STREAM_ACCESS_DENIED_ERRORS = "streamAccessDeniedErrors"; | ||
|
|
||
// One final Counter field per metric name above; all are assigned in the constructor.
| private final PluginMetrics pluginMetrics; | ||
| private final Counter stream5xxErrors; | ||
| private final Counter stream4xxErrors; | ||
| private final Counter streamApiInvocations; | ||
| private final Counter export5xxErrors; | ||
| private final Counter export4xxErrors; | ||
| private final Counter exportApiInvocations; | ||
| private final Counter exportPartitionQueryCount; | ||
| private final Counter streamAuthErrors; | ||
| private final Counter streamServerNotFoundErrors; | ||
| private final Counter streamReplicationNotEnabledErrors; | ||
| private final Counter streamAccessDeniedErrors; | ||
|
|
||
/**
 * Creates the {@code "rds"}-prefixed {@link PluginMetrics} and looks up every
 * counter by its name constant. Takes no arguments.
 */
| public RdsSourceAggregateMetrics() { | ||
| this.pluginMetrics = PluginMetrics.fromPrefix(RDS_SOURCE); | ||
| this.stream5xxErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_5XX_EXCEPTIONS); | ||
| this.stream4xxErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_4XX_EXCEPTIONS); | ||
| this.streamApiInvocations = pluginMetrics.counter(RDS_SOURCE_STREAM_API_INVOCATIONS); | ||
| this.export5xxErrors = pluginMetrics.counter(RDS_SOURCE_EXPORT_5XX_ERRORS); | ||
| this.export4xxErrors = pluginMetrics.counter(RDS_SOURCE_EXPORT_4XX_ERRORS); | ||
| this.exportApiInvocations = pluginMetrics.counter(RDS_SOURCE_EXPORT_API_INVOCATIONS); | ||
| this.exportPartitionQueryCount = pluginMetrics.counter(RDS_SOURCE_EXPORT_PARTITION_QUERY_COUNT); | ||
|
|
||
| // More granular error metrics | ||
| this.streamAuthErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_AUTH_ERRORS); | ||
| this.streamServerNotFoundErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_SERVER_NOT_FOUND_ERRORS); | ||
| this.streamReplicationNotEnabledErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_REPLICATION_NOT_ENABLED_ERRORS); | ||
| this.streamAccessDeniedErrors = pluginMetrics.counter(RDS_SOURCE_STREAM_ACCESS_DENIED_ERRORS); | ||
| } | ||
|
|
||
/** @return the counter named {@code "stream5xxErrors"} */
| public Counter getStream5xxErrors() { | ||
| return stream5xxErrors; | ||
| } | ||
|
|
||
/** @return the counter named {@code "stream4xxErrors"} */
| public Counter getStream4xxErrors() { | ||
| return stream4xxErrors; | ||
| } | ||
|
|
||
/** @return the counter named {@code "streamApiInvocations"} */
| public Counter getStreamApiInvocations() { | ||
| return streamApiInvocations; | ||
| } | ||
|
|
||
/** @return the counter named {@code "export5xxErrors"} */
| public Counter getExport5xxErrors() { | ||
| return export5xxErrors; | ||
| } | ||
|
|
||
/** @return the counter named {@code "export4xxErrors"} */
| public Counter getExport4xxErrors() { | ||
| return export4xxErrors; | ||
| } | ||
|
|
||
/** @return the counter named {@code "exportApiInvocations"} */
| public Counter getExportApiInvocations() { | ||
| return exportApiInvocations; | ||
| } | ||
|
|
||
/** @return the counter named {@code "exportPartitionQueryCount"} */
| public Counter getExportPartitionQueryCount() { | ||
| return exportPartitionQueryCount; | ||
| } | ||
|
|
||
/** @return the counter named {@code "streamAuthErrors"} */
| public Counter getStreamAuthErrors() { | ||
| return streamAuthErrors; | ||
| } | ||
|
|
||
/** @return the counter named {@code "streamServerNotFoundErrors"} */
| public Counter getStreamServerNotFoundErrors() { | ||
| return streamServerNotFoundErrors; | ||
| } | ||
|
|
||
/** @return the counter named {@code "streamReplicationNotEnabledErrors"} */
| public Counter getStreamReplicationNotEnabledErrors() { | ||
| return streamReplicationNotEnabledErrors; | ||
| } | ||
|
|
||
/** @return the counter named {@code "streamAccessDeniedErrors"} */
| public Counter getStreamAccessDeniedErrors() { | ||
| return streamAccessDeniedErrors; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to check through exception type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are mostly just generic IOExceptions, so I didn't check by exception type, except for
AuthenticationException.