Skip to content

Commit 18b967e

Browse files
committed
refactor(cubestore): collapse batch flag into strategy selector, concurrent download in merge
Remove CUBESTORE_BATCH_REPARTITION / batch_repartition_enabled. The repartition strategy (per_chunk / per_partition / range) is now the sole selector: - per_chunk: one job per chunk -> repartition_chunk - per_partition: one anchor job per partition -> merge - range: RepartitionRange jobs -> merge The job handler and scheduler dispatch on the strategy directly. The dead per_chunk+batch hybrid (per-chunk loop and its producer/consumer prefetch) is gone, along with its two now-obsolete tests. Prefetch is reworked: drop the byte-budget producer/consumer and CUBESTORE_REPARTITION_PREFETCH_BUDGET; add a plain bool CUBESTORE_REPARTITION_CONCURRENT_DOWNLOAD (default off) that downloads a merge group's chunk parquets concurrently before building the merge inputs. It applies to both per_partition and range; the group is already bounded by repartition_merge_max_input_files and the pool by download_concurrency, so no extra budget is needed.
1 parent ae73d0b commit 18b967e

5 files changed

Lines changed: 78 additions & 579 deletions

File tree

rust/cubestore/cubestore/src/cluster/ingestion/job_processor.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::config::injection::DIService;
2-
use crate::config::{Config, ConfigObj};
2+
use crate::config::{Config, ConfigObj, RepartitionStrategy};
33
use crate::import::ImportService;
44
use crate::metastore::job::{Job, JobType};
55
use crate::metastore::table::Table;
@@ -222,17 +222,14 @@ impl JobIsolatedProcessor {
222222
}
223223
let data_loaded_size = DataLoadedSize::new();
224224
app_metrics::JOBS_REPARTITION_CHUNK.add(1);
225-
// FIXME: a RepartitionChunk job whose chunk_id is used as an
226-
// anchor for the whole partition (batch_repartition_enabled).
227-
// We deliberately overload the existing RepartitionChunk type
228-
// instead of introducing a dedicated per-partition JobType:
229-
// a new JobType variant cannot be deserialized by an older
230-
// binary, which would make the whole job shard unreadable when
231-
// a deployment switches between the `latest` and `release`
232-
// channels (version can move both ways at any time). Reusing a
233-
// known type keeps both directions safe — an old binary just
234-
// repartitions the single anchor chunk. See repartition_partition_chunks.
235-
let r = if self.config_obj.batch_repartition_enabled() {
225+
// PerPartition reuses the RepartitionChunk job with the smallest chunk
226+
// as the anchor (one job per partition) and merges all its chunks;
227+
// PerChunk gets one job per chunk and repartitions just that chunk. We
228+
// overload the existing RepartitionChunk type (no new JobType) so an
229+
// older binary stays able to deserialize it across channel switches.
230+
let r = if self.config_obj.repartition_strategy()
231+
== RepartitionStrategy::PerPartition
232+
{
236233
let partition_id = chunk.get_row().get_partition_id();
237234
let time_budget = Duration::from_secs(
238235
self.config_obj.repartition_chunks_time_budget_secs(),

rust/cubestore/cubestore/src/cluster/mod.rs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,20 +1002,14 @@ impl Cluster for ClusterImpl {
10021002
}
10031003
}
10041004
}
1005-
} else if self.config_obj.repartition_strategy() == RepartitionStrategy::PerPartition
1006-
|| self.config_obj.batch_repartition_enabled()
1007-
{
1008-
// One job per partition that batches all persisted chunks, but keyed on a
1009-
// chunk (RepartitionChunk), not the partition. We reuse the existing job
1010-
// type instead of a dedicated per-partition JobType so an older binary stays
1011-
// able to deserialize it across `latest`/`release` channel switches (a new
1012-
// variant would make its whole job shard unreadable). The anchor is the
1013-
// smallest persisted chunk id so add_job dedups to a single job per
1014-
// partition; the worker resolves the partition from it and processes the
1015-
// anchor last (see repartition_partition_chunks). The PerPartition strategy
1016-
// requires this single-job form: a per-chunk job under it would re-merge the
1017-
// whole partition. An old binary just repartitions this one chunk and drains
1018-
// the rest via its own per-chunk path.
1005+
} else if self.config_obj.repartition_strategy() == RepartitionStrategy::PerPartition {
1006+
// One job per partition, keyed on a chunk (RepartitionChunk) rather than a
1007+
// dedicated per-partition JobType so an older binary can still deserialize it
1008+
// across `latest`/`release` channel switches. The anchor is the smallest
1009+
// persisted chunk id so add_job dedups to a single job per partition; the
1010+
// worker resolves the partition from it and merges all its chunks (see
1011+
// repartition_partition_chunks). An old binary just repartitions this one
1012+
// chunk and drains the rest via its own per-chunk path.
10191013
if let Some(anchor_chunk_id) = chunks.iter().map(|c| c.get_id()).min() {
10201014
let node = self.node_name_by_partition(p);
10211015
let job = self

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -540,11 +540,6 @@ pub trait ConfigObj: DIService {
540540
/// default (hash placement); kept as a flag for rollout/rollback.
541541
fn load_aware_import_placement_enabled(&self) -> bool;
542542

543-
/// When enabled, an inactive parent partition is repartitioned by a single
544-
/// per-partition job that loops over its persisted chunks, instead of one
545-
/// job per chunk. Kept as a flag for emergency rollback.
546-
fn batch_repartition_enabled(&self) -> bool;
547-
548543
/// Time budget for a single batch repartition job. The job yields its runner
549544
/// slot once the budget is exceeded (after the current chunk); the remainder
550545
/// is drained by a follow-up job via the cascade. Kept separate from
@@ -567,13 +562,11 @@ pub trait ConfigObj: DIService {
567562
/// Off by default; when disabled, chunks are sent whole and filtered only in the subprocess.
568563
fn prefilter_in_memory_chunks_enabled(&self) -> bool;
569564

570-
/// Byte budget for prefetching persisted chunk parquets ahead of a batch
571-
/// repartition job. A sequential producer downloads upcoming chunks (anchor
572-
/// last) while the job processes the current one, bounded so that no more
573-
/// than this many bytes of fetched-but-unprocessed data sit on local disk.
574-
/// The env value accepts size suffixes (e.g. `512MB`). `None` (env unset) or
575-
/// `Some(0)` disables prefetching.
576-
fn repartition_prefetch_budget_bytes(&self) -> Option<u64>;
565+
/// When enabled, a merge group's chunk parquets (PerPartition / Range strategies) are
566+
/// downloaded concurrently before building the merge inputs, instead of one at a time.
567+
/// The group is already bounded by repartition_merge_max_input_files and the download
568+
/// pool by download_concurrency, so no extra budget is needed. Off by default.
569+
fn repartition_concurrent_download(&self) -> bool;
577570

578571
/// Which repartition strategy to use for an inactive parent's persisted chunks.
579572
/// Defaults to PerChunk.
@@ -733,12 +726,11 @@ pub struct ConfigObjImpl {
733726
pub upload_to_remote: bool,
734727
pub enable_topk: bool,
735728
pub load_aware_import_placement_enabled: bool,
736-
pub batch_repartition_enabled: bool,
737729
pub repartition_chunks_time_budget_secs: u64,
738730
pub push_partial_aggregate_below_merge_enabled: bool,
739731
pub compaction_split_by_total_file_size_enabled: bool,
740732
pub prefilter_in_memory_chunks_enabled: bool,
741-
pub repartition_prefetch_budget_bytes: Option<u64>,
733+
pub repartition_concurrent_download: bool,
742734
pub repartition_strategy: RepartitionStrategy,
743735
pub repartition_merge_max_input_files: usize,
744736
pub repartition_merge_max_rows: u64,
@@ -1054,9 +1046,6 @@ impl ConfigObj for ConfigObjImpl {
10541046
fn load_aware_import_placement_enabled(&self) -> bool {
10551047
self.load_aware_import_placement_enabled
10561048
}
1057-
fn batch_repartition_enabled(&self) -> bool {
1058-
self.batch_repartition_enabled
1059-
}
10601049
fn repartition_chunks_time_budget_secs(&self) -> u64 {
10611050
self.repartition_chunks_time_budget_secs
10621051
}
@@ -1069,8 +1058,8 @@ impl ConfigObj for ConfigObjImpl {
10691058
fn prefilter_in_memory_chunks_enabled(&self) -> bool {
10701059
self.prefilter_in_memory_chunks_enabled
10711060
}
1072-
fn repartition_prefetch_budget_bytes(&self) -> Option<u64> {
1073-
self.repartition_prefetch_budget_bytes
1061+
fn repartition_concurrent_download(&self) -> bool {
1062+
self.repartition_concurrent_download
10741063
}
10751064
fn repartition_strategy(&self) -> RepartitionStrategy {
10761065
self.repartition_strategy
@@ -1737,7 +1726,6 @@ impl Config {
17371726
"CUBESTORE_LOAD_AWARE_IMPORT_PLACEMENT",
17381727
false,
17391728
),
1740-
batch_repartition_enabled: env_bool("CUBESTORE_BATCH_REPARTITION", false),
17411729
repartition_chunks_time_budget_secs: env_parse(
17421730
"CUBESTORE_REPARTITION_TIME_BUDGET_SECS",
17431731
60,
@@ -1754,12 +1742,10 @@ impl Config {
17541742
"CUBESTORE_PREFILTER_IN_MEMORY_CHUNKS",
17551743
false,
17561744
),
1757-
repartition_prefetch_budget_bytes: env_parse_optional_size(
1758-
"CUBESTORE_REPARTITION_PREFETCH_BUDGET",
1759-
None,
1760-
None,
1761-
)
1762-
.map(|n| n as u64),
1745+
repartition_concurrent_download: env_bool(
1746+
"CUBESTORE_REPARTITION_CONCURRENT_DOWNLOAD",
1747+
false,
1748+
),
17631749
repartition_strategy: env_repartition_strategy(),
17641750
repartition_merge_max_input_files: env_parse(
17651751
"CUBESTORE_REPARTITION_MERGE_MAX_INPUT_FILES",
@@ -2007,14 +1993,13 @@ impl Config {
20071993
upload_to_remote: true,
20081994
enable_topk: true,
20091995
load_aware_import_placement_enabled: false,
2010-
batch_repartition_enabled: true,
20111996
repartition_chunks_time_budget_secs: 60,
20121997
push_partial_aggregate_below_merge_enabled: true,
20131998
compaction_split_by_total_file_size_enabled: false,
20141999
// Production default is off; kept on in tests so prefilter_chunks_shared_scan
20152000
// and the rest of the suite keep exercising the worker-side trim path.
20162001
prefilter_in_memory_chunks_enabled: true,
2017-
repartition_prefetch_budget_bytes: None,
2002+
repartition_concurrent_download: false,
20182003
repartition_strategy: RepartitionStrategy::PerChunk,
20192004
repartition_merge_max_input_files: 50,
20202005
repartition_merge_max_rows: 4_000_000,

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3605,7 +3605,7 @@ mod tests {
36053605
.update_config(|mut c| {
36063606
c.partition_split_threshold = 20;
36073607
c.compaction_chunks_count_threshold = 10;
3608-
c.batch_repartition_enabled = false;
3608+
c.repartition_strategy = crate::config::RepartitionStrategy::PerChunk;
36093609
c
36103610
})
36113611
.start_test(async move |services| {
@@ -3616,16 +3616,16 @@ mod tests {
36163616
}
36173617

36183618
#[tokio::test]
3619-
async fn repartition_prefetch_keeps_data_consistent() -> Result<(), CubeError> {
3620-
// Batch repartition with chunk parquet prefetch enabled must drain and
3621-
// keep data consistent end-to-end (real downloads through the prefetch
3622-
// producer/consumer path).
3623-
Config::test("repartition_prefetch_keeps_data_consistent")
3619+
async fn repartition_concurrent_download_keeps_data_consistent() -> Result<(), CubeError> {
3620+
// PerPartition merge with concurrent chunk download enabled must drain and keep
3621+
// data consistent end-to-end (real concurrent downloads in the merge group build).
3622+
Config::test("repartition_concurrent_download_keeps_data_consistent")
36243623
.update_config(|mut c| {
36253624
c.partition_split_threshold = 20;
36263625
c.compaction_chunks_count_threshold = 10;
3627-
c.batch_repartition_enabled = true;
3628-
c.repartition_prefetch_budget_bytes = Some(64 * 1024 * 1024);
3626+
c.repartition_strategy = crate::config::RepartitionStrategy::PerPartition;
3627+
c.repartition_merge_max_input_files = 4;
3628+
c.repartition_concurrent_download = true;
36293629
c
36303630
})
36313631
.start_test(async move |services| {
@@ -3721,7 +3721,8 @@ mod tests {
37213721
.update_config(|mut c| {
37223722
c.partition_split_threshold = 20;
37233723
c.compaction_chunks_count_threshold = 10;
3724-
c.batch_repartition_enabled = true;
3724+
c.repartition_strategy = crate::config::RepartitionStrategy::PerPartition;
3725+
c.repartition_merge_max_input_files = 2;
37253726
c.repartition_chunks_time_budget_secs = 1;
37263727
c
37273728
})

0 commit comments

Comments
 (0)