Skip to content

Commit 7464365

Browse files
leonardBangclaude
andcommitted
fix unstable test
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent cff6604 commit 7464365

2 files changed

Lines changed: 93 additions & 30 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.flink.cdc.connectors.oceanbase;
1919

2020
import org.apache.flink.api.common.JobID;
21+
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
22+
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
2123
import org.apache.flink.cdc.connectors.utils.ExternalResourceProxy;
2224
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
2325
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -30,6 +32,7 @@
3032
import org.apache.flink.util.CloseableIterator;
3133
import org.apache.flink.util.StringUtils;
3234

35+
import io.debezium.connector.mysql.MySqlConnection;
3336
import io.debezium.jdbc.JdbcConnection;
3437
import org.assertj.core.api.Assertions;
3538
import org.junit.jupiter.api.AfterEach;
@@ -385,6 +388,12 @@ private void testMySqlParallelSource(
385388
e.getKey(), e.getValue()))
386389
.collect(Collectors.joining(",")));
387390
tEnv.executeSql(sourceDDL);
391+
if (!DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
392+
// In latest-offset mode the job must not resolve its start offset before the rows
393+
// written during setup() are materialized by the OceanBase binlog service,
394+
// otherwise they are read back as +I events and break the assertions.
395+
waitForBinlogServiceCaughtUp();
396+
}
388397
TableResult tableResult = tEnv.executeSql("select * from customers");
389398

390399
// first step: check the snapshot data
@@ -558,6 +567,47 @@ private void makeSecondPartBinlogEvents(JdbcConnection connection, String tableI
558567
}
559568
}
560569

570+
/**
571+
* The OceanBase binlog service materializes committed transactions asynchronously. In
572+
* latest-offset mode the source must not resolve its start offset before the rows written
573+
* during {@link #setup()} are materialized, otherwise they are read back as +I events and break
574+
* the assertions. This writes a marker into a non-captured table and waits until the binlog
575+
* offset moves past it and stops advancing, which guarantees all earlier writes are visible.
576+
*/
577+
private void waitForBinlogServiceCaughtUp() throws Exception {
578+
String markerTable = testDatabase + ".binlog_sync_marker";
579+
MySqlConnection connection = getConnection();
580+
try {
581+
BinlogOffset before = DebeziumUtils.currentBinlogOffset(connection);
582+
connection.setAutoCommit(false);
583+
connection.execute("CREATE TABLE IF NOT EXISTS " + markerTable + " (id INT)");
584+
connection.execute("INSERT INTO " + markerTable + " VALUES (1)");
585+
connection.commit();
586+
587+
long deadline = System.currentTimeMillis() + 60_000L;
588+
BinlogOffset previous = null;
589+
int stableTimes = 0;
590+
while (System.currentTimeMillis() < deadline) {
591+
Thread.sleep(500L);
592+
BinlogOffset current = DebeziumUtils.currentBinlogOffset(connection);
593+
if (previous != null
594+
&& current.isAfter(before)
595+
&& current.compareTo(previous) == 0) {
596+
if (++stableTimes >= 2) {
597+
return;
598+
}
599+
} else {
600+
stableTimes = 0;
601+
}
602+
previous = current;
603+
}
604+
throw new IllegalStateException(
605+
"OceanBase binlog service did not catch up with setup writes in time.");
606+
} finally {
607+
connection.close();
608+
}
609+
}
610+
561611
private void sleepMs(long millis) {
562612
try {
563613
Thread.sleep(millis);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java

Lines changed: 43 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -311,30 +311,34 @@ void testSchemaChangeUpdatesSnapshotState() throws Exception {
311311
"INSERT INTO customer.\"Customers\" VALUES (3002, 'after_ddl', 'Shanghai', '222', 'after@test.com')");
312312
}
313313

314-
// Wait for the schema change event to be processed
315-
Thread.sleep(1000L);
316-
317-
// Poll records so the emitter processes all events
314+
// Poll until the reader captures both stream records in order.
318315
final SimpleReaderOutput output = new SimpleReaderOutput();
319-
for (int i = 0; i < 10; i++) {
320-
reader.pollNext(output);
321-
}
322-
323-
// Verify the emitted records contain data before and after DDL in correct order
324-
List<SourceRecord> results = output.getResults();
325316
int beforeDdlPos = -1;
326317
int afterDdlPos = -1;
327-
for (int i = 0; i < results.size(); i++) {
328-
SourceRecord record = results.get(i);
329-
if (record.value() != null) {
330-
String value = record.value().toString();
331-
if (value.contains("before_ddl")) {
332-
beforeDdlPos = i;
333-
} else if (value.contains("after_ddl")) {
334-
afterDdlPos = i;
318+
long recordDeadline = System.currentTimeMillis() + 10_000L;
319+
while (System.currentTimeMillis() < recordDeadline) {
320+
reader.pollNext(output);
321+
322+
List<SourceRecord> results = output.getResults();
323+
beforeDdlPos = -1;
324+
afterDdlPos = -1;
325+
for (int i = 0; i < results.size(); i++) {
326+
SourceRecord record = results.get(i);
327+
if (record.value() != null) {
328+
String value = record.value().toString();
329+
if (value.contains("before_ddl")) {
330+
beforeDdlPos = i;
331+
} else if (value.contains("after_ddl")) {
332+
afterDdlPos = i;
333+
}
335334
}
336335
}
336+
if (beforeDdlPos >= 0 && afterDdlPos >= 0 && beforeDdlPos < afterDdlPos) {
337+
break;
338+
}
339+
Thread.sleep(100L);
337340
}
341+
338342
assertThat(beforeDdlPos)
339343
.as("Should capture the INSERT before DDL")
340344
.isGreaterThanOrEqualTo(0);
@@ -343,22 +347,31 @@ void testSchemaChangeUpdatesSnapshotState() throws Exception {
343347
.as("INSERT before DDL should appear before INSERT after DDL")
344348
.isLessThan(afterDdlPos);
345349

346-
// Verify that snapshotState returns splits with updated table schema
347-
List<SourceSplitBase> splits = reader.snapshotState(1L);
348-
assertThat(splits).isNotEmpty();
349-
350+
// Verify that snapshotState returns splits with updated table schema.
351+
List<SourceSplitBase> splits = Collections.emptyList();
350352
boolean foundUpdatedSchema = false;
351-
for (SourceSplitBase split : splits) {
352-
if (split.isStreamSplit()) {
353-
Map<TableId, TableChanges.TableChange> schemas =
354-
split.asStreamSplit().getTableSchemas();
355-
if (schemas.containsKey(tableId)
356-
&& schemas.get(tableId).getTable().columnWithName("email") != null) {
357-
foundUpdatedSchema = true;
358-
break;
353+
long schemaDeadline = System.currentTimeMillis() + 10_000L;
354+
while (System.currentTimeMillis() < schemaDeadline) {
355+
splits = reader.snapshotState(1L);
356+
foundUpdatedSchema = false;
357+
for (SourceSplitBase split : splits) {
358+
if (split.isStreamSplit()) {
359+
Map<TableId, TableChanges.TableChange> schemas =
360+
split.asStreamSplit().getTableSchemas();
361+
if (schemas.containsKey(tableId)
362+
&& schemas.get(tableId).getTable().columnWithName("email") != null) {
363+
foundUpdatedSchema = true;
364+
break;
365+
}
359366
}
360367
}
368+
if (foundUpdatedSchema) {
369+
break;
370+
}
371+
Thread.sleep(100L);
361372
}
373+
374+
assertThat(splits).isNotEmpty();
362375
assertThat(foundUpdatedSchema)
363376
.as("The snapshotState should contain the updated table schema with 'email' column")
364377
.isTrue();

0 commit comments

Comments
 (0)