@@ -36,6 +36,34 @@ suite("test_time_series_compaction_polciy", "p0") {
3636 return rowsetCount
3737 }
3838
39+ // Manually POST cumulative compaction to every tablet. Fire-and-forget;
40+ // per-tablet errors (e.g. CUMULATIVE_NO_SUITABLE_VERSION on a tablet with
41+ // nothing to merge) are expected and surface via the polling assertion.
42+ def trigger_cumulative_all = { tabletsList ->
43+ for (def tablet in tabletsList) {
44+ def be_host = backendId_to_backendIP[" ${ tablet.BackendId} " ]
45+ def be_port = backendId_to_backendHttpPort[" ${ tablet.BackendId} " ]
46+ curl(" POST" , " http://${ be_host} :${ be_port} /api/compaction/run?tablet_id=${ tablet.TabletId} &compact_type=cumulative" )
47+ }
48+ }
49+
50+ // Poll get_rowset_count until count <= target or timeout. Returns the last
51+ // observed count. Avoids the run_status race in trigger_and_wait_compaction
52+ // for time_series mode (BE may have queued the task but not yet acquired
53+ // the cumulative lock when first polled).
54+ def wait_rowset_count_le = { tabletsList , target , timeoutSec ->
55+ long deadline = System . currentTimeMillis() + timeoutSec * 1000
56+ int last = -1
57+ while (System . currentTimeMillis() < deadline) {
58+ last = get_rowset_count. call(tabletsList)
59+ if (last <= target) {
60+ return last
61+ }
62+ Thread . sleep(1000 )
63+ }
64+ return last
65+ }
66+
3967 sql """ DROP TABLE IF EXISTS ${ tableName} ; """
4068 sql """
4169 CREATE TABLE ${ tableName} (
@@ -133,21 +161,19 @@ suite("test_time_series_compaction_polciy", "p0") {
133161 assert (rowsetCount == 34 * replicaNum)
134162
135163 // trigger cumulative compactions for all tablets in table
136- trigger_and_wait_compaction(tableName, " cumulative" )
137-
138164 // after cumulative compaction, there is only 26 rowset.
139165 // 5 consecutive empty versions are merged into one empty version
140166 // 34 - 2*4 = 26
141- rowsetCount = get_rowset_count. call(tablets);
142- assert (rowsetCount == 26 * replicaNum)
167+ trigger_cumulative_all. call(tablets)
168+ rowsetCount = wait_rowset_count_le. call(tablets, 26 * replicaNum, 60 )
169+ assert (rowsetCount == 26 * replicaNum) : " expected ${ 26 * replicaNum} rowsets, got ${ rowsetCount} "
143170
144171 // trigger cumulative compactions for all tablets in ${tableName}
145- trigger_and_wait_compaction(tableName, " cumulative" )
146-
147172 // after cumulative compaction, there is only 22 rowset.
148173 // 26 - 4 = 22
149- rowsetCount = get_rowset_count. call(tablets);
150- assert (rowsetCount == 22 * replicaNum)
174+ trigger_cumulative_all. call(tablets)
175+ rowsetCount = wait_rowset_count_le. call(tablets, 22 * replicaNum, 60 )
176+ assert (rowsetCount == 22 * replicaNum) : " expected ${ 22 * replicaNum} rowsets, got ${ rowsetCount} "
151177
152178 qt_sql_2 """ select count() from ${ tableName} """
153179 if (isCloudMode()) {
@@ -156,11 +182,10 @@ suite("test_time_series_compaction_polciy", "p0") {
156182 sql """ alter table ${ tableName} set ("time_series_compaction_file_count_threshold"="10")"""
157183 sql """ sync"""
158184 // trigger cumulative compactions for all tablets in ${tableName}
159- trigger_and_wait_compaction(tableName, " cumulative" )
160-
161185 // after cumulative compaction, there is only 11 rowset.
162- rowsetCount = get_rowset_count. call(tablets);
163- assert (rowsetCount == 11 * replicaNum)
186+ trigger_cumulative_all. call(tablets)
187+ rowsetCount = wait_rowset_count_le. call(tablets, 11 * replicaNum, 60 )
188+ assert (rowsetCount == 11 * replicaNum) : " expected ${ 11 * replicaNum} rowsets, got ${ rowsetCount} "
164189 qt_sql_3 """ select count() from ${ tableName} """
165190
166191 sql """ DROP TABLE IF EXISTS ${ tableName} ; """
0 commit comments