2323import org .apache .flink .cdc .common .schema .Schema ;
2424import org .apache .flink .cdc .connectors .base .options .StartupOptions ;
2525import org .apache .flink .cdc .connectors .base .source .meta .offset .OffsetFactory ;
26+ import org .apache .flink .cdc .connectors .base .source .meta .split .SourceSplitBase ;
2627import org .apache .flink .cdc .connectors .base .source .meta .split .SourceSplitState ;
2728import org .apache .flink .cdc .connectors .base .source .metrics .SourceReaderMetrics ;
2829import org .apache .flink .cdc .connectors .base .source .reader .IncrementalSourceRecordEmitter ;
@@ -91,8 +92,7 @@ protected void processElement(
9192 SourceRecord element , SourceOutput <T > output , SourceSplitState splitState )
9293 throws Exception {
9394 if (isSchemaChangeEvent (element ) && splitState .isStreamSplitState ()) {
94- restoreCreateTableEventsFromSplitSchemas (
95- splitState .asStreamSplitState ().getTableSchemas ());
95+ cacheCreateTableEventsFromSchemas (splitState .asStreamSplitState ().getTableSchemas ());
9696 }
9797
9898 if (shouldEmitAllCreateTableEventsInSnapshotMode && isBounded ) {
@@ -104,15 +104,20 @@ protected void processElement(
104104 // to downstream to avoid checkpoint timeout.
105105 io .debezium .relational .TableId tableId =
106106 splitState .asSnapshotSplitState ().toSourceSplit ().getTableId ();
107- emitCreateTableEventIfNeeded (tableId , output );
107+ emitCreateTableEventIfNeeded (tableId , output , splitState );
108108 } else if (isDataChangeRecord (element )) {
109109 // Handle data change events, schema change events are handled downstream directly
110110 io .debezium .relational .TableId tableId = getTableId (element );
111- emitCreateTableEventIfNeeded (tableId , output );
111+ emitCreateTableEventIfNeeded (tableId , output , splitState );
112112 }
113113 super .processElement (element , output , splitState );
114114 }
115115
116+ @ Override
117+ public void applySplit (SourceSplitBase split ) {
118+ cacheCreateTableEventsFromSchemas (split .getTableSchemas ());
119+ }
120+
116121 @ SuppressWarnings ("unchecked" )
117122 private void emitAllCreateTableEvents (SourceOutput <T > output ) {
118123 createTableEventCache .forEach (
@@ -124,33 +129,28 @@ private void emitAllCreateTableEvents(SourceOutput<T> output) {
124129
125130 @ SuppressWarnings ("unchecked" )
126131 private void emitCreateTableEventIfNeeded (
127- io .debezium .relational .TableId tableId , SourceOutput <T > output ) {
132+ io .debezium .relational .TableId tableId ,
133+ SourceOutput <T > output ,
134+ SourceSplitState splitState ) {
128135 if (alreadySendCreateTableTables .contains (tableId )) {
129136 return ;
130137 }
131138
139+ cacheCreateTableEventsFromSchemas (splitState .toSourceSplit ().getTableSchemas ());
132140 CreateTableEvent createTableEvent = createTableEventCache .get (tableId );
133- if (createTableEvent != null ) {
134- output .collect ((T ) createTableEvent );
135- } else {
136- // Table not in cache, fetch schema from database
137- try (SqlServerConnection jdbc =
138- createSqlServerConnection (sourceConfig .getDbzConnectorConfig ())) {
139- createTableEvent = buildCreateTableEvent (jdbc , tableId );
140- output .collect ((T ) createTableEvent );
141- createTableEventCache .put (tableId , createTableEvent );
142- } catch (SQLException e ) {
143- throw new RuntimeException ("Failed to get table schema for " + tableId , e );
144- }
141+ if (createTableEvent == null ) {
142+ throw new IllegalStateException (
143+ "Missing CreateTableEvent for table "
144+ + tableId
145+ + ". Table schema should have been restored before processing records." );
145146 }
147+ output .collect ((T ) createTableEvent );
146148 alreadySendCreateTableTables .add (tableId );
147149 }
148150
149- private void restoreCreateTableEventsFromSplitSchemas (
151+ private void cacheCreateTableEventsFromSchemas (
150152 Map <io .debezium .relational .TableId , TableChange > tableSchemas ) {
151- if (!sourceConfig .isIncludeSchemaChanges ()
152- || tableSchemas == null
153- || tableSchemas .isEmpty ()) {
153+ if (tableSchemas == null || tableSchemas .isEmpty ()) {
154154 return ;
155155 }
156156 for (Map .Entry <io .debezium .relational .TableId , TableChange > entry :
0 commit comments