Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
Expand Down Expand Up @@ -91,8 +92,7 @@ protected void processElement(
SourceRecord element, SourceOutput<T> output, SourceSplitState splitState)
throws Exception {
if (isSchemaChangeEvent(element) && splitState.isStreamSplitState()) {
restoreCreateTableEventsFromSplitSchemas(
splitState.asStreamSplitState().getTableSchemas());
cacheCreateTableEventsFromSchemas(splitState.asStreamSplitState().getTableSchemas());
}

if (shouldEmitAllCreateTableEventsInSnapshotMode && isBounded) {
Expand All @@ -104,15 +104,20 @@ protected void processElement(
// to downstream to avoid checkpoint timeout.
io.debezium.relational.TableId tableId =
splitState.asSnapshotSplitState().toSourceSplit().getTableId();
emitCreateTableEventIfNeeded(tableId, output);
emitCreateTableEventIfNeeded(tableId, output, splitState);
} else if (isDataChangeRecord(element)) {
// Handle data change events, schema change events are handled downstream directly
io.debezium.relational.TableId tableId = getTableId(element);
emitCreateTableEventIfNeeded(tableId, output);
emitCreateTableEventIfNeeded(tableId, output, splitState);
}
super.processElement(element, output, splitState);
}

@Override
public void applySplit(SourceSplitBase split) {
cacheCreateTableEventsFromSchemas(split.getTableSchemas());
}

@SuppressWarnings("unchecked")
private void emitAllCreateTableEvents(SourceOutput<T> output) {
createTableEventCache.forEach(
Expand All @@ -124,33 +129,33 @@ private void emitAllCreateTableEvents(SourceOutput<T> output) {

@SuppressWarnings("unchecked")
private void emitCreateTableEventIfNeeded(
io.debezium.relational.TableId tableId, SourceOutput<T> output) {
io.debezium.relational.TableId tableId,
SourceOutput<T> output,
SourceSplitState splitState) {
if (alreadySendCreateTableTables.contains(tableId)) {
return;
}

cacheCreateTableEventsFromSchemas(splitState.toSourceSplit().getTableSchemas());
CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
if (createTableEvent != null) {
output.collect((T) createTableEvent);
} else {
// Table not in cache, fetch schema from database
if (createTableEvent == null) {
// The schema should normally be restored from split state. Keep JDBC as a final
// fallback for old savepoints or unexpected cache misses.
try (SqlServerConnection jdbc =
createSqlServerConnection(sourceConfig.getDbzConnectorConfig())) {
createTableEvent = buildCreateTableEvent(jdbc, tableId);
output.collect((T) createTableEvent);
createTableEventCache.put(tableId, createTableEvent);
} catch (SQLException e) {
throw new RuntimeException("Failed to get table schema for " + tableId, e);
}
}
Comment thread
leonardBang marked this conversation as resolved.
output.collect((T) createTableEvent);
alreadySendCreateTableTables.add(tableId);
}

private void restoreCreateTableEventsFromSplitSchemas(
private void cacheCreateTableEventsFromSchemas(
Map<io.debezium.relational.TableId, TableChange> tableSchemas) {
if (!sourceConfig.isIncludeSchemaChanges()
|| tableSchemas == null
|| tableSchemas.isEmpty()) {
if (tableSchemas == null || tableSchemas.isEmpty()) {
return;
}
for (Map.Entry<io.debezium.relational.TableId, TableChange> entry :
Expand Down
Loading
Loading