Add reusable plan-time schema alignment helper and apply to RecursiveQueryExec #21912
…rmalization
- Introduced a new function `normalize_batch_schema` which aligns the schema of a `RecordBatch` to an expected schema, handling cases where field names may differ while maintaining data types.
- Added extensive documentation outlining behavior for various scenarios including matching schemas, positional data type alignment, and error handling for mismatches.
- Implemented unit tests to validate functionality, covering scenarios of schema normalization, field renaming, and error conditions.
- Updated relevant parts of the codebase to utilize the new normalization function, ensuring consistent output schema in recursive query execution plans.

- Use `schema_ref()` instead of `schema()` to avoid cloning the `Arc<Schema>`.
- Replace imperative loop with early return using `find` for data type mismatch checks.
- Eliminate unnecessary cloning of the batch in tests.
- Remove redundant assertions for field names as schema equality already verifies them.

- Reordered the imports in `common.rs` for better readability.
- Removed commented-out lines to clean up the code.
- Enhanced code clarity by making minor adjustments to existing comments.

- Updated the documentation for schema normalization to clarify the conditions under which the batch is returned unchanged.
- Added a test to verify that schema normalization correctly strips metadata from the RecordBatch when the expected schema does not contain it.
- Provided additional assertions to ensure zero-copy behavior in schema normalization.
- Improved code comments for better clarity on the usage of the `normalize_batch_schema` function across various modules.

…iple files
- Improved formatting for better readability in `common.rs`
- Removed redundant lines in `lib.rs` by consolidating `normalize_batch_schema` import
- Reordered imports in `recursive_query.rs` for consistency and clarity

…chema
Updated the comment in `common.rs` to use the full crate path for `ExecutionPlan::schema`. This improves clarity for users reading the documentation regarding output schema.
run benchmark datafusion-physical-plan
run benchmark physical_plan
🤖 Criterion benchmark running (GKE). Comparing normalization-schema-21910 (022c4a5) to e8a93bb (merge-base).
Benchmark for this request failed.
🤖 Criterion benchmark running (GKE). Comparing normalization-schema-21910 (022c4a5) to e8a93bb (merge-base).
show benchmark queue
Hi @kosiew, you asked to view the benchmark queue (#21912 (comment)).
🤖 Criterion benchmark completed (GKE).
…ization in `RecordBatch` This commit introduces a reusable helper function `normalize_batch_schema` in the `datafusion-physical-plan` module, which allows for zero-copy normalization of schema names in `RecordBatch` outputs. The helper ensures efficient schema renaming by replacing only the `Arc<Schema>` wrapper without copying the underlying column buffers. An operator audit has been included, confirming that operators like `UnionExec` and various join types are safe without this helper, while `RecursiveQueryExec` has been updated to utilize it, following up on PR apache#21770.
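For readers unfamiliar with the zero-copy claim in this commit message, here is a minimal sketch of the idea using only the standard library. `Field`, `Schema`, `Batch`, and `rebind_schema` are hypothetical stand-ins, not the Arrow or DataFusion types, and the real helper's checks may differ. The sketch only illustrates that swapping the shared schema pointer leaves the column buffers untouched.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for Arrow's schema and batch types (illustration only).
#[derive(Debug, PartialEq)]
struct Field {
    name: String,
    data_type: &'static str,
}

#[derive(Debug, PartialEq)]
struct Schema {
    fields: Vec<Field>,
}

#[derive(Debug, Clone)]
struct Batch {
    schema: Arc<Schema>,
    // Each column is a shared, immutable buffer.
    columns: Vec<Arc<Vec<i64>>>,
}

/// Rebind `batch` to `expected` when both schemas agree on column count and,
/// position by position, on data type. Field names are allowed to differ.
fn rebind_schema(batch: &Batch, expected: &Arc<Schema>) -> Result<Batch, String> {
    let actual = &batch.schema.fields;
    if actual.len() != expected.fields.len() {
        return Err(format!(
            "column count differs: batch has {}, expected {}",
            actual.len(),
            expected.fields.len()
        ));
    }
    // Early return on the first positional data type mismatch, via `find`.
    let mismatch = actual
        .iter()
        .zip(expected.fields.iter())
        .enumerate()
        .find(|(_, (got, want))| got.data_type != want.data_type);
    if let Some((i, (got, want))) = mismatch {
        return Err(format!(
            "column {} has type {}, expected {}",
            i, got.data_type, want.data_type
        ));
    }
    Ok(Batch {
        // Only the schema pointer changes.
        schema: Arc::clone(expected),
        // Cloning the Vec clones the Arcs, not the underlying buffers.
        columns: batch.columns.clone(),
    })
}
```

In this model, `Arc::ptr_eq` on an input column and the matching output column returns true, which is the property the zero-copy assertions in the tests are meant to check.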
…tion and cleaning up content
| /// a recursive CTE—may produce batches whose field *names* differ from the declared | ||
| /// schema even though the data types are identical. Downstream consumers that key on |
This seems like a bug to me -- if the recursive CTE is producing the wrong schema then we should fix the CTE output (e.g. via a ProjectionExec), not patch over the problem with a rename 🤔
You're right.
Implemented ProjectionExec fix
| ) -> Poll<Option<Result<RecordBatch>>> { | ||
| let baseline_metrics = self.baseline_metrics.clone(); | ||
|
|
||
| // Rebind to the declared output schema. The recursive term is planned |
It seems like we should fix this via a ProjectionExec rather than just swapping out the schema 🤔
Implemented ProjectionExec fix
- Implemented recursive CTE schema-name alignment during plan construction.
- Removed public RecordBatch schema-normalization helper/re-export.
- Eliminated stream-time batch rebinding functionality.
- Added unit tests for projection insertion and error handling cases.
- Updated expectations for explain/sqllogictest.

normalize_batch_schema helper and migrate RecursiveQueryExec to enforce output schema consistency…eQueryExec
- Added `project_plan_to_schema` utility in `datafusion/physical-plan/src/common.rs`
- Updated `RecursiveQueryExec` to align recursive-term schemas at plan time
- Added tests for:
  - Schema matching
  - Alias projection
  - Metadata preservation
  - Mismatch errors
  - Recursive query integration

…lation
- Modified the initial selection in `oom_recursive_cte` to filter out NULL values by selecting from a VALUES construct.
- Changed the level calculation in the `region_sales` CTE test to use `SUM(0)` instead of `0` for consistency in aggregation.
Address regression in recursive CTE handling by replacing the strict projection with a specific alignment path. This ensures proper column alignment, maintains nullability, and validates type compatibility, while preserving the output schema contract for planner checks. Move helper tests to common.rs and retain integration coverage in recursive_query.rs. Restore behavior in cte.slt. Co-authored-by: Copilot <copilot@github.com>
…on tests
- Switched construction path to shared helper at line 92
- Removed private align_recursive_term_to_static_schema logic at line 391
- Updated regression test to match nullability and assert non-null result schema at line 506
- Added new regression test to reject nullability mismatch at line 528

…ailures in common.rs
- Replaced `internal_err!` with `plan_err!` for the following validation failures:
  - Column count mismatch
  - Schema metadata mismatch
  - Field compatibility mismatch (including nullability)
- Retained the earlier recursive-query fix using the shared helper from `recursive_query.rs:94`.

…rsive_query.rs
- Improved mismatch detection at common.rs to differentiate between:
  - Data type mismatch
  - Nullability mismatch
  - Field metadata mismatch
- Updated error messaging to be attribute-specific at common.rs
- Modified unit test assertions in common.rs
- Updated recursive query test assertion in recursive_query.rs
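A rough sketch of what attribute-specific mismatch detection could look like, written against plain structs rather than the Arrow types. `Field`, `FieldMismatch`, `first_mismatch`, and `check_field` are hypothetical names. The message layout follows the nullability error quoted later in this conversation, and the wording for the data type and metadata cases is an assumption.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for an Arrow field (illustration only).
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: String,
    nullable: bool,
    metadata: BTreeMap<String, String>,
}

// Which attribute caused the mismatch. Names are deliberately not compared,
// because aligning names is the purpose of the projection.
#[derive(Debug, Clone, Copy, PartialEq)]
enum FieldMismatch {
    DataType,
    Nullability,
    Metadata,
}

impl FieldMismatch {
    fn attribute(self) -> &'static str {
        match self {
            FieldMismatch::DataType => "data type",
            FieldMismatch::Nullability => "nullability",
            FieldMismatch::Metadata => "metadata",
        }
    }
}

/// Report the first attribute on which the two fields disagree, if any.
/// This models the strict check, where any nullability difference is a mismatch.
fn first_mismatch(input: &Field, expected: &Field) -> Option<FieldMismatch> {
    if input.data_type != expected.data_type {
        Some(FieldMismatch::DataType)
    } else if input.nullable != expected.nullable {
        Some(FieldMismatch::Nullability)
    } else if input.metadata != expected.metadata {
        Some(FieldMismatch::Metadata)
    } else {
        None
    }
}

/// Turn a mismatch into a planning-style error message naming the attribute.
fn check_field(index: usize, input: &Field, expected: &Field) -> Result<(), String> {
    match first_mismatch(input, expected) {
        None => Ok(()),
        Some(kind) => Err(format!(
            "Cannot project plan column {} ('{}') to expected output field '{}': field {} differs",
            index,
            input.name,
            expected.name,
            kind.attribute()
        )),
    }
}
```

Separating the classification (`first_mismatch`) from the message formatting keeps each test assertion tied to a single attribute, which seems to be the intent of the updated assertions.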
| .with_query( | ||
| "WITH RECURSIVE nodes AS ( | ||
| SELECT 1 as id | ||
| SELECT id FROM (VALUES (1), (NULL)) AS t(id) WHERE id IS NOT NULL |
Why is this change needed? It is not clear to me whether it is needed for this PR 🤔
The prior query seems valid as well.
`SELECT 1 as id` would fail with
Error during planning: Cannot project plan column 0 ('id') to expected output field 'id': field nullability differs (input field: Field { name: "id", data_type: Int64, nullable: true }, expected field: Field { name: "id", data_type: Int64 })
under the stricter nullability check.
Should I amend the check to allow safe nullability widening (non-null -> nullable)?
I think so -- otherwise queries that used to run will start to error, which would likely be interpreted by users as a regression
- Reverted oom_recursive_cte query to use SELECT 1 as id.
- Updated schema alignment to permit safe nullability widening (non-null to nullable) through same-type CastExpr.
- Enhanced recursive CTE to reconcile output nullability across static and recursive terms, ensuring alignment of both plans.
- Rejected nullability narrowing.
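The widening rule in this commit can be sketched as a small decision function. `NullabilityAction`, `align_nullability`, and `reconcile_nullability` are hypothetical names. The reconciliation rule shown (the output is nullable if either term is nullable) is an assumption consistent with allowing only widening, not a statement of what the merged code does.

```rust
// What the alignment step has to do for one column's nullability.
#[derive(Debug, PartialEq)]
enum NullabilityAction {
    /// Input and expected nullability already agree.
    Unchanged,
    /// Input is non-null, expected is nullable: safe to widen.
    Widen,
}

/// Decide how to align one column's nullability with the expected field.
/// Narrowing (nullable input, non-null expected) is rejected because the
/// input may actually contain NULLs.
fn align_nullability(
    input_nullable: bool,
    expected_nullable: bool,
) -> Result<NullabilityAction, String> {
    match (input_nullable, expected_nullable) {
        (true, true) | (false, false) => Ok(NullabilityAction::Unchanged),
        (false, true) => Ok(NullabilityAction::Widen),
        (true, false) => Err(
            "field nullability differs: cannot narrow nullable input to non-null output"
                .to_string(),
        ),
    }
}

/// Assumed reconciliation rule: the recursive CTE output column is nullable
/// if either the static term or the recursive term is nullable.
fn reconcile_nullability(static_nullable: bool, recursive_nullable: bool) -> bool {
    static_nullable || recursive_nullable
}
```

Under this rule both terms can always be aligned to the reconciled schema by widening at most, so a query such as `SELECT 1 as id` for the static term keeps planning successfully.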
looks good to me, FWIW - thanks @kosiew
|
@alamb |
… apply to RecursiveQueryExec (apache#21912)
Which issue does this PR close?
Rationale for this change
Physical plans in DataFusion can expose schemas that differ from their declared output schema, particularly when combining independently planned branches such as in recursive CTEs. This mismatch can lead to inconsistencies observed by downstream operators or consumers that rely on field names or schema metadata.
Previously, schema alignment for recursive queries was handled at the `RecordBatch` level during execution, which can obscure contract violations in the physical plan and make behavior harder to audit.
This change introduces a reusable execution-layer helper to align schemas at plan construction time, ensuring that child plans conform to the expected schema before execution.
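As a rough illustration of plan-time alignment, the sketch below models a plan as a small enum and wraps it in a renaming projection when the schemas are positionally compatible. `Plan`, `Field`, and `align_plan_to_schema` are hypothetical and use only the standard library. The sketch ignores nullability and metadata, and it is not the `project_plan_to_schema` implementation.

```rust
// Hypothetical stand-ins for a physical plan and its schema (illustration only).
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: String,
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { schema: Vec<Field> },
    // Output column `i` is input column `i`, renamed to `aliases[i]`.
    Projection { input: Box<Plan>, aliases: Vec<String> },
}

impl Plan {
    /// The schema this plan declares for its output.
    fn schema(&self) -> Vec<Field> {
        match self {
            Plan::Scan { schema } => schema.clone(),
            Plan::Projection { input, aliases } => input
                .schema()
                .into_iter()
                .zip(aliases)
                .map(|(field, alias)| Field {
                    name: alias.clone(),
                    data_type: field.data_type,
                })
                .collect(),
        }
    }
}

/// Return `plan` unchanged when its schema already matches `expected`;
/// otherwise wrap it in a renaming projection, or fail if the schemas are
/// not positionally compatible.
fn align_plan_to_schema(plan: Plan, expected: &[Field]) -> Result<Plan, String> {
    let actual = plan.schema();
    if actual.as_slice() == expected {
        return Ok(plan);
    }
    if actual.len() != expected.len() {
        return Err(format!(
            "column count mismatch: plan has {}, expected {}",
            actual.len(),
            expected.len()
        ));
    }
    for (i, (got, want)) in actual.iter().zip(expected).enumerate() {
        if got.data_type != want.data_type {
            return Err(format!(
                "column {} has type {}, expected {}",
                i, got.data_type, want.data_type
            ));
        }
    }
    Ok(Plan::Projection {
        input: Box::new(plan),
        aliases: expected.iter().map(|f| f.name.clone()).collect(),
    })
}
```

Because the rename lives in the plan tree, the aligned plan reports the expected schema from `schema()` before any batch is produced, and applying the function a second time is a no-op.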
What changes are included in this PR?
Introduce `project_plan_to_schema` in `datafusion/physical-plan/src/common.rs`:
- … `ProjectionExec` to align field names when schemas are positionally compatible.
Update `RecursiveQueryExec`:
- … `project_plan_to_schema` to the recursive term during construction.
- … `RecursiveQueryStream`.
Adjust tests and expected plans to reflect consistent field naming:
Are these changes tested?
Yes. The following tests are included:
In `common.rs`:
- `project_plan_to_schema_returns_input_when_schema_matches`
- `project_plan_to_schema_aliases_field_names_with_projection_exec`
- `project_plan_to_schema_preserves_matching_metadata_while_renaming`
- `project_plan_to_schema_errors_on_column_count_mismatch`
- `project_plan_to_schema_errors_on_type_mismatch`
- `project_plan_to_schema_errors_on_nullability_mismatch`
- `project_plan_to_schema_errors_on_field_metadata_mismatch`
- `project_plan_to_schema_errors_on_schema_metadata_mismatch`
In `recursive_query.rs`:
- `recursive_query_exec_projects_recursive_term_to_reconciled_schema`
- `recursive_query_exec_rejects_nullability_mismatch`
Updates to existing sqllogictest cases in `cte.slt` and explain plan expectations.
Are there any user-facing changes?
No direct user-facing API changes are introduced.
However, physical plans for recursive queries now consistently expose the declared schema at plan time, which may result in more consistent field names in explain plans and downstream consumers.
LLM-generated code disclosure
This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.