Skip to content

Commit f7fae29

Browse files
oeyhEC2 Default User
authored andcommitted
Fix EventType and a few other things (opensearch-project#5598)
Signed-off-by: Hai Yan <oeyh@amazon.com>
1 parent 479ef97 commit f7fae29

10 files changed

Lines changed: 77 additions & 28 deletions

File tree

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/RdsService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ private String getS3PathPrefix() {
199199
if (sourceCoordinator.getPartitionPrefix() != null ) {
200200
// The prefix will be used in RDS export, which has a limit of 60 characters.
201201
final String uniqueIdentifier = IdentifierShortener.shortenIdentifier(sourceCoordinator.getPartitionPrefix(), MAX_SOURCE_IDENTIFIER_LENGTH);
202-
s3PathPrefix = s3UserPathPrefix + S3_PATH_DELIMITER + uniqueIdentifier;
202+
s3PathPrefix = s3UserPathPrefix.isEmpty() ? uniqueIdentifier : s3UserPathPrefix + S3_PATH_DELIMITER + uniqueIdentifier;
203203
LOG.info("Unique identifier used in S3 path prefix is {}", uniqueIdentifier);
204204
} else {
205205
s3PathPrefix = s3UserPathPrefix;

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/configuration/ExportConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.dataprepper.plugins.source.rds.configuration;
77

8+
import com.fasterxml.jackson.annotation.JsonAlias;
89
import com.fasterxml.jackson.annotation.JsonProperty;
910
import jakarta.validation.constraints.NotNull;
1011

@@ -18,6 +19,7 @@ public class ExportConfig {
1819
* The ARN of the IAM role that will be passed to RDS for export.
1920
*/
2021
@JsonProperty("iam_role_arn")
22+
@JsonAlias("export_role_arn")
2123
@NotNull
2224
private String iamRoleArn;
2325

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/converter/RecordConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55

66
package org.opensearch.dataprepper.plugins.source.rds.converter;
77

8-
import com.github.shyiko.mysql.binlog.event.EventType;
98
import org.opensearch.dataprepper.model.event.Event;
109
import org.opensearch.dataprepper.model.event.EventMetadata;
1110
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
11+
import org.opensearch.dataprepper.plugins.source.rds.model.StreamEventType;
1212

1313
import java.nio.ByteBuffer;
1414
import java.security.MessageDigest;
@@ -52,7 +52,7 @@ public Event convert(final Event event,
5252
final List<String> primaryKeys,
5353
final long eventCreateTimeEpochMillis,
5454
final long eventVersionNumber,
55-
final EventType eventType) {
55+
final StreamEventType eventType) {
5656

5757
EventMetadata eventMetadata = event.getMetadata();
5858

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/export/DataFileLoader.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Map;
3535
import java.util.concurrent.atomic.AtomicLong;
3636

37+
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
3738
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.SENSITIVE;
3839

3940
public class DataFileLoader implements Runnable {
@@ -152,14 +153,28 @@ public void run() {
152153
eventCount.getAndIncrement();
153154
bytesProcessedSummary.record(bytes);
154155
} catch (Exception e) {
155-
LOG.error(SENSITIVE, "Failed to process record from object s3://{}/{}", bucket, objectKey, e);
156+
LOG.atError()
157+
.addMarker(SENSITIVE)
158+
.addMarker(NOISY)
159+
.setMessage("Failed to process record from object s3://{}/{}")
160+
.addArgument(bucket)
161+
.addArgument(objectKey)
162+
.setCause(e)
163+
.log();
156164
throw new RuntimeException(e);
157165
}
158166
});
159167

160168
LOG.info(SENSITIVE, "Completed loading object s3://{}/{} to buffer", bucket, objectKey);
161169
} catch (Exception e) {
162-
LOG.error(SENSITIVE, "Failed to load object s3://{}/{} to buffer", bucket, objectKey, e);
170+
LOG.atError()
171+
.addMarker(SENSITIVE)
172+
.addMarker(NOISY)
173+
.setMessage("Failed to load object s3://{}/{} to buffer")
174+
.addArgument(bucket)
175+
.addArgument(objectKey)
176+
.setCause(e)
177+
.log();
163178
throw new RuntimeException(e);
164179
}
165180

@@ -171,7 +186,7 @@ public void run() {
171186
}
172187
exportRecordSuccessCounter.increment(eventCount.get());
173188
} catch (Exception e) {
174-
LOG.error("Failed to write events to buffer", e);
189+
LOG.error(NOISY, "Failed to write events to buffer", e);
175190
exportRecordErrorCounter.increment(eventCount.get());
176191
}
177192
}

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ private void createExportPartition(RdsSourceConfig sourceConfig) {
172172
}
173173

174174
private String getS3PrefixForExport(final String givenS3Prefix) {
175-
return givenS3Prefix + S3_PATH_DELIMITER + S3_EXPORT_PREFIX;
175+
return givenS3Prefix.isEmpty() ? S3_EXPORT_PREFIX : givenS3Prefix + S3_PATH_DELIMITER + S3_EXPORT_PREFIX;
176176
}
177177

178178
private Map<String, List<String>> getPrimaryKeyMap() {
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.plugins.source.rds.model;
11+
12+
public enum StreamEventType {
13+
INSERT("insert"),
14+
UPDATE("update"),
15+
DELETE("delete");
16+
17+
private final String eventType;
18+
19+
StreamEventType(final String eventType) {
20+
this.eventType = eventType;
21+
}
22+
23+
@Override
24+
public String toString() {
25+
return eventType;
26+
}
27+
}

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListener.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
3636
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
3737
import org.opensearch.dataprepper.plugins.source.rds.model.ParentTable;
38+
import org.opensearch.dataprepper.plugins.source.rds.model.StreamEventType;
3839
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
3940
import org.opensearch.dataprepper.plugins.source.rds.resync.CascadingActionDetector;
4041
import org.slf4j.Logger;
@@ -283,7 +284,7 @@ void handleInsertEvent(com.github.shyiko.mysql.binlog.event.Event event) {
283284
return;
284285
}
285286

286-
handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.INDEX));
287+
handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.INDEX), StreamEventType.INSERT);
287288
}
288289

289290
void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
@@ -319,7 +320,7 @@ void handleUpdateEvent(com.github.shyiko.mysql.binlog.event.Event event) {
319320
bulkActions.add(OpenSearchBulkActions.INDEX);
320321
}
321322

322-
handleRowChangeEvent(event, data.getTableId(), rows, bulkActions);
323+
handleRowChangeEvent(event, data.getTableId(), rows, bulkActions, StreamEventType.UPDATE);
323324
}
324325

325326
void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
@@ -333,7 +334,7 @@ void handleDeleteEvent(com.github.shyiko.mysql.binlog.event.Event event) {
333334
// Check if a cascade action is involved
334335
cascadeActionDetector.detectCascadingDeletes(event, parentTableMap, tableMetadataMap.get(data.getTableId()));
335336

336-
handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.DELETE));
337+
handleRowChangeEvent(event, data.getTableId(), data.getRows(), Collections.nCopies(data.getRows().size(), OpenSearchBulkActions.DELETE), StreamEventType.DELETE);
337338
}
338339

339340
// Visible For Testing
@@ -355,7 +356,8 @@ boolean isValidTableId(long tableId) {
355356
void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
356357
long tableId,
357358
List<Serializable[]> rows,
358-
List<OpenSearchBulkActions> bulkActions) {
359+
List<OpenSearchBulkActions> bulkActions,
360+
StreamEventType streamEventType) {
359361

360362
// Update binlog coordinate after it's first assigned in rotate event handler
361363
if (currentBinlogCoordinate != null) {
@@ -407,7 +409,7 @@ void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event event,
407409
primaryKeys,
408410
eventTimestampMillis,
409411
eventTimestampMillis,
410-
event.getHeader().getEventType());
412+
streamEventType);
411413
pipelineEvents.add(pipelineEvent);
412414
}
413415

@@ -441,7 +443,7 @@ private void addToBufferAccumulator(final BufferAccumulator<Record<Event>> buffe
441443
try {
442444
bufferAccumulator.add(record);
443445
} catch (Exception e) {
444-
LOG.error("Failed to add event to buffer", e);
446+
LOG.error(NOISY, "Failed to add event to buffer", e);
445447
}
446448
}
447449

@@ -452,7 +454,7 @@ private void flushBufferAccumulator(BufferAccumulator<Record<Event>> bufferAccum
452454
} catch (Exception e) {
453455
// this will only happen if writing to buffer gets interrupted from shutdown,
454456
// otherwise bufferAccumulator will keep retrying with backoff
455-
LOG.error("Failed to flush buffer", e);
457+
LOG.error(NOISY, "Failed to flush buffer", e);
456458
changeEventErrorCounter.increment(eventCount);
457459
}
458460
}

data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/LogicalReplicationEventProcessor.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataType;
3131
import org.opensearch.dataprepper.plugins.source.rds.datatype.postgres.PostgresDataTypeHelper;
3232
import org.opensearch.dataprepper.plugins.source.rds.model.MessageType;
33+
import org.opensearch.dataprepper.plugins.source.rds.model.StreamEventType;
3334
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
3435
import org.postgresql.replication.LogSequenceNumber;
3536
import org.slf4j.Logger;
@@ -318,7 +319,7 @@ void processInsertMessage(ByteBuffer msg) {
318319
final List<String> primaryKeys = tableMetadata.getPrimaryKeys();
319320
final long eventTimestampMillis = currentEventTimestamp;
320321

321-
doProcess(msg, columnNames, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.INDEX);
322+
doProcess(msg, columnNames, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.INDEX, StreamEventType.INSERT);
322323
LOG.debug("Processed an INSERT message with table id: {}", tableId);
323324
}
324325

@@ -333,7 +334,7 @@ void processUpdateMessage(ByteBuffer msg) {
333334

334335
TupleDataType tupleDataType = TupleDataType.fromValue((char) msg.get());
335336
if (tupleDataType == TupleDataType.NEW) {
336-
doProcess(msg, columnNames, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.INDEX);
337+
doProcess(msg, columnNames, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.INDEX, StreamEventType.UPDATE);
337338
} else if (tupleDataType == TupleDataType.OLD || tupleDataType == TupleDataType.KEY) {
338339
// Replica Identity is set to full, containing both old and new row data
339340
Map<String, Object> oldRowDataMap = getRowDataMap(msg, columnNames, columnTypes);
@@ -342,9 +343,9 @@ void processUpdateMessage(ByteBuffer msg) {
342343

343344
if (isPrimaryKeyChanged(oldRowDataMap, newRowDataMap, primaryKeys)) {
344345
LOG.debug("Primary keys were changed");
345-
createPipelineEvent(oldRowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.DELETE);
346+
createPipelineEvent(oldRowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.DELETE, StreamEventType.UPDATE);
346347
}
347-
createPipelineEvent(newRowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.INDEX);
348+
createPipelineEvent(newRowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.INDEX, StreamEventType.UPDATE);
348349
}
349350
LOG.debug("Processed an UPDATE message with table id: {}", tableId);
350351
}
@@ -367,7 +368,7 @@ void processDeleteMessage(ByteBuffer msg) {
367368
final List<String> primaryKeys = tableMetadata.getPrimaryKeys();
368369
final long eventTimestampMillis = currentEventTimestamp;
369370

370-
doProcess(msg, columnNames, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.DELETE);
371+
doProcess(msg, columnNames, tableMetadata, primaryKeys, eventTimestampMillis, OpenSearchBulkActions.DELETE, StreamEventType.DELETE);
371372
LOG.debug("Processed a DELETE message with table id: {}", tableId);
372373
}
373374

@@ -379,12 +380,12 @@ void processTypeMessage(ByteBuffer msg) {
379380
}
380381

381382
private void doProcess(ByteBuffer msg, List<String> columnNames, TableMetadata tableMetadata,
382-
List<String> primaryKeys, long eventTimestampMillis, OpenSearchBulkActions bulkAction) {
383+
List<String> primaryKeys, long eventTimestampMillis, OpenSearchBulkActions bulkAction, StreamEventType streamEventType) {
383384
bytesReceived = msg.capacity();
384385
bytesReceivedSummary.record(bytesReceived);
385386
final List<String> columnTypes = tableMetadata.getColumnTypes();
386387
Map<String, Object> rowDataMap = getRowDataMap(msg, columnNames, columnTypes);
387-
createPipelineEvent(rowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, bulkAction);
388+
createPipelineEvent(rowDataMap, tableMetadata, primaryKeys, eventTimestampMillis, bulkAction, streamEventType);
388389
}
389390

390391
private Map<String, Object> getRowDataMap(ByteBuffer msg, List<String> columnNames, List<String> columnTypes) {
@@ -411,7 +412,8 @@ private Map<String, Object> getRowDataMap(ByteBuffer msg, List<String> columnNam
411412
return rowDataMap;
412413
}
413414

414-
private void createPipelineEvent(Map<String, Object> rowDataMap, TableMetadata tableMetadata, List<String> primaryKeys, long eventTimestampMillis, OpenSearchBulkActions bulkAction) {
415+
private void createPipelineEvent(Map<String, Object> rowDataMap, TableMetadata tableMetadata, List<String> primaryKeys,
416+
long eventTimestampMillis, OpenSearchBulkActions bulkAction, StreamEventType streamEventType) {
415417
final Event dataPrepperEvent = JacksonEvent.builder()
416418
.withEventType("event")
417419
.withData(rowDataMap)
@@ -426,7 +428,7 @@ private void createPipelineEvent(Map<String, Object> rowDataMap, TableMetadata t
426428
primaryKeys,
427429
eventTimestampMillis,
428430
eventTimestampMillis,
429-
null);
431+
streamEventType);
430432
pipelineEvents.add(pipelineEvent);
431433
}
432434

@@ -446,7 +448,7 @@ private void addToBufferAccumulator(final BufferAccumulator<Record<Event>> buffe
446448
try {
447449
bufferAccumulator.add(record);
448450
} catch (Exception e) {
449-
LOG.error("Failed to add event to buffer", e);
451+
LOG.error(NOISY, "Failed to add event to buffer", e);
450452
}
451453
}
452454

@@ -457,7 +459,7 @@ private void flushBufferAccumulator(BufferAccumulator<Record<Event>> bufferAccum
457459
} catch (Exception e) {
458460
// this will only happen if writing to buffer gets interrupted from shutdown,
459461
// otherwise bufferAccumulator will keep retrying with backoff
460-
LOG.error("Failed to flush buffer", e);
462+
LOG.error(NOISY, "Failed to flush buffer", e);
461463
changeEventErrorCounter.increment(eventCount);
462464
}
463465
}

data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/converter/StreamRecordConverterTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55

66
package org.opensearch.dataprepper.plugins.source.rds.converter;
77

8-
import com.github.shyiko.mysql.binlog.event.EventType;
98
import org.junit.jupiter.api.BeforeEach;
109
import org.junit.jupiter.api.Test;
1110
import org.opensearch.dataprepper.event.TestEventFactory;
1211
import org.opensearch.dataprepper.model.event.Event;
1312
import org.opensearch.dataprepper.model.event.EventBuilder;
1413
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
14+
import org.opensearch.dataprepper.plugins.source.rds.model.StreamEventType;
1515

1616
import java.util.List;
1717
import java.util.Map;
@@ -57,7 +57,7 @@ void test_convert_returns_expected_event() {
5757
final String databaseName = UUID.randomUUID().toString();
5858
final String schemaName = UUID.randomUUID().toString();
5959
final String tableName = UUID.randomUUID().toString();
60-
final EventType eventType = EventType.EXT_WRITE_ROWS;
60+
final StreamEventType eventType = StreamEventType.INSERT;
6161
final OpenSearchBulkActions bulkAction = OpenSearchBulkActions.INDEX;
6262
final List<String> primaryKeys = List.of("key1");
6363
final long eventCreateTimeEpochMillis = random.nextLong();

data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/BinlogEventListenerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
3232
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
3333
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
34+
import org.opensearch.dataprepper.plugins.source.rds.model.StreamEventType;
3435
import org.opensearch.dataprepper.plugins.source.rds.model.TableMetadata;
3536
import org.opensearch.dataprepper.plugins.source.rds.resync.CascadingActionDetector;
3637

@@ -208,7 +209,7 @@ void test_given_UpdateRows_event_when_primary_key_changes_then_generate_correct_
208209
// verify rowList and bulkActionList that were sent to handleRowChangeEvent() were correct
209210
ArgumentCaptor<List<Serializable[]>> rowListArgumentCaptor = ArgumentCaptor.forClass(List.class);
210211
ArgumentCaptor<List<OpenSearchBulkActions>> bulkActionListArgumentCaptor = ArgumentCaptor.forClass(List.class);
211-
verify(objectUnderTest).handleRowChangeEvent(eq(binlogEvent), eq(tableId), rowListArgumentCaptor.capture(), bulkActionListArgumentCaptor.capture());
212+
verify(objectUnderTest).handleRowChangeEvent(eq(binlogEvent), eq(tableId), rowListArgumentCaptor.capture(), bulkActionListArgumentCaptor.capture(), eq(StreamEventType.UPDATE));
212213
List<Serializable[]> rowList = rowListArgumentCaptor.getValue();
213214
List<OpenSearchBulkActions> bulkActionList = bulkActionListArgumentCaptor.getValue();
214215

0 commit comments

Comments
 (0)