Skip to content
Closed
66 changes: 50 additions & 16 deletions pkg/sql/colexec/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,28 @@ func (apply *Apply) Prepare(proc *process.Process) (err error) {
apply.ctr.sels = make([]int32, 0)
}

if apply.TableFunction == nil {
return moerr.NewInvalidState(proc.Ctx, "apply operator missing table function")
if apply.TableFunction == nil && apply.Runner == nil {
return moerr.NewInvalidState(proc.Ctx, "apply operator missing table function or subquery runner")
}

err = apply.TableFunction.ApplyPrepare(proc)
if err != nil {
return
if apply.TableFunction != nil {
err = apply.TableFunction.ApplyPrepare(proc)
if err != nil {
return
}
}
if apply.Runner != nil {
err = apply.Runner.Prepare(proc)
if err != nil {
return
}
}
return
}

func (apply *Apply) Call(proc *process.Process) (vm.CallResult, error) {
if apply.TableFunction == nil {
return vm.CancelResult, moerr.NewInvalidState(proc.Ctx, "apply operator missing table function")
if apply.TableFunction == nil && apply.Runner == nil {
return vm.CancelResult, moerr.NewInvalidState(proc.Ctx, "apply operator missing table function or subquery runner")
}

analyzer := apply.OpAnalyzer
Expand All @@ -84,18 +92,34 @@ func (apply *Apply) Call(proc *process.Process) (vm.CallResult, error) {
if ctr.inbat == nil {
result.Batch = nil
result.Status = vm.ExecStop
err := apply.TableFunction.ApplyEnd(proc)
if err != nil {
return result, err
if apply.TableFunction != nil {
err := apply.TableFunction.ApplyEnd(proc)
if err != nil {
return result, err
}
}
if apply.Runner != nil {
err := apply.Runner.End(proc)
if err != nil {
return result, err
}
}
return result, nil
}
if ctr.inbat.Last() {
// last batch
result.Batch = ctr.inbat
err := apply.TableFunction.ApplyEnd(proc)
if err != nil {
return result, err
if apply.TableFunction != nil {
err := apply.TableFunction.ApplyEnd(proc)
if err != nil {
return result, err
}
}
if apply.Runner != nil {
err := apply.Runner.End(proc)
if err != nil {
return result, err
}
}
return result, nil
}
Expand All @@ -105,7 +129,9 @@ func (apply *Apply) Call(proc *process.Process) (vm.CallResult, error) {
}
ctr.batIdx = 0
ctr.tfFinish = true
apply.TableFunction.ApplyArgsEval(ctr.inbat, proc)
if apply.TableFunction != nil {
apply.TableFunction.ApplyArgsEval(ctr.inbat, proc)
}
}
if ctr.rbat == nil {
ctr.rbat = batch.NewWithSize(len(apply.Result))
Expand Down Expand Up @@ -137,15 +163,23 @@ func (ctr *container) probe(ap *Apply, proc *process.Process, result *vm.CallRes
var err error
for i := ctr.batIdx; i < count; i++ {
if ctr.tfFinish {
err = ap.TableFunction.ApplyStart(i, proc, analyzer)
if ap.TableFunction != nil {
err = ap.TableFunction.ApplyStart(i, proc, analyzer)
} else {
err = ap.Runner.Start(ctr.inbat, i, proc, analyzer)
}
ctr.tfNull = true
if err != nil {
return err
}
ctr.tfFinish = false
}
for {
tfResult, err = ap.TableFunction.ApplyCall(proc)
if ap.TableFunction != nil {
tfResult, err = ap.TableFunction.ApplyCall(proc)
} else {
tfResult, err = ap.Runner.Call(proc)
}
if err != nil {
return err
}
Expand Down
68 changes: 66 additions & 2 deletions pkg/sql/colexec/apply/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ import (

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/table_function"
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/vm"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -54,12 +59,12 @@ func TestNilTableFunctionLifecycle(t *testing.T) {
err := arg.Prepare(proc)
require.Error(t, err)
require.True(t, moerr.IsMoErrCode(err, moerr.ErrInvalidState))
require.ErrorContains(t, err, "apply operator missing table function")
require.ErrorContains(t, err, "apply operator missing table function or subquery runner")

_, err = arg.Call(proc)
require.Error(t, err)
require.True(t, moerr.IsMoErrCode(err, moerr.ErrInvalidState))
require.ErrorContains(t, err, "apply operator missing table function")
require.ErrorContains(t, err, "apply operator missing table function or subquery runner")

require.NotPanics(t, func() {
arg.Reset(proc, false, nil)
Expand All @@ -81,6 +86,65 @@ func newTestCase(t *testing.T, applyType int) applyTestCase {
}
}

type mockRunner struct {
batches []*batch.Batch
idx int
}

func (m *mockRunner) Prepare(proc *process.Process) error { return nil }
func (m *mockRunner) Start(input *batch.Batch, row int, proc *process.Process, analyzer process.Analyzer) error {
m.idx = 0
return nil
}
func (m *mockRunner) Call(proc *process.Process) (vm.CallResult, error) {
if m.idx >= len(m.batches) {
return vm.CallResult{Batch: batch.EmptyBatch}, nil
}
result := vm.NewCallResult()
result.Batch = m.batches[m.idx]
m.idx++
return result, nil
}
func (m *mockRunner) End(proc *process.Process) error { return nil }
func (m *mockRunner) Reset(proc *process.Process, pipelineFailed bool, err error) {}
func (m *mockRunner) Free(proc *process.Process, pipelineFailed bool, err error) {}

func TestRunnerLifecycle(t *testing.T) {
proc := testutil.NewProcess(t)
arg := NewArgument()
arg.ApplyType = CROSS
arg.Result = []colexec.ResultPos{{Rel: 0, Pos: 0}, {Rel: 1, Pos: 0}}
arg.Typs = []types.Type{types.T_int64.ToType()}
arg.Runner = &mockRunner{
batches: []*batch.Batch{
testutil.NewBatchWithVectors(
[]*vector.Vector{
testutil.MakeInt64Vector([]int64{100}, nil, proc.Mp()),
},
[]int64{1},
),
},
}

input := testutil.NewBatchWithVectors(
[]*vector.Vector{
testutil.MakeInt64Vector([]int64{1}, nil, proc.Mp()),
},
[]int64{1},
)
child := colexec.NewMockOperator().WithBatchs([]*batch.Batch{input, nil})
arg.AppendChild(child)

require.NoError(t, arg.Prepare(proc))

result, err := arg.Call(proc)
require.NoError(t, err)
require.NotNil(t, result.Batch)
require.Equal(t, 1, result.Batch.RowCount())
require.Equal(t, int64(1), vector.GetFixedAtNoTypeCheck[int64](result.Batch.Vecs[0], 0))
require.Equal(t, int64(100), vector.GetFixedAtNoTypeCheck[int64](result.Batch.Vecs[1], 0))
}

/*
func resetChildren(arg *Apply) {
bat := colexec.MakeMockBatchs()
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/colexec/apply/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,23 @@ type container struct {
sels []int32
}

type SubqueryRunner interface {
Prepare(proc *process.Process) error
Start(input *batch.Batch, row int, proc *process.Process, analyzer process.Analyzer) error
Call(proc *process.Process) (vm.CallResult, error)
End(proc *process.Process) error
Reset(proc *process.Process, pipelineFailed bool, err error)
Free(proc *process.Process, pipelineFailed bool, err error)
}

type Apply struct {
ctr container
ApplyType int
Result []colexec.ResultPos
Typs []types.Type

TableFunction *table_function.TableFunction
Runner SubqueryRunner
vm.OperatorBase
}

Expand Down Expand Up @@ -102,6 +112,9 @@ func (apply *Apply) Reset(proc *process.Process, pipelineFailed bool, err error)
if apply.TableFunction != nil {
apply.TableFunction.Reset(proc, pipelineFailed, err)
}
if apply.Runner != nil {
apply.Runner.Reset(proc, pipelineFailed, err)
}
}

func (apply *Apply) Free(proc *process.Process, pipelineFailed bool, err error) {
Expand All @@ -116,6 +129,9 @@ func (apply *Apply) Free(proc *process.Process, pipelineFailed bool, err error)
if apply.TableFunction != nil {
apply.TableFunction.Free(proc, pipelineFailed, err)
}
if apply.Runner != nil {
apply.Runner.Free(proc, pipelineFailed, err)
}
}

func (apply *Apply) ExecProjection(proc *process.Process, input *batch.Batch) (*batch.Batch, error) {
Expand Down
46 changes: 46 additions & 0 deletions pkg/sql/colexec/correlated_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2026 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package colexec

import (
"context"

"github.com/matrixorigin/matrixone/pkg/container/batch"
)

type correlatedBatchesCtxKey struct{}

type correlatedBatches struct {
batches []*batch.Batch
row int
}

func WithCorrelatedBatches(ctx context.Context, batches []*batch.Batch, row int) context.Context {
return context.WithValue(ctx, correlatedBatchesCtxKey{}, correlatedBatches{
batches: batches,
row: row,
})
}

func GetCorrelatedBatches(ctx context.Context) ([]*batch.Batch, int, bool) {
if ctx == nil {
return nil, 0, false
}
value, ok := ctx.Value(correlatedBatchesCtxKey{}).(correlatedBatches)
if !ok {
return nil, 0, false
}
return value.batches, value.row, true
}
Loading
Loading