Skip to content

Commit 217b36b

Browse files
committed
fmt
1 parent c5463ae commit 217b36b

4 files changed

Lines changed: 46 additions & 27 deletions

File tree

datafusion/execution/src/disk_manager.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,6 @@ impl RefCountedTempFile {
391391
self.current_file_disk_usage.load(Ordering::Relaxed)
392392
}
393393

394-
395394
pub fn clone_refcounted(&self) -> Result<Self> {
396395
let reopened = std::fs::File::open(self.path())?;
397396
let temp_path = TempPath::from_path(self.path());

datafusion/physical-plan/src/joins/grace_hash_join/exec.rs

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ use crate::filter_pushdown::{
2020
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
2121
FilterPushdownPropagation,
2222
};
23-
use crate::joins::utils::{
24-
reorder_output_after_swap, swap_join_projection, OnceFut,
25-
};
23+
use crate::joins::utils::{reorder_output_after_swap, swap_join_projection, OnceFut};
2624
use crate::joins::{JoinOn, JoinOnRef, PartitionMode};
2725
use crate::projection::{
2826
try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData,
@@ -33,12 +31,12 @@ use crate::{
3331
common::can_project,
3432
joins::utils::{
3533
build_join_schema, check_join_is_valid, estimate_join_statistics,
36-
symmetric_join_output_partitioning,
37-
BuildProbeJoinMetrics, ColumnIndex, JoinFilter,
34+
symmetric_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex,
35+
JoinFilter,
3836
},
3937
metrics::{ExecutionPlanMetricsSet, MetricsSet},
40-
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
41-
PlanProperties, SendableRecordBatchStream, Statistics,
38+
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties,
39+
SendableRecordBatchStream, Statistics,
4240
};
4341
use crate::{ExecutionPlanProperties, SpillManager};
4442
use std::fmt;
@@ -52,8 +50,7 @@ use arrow::datatypes::SchemaRef;
5250
use arrow::record_batch::RecordBatch;
5351
use datafusion_common::config::ConfigOptions;
5452
use datafusion_common::{
55-
internal_err, plan_err, project_schema, JoinSide, JoinType,
56-
NullEquality, Result,
53+
internal_err, plan_err, project_schema, JoinSide, JoinType, NullEquality, Result,
5754
};
5855
use datafusion_execution::TaskContext;
5956
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
@@ -670,7 +667,7 @@ impl ExecutionPlan for GraceHashJoinExec {
670667
spill_right_clone,
671668
partition,
672669
)
673-
.await?;
670+
.await?;
674671
accumulator_clone
675672
.report_partition(partition, left_idx.clone(), right_idx.clone())
676673
.await;
@@ -730,11 +727,11 @@ impl ExecutionPlan for GraceHashJoinExec {
730727
}
731728

732729
if let Some(JoinData {
733-
projected_left_child,
734-
projected_right_child,
735-
join_filter,
736-
join_on,
737-
}) = try_pushdown_through_join(
730+
projected_left_child,
731+
projected_right_child,
732+
join_filter,
733+
join_on,
734+
}) = try_pushdown_through_join(
738735
projection,
739736
self.left(),
740737
self.right(),
@@ -820,7 +817,7 @@ impl ExecutionPlan for GraceHashJoinExec {
820817
let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
821818
assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children
822819
let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child
823-
// We expect 0 or 1 self filters
820+
// We expect 0 or 1 self filters
824821
if let Some(filter) = right_child_self_filters.first() {
825822
// Note that we don't check PushdownPredicate::discriminant because even if nothing said
826823
// "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating
@@ -855,7 +852,6 @@ impl ExecutionPlan for GraceHashJoinExec {
855852
}
856853
}
857854

858-
859855
#[allow(clippy::too_many_arguments)]
860856
pub async fn partition_and_spill(
861857
random_state: RandomState,
@@ -883,7 +879,7 @@ pub async fn partition_and_spill(
883879
&join_metrics,
884880
enable_dynamic_filter_pushdown,
885881
)
886-
.await?;
882+
.await?;
887883

888884
// RIGHT side partitioning
889885
let right_index = partition_and_spill_one_side(
@@ -896,7 +892,7 @@ pub async fn partition_and_spill(
896892
&join_metrics,
897893
enable_dynamic_filter_pushdown,
898894
)
899-
.await?;
895+
.await?;
900896
Ok((left_index, right_index))
901897
}
902898

datafusion/physical-plan/src/spill/in_memory_spill_buffer.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
118
use crate::memory::MemoryStream;
219
use crate::spill::spill_manager::GetSlicedSize;
320
use arrow::array::RecordBatch;

datafusion/physical-plan/src/spill/spill_manager.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717

1818
//! Define the `SpillManager` struct, which is responsible for reading and writing `RecordBatch`es to raw files based on the provided configurations.
1919
20-
use std::slice;
2120
use arrow::array::StringViewArray;
2221
use arrow::datatypes::SchemaRef;
2322
use arrow::record_batch::RecordBatch;
2423
use datafusion_execution::runtime_env::RuntimeEnv;
24+
use std::slice;
2525
use std::sync::Arc;
2626

2727
use datafusion_common::{config::SpillCompression, DataFusionError, Result};
@@ -30,8 +30,8 @@ use datafusion_execution::SendableRecordBatchStream;
3030

3131
use super::{in_progress_spill_file::InProgressSpillFile, SpillReaderStream};
3232
use crate::coop::cooperative;
33-
use crate::{common::spawn_buffered, metrics::SpillMetrics};
3433
use crate::spill::in_memory_spill_buffer::InMemorySpillBuffer;
34+
use crate::{common::spawn_buffered, metrics::SpillMetrics};
3535

3636
/// The `SpillManager` is responsible for the following tasks:
3737
/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
@@ -177,7 +177,11 @@ impl SpillManager {
177177

178178
/// Automatically decides whether to spill the given RecordBatch to memory or disk,
179179
/// depending on available memory pool capacity.
180-
pub(crate) fn spill_batch_auto(&self, batch: &RecordBatch, request_msg: &str) -> Result<SpillLocation> {
180+
pub(crate) fn spill_batch_auto(
181+
&self,
182+
batch: &RecordBatch,
183+
request_msg: &str,
184+
) -> Result<SpillLocation> {
181185
let size = batch.get_sliced_size()?;
182186

183187
// Check current memory usage and total limit from the runtime memory pool
@@ -188,14 +192,16 @@ impl SpillManager {
188192
};
189193

190194
// If there's enough memory (with a safety margin), keep it in memory
191-
if used + size * 3 / 2 <= limit {
195+
if used + size * 3 / 2 <= limit {
192196
let buf = Arc::new(InMemorySpillBuffer::from_batch(batch)?);
193197
self.metrics.spilled_bytes.add(size);
194198
self.metrics.spilled_rows.add(batch.num_rows());
195199
Ok(SpillLocation::Memory(buf))
196200
} else {
197201
// Otherwise spill to disk using the existing SpillManager logic
198-
let Some(file) = self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)? else {
202+
let Some(file) =
203+
self.spill_record_batch_and_finish(slice::from_ref(batch), request_msg)?
204+
else {
199205
return Err(DataFusionError::Execution(
200206
"failed to spill batch to disk".into(),
201207
));
@@ -251,13 +257,14 @@ impl SpillManager {
251257
spill: &SpillLocation,
252258
) -> Result<SendableRecordBatchStream> {
253259
match spill {
254-
SpillLocation::Memory(buf) => Ok(Arc::clone(buf).as_stream(Arc::clone(&self.schema))?),
260+
SpillLocation::Memory(buf) => {
261+
Ok(Arc::clone(buf).as_stream(Arc::clone(&self.schema))?)
262+
}
255263
SpillLocation::Disk(file) => self.read_spill_as_stream_ref(file),
256264
}
257265
}
258266
}
259267

260-
261268
#[derive(Debug, Clone)]
262269
pub enum SpillLocation {
263270
Memory(Arc<InMemorySpillBuffer>),

0 commit comments

Comments
 (0)