Skip to content

Commit f1b0ea4

Browse files
authored
perf(cubestore): faster repartition (prefetch, per-partition merge, range jobs) (#11088)
1 parent 5d82db1 commit f1b0ea4

9 files changed

Lines changed: 1745 additions & 123 deletions

File tree

rust/cubestore/cubestore/src/cluster/ingestion/job_processor.rs

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::config::injection::DIService;
2-
use crate::config::{Config, ConfigObj};
2+
use crate::config::{Config, ConfigObj, RepartitionStrategy};
33
use crate::import::ImportService;
44
use crate::metastore::job::{Job, JobType};
55
use crate::metastore::table::Table;
@@ -222,17 +222,14 @@ impl JobIsolatedProcessor {
222222
}
223223
let data_loaded_size = DataLoadedSize::new();
224224
app_metrics::JOBS_REPARTITION_CHUNK.add(1);
225-
// FIXME: a RepartitionChunk job whose chunk_id is used as an
226-
// anchor for the whole partition (batch_repartition_enabled).
227-
// We deliberately overload the existing RepartitionChunk type
228-
// instead of introducing a dedicated per-partition JobType:
229-
// a new JobType variant cannot be deserialized by an older
230-
// binary, which would make the whole job shard unreadable when
231-
// a deployment switches between the `latest` and `release`
232-
// channels (version can move both ways at any time). Reusing a
233-
// known type keeps both directions safe — an old binary just
234-
// repartitions the single anchor chunk. See repartition_partition_chunks.
235-
let r = if self.config_obj.batch_repartition_enabled() {
225+
// PerPartition reuses the RepartitionChunk job with the smallest chunk
226+
// as the anchor (one job per partition) and merges all its chunks;
227+
// PerChunk gets one job per chunk and repartitions just that chunk. We
228+
// overload the existing RepartitionChunk type (no new JobType) so an
229+
// older binary stays able to deserialize it across channel switches.
230+
let r = if self.config_obj.repartition_strategy()
231+
== RepartitionStrategy::PerPartition
232+
{
236233
let partition_id = chunk.get_row().get_partition_id();
237234
let time_budget = Duration::from_secs(
238235
self.config_obj.repartition_chunks_time_budget_secs(),
@@ -260,6 +257,30 @@ impl JobIsolatedProcessor {
260257
Self::fail_job_row_key(job)
261258
}
262259
}
260+
JobType::RepartitionRange(end_chunk_id) => {
261+
if let RowKey::Table(TableId::Chunks, start_chunk_id) = job.row_reference() {
262+
let start_chunk_id = *start_chunk_id;
263+
let end_chunk_id = *end_chunk_id;
264+
let data_loaded_size = DataLoadedSize::new();
265+
app_metrics::JOBS_REPARTITION_CHUNK.add(1);
266+
let r = self
267+
.chunk_store
268+
.repartition_chunk_range(
269+
start_chunk_id,
270+
end_chunk_id,
271+
data_loaded_size.clone(),
272+
)
273+
.await;
274+
if let Err(e) = r {
275+
app_metrics::JOBS_REPARTITION_CHUNK_FAILURES.add(1);
276+
return Err(e);
277+
}
278+
app_metrics::JOBS_REPARTITION_CHUNK_COMPLETED.add(1);
279+
Ok(JobProcessResult::new(data_loaded_size.get()))
280+
} else {
281+
Self::fail_job_row_key(job)
282+
}
283+
}
263284
_ => Err(CubeError::internal(format!(
264285
"Job {:?} cannot be processed in separate process",
265286
job.job_type()

rust/cubestore/cubestore/src/cluster/ingestion/job_runner.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,47 @@ impl JobRunner {
397397
Self::fail_job_row_key(job)
398398
}
399399
}
400+
JobType::RepartitionRange(_) => {
401+
if let RowKey::Table(TableId::Chunks, start_chunk_id) = job.row_reference() {
402+
let start_chunk_id = *start_chunk_id;
403+
let process_rate_limiter = self.process_rate_limiter.clone();
404+
let timeout = Some(Duration::from_secs(self.config_obj.import_job_timeout()));
405+
let metastore = self.meta_store.clone();
406+
let job_to_move = job.clone();
407+
let job_processor = self.job_processor.clone();
408+
Ok(cube_ext::spawn(async move {
409+
let wait_ms = process_rate_limiter
410+
.wait_for_allow(TaskType::Job, timeout)
411+
.await?;
412+
let chunk = metastore.get_chunk(start_chunk_id).await?;
413+
let (_, _, table, _) = metastore
414+
.get_partition_for_compaction(chunk.get_row().get_partition_id())
415+
.await?;
416+
let table_id = table.get_id();
417+
let trace_obj = metastore.get_trace_obj_by_table_id(table_id).await?;
418+
let trace_index = TraceIndex {
419+
table_id: Some(table_id),
420+
trace_obj,
421+
};
422+
match job_processor.process_job(job_to_move).await {
423+
Ok(job_res) => {
424+
process_rate_limiter
425+
.commit_task_usage(
426+
TaskType::Job,
427+
job_res.data_loaded_size() as i64,
428+
wait_ms,
429+
trace_index,
430+
)
431+
.await;
432+
Ok(())
433+
}
434+
Err(e) => Err(e),
435+
}
436+
}))
437+
} else {
438+
Self::fail_job_row_key(job)
439+
}
440+
}
400441
// Defense-in-depth: start_processing_job never selects an Unknown job, so
401442
// this arm is not a live path — it just guarantees we never panic on one.
402443
JobType::Unknown => Err(CubeError::internal(format!(

rust/cubestore/cubestore/src/cluster/mod.rs

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::cluster::transport::{ClusterTransport, MetaStoreTransport, WorkerConn
2626
use crate::config::injection::{DIService, Injector};
2727
use crate::config::is_router;
2828
#[allow(unused_imports)]
29-
use crate::config::{Config, ConfigObj};
29+
use crate::config::{Config, ConfigObj, RepartitionStrategy};
3030
use crate::metastore::chunks::chunk_file_name;
3131
use crate::metastore::job::{Job, JobRunnerPool, JobStatus, JobType};
3232
use crate::metastore::{
@@ -945,17 +945,71 @@ impl Cluster for ClusterImpl {
945945
.filter(|c| !c.get_row().in_memory())
946946
.collect::<Vec<_>>();
947947

948-
if self.config_obj.batch_repartition_enabled() {
949-
// FIXME: one job per partition that batches all persisted chunks, but
950-
// keyed on a chunk (RepartitionChunk), not the partition. We reuse the
951-
// existing job type instead of a dedicated per-partition JobType so an
952-
// older binary stays able to deserialize it across `latest`/`release`
953-
// channel switches (a new variant would make its whole job shard
954-
// unreadable). The anchor is the smallest persisted chunk id so add_job
955-
// dedups to a single job per partition; the worker resolves the
956-
// partition from it and processes the anchor last (see
957-
// repartition_partition_chunks). An old binary just repartitions this
958-
// one chunk and drains the rest via its own per-chunk path.
948+
if self.config_obj.repartition_strategy() == RepartitionStrategy::Range {
949+
// Slice the parent's persisted chunks into RepartitionRange jobs. Walk ALL
950+
// chunks (active and inactive) sorted by id so the [start, end] boundaries
951+
// stay pinned to chunk ids and don't shift when chunks deactivate; cut a
952+
// range once its rows reach max_rows or its chunk count reaches the fan-in
953+
// cap, so a range never merges an unbounded number of chunks at once. A
954+
// range is only scheduled when it still has an active chunk; the end is
955+
// carried as the job's data, not its dedup key, so a tail that extends the
956+
// trailing range dedups on the start.
957+
// Clamp to >= 1 so a misconfigured 0 cap doesn't break the inner loop before
958+
// adding any chunk. Even 1 degrades to one chunk per range (no merge gain);
959+
// the sane range is >= 2.
960+
let max_rows = self.config_obj.repartition_merge_max_rows().max(1);
961+
let max_files = self.config_obj.repartition_merge_max_input_files().max(1);
962+
let mut all = self
963+
.meta_store
964+
.get_chunks_by_partition(p.get_id(), true)
965+
.await?
966+
.into_iter()
967+
.filter(|c| !c.get_row().in_memory())
968+
.collect::<Vec<_>>();
969+
all.sort_by_key(|c| c.get_id());
970+
971+
let mut i = 0;
972+
while i < all.len() {
973+
let start = all[i].get_id();
974+
let mut rows = 0u64;
975+
let mut count = 0usize;
976+
let mut end = start;
977+
let mut has_active = false;
978+
while i < all.len() {
979+
let c = &all[i];
980+
rows += c.get_row().get_row_count();
981+
count += 1;
982+
end = c.get_id();
983+
has_active |= c.get_row().active();
984+
i += 1;
985+
if rows >= max_rows || count >= max_files {
986+
break;
987+
}
988+
}
989+
if has_active {
990+
let node =
991+
pick_worker_by_ids(self.config_obj.as_ref(), [start, end]).to_string();
992+
let job = self
993+
.meta_store
994+
.add_job(Job::new(
995+
RowKey::Table(TableId::Chunks, start),
996+
JobType::RepartitionRange(end),
997+
node.clone(),
998+
))
999+
.await?;
1000+
if job.is_some() {
1001+
self.notify_job_runner(node).await?;
1002+
}
1003+
}
1004+
}
1005+
} else if self.config_obj.repartition_strategy() == RepartitionStrategy::PerPartition {
1006+
// One job per partition, keyed on a chunk (RepartitionChunk) rather than a
1007+
// dedicated per-partition JobType so an older binary can still deserialize it
1008+
// across `latest`/`release` channel switches. The anchor is the smallest
1009+
// persisted chunk id so add_job dedups to a single job per partition; the
1010+
// worker resolves the partition from it and merges all its chunks (see
1011+
// repartition_partition_chunks). An old binary just repartitions this one
1012+
// chunk and drains the rest via its own per-chunk path.
9591013
if let Some(anchor_chunk_id) = chunks.iter().map(|c| c.get_id()).min() {
9601014
let node = self.node_name_by_partition(p);
9611015
let job = self

0 commit comments

Comments
 (0)