Add reusable plan-time schema alignment helper and apply to RecursiveQueryExec #21912

Merged
kosiew merged 17 commits into apache:main from kosiew:normalization-schema-21910
May 6, 2026

@kosiew
Contributor

@kosiew kosiew commented Apr 29, 2026

Which issue does this PR close?


Rationale for this change

Physical plans in DataFusion can expose schemas that differ from their declared output schema, particularly when combining independently planned branches such as in recursive CTEs. This mismatch can lead to inconsistencies observed by downstream operators or consumers that rely on field names or schema metadata.

Previously, schema alignment for recursive queries was handled at the RecordBatch level during execution, which can obscure contract violations in the physical plan and make behavior harder to audit.

This change introduces a reusable execution-layer helper to align schemas at plan construction time, ensuring that child plans conform to the expected schema before execution.


What changes are included in this PR?

  • Introduce project_plan_to_schema in datafusion/physical-plan/src/common.rs:

    • Returns the input plan unchanged when schemas match.
    • Applies a ProjectionExec to align field names when schemas are positionally compatible.
    • Validates column count, data types, nullability, and metadata before applying projection.
    • Produces clear errors when alignment is not possible.
  • Update RecursiveQueryExec:

    • Apply project_plan_to_schema to the recursive term during construction.
    • Remove batch-level schema rebinding logic from RecursiveQueryStream.
  • Adjust tests and expected plans to reflect consistent field naming:

    • Updated recursive CTE tests and explain output expectations.
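
The behaviour listed for `project_plan_to_schema` can be sketched with standard-library types only. This is a minimal illustration, not the code from this PR: `Field`, `Schema`, and `Plan` below are hypothetical stand-ins for the Arrow and DataFusion types, errors are plain strings, and nullability is compared strictly as in the description above (the review discussion further down relaxes that check).

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for an Arrow field (not the real type).
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub data_type: String,
    pub nullable: bool,
    pub metadata: BTreeMap<String, String>,
}

/// Hypothetical stand-in for an Arrow schema.
#[derive(Debug, Clone, PartialEq)]
pub struct Schema {
    pub fields: Vec<Field>,
    pub metadata: BTreeMap<String, String>,
}

/// Hypothetical stand-in for a physical plan node.
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    /// Leaf node that produces rows with the given schema.
    Source(Schema),
    /// Renames the input columns by position; `aliases[i]` names column `i`.
    Projection { input: Box<Plan>, aliases: Vec<String> },
}

impl Plan {
    /// Schema this node exposes at plan time.
    pub fn schema(&self) -> Schema {
        match self {
            Plan::Source(schema) => schema.clone(),
            Plan::Projection { input, aliases } => {
                let mut schema = input.schema();
                for (field, alias) in schema.fields.iter_mut().zip(aliases) {
                    field.name = alias.clone();
                }
                schema
            }
        }
    }
}

/// Returns `input` unchanged when its schema already matches `expected`;
/// otherwise validates positional compatibility and wraps it in a renaming
/// projection.
pub fn project_plan_to_schema(input: Plan, expected: &Schema) -> Result<Plan, String> {
    let actual = input.schema();
    if &actual == expected {
        return Ok(input);
    }
    if actual.fields.len() != expected.fields.len() {
        return Err(format!(
            "column count differs: input has {}, expected {}",
            actual.fields.len(),
            expected.fields.len()
        ));
    }
    if actual.metadata != expected.metadata {
        return Err("schema metadata differs".to_string());
    }
    for (i, (a, e)) in actual.fields.iter().zip(&expected.fields).enumerate() {
        if a.data_type != e.data_type {
            return Err(format!("column {}: data type differs", i));
        }
        if a.nullable != e.nullable {
            return Err(format!("column {}: nullability differs", i));
        }
        if a.metadata != e.metadata {
            return Err(format!("column {}: field metadata differs", i));
        }
    }
    // Only the names differ, so a positional rename is enough.
    let aliases = expected.fields.iter().map(|f| f.name.clone()).collect();
    Ok(Plan::Projection { input: Box::new(input), aliases })
}
```

In this sketch the mismatch is resolved once, when the plan is built, so every consumer of `schema()` sees the expected field names without any per-batch work.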

Are these changes tested?

Yes. The following tests are included:

  • In common.rs:

    • project_plan_to_schema_returns_input_when_schema_matches
    • project_plan_to_schema_aliases_field_names_with_projection_exec
    • project_plan_to_schema_preserves_matching_metadata_while_renaming
    • project_plan_to_schema_errors_on_column_count_mismatch
    • project_plan_to_schema_errors_on_type_mismatch
    • project_plan_to_schema_errors_on_nullability_mismatch
    • project_plan_to_schema_errors_on_field_metadata_mismatch
    • project_plan_to_schema_errors_on_schema_metadata_mismatch
  • In recursive_query.rs:

    • recursive_query_exec_projects_recursive_term_to_reconciled_schema
    • recursive_query_exec_rejects_nullability_mismatch
  • Updates to existing sqllogictest cases in cte.slt and explain plan expectations.


Are there any user-facing changes?

No direct user-facing API changes are introduced.

However, physical plans for recursive queries now consistently expose the declared schema at plan time, which may result in more consistent field names in explain plans and downstream consumers.


LLM-generated code disclosure

This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.

kosiew added 4 commits April 29, 2026 16:03
…rmalization

- Introduced a new function `normalize_batch_schema` which aligns the schema of a `RecordBatch` to an expected schema, handling cases where field names may differ while maintaining data types.
- Added extensive documentation outlining behavior for various scenarios including matching schemas, positional data type alignment, and error handling for mismatches.
- Implemented unit tests to validate functionality, covering scenarios of schema normalization, field renaming, and error conditions.
- Updated relevant parts of the codebase to utilize the new normalization function, ensuring consistent output schema in recursive query execution plans.
- Use `schema_ref()` instead of `schema()` to avoid cloning the Arc<Schema>.
- Replace imperative loop with early return using `find` for data type mismatch checks.
- Eliminate unnecessary cloning of the batch in tests.
- Remove redundant assertions for field names as schema equality already verifies them.
- Reordered the imports in `common.rs` for better readability.
- Removed commented-out lines to clean up the code.
- Enhanced code clarity by making minor adjustments to existing comments.
- Updated the documentation for schema normalization to clarify the conditions under which the batch is returned unchanged.
- Added a test to verify that schema normalization correctly strips metadata from the RecordBatch when the expected schema does not contain it.
- Provided additional assertions to ensure zero-copy behavior in schema normalization.
- Improved code comments for better clarity on the usage of the `normalize_batch_schema` function across various modules.
@github-actions github-actions Bot added the physical-plan Changes to the physical-plan crate label Apr 29, 2026
kosiew added 2 commits April 29, 2026 16:21
…iple files

- Improved formatting for better readability in `common.rs`
- Removed redundant lines in `lib.rs` by consolidating `normalize_batch_schema` import
- Reordered imports in `recursive_query.rs` for consistency and clarity
…chema

Updated the comment in `common.rs` to use the full crate path for `ExecutionPlan::schema`. This improves clarity for users reading the documentation regarding output schema.
@kosiew
Contributor Author

kosiew commented Apr 29, 2026

run benchmark datafusion-physical-plan

@kosiew
Contributor Author

kosiew commented Apr 29, 2026

run benchmark physical_plan

@adriangbot

🤖 Criterion benchmark running (GKE) | trigger
Instance: c4a-highmem-16 (12 vCPU / 65 GiB) | Linux bench-c4342034089-1906-g5v7p 6.12.55+ #1 SMP Sun Feb 1 08:59:41 UTC 2026 aarch64 GNU/Linux

CPU Details (lscpu)
Architecture:                            aarch64
CPU op-mode(s):                          64-bit
Byte Order:                              Little Endian
CPU(s):                                  16
On-line CPU(s) list:                     0-15
Vendor ID:                               ARM
Model name:                              Neoverse-V2
Model:                                   1
Thread(s) per core:                      1
Core(s) per cluster:                     16
Socket(s):                               -
Cluster(s):                              1
Stepping:                                r0p1
BogoMIPS:                                2000.00
Flags:                                   fp asimd evtstrm aes pmull sha1 sha2 crc32 atomics fphp asimdhp cpuid asimdrdm jscvt fcma lrcpc dcpop sha3 sm3 sm4 asimddp sha512 sve asimdfhm dit uscat ilrcpc flagm sb paca pacg dcpodp sve2 sveaes svepmull svebitperm svesha3 svesm4 flagm2 frint svei8mm svebf16 i8mm bf16 dgh rng bti
L1d cache:                               1 MiB (16 instances)
L1i cache:                               1 MiB (16 instances)
L2 cache:                                32 MiB (16 instances)
L3 cache:                                80 MiB (1 instance)
NUMA node(s):                            1
NUMA node0 CPU(s):                       0-15
Vulnerability Gather data sampling:      Not affected
Vulnerability Indirect target selection: Not affected
Vulnerability Itlb multihit:             Not affected
Vulnerability L1tf:                      Not affected
Vulnerability Mds:                       Not affected
Vulnerability Meltdown:                  Not affected
Vulnerability Mmio stale data:           Not affected
Vulnerability Reg file data sampling:    Not affected
Vulnerability Retbleed:                  Not affected
Vulnerability Spec rstack overflow:      Not affected
Vulnerability Spec store bypass:         Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:                Mitigation; __user pointer sanitization
Vulnerability Spectre v2:                Mitigation; CSV2, BHB
Vulnerability Srbds:                     Not affected
Vulnerability Tsa:                       Not affected
Vulnerability Tsx async abort:           Not affected
Vulnerability Vmscape:                   Not affected

Comparing normalization-schema-21910 (022c4a5) to e8a93bb (merge-base) diff
BENCH_NAME=datafusion-physical-plan
BENCH_COMMAND=cargo bench --features=parquet --bench datafusion-physical-plan
BENCH_FILTER=
Results will be posted here when complete


File an issue against this benchmark runner

@adriangbot

Benchmark for this request failed.

Last 20 lines of output:

    struct_query_sql
    substr
    substr_index
    substring
    sum
    to_char
    to_hex
    to_local_time
    to_time
    to_timestamp
    topk_aggregate
    topk_repartition
    translate
    trim
    trunc
    unhex
    upper
    uuid
    window_query_sql
    with_hashes

@adriangbot

🤖 Criterion benchmark running (GKE) | trigger
Instance: c4a-highmem-16 (12 vCPU / 65 GiB) | Linux bench-c4342043968-1907-kp5bh 6.12.55+ #1 SMP Sun Feb 1 08:59:41 UTC 2026 aarch64 GNU/Linux

Comparing normalization-schema-21910 (022c4a5) to e8a93bb (merge-base) diff
BENCH_NAME=physical_plan
BENCH_COMMAND=cargo bench --features=parquet --bench physical_plan
BENCH_FILTER=
Results will be posted here when complete


@kosiew
Contributor Author

kosiew commented Apr 29, 2026

show benchmark queue

@adriangbot

Hi @kosiew, you asked to view the benchmark queue (#21912 (comment)).

Comment        Repo                 PR        User      Benchmarks           Status
-------        ----                 --        ----      ----------           ------
#4342043968    apache/datafusion    #21912    kosiew    ["physical_plan"]    running

@adriangbot

🤖 Criterion benchmark completed (GKE) | trigger

Instance: c4a-highmem-16 (12 vCPU / 65 GiB)

Details

group                               main                                   normalization-schema-21910
-----                               ----                                   --------------------------
interleave_batches                  1.00    149.2±0.79µs        ? ?/sec    1.00    149.2±0.65µs        ? ?/sec
merge_batches_no_overlap_large      1.00    140.4±0.84µs        ? ?/sec    1.01    141.1±0.84µs        ? ?/sec
merge_batches_no_overlap_small      1.00    142.1±0.57µs        ? ?/sec    1.00    142.0±0.80µs        ? ?/sec
merge_batches_small_into_large      1.00     94.6±0.54µs        ? ?/sec    1.00     95.0±0.58µs        ? ?/sec
merge_batches_some_overlap_large    1.00    144.9±0.70µs        ? ?/sec    1.00    144.5±0.64µs        ? ?/sec
merge_batches_some_overlap_small    1.00    145.5±0.45µs        ? ?/sec    1.00    145.3±0.36µs        ? ?/sec

Resource Usage

base (merge-base)

Metric         Value
------         -----
Wall time      65.0s
Peak memory    5.3 GiB
Avg memory     5.2 GiB
CPU user       80.8s
CPU sys        4.0s
Peak spill     0 B

branch

Metric         Value
------         -----
Wall time      65.0s
Peak memory    5.3 GiB
Avg memory     5.2 GiB
CPU user       80.3s
CPU sys        3.2s
Peak spill     0 B

@kosiew kosiew marked this pull request as ready for review April 29, 2026 08:52
…ization in `RecordBatch`

This commit introduces a reusable helper function `normalize_batch_schema` in the `datafusion-physical-plan` module, which allows for zero-copy normalization of schema names in `RecordBatch` outputs. The helper ensures efficient schema renaming by replacing only the `Arc<Schema>` wrapper without copying the underlying column buffers.

An operator audit has been included, confirming that operators like `UnionExec` and various join types are safe without this helper, while `RecursiveQueryExec` has been updated to utilize it, following up on PR apache#21770.
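
The zero-copy rebinding this commit message describes (an approach the PR later drops in favour of a plan-time projection) can be illustrated as follows. This is only a sketch under stated assumptions: `Batch` and `rebind_schema` are hypothetical stand-ins, the schema is reduced to a list of field names, and columns are plain reference-counted vectors rather than Arrow arrays.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a record batch: a shared schema (field names
/// only) plus reference-counted column buffers.
#[derive(Debug)]
pub struct Batch {
    pub schema: Arc<Vec<String>>,
    pub columns: Vec<Arc<Vec<i64>>>,
}

/// Returns a batch that carries `expected` as its schema while sharing the
/// original column buffers. Cloning the `Vec<Arc<_>>` only bumps reference
/// counts; no column data is copied.
pub fn rebind_schema(batch: &Batch, expected: Arc<Vec<String>>) -> Result<Batch, String> {
    if batch.columns.len() != expected.len() {
        return Err(format!(
            "column count differs: batch has {}, expected {}",
            batch.columns.len(),
            expected.len()
        ));
    }
    Ok(Batch { schema: expected, columns: batch.columns.clone() })
}
```

`Arc::ptr_eq` on a column of the input and output batches confirms that both point at the same buffer.
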
@github-actions github-actions Bot added the development-process Related to development process of DataFusion label Apr 29, 2026
@github-actions github-actions Bot removed the development-process Related to development process of DataFusion label Apr 29, 2026
Contributor

@alamb alamb left a comment

Thanks @kosiew

Comment thread datafusion/physical-plan/src/common.rs Outdated
Comment on lines +45 to +46
/// a recursive CTE—may produce batches whose field *names* differ from the declared
/// schema even though the data types are identical. Downstream consumers that key on
Contributor

This seems like a bug to me -- if the recursive CTE is producing the wrong schema then we should fix the CTE output (e.g. via a ProjectionExec), not patch over the problem with a rename 🤔

Contributor Author

@kosiew kosiew Apr 30, 2026

You're right.
Implemented ProjectionExec fix

) -> Poll<Option<Result<RecordBatch>>> {
let baseline_metrics = self.baseline_metrics.clone();

// Rebind to the declared output schema. The recursive term is planned
Contributor

It seems like we should fix this via a ProjectionExec rather than just swapping out the schema 🤔

Contributor Author

@kosiew kosiew Apr 30, 2026

Implemented ProjectionExec fix

- Implemented recursive CTE schema-name alignment during plan construction.
- Removed public RecordBatch schema-normalization helper/re-export.
- Eliminated stream-time batch rebinding functionality.
- Added unit tests for projection insertion and error handling cases.
- Updated expectations for explain/sqllogictest.
@github-actions github-actions Bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Apr 30, 2026
@kosiew kosiew changed the title Add normalize_batch_schema helper and migrate RecursiveQueryExec to enforce output schema consistency Normalize recursive CTE schema via projection to align with static term Apr 30, 2026
kosiew added 2 commits April 30, 2026 17:20
…eQueryExec

- Added `project_plan_to_schema` utility in `datafusion/physical-plan/src/common.rs`
- Updated `RecursiveQueryExec` to align recursive-term schemas at plan time
- Added tests for:
  - Schema matching
  - Alias projection
  - Metadata preservation
  - Mismatch errors
  - Recursive query integration
…lation

- Modified the initial selection in `oom_recursive_cte` to filter out NULL values by selecting from a VALUES construct.
- Changed the level calculation in the `region_sales` CTE test to use `SUM(0)` instead of `0` for consistency in aggregation.
@kosiew kosiew changed the title Normalize recursive CTE schema via projection to align with static term Align recursive query schemas at plan-time with reusable projection helper Apr 30, 2026
@kosiew kosiew requested a review from alamb April 30, 2026 10:16
Address regression in recursive CTE handling by replacing the strict
projection with a specific alignment path. This ensures proper column
alignment, maintains nullability, and validates type compatibility,
while preserving the output schema contract for planner checks.
Move helper tests to common.rs and retain integration coverage in
recursive_query.rs. Restore behavior in cte.slt.

Co-authored-by: Copilot <copilot@github.com>
kosiew added 4 commits April 30, 2026 20:38
…on tests

- Switched construction path to shared helper at line 92
- Removed private align_recursive_term_to_static_schema logic at line 391
- Updated regression test to match nullability and assert non-null result schema at line 506
- Added new regression test to reject nullability mismatch at line 528
…ailures in common.rs

- Replaced `internal_err!` with `plan_err!` for the following validation failures:
  - Column count mismatch
  - Schema metadata mismatch
  - Field compatibility mismatch (including nullability)
- Retained the earlier recursive-query fix using the shared helper from `recursive_query.rs:94`.
…rsive_query.rs

- Improved mismatch detection at common.rs to differentiate between:
  - Data type mismatch
  - Nullability mismatch
  - Field metadata mismatch
- Updated error messaging to be attribute-specific at common.rs
- Modified unit test assertions in common.rs:
- Updated recursive query test assertion in recursive_query.rs
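
The attribute-specific mismatch reporting described in this commit could look roughly like the sketch below. `Field`, `FieldMismatch`, `first_mismatch`, and `describe_mismatch` are hypothetical names chosen for illustration, and the order of the checks is an assumption; only the message prefix is taken from the planning error quoted later in this conversation.

```rust
use std::fmt;

/// Hypothetical stand-in for an Arrow field.
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub data_type: String,
    pub nullable: bool,
    pub metadata: Vec<(String, String)>,
}

/// Which attribute of a field pair disagrees.
#[derive(Debug, PartialEq)]
pub enum FieldMismatch {
    DataType,
    Nullability,
    Metadata,
}

impl fmt::Display for FieldMismatch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let what = match self {
            FieldMismatch::DataType => "data type",
            FieldMismatch::Nullability => "nullability",
            FieldMismatch::Metadata => "metadata",
        };
        write!(f, "field {} differs", what)
    }
}

/// Reports the first attribute that differs, ignoring the field name.
pub fn first_mismatch(input: &Field, expected: &Field) -> Option<FieldMismatch> {
    if input.data_type != expected.data_type {
        Some(FieldMismatch::DataType)
    } else if input.nullable != expected.nullable {
        Some(FieldMismatch::Nullability)
    } else if input.metadata != expected.metadata {
        Some(FieldMismatch::Metadata)
    } else {
        None
    }
}

/// Builds an attribute-specific error message for column `index`, if any.
pub fn describe_mismatch(index: usize, input: &Field, expected: &Field) -> Option<String> {
    first_mismatch(input, expected).map(|mismatch| {
        format!(
            "Cannot project plan column {} ('{}') to expected output field '{}': {}",
            index, input.name, expected.name, mismatch
        )
    })
}
```

Naming the differing attribute lets a test assert on the exact failure instead of a generic "incompatible field" message.
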
@kosiew kosiew changed the title Align recursive query schemas at plan-time with reusable projection helper Add reusable plan-time schema alignment helper and apply to RecursiveQueryExec Apr 30, 2026
Contributor

@alamb alamb left a comment

Thanks @kosiew - this looks like a really nice cleanup to me. Thank you

The only thing I see that I think should be resolved before merge is why the memory_limit/mod.rs change is needed

.with_query(
"WITH RECURSIVE nodes AS (
- SELECT 1 as id
+ SELECT id FROM (VALUES (1), (NULL)) AS t(id) WHERE id IS NOT NULL
Contributor

Why is this change needed? It is not clear to me if this is needed for this PR 🤔

The prior query seems valid as well

Contributor Author

`SELECT 1 as id` would fail with

Error during planning: Cannot project plan column 0 ('id') to expected output field 'id': field nullability differs (input field: Field { name: "id", data_type: Int64, nullable: true }, expected field: Field { name: "id", data_type: Int64 })

under the stricter nullability check.

Should I amend the check to allow safe nullability widening (non-null -> nullable)?

Contributor

I think so -- otherwise queries that used to run will start to error, which would likely be interpreted by users as a regression

) -> Poll<Option<Result<RecordBatch>>> {
let baseline_metrics = self.baseline_metrics.clone();

// Rebind to the declared output schema. The recursive term is planned
Contributor

this is a nice cleanup

- Reverted oom_recursive_cte query to use SELECT 1 as id.
- Updated schema alignment to permit safe nullability widening (non-null to nullable) through same-type CastExpr.
- Enhanced recursive CTE to reconcile output nullability across static and recursive terms, ensuring alignment of both plans.
- Rejected nullability narrowing.
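
The widening rule from this commit can be sketched as a small decision function. The names below are hypothetical, and the reconciliation rule (an output column is nullable whenever either term's column is nullable) is an assumption about how the static and recursive terms are combined; the commit message does not spell it out.

```rust
/// Result of comparing an input field's nullability with the expected one.
#[derive(Debug, PartialEq)]
pub enum NullabilityCheck {
    /// Both sides agree; nothing to do.
    Same,
    /// Non-null input, nullable expected: safe to widen.
    Widen,
    /// Nullable input, non-null expected: must be rejected.
    Narrow,
}

pub fn check_nullability(input_nullable: bool, expected_nullable: bool) -> NullabilityCheck {
    match (input_nullable, expected_nullable) {
        (false, true) => NullabilityCheck::Widen,
        (true, false) => NullabilityCheck::Narrow,
        _ => NullabilityCheck::Same,
    }
}

/// Assumed reconciliation rule: per column, the output is nullable if either
/// the static or the recursive term is nullable. Returns `None` when the two
/// terms have a different number of columns.
pub fn reconcile_nullability(static_term: &[bool], recursive_term: &[bool]) -> Option<Vec<bool>> {
    if static_term.len() != recursive_term.len() {
        return None;
    }
    Some(
        static_term
            .iter()
            .zip(recursive_term)
            .map(|(s, r)| *s || *r)
            .collect(),
    )
}
```

Under this assumption, a non-null static term such as `SELECT 1 as id` paired with a nullable recursive term yields a nullable output column, so aligning either term to the output only ever widens.
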
@kosiew kosiew marked this pull request as draft May 5, 2026 13:00
@alamb alamb marked this pull request as ready for review May 5, 2026 13:04
@alamb alamb marked this pull request as draft May 5, 2026 13:04
@alamb
Contributor

alamb commented May 5, 2026

looks good to me, FWIW - thanks @kosiew

@kosiew kosiew marked this pull request as ready for review May 5, 2026 13:42
@kosiew kosiew added this pull request to the merge queue May 6, 2026
@kosiew
Contributor Author

kosiew commented May 6, 2026

@alamb
Thanks for the review and feedback

Merged via the queue into apache:main with commit 739e147 May 6, 2026
39 checks passed
@kosiew kosiew deleted the normalization-schema-21910 branch May 6, 2026 02:30
kosiew added a commit to kosiew/datafusion that referenced this pull request May 13, 2026

Labels

core Core DataFusion crate physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)

Development

Successfully merging this pull request may close these issues.

Introduce a reusable execution-layer helper for schema normalization

3 participants