From c4e96823a926bb6fa488100d12dcf332d015c700 Mon Sep 17 00:00:00 2001 From: Hai Yan Date: Wed, 16 Apr 2025 15:10:09 -0500 Subject: [PATCH 1/2] Fix MySQL shutdown sequence Signed-off-by: Hai Yan --- .../source/rds/schema/MySqlSchemaManager.java | 1 + .../stream/BinlogClientLifecycleListener.java | 40 +++++++++++++++++++ .../rds/stream/BinlogClientWrapper.java | 19 +++++++++ .../rds/stream/BinlogEventListener.java | 4 ++ .../rds/stream/StreamWorkerTaskRefresher.java | 2 + 5 files changed, 66 insertions(+) create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientLifecycleListener.java diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/MySqlSchemaManager.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/MySqlSchemaManager.java index 2babcc8edc..db9d022186 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/MySqlSchemaManager.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/schema/MySqlSchemaManager.java @@ -156,6 +156,7 @@ public Optional getCurrentBinaryLogPosition() { while (retry <= NUM_OF_RETRIES) { try (final Connection connection = connectionManager.getConnection()) { final String mySqlVersion = connection.getMetaData().getDatabaseProductVersion(); + LOG.info("MySQL version: {}", mySqlVersion); final Statement statement = connection.createStatement(); final ResultSet rs = VersionUtil.compareVersions(mySqlVersion, MYSQL_VERSION_8_4) >= 0 ? statement.executeQuery(NEW_BINLOG_STATUS_QUERY) : diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientLifecycleListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientLifecycleListener.java new file mode 100644 index 0000000000..1d77608ab0 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientLifecycleListener.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.source.rds.stream; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BinlogClientLifecycleListener implements BinaryLogClient.LifecycleListener { + + private static final Logger LOG = LoggerFactory.getLogger(BinlogClientLifecycleListener.class); + + @Override + public void onConnect(BinaryLogClient binaryLogClient) { + LOG.info("Binlog client connected."); + } + + @Override + public void onCommunicationFailure(BinaryLogClient binaryLogClient, Exception e) { + LOG.error("Binlog client communication failure.", e); + } + + @Override + public void onEventDeserializationFailure(BinaryLogClient binaryLogClient, Exception e) { + LOG.error("Binlog client event deserialization failure.", e); + } + + @Override + public void onDisconnect(BinaryLogClient binaryLogClient) { + LOG.info("Binlog client disconnected."); + + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientWrapper.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientWrapper.java index 36d8195106..d97450942b 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientWrapper.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientWrapper.java @@ -11,11 +11,15 @@ package org.opensearch.dataprepper.plugins.source.rds.stream; import com.github.shyiko.mysql.binlog.BinaryLogClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.List; public class BinlogClientWrapper implements ReplicationLogClient { + private static final Logger LOG = LoggerFactory.getLogger(BinlogClientWrapper.class); private final BinaryLogClient binlogClient; public BinlogClientWrapper(final BinaryLogClient binlogClient) { @@ -30,6 +34,21 @@ public void connect() throws IOException { @Override public void disconnect() throws IOException { binlogClient.disconnect(); + + List eventListenerList = binlogClient.getEventListeners(); + if (!eventListenerList.isEmpty()) { + for (BinaryLogClient.EventListener eventListener : eventListenerList) { + if (eventListener instanceof BinlogEventListener) { + LOG.debug("Stopping checkpoint manager."); + ((BinlogEventListener) eventListener).stopCheckpointManager(); + } + LOG.debug("Unregistering binlog event listeners."); + binlogClient.unregisterEventListener(eventListener); + } + } + + LOG.debug("Unregistering binlog client lifecycle listeners."); + binlogClient.getLifecycleListeners().forEach(binlogClient::unregisterLifecycleListener); } public BinaryLogClient getBinlogClient() { diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java index a17235caae..d0651a481b 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java @@ -201,6 +201,10 @@ public void stopClient() { } } + public void stopCheckpointManager() { + streamCheckpointManager.stop(); + } + void handleRotateEvent(com.github.shyiko.mysql.binlog.event.Event event) { final RotateEventData data = event.getData(); currentBinlogCoordinate = new BinlogCoordinate(data.getBinlogFilename(), data.getBinlogPosition()); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresher.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresher.java index a9c327feec..e09b954048 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresher.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresher.java @@ -114,6 +114,7 @@ public void update(RdsSourceConfig sourceConfig) { } public void shutdown() { + streamWorker.shutdown(); executorService.shutdownNow(); } @@ -127,6 +128,7 @@ private void refreshTask(RdsSourceConfig sourceConfig) { binaryLogClient.registerEventListener(BinlogEventListener.create( streamPartition, buffer, sourceConfig, s3Prefix, pluginMetrics, binaryLogClient, streamCheckpointer, acknowledgementSetManager, dbTableMetadata, cascadeActionDetector)); + binaryLogClient.registerLifecycleListener(new BinlogClientLifecycleListener()); } else { final LogicalReplicationClient logicalReplicationClient = (LogicalReplicationClient) replicationLogClient; logicalReplicationClient.setEventProcessor(LogicalReplicationEventProcessor.create( From 1a65b912d834b71a68cfb0c36e1cbdbcb56d6f23 Mon Sep 17 00:00:00 2001 From: Hai Yan Date: Thu, 17 Apr 2025 23:02:31 -0500 Subject: [PATCH 2/2] Fix failover handling Signed-off-by: Hai Yan --- .../stream/BinlogClientLifecycleListener.java | 1 - .../rds/stream/BinlogClientWrapper.java | 4 +- .../rds/stream/LogicalReplicationClient.java | 2 +- .../source/rds/stream/StreamScheduler.java | 35 ++++++++++------- .../source/rds/stream/StreamWorker.java | 9 +++-- .../rds/stream/StreamWorkerTaskRefresher.java | 5 ++- .../stream/StreamWorkerTaskRefresherTest.java | 38 ++++++++++++++++++- 7 files changed, 69 insertions(+), 25 deletions(-) diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientLifecycleListener.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientLifecycleListener.java index 1d77608ab0..e5f95e8aa5 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientLifecycleListener.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientLifecycleListener.java @@ -35,6 +35,5 @@ public void onEventDeserializationFailure(BinaryLogClient binaryLogClient, Excep @Override public void onDisconnect(BinaryLogClient binaryLogClient) { LOG.info("Binlog client disconnected."); - } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientWrapper.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientWrapper.java index d97450942b..7c4fe35ae4 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientWrapper.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogClientWrapper.java @@ -33,8 +33,6 @@ public void connect() throws IOException { @Override public void disconnect() throws IOException { - binlogClient.disconnect(); - List eventListenerList = binlogClient.getEventListeners(); if (!eventListenerList.isEmpty()) { for (BinaryLogClient.EventListener eventListener : eventListenerList) { @@ -49,6 +47,8 @@ public void disconnect() throws IOException { LOG.debug("Unregistering binlog client lifecycle listeners."); binlogClient.getLifecycleListeners().forEach(binlogClient::unregisterLifecycleListener); + + binlogClient.disconnect(); } public BinaryLogClient getBinlogClient() { diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java index 5faf55dcfa..1fcc0b95e8 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationClient.java @@ -84,7 +84,7 @@ public void connect() { stream.setFlushedLSN(lsn); stream.setAppliedLSN(lsn); } catch (Exception e) { - LOG.error("Exception while processing Postgres replication stream. ", e); + LOG.error("Exception while processing Postgres replication stream. "); closeStream(); throw e; } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java index 7d638931a3..a8f3f6a85c 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamScheduler.java @@ -79,6 +79,10 @@ public void run() { streamPartition = (StreamPartition) sourcePartition.get(); final StreamCheckpointer streamCheckpointer = new StreamCheckpointer(sourceCoordinator, streamPartition, pluginMetrics); + if (streamWorkerTaskRefresher != null) { + streamWorkerTaskRefresher.shutdown(); + } + streamWorkerTaskRefresher = StreamWorkerTaskRefresher.create( sourceCoordinator, streamPartition, streamCheckpointer, s3Prefix, replicationLogClientFactory, buffer, () -> Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("rds-source-stream-worker")), @@ -90,22 +94,15 @@ public void run() { pluginConfigObservable.addPluginConfigObserver(pluginConfig -> streamWorkerTaskRefresher.update((RdsSourceConfig) pluginConfig)); } - try { - LOG.debug("Looping to acquire new stream partition or idle while stream worker is working"); - Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS); - } catch (final InterruptedException e) { - LOG.info("The StreamScheduler was interrupted while waiting to retry, stopping processing"); - break; - } - + LOG.debug("Looping to acquire new stream partition or idle while stream worker is working"); + Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS); + } catch (final InterruptedException e) { + LOG.info("The StreamScheduler was interrupted, stopping processing"); + giveUpPartition(streamPartition); + break; } catch (Exception e) { LOG.error("Received an exception during stream processing, backing off and retrying", e); - if (streamPartition != null) { - if (sourceConfig.isDisableS3ReadForLeader()) { - System.clearProperty(STOP_S3_SCAN_PROCESSING_PROPERTY); - } - sourceCoordinator.giveUpPartition(streamPartition); - } + giveUpPartition(streamPartition); try { Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS); @@ -119,8 +116,18 @@ public void run() { public void shutdown() { if (streamWorkerTaskRefresher != null) { + LOG.debug("Shutting down StreamWorkerTaskRefresher"); streamWorkerTaskRefresher.shutdown(); } shutdownRequested = true; } + + private void giveUpPartition(final StreamPartition streamPartition) { + if (streamPartition != null) { + if (sourceConfig.isDisableS3ReadForLeader()) { + System.clearProperty(STOP_S3_SCAN_PROCESSING_PROPERTY); + } + sourceCoordinator.giveUpPartition(streamPartition); + } + } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java index a231e6d7ea..7c6aa77680 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java @@ -17,7 +17,6 @@ import java.util.Optional; -import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; public class StreamWorker { private static final Logger LOG = LoggerFactory.getLogger(StreamWorker.class); @@ -64,12 +63,14 @@ public void processStream(final StreamPartition streamPartition) { LOG.info("Connect to database to read change events."); replicationLogClient.connect(); } catch (Exception e) { - LOG.warn(NOISY, "Error while connecting to replication stream, will retry.", e); + LOG.warn("Error while connecting to replication stream, will retry."); + LOG.debug("Give up stream partition and shut down stream worker"); sourceCoordinator.giveUpPartition(streamPartition); - throw new RuntimeException(e); - } finally { shutdown(); + throw new RuntimeException(e); } + + LOG.debug("Exited connect() method in stream worker."); } public void shutdown() { diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresher.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresher.java index e09b954048..b5c575071c 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresher.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresher.java @@ -114,7 +114,10 @@ public void update(RdsSourceConfig sourceConfig) { } public void shutdown() { - streamWorker.shutdown(); + if (streamWorker != null) { + streamWorker.shutdown(); + } + LOG.info("Stream worker stopped."); executorService.shutdownNow(); } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java index dfc446321a..8f0840450c 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTaskRefresherTest.java @@ -13,6 +13,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Answers; import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; @@ -41,6 +42,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; @@ -232,8 +234,26 @@ void test_update_when_credentials_unchanged_then_do_nothing() { @Test void test_shutdown() { + when(replicationLogClientFactory.create(streamPartition)).thenReturn(binaryLogClientWrapper); + when(binaryLogClientWrapper.getBinlogClient()).thenReturn(binaryLogClient); + final Map progressState = mockGlobalStateAndProgressState(); + try (MockedStatic streamWorkerMockedStatic = mockStatic(StreamWorker.class); + MockedStatic binlogEventListenerMockedStatic = mockStatic(BinlogEventListener.class); + MockedStatic dbTableMetadataMockedStatic = mockStatic(DbTableMetadata.class)) { + dbTableMetadataMockedStatic.when(() -> DbTableMetadata.fromMap(progressState)).thenReturn(dbTableMetadata); + streamWorkerMockedStatic.when(() -> StreamWorker.create(eq(sourceCoordinator), any(ReplicationLogClient.class), eq(pluginMetrics))) + .thenReturn(streamWorker); + binlogEventListenerMockedStatic.when(() -> BinlogEventListener.create(eq(streamPartition), eq(buffer), any(RdsSourceConfig.class), + any(String.class), eq(pluginMetrics), eq(binaryLogClient), eq(streamCheckpointer), + eq(acknowledgementSetManager), eq(dbTableMetadata), any(CascadingActionDetector.class))) + .thenReturn(binlogEventListener); + streamWorkerTaskRefresher.initialize(sourceConfig); + } streamWorkerTaskRefresher.shutdown(); - verify(executorService).shutdownNow(); + + InOrder inOrder = inOrder(streamWorker, executorService); + inOrder.verify(streamWorker).shutdown(); + inOrder.verify(executorService).shutdownNow(); } private StreamWorkerTaskRefresher createObjectUnderTest() { @@ -345,8 +365,22 @@ void test_update_when_credentials_unchanged_then_do_nothing() { @Test void test_shutdown() { + when(replicationLogClientFactory.create(streamPartition)).thenReturn(logicalReplicationClient); + mockGlobalStateAndProgressState(); + try (MockedStatic streamWorkerMockedStatic = mockStatic(StreamWorker.class); + MockedStatic logicalReplicationEventProcessorMockedStatic = mockStatic(LogicalReplicationEventProcessor.class)) { + streamWorkerMockedStatic.when(() -> StreamWorker.create(eq(sourceCoordinator), any(ReplicationLogClient.class), eq(pluginMetrics))) + .thenReturn(streamWorker); + logicalReplicationEventProcessorMockedStatic.when(() -> LogicalReplicationEventProcessor.create(eq(streamPartition), any(RdsSourceConfig.class), + eq(buffer), any(String.class), eq(pluginMetrics), eq(logicalReplicationClient), eq(streamCheckpointer), eq(acknowledgementSetManager))) + .thenReturn(logicalReplicationEventProcessor); + streamWorkerTaskRefresher.initialize(sourceConfig); + } streamWorkerTaskRefresher.shutdown(); - verify(executorService).shutdownNow(); + + InOrder inOrder = inOrder(streamWorker, executorService); + inOrder.verify(streamWorker).shutdown(); + inOrder.verify(executorService).shutdownNow(); } private StreamWorkerTaskRefresher createObjectUnderTest() {