diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java index 497fa54ea1d..bd552bce01f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java @@ -1800,6 +1800,196 @@ private List executeAlterAndProvideExpected(TableId tableId, Statement st return expected; } + @Test + @Order(10) + public void testPartitionedTable() throws Exception { + createAndInitialize("partition_table.sql"); + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), ORACLE_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(ORACLE_CONTAINER.getOraclePort())); + options.put(USERNAME.key(), CONNECTOR_USER); + options.put(PASSWORD.key(), CONNECTOR_PWD); + options.put(TABLES.key(), "debezium.partitioned_customers"); + options.put(DATABASE.key(), ORACLE_CONTAINER.getDatabaseName()); + options.put(SCAN_STARTUP_MODE.key(), "initial"); + Factory.Context context = new MockContext(Configuration.fromMap(options)); + Properties dbzProperties = new Properties(); + dbzProperties.setProperty("database.connection.adapter", "logminer"); + dbzProperties.setProperty("log.mining.strategy", "online_catalog"); + dbzProperties.setProperty("snapshot.locking.mode", "none"); + dbzProperties.setProperty("database.history.store.only.captured.tables.ddl", "true"); + dbzProperties.setProperty("include.schema.changes", "true"); + OracleSourceConfigFactory configFactory = new OracleSourceConfigFactory(); + configFactory + .username(CONNECTOR_USER) + .password(CONNECTOR_PWD) + .port(ORACLE_CONTAINER.getOraclePort()) + .databaseList(ORACLE_CONTAINER.getDatabaseName()) + .hostname(ORACLE_CONTAINER.getHost()) + .tableList("DEBEZIUM.PARTITIONED_CUSTOMERS") + .startupOptions(StartupOptions.initial()) + .debeziumProperties(dbzProperties) + .includeSchemaChanges(true); + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) + new OracleDataSource( + configFactory, + context.getFactoryConfiguration(), + new ArrayList<>()) + .getEventSourceProvider(); + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + OracleDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + Thread.sleep(150_000); + + TableId tableId = TableId.tableId("DEBEZIUM", "PARTITIONED_CUSTOMERS"); + + // Build expected CreateTableEvent + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + Schema.newBuilder() + .physicalColumn("ID", DataTypes.BIGINT().notNull()) + .physicalColumn("NAME", DataTypes.VARCHAR(255).notNull()) + .physicalColumn("ADDRESS", DataTypes.VARCHAR(1024)) + .physicalColumn("PHONE_NUMBER", DataTypes.VARCHAR(512)) + .primaryKey(Collections.singletonList("ID")) + .build()); + + // Build expected snapshot data + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.BIGINT().notNull(), + DataTypes.VARCHAR(255).notNull(), + DataTypes.VARCHAR(1024), + DataTypes.VARCHAR(512) + }, + new String[] {"ID", "NAME", "ADDRESS", "PHONE_NUMBER"}); + BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType); + + List expectedSnapshot = new ArrayList<>(); + expectedSnapshot.add(createTableEvent); + // 21 rows across 4 partitions + long[][] snapshotIds = { + {101L}, + {102L}, + {103L}, + {109L}, + {110L}, + {111L}, // partition p1 (< 500) + {501L}, + {502L}, + {513L}, // partition p2 (< 1000) + {1009L}, + {1010L}, + {1011L}, + {1012L}, + {1013L}, + {1014L}, + {1015L}, + {1016L}, + {1017L}, + {1018L}, + {1019L}, // partition p3 (< 2000) + {2000L} // partition p4 (MAXVALUE) + }; + String[] names = { + "user_1", "user_2", "user_3", "user_4", "user_5", "user_6", "user_7", "user_8", + "user_9", "user_10", "user_11", "user_12", "user_13", "user_14", "user_15", "user_16", + "user_17", "user_18", "user_19", "user_20", "user_21" + }; + for (int i = 0; i < snapshotIds.length; i++) { + expectedSnapshot.add( + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + snapshotIds[i][0], + BinaryStringData.fromString(names[i]), + BinaryStringData.fromString("Shanghai"), + BinaryStringData.fromString("123567891234") + }))); + } + + // Perform DML operations across partitions + List expectedBinlog = new ArrayList<>(); + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + // INSERT into partition p4 + statement.execute( + "INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (2001, 'user_22', 'Shanghai', '123567891234')"); + expectedBinlog.add( + DataChangeEvent.insertEvent( + tableId, + generator.generate( + new Object[] { + 2001L, + BinaryStringData.fromString("user_22"), + BinaryStringData.fromString("Shanghai"), + BinaryStringData.fromString("123567891234") + }))); + // UPDATE in partition p1 + statement.execute( + "UPDATE DEBEZIUM.PARTITIONED_CUSTOMERS SET ADDRESS = 'Hangzhou' WHERE ID = 103"); + expectedBinlog.add( + DataChangeEvent.updateEvent( + tableId, + generator.generate( + new Object[] { + 103L, + BinaryStringData.fromString("user_3"), + BinaryStringData.fromString("Shanghai"), + BinaryStringData.fromString("123567891234") + }), + generator.generate( + new Object[] { + 103L, + BinaryStringData.fromString("user_3"), + BinaryStringData.fromString("Hangzhou"), + BinaryStringData.fromString("123567891234") + }))); + // DELETE from partition p2 + statement.execute("DELETE FROM DEBEZIUM.PARTITIONED_CUSTOMERS WHERE ID = 502"); + expectedBinlog.add( + DataChangeEvent.deleteEvent( + tableId, + generator.generate( + new Object[] { + 502L, + BinaryStringData.fromString("user_8"), + BinaryStringData.fromString("Shanghai"), + BinaryStringData.fromString("123567891234") + }))); + } + + // Fetch and verify snapshot events + List actual = fetchResults(events, expectedSnapshot.size() + expectedBinlog.size()); + + Map> fieldGetterMaps = new HashMap<>(); + Schema schema = createTableEvent.getSchema(); + fieldGetterMaps.put(tableId, SchemaUtils.createFieldGetters(schema)); + + StringBuilder actualSnapshotStr = + getResultString(actual.subList(0, expectedSnapshot.size()), fieldGetterMaps, false); + StringBuilder expectedSnapshotStr = + getResultString(expectedSnapshot, fieldGetterMaps, false); + assertThat(actualSnapshotStr.toString()).isEqualTo(expectedSnapshotStr.toString()); + + StringBuilder actualBinlogStr = + getResultString( + actual.subList(expectedSnapshot.size(), actual.size()), + fieldGetterMaps, + false); + StringBuilder expectedBinlogStr = getResultString(expectedBinlog, fieldGetterMaps, false); + assertThat(actualBinlogStr.toString()).isEqualTo(expectedBinlogStr.toString()); + env.close(); + } + public static List fetchResults(Iterator iter, int size) { List result = new ArrayList<>(size); while (size > 0 && iter.hasNext()) { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java index 81e331c2398..703fda469ea 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java @@ -178,7 +178,7 @@ protected static List listTables(Connection connection) { Set tableIdSet = new HashSet<>(); String queryTablesSql = "SELECT OWNER ,TABLE_NAME,TABLESPACE_NAME FROM ALL_TABLES \n" - + "WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX') " + + "WHERE (PARTITIONED = 'YES' OR (TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX'))) " + "AND NESTED = 'NO' AND TABLE_NAME NOT IN (SELECT PARENT_TABLE_NAME FROM ALL_NESTED_TABLES)"; try { ResultSet resultSet = connection.createStatement().executeQuery(queryTablesSql); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/resources/ddl/partition_table.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/resources/ddl/partition_table.sql new file mode 100644 index 00000000000..1af3781e221 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/resources/ddl/partition_table.sql @@ -0,0 +1,57 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: partition table test +-- ---------------------------------------------------------------------------------------------------------------- + +-- Create a range-partitioned table for CDC partition table testing +CREATE TABLE DEBEZIUM.PARTITIONED_CUSTOMERS ( + ID NUMBER(10) NOT NULL, + NAME VARCHAR2(255) NOT NULL, + ADDRESS VARCHAR2(1024), + PHONE_NUMBER VARCHAR2(512), + PRIMARY KEY(ID) +) +PARTITION BY RANGE (ID) ( + PARTITION p1 VALUES LESS THAN (500), + PARTITION p2 VALUES LESS THAN (1000), + PARTITION p3 VALUES LESS THAN (2000), + PARTITION p4 VALUES LESS THAN (MAXVALUE) +); + +ALTER TABLE DEBEZIUM.PARTITIONED_CUSTOMERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (101,'user_1','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (102,'user_2','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (103,'user_3','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (109,'user_4','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (110,'user_5','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (111,'user_6','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (501,'user_7','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (502,'user_8','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (513,'user_9','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1009,'user_10','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1010,'user_11','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1011,'user_12','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1012,'user_13','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1013,'user_14','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1014,'user_15','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1015,'user_16','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1016,'user_17','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1017,'user_18','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1018,'user_19','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1019,'user_20','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (2000,'user_21','Shanghai','123567891234'); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java index ff80e77fc63..0762ca01217 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java @@ -96,7 +96,7 @@ public static List listTables( Set tableIdSet = new HashSet<>(); String queryTablesSql = "SELECT OWNER ,TABLE_NAME,TABLESPACE_NAME FROM ALL_TABLES \n" - + "WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX') AND NESTED = 'NO' AND TABLE_NAME NOT IN (SELECT PARENT_TABLE_NAME FROM ALL_NESTED_TABLES)"; + + "WHERE (PARTITIONED = 'YES' OR (TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX'))) AND NESTED = 'NO' AND TABLE_NAME NOT IN (SELECT PARENT_TABLE_NAME FROM ALL_NESTED_TABLES)"; try { jdbcConnection.query( queryTablesSql, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java index f8d39cde4ef..c133e89c57e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.java @@ -792,6 +792,127 @@ private String getTableNameRegex(String[] captureCustomerTables) { } } + @Test + void testReadPartitionedTable() throws Exception { + createAndInitialize("partition_table.sql"); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(200L); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0); + + String sourceDDL = + format( + "CREATE TABLE partitioned_customers (" + + " ID INT NOT NULL," + + " NAME STRING," + + " ADDRESS STRING," + + " PHONE_NUMBER STRING," + + " primary key (ID) not enforced" + + ") WITH (" + + " 'connector' = 'oracle-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = 'false'," + + " 'debezium.log.mining.strategy' = 'online_catalog'," + + " 'debezium.database.history.store.only.captured.tables.ddl' = 'true'" + + ")", + ORACLE_CONTAINER.getHost(), + ORACLE_CONTAINER.getOraclePort(), + TOP_USER, + TOP_SECRET, + ORACLE_DATABASE, + ORACLE_SCHEMA, + "PARTITIONED_CUSTOMERS"); + + // first step: check the snapshot data + String[] snapshotExpected = + new String[] { + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[501, user_7, Shanghai, 123567891234]", + "+I[502, user_8, Shanghai, 123567891234]", + "+I[513, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[2000, user_21, Shanghai, 123567891234]" + }; + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("select * from partitioned_customers"); + CloseableIterator iterator = tableResult.collect(); + + List expectedSnapshotData = new ArrayList<>(Arrays.asList(snapshotExpected)); + + LOG.info("partition table snapshot data start"); + assertEqualsInAnyOrder( + expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); + + // second step: check the redo log data + String tableId = ORACLE_SCHEMA + ".PARTITIONED_CUSTOMERS"; + // DML operations across different partitions + executeSql( + "UPDATE " + tableId + " SET ADDRESS = 'Hangzhou' WHERE ID = 103"); // partition p1 + executeSql("DELETE FROM " + tableId + " WHERE ID = 502"); // partition p2 + executeSql( + "INSERT INTO " + + tableId + + " VALUES(502, 'user_8','Shanghai','123567891234')"); // partition p2 + executeSql( + "UPDATE " + tableId + " SET ADDRESS = 'Shanghai' WHERE ID = 103"); // partition p1 + executeSql( + "UPDATE " + tableId + " SET ADDRESS = 'Hangzhou' WHERE ID = 1010"); // partition p3 + executeSql( + "INSERT INTO " + + tableId + + " VALUES(2001, 'user_22','Shanghai','123567891234')"); // partition p4 + executeSql( + "INSERT INTO " + + tableId + + " VALUES(2002, 'user_23','Shanghai','123567891234')"); // partition p4 + executeSql( + "INSERT INTO " + + tableId + + " VALUES(2003, 'user_24','Shanghai','123567891234')"); // partition p4 + + String[] redoLogExpected = + new String[] { + "-U[103, user_3, Shanghai, 123567891234]", + "+U[103, user_3, Hangzhou, 123567891234]", + "-D[502, user_8, Shanghai, 123567891234]", + "+I[502, user_8, Shanghai, 123567891234]", + "-U[103, user_3, Hangzhou, 123567891234]", + "+U[103, user_3, Shanghai, 123567891234]", + "-U[1010, user_11, Shanghai, 123567891234]", + "+U[1010, user_11, Hangzhou, 123567891234]", + "+I[2001, user_22, Shanghai, 123567891234]", + "+I[2002, user_23, Shanghai, 123567891234]", + "+I[2003, user_24, Shanghai, 123567891234]" + }; + List expectedRedoLogData = new ArrayList<>(Arrays.asList(redoLogExpected)); + assertEqualsInAnyOrder( + expectedRedoLogData, fetchRows(iterator, expectedRedoLogData.size())); + tableResult.getJobClient().get().cancel().get(); + } + private void executeSql(String sql) throws Exception { try (Connection connection = getJdbcConnection(); Statement statement = connection.createStatement()) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java index cd98b2a83b2..d04bffce052 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSourceTestBase.java @@ -178,7 +178,7 @@ protected static List listTables(Connection connection) { Set tableIdSet = new HashSet<>(); String queryTablesSql = "SELECT OWNER ,TABLE_NAME,TABLESPACE_NAME FROM ALL_TABLES \n" - + "WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX') " + + "WHERE (PARTITIONED = 'YES' OR (TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX'))) " + "AND NESTED = 'NO' AND TABLE_NAME NOT IN (SELECT PARENT_TABLE_NAME FROM ALL_NESTED_TABLES)"; try { ResultSet resultSet = connection.createStatement().executeQuery(queryTablesSql); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/resources/ddl/partition_table.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/resources/ddl/partition_table.sql new file mode 100644 index 00000000000..1af3781e221 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/resources/ddl/partition_table.sql @@ -0,0 +1,57 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: partition table test +-- ---------------------------------------------------------------------------------------------------------------- + +-- Create a range-partitioned table for CDC partition table testing +CREATE TABLE DEBEZIUM.PARTITIONED_CUSTOMERS ( + ID NUMBER(10) NOT NULL, + NAME VARCHAR2(255) NOT NULL, + ADDRESS VARCHAR2(1024), + PHONE_NUMBER VARCHAR2(512), + PRIMARY KEY(ID) +) +PARTITION BY RANGE (ID) ( + PARTITION p1 VALUES LESS THAN (500), + PARTITION p2 VALUES LESS THAN (1000), + PARTITION p3 VALUES LESS THAN (2000), + PARTITION p4 VALUES LESS THAN (MAXVALUE) +); + +ALTER TABLE DEBEZIUM.PARTITIONED_CUSTOMERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (101,'user_1','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (102,'user_2','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (103,'user_3','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (109,'user_4','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (110,'user_5','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (111,'user_6','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (501,'user_7','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (502,'user_8','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (513,'user_9','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1009,'user_10','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1010,'user_11','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1011,'user_12','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1012,'user_13','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1013,'user_14','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1014,'user_15','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1015,'user_16','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1016,'user_17','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1017,'user_18','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1018,'user_19','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (1019,'user_20','Shanghai','123567891234'); +INSERT INTO DEBEZIUM.PARTITIONED_CUSTOMERS VALUES (2000,'user_21','Shanghai','123567891234');