Skip to content

Commit a8221c7

Browse files
committed
fix
1 parent 679f00d commit a8221c7

2 files changed

Lines changed: 9 additions & 23 deletions

File tree

src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ use databend_common_storages_fuse::io::TableMetaLocationGenerator;
3434
use databend_common_storages_fuse::operations::ReclusterMode;
3535
use databend_common_storages_fuse::operations::ReclusterMutator;
3636
use databend_common_storages_fuse::pruning::create_segment_location_vector;
37-
use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut;
3837
use databend_common_storages_fuse::statistics::reducers::reduce_block_metas;
3938
use databend_query::sessions::TableContext;
4039
use databend_query::sessions::TableContextSettings;
@@ -44,7 +43,6 @@ use databend_storages_common_table_meta::meta;
4443
use databend_storages_common_table_meta::meta::BlockMeta;
4544
use databend_storages_common_table_meta::meta::ClusterStatistics;
4645
use databend_storages_common_table_meta::meta::SegmentInfo;
47-
use databend_storages_common_table_meta::meta::Statistics;
4846
use databend_storages_common_table_meta::meta::Versioned;
4947
use rand::Rng;
5048
use rand::thread_rng;
@@ -384,7 +382,7 @@ async fn test_recluster_mutator_limits_removed_segments_to_selected_blocks() ->
384382
}
385383

386384
#[tokio::test(flavor = "multi_thread")]
387-
async fn test_recluster_mutator_block_select() -> anyhow::Result<()> {
385+
async fn test_recluster_mutator_compacts_small_overlapping_blocks() -> anyhow::Result<()> {
388386
let fixture = TestFixture::setup().await?;
389387
let ctx = fixture.new_query_ctx().await?;
390388
let location_generator = TableMetaLocationGenerator::new("_prefix".to_owned());
@@ -428,7 +426,8 @@ async fn test_recluster_mutator_block_select() -> anyhow::Result<()> {
428426
}
429427

430428
#[tokio::test(flavor = "multi_thread")]
431-
async fn test_recluster_mutator_selects_multiple_segment_windows() -> anyhow::Result<()> {
429+
async fn test_recluster_mutator_select_segments_covers_candidates_and_merges_tail()
430+
-> anyhow::Result<()> {
432431
let fixture = TestFixture::setup().await?;
433432
let ctx = fixture.new_query_ctx().await?;
434433
ctx.get_settings().set_max_threads(2)?;
@@ -492,6 +491,7 @@ async fn test_recluster_mutator_selects_multiple_segment_windows() -> anyhow::Re
492491
})
493492
.collect::<Vec<_>>();
494493
// Windows are segment-disjoint and together cover every candidate segment.
494+
assert_eq!(window_index_sets.len(), 2);
495495
let mut covered = HashSet::new();
496496
let mut total = 0;
497497
for window in &window_index_sets {
@@ -928,12 +928,7 @@ async fn test_final_recluster_groups_mature_levels_in_two_level_bands() -> anyho
928928
assert_eq!(block_num, 0);
929929
assert!(parts.tasks.is_empty());
930930

931-
Ok(())
932-
}
933-
934-
#[tokio::test(flavor = "multi_thread")]
935-
async fn test_final_recluster_skips_high_level_two_block_batch() -> anyhow::Result<()> {
936-
let thresholds = BlockThresholds::new(1000, 100, 100, 10);
931+
// Two high-level blocks alone are not enough to produce useful FINAL work.
937932
let (block_num, parts) =
938933
target_select_segments_by_level_with_mode(&[(2, 2)], thresholds, 1, ReclusterMode::Final)
939934
.await?;
@@ -998,18 +993,6 @@ async fn test_safety_for_recluster() -> anyhow::Result<()> {
998993

999994
eprintln!("data ready");
1000995

1001-
let mut summary = Statistics::default();
1002-
for seg in &segment_infos {
1003-
merge_statistics_mut(&mut summary, &seg.summary, Some(cluster_key_id));
1004-
}
1005-
1006-
let mut block_ids = HashSet::new();
1007-
for seg in &segment_infos {
1008-
for b in &seg.blocks {
1009-
block_ids.insert(b.location.clone());
1010-
}
1011-
}
1012-
1013996
let ctx: Arc<dyn TableContext> = ctx.clone();
1014997
let segment_locations = create_segment_location_vector(locations.clone(), None);
1015998
let compact_segments = FuseTable::segment_pruning(

src/query/storages/fuse/src/operations/recluster.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,10 @@ impl FuseTable {
170170
// scope stays bounded instead of growing across the whole chunk.
171171
let produced_tasks = !recluster_parts.tasks.is_empty();
172172
merge_recluster_parts(&mut parts, recluster_parts, mutator.cluster_key_id);
173-
if !produced_tasks {
173+
// `LIMIT` bounds the total segments rewritten per invocation, so
174+
// stop after the first productive window instead of accumulating
175+
// more across windows.
176+
if !produced_tasks || limit.is_some() {
174177
break;
175178
}
176179
}

0 commit comments

Comments
 (0)