Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
8424218
[test][connector/oceanbase] Stabilize OceanBaseFailoverITCase startup…
leonardBang Jun 25, 2026
542ec17
[test][connector/postgres] Replace sleeps in PostgresSourceReaderTest
leonardBang Jun 25, 2026
d6e8d1f
[test][connector/postgres] Stabilize NewlyAddedTableITCase failover t…
leonardBang Jun 25, 2026
e6155a8
[test][pipeline-postgres] Avoid canceling stopped savepoint jobs
leonardBang Jun 25, 2026
00191ae
[test][connector/mysql] Stabilize NewlyAddedTableITCase failover races
leonardBang Jun 25, 2026
217cb1f
[test][connector/mysql] Stabilize MySqlConnectorITCase sink waits
leonardBang Jun 25, 2026
8523c68
[test][connector/mongodb] Tolerate duplicate restore update pair
leonardBang Jun 25, 2026
c435bf3
[test][connector/oracle] Poll sink contents in NewlyAddedTableITCase
leonardBang Jun 25, 2026
7be4a4c
[test][connector/sqlserver] Stabilize SqlServerTimezoneITCase waits
leonardBang Jun 25, 2026
45baf16
[test][connector/iceberg] Stabilize MySqlToIcebergE2eITCase commits
leonardBang Jun 25, 2026
32be3af
[test][pipeline-e2e] Wait for stream-split handoff before incremental…
leonardBang Jun 25, 2026
ad6f7f4
[test][pipeline-e2e] Stabilize RouteE2eITCase extreme routing waits
leonardBang Jun 25, 2026
8279d97
[test][pipeline-e2e] Stabilize SqlServerE2eITCase split handoff
leonardBang Jun 25, 2026
79e5fe3
[test][pipeline-e2e] Accept alternate Oracle id rendering in OracleE2…
leonardBang Jun 25, 2026
11bfd09
[test][pipeline-e2e] Stabilize MySqlToHudiE2eITCase MOR validation
leonardBang Jun 25, 2026
5ff15c6
[test] Address pipeline e2e review comments
leonardBang Jun 26, 2026
a2bc423
[test] Improve test assertion readability
leonardBang Jun 26, 2026
53ab239
[test][mongodb] Fix duplicate-update assertion compilation
leonardBang Jun 26, 2026
51d7eca
[test] Stabilize Oracle and Mongo CDC waits
leonardBang Jun 26, 2026
13630e6
[test][pipeline-e2e] Wait for MysqlToKafka handoff
leonardBang Jun 26, 2026
fde405f
[test][mysql-source] Stabilize varbinary PK source test
leonardBang Jun 26, 2026
44385fa
[test][pipeline-e2e] Stabilize multiple transform rule test
leonardBang Jun 27, 2026
55ddaef
[test] Converge flaky review fixes
leonardBang Jun 27, 2026
268b520
[test][pipeline-e2e] Restore Oracle legacy id fallback
leonardBang Jun 27, 2026
cf0d312
[test] Stabilize mysql conflict and Hudi schema evolution tests
leonardBang Jun 27, 2026
dd373be
[test][mysql-source] Stabilize server-id conflict assertion
leonardBang Jun 27, 2026
e86007c
[test] Stabilize mysql server-id and OceanBase startup checks
leonardBang Jun 27, 2026
2d98afd
[test][ci] Disable Ryuk in E2E workflow jobs
leonardBang Jun 27, 2026
aadb473
[test][pipeline-e2e] Flush Hudi schema evolution before validation
leonardBang Jun 28, 2026
8be3785
[test][oracle] Wait for snapshot job before JM failover
leonardBang Jun 28, 2026
ec4f5a0
[test] Retry transient Oracle, TiDB, and Iceberg races
leonardBang Jun 29, 2026
54105d0
[test][oracle] Precreate log mining flush table
leonardBang Jun 29, 2026
7d1d392
[test][ci] Replace setup-maven action
leonardBang Jun 29, 2026
9b8fb24
[test] Stabilize Oracle and SQL Server source ITs
leonardBang Jun 29, 2026
3b32a5e
[test] Stabilize MySQL and Oracle source ITs
leonardBang Jun 29, 2026
682c7f9
[test][oracle] Stabilize log-mining transition tests
leonardBang Jun 29, 2026
d98d1f3
[fix][oracle] Serialize flush table initialization
leonardBang Jun 29, 2026
ab06a99
[test][mysql] Stabilize varbinary primary-key streaming test
leonardBang Jun 29, 2026
c0480c2
[test][pipeline-e2e] Stabilize Hudi whole-db checkpoint trigger
leonardBang Jun 29, 2026
8f41b85
[fix][oracle] Stabilize log-mining session restart
leonardBang Jun 30, 2026
b28dbd2
[test][pipeline-e2e] Stabilize UDF schema-event waits
leonardBang Jun 30, 2026
977c17a
[test][mysql] Stabilize varbinary primary-key handoff wait
leonardBang Jun 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions .github/workflows/flink_cdc_base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,16 @@ jobs:
cache: 'maven'

- name: Set Maven 3.8.6
uses: stCarolas/setup-maven@v5
with:
maven-version: 3.8.6
run: |
set -euo pipefail
maven_version=3.8.6
maven_archive="apache-maven-${maven_version}-bin.tar.gz"
curl -fsSL --retry 3 --retry-all-errors --retry-delay 5 \
"https://archive.apache.org/dist/maven/maven-3/${maven_version}/binaries/${maven_archive}" \
-o "$RUNNER_TEMP/$maven_archive"
tar -xzf "$RUNNER_TEMP/$maven_archive" -C "$RUNNER_TEMP"
echo "$RUNNER_TEMP/apache-maven-${maven_version}/bin" >> "$GITHUB_PATH"
"$RUNNER_TEMP/apache-maven-${maven_version}/bin/mvn" -version

- name: Compile and test
timeout-minutes: 90
Expand All @@ -107,7 +114,11 @@ jobs:
fi

build_maven_parameter="${build_maven_parameter:+$build_maven_parameter }${{ inputs.custom-maven-parameter }}"


if [ "${{ matrix.module }}" = "pipeline_e2e" ] || [ "${{ matrix.module }}" = "source_e2e" ]; then
export TESTCONTAINERS_RYUK_DISABLED=true
fi

mvn --no-snapshot-updates -B -DskipTests ${{ inputs.custom-maven-parameter }} -pl $compile_modules -am install && mvn --no-snapshot-updates -B $build_maven_parameter -pl $modules -DspecifiedParallelism=${{ inputs.parallelism }} -Duser.timezone=$jvm_timezone verify

- name: Print JVM thread dumps when cancelled
Expand Down
9 changes: 5 additions & 4 deletions .github/workflows/utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
################################################################################

function random_timezone() {
local rnd=$(expr $RANDOM % 25)
local hh=$(expr $rnd / 2)
local mm=$(expr $rnd % 2 \* 3)"0"
# Keep CI-generated zones on hour boundaries. Some connector dependencies do not handle
# sub-hour JVM offsets consistently, and connector-specific timezone cases cover that behavior.
local hh=$(expr $RANDOM % 13)
local mm="00"
local sgn=$(expr $RANDOM % 2)
if [ $sgn -eq 0 ]
then
echo "GMT+$hh:$mm"
else
echo "GMT-$hh:$mm"
fi
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Type;
Expand All @@ -61,6 +62,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -84,6 +86,8 @@ public class IcebergMetadataApplier implements MetadataApplier {

private static final Logger LOG = LoggerFactory.getLogger(IcebergMetadataApplier.class);

private static final int MAX_SCHEMA_UPDATE_RETRIES = 20;

private transient Catalog catalog;

private final Map<String, String> catalogOptions;
Expand Down Expand Up @@ -235,112 +239,127 @@ private void applyDefaultValues(
}

private void applyAddColumn(AddColumnEvent event) {
TableIdentifier tableIdentifier = TableIdentifier.parse(event.tableId().identifier());
try {
Table table = catalog.loadTable(tableIdentifier);
applyAddColumnEventWithPosition(table, event);
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
applySchemaUpdate(
TableIdentifier.parse(event.tableId().identifier()),
event,
table -> {
UpdateSchema updateSchema = table.updateSchema();
for (AddColumnEvent.ColumnWithPosition columnWithPosition :
event.getAddedColumns()) {
Column addColumn = columnWithPosition.getAddColumn();
String columnName = addColumn.getName();
String columnComment = addColumn.getComment();
Type icebergType =
FlinkSchemaUtil.convert(
DataTypeUtils.toFlinkDataType(addColumn.getType())
.getLogicalType());
Literal<?> defaultValue =
IcebergTypeUtils.parseDefaultValue(
addColumn.getDefaultValueExpression(), addColumn.getType());
if (defaultValue != null && getFormatVersion(table) >= 3) {
updateSchema.addColumn(
columnName, icebergType, columnComment, defaultValue);
updateSchema.updateColumnDefault(columnName, defaultValue);
} else {
updateSchema.addColumn(columnName, icebergType, columnComment);
}
switch (columnWithPosition.getPosition()) {
case FIRST:
updateSchema.moveFirst(columnName);
break;
case LAST:
break;
case BEFORE:
checkNotNull(
columnWithPosition.getExistedColumnName(),
"Existing column name must be provided for BEFORE position");
updateSchema.moveBefore(
columnName, columnWithPosition.getExistedColumnName());
break;
case AFTER:
checkNotNull(
columnWithPosition.getExistedColumnName(),
"Existing column name must be provided for AFTER position");
updateSchema.moveAfter(
columnName, columnWithPosition.getExistedColumnName());
break;
default:
throw new SchemaEvolveException(
event,
"Unknown column position: "
+ columnWithPosition.getPosition());
}
}
updateSchema.commit();
});
}

private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event)
throws SchemaEvolveException {

try {
UpdateSchema updateSchema = table.updateSchema();
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
Column addColumn = columnWithPosition.getAddColumn();
String columnName = addColumn.getName();
String columnComment = addColumn.getComment();
Type icebergType =
FlinkSchemaUtil.convert(
DataTypeUtils.toFlinkDataType(addColumn.getType())
.getLogicalType());
Literal<?> defaultValue =
IcebergTypeUtils.parseDefaultValue(
addColumn.getDefaultValueExpression(), addColumn.getType());
if (defaultValue != null && getFormatVersion(table) >= 3) {
updateSchema.addColumn(columnName, icebergType, columnComment, defaultValue);
updateSchema.updateColumnDefault(columnName, defaultValue);
} else {
updateSchema.addColumn(columnName, icebergType, columnComment);
}
switch (columnWithPosition.getPosition()) {
case FIRST:
updateSchema.moveFirst(columnName);
break;
case LAST:
break;
case BEFORE:
checkNotNull(
columnWithPosition.getExistedColumnName(),
"Existing column name must be provided for BEFORE position");
updateSchema.moveBefore(
columnName, columnWithPosition.getExistedColumnName());
break;
case AFTER:
checkNotNull(
columnWithPosition.getExistedColumnName(),
"Existing column name must be provided for AFTER position");
updateSchema.moveAfter(
columnName, columnWithPosition.getExistedColumnName());
break;
default:
throw new SchemaEvolveException(
event,
"Unknown column position: " + columnWithPosition.getPosition());
private void applySchemaUpdate(
TableIdentifier tableIdentifier, SchemaChangeEvent event, Consumer<Table> updater) {
for (int attempt = 0; attempt < MAX_SCHEMA_UPDATE_RETRIES; attempt++) {
try {
Table table = catalog.loadTable(tableIdentifier);
table.refresh();
updater.accept(table);
return;
} catch (CommitFailedException e) {
if (attempt == MAX_SCHEMA_UPDATE_RETRIES - 1) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
LOG.warn(
"Retrying schema change {} for table {} after optimistic commit conflict.",
event,
tableIdentifier,
e);
} catch (SchemaEvolveException e) {
throw e;
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
updateSchema.commit();
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
}

private void applyDropColumn(DropColumnEvent event) {
try {
UpdateSchema updateSchema =
catalog.loadTable(TableIdentifier.parse(event.tableId().identifier()))
.updateSchema();
event.getDroppedColumnNames().forEach(updateSchema::deleteColumn);
updateSchema.commit();
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
applySchemaUpdate(
TableIdentifier.parse(event.tableId().identifier()),
event,
table -> {
UpdateSchema updateSchema = table.updateSchema();
event.getDroppedColumnNames().forEach(updateSchema::deleteColumn);
updateSchema.commit();
});
}

private void applyRenameColumn(RenameColumnEvent event) {
try {
UpdateSchema updateSchema =
catalog.loadTable(TableIdentifier.parse(event.tableId().identifier()))
.updateSchema();
event.getNameMapping().forEach(updateSchema::renameColumn);
updateSchema.commit();
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
applySchemaUpdate(
TableIdentifier.parse(event.tableId().identifier()),
event,
table -> {
UpdateSchema updateSchema = table.updateSchema();
event.getNameMapping().forEach(updateSchema::renameColumn);
updateSchema.commit();
});
}

private void applyAlterColumnType(AlterColumnTypeEvent event) {
try {
UpdateSchema updateSchema =
catalog.loadTable(TableIdentifier.parse(event.tableId().identifier()))
.updateSchema();
event.getTypeMapping()
.forEach(
(name, newType) -> {
Type.PrimitiveType type =
FlinkSchemaUtil.convert(
DataTypeUtils.toFlinkDataType(newType)
.getLogicalType())
.asPrimitiveType();
updateSchema.updateColumn(name, type);
});
updateSchema.commit();
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
}
applySchemaUpdate(
TableIdentifier.parse(event.tableId().identifier()),
event,
table -> {
UpdateSchema updateSchema = table.updateSchema();
event.getTypeMapping()
.forEach(
(name, newType) -> {
Type.PrimitiveType type =
FlinkSchemaUtil.convert(
DataTypeUtils.toFlinkDataType(
newType)
.getLogicalType())
.asPrimitiveType();
updateSchema.updateColumn(name, type);
});
updateSchema.commit();
});
}

private PartitionSpec generatePartitionSpec(Schema schema, List<String> partitionColumns) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,15 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
continue;
}

// Each batch is committed as its own snapshot, and an Iceberg snapshot operation
// captures its base metadata when it is created (not when it is committed). Since
// committing one batch advances the table version, the next batch must be built on
// the refreshed metadata; otherwise it is created on a stale base and tries to
// write
// a metadata version the previous batch already took, failing with
// CommitFailedException ("Version N already exists").
table.refresh();

SnapshotUpdate<?> operation;
if (deleteFiles.isEmpty()) {
AppendFiles append = table.newAppend();
Expand Down
Loading
Loading