Skip to content

Commit b09e8e6

Browse files
committed
feat(cubestore): RepartitionRange jobs and a repartition strategy selector
Add a third repartition strategy that slices an inactive parent's persisted chunks into RepartitionRange jobs at schedule time. Slicing walks all chunks (active and inactive) ordered by id and cuts a range once it reaches the row or chunk-count cap, so the [start, end] bounds stay pinned to chunk ids and a re-slice reproduces them; the end is carried as job data, not the dedup key, so a tail that extends the trailing range dedups on its start instead of spawning a second job. Each range runs as one atomic swap on the worker chosen by the hash of its bounds, restoring cross-worker parallelism. A GC gate keeps an inactive parent's chunks until it fully drains so slicing stays stable. Replace the ad-hoc flags with a single CUBESTORE_REPARTITION_STRATEGY selector (per_chunk default, per_partition, range); an unknown value logs a warning and falls back to per_chunk. The merge caps (CUBESTORE_REPARTITION_MERGE_MAX_INPUT_FILES default 50, CUBESTORE_REPARTITION_MERGE_MAX_ROWS default 4000000) become plain caps with defaults. The per-partition merge core is shared between the in-job loop and the range handler. JobType::RepartitionRange deserializes as an unknown variant on binaries that predate it, so it is only safe behind the skip-unknown-jobs handling; enable the strategy per-deployment.
1 parent bf10c94 commit b09e8e6

8 files changed

Lines changed: 761 additions & 213 deletions

File tree

rust/cubestore/cubestore/src/cluster/ingestion/job_processor.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,30 @@ impl JobIsolatedProcessor {
260260
Self::fail_job_row_key(job)
261261
}
262262
}
263+
JobType::RepartitionRange(end_chunk_id) => {
264+
if let RowKey::Table(TableId::Chunks, start_chunk_id) = job.row_reference() {
265+
let start_chunk_id = *start_chunk_id;
266+
let end_chunk_id = *end_chunk_id;
267+
let data_loaded_size = DataLoadedSize::new();
268+
app_metrics::JOBS_REPARTITION_CHUNK.add(1);
269+
let r = self
270+
.chunk_store
271+
.repartition_chunk_range(
272+
start_chunk_id,
273+
end_chunk_id,
274+
data_loaded_size.clone(),
275+
)
276+
.await;
277+
if let Err(e) = r {
278+
app_metrics::JOBS_REPARTITION_CHUNK_FAILURES.add(1);
279+
return Err(e);
280+
}
281+
app_metrics::JOBS_REPARTITION_CHUNK_COMPLETED.add(1);
282+
Ok(JobProcessResult::new(data_loaded_size.get()))
283+
} else {
284+
Self::fail_job_row_key(job)
285+
}
286+
}
263287
_ => Err(CubeError::internal(format!(
264288
"Job {:?} cannot be processed in separate process",
265289
job.job_type()

rust/cubestore/cubestore/src/cluster/ingestion/job_runner.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,47 @@ impl JobRunner {
397397
Self::fail_job_row_key(job)
398398
}
399399
}
400+
JobType::RepartitionRange(_) => {
401+
if let RowKey::Table(TableId::Chunks, start_chunk_id) = job.row_reference() {
402+
let start_chunk_id = *start_chunk_id;
403+
let process_rate_limiter = self.process_rate_limiter.clone();
404+
let timeout = Some(Duration::from_secs(self.config_obj.import_job_timeout()));
405+
let metastore = self.meta_store.clone();
406+
let job_to_move = job.clone();
407+
let job_processor = self.job_processor.clone();
408+
Ok(cube_ext::spawn(async move {
409+
let wait_ms = process_rate_limiter
410+
.wait_for_allow(TaskType::Job, timeout)
411+
.await?;
412+
let chunk = metastore.get_chunk(start_chunk_id).await?;
413+
let (_, _, table, _) = metastore
414+
.get_partition_for_compaction(chunk.get_row().get_partition_id())
415+
.await?;
416+
let table_id = table.get_id();
417+
let trace_obj = metastore.get_trace_obj_by_table_id(table_id).await?;
418+
let trace_index = TraceIndex {
419+
table_id: Some(table_id),
420+
trace_obj,
421+
};
422+
match job_processor.process_job(job_to_move).await {
423+
Ok(job_res) => {
424+
process_rate_limiter
425+
.commit_task_usage(
426+
TaskType::Job,
427+
job_res.data_loaded_size() as i64,
428+
wait_ms,
429+
trace_index,
430+
)
431+
.await;
432+
Ok(())
433+
}
434+
Err(e) => Err(e),
435+
}
436+
}))
437+
} else {
438+
Self::fail_job_row_key(job)
439+
}
440+
}
400441
}
401442
}
402443

rust/cubestore/cubestore/src/cluster/mod.rs

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::cluster::transport::{ClusterTransport, MetaStoreTransport, WorkerConn
2626
use crate::config::injection::{DIService, Injector};
2727
use crate::config::is_router;
2828
#[allow(unused_imports)]
29-
use crate::config::{Config, ConfigObj};
29+
use crate::config::{Config, ConfigObj, RepartitionStrategy};
3030
use crate::metastore::chunks::chunk_file_name;
3131
use crate::metastore::job::{Job, JobRunnerPool, JobStatus, JobType};
3232
use crate::metastore::{
@@ -945,17 +945,74 @@ impl Cluster for ClusterImpl {
945945
.filter(|c| !c.get_row().in_memory())
946946
.collect::<Vec<_>>();
947947

948-
if self.config_obj.batch_repartition_enabled() {
949-
// FIXME: one job per partition that batches all persisted chunks, but
950-
// keyed on a chunk (RepartitionChunk), not the partition. We reuse the
951-
// existing job type instead of a dedicated per-partition JobType so an
952-
// older binary stays able to deserialize it across `latest`/`release`
953-
// channel switches (a new variant would make its whole job shard
954-
// unreadable). The anchor is the smallest persisted chunk id so add_job
955-
// dedups to a single job per partition; the worker resolves the
956-
// partition from it and processes the anchor last (see
957-
// repartition_partition_chunks). An old binary just repartitions this
958-
// one chunk and drains the rest via its own per-chunk path.
948+
if self.config_obj.repartition_strategy() == RepartitionStrategy::Range {
949+
// Slice the parent's persisted chunks into RepartitionRange jobs. Walk ALL
950+
// chunks (active and inactive) sorted by id so the [start, end] boundaries
951+
// stay pinned to chunk ids and don't shift when chunks deactivate; cut a
952+
// range once its rows reach max_rows or its chunk count reaches the fan-in
953+
// cap, so a range never merges an unbounded number of chunks at once. A
954+
// range is only scheduled when it still has an active chunk; the end is
955+
// carried as the job's data, not its dedup key, so a tail that extends the
956+
// trailing range dedups on the start.
957+
let max_rows = self.config_obj.repartition_merge_max_rows();
958+
let max_files = self.config_obj.repartition_merge_max_input_files();
959+
let mut all = self
960+
.meta_store
961+
.get_chunks_by_partition(p.get_id(), true)
962+
.await?
963+
.into_iter()
964+
.filter(|c| !c.get_row().in_memory())
965+
.collect::<Vec<_>>();
966+
all.sort_by_key(|c| c.get_id());
967+
968+
let mut i = 0;
969+
while i < all.len() {
970+
let start = all[i].get_id();
971+
let mut rows = 0u64;
972+
let mut count = 0usize;
973+
let mut end = start;
974+
let mut has_active = false;
975+
while i < all.len() {
976+
let c = &all[i];
977+
rows += c.get_row().get_row_count();
978+
count += 1;
979+
end = c.get_id();
980+
has_active |= c.get_row().active();
981+
i += 1;
982+
if rows >= max_rows || count >= max_files {
983+
break;
984+
}
985+
}
986+
if has_active {
987+
let node =
988+
pick_worker_by_ids(self.config_obj.as_ref(), [start, end]).to_string();
989+
let job = self
990+
.meta_store
991+
.add_job(Job::new(
992+
RowKey::Table(TableId::Chunks, start),
993+
JobType::RepartitionRange(end),
994+
node.clone(),
995+
))
996+
.await?;
997+
if job.is_some() {
998+
self.notify_job_runner(node).await?;
999+
}
1000+
}
1001+
}
1002+
} else if self.config_obj.repartition_strategy() == RepartitionStrategy::PerPartition
1003+
|| self.config_obj.batch_repartition_enabled()
1004+
{
1005+
// One job per partition that batches all persisted chunks, but keyed on a
1006+
// chunk (RepartitionChunk), not the partition. We reuse the existing job
1007+
// type instead of a dedicated per-partition JobType so an older binary stays
1008+
// able to deserialize it across `latest`/`release` channel switches (a new
1009+
// variant would make its whole job shard unreadable). The anchor is the
1010+
// smallest persisted chunk id so add_job dedups to a single job per
1011+
// partition; the worker resolves the partition from it and processes the
1012+
// anchor last (see repartition_partition_chunks). The PerPartition strategy
1013+
// requires this single-job form: a per-chunk job under it would re-merge the
1014+
// whole partition. An old binary just repartitions this one chunk and drains
1015+
// the rest via its own per-chunk path.
9591016
if let Some(anchor_chunk_id) = chunks.iter().map(|c| c.get_id()).min() {
9601017
let node = self.node_name_by_partition(p);
9611018
let job = self

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 102 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,37 @@ pub struct Config {
361361
injector: Arc<Injector>,
362362
}
363363

364+
/// How an inactive parent's persisted chunks are repartitioned into its children.
365+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
366+
pub enum RepartitionStrategy {
367+
/// One chunk at a time, each split independently into the children.
368+
PerChunk,
369+
/// One job per partition that k-way merges all its chunks in groups and splits the
370+
/// merged stream into the children at the wal-split limit.
371+
PerPartition,
372+
/// Many jobs sliced at schedule time, each merging an inclusive chunk-id range;
373+
/// spreads across workers by the hash of the range bounds.
374+
Range,
375+
}
376+
377+
impl FromStr for RepartitionStrategy {
378+
type Err = String;
379+
380+
fn from_str(s: &str) -> Result<Self, Self::Err> {
381+
match s.to_lowercase().as_str() {
382+
"per_chunk" | "perchunk" | "per-chunk" => Ok(RepartitionStrategy::PerChunk),
383+
"per_partition" | "perpartition" | "per-partition" => {
384+
Ok(RepartitionStrategy::PerPartition)
385+
}
386+
"range" => Ok(RepartitionStrategy::Range),
387+
_ => Err(format!(
388+
"unknown repartition strategy '{}' (expected per_chunk, per_partition or range)",
389+
s
390+
)),
391+
}
392+
}
393+
}
394+
364395
#[automock]
365396
pub trait ConfigObj: DIService {
366397
fn partition_split_threshold(&self) -> u64;
@@ -525,12 +556,17 @@ pub trait ConfigObj: DIService {
525556
/// `Some(0)` disables prefetching.
526557
fn repartition_prefetch_budget_bytes(&self) -> Option<u64>;
527558

528-
/// Max number of persisted input chunks merged in one group when repartitioning
529-
/// an inactive parent. `Some(m >= 2)` streams the parent's chunks through a k-way
530-
/// merge in groups of up to m and splits each group into the active children at
531-
/// the wal-split limit, capping each merge+swap group at m chunks. `None` /
532-
/// `Some(0)` / `Some(1)` disable the merge path.
533-
fn repartition_merge_max_input_files(&self) -> Option<usize>;
559+
/// Which repartition strategy to use for an inactive parent's persisted chunks.
560+
/// Defaults to PerChunk.
561+
fn repartition_strategy(&self) -> RepartitionStrategy;
562+
563+
/// Cap on the number of chunks merged together in one Merge group / RepartitionRange.
564+
/// Bounds the concurrent parquet readers, the k-way merge width, the swap size and
565+
/// (since a range downloads its chunks sequentially) the per-job download time.
566+
fn repartition_merge_max_input_files(&self) -> usize;
567+
568+
/// Cap on the total rows merged together in one Merge group / RepartitionRange.
569+
fn repartition_merge_max_rows(&self) -> u64;
534570

535571
fn allow_decimal128(&self) -> bool;
536572

@@ -681,7 +717,9 @@ pub struct ConfigObjImpl {
681717
pub batch_repartition_enabled: bool,
682718
pub repartition_chunks_time_budget_secs: u64,
683719
pub repartition_prefetch_budget_bytes: Option<u64>,
684-
pub repartition_merge_max_input_files: Option<usize>,
720+
pub repartition_strategy: RepartitionStrategy,
721+
pub repartition_merge_max_input_files: usize,
722+
pub repartition_merge_max_rows: u64,
685723
pub allow_decimal128: bool,
686724
pub enable_remove_orphaned_remote_files: bool,
687725
pub enable_startup_warmup: bool,
@@ -1003,9 +1041,15 @@ impl ConfigObj for ConfigObjImpl {
10031041
fn repartition_prefetch_budget_bytes(&self) -> Option<u64> {
10041042
self.repartition_prefetch_budget_bytes
10051043
}
1006-
fn repartition_merge_max_input_files(&self) -> Option<usize> {
1044+
fn repartition_strategy(&self) -> RepartitionStrategy {
1045+
self.repartition_strategy
1046+
}
1047+
fn repartition_merge_max_input_files(&self) -> usize {
10071048
self.repartition_merge_max_input_files
10081049
}
1050+
fn repartition_merge_max_rows(&self) -> u64 {
1051+
self.repartition_merge_max_rows
1052+
}
10091053

10101054
fn allow_decimal128(&self) -> bool {
10111055
self.allow_decimal128
@@ -1308,6 +1352,24 @@ where
13081352
})
13091353
}
13101354

1355+
// Unlike env_optparse, an unparseable value is not fatal: it logs a warning and falls
1356+
// back to per_chunk, so a typo in the strategy env never takes the process down.
1357+
fn env_repartition_strategy() -> RepartitionStrategy {
1358+
match env::var("CUBESTORE_REPARTITION_STRATEGY") {
1359+
Ok(v) => match v.parse::<RepartitionStrategy>() {
1360+
Ok(s) => s,
1361+
Err(e) => {
1362+
log::warn!(
1363+
"Ignoring CUBESTORE_REPARTITION_STRATEGY: {}; using per_chunk",
1364+
e
1365+
);
1366+
RepartitionStrategy::PerChunk
1367+
}
1368+
},
1369+
Err(_) => RepartitionStrategy::PerChunk,
1370+
}
1371+
}
1372+
13111373
impl Config {
13121374
fn calculate_cache_compaction_trigger_size(cache_max_size: usize) -> usize {
13131375
let trigger_size = match cache_max_size >> 20 {
@@ -1622,8 +1684,14 @@ impl Config {
16221684
repartition_prefetch_budget_bytes: env_optparse_size(
16231685
"CUBESTORE_REPARTITION_PREFETCH_BUDGET",
16241686
),
1625-
repartition_merge_max_input_files: env_optparse(
1687+
repartition_strategy: env_repartition_strategy(),
1688+
repartition_merge_max_input_files: env_parse(
16261689
"CUBESTORE_REPARTITION_MERGE_MAX_INPUT_FILES",
1690+
50,
1691+
),
1692+
repartition_merge_max_rows: env_parse(
1693+
"CUBESTORE_REPARTITION_MERGE_MAX_ROWS",
1694+
4_000_000,
16271695
),
16281696
allow_decimal128: env_bool("CUBESTORE_ALLOW_DECIMAL128", false),
16291697
enable_remove_orphaned_remote_files: env_bool(
@@ -1866,7 +1934,9 @@ impl Config {
18661934
batch_repartition_enabled: true,
18671935
repartition_chunks_time_budget_secs: 60,
18681936
repartition_prefetch_budget_bytes: None,
1869-
repartition_merge_max_input_files: None,
1937+
repartition_strategy: RepartitionStrategy::PerChunk,
1938+
repartition_merge_max_input_files: 50,
1939+
repartition_merge_max_rows: 4_000_000,
18701940
allow_decimal128: false,
18711941
enable_remove_orphaned_remote_files: false,
18721942
enable_startup_warmup: true,
@@ -2728,3 +2798,25 @@ pub async fn uses_remote_metastore(i: &Injector) -> bool {
27282798
pub fn is_router(c: &dyn ConfigObj) -> bool {
27292799
!c.worker_bind_address().is_some()
27302800
}
2801+
2802+
#[cfg(test)]
2803+
mod tests {
2804+
use super::*;
2805+
2806+
#[test]
2807+
fn repartition_strategy_from_str() {
2808+
assert_eq!(
2809+
"per_chunk".parse::<RepartitionStrategy>().unwrap(),
2810+
RepartitionStrategy::PerChunk
2811+
);
2812+
assert_eq!(
2813+
"per_partition".parse::<RepartitionStrategy>().unwrap(),
2814+
RepartitionStrategy::PerPartition
2815+
);
2816+
assert_eq!(
2817+
"RANGE".parse::<RepartitionStrategy>().unwrap(),
2818+
RepartitionStrategy::Range
2819+
);
2820+
assert!("nonsense".parse::<RepartitionStrategy>().is_err());
2821+
}
2822+
}

rust/cubestore/cubestore/src/metastore/job.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ pub enum JobType {
2121
RepartitionChunk,
2222
InMemoryChunksCompaction,
2323
NodeInMemoryChunksCompaction(/*node*/ String),
24+
// Repartition an inclusive [start, end] chunk-id range of an inactive parent in
25+
// one merge+swap. row_reference carries the start chunk; the end is data only and
26+
// is deliberately excluded from the job index key (see key_to_bytes) so a tail
27+
// that extends the trailing range dedups on the start instead of spawning a
28+
// second job for the same start.
29+
RepartitionRange(/*end_chunk_id*/ u64),
2430
}
2531

2632
fn get_job_type_index(j: &JobType) -> u32 {
@@ -35,6 +41,7 @@ fn get_job_type_index(j: &JobType) -> u32 {
3541
JobType::RepartitionChunk => 8,
3642
JobType::InMemoryChunksCompaction => 9,
3743
JobType::NodeInMemoryChunksCompaction(_) => 10,
44+
JobType::RepartitionRange(_) => 11,
3845
}
3946
}
4047

@@ -49,6 +56,7 @@ fn get_job_type_priority(j: &JobType) -> u32 {
4956
JobType::MultiPartitionSplit => 1000,
5057
JobType::FinishMultiSplit => 1000,
5158
JobType::RepartitionChunk => 1000,
59+
JobType::RepartitionRange(_) => 1000,
5260
JobType::InMemoryChunksCompaction => 10000,
5361
JobType::NodeInMemoryChunksCompaction(_) => 10000,
5462
}

0 commit comments

Comments
 (0)