Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public Optional<BinlogCoordinate> getCurrentBinaryLogPosition() {
while (retry <= NUM_OF_RETRIES) {
try (final Connection connection = connectionManager.getConnection()) {
final String mySqlVersion = connection.getMetaData().getDatabaseProductVersion();
LOG.info("MySQL version: {}", mySqlVersion);
final Statement statement = connection.createStatement();
final ResultSet rs = VersionUtil.compareVersions(mySqlVersion, MYSQL_VERSION_8_4) >= 0 ?
statement.executeQuery(NEW_BINLOG_STATUS_QUERY) :
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.rds.stream;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BinlogClientLifecycleListener implements BinaryLogClient.LifecycleListener {

private static final Logger LOG = LoggerFactory.getLogger(BinlogClientLifecycleListener.class);

@Override
public void onConnect(BinaryLogClient binaryLogClient) {
LOG.info("Binlog client connected.");
}

@Override
public void onCommunicationFailure(BinaryLogClient binaryLogClient, Exception e) {
LOG.error("Binlog client communication failure.", e);
}

@Override
public void onEventDeserializationFailure(BinaryLogClient binaryLogClient, Exception e) {
LOG.error("Binlog client event deserialization failure.", e);
}

@Override
public void onDisconnect(BinaryLogClient binaryLogClient) {
LOG.info("Binlog client disconnected.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@
package org.opensearch.dataprepper.plugins.source.rds.stream;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

public class BinlogClientWrapper implements ReplicationLogClient {

private static final Logger LOG = LoggerFactory.getLogger(BinlogClientWrapper.class);
private final BinaryLogClient binlogClient;

public BinlogClientWrapper(final BinaryLogClient binlogClient) {
Expand All @@ -29,6 +33,21 @@ public void connect() throws IOException {

@Override
public void disconnect() throws IOException {
List<BinaryLogClient.EventListener> eventListenerList = binlogClient.getEventListeners();
if (!eventListenerList.isEmpty()) {
for (BinaryLogClient.EventListener eventListener : eventListenerList) {
if (eventListener instanceof BinlogEventListener) {
LOG.debug("Stopping checkpoint manager.");
((BinlogEventListener) eventListener).stopCheckpointManager();
}
LOG.debug("Unregistering binlog event listeners.");
binlogClient.unregisterEventListener(eventListener);
}
}

LOG.debug("Unregistering binlog client lifecycle listeners.");
binlogClient.getLifecycleListeners().forEach(binlogClient::unregisterLifecycleListener);

binlogClient.disconnect();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ public void stopClient() {
}
}

public void stopCheckpointManager() {
streamCheckpointManager.stop();
}

void handleRotateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
final RotateEventData data = event.getData();
currentBinlogCoordinate = new BinlogCoordinate(data.getBinlogFilename(), data.getBinlogPosition());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void connect() {
stream.setFlushedLSN(lsn);
stream.setAppliedLSN(lsn);
} catch (Exception e) {
LOG.error("Exception while processing Postgres replication stream. ", e);
LOG.error("Exception while processing Postgres replication stream. ");
closeStream();
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public void run() {
streamPartition = (StreamPartition) sourcePartition.get();
final StreamCheckpointer streamCheckpointer = new StreamCheckpointer(sourceCoordinator, streamPartition, pluginMetrics);

if (streamWorkerTaskRefresher != null) {
streamWorkerTaskRefresher.shutdown();
}

streamWorkerTaskRefresher = StreamWorkerTaskRefresher.create(
sourceCoordinator, streamPartition, streamCheckpointer, s3Prefix, replicationLogClientFactory, buffer,
() -> Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("rds-source-stream-worker")),
Expand All @@ -90,22 +94,15 @@ public void run() {
pluginConfigObservable.addPluginConfigObserver(pluginConfig -> streamWorkerTaskRefresher.update((RdsSourceConfig) pluginConfig));
}

try {
LOG.debug("Looping to acquire new stream partition or idle while stream worker is working");
Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
} catch (final InterruptedException e) {
LOG.info("The StreamScheduler was interrupted while waiting to retry, stopping processing");
break;
}

LOG.debug("Looping to acquire new stream partition or idle while stream worker is working");
Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
} catch (final InterruptedException e) {
LOG.info("The StreamScheduler was interrupted, stopping processing");
giveUpPartition(streamPartition);
break;
} catch (Exception e) {
LOG.error("Received an exception during stream processing, backing off and retrying", e);
if (streamPartition != null) {
if (sourceConfig.isDisableS3ReadForLeader()) {
System.clearProperty(STOP_S3_SCAN_PROCESSING_PROPERTY);
}
sourceCoordinator.giveUpPartition(streamPartition);
}
giveUpPartition(streamPartition);

try {
Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
Expand All @@ -119,8 +116,18 @@ public void run() {

public void shutdown() {
if (streamWorkerTaskRefresher != null) {
LOG.debug("Shutting down StreamWorkerTaskRefresher");
streamWorkerTaskRefresher.shutdown();
}
shutdownRequested = true;
}

private void giveUpPartition(final StreamPartition streamPartition) {
if (streamPartition != null) {
if (sourceConfig.isDisableS3ReadForLeader()) {
System.clearProperty(STOP_S3_SCAN_PROCESSING_PROPERTY);
}
sourceCoordinator.giveUpPartition(streamPartition);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.util.Optional;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;

public class StreamWorker {
private static final Logger LOG = LoggerFactory.getLogger(StreamWorker.class);
Expand Down Expand Up @@ -64,12 +63,14 @@ public void processStream(final StreamPartition streamPartition) {
LOG.info("Connect to database to read change events.");
replicationLogClient.connect();
} catch (Exception e) {
LOG.warn(NOISY, "Error while connecting to replication stream, will retry.", e);
LOG.warn("Error while connecting to replication stream, will retry.");
LOG.debug("Give up stream partition and shut down stream worker");
sourceCoordinator.giveUpPartition(streamPartition);
throw new RuntimeException(e);
} finally {
shutdown();
throw new RuntimeException(e);
}

LOG.debug("Exited connect() method in stream worker.");
}

public void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ public void update(RdsSourceConfig sourceConfig) {
}

public void shutdown() {
if (streamWorker != null) {
streamWorker.shutdown();
}
LOG.info("Stream worker stopped.");
executorService.shutdownNow();
}

Expand All @@ -127,6 +131,7 @@ private void refreshTask(RdsSourceConfig sourceConfig) {
binaryLogClient.registerEventListener(BinlogEventListener.create(
streamPartition, buffer, sourceConfig, s3Prefix, pluginMetrics, binaryLogClient,
streamCheckpointer, acknowledgementSetManager, dbTableMetadata, cascadeActionDetector));
binaryLogClient.registerLifecycleListener(new BinlogClientLifecycleListener());
} else {
final LogicalReplicationClient logicalReplicationClient = (LogicalReplicationClient) replicationLogClient;
logicalReplicationClient.setEventProcessor(LogicalReplicationEventProcessor.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
Expand Down Expand Up @@ -41,6 +42,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -232,8 +234,26 @@ void test_update_when_credentials_unchanged_then_do_nothing() {

@Test
void test_shutdown() {
when(replicationLogClientFactory.create(streamPartition)).thenReturn(binaryLogClientWrapper);
when(binaryLogClientWrapper.getBinlogClient()).thenReturn(binaryLogClient);
final Map<String, Object> progressState = mockGlobalStateAndProgressState();
try (MockedStatic<StreamWorker> streamWorkerMockedStatic = mockStatic(StreamWorker.class);
MockedStatic<BinlogEventListener> binlogEventListenerMockedStatic = mockStatic(BinlogEventListener.class);
MockedStatic<DbTableMetadata> dbTableMetadataMockedStatic = mockStatic(DbTableMetadata.class)) {
dbTableMetadataMockedStatic.when(() -> DbTableMetadata.fromMap(progressState)).thenReturn(dbTableMetadata);
streamWorkerMockedStatic.when(() -> StreamWorker.create(eq(sourceCoordinator), any(ReplicationLogClient.class), eq(pluginMetrics)))
.thenReturn(streamWorker);
binlogEventListenerMockedStatic.when(() -> BinlogEventListener.create(eq(streamPartition), eq(buffer), any(RdsSourceConfig.class),
any(String.class), eq(pluginMetrics), eq(binaryLogClient), eq(streamCheckpointer),
eq(acknowledgementSetManager), eq(dbTableMetadata), any(CascadingActionDetector.class)))
.thenReturn(binlogEventListener);
streamWorkerTaskRefresher.initialize(sourceConfig);
}
streamWorkerTaskRefresher.shutdown();
verify(executorService).shutdownNow();

InOrder inOrder = inOrder(streamWorker, executorService);
inOrder.verify(streamWorker).shutdown();
inOrder.verify(executorService).shutdownNow();
}

private StreamWorkerTaskRefresher createObjectUnderTest() {
Expand Down Expand Up @@ -345,8 +365,22 @@ void test_update_when_credentials_unchanged_then_do_nothing() {

@Test
void test_shutdown() {
when(replicationLogClientFactory.create(streamPartition)).thenReturn(logicalReplicationClient);
mockGlobalStateAndProgressState();
try (MockedStatic<StreamWorker> streamWorkerMockedStatic = mockStatic(StreamWorker.class);
MockedStatic<LogicalReplicationEventProcessor> logicalReplicationEventProcessorMockedStatic = mockStatic(LogicalReplicationEventProcessor.class)) {
streamWorkerMockedStatic.when(() -> StreamWorker.create(eq(sourceCoordinator), any(ReplicationLogClient.class), eq(pluginMetrics)))
.thenReturn(streamWorker);
logicalReplicationEventProcessorMockedStatic.when(() -> LogicalReplicationEventProcessor.create(eq(streamPartition), any(RdsSourceConfig.class),
eq(buffer), any(String.class), eq(pluginMetrics), eq(logicalReplicationClient), eq(streamCheckpointer), eq(acknowledgementSetManager)))
.thenReturn(logicalReplicationEventProcessor);
streamWorkerTaskRefresher.initialize(sourceConfig);
}
streamWorkerTaskRefresher.shutdown();
verify(executorService).shutdownNow();

InOrder inOrder = inOrder(streamWorker, executorService);
inOrder.verify(streamWorker).shutdown();
inOrder.verify(executorService).shutdownNow();
}

private StreamWorkerTaskRefresher createObjectUnderTest() {
Expand Down