diff --git a/pkg/compare/arraycompare.go b/pkg/compare/arraycompare.go index a0a87a59179f9..885278268cf1c 100644 --- a/pkg/compare/arraycompare.go +++ b/pkg/compare/arraycompare.go @@ -35,15 +35,13 @@ func (c arrayCompare) Copy(vecSrc, vecDst int, src, dst int64, proc *process.Pro nulls.Add(c.vs[vecDst].GetGrouping(), uint64(dst)) } else { nulls.Del(c.vs[vecDst].GetGrouping(), uint64(dst)) - c.vs[vecDst].Copy(c.vs[vecSrc], dst, src, proc.Mp()) } if c.isConstNull[vecSrc] || c.vs[vecSrc].GetNulls().Contains(uint64(src)) { nulls.Add(c.vs[vecDst].GetNulls(), uint64(dst)) return nil - } else { - nulls.Del(c.vs[vecDst].GetNulls(), uint64(dst)) - return c.vs[vecDst].Copy(c.vs[vecSrc], dst, src, proc.Mp()) } + nulls.Del(c.vs[vecDst].GetNulls(), uint64(dst)) + return c.vs[vecDst].Copy(c.vs[vecSrc], dst, src, proc.Mp()) } func (c arrayCompare) Compare(veci, vecj int, vi, vj int64) int { diff --git a/pkg/compare/strcompare.go b/pkg/compare/strcompare.go index 24684fdb5f918..f74ddc2a6375c 100644 --- a/pkg/compare/strcompare.go +++ b/pkg/compare/strcompare.go @@ -36,15 +36,13 @@ func (c *strCompare) Copy(vecSrc, vecDst int, src, dst int64, proc *process.Proc nulls.Add(c.vs[vecDst].GetGrouping(), uint64(dst)) } else { nulls.Del(c.vs[vecDst].GetGrouping(), uint64(dst)) - c.vs[vecDst].Copy(c.vs[vecSrc], dst, src, proc.Mp()) } if c.isConstNull[vecSrc] || c.vs[vecSrc].GetNulls().Contains(uint64(src)) { nulls.Add(c.vs[vecDst].GetNulls(), uint64(dst)) return nil - } else { - nulls.Del(c.vs[vecDst].GetNulls(), uint64(dst)) - return c.vs[vecDst].Copy(c.vs[vecSrc], dst, src, proc.Mp()) } + nulls.Del(c.vs[vecDst].GetNulls(), uint64(dst)) + return c.vs[vecDst].Copy(c.vs[vecSrc], dst, src, proc.Mp()) } func (c *strCompare) Compare(veci, vecj int, vi, vj int64) int { diff --git a/pkg/container/vector/vector.go b/pkg/container/vector/vector.go index 08a6e6e9e7fef..516baf4e19f21 100644 --- a/pkg/container/vector/vector.go +++ b/pkg/container/vector/vector.go @@ -1303,11 +1303,26 @@ func (v *Vector) ShuffleWithBuf(sels []int64, mp *mpool.MPool, buf *[]byte) (err func (v *Vector) Copy(w *Vector, vi, wi int64, mp *mpool.MPool) error { if w.class == CONSTANT { if w.IsConstNull() { + if !v.typ.IsFixedLen() { + vva := MustFixedColNoTypeCheck[types.Varlena](v) + // Null varlen slots may retain stale offset/len metadata, so clear + // the destination header before marking the row null. + vva[vi] = types.Varlena{} + } v.nsp.Set(uint64(vi)) return nil } + // Non-null constant vectors still share the regular null/data path below. wi = 0 } + if w.GetNulls().Contains(uint64(wi)) { + if !v.typ.IsFixedLen() { + vva := MustFixedColNoTypeCheck[types.Varlena](v) + vva[vi] = types.Varlena{} + } + v.GetNulls().Set(uint64(vi)) + return nil + } if v.typ.IsFixedLen() { sz := v.typ.TypeSize() copy(v.data[vi*int64(sz):(vi+1)*int64(sz)], w.data[wi*int64(sz):(wi+1)*int64(sz)]) @@ -1326,11 +1341,7 @@ func (v *Vector) Copy(w *Vector, vi, wi int64, mp *mpool.MPool) error { } } - if w.GetNulls().Contains(uint64(wi)) { - v.GetNulls().Set(uint64(vi)) - } else { - v.GetNulls().Unset(uint64(vi)) - } + v.GetNulls().Unset(uint64(vi)) return nil } diff --git a/pkg/container/vector/vector_test.go b/pkg/container/vector/vector_test.go index a67de1e3c71ab..4752c28cf40f9 100644 --- a/pkg/container/vector/vector_test.go +++ b/pkg/container/vector/vector_test.go @@ -1514,6 +1514,38 @@ func TestCopy(t *testing.T) { w.Free(mp) require.Equal(t, int64(0), mp.CurrNB()) } + { // string null with stale varlena metadata + v := NewVec(types.T_varchar.ToType()) + err := AppendBytes(v, []byte("seed"), false, mp) + require.NoError(t, err) + w := NewVec(types.T_varchar.ToType()) + err = AppendBytes(w, nil, true, mp) + require.NoError(t, err) + ws := MustFixedColNoTypeCheck[types.Varlena](w) + ws[0].SetOffsetLen(25, 8) + err = v.Copy(w, 0, 0, mp) + require.NoError(t, err) + require.True(t, v.GetNulls().Contains(0)) + v.Free(mp) + w.Free(mp) + require.Equal(t, int64(0), mp.CurrNB()) + } + { // array null with stale varlena metadata + v := NewVec(types.T_array_float64.ToType()) + err := AppendArray[float64](v, []float64{1, 2}, false, mp) + require.NoError(t, err) + w := NewVec(types.T_array_float64.ToType()) + err = AppendArray[float64](w, nil, true, mp) + require.NoError(t, err) + ws := MustFixedColNoTypeCheck[types.Varlena](w) + ws[0].SetOffsetLen(25, 16) + err = v.Copy(w, 0, 0, mp) + require.NoError(t, err) + require.True(t, v.GetNulls().Contains(0)) + v.Free(mp) + w.Free(mp) + require.Equal(t, int64(0), mp.CurrNB()) + } } func TestCloneWindow(t *testing.T) { diff --git a/pkg/sql/colexec/top/top_test.go b/pkg/sql/colexec/top/top_test.go index 48eb00a553a36..9b4737233f700 100644 --- a/pkg/sql/colexec/top/top_test.go +++ b/pkg/sql/colexec/top/top_test.go @@ -23,6 +23,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/sql/colexec" plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan" @@ -96,6 +97,44 @@ func TestTop(t *testing.T) { } } +func TestTopCopiesNullVarlenaRow(t *testing.T) { + tc := newTestCase( + t, + mpool.MustNewZero(), + []types.Type{types.T_int64.ToType(), types.T_varchar.ToType()}, + 1, + []*plan.OrderBySpec{{Expr: newExpression(0), Flag: 0}}, + ) + err := tc.arg.Prepare(tc.proc) + require.NoError(t, err) + + bat := batch.NewWithSize(2) + bat.Vecs[0] = vector.NewVec(types.T_int64.ToType()) + err = vector.AppendFixedList(bat.Vecs[0], []int64{2, 1}, nil, tc.proc.Mp()) + require.NoError(t, err) + + bat.Vecs[1] = vector.NewVec(types.T_varchar.ToType()) + err = vector.AppendBytes(bat.Vecs[1], []byte("seed"), false, tc.proc.Mp()) + require.NoError(t, err) + err = vector.AppendBytes(bat.Vecs[1], nil, true, tc.proc.Mp()) + require.NoError(t, err) + ws := vector.MustFixedColNoTypeCheck[types.Varlena](bat.Vecs[1]) + ws[1].SetOffsetLen(25, 8) + bat.SetRowCount(2) + + resetChildren(tc.arg, []*batch.Batch{bat, batch.EmptyBatch}) + result, err := vm.Exec(tc.arg, tc.proc) + require.NoError(t, err) + require.NotNil(t, result.Batch) + require.Equal(t, int64(1), vector.MustFixedColWithTypeCheck[int64](result.Batch.Vecs[0])[0]) + require.True(t, result.Batch.Vecs[1].GetNulls().Contains(0)) + + tc.arg.Free(tc.proc, false, nil) + tc.arg.GetChildren(0).Free(tc.proc, false, nil) + tc.proc.Free() + require.Equal(t, int64(0), tc.proc.Mp().CurrNB()) +} + func BenchmarkTop(b *testing.B) { for i := 0; i < b.N; i++ { tcs := []testCase{