Skip to content

Commit 8b7a5a5

Browse files
leonardBangclaude
andcommitted
[test][pipeline-e2e] Tolerate transient regressed reads in MySqlToHudiE2eITCase
Hudi MERGE_ON_READ snapshot reads can momentarily expose an empty or partial file slice during compaction, making validateSinkResult judge a transient empty result and report a misleading Actual:[]. Keep the best observed read and skip regressed reads so the final assertion never lands on a transient empty slice. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 7464365 commit 8b7a5a5

1 file changed

Lines changed: 22 additions & 1 deletion

File tree

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToHudiE2eITCase.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,9 +491,30 @@ private void validateSinkResult(
491491
LOG.info("Verifying Hudi {}::{}::{} results...", warehouse, database, table);
492492
long deadline = System.currentTimeMillis() + HUDI_TESTCASE_TIMEOUT.toMillis();
493493
List<String> results = Collections.emptyList();
494+
int maxObservedSize = -1;
494495
while (System.currentTimeMillis() < deadline) {
495496
try {
496-
results = fetchHudiTableRows(warehouse, database, table);
497+
List<String> fetched = fetchHudiTableRows(warehouse, database, table);
498+
499+
// Hudi MERGE_ON_READ tables can momentarily expose an empty or partial file
500+
// slice while a compaction swaps slices, so a snapshot read may regress to fewer
501+
// rows (sometimes 0) even though no data was actually lost. The row count in this
502+
// test only grows, so treat a regressed read as a transient inconsistent state:
503+
// skip judging it this round and re-read on the next loop, keeping the best
504+
// (largest) read so the final assertion never lands on a transient empty result.
505+
if (maxObservedSize > 0 && fetched.size() < maxObservedSize) {
506+
LOG.warn(
507+
"Ignoring transient regressed read from Hudi MOR table: got {} rows, "
508+
+ "previously saw {} rows (likely a compaction file-slice swap). "
509+
+ "Waiting for the next loop...",
510+
fetched.size(),
511+
maxObservedSize);
512+
Thread.sleep(10000L);
513+
continue;
514+
}
515+
516+
results = fetched;
517+
maxObservedSize = fetched.size();
497518
Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
498519
LOG.info(
499520
"Successfully verified {} records in {} seconds for {}::{}.",

0 commit comments

Comments
 (0)