Skip to content

Commit 2014a94

Browse files
committed
fix e2e sql tests.
1 parent d5b2f3c commit 2014a94

4 files changed

Lines changed: 8 additions & 6 deletions

File tree

datafusion/execution/src/runtime_env.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ impl RuntimeEnvBuilder {
263263
(pool, disk_manager)
264264
} else {
265265
(
266-
Arc::new(UnboundedMemoryPool::default()),
266+
Arc::new(UnboundedMemoryPool::default()) as _,
267267
DiskManagerConfig::Disabled,
268268
)
269269
};

datafusion/physical-expr/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ pub mod execution_props {
4747
pub use datafusion_expr::var_provider::{VarProvider, VarType};
4848
}
4949

50-
pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, FlatNullState};
50+
pub use aggregate::groups_accumulator::{FlatNullState, GroupsAccumulatorAdapter};
5151
pub use analysis::{analyze, AnalysisContext, ExprBoundaries};
5252
pub use equivalence::{
5353
calculate_union, AcrossPartitions, ConstExpr, EquivalenceProperties,

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use datafusion_common::{internal_err, DataFusionError, Result};
4242
use datafusion_execution::disk_manager::RefCountedTempFile;
4343
use datafusion_execution::memory_pool::proxy::VecAllocExt;
4444
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
45-
use datafusion_execution::{DiskManager, TaskContext};
45+
use datafusion_execution::TaskContext;
4646
use datafusion_expr::{EmitTo, GroupsAccumulator};
4747
use datafusion_physical_expr::expressions::Column;
4848
use datafusion_physical_expr::{GroupsAccumulatorAdapter, PhysicalSortExpr};
@@ -872,9 +872,9 @@ impl Stream for GroupedHashAggregateStream {
872872
// - If found `Err`, throw it, end this stream abnormally
873873
// - If found `None`, it means all blocks are polled, end this stream normally
874874
// - If found `Some`, return it and wait next polling
875-
let emit_result = self.emit(emit_to, false);
875+
let emit_result = self.emit(EmitTo::NextBlock, false);
876876
let Ok(batch_opt) = emit_result else {
877-
return Poll::Ready(Some(emit_result));
877+
return Poll::Ready(Some(Err(emit_result.unwrap_err())));
878878
};
879879

880880
let Some(batch) = batch_opt else {
@@ -888,7 +888,7 @@ impl Stream for GroupedHashAggregateStream {
888888
continue;
889889
};
890890

891-
debug_assert!(output_batch.num_rows() > 0);
891+
debug_assert!(batch.num_rows() > 0);
892892
return Poll::Ready(Some(Ok(
893893
batch.record_output(&self.baseline_metrics)
894894
)));

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ datafusion.catalog.newlines_in_values false
217217
datafusion.execution.batch_size 8192
218218
datafusion.execution.coalesce_batches true
219219
datafusion.execution.collect_statistics false
220+
datafusion.execution.enable_aggregation_blocked_groups true
220221
datafusion.execution.enable_recursive_ctes true
221222
datafusion.execution.enforce_batch_size_in_joins false
222223
datafusion.execution.keep_partition_by_columns false
@@ -317,6 +318,7 @@ datafusion.catalog.newlines_in_values false Specifies whether newlines in (quote
317318
datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption
318319
datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
319320
datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files
321+
datafusion.execution.enable_aggregation_blocked_groups true Should DataFusion use the the blocked approach to manage the groups values and their related states in accumulators. By default, the blocked approach will be used. And the blocked approach allocates capacity for the block based on a predefined block size firstly. When the block reaches its limit, we allocate a new block (also with the same predefined block size based capacity) instead of expanding the current one and copying the data. If setting this flag to `false`, will fall-back to use the single approach, values are managed within a single large block(can think of it as a Vec). As this block grows, it often triggers numerous copies, resulting in poor performance.
320322
datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
321323
datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower.
322324
datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches

0 commit comments

Comments
 (0)