Skip to content

fix: support aggregation with non-equality conditions in scalar subquery#24047

Closed
ck89119 wants to merge 3 commits intomatrixorigin:mainfrom
ck89119:fix-23942-claude
Closed

fix: support aggregation with non-equality conditions in scalar subquery#24047
ck89119 wants to merge 3 commits intomatrixorigin:mainfrom
ck89119:fix-23942-claude

Conversation

@ck89119
Copy link
Copy Markdown
Contributor

@ck89119 ck89119 commented Apr 3, 2026

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #23942

What this PR does / why we need it:

support aggregation with non-equality conditions in scalar subquery

@ck89119 ck89119 changed the title fix: support aggregation with non-equality conditions in scalar subqu… fix: support aggregation with non-equality conditions in scalar subquery Apr 3, 2026
@qodo-code-review
Copy link
Copy Markdown

Review Summary by Qodo

Support aggregation with non-equality conditions in scalar subqueries

🐞 Bug fix ✨ Enhancement

Grey Divider

Walkthroughs

Description
• Support aggregation with non-equality conditions in scalar subqueries via correlated apply
• Introduce SubqueryRunner interface to handle generic subquery execution in apply operator
• Add CorrExpressionExecutor for evaluating correlated column references from outer context
• Enable context passing mechanism for correlated batches through WithCorrelatedBatches
Diagram
flowchart LR
  A["Scalar Subquery<br/>with Non-Eq Conditions"] -->|"Detect Pattern"| B["Create Apply Node"]
  B -->|"Generic Subquery"| C["CorrelatedApplyRunner"]
  C -->|"Evaluate"| D["CorrExpressionExecutor"]
  D -->|"Access Outer Row"| E["WithCorrelatedBatches Context"]
  E -->|"Return Results"| F["Apply Operator Output"]
Loading

Grey Divider

File Changes

1. pkg/sql/colexec/apply/apply.go ✨ Enhancement +50/-16

Support both table functions and subquery runners

pkg/sql/colexec/apply/apply.go


2. pkg/sql/colexec/apply/apply_test.go 🧪 Tests +66/-2

Add test for runner lifecycle and mock runner

pkg/sql/colexec/apply/apply_test.go


3. pkg/sql/colexec/apply/types.go ✨ Enhancement +16/-0

Define SubqueryRunner interface and add Runner field

pkg/sql/colexec/apply/types.go


View more (16)
4. pkg/sql/colexec/correlated_context.go ✨ Enhancement +46/-0

New file for correlated batches context management

pkg/sql/colexec/correlated_context.go


5. pkg/sql/colexec/evalExpression.go ✨ Enhancement +80/-0

Implement CorrExpressionExecutor for correlated columns

pkg/sql/colexec/evalExpression.go


6. pkg/sql/colexec/evalExpressionReset.go ✨ Enhancement +4/-0

Add ResetForNextQuery for CorrExpressionExecutor

pkg/sql/colexec/evalExpressionReset.go


7. pkg/sql/colexec/evalExpression_test.go 🧪 Tests +78/-0

Add tests for correlated expression executor

pkg/sql/colexec/evalExpression_test.go


8. pkg/sql/colexec/reuse.go ✨ Enhancement +18/-0

Register CorrExpressionExecutor in reuse pool

pkg/sql/colexec/reuse.go


9. pkg/sql/compile/compile.go ✨ Enhancement +25/-4

Update compileApply to support generic subqueries

pkg/sql/compile/compile.go


10. pkg/sql/compile/correlated_apply.go ✨ Enhancement +181/-0

New file implementing correlatedApplyRunner

pkg/sql/compile/correlated_apply.go


11. pkg/sql/compile/operator.go ✨ Enhancement +25/-12

Handle optional TableFunction in dupOperator and constructApply

pkg/sql/compile/operator.go


12. pkg/sql/compile/operator_test.go 🧪 Tests +37/-0

Add tests for apply without table function

pkg/sql/compile/operator_test.go


13. pkg/sql/compile/remoterun.go ✨ Enhancement +18/-14

Handle optional TableFunction in serialization

pkg/sql/compile/remoterun.go


14. pkg/sql/compile/remoterun_test.go 🧪 Tests +25/-0

Add test for apply serialization without table function

pkg/sql/compile/remoterun_test.go


15. pkg/sql/plan/build_test.go 🧪 Tests +5/-1

Update test to allow correlated non-equality aggregation

pkg/sql/plan/build_test.go


16. pkg/sql/plan/flatten_subquery.go 🐞 Bug fix +96/-8

Replace error with apply node for non-eq agg subqueries

pkg/sql/plan/flatten_subquery.go


17. pkg/sql/plan/query_builder.go ✨ Enhancement +53/-24

Handle non-table-function apply in column remapping

pkg/sql/plan/query_builder.go


18. test/distributed/cases/dml/select/subquery.result 🧪 Tests +22/-5

Update expected results for correlated agg queries

test/distributed/cases/dml/select/subquery.result


19. test/distributed/cases/hint/hint_cte.result 🧪 Tests +6/-1

Update expected results for CTE with correlated agg

test/distributed/cases/hint/hint_cte.result


Grey Divider

Qodo Logo

@qodo-code-review
Copy link
Copy Markdown

qodo-code-review Bot commented Apr 3, 2026

Code Review by Qodo

🐞 Bugs (4) 📘 Rule violations (0) 📎 Requirement gaps (0) 🎨 UX Issues (0)

Grey Divider


Action required

1. Corr vector length mismatch 🐞 Bug ≡ Correctness
Description
CorrExpressionExecutor.Eval returns a const vector of length 1 when the inner input batch has 0
rows, violating the invariant that expression result lengths match the input batch row count and
risking downstream panics/wrong results.
Code

pkg/sql/colexec/evalExpression.go[R784-788]

+	rowCount := 1
+	if len(batches) > 0 && batches[0] != nil && batches[0].RowCount() > 0 {
+		rowCount = batches[0].RowCount()
+	}
+	if err := expr.setConstFunc(expr.resultVector, corrBatches[relIndex].Vecs[expr.colIndex], int64(row), rowCount); err != nil {
Evidence
CorrExpressionExecutor defaults rowCount to 1 unless batches[0].RowCount() > 0, so an empty batch
produces a non-empty const vector.

pkg/sql/colexec/evalExpression.go[758-791]
pkg/sql/colexec/evalExpression.go[794-800]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`CorrExpressionExecutor.Eval` computes `rowCount := 1` and only updates it when the inner batch has `RowCount() > 0`. For empty inner batches, it returns a const vector with length 1, which can break downstream vectorized operators that assume vector length equals the current batch row count.

### Issue Context
Other executors generally size their outputs to `batches[0].RowCount()` (including 0). This executor should follow the same contract.

### Fix Focus Areas
- pkg/sql/colexec/evalExpression.go[784-788]

### Suggested fix
Set `rowCount` to the inner batch row count whenever an inner batch exists, even when it is 0:
- If `len(batches)>0 && batches[0]!=nil`, use `rowCount = batches[0].RowCount()`.
- Otherwise (no inner batch provided), keep `rowCount = 1` as the fallback.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


2. Runner lost in remoterun 🐞 Bug ☼ Reliability
Description
Remote pipeline serialization/deserialization for Apply only includes TableFunction, so an Apply
that relies on Runner will be reconstructed remotely with both TableFunction and Runner nil and fail
with InvalidState at Prepare/Call.
Code

pkg/sql/compile/remoterun.go[R762-771]

+		if t.TableFunction != nil {
+			in.TableFunction = &pipeline.TableFunction{
+				Attrs:    t.TableFunction.Attrs,
+				Rets:     t.TableFunction.Rets,
+				Args:     t.TableFunction.Args,
+				Params:   t.TableFunction.Params,
+				Name:     t.TableFunction.FuncName,
+				IsSingle: t.TableFunction.IsSingle,
+			}
		}
Evidence
convertToPipelineInstruction conditionally omits TableFunction; convertToVmOperator only rebuilds
TableFunction and never serializes Runner. Apply.Prepare/Call requires either TableFunction or
Runner, so a remotely rebuilt Apply without TableFunction will error.

pkg/sql/compile/remoterun.go[754-771]
pkg/sql/compile/remoterun.go[1203-1218]
pkg/sql/colexec/apply/apply.go[44-77]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`apply.Apply` can now operate with `Runner` instead of a `TableFunction`, but the remote pipeline instruction format only carries `TableFunction`. If an Apply-with-Runner ends up in a remote scope, it will be serialized without Runner and deserialized with both Runner and TableFunction nil, causing a runtime InvalidState error.

### Issue Context
- Serialization currently does:
 - `Apply` always
 - `TableFunction` only when non-nil
 - **Runner is never serialized**
- Execution requires `TableFunction != nil || Runner != nil`.

### Fix Focus Areas
- pkg/sql/compile/remoterun.go[754-771]
- pkg/sql/compile/remoterun.go[1203-1218]
- pkg/sql/colexec/apply/apply.go[55-77]

### Suggested fix options (pick one)
1) **Fail fast at serialization time**: if `t.Runner != nil`, return an explicit error/NYI from `convertToPipelineInstruction` so the system never ships an un-runnable remote pipeline.
2) **Force local execution** for Apply-with-Runner scopes (compile-time), ensuring it never goes through remote serialization.
3) Add a serializable runner representation and reconstruct Runner on remote CN (likely more involved).

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools



Remediation recommended

3. Runner shared across dups 🐞 Bug ☼ Reliability
Description
dupOperator copies Apply.Runner by reference, so duplicated Apply operators can share a single
mutable Runner instance, risking races and incorrect results if those duplicates execute
concurrently.
Code

pkg/sql/compile/operator.go[R523-533]

+		op.Runner = t.Runner
+		if t.TableFunction != nil {
+			op.TableFunction = table_function.NewArgument()
+			op.TableFunction.FuncName = t.TableFunction.FuncName
+			op.TableFunction.Args = t.TableFunction.Args
+			op.TableFunction.Rets = t.TableFunction.Rets
+			op.TableFunction.Attrs = t.TableFunction.Attrs
+			op.TableFunction.Params = t.TableFunction.Params
+			op.TableFunction.IsSingle = t.TableFunction.IsSingle
+			op.TableFunction.SetInfo(&info)
+		}
Evidence
dupOperator assigns op.Runner = t.Runner (shallow copy). correlatedApplyRunner maintains mutable
fields (outputBatches, outputIdx) that are not concurrency-safe to share across operator
instances.

pkg/sql/compile/operator.go[517-535]
pkg/sql/compile/correlated_apply.go[31-37]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`dupOperator` shallow-copies `Apply.Runner`, which can cause multiple Apply operator instances to share the same Runner object. If those Apply instances can run in parallel (e.g., operator duplication for parallel pipelines), Runner state (like `outputIdx/outputBatches`) can be corrupted.

### Issue Context
`correlatedApplyRunner` stores per-execution mutable state and is not designed to be shared across concurrent operator instances.

### Fix Focus Areas
- pkg/sql/compile/operator.go[517-535]
- pkg/sql/compile/correlated_apply.go[31-37]

### Suggested fix
- If Apply duplication is possible when Runner is set: implement a `Clone()`/factory for Runner so each duplicated operator gets its own Runner instance.
- Otherwise: add a defensive assertion/guard in `dupOperator` (and/or at compile time) that `Runner` must be nil when `maxParallel > 1` (or when duplication is invoked).

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


4. Recursive batch flag ignored 🐞 Bug ≡ Correctness
Description
correlatedApplyRunner.captureBatch uses Batch.Dup which preserves Batch.Recursive; Apply.probe
treats any batch with Last/Recursive set as done (IsDone) and will not union its rows, dropping data
for such batches.
Code

pkg/sql/compile/correlated_apply.go[R163-172]

+func (r *correlatedApplyRunner) captureBatch(bat *batch.Batch, _ *perfcounter.CounterSet) error {
+	if bat == nil {
+		return nil
+	}
+	dup, err := bat.Dup(r.compile.proc.Mp())
+	if err != nil {
+		return err
+	}
+	r.outputBatches = append(r.outputBatches, dup)
+	return nil
Evidence
Batch.Dup clones the Recursive flag, and Apply.probe checks tfResult.Batch.IsDone() (true when
Last()/Recursive>0) before consuming any rows from the runner batch.

pkg/sql/compile/correlated_apply.go[163-172]
pkg/container/batch/batch.go[580-614]
pkg/container/batch/batch.go[722-727]
pkg/sql/colexec/apply/apply.go[159-201]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`captureBatch` duplicates batches via `bat.Dup`, which preserves `Batch.Recursive`. Apply's probe loop treats `Batch.Last()` (Recursive>0) as done and does not consume rows from done batches, so any captured batch with Recursive set will be ignored.

### Issue Context
`Recursive` is used in some pipelines (e.g., recursive CTE signaling). Even if correlated subqueries rarely hit this, sanitizing runner output makes Apply robust.

### Fix Focus Areas
- pkg/sql/compile/correlated_apply.go[163-172]
- pkg/sql/colexec/apply/apply.go[186-201]
- pkg/container/batch/batch.go[580-587]

### Suggested fix
After duplicating the batch, clear the done/recursive marker before storing:
- `dup.Recursive = 0`
This ensures Apply will consume its rows, while the runner can still signal completion by returning `batch.EmptyBatch` in `Call()`.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


Grey Divider

ⓘ The new review experience is currently in Beta. Learn more

Grey Divider

Qodo Logo

Comment on lines +784 to +788
rowCount := 1
if len(batches) > 0 && batches[0] != nil && batches[0].RowCount() > 0 {
rowCount = batches[0].RowCount()
}
if err := expr.setConstFunc(expr.resultVector, corrBatches[relIndex].Vecs[expr.colIndex], int64(row), rowCount); err != nil {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Corr vector length mismatch 🐞 Bug ≡ Correctness

CorrExpressionExecutor.Eval returns a const vector of length 1 when the inner input batch has 0
rows, violating the invariant that expression result lengths match the input batch row count and
risking downstream panics/wrong results.
Agent Prompt
### Issue description
`CorrExpressionExecutor.Eval` computes `rowCount := 1` and only updates it when the inner batch has `RowCount() > 0`. For empty inner batches, it returns a const vector with length 1, which can break downstream vectorized operators that assume vector length equals the current batch row count.

### Issue Context
Other executors generally size their outputs to `batches[0].RowCount()` (including 0). This executor should follow the same contract.

### Fix Focus Areas
- pkg/sql/colexec/evalExpression.go[784-788]

### Suggested fix
Set `rowCount` to the inner batch row count whenever an inner batch exists, even when it is 0:
- If `len(batches)>0 && batches[0]!=nil`, use `rowCount = batches[0].RowCount()`.
- Otherwise (no inner batch provided), keep `rowCount = 1` as the fallback.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Comment on lines +762 to 771
if t.TableFunction != nil {
in.TableFunction = &pipeline.TableFunction{
Attrs: t.TableFunction.Attrs,
Rets: t.TableFunction.Rets,
Args: t.TableFunction.Args,
Params: t.TableFunction.Params,
Name: t.TableFunction.FuncName,
IsSingle: t.TableFunction.IsSingle,
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

2. Runner lost in remoterun 🐞 Bug ☼ Reliability

Remote pipeline serialization/deserialization for Apply only includes TableFunction, so an Apply
that relies on Runner will be reconstructed remotely with both TableFunction and Runner nil and fail
with InvalidState at Prepare/Call.
Agent Prompt
### Issue description
`apply.Apply` can now operate with `Runner` instead of a `TableFunction`, but the remote pipeline instruction format only carries `TableFunction`. If an Apply-with-Runner ends up in a remote scope, it will be serialized without Runner and deserialized with both Runner and TableFunction nil, causing a runtime InvalidState error.

### Issue Context
- Serialization currently does:
  - `Apply` always
  - `TableFunction` only when non-nil
  - **Runner is never serialized**
- Execution requires `TableFunction != nil || Runner != nil`.

### Fix Focus Areas
- pkg/sql/compile/remoterun.go[754-771]
- pkg/sql/compile/remoterun.go[1203-1218]
- pkg/sql/colexec/apply/apply.go[55-77]

### Suggested fix options (pick one)
1) **Fail fast at serialization time**: if `t.Runner != nil`, return an explicit error/NYI from `convertToPipelineInstruction` so the system never ships an un-runnable remote pipeline.
2) **Force local execution** for Apply-with-Runner scopes (compile-time), ensuring it never goes through remote serialization.
3) Add a serializable runner representation and reconstruct Runner on remote CN (likely more involved).

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

…copes

buildReaders uses c.proc.Ctx (the compile's proc context), not the
scope's own Proc.Ctx. The child compile's proc was created via
NewNoContextChildProc which leaves Ctx nil. Call BuildPipelineContext
on the child proc before running scopes to avoid nil-parent panic.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kind/bug Something isn't working kind/feature size/L Denotes a PR that changes [500,999] lines

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants