Skip to content

Commit 41c5be9

Browse files
authored
Merge branch 'main' into dynamic-disk-limit
2 parents 9478f44 + 66f82af commit 41c5be9

9 files changed

Lines changed: 1030 additions & 500 deletions

File tree

datafusion/functions-aggregate-common/src/min_max.rs

Lines changed: 432 additions & 406 deletions
Large diffs are not rendered by default.

datafusion/physical-expr-common/src/binary_view_map.rs

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -339,11 +339,28 @@ where
339339
payload
340340
} else {
341341
// no existing value, make a new one
342-
let value: &[u8] = values.value(i).as_ref();
343-
let payload = make_payload_fn(Some(value));
342+
let (new_view, payload) = if len <= 12 {
343+
// Inline path: bytes are already packed in view_u128.
344+
// The inline ByteView format is [len:u32 LE][data:12 bytes zero-padded],
345+
// so extracting bytes from the u128 avoids a round-trip through
346+
// values.value(i) (which reads the views buffer and returns the same slice).
347+
let view_bytes = view_u128.to_le_bytes();
348+
let value = &view_bytes[4..4 + len as usize];
349+
let payload = make_payload_fn(Some(value));
350+
// For inline strings, the stored view is identical to the input view:
351+
// make_view(value, 0, 0) produces the same u128 as view_u128.
352+
//
353+
// SAFETY: view_u128 was a valid view, and the enclosing `len <= 12`
354+
// ensures it is inline
355+
let new_view = unsafe { self.append_inline_view(view_u128) };
356+
(new_view, payload)
357+
} else {
358+
let value: &[u8] = values.value(i).as_ref();
359+
let payload = make_payload_fn(Some(value));
360+
let new_view = self.append_value(value);
361+
(new_view, payload)
362+
};
344363

345-
// Create view pointing to our buffers
346-
let new_view = self.append_value(value);
347364
let new_header = Entry {
348365
view: new_view,
349366
hash,
@@ -389,6 +406,26 @@ where
389406
}
390407
}
391408

409+
/// Append an already-computed inline view (len <= 12) directly, bypassing
410+
/// buffer allocation.
411+
///
412+
/// Returns the view that was stored (identical to the argument).
413+
///
414+
/// # Safety
415+
///
416+
/// `view` must be a valid inline `ByteView`: the length field in the low
417+
/// 32 bits must be <= 12, and the remaining 12 bytes must hold the
418+
/// value's bytes (zero-padded if shorter). Calling with a non-inline view
419+
/// would store a value that downstream `views` consumers interpret as
420+
/// `[buffer_index, offset]` into the `completed`/`in_progress` buffers,
421+
/// which is unsound for any view that didn't originate from a real
422+
/// allocation in those buffers.
423+
unsafe fn append_inline_view(&mut self, view: u128) -> u128 {
424+
self.views.push(view);
425+
self.nulls.append_non_null();
426+
view
427+
}
428+
392429
/// Append a value to our buffers and return the view pointing to it
393430
fn append_value(&mut self, value: &[u8]) -> u128 {
394431
let len = value.len();

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 63 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,7 @@ use datafusion_common::stats::Precision;
5252
use datafusion_common::tree_node::TreeNodeRecursion;
5353
use datafusion_common::utils::transpose;
5454
use datafusion_common::{
55-
ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err,
56-
internal_datafusion_err, internal_err,
55+
ColumnStatistics, DataFusionError, HashMap, assert_or_internal_err, internal_err,
5756
};
5857
use datafusion_common::{Result, not_impl_err};
5958
use datafusion_common_runtime::SpawnedTask;
@@ -681,46 +680,8 @@ impl BatchPartitioner {
681680
// Finished building index-arrays for output partitions
682681
timer.done();
683682

684-
// Borrowing partitioner timer to prevent moving `self` to closure
685-
let partitioner_timer = &self.timer;
686-
687-
let mut partitioned_batches = vec![];
688-
for (partition, p_indices) in indices.iter_mut().enumerate() {
689-
if !p_indices.is_empty() {
690-
let taken_indices = std::mem::take(p_indices);
691-
let indices_array: PrimitiveArray<UInt32Type> =
692-
taken_indices.into();
693-
694-
// Tracking time required for repartitioned batches construction
695-
let _timer = partitioner_timer.timer();
696-
697-
// Produce batches based on indices
698-
let columns =
699-
take_arrays(batch.columns(), &indices_array, None)?;
700-
701-
let mut options = RecordBatchOptions::new();
702-
options = options.with_row_count(Some(indices_array.len()));
703-
let batch = RecordBatch::try_new_with_options(
704-
batch.schema(),
705-
columns,
706-
&options,
707-
)
708-
.unwrap();
709-
710-
partitioned_batches.push(Ok((partition, batch)));
711-
712-
// Return the taken vec
713-
let (_, buffer, _) = indices_array.into_parts();
714-
let mut vec =
715-
buffer.into_inner().into_vec::<u32>().map_err(|e| {
716-
internal_datafusion_err!(
717-
"Could not convert buffer to vec: {e:?}"
718-
)
719-
})?;
720-
vec.clear();
721-
*p_indices = vec;
722-
}
723-
}
683+
let partitioned_batches =
684+
Self::partition_grouped_take(&batch, indices, &self.timer)?;
724685

725686
Box::new(partitioned_batches.into_iter())
726687
}
@@ -736,6 +697,66 @@ impl BatchPartitioner {
736697
BatchPartitionerState::Hash { indices, .. } => indices.len(),
737698
}
738699
}
700+
701+
/// Build repartitioned hash output batches using one `take` per input batch.
702+
///
703+
/// The hash router first fills one index vector per output partition. This method
704+
/// concatenates those index vectors, performs one grouped `take_arrays`, and
705+
/// then returns each output partition as a slice of the reordered batch.
706+
///
707+
/// For example, given partition indices:
708+
///
709+
/// ```text
710+
/// partition 0: [2, 5]
711+
/// partition 1: []
712+
/// partition 2: [0, 3, 4]
713+
/// ```
714+
///
715+
/// this method takes rows in `[2, 5, 0, 3, 4]` order once, then returns
716+
/// `partition 0 = slice(0, 2)` and `partition 2 = slice(2, 3)`.
717+
fn partition_grouped_take(
718+
batch: &RecordBatch,
719+
indices: &mut [Vec<u32>],
720+
timer: &metrics::Time,
721+
) -> Result<Vec<Result<(usize, RecordBatch)>>> {
722+
let mut partition_ranges = Vec::with_capacity(indices.len());
723+
let mut reordered_indices = Vec::with_capacity(batch.num_rows());
724+
725+
for (partition, p_indices) in indices.iter_mut().enumerate() {
726+
if p_indices.is_empty() {
727+
continue;
728+
}
729+
730+
let start = reordered_indices.len();
731+
reordered_indices.extend_from_slice(p_indices);
732+
partition_ranges.push((partition, start, p_indices.len()));
733+
p_indices.clear();
734+
}
735+
736+
if reordered_indices.is_empty() {
737+
return Ok(vec![]);
738+
}
739+
740+
let batches = {
741+
let _timer = timer.timer();
742+
let indices_array: PrimitiveArray<UInt32Type> = reordered_indices.into();
743+
let columns = take_arrays(batch.columns(), &indices_array, None)?;
744+
745+
let mut options = RecordBatchOptions::new();
746+
options = options.with_row_count(Some(indices_array.len()));
747+
let reordered_batch =
748+
RecordBatch::try_new_with_options(batch.schema(), columns, &options)?;
749+
750+
partition_ranges
751+
.into_iter()
752+
.map(|(partition, start, len)| {
753+
Ok((partition, reordered_batch.slice(start, len)))
754+
})
755+
.collect()
756+
};
757+
758+
Ok(batches)
759+
}
739760
}
740761

741762
/// Maps `N` input partitions to `M` output partitions based on a

datafusion/sql/src/statement.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -886,14 +886,18 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
886886
"Execute statement with DEFAULT is not supported"
887887
);
888888
}
889+
let name = name.ok_or_else(|| {
890+
plan_datafusion_err!("EXECUTE statement requires a name")
891+
})?;
892+
889893
let empty_schema = DFSchema::empty();
890894
let parameters = parameters
891895
.into_iter()
892896
.map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
893897
.collect::<Result<Vec<Expr>>>()?;
894898

895899
Ok(LogicalPlan::Statement(PlanStatement::Execute(Execute {
896-
name: object_name_to_string(&name.unwrap()),
900+
name: object_name_to_string(&name),
897901
parameters,
898902
})))
899903
}

datafusion/sql/src/unparser/plan.rs

Lines changed: 119 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ use datafusion_common::{
4848
};
4949
use datafusion_expr::expr::{OUTER_REFERENCE_COLUMN_PREFIX, UNNEST_COLUMN_PREFIX};
5050
use datafusion_expr::{
51-
BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
51+
Aggregate, BinaryExpr, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan,
5252
LogicalPlanBuilder, Operator, Projection, SortExpr, TableScan, Unnest,
53-
UserDefinedLogicalNode, expr::Alias,
53+
UserDefinedLogicalNode, Window, expr::Alias,
5454
};
5555
use sqlparser::ast::{self, Ident, OrderByKind, SetExpr, TableAliasColumnDef};
5656
use std::{sync::Arc, vec};
@@ -478,6 +478,80 @@ impl Unparser<'_> {
478478
Ok(false)
479479
}
480480

481+
fn project_window_output(
482+
&self,
483+
window_expr: &[Expr],
484+
select: &mut SelectBuilder,
485+
agg: Option<&Aggregate>,
486+
) -> Result<()> {
487+
let mut items = if select.already_projected() {
488+
select.pop_projections()
489+
} else {
490+
vec![ast::SelectItem::Wildcard(
491+
ast::WildcardAdditionalOptions::default(),
492+
)]
493+
};
494+
495+
items.extend(
496+
window_expr
497+
.iter()
498+
.map(|expr| {
499+
let expr = if let Some(agg) = agg {
500+
unproject_agg_exprs(expr.clone(), agg, None)?
501+
} else {
502+
expr.clone()
503+
};
504+
self.select_item_to_sql(&expr)
505+
})
506+
.collect::<Result<Vec<_>>>()?,
507+
);
508+
select.projection(items);
509+
510+
Ok(())
511+
}
512+
513+
fn window_input_requires_derived_subquery(plan: &LogicalPlan) -> bool {
514+
// These operators either produce a SELECT list or apply SQL clauses
515+
// that are evaluated after window functions in a single SELECT block.
516+
// Keep them below the Window node by emitting a derived table.
517+
matches!(
518+
plan,
519+
LogicalPlan::Projection(_)
520+
| LogicalPlan::Distinct(_)
521+
| LogicalPlan::Limit(_)
522+
| LogicalPlan::Sort(_)
523+
| LogicalPlan::Union(_)
524+
)
525+
}
526+
527+
fn window_to_sql_with_derived_input(
528+
&self,
529+
window: &Window,
530+
select: &mut SelectBuilder,
531+
relation: &mut RelationBuilder,
532+
) -> Result<()> {
533+
let input_alias = "derived_window_input";
534+
self.derive(
535+
window.input.as_ref(),
536+
relation,
537+
Some(self.new_table_alias(input_alias.to_string(), vec![])),
538+
false,
539+
)?;
540+
541+
let input_schema = window.input.schema();
542+
let mut alias_rewriter = TableAliasRewriter {
543+
table_schema: input_schema.as_arrow(),
544+
alias_name: TableReference::bare(input_alias),
545+
};
546+
let window_expr = window
547+
.window_expr
548+
.iter()
549+
.map(|expr| expr.clone().rewrite(&mut alias_rewriter).data())
550+
.collect::<Result<Vec<_>>>()?;
551+
552+
self.project_window_output(&window_expr, select, None)
553+
}
554+
481555
#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
482556
fn select_to_sql_recursively(
483557
&self,
@@ -610,25 +684,28 @@ impl Unparser<'_> {
610684
self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
611685
}
612686
LogicalPlan::Filter(filter) => {
613-
if let Some(agg) =
614-
find_agg_node_within_select(plan, select.already_projected())
687+
let window = find_window_nodes_within_select(
688+
plan,
689+
None,
690+
select.already_projected(),
691+
);
692+
let agg = find_agg_node_within_select(plan, select.already_projected());
693+
694+
if let (Some(window), true) =
695+
(window.as_deref(), self.dialect.supports_qualify())
615696
{
697+
let mut unprojected =
698+
unproject_window_exprs(filter.predicate.clone(), window)?;
699+
if let Some(agg) = agg {
700+
unprojected = unproject_agg_exprs(unprojected, agg, None)?;
701+
}
702+
let filter_expr = self.expr_to_sql(&unprojected)?;
703+
select.qualify(Some(filter_expr));
704+
} else if let Some(agg) = agg {
616705
let unprojected =
617706
unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
618707
let filter_expr = self.expr_to_sql(&unprojected)?;
619708
select.having(Some(filter_expr));
620-
} else if let (Some(window), true) = (
621-
find_window_nodes_within_select(
622-
plan,
623-
None,
624-
select.already_projected(),
625-
),
626-
self.dialect.supports_qualify(),
627-
) {
628-
let unprojected =
629-
unproject_window_exprs(filter.predicate.clone(), &window)?;
630-
let filter_expr = self.expr_to_sql(&unprojected)?;
631-
select.qualify(Some(filter_expr));
632709
} else {
633710
let filter_expr = self.expr_to_sql(&filter.predicate)?;
634711
select.selection(Some(filter_expr));
@@ -1143,13 +1220,37 @@ impl Unparser<'_> {
11431220
Ok(())
11441221
}
11451222
LogicalPlan::Window(window) => {
1146-
// Window nodes are handled simultaneously with Projection nodes
1223+
// Window nodes are usually handled simultaneously with Projection
1224+
// nodes, where projected columns are unprojected back into their
1225+
// corresponding window expressions. Manually built plans can have
1226+
// Window nodes without an enclosing Projection, so in that case
1227+
// the Window node itself must contribute its output expressions.
1228+
let project_window_output = !select.already_projected();
1229+
if project_window_output
1230+
&& Self::window_input_requires_derived_subquery(window.input.as_ref())
1231+
{
1232+
return self
1233+
.window_to_sql_with_derived_input(window, select, relation);
1234+
}
1235+
1236+
let agg = if project_window_output {
1237+
find_agg_node_within_select(plan, false)
1238+
} else {
1239+
None
1240+
};
1241+
11471242
self.select_to_sql_recursively(
11481243
window.input.as_ref(),
11491244
query,
11501245
select,
11511246
relation,
1152-
)
1247+
)?;
1248+
1249+
if project_window_output {
1250+
self.project_window_output(&window.window_expr, select, agg)?;
1251+
}
1252+
1253+
Ok(())
11531254
}
11541255
LogicalPlan::EmptyRelation(_) => {
11551256
// An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor,

datafusion/sql/src/unparser/utils.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,12 @@ pub(crate) fn find_agg_node_within_select(
5454
// Agg nodes explicitly return immediately with a single node
5555
if let LogicalPlan::Aggregate(agg) = input {
5656
Some(agg)
57-
} else if let LogicalPlan::TableScan(_) = input {
57+
} else if matches!(
58+
input,
59+
LogicalPlan::TableScan(_)
60+
| LogicalPlan::Subquery(_)
61+
| LogicalPlan::SubqueryAlias(_)
62+
) {
5863
None
5964
} else if let LogicalPlan::Projection(_) = input {
6065
if already_projected {

0 commit comments

Comments
 (0)