Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private String getS3PathPrefix() {
if (sourceCoordinator.getPartitionPrefix() != null ) {
// The prefix will be used in RDS export, which has a limit of 60 characters.
final String uniqueIdentifier = IdentifierShortener.shortenIdentifier(sourceCoordinator.getPartitionPrefix(), MAX_SOURCE_IDENTIFIER_LENGTH);
s3PathPrefix = s3UserPathPrefix + S3_PATH_DELIMITER + uniqueIdentifier;
s3PathPrefix = s3UserPathPrefix.isEmpty() ? uniqueIdentifier : s3UserPathPrefix + S3_PATH_DELIMITER + uniqueIdentifier;
LOG.info("Unique identifier used in S3 path prefix is {}", uniqueIdentifier);
} else {
s3PathPrefix = s3UserPathPrefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.rds.configuration;

import com.fasterxml.jackson.annotation.JsonAlias;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;

Expand All @@ -18,6 +19,7 @@ public class ExportConfig {
* The ARN of the IAM role that will be passed to RDS for export.
*/
@JsonProperty("iam_role_arn")
@JsonAlias("export_role_arn")
@NotNull
private String iamRoleArn;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

package org.opensearch.dataprepper.plugins.source.rds.converter;

import com.github.shyiko.mysql.binlog.event.EventType;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.plugins.source.rds.model.StreamEventType;

import java.nio.ByteBuffer;
import java.security.MessageDigest;
Expand Down Expand Up @@ -52,7 +52,7 @@ public Event convert(final Event event,
final List<String> primaryKeys,
final long eventCreateTimeEpochMillis,
final long eventVersionNumber,
final EventType eventType) {
final StreamEventType eventType) {

EventMetadata eventMetadata = event.getMetadata();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE;

public class DataFileLoader implements Runnable {
Expand Down Expand Up @@ -152,14 +153,28 @@ public void run() {
eventCount.getAndIncrement();
bytesProcessedSummary.record(bytes);
} catch (Exception e) {
LOG.error(SENSITIVE, "Failed to process record from object s3://{}/{}", bucket, objectKey, e);
LOG.atError()
.addMarker(SENSITIVE)
.addMarker(NOISY)
.setMessage("Failed to process record from object s3://{}/{}")
.addArgument(bucket)
.addArgument(objectKey)
.setCause(e)
.log();
throw new RuntimeException(e);
}
});

LOG.info(SENSITIVE, "Completed loading object s3://{}/{} to buffer", bucket, objectKey);
} catch (Exception e) {
LOG.error(SENSITIVE, "Failed to load object s3://{}/{} to buffer", bucket, objectKey, e);
LOG.atError()
.addMarker(SENSITIVE)
.addMarker(NOISY)
.setMessage("Failed to load object s3://{}/{} to buffer")
.addArgument(bucket)
.addArgument(objectKey)
.setCause(e)
.log();
throw new RuntimeException(e);
}

Expand All @@ -171,7 +186,7 @@ public void run() {
}
exportRecordSuccessCounter.increment(eventCount.get());
} catch (Exception e) {
LOG.error("Failed to write events to buffer", e);
LOG.error(NOISY, "Failed to write events to buffer", e);
exportRecordErrorCounter.increment(eventCount.get());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private void createExportPartition(RdsSourceConfig sourceConfig) {
}

private String getS3PrefixForExport(final String givenS3Prefix) {
return givenS3Prefix + S3_PATH_DELIMITER + S3_EXPORT_PREFIX;
return givenS3Prefix.isEmpty() ? S3_EXPORT_PREFIX : givenS3Prefix + S3_PATH_DELIMITER + S3_EXPORT_PREFIX;
}

private Map<String, List<String>> getPrimaryKeyMap() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.rds.model;

public enum StreamEventType {
INSERT("insert"),
UPDATE("update"),
DELETE("delete");

private final String eventType;

StreamEventType(final String eventType) {
this.eventType = eventType;
}

@Override
public String toString() {
return eventType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.model.ParentTable;
import org.opensearch.dataprepper.plugins.source.rds.model.StreamEventType;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.resync.CascadingActionDetector;
import org.slf4j.Logger;
Expand Down Expand Up @@ -283,7 +284,7 @@ void handleInsertEvent(com.github.shyiko.mysql.binlog.event.Event event) {
return;
}

handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.INDEX));
handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.INDEX), StreamEventType.INSERT);
}

void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
Expand Down Expand Up @@ -319,7 +320,7 @@ void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
bulkActions.add(OpenSearchBulkActions.INDEX);
}

handleRowChangeEvent(event, data.getTableId(), rows, bulkActions);
handleRowChangeEvent(event, data.getTableId(), rows, bulkActions, StreamEventType.UPDATE);
}

void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
Expand All @@ -333,7 +334,7 @@ void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
// Check if a cascade action is involved
cascadeActionDetector.detectCascadingDeletes(event, parentTableMap, tableMetadataMap.get(data.getTableId()));

handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.DELETE));
handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.DELETE), StreamEventType.DELETE);
}

// Visible For Testing
Expand All @@ -355,7 +356,8 @@ boolean isValidTableId(long tableId) {
void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
long tableId,
List<Serializable[]> rows,
List<OpenSearchBulkActions> bulkActions) {
List<OpenSearchBulkActions> bulkActions,
StreamEventType streamEventType) {

// Update binlog coordinate after it's first assigned in rotate event handler
if (currentBinlogCoordinate != null) {
Expand Down Expand Up @@ -407,7 +409,7 @@ void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
primaryKeys,
eventTimestampMillis,
eventTimestampMillis,
event.getHeader().getEventType());
streamEventType);
pipelineEvents.add(pipelineEvent);
}

Expand Down Expand Up @@ -441,7 +443,7 @@ private void addToBufferAccumulator(final BufferAccumulator<Record<Event>> buffe
try {
bufferAccumulator.add(record);
} catch (Exception e) {
LOG.error("Failed to add event to buffer", e);
LOG.error(NOISY, "Failed to add event to buffer", e);
}
}

Expand All @@ -452,7 +454,7 @@ private void flushBufferAccumulator(BufferAccumulator<Record<Event>> bufferAccum
} catch (Exception e) {
// this will only happen if writing to buffer gets interrupted from shutdown,
// otherwise bufferAccumulator will keep retrying with backoff
LOG.error("Failed to flush buffer", e);
LOG.error(NOISY, "Failed to flush buffer", e);
changeEventErrorCounter.increment(eventCount);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType;
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHelper;
import org.opensearch.dataprepper.plugins.source.rds.model.MessageType;
import org.opensearch.dataprepper.plugins.source.rds.model.StreamEventType;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.postgresql.replication.LogSequenceNumber;
import org.slf4j.Logger;
Expand Down Expand Up @@ -318,7 +319,7 @@ void processInsertMessage(ByteBuffer msg) {
final List<String> primaryKeys = tableMetadata.getPrimaryKeys();
final long eventTimestampMillis = currentEventTimestamp;

doProcess(msg, columnNames, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.INDEX);
doProcess(msg, columnNames, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.INDEX, StreamEventType.INSERT);
LOG.debug("Processed an INSERT message with table id: {}", tableId);
}

Expand All @@ -333,7 +334,7 @@ void processUpdateMessage(ByteBuffer msg) {

TupleDataType tupleDataType = TupleDataType.fromValue((char) msg.get());
if (tupleDataType == TupleDataType.NEW) {
doProcess(msg, columnNames, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.INDEX);
doProcess(msg, columnNames, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.INDEX, StreamEventType.UPDATE);
} else if (tupleDataType == TupleDataType.OLD || tupleDataType == TupleDataType.KEY) {
// Replica Identity is set to full, containing both old and new row data
Map<String, Object> oldRowDataMap = getRowDataMap(msg, columnNames, columnTypes);
Expand All @@ -342,9 +343,9 @@ void processUpdateMessage(ByteBuffer msg) {

if (isPrimaryKeyChanged(oldRowDataMap, newRowDataMap, primaryKeys)) {
LOG.debug("Primary keys were changed");
createPipelineEvent(oldRowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.DELETE);
createPipelineEvent(oldRowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.DELETE, StreamEventType.UPDATE);
}
createPipelineEvent(newRowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.INDEX);
createPipelineEvent(newRowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.INDEX, StreamEventType.UPDATE);
}
LOG.debug("Processed an UPDATE message with table id: {}", tableId);
}
Expand All @@ -367,7 +368,7 @@ void processDeleteMessage(ByteBuffer msg) {
final List<String> primaryKeys = tableMetadata.getPrimaryKeys();
final long eventTimestampMillis = currentEventTimestamp;

doProcess(msg, columnNames, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.DELETE);
doProcess(msg, columnNames, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.DELETE, StreamEventType.DELETE);
LOG.debug("Processed a DELETE message with table id: {}", tableId);
}

Expand All @@ -379,12 +380,12 @@ void processTypeMessage(ByteBuffer msg) {
}

private void doProcess(ByteBuffer msg, List<String> columnNames, TableMetadata tableMetadata,
List<String> primaryKeys, long eventTimestampMillis, OpenSearchBulkActions bulkAction) {
List<String> primaryKeys, long eventTimestampMillis, OpenSearchBulkActions bulkAction, StreamEventType streamEventType) {
bytesReceived = msg.capacity();
bytesReceivedSummary.record(bytesReceived);
final List<String> columnTypes = tableMetadata.getColumnTypes();
Map<String, Object> rowDataMap = getRowDataMap(msg, columnNames, columnTypes);
createPipelineEvent(rowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, bulkAction);
createPipelineEvent(rowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, bulkAction, streamEventType);
}

private Map<String, Object> getRowDataMap(ByteBuffer msg, List<String> columnNames, List<String> columnTypes) {
Expand All @@ -411,7 +412,8 @@ private Map<String, Object> getRowDataMap(ByteBuffer msg, List<String> columnNam
return rowDataMap;
}

private void createPipelineEvent(Map<String, Object> rowDataMap, TableMetadata tableMetadata, List<String> primaryKeys, long eventTimestampMillis, OpenSearchBulkActions bulkAction) {
private void createPipelineEvent(Map<String, Object> rowDataMap, TableMetadata tableMetadata, List<String> primaryKeys,
long eventTimestampMillis, OpenSearchBulkActions bulkAction, StreamEventType streamEventType) {
final Event dataPrepperEvent = JacksonEvent.builder()
.withEventType("event")
.withData(rowDataMap)
Expand All @@ -426,7 +428,7 @@ private void createPipelineEvent(Map<String, Object> rowDataMap, TableMetadata t
primaryKeys,
eventTimestampMillis,
eventTimestampMillis,
null);
streamEventType);
pipelineEvents.add(pipelineEvent);
}

Expand All @@ -446,7 +448,7 @@ private void addToBufferAccumulator(final BufferAccumulator<Record<Event>> buffe
try {
bufferAccumulator.add(record);
} catch (Exception e) {
LOG.error("Failed to add event to buffer", e);
LOG.error(NOISY, "Failed to add event to buffer", e);
}
}

Expand All @@ -457,7 +459,7 @@ private void flushBufferAccumulator(BufferAccumulator<Record<Event>> bufferAccum
} catch (Exception e) {
// this will only happen if writing to buffer gets interrupted from shutdown,
// otherwise bufferAccumulator will keep retrying with backoff
LOG.error("Failed to flush buffer", e);
LOG.error(NOISY, "Failed to flush buffer", e);
changeEventErrorCounter.increment(eventCount);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

package org.opensearch.dataprepper.plugins.source.rds.converter;

import com.github.shyiko.mysql.binlog.event.EventType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.event.TestEventFactory;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.plugins.source.rds.model.StreamEventType;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -57,7 +57,7 @@ void test_convert_returns_expected_event() {
final String databaseName = UUID.randomUUID().toString();
final String schemaName = UUID.randomUUID().toString();
final String tableName = UUID.randomUUID().toString();
final EventType eventType = EventType.EXT_WRITE_ROWS;
final StreamEventType eventType = StreamEventType.INSERT;
final OpenSearchBulkActions bulkAction = OpenSearchBulkActions.INDEX;
final List<String> primaryKeys = List.of("key1");
final long eventCreateTimeEpochMillis = random.nextLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.model.StreamEventType;
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.resync.CascadingActionDetector;

Expand Down Expand Up @@ -208,7 +209,7 @@ void test_given_UpdateRows_event_when_primary_key_changes_then_generate_correct_
// verify rowList and bulkActionList that were sent to handleRowChangeEvent() were correct
ArgumentCaptor<List<Serializable[]>> rowListArgumentCaptor = ArgumentCaptor.forClass(List.class);
ArgumentCaptor<List<OpenSearchBulkActions>> bulkActionListArgumentCaptor = ArgumentCaptor.forClass(List.class);
verify(objectUnderTest).handleRowChangeEvent(eq(binlogEvent), eq(tableId), rowListArgumentCaptor.capture(), bulkActionListArgumentCaptor.capture());
verify(objectUnderTest).handleRowChangeEvent(eq(binlogEvent), eq(tableId), rowListArgumentCaptor.capture(), bulkActionListArgumentCaptor.capture(), eq(StreamEventType.UPDATE));
List<Serializable[]> rowList = rowListArgumentCaptor.getValue();
List<OpenSearchBulkActions> bulkActionList = bulkActionListArgumentCaptor.getValue();

Expand Down