Skip to content

Commit 405f88f

Browse files
clippy
1 parent 6aef801 commit 405f88f

4 files changed

Lines changed: 61 additions & 57 deletions

File tree

src/distributed_planner/exchange_assignment.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ impl BroadcastExchangeLayout {
204204
if partitions_per_producer_task == 0 {
205205
return plan_err!("broadcast exchange requires partitions_per_producer_task > 0");
206206
}
207-
if partitions_per_producer_task % consumer_task_count != 0 {
207+
if !partitions_per_producer_task.is_multiple_of(consumer_task_count) {
208208
return plan_err!(
209209
"broadcast exchange requires consumer_task_count to divide partitions_per_producer_task evenly, got {} and {}",
210210
partitions_per_producer_task,

src/distributed_planner/insert_local_exchange_split.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ impl SplitInput {
427427
}
428428
if self.already_split
429429
|| target_partition_count == 0
430-
|| target_partition_count % self.owned_partition_count != 0
430+
|| !target_partition_count.is_multiple_of(self.owned_partition_count)
431431
{
432432
return false;
433433
}
@@ -657,7 +657,7 @@ mod tests {
657657
.unwrap()
658658
.unwrap_or(physical_plan);
659659
let d_cfg = DistributedConfig::from_config_options(state.config_options()).unwrap();
660-
let split = insert_local_exchange_split_execs(distributed, &d_cfg).unwrap();
660+
let split = insert_local_exchange_split_execs(distributed, d_cfg).unwrap();
661661
let distributed_exec = DistributedExec::new(split);
662662
display_plan_ascii(&distributed_exec, false)
663663
}

src/execution_plans/benchmarks/local_exchange_query_shape_bench.rs

Lines changed: 49 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,15 @@ impl LocalExchangeQueryBench {
142142
Ok(QueryBenchSources::Single {
143143
input: make_fact_source(
144144
value_name,
145-
self.total_rows,
146-
self.batch_rows,
147-
self.source_partitions,
148-
self.key_domain,
149-
grp_domain,
150-
key_distribution,
151-
value_offset,
145+
FactSourceSpec {
146+
total_rows: self.total_rows,
147+
batch_rows: self.batch_rows,
148+
source_partitions: self.source_partitions,
149+
key_domain: self.key_domain,
150+
grp_domain,
151+
key_distribution,
152+
value_offset,
153+
},
152154
)?,
153155
})
154156
}
@@ -162,23 +164,27 @@ impl LocalExchangeQueryBench {
162164
Ok(QueryBenchSources::Join {
163165
left: make_fact_source(
164166
"left_value",
165-
self.total_rows,
166-
self.batch_rows,
167-
self.source_partitions,
168-
self.key_domain,
169-
16,
170-
left_distribution,
171-
0,
167+
FactSourceSpec {
168+
total_rows: self.total_rows,
169+
batch_rows: self.batch_rows,
170+
source_partitions: self.source_partitions,
171+
key_domain: self.key_domain,
172+
grp_domain: 16,
173+
key_distribution: left_distribution,
174+
value_offset: 0,
175+
},
172176
)?,
173177
right: make_fact_source(
174178
"right_value",
175-
right_total_rows,
176-
self.batch_rows,
177-
self.source_partitions,
178-
right_key_domain,
179-
16,
180-
KeyDistribution::Uniform,
181-
17,
179+
FactSourceSpec {
180+
total_rows: right_total_rows,
181+
batch_rows: self.batch_rows,
182+
source_partitions: self.source_partitions,
183+
key_domain: right_key_domain,
184+
grp_domain: 16,
185+
key_distribution: KeyDistribution::Uniform,
186+
value_offset: 17,
187+
},
182188
)?,
183189
})
184190
}
@@ -586,8 +592,8 @@ fn group_cols_to_exprs(group_cols: &[(&str, usize)]) -> Vec<(Arc<dyn PhysicalExp
586592
.iter()
587593
.map(|(name, index)| {
588594
(
589-
Arc::new(Column::new(*name, *index)) as Arc<dyn PhysicalExpr>,
590-
(*name).to_string(),
595+
Arc::new(Column::new(name, *index)) as Arc<dyn PhysicalExpr>,
596+
name.to_string(),
591597
)
592598
})
593599
.collect()
@@ -599,8 +605,8 @@ fn group_cols_to_final_exprs(group_cols: &[(&str, usize)]) -> Vec<(Arc<dyn Physi
599605
.enumerate()
600606
.map(|(index, (name, _))| {
601607
(
602-
Arc::new(Column::new(*name, index)) as Arc<dyn PhysicalExpr>,
603-
(*name).to_string(),
608+
Arc::new(Column::new(name, index)) as Arc<dyn PhysicalExpr>,
609+
name.to_string(),
604610
)
605611
})
606612
.collect()
@@ -610,7 +616,7 @@ fn partial_group_columns(group_cols: &[(&str, usize)]) -> Vec<Arc<dyn PhysicalEx
610616
group_cols
611617
.iter()
612618
.enumerate()
613-
.map(|(index, (name, _))| Arc::new(Column::new(*name, index)) as Arc<dyn PhysicalExpr>)
619+
.map(|(index, (name, _))| Arc::new(Column::new(name, index)) as Arc<dyn PhysicalExpr>)
614620
.collect()
615621
}
616622

@@ -620,27 +626,20 @@ enum KeyDistribution {
620626
HotKey,
621627
}
622628

623-
fn make_fact_source(
624-
value_name: &str,
629+
#[derive(Clone, Copy)]
630+
struct FactSourceSpec {
625631
total_rows: usize,
626632
batch_rows: usize,
627633
source_partitions: usize,
628634
key_domain: usize,
629635
grp_domain: usize,
630636
key_distribution: KeyDistribution,
631637
value_offset: i64,
632-
) -> Result<PreparedFactSource> {
638+
}
639+
640+
fn make_fact_source(value_name: &str, spec: FactSourceSpec) -> Result<PreparedFactSource> {
633641
let schema = fact_schema(value_name);
634-
let partitions = make_fact_batches(
635-
Arc::clone(&schema),
636-
total_rows,
637-
batch_rows,
638-
source_partitions,
639-
key_domain,
640-
grp_domain,
641-
key_distribution,
642-
value_offset,
643-
)?;
642+
let partitions = make_fact_batches(Arc::clone(&schema), spec)?;
644643
Ok(PreparedFactSource { schema, partitions })
645644
}
646645

@@ -652,16 +651,17 @@ fn fact_schema(value_name: &str) -> SchemaRef {
652651
]))
653652
}
654653

655-
fn make_fact_batches(
656-
schema: SchemaRef,
657-
total_rows: usize,
658-
batch_rows: usize,
659-
source_partitions: usize,
660-
key_domain: usize,
661-
grp_domain: usize,
662-
key_distribution: KeyDistribution,
663-
value_offset: i64,
664-
) -> Result<Vec<Vec<RecordBatch>>> {
654+
fn make_fact_batches(schema: SchemaRef, spec: FactSourceSpec) -> Result<Vec<Vec<RecordBatch>>> {
655+
let FactSourceSpec {
656+
total_rows,
657+
batch_rows,
658+
source_partitions,
659+
key_domain,
660+
grp_domain,
661+
key_distribution,
662+
value_offset,
663+
} = spec;
664+
665665
if batch_rows == 0 {
666666
return exec_err!("benchmark batch_rows must be greater than zero");
667667
}

src/execution_plans/local_exchange_split.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ impl Splitter {
332332
self.hashes.resize(batch.num_rows(), 0);
333333
create_hashes(
334334
&self.evaluated,
335-
&REPARTITION_RANDOM_STATE.random_state(),
335+
REPARTITION_RANDOM_STATE.random_state(),
336336
&mut self.hashes,
337337
)?;
338338
self.metrics.hash_eval_time.add_elapsed(hash_start);
@@ -565,7 +565,7 @@ mod tests {
565565
let mut hashes = vec![0; batch.num_rows()];
566566
create_hashes(
567567
&evaluated,
568-
&REPARTITION_RANDOM_STATE.random_state(),
568+
REPARTITION_RANDOM_STATE.random_state(),
569569
&mut hashes,
570570
)?;
571571

@@ -618,7 +618,8 @@ mod tests {
618618

619619
let actual0 = batches0.iter().flat_map(batch_values).collect::<Vec<_>>();
620620
let actual1 = batches1.iter().flat_map(batch_values).collect::<Vec<_>>();
621-
let expected0 = expected_local_values(&[input_batch.clone()], &hash_exprs, 2, 2, 0)?;
621+
let expected0 =
622+
expected_local_values(std::slice::from_ref(&input_batch), &hash_exprs, 2, 2, 0)?;
622623
let expected1 = expected_local_values(&[input_batch], &hash_exprs, 2, 2, 1)?;
623624

624625
assert_eq!(actual0, expected0);
@@ -715,8 +716,11 @@ mod tests {
715716
int32_batch(Arc::clone(&schema), &[0, 1, 2, 3])?,
716717
int32_batch(Arc::clone(&schema), &[4, 5, 6, 7])?,
717718
];
718-
let input =
719-
TestMemoryExec::try_new_exec(&[input_batches.clone()], Arc::clone(&schema), None)?;
719+
let input = TestMemoryExec::try_new_exec(
720+
std::slice::from_ref(&input_batches),
721+
Arc::clone(&schema),
722+
None,
723+
)?;
720724
let input = Arc::unwrap_or_clone(input).try_with_sort_information(vec![
721725
[datafusion::physical_expr::PhysicalSortExpr::new_default(
722726
Arc::new(Column::new("k", 0)),

0 commit comments

Comments
 (0)