Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions pkg/compare/arraycompare.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,13 @@ func (c arrayCompare) Copy(vecSrc, vecDst int, src, dst int64, proc *process.Pro
nulls.Add(c.vs[vecDst].GetGrouping(), uint64(dst))
} else {
nulls.Del(c.vs[vecDst].GetGrouping(), uint64(dst))
c.vs[vecDst].Copy(c.vs[vecSrc], dst, src, proc.Mp())
}
if c.isConstNull[vecSrc] || c.vs[vecSrc].GetNulls().Contains(uint64(src)) {
nulls.Add(c.vs[vecDst].GetNulls(), uint64(dst))
return nil
} else {
nulls.Del(c.vs[vecDst].GetNulls(), uint64(dst))
return c.vs[vecDst].Copy(c.vs[vecSrc], dst, src, proc.Mp())
}
nulls.Del(c.vs[vecDst].GetNulls(), uint64(dst))
return c.vs[vecDst].Copy(c.vs[vecSrc], dst, src, proc.Mp())
}

func (c arrayCompare) Compare(veci, vecj int, vi, vj int64) int {
Expand Down
6 changes: 2 additions & 4 deletions pkg/compare/strcompare.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,13 @@ func (c *strCompare) Copy(vecSrc, vecDst int, src, dst int64, proc *process.Proc
nulls.Add(c.vs[vecDst].GetGrouping(), uint64(dst))
} else {
nulls.Del(c.vs[vecDst].GetGrouping(), uint64(dst))
c.vs[vecDst].Copy(c.vs[vecSrc], dst, src, proc.Mp())
}
if c.isConstNull[vecSrc] || c.vs[vecSrc].GetNulls().Contains(uint64(src)) {
nulls.Add(c.vs[vecDst].GetNulls(), uint64(dst))
return nil
} else {
nulls.Del(c.vs[vecDst].GetNulls(), uint64(dst))
return c.vs[vecDst].Copy(c.vs[vecSrc], dst, src, proc.Mp())
}
nulls.Del(c.vs[vecDst].GetNulls(), uint64(dst))
return c.vs[vecDst].Copy(c.vs[vecSrc], dst, src, proc.Mp())
}

func (c *strCompare) Compare(veci, vecj int, vi, vj int64) int {
Expand Down
21 changes: 16 additions & 5 deletions pkg/container/vector/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,11 +1115,26 @@ func (v *Vector) Shuffle(sels []int64, mp *mpool.MPool) (err error) {
func (v *Vector) Copy(w *Vector, vi, wi int64, mp *mpool.MPool) error {
if w.class == CONSTANT {
if w.IsConstNull() {
if !v.typ.IsFixedLen() {
vva := MustFixedColNoTypeCheck[types.Varlena](v)
// Null varlen slots may retain stale offset/len metadata, so clear
// the destination header before marking the row null.
vva[vi] = types.Varlena{}
}
v.nsp.Set(uint64(vi))
return nil
}
// Non-null constant vectors still share the regular null/data path below.
wi = 0
}
if w.GetNulls().Contains(uint64(wi)) {
if !v.typ.IsFixedLen() {
vva := MustFixedColNoTypeCheck[types.Varlena](v)
vva[vi] = types.Varlena{}
}
v.GetNulls().Set(uint64(vi))
return nil
}
if v.typ.IsFixedLen() {
sz := v.typ.TypeSize()
copy(v.data[vi*int64(sz):(vi+1)*int64(sz)], w.data[wi*int64(sz):(wi+1)*int64(sz)])
Expand All @@ -1138,11 +1153,7 @@ func (v *Vector) Copy(w *Vector, vi, wi int64, mp *mpool.MPool) error {
}
}

if w.GetNulls().Contains(uint64(wi)) {
v.GetNulls().Set(uint64(vi))
} else {
v.GetNulls().Unset(uint64(vi))
}
v.GetNulls().Unset(uint64(vi))
return nil
}

Expand Down
32 changes: 32 additions & 0 deletions pkg/container/vector/vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1514,6 +1514,38 @@ func TestCopy(t *testing.T) {
w.Free(mp)
require.Equal(t, int64(0), mp.CurrNB())
}
{ // string null with stale varlena metadata
v := NewVec(types.T_varchar.ToType())
err := AppendBytes(v, []byte("seed"), false, mp)
require.NoError(t, err)
w := NewVec(types.T_varchar.ToType())
err = AppendBytes(w, nil, true, mp)
require.NoError(t, err)
ws := MustFixedColNoTypeCheck[types.Varlena](w)
ws[0].SetOffsetLen(25, 8)
err = v.Copy(w, 0, 0, mp)
require.NoError(t, err)
require.True(t, v.GetNulls().Contains(0))
v.Free(mp)
w.Free(mp)
require.Equal(t, int64(0), mp.CurrNB())
}
{ // array null with stale varlena metadata
v := NewVec(types.T_array_float64.ToType())
err := AppendArray[float64](v, []float64{1, 2}, false, mp)
require.NoError(t, err)
w := NewVec(types.T_array_float64.ToType())
err = AppendArray[float64](w, nil, true, mp)
require.NoError(t, err)
ws := MustFixedColNoTypeCheck[types.Varlena](w)
ws[0].SetOffsetLen(25, 16)
err = v.Copy(w, 0, 0, mp)
require.NoError(t, err)
require.True(t, v.GetNulls().Contains(0))
v.Free(mp)
w.Free(mp)
require.Equal(t, int64(0), mp.CurrNB())
}
}

func TestCloneWindow(t *testing.T) {
Expand Down
39 changes: 39 additions & 0 deletions pkg/sql/colexec/top/top_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"
Expand Down Expand Up @@ -96,6 +97,44 @@ func TestTop(t *testing.T) {
}
}

func TestTopCopiesNullVarlenaRow(t *testing.T) {
tc := newTestCase(
t,
mpool.MustNewZero(),
[]types.Type{types.T_int64.ToType(), types.T_varchar.ToType()},
1,
[]*plan.OrderBySpec{{Expr: newExpression(0), Flag: 0}},
)
err := tc.arg.Prepare(tc.proc)
require.NoError(t, err)

bat := batch.NewWithSize(2)
bat.Vecs[0] = vector.NewVec(types.T_int64.ToType())
err = vector.AppendFixedList(bat.Vecs[0], []int64{2, 1}, nil, tc.proc.Mp())
require.NoError(t, err)

bat.Vecs[1] = vector.NewVec(types.T_varchar.ToType())
err = vector.AppendBytes(bat.Vecs[1], []byte("seed"), false, tc.proc.Mp())
require.NoError(t, err)
err = vector.AppendBytes(bat.Vecs[1], nil, true, tc.proc.Mp())
require.NoError(t, err)
ws := vector.MustFixedColNoTypeCheck[types.Varlena](bat.Vecs[1])
ws[1].SetOffsetLen(25, 8)
bat.SetRowCount(2)

resetChildren(tc.arg, []*batch.Batch{bat, batch.EmptyBatch})
result, err := vm.Exec(tc.arg, tc.proc)
require.NoError(t, err)
require.NotNil(t, result.Batch)
require.Equal(t, int64(1), vector.MustFixedColWithTypeCheck[int64](result.Batch.Vecs[0])[0])
require.True(t, result.Batch.Vecs[1].GetNulls().Contains(0))

tc.arg.Free(tc.proc, false, nil)
tc.arg.GetChildren(0).Free(tc.proc, false, nil)
tc.proc.Free()
require.Equal(t, int64(0), tc.proc.Mp().CurrNB())
}
Comment thread
qodo-code-review[bot] marked this conversation as resolved.

func BenchmarkTop(b *testing.B) {
for i := 0; i < b.N; i++ {
tcs := []testCase{
Expand Down
Loading