From 0f55773f697b37f72d5c5f26af2fda583e04d2b2 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Mon, 9 Feb 2026 20:19:09 +0800 Subject: [PATCH 1/2] [FLINK-38236][mongodb] Add metadata support for full document in MongoDB CDC connector --- .../docs/connectors/flink-sources/mongodb-cdc.md | 6 ++++++ .../docs/connectors/flink-sources/mongodb-cdc.md | 6 ++++++ .../mongodb/table/MongoDBReadableMetadata.java | 15 +++++++++++++++ .../mongodb/table/MongoDBTableFactoryTest.java | 7 +++++-- 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md index f98c7b6db99..bc5cd251f30 100644 --- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md @@ -383,6 +383,11 @@ MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能 TIMESTAMP_LTZ(3) NOT NULL 它指示在数据库中进行更改的时间。
如果记录是从表的快照而不是改变流中读取的,该值将始终为0。 + + full_document + STRING + 变更事件的完整文档,以原始 JSON 字符串形式表示。对于 insert 事件,这是新文档。对于 update 事件,这是更新后的完整文档。对于 delete 事件,该值为 null。 + + row_kind STRING NOT NULL @@ -398,6 +403,7 @@ CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, collection_name STRING METADATA FROM 'collection_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, + raw_data STRING METADATA FROM 'full_document' VIRTUAL, operation STRING METADATA FROM 'row_kind' VIRTUAL, _id STRING, // 必须声明 name STRING, diff --git a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md index 98353a3b8c9..ed4bd72023c 100644 --- a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md @@ -408,6 +408,11 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a TIMESTAMP_LTZ(3) NOT NULL It indicates the time that the change was made in the database.
If the record is read from snapshot of the table instead of the change stream, the value is always 0. + + full_document + STRING + The full document of the change event as a raw JSON string. For insert events, this is the new document. For update events, this is the full document after the update. For delete events, this is null. + + row_kind STRING NOT NULL @@ -424,6 +429,7 @@ CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, collection_name STRING METADATA FROM 'collection_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, + raw_data STRING METADATA FROM 'full_document' VIRTUAL, operation STRING METADATA FROM 'row_kind' VIRTUAL, _id STRING, // must be declared name STRING, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java index 581873d2035..a93c789cf4f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java @@ -85,6 +85,21 @@ public Object read(SourceRecord record) { } }), + /** It indicates the full document as string raw data. */ + FULL_DOCUMENT( + "full_document", + DataTypes.STRING().nullable(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(SourceRecord record) { + Struct value = (Struct) record.value(); + String fullDocString = value.getString(MongoDBEnvelope.FULL_DOCUMENT_FIELD); + return fullDocString != null ?
StringData.fromString(fullDocString) : null; + } + }), + /** * It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE * message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java index 5716eeed462..a9c58a961b8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java @@ -89,6 +89,8 @@ class MongoDBTableFactoryTest { Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true), Column.metadata( "_database_name", DataTypes.STRING(), "database_name", true), + Column.metadata( + "_full_document", DataTypes.STRING(), "full_document", true), Column.metadata("_row_kind", DataTypes.STRING(), "row_kind", true)), Collections.emptyList(), UniqueConstraint.primaryKey("pk", Collections.singletonList("_id"))); @@ -227,7 +229,7 @@ void testMetadataColumns() { DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties); MongoDBTableSource mongoDBSource = (MongoDBTableSource) actualSource; mongoDBSource.applyReadableMetadata( - Arrays.asList("op_ts", "database_name", "row_kind"), + Arrays.asList("op_ts", "database_name", "full_document", "row_kind"), SCHEMA_WITH_METADATA.toSourceRowDataType()); actualSource = mongoDBSource.copy(); @@ -262,7 +264,8 @@ void testMetadataColumns() { SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue()); expectedSource.producedDataType 
= SCHEMA_WITH_METADATA.toSourceRowDataType(); - expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "row_kind"); + expectedSource.metadataKeys = + Arrays.asList("op_ts", "database_name", "full_document", "row_kind"); Assertions.assertThat(actualSource).isEqualTo(expectedSource); From 00ccd53827677c7590365d15f36a1cb44c6b9d5c Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Mon, 9 Feb 2026 20:52:18 +0800 Subject: [PATCH 2/2] [FLINK-38236][mongodb] Add metadata support for full document in MongoDB CDC connector --- .../source/MongoDBFullChangelogITCase.java | 66 +++++++++++++++++-- .../mongodb/table/MongoDBConnectorITCase.java | 43 +++++++++++- 2 files changed, 102 insertions(+), 7 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java index 788c324c044..127f3b21f37 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java @@ -599,6 +599,7 @@ private void testMongoDBParallelSourceWithMetadataColumns( + " database_name STRING METADATA VIRTUAL," + " collection_name STRING METADATA VIRTUAL," + " row_kind STRING METADATA VIRTUAL," + + " full_document STRING METADATA FROM 'full_document' VIRTUAL," + " primary key (_id) not enforced" + ") WITH (" + " 'connector' = 'mongodb-cdc'," @@ -653,7 +654,7 @@ private void testMongoDBParallelSourceWithMetadataColumns( TableResult tableResult = tEnv.executeSql( "select database_name, collection_name, row_kind, " - + "cid, name, 
address, phone_number from customers"); + + "cid, name, address, phone_number, full_document from customers"); CloseableIterator iterator = tableResult.collect(); JobID jobId = tableResult.getJobClient().get().getJobID(); List expectedSnapshotData = new ArrayList<>(); @@ -661,8 +662,24 @@ private void testMongoDBParallelSourceWithMetadataColumns( expectedSnapshotData.addAll(snapshotForSingleTable); } - assertEqualsInAnyOrder( - expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); + List rawSnapshotData = fetchRows(iterator, expectedSnapshotData.size()); + + // Strip full_document (last column) for precise matching of other metadata columns + List snapshotWithoutFullDoc = + rawSnapshotData.stream() + .map(MongoDBFullChangelogITCase::stripLastColumn) + .collect(Collectors.toList()); + assertEqualsInAnyOrder(expectedSnapshotData, snapshotWithoutFullDoc); + + // Verify full_document is non-null and contains expected fields in snapshot records + for (String row : rawSnapshotData) { + Assertions.assertThat(row) + .as("Snapshot full_document should contain 'cid'") + .contains("\"cid\""); + Assertions.assertThat(row) + .as("Snapshot full_document should contain 'name'") + .contains("\"name\""); + } // second step: check the change stream data for (String collectionName : captureCustomerCollections) { @@ -693,11 +710,50 @@ private void testMongoDBParallelSourceWithMetadataColumns( for (int i = 0; i < captureCustomerCollections.length; i++) { expectedChangeStreamData.addAll(changeEventsForSingleTable); } - List actualChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size()); - assertEqualsInAnyOrder(expectedChangeStreamData, actualChangeStreamData); + List rawChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size()); + + // Strip full_document for precise matching of other metadata columns + List changeStreamWithoutFullDoc = + rawChangeStreamData.stream() + .map(MongoDBFullChangelogITCase::stripLastColumn) + 
.collect(Collectors.toList()); + assertEqualsInAnyOrder(expectedChangeStreamData, changeStreamWithoutFullDoc); + + // Verify full_document in change stream: non-null for +I/+U, null for -D + for (String row : rawChangeStreamData) { + if (row.startsWith("+I") || row.startsWith("+U")) { + Assertions.assertThat(row) + .as("Change stream full_document should contain 'name'") + .contains("\"name\""); + } + if (row.startsWith("-D")) { + Assertions.assertThat(row) + .as("Delete event full_document should be null") + .endsWith(", null]"); + } + } + tableResult.getJobClient().get().cancel().get(); } + /** + * Strip the last column (full_document) from a Row.toString() formatted string. Row format: + * "+I[col1, col2, ..., colN, full_document_json]" or "+I[col1, col2, ..., colN, null]". + */ + private static String stripLastColumn(String row) { + // Find the last ", " before the full_document column value + // The full_document is a JSON string starting with {" or null + int lastJsonStart = row.lastIndexOf(", {\""); + if (lastJsonStart > 0) { + return row.substring(0, lastJsonStart) + "]"; + } + int lastNullStart = row.lastIndexOf(", null]"); + if (lastNullStart > 0) { + return row.substring(0, lastNullStart) + "]"; + } + return row; + } + private void testMongoDBParallelSource( MongoDBTestUtils.FailoverType failoverType, MongoDBTestUtils.FailoverPhase failoverPhase, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java index 89bc8b002ee..9404212cbee 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java +++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java @@ -471,6 +471,7 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Exception { + " weight DECIMAL(10,3)," + " db_name STRING METADATA FROM 'database_name' VIRTUAL," + " collection_name STRING METADATA VIRTUAL," + + " full_document STRING METADATA FROM 'full_document' VIRTUAL," + " PRIMARY KEY (_id) NOT ENFORCED" + ") WITH (" + " 'connector' = 'mongodb-cdc'," @@ -497,6 +498,7 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Exception { + " weight DECIMAL(10,3)," + " database_name STRING," + " collection_name STRING," + + " full_document STRING," + " PRIMARY KEY (_id) NOT ENFORCED" + ") WITH (" + " 'connector' = 'values'," @@ -549,6 +551,30 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Exception { waitForSinkSize("meta_sink", 16); + List actual = TestValuesTableFactory.getRawResultsAsStrings("meta_sink"); + + // Verify original metadata columns (database_name, collection_name) by stripping + // the full_document column (last column) from each row for precise matching + List actualWithoutFullDoc = + actual.stream() + .map( + row -> { + // Row format: +I(col1,col2,...,colN,full_document_json) + // full_document is the last column, strip it + int lastCommaBeforeJson = row.lastIndexOf(",{\"_id\""); + if (lastCommaBeforeJson > 0) { + return row.substring(0, lastCommaBeforeJson) + ")"; + } + // For -D events, full_document is null + int lastCommaBeforeNull = row.lastIndexOf(",null)"); + if (lastCommaBeforeNull > 0) { + return row.substring(0, lastCommaBeforeNull) + ")"; + } + return row; + }) + .sorted() + .collect(Collectors.toList()); + List expected = Stream.of( "+I(100000000000000000000101,scooter,Small 2-wheel scooter,3.140,%s,products)", @@ -571,8 +597,21 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Exception { .sorted() 
.collect(Collectors.toList()); - List actual = TestValuesTableFactory.getRawResultsAsStrings("meta_sink"); - Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected); + Assertions.assertThat(actualWithoutFullDoc).containsExactlyInAnyOrderElementsOf(expected); + + // Verify full_document metadata: non-null for all events, contains expected fields + for (String row : actual) { + Assertions.assertThat(row) + .as("full_document should contain 'name' field") + .contains("\"name\""); + Assertions.assertThat(row) + .as("full_document should contain 'description' field") + .contains("\"description\""); + Assertions.assertThat(row) + .as("full_document should contain 'weight' field") + .contains("\"weight\""); + } + result.getJobClient().get().cancel().get(); }