Skip to content

Commit dd3ff33

Browse files
Merge branch 'main' into fix/unknown-column-proto-hooks
2 parents 885cf1b + 50013e5 commit dd3ff33

22 files changed

Lines changed: 3266 additions & 919 deletions

File tree

datafusion-examples/examples/custom_data_source/custom_datasource.rs

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use async_trait::async_trait;
2626
use datafusion::arrow::array::{UInt8Builder, UInt64Builder};
2727
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2828
use datafusion::arrow::record_batch::RecordBatch;
29+
use datafusion::common::assert_batches_eq;
2930
use datafusion::datasource::{TableProvider, TableType, provider_as_source};
3031
use datafusion::error::Result;
3132
use datafusion::execution::context::TaskContext;
@@ -52,6 +53,33 @@ pub async fn custom_datasource() -> Result<()> {
5253
search_accounts(db.clone(), Some(col("bank_account").gt(lit(8000u64))), 1).await?;
5354
search_accounts(db.clone(), Some(col("bank_account").gt(lit(200u64))), 2).await?;
5455

56+
// exercise SQL paths that push down non-trivial projections:
57+
// - `SELECT 1 ...` requests no source columns (projection: Some([]))
58+
// - `SELECT COUNT(id) ...` requests a single column (projection: Some([0]))
59+
let ctx = SessionContext::new();
60+
ctx.register_table("accounts", Arc::new(db))?;
61+
let constant_batches = ctx
62+
.sql("SELECT 1 AS a FROM accounts")
63+
.await?
64+
.collect()
65+
.await?;
66+
assert_batches_eq!(
67+
[
68+
"+---+", "| a |", "+---+", "| 1 |", "| 1 |", "| 1 |", "+---+",
69+
],
70+
&constant_batches
71+
);
72+
73+
let count_batches = ctx
74+
.sql("SELECT COUNT(id) AS cnt FROM accounts")
75+
.await?
76+
.collect()
77+
.await?;
78+
assert_batches_eq!(
79+
["+-----+", "| cnt |", "+-----+", "| 3   |", "+-----+",],
80+
&count_batches
81+
);
82+
5583
Ok(())
5684
}
5785

@@ -186,6 +214,7 @@ impl TableProvider for CustomDataSource {
186214
#[derive(Debug, Clone)]
187215
struct CustomExec {
188216
db: CustomDataSource,
217+
projection: Option<Vec<usize>>,
189218
projected_schema: SchemaRef,
190219
cache: Arc<PlanProperties>,
191220
}
@@ -201,6 +230,7 @@ impl CustomExec {
201230
let cache = Self::compute_properties(projected_schema.clone());
202231
Self {
203232
db,
233+
projection: projections.cloned(),
204234
projected_schema,
205235
cache: Arc::new(cache),
206236
}
@@ -262,15 +292,25 @@ impl ExecutionPlan for CustomExec {
262292
account_array.append_value(user.bank_account);
263293
}
264294

295+
// Build a batch holding every column the table can produce, then let
296+
// Arrow drop the columns the query didn't ask for. `RecordBatch::project`
297+
// preserves the row count, which matters when the projection selects
298+
// zero columns (e.g. `SELECT 1 FROM t`).
299+
let full_batch = RecordBatch::try_new(
300+
self.db.schema(),
301+
vec![
302+
Arc::new(id_array.finish()),
303+
Arc::new(account_array.finish()),
304+
],
305+
)?;
306+
let batch = match &self.projection {
307+
Some(indices) => full_batch.project(indices)?,
308+
None => full_batch,
309+
};
310+
265311
Ok(Box::pin(MemoryStream::try_new(
266-
vec![RecordBatch::try_new(
267-
self.projected_schema.clone(),
268-
vec![
269-
Arc::new(id_array.finish()),
270-
Arc::new(account_array.finish()),
271-
],
272-
)?],
273-
self.schema(),
312+
vec![batch],
313+
self.projected_schema.clone(),
274314
None,
275315
)?))
276316
}

datafusion/core/src/optimizer_rule_reference.md

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -75,20 +75,19 @@ in multiple phases.
7575
| 3 | `join_selection` | - | Chooses join implementation, build side, and partition mode from statistics and stream properties. |
7676
| 4 | `LimitedDistinctAggregation` | - | Pushes limit hints into grouped distinct-style aggregations when only a small result is needed. |
7777
| 5 | `FilterPushdown` | pre-optimization phase | Pushes supported physical filters down toward data sources before distribution and sorting are enforced. |
78-
| 6 | `EnforceDistribution` | - | Adds repartitioning only where needed to satisfy physical distribution requirements. |
78+
| 6 | `EnsureRequirements` | - | Enforces both distribution and sorting requirements in a single idempotent rule. |
7979
| 7 | `CombinePartialFinalAggregate` | - | Collapses adjacent partial and final aggregates when the distributed shape makes them redundant. |
80-
| 8 | `EnforceSorting` | - | Adds or removes local sorts to satisfy required input orderings. |
81-
| 9 | `OptimizeAggregateOrder` | - | Updates aggregate expressions to use the best ordering once sort requirements are known. |
82-
| 10 | `WindowTopN` | - | Replaces eligible row-number window and filter patterns with per-partition TopK execution. |
83-
| 11 | `ProjectionPushdown` | early pass | Pushes projections toward inputs before later physical rewrites add more limit and TopK structure. |
84-
| 12 | `OutputRequirements` | remove phase | Removes the temporary output-requirement helper nodes after requirement-sensitive planning is done. |
85-
| 13 | `LimitAggregation` | - | Passes a limit hint into eligible aggregations so they can keep fewer accumulator buckets. |
86-
| 14 | `LimitPushPastWindows` | - | Pushes fetch limits through bounded window operators when doing so keeps the result correct. |
87-
| 15 | `HashJoinBuffering` | - | Adds buffering on the probe side of hash joins so probing can start before build completion. |
88-
| 16 | `LimitPushdown` | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
89-
| 17 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
90-
| 18 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
91-
| 19 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
92-
| 20 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
93-
| 21 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
94-
| 22 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
80+
| 8 | `OptimizeAggregateOrder` | - | Updates aggregate expressions to use the best ordering once sort requirements are known. |
81+
| 9 | `WindowTopN` | - | Replaces eligible row-number window and filter patterns with per-partition TopK execution. |
82+
| 10 | `ProjectionPushdown` | early pass | Pushes projections toward inputs before later physical rewrites add more limit and TopK structure. |
83+
| 11 | `OutputRequirements` | remove phase | Removes the temporary output-requirement helper nodes after requirement-sensitive planning is done. |
84+
| 12 | `LimitAggregation` | - | Passes a limit hint into eligible aggregations so they can keep fewer accumulator buckets. |
85+
| 13 | `LimitPushPastWindows` | - | Pushes fetch limits through bounded window operators when doing so keeps the result correct. |
86+
| 14 | `HashJoinBuffering` | - | Adds buffering on the probe side of hash joins so probing can start before build completion. |
87+
| 15 | `LimitPushdown` | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
88+
| 16 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
89+
| 17 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
90+
| 18 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
91+
| 19 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
92+
| 20 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
93+
| 21 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |

0 commit comments

Comments
 (0)