Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions src/query/ee/src/hilbert_clustering/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,30 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler {
hilbert_min_bytes,
push_downs.as_ref().is_none_or(|v| v.filters.is_none()),
);
let mut recluster_segment_pruner = None;
'FOR: for chunk in segment_locations.chunks(chunk_size) {
// read segments.
if recluster_segment_pruner.is_none() {
recluster_segment_pruner = Some(FuseTable::create_recluster_segment_pruner(
&ctx,
fuse_table.schema_with_stream(),
fuse_table.get_operator(),
&push_downs,
)?);
}
let (pruning_ctx, segment_pruner, max_concurrency) = {
let (pruning_ctx, segment_pruner, max_concurrency) =
recluster_segment_pruner.as_ref().unwrap();
(
pruning_ctx.clone(),
segment_pruner.clone(),
*max_concurrency,
)
};
let compact_segments = FuseTable::segment_pruning(
&ctx,
fuse_table.schema_with_stream(),
fuse_table.get_operator(),
&push_downs,
pruning_ctx,
segment_pruner,
max_concurrency,
chunk.to_vec(),
)
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ pub static UNSET_TABLE_OPTIONS_WHITE_LIST: LazyLock<HashSet<&'static str>> = Laz
r.insert(FUSE_OPT_KEY_ROW_PER_BLOCK);
r.insert(FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD);
r.insert(FUSE_OPT_KEY_FILE_SIZE);
// Deprecated: no longer affects recluster, but old tables can still unset it.
r.insert(FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD);
r.insert(FUSE_OPT_KEY_RECLUSTER_DEPTH);
r.insert(FUSE_OPT_KEY_FILE_SIZE);
Expand Down
23 changes: 18 additions & 5 deletions src/query/service/src/interpreters/interpreter_table_recluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use databend_common_sql::plans::plan_hilbert_sql;
use databend_common_sql::plans::replace_with_constant;
use databend_common_sql::plans::set_update_stream_columns;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::operations::ReclusterFinalCarry;
use databend_common_storages_fuse::operations::ReclusterMode;
use databend_enterprise_hilbert_clustering::get_hilbert_clustering_handler;
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
Expand Down Expand Up @@ -130,6 +131,9 @@ impl Interpreter for ReclusterTableInterpreter {
let mut times = 0;
let mut push_downs = None;
let mut hilbert_info = None;
// Linear FINAL carry is scoped to this fixed-scan statement loop.
// A new FINAL statement starts from the table head again.
let mut linear_final_carry = ReclusterFinalCarry::default();
let start = SystemTime::now();
let timeout = Duration::from_secs(recluster_timeout_secs);
let is_final = self.plan.is_final;
Expand All @@ -143,7 +147,7 @@ impl Interpreter for ReclusterTableInterpreter {
}

let res = self
.execute_recluster(&mut push_downs, &mut hilbert_info)
.execute_recluster(&mut push_downs, &mut hilbert_info, &mut linear_final_carry)
Comment thread
zhyass marked this conversation as resolved.
.await;

match res {
Expand All @@ -166,6 +170,9 @@ impl Interpreter for ReclusterTableInterpreter {
| ErrorCode::UNRESOLVABLE_CONFLICT
)
{
// Keep FINAL carry across retryable conflicts. FINAL is
// a bounded fixed scan and does not restart from table
// head to chase concurrent snapshot drift.
warn!(
"recluster: final loop retry reason=retryable_conflict round={} code={} error={:?}",
times + 1,
Expand Down Expand Up @@ -217,6 +224,7 @@ impl ReclusterTableInterpreter {
&self,
push_downs: &mut Option<PushDownInfo>,
hilbert_info: &mut Option<HilbertBuildInfo>,
linear_final_carry: &mut ReclusterFinalCarry,
) -> Result<bool> {
self.ctx.clear_table_meta_timestamps_cache();
let start = SystemTime::now();
Expand Down Expand Up @@ -253,7 +261,10 @@ impl ReclusterTableInterpreter {
self.build_hilbert_plan(&tbl, push_downs, hilbert_info)
.await?
}
ClusterType::Linear => self.build_linear_plan(&tbl, push_downs, *limit).await?,
ClusterType::Linear => {
self.build_linear_plan(tbl.as_ref(), push_downs, *limit, linear_final_carry)
.await?
}
};
let Some(mut physical_plan) = physical_plan else {
return Ok(true);
Expand Down Expand Up @@ -519,11 +530,12 @@ impl ReclusterTableInterpreter {

async fn build_linear_plan(
&self,
tbl: &Arc<dyn Table>,
tbl: &dyn Table,
push_downs: &mut Option<PushDownInfo>,
limit: Option<usize>,
linear_final_carry: &mut ReclusterFinalCarry,
) -> Result<Option<PhysicalPlan>> {
let fuse_table = FuseTable::try_from_table(tbl.as_ref())?;
let fuse_table = FuseTable::try_from_table(tbl)?;
let Some((parts, snapshot)) = fuse_table
.do_recluster(
self.ctx.clone(),
Expand All @@ -534,6 +546,7 @@ impl ReclusterTableInterpreter {
} else {
ReclusterMode::Normal
},
linear_final_carry,
)
.await?
else {
Expand All @@ -544,7 +557,7 @@ impl ReclusterTableInterpreter {
}
let table_meta_timestamps = self
.ctx
.get_table_meta_timestamps(tbl.as_ref(), Some(snapshot.clone()))?;
.get_table_meta_timestamps(tbl, Some(snapshot.clone()))?;

let table_info = tbl.get_table_info().clone();
let is_distributed = parts.is_distributed(self.ctx.clone());
Expand Down
Loading
Loading