Skip to content

Commit 71b815d

Browse files
authored
[spark] Stabilize row tracking concurrent tests (#8307)
Row tracking concurrent tests could fail on transient commit conflicts and snapshot reads while merge and compaction run in parallel. The previous retry logic was duplicated and, in one path, retried without a bound. This PR centralizes the retry helper for these concurrent tests, bounds the number of retries and adds a short fixed delay between attempts, preserves the final exception on failure, and narrows snapshot-missing detection so that it matches only when both `Snapshot file` and `does not exist` appear in the same throwable message.
1 parent 17bac20 commit 71b815d

1 file changed

Lines changed: 76 additions & 61 deletions

File tree

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala

Lines changed: 76 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,43 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase with AdaptiveSpar
4444

4545
import testImplicits._
4646

47+
// Maximum number of times doWithRetry runs an action before rethrowing the last failure.
private val MaxRetryAttempts = 20
48+
// Pause, in milliseconds, that doWithRetry sleeps after a retryable failure before trying again.
private val RetryIntervalMillis = 10L
49+
50+
/**
 * Runs `doAction`, re-running it after a short pause whenever it fails with a retryable
 * exception.
 *
 * A failure is retryable when [[isRetryable]] accepts it for the given `retryableMessages`.
 * The action is run at most `MaxRetryAttempts` times; a non-retryable failure, or the failure
 * of the last permitted attempt, is rethrown unchanged.
 *
 * @param doAction the action to execute
 * @param retryableMessages message fragments that mark a failure as safe to retry
 */
private def doWithRetry(doAction: () => Unit, retryableMessages: String*): Unit = {
  @scala.annotation.tailrec
  def attempt(failuresSoFar: Int): Unit = {
    // Capture the failure instead of handling it inside the catch clause so that the
    // retry below stays in tail position.
    val failure: Option[Exception] =
      try {
        doAction.apply()
        None
      } catch {
        case e: Exception => Some(e)
      }

    failure match {
      case None => ()
      case Some(e) =>
        val failures = failuresSoFar + 1
        if (!isRetryable(e, retryableMessages) || failures >= MaxRetryAttempts) {
          throw e
        }
        Thread.sleep(RetryIntervalMillis)
        attempt(failures)
    }
  }

  attempt(0)
}
66+
67+
/**
 * Decides whether a failure may be retried.
 *
 * A failure whose cause chain contains a message with both "Snapshot file" and
 * "does not exist" is always retryable; otherwise it is retryable only if some throwable in
 * the chain has a message containing one of `retryableMessages`.
 *
 * @param e the failure to inspect
 * @param retryableMessages additional message fragments that mark a failure as retryable
 * @return true if the failure matches any retryable pattern
 */
private def isRetryable(e: Throwable, retryableMessages: Seq[String]): Boolean = {
  if (hasMessage(e, "Snapshot file", "does not exist")) {
    true
  } else {
    retryableMessages.exists(expected => hasMessage(e, expected))
  }
}
71+
72+
/**
 * Checks whether `e` or any throwable in its cause chain has a message that contains every
 * one of `fragments`.
 *
 * All fragments must appear in the message of a single throwable; fragments spread over
 * different throwables in the chain do not count as a match. Throwables without a message
 * are skipped.
 *
 * @param e the throwable whose cause chain is searched; may be null
 * @param fragments substrings that must all occur in one message
 * @return true if some throwable in the chain carries all fragments in its message
 */
private def hasMessage(e: Throwable, fragments: String*): Boolean = {
  Iterator
    .iterate(e)(_.getCause)
    // The chain ends at the first null cause (or immediately when `e` itself is null).
    .takeWhile(_ != null)
    .exists { throwable =>
      Option(throwable.getMessage).exists(text => fragments.forall(text.contains))
    }
}
83+
4784
test("Data Evolution: concurrent merge and compact") {
4885
withTable("s", "t") {
4986
sql(s"""
@@ -104,27 +141,16 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase with AdaptiveSpar
104141
Seq((1, 1, 1)).toDF("id", "b", "c").createOrReplaceTempView("s")
105142

106143
def doMerge(): Unit = {
107-
var success = false
108-
while (!success) {
109-
try {
110-
sql(s"""
111-
|MERGE INTO t
112-
|USING s
113-
|ON t.id = s.id
114-
|WHEN MATCHED THEN
115-
|UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
116-
|""".stripMargin).collect()
117-
success = true
118-
} catch {
119-
case e: Exception =>
120-
if (
121-
!e.getMessage.contains(
122-
"multiple 'MERGE INTO' operations have encountered conflicts")
123-
) {
124-
throw e
125-
}
126-
}
127-
}
144+
doWithRetry(
145+
() => sql(s"""
146+
|MERGE INTO t
147+
|USING s
148+
|ON t.id = s.id
149+
|WHEN MATCHED THEN
150+
|UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
151+
|""".stripMargin).collect(),
152+
"multiple 'MERGE INTO' operations have encountered conflicts"
153+
)
128154
}
129155

130156
val mergeInto1 = Future {
@@ -159,25 +185,25 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase with AdaptiveSpar
159185

160186
val mergeB = Future {
161187
for (_ <- 1 to 10) {
162-
sql(s"""
163-
|MERGE INTO t
164-
|USING sb
165-
|ON t.id = sb.id
166-
|WHEN MATCHED THEN
167-
|UPDATE SET t.b = sb.b + t.b
168-
|""".stripMargin).collect()
188+
doWithRetry(() => sql(s"""
189+
|MERGE INTO t
190+
|USING sb
191+
|ON t.id = sb.id
192+
|WHEN MATCHED THEN
193+
|UPDATE SET t.b = sb.b + t.b
194+
|""".stripMargin).collect())
169195
}
170196
}
171197

172198
val mergeC = Future {
173199
for (_ <- 1 to 10) {
174-
sql(s"""
175-
|MERGE INTO t
176-
|USING sc
177-
|ON t.id = sc.id
178-
|WHEN MATCHED THEN
179-
|UPDATE SET t.c = sc.c + t.c
180-
|""".stripMargin).collect()
200+
doWithRetry(() => sql(s"""
201+
|MERGE INTO t
202+
|USING sc
203+
|ON t.id = sc.id
204+
|WHEN MATCHED THEN
205+
|UPDATE SET t.c = sc.c + t.c
206+
|""".stripMargin).collect())
181207
}
182208
}
183209

@@ -199,33 +225,19 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase with AdaptiveSpar
199225
sql("INSERT INTO t VALUES (1, 0, 0)")
200226
Seq((1, 1, 1)).toDF("id", "b", "c").createOrReplaceTempView("s")
201227

202-
def doWithRetry(doAction: () => Unit): Unit = {
203-
var success = false
204-
while (!success) {
205-
try {
206-
doAction.apply()
207-
success = true
208-
} catch {
209-
case e: Exception =>
210-
if (
211-
!e.getMessage.contains("multiple 'MERGE INTO' and 'COMPACT' operations")
212-
&& !e.getMessage.contains("Row ID existence conflict")
213-
) {
214-
throw e
215-
}
216-
}
217-
}
218-
}
219-
220228
val mergeInto = Future {
221229
for (i <- 1 to 10) {
222-
doWithRetry(() => sql(s"""
223-
|MERGE INTO t
224-
|USING s
225-
|ON t.id = s.id
226-
|WHEN MATCHED THEN
227-
|UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
228-
|""".stripMargin).collect())
230+
doWithRetry(
231+
() => sql(s"""
232+
|MERGE INTO t
233+
|USING s
234+
|ON t.id = s.id
235+
|WHEN MATCHED THEN
236+
|UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
237+
|""".stripMargin).collect(),
238+
"multiple 'MERGE INTO' and 'COMPACT' operations",
239+
"Row ID existence conflict"
240+
)
229241
if (i > 1) {
230242
sql(s"INSERT INTO t VALUES ($i, $i, $i)")
231243
}
@@ -244,7 +256,10 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase with AdaptiveSpar
244256
while (!canBeCompacted) {
245257
Thread.sleep(1)
246258
}
247-
doWithRetry(() => sql("CALL sys.compact(table => 't')"))
259+
doWithRetry(
260+
() => sql("CALL sys.compact(table => 't')"),
261+
"multiple 'MERGE INTO' and 'COMPACT' operations",
262+
"Row ID existence conflict")
248263
}
249264
}
250265

0 commit comments

Comments
 (0)