Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,11 @@ MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>它指示在数据库中进行更改的时间。 <br>如果记录是从表的快照而不是改变流中读取的,该值将始终为0。</td>
</tr>
<tr>
<td>full_document</td>
<td>STRING</td>
<td>变更事件的完整文档 JSON 字符串原始数据。对于 insert 事件,这是新文档。对于 update 事件,这是更新后的完整文档。对于 delete 事件,该值为 null。</td>
</tr>
<tr>
<td>row_kind</td>
<td>STRING NOT NULL</td>
Expand All @@ -398,6 +403,7 @@ CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
raw_data STRING METADATA FROM 'full_document' VIRTUAL,
operation STRING METADATA FROM 'row_kind' VIRTUAL,
_id STRING, // 必须声明
name STRING,
Expand Down
6 changes: 6 additions & 0 deletions docs/content/docs/connectors/flink-sources/mongodb-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,11 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>It indicates the time that the change was made in the database. <br>If the record is read from snapshot of the table instead of the change stream, the value is always 0.</td>
</tr>
<tr>
<td>full_document</td>
<td>STRING</td>
<td>The raw JSON string of the change event's full document. For insert events, this is the new document. For update events, this is the full document after the update. For delete events, this is null.</td>
</tr>
<tr>
<td>row_kind</td>
<td>STRING NOT NULL</td>
Expand All @@ -424,6 +429,7 @@ CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
raw_data STRING METADATA FROM 'full_document' VIRTUAL,
operation STRING METADATA FROM 'row_kind' VIRTUAL,
_id STRING, // must be declared
name STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,21 @@ public Object read(SourceRecord record) {
}
}),

/**
 * Exposes the full document of the change event as a raw JSON string.
 *
 * <p>For insert events this is the new document; for update events it is the
 * post-image of the document; for delete events no document is available and
 * the metadata column evaluates to SQL NULL (hence the nullable STRING type).
 */
FULL_DOCUMENT(
        "full_document",
        DataTypes.STRING().nullable(),
        new MetadataConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object read(SourceRecord record) {
                Struct value = (Struct) record.value();
                // NOTE(review): assumes the envelope schema always declares the
                // full-document field — Struct#getString throws DataException for a
                // field that is absent from the schema. Confirm delete events still
                // carry the field (with a null value) rather than omitting it.
                String fullDocString = value.getString(MongoDBEnvelope.FULL_DOCUMENT_FIELD);
                // Delete events have no post-image: surface SQL NULL, otherwise
                // wrap the JSON text in Flink's internal StringData representation.
                return fullDocString != null ? StringData.fromString(fullDocString) : null;
            }
        }),

/**
* It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE
* message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ private void testMongoDBParallelSourceWithMetadataColumns(
+ " database_name STRING METADATA VIRTUAL,"
+ " collection_name STRING METADATA VIRTUAL,"
+ " row_kind STRING METADATA VIRTUAL,"
+ " full_document STRING METADATA FROM 'full_document' VIRTUAL,"
+ " primary key (_id) not enforced"
+ ") WITH ("
+ " 'connector' = 'mongodb-cdc',"
Expand Down Expand Up @@ -653,16 +654,32 @@ private void testMongoDBParallelSourceWithMetadataColumns(
TableResult tableResult =
tEnv.executeSql(
"select database_name, collection_name, row_kind, "
+ "cid, name, address, phone_number from customers");
+ "cid, name, address, phone_number, full_document from customers");
CloseableIterator<Row> iterator = tableResult.collect();
JobID jobId = tableResult.getJobClient().get().getJobID();
List<String> expectedSnapshotData = new ArrayList<>();
for (int i = 0; i < captureCustomerCollections.length; i++) {
expectedSnapshotData.addAll(snapshotForSingleTable);
}

assertEqualsInAnyOrder(
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
List<String> rawSnapshotData = fetchRows(iterator, expectedSnapshotData.size());

// Strip full_document (last column) for precise matching of other metadata columns
List<String> snapshotWithoutFullDoc =
rawSnapshotData.stream()
.map(MongoDBFullChangelogITCase::stripLastColumn)
.collect(Collectors.toList());
assertEqualsInAnyOrder(expectedSnapshotData, snapshotWithoutFullDoc);

// Verify full_document is non-null and contains expected fields in snapshot records
for (String row : rawSnapshotData) {
Assertions.assertThat(row)
.as("Snapshot full_document should contain 'cid'")
.contains("\"cid\"");
Assertions.assertThat(row)
.as("Snapshot full_document should contain 'name'")
.contains("\"name\"");
}

// second step: check the change stream data
for (String collectionName : captureCustomerCollections) {
Expand Down Expand Up @@ -693,11 +710,50 @@ private void testMongoDBParallelSourceWithMetadataColumns(
for (int i = 0; i < captureCustomerCollections.length; i++) {
expectedChangeStreamData.addAll(changeEventsForSingleTable);
}
List<String> actualChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size());
assertEqualsInAnyOrder(expectedChangeStreamData, actualChangeStreamData);
List<String> rawChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size());

// Strip full_document for precise matching of other metadata columns
List<String> changeStreamWithoutFullDoc =
rawChangeStreamData.stream()
.map(MongoDBFullChangelogITCase::stripLastColumn)
.collect(Collectors.toList());
assertEqualsInAnyOrder(expectedChangeStreamData, changeStreamWithoutFullDoc);

// Verify full_document in change stream: non-null for +I/+U, null for -D
for (String row : rawChangeStreamData) {
if (row.startsWith("+I") || row.startsWith("+U")) {
Assertions.assertThat(row)
.as("Change stream full_document should contain 'name'")
.contains("\"name\"");
}
if (row.startsWith("-D")) {
Assertions.assertThat(row)
.as("Delete event full_document should be null")
.endsWith(", null]");
}
}

tableResult.getJobClient().get().cancel().get();
}

/**
 * Removes the trailing full_document column from a {@code Row#toString()} value.
 *
 * <p>Row format: {@code "+I[col1, col2, ..., colN, full_document_json]"} or
 * {@code "+I[col1, col2, ..., colN, null]"}. The stripped string keeps every
 * other column and the closing bracket, so it can be compared against expected
 * data that was written before the full_document column existed.
 *
 * @param row a single collected row rendered as a string
 * @return the row string without its last (full_document) column; returned
 *     unchanged when neither a JSON payload nor a null marker is found
 */
private static String stripLastColumn(String row) {
    // The full_document payload always begins with {" (or is the literal null
    // followed by the closing bracket). Search from the end so JSON-looking
    // text in earlier columns cannot be matched by mistake.
    for (String marker : new String[] {", {\"", ", null]"}) {
        int cutAt = row.lastIndexOf(marker);
        if (cutAt > 0) {
            return row.substring(0, cutAt) + "]";
        }
    }
    return row;
}

private void testMongoDBParallelSource(
MongoDBTestUtils.FailoverType failoverType,
MongoDBTestUtils.FailoverPhase failoverPhase,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Exception {
+ " weight DECIMAL(10,3),"
+ " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
+ " collection_name STRING METADATA VIRTUAL,"
+ " full_document STRING METADATA FROM 'full_document' VIRTUAL,"
+ " PRIMARY KEY (_id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'mongodb-cdc',"
Expand All @@ -497,6 +498,7 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Exception {
+ " weight DECIMAL(10,3),"
+ " database_name STRING,"
+ " collection_name STRING,"
+ " full_document STRING,"
+ " PRIMARY KEY (_id) NOT ENFORCED"
+ ") WITH ("
+ " 'connector' = 'values',"
Expand Down Expand Up @@ -549,6 +551,30 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Exception {

waitForSinkSize("meta_sink", 16);

List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("meta_sink");

// Verify original metadata columns (database_name, collection_name) by stripping
// the full_document column (last column) from each row for precise matching
List<String> actualWithoutFullDoc =
actual.stream()
.map(
row -> {
// Row format: +I(col1,col2,...,colN,full_document_json)
// full_document is the last column, strip it
int lastCommaBeforeJson = row.lastIndexOf(",{\"_id\"");
if (lastCommaBeforeJson > 0) {
return row.substring(0, lastCommaBeforeJson) + ")";
}
// For -D events, full_document is null
int lastCommaBeforeNull = row.lastIndexOf(",null)");
if (lastCommaBeforeNull > 0) {
return row.substring(0, lastCommaBeforeNull) + ")";
}
return row;
})
.sorted()
.collect(Collectors.toList());

List<String> expected =
Stream.of(
"+I(100000000000000000000101,scooter,Small 2-wheel scooter,3.140,%s,products)",
Expand All @@ -571,8 +597,21 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Exception {
.sorted()
.collect(Collectors.toList());

List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("meta_sink");
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
Assertions.assertThat(actualWithoutFullDoc).containsExactlyInAnyOrderElementsOf(expected);

// Verify full_document metadata: non-null for all events, contains expected fields
for (String row : actual) {
Assertions.assertThat(row)
.as("full_document should contain 'name' field")
.contains("\"name\"");
Assertions.assertThat(row)
.as("full_document should contain 'description' field")
.contains("\"description\"");
Assertions.assertThat(row)
.as("full_document should contain 'weight' field")
.contains("\"weight\"");
}

result.getJobClient().get().cancel().get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class MongoDBTableFactoryTest {
Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
Column.metadata(
"_database_name", DataTypes.STRING(), "database_name", true),
Column.metadata(
"_full_document", DataTypes.STRING(), "full_document", true),
Column.metadata("_row_kind", DataTypes.STRING(), "row_kind", true)),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("_id")));
Expand Down Expand Up @@ -227,7 +229,7 @@ void testMetadataColumns() {
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
MongoDBTableSource mongoDBSource = (MongoDBTableSource) actualSource;
mongoDBSource.applyReadableMetadata(
Arrays.asList("op_ts", "database_name", "row_kind"),
Arrays.asList("op_ts", "database_name", "full_document", "row_kind"),
SCHEMA_WITH_METADATA.toSourceRowDataType());
actualSource = mongoDBSource.copy();

Expand Down Expand Up @@ -262,7 +264,8 @@ void testMetadataColumns() {
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());

expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "row_kind");
expectedSource.metadataKeys =
Arrays.asList("op_ts", "database_name", "full_document", "row_kind");

Assertions.assertThat(actualSource).isEqualTo(expectedSource);

Expand Down
Loading