diff --git a/pkg/sql/compile/fuzzyCheck.go b/pkg/sql/compile/fuzzyCheck.go index efddab89c7a41..3068ae8b83847 100644 --- a/pkg/sql/compile/fuzzyCheck.go +++ b/pkg/sql/compile/fuzzyCheck.go @@ -27,7 +27,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/reuse" "github.com/matrixorigin/matrixone/pkg/container/batch" - "github.com/matrixorigin/matrixone/pkg/container/nulls" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/pb/plan" @@ -142,6 +141,18 @@ func (f *fuzzyCheck) fill(ctx context.Context, bat *batch.Batch) error { return err } + // Update cnt to reflect actual non-NULL collision keys. + // When all values are NULL, cnt becomes 0 and backgroundSQLCheck + // is skipped by the caller (compile.go checks f.cnt > 0). + if len(collision) > 0 { + f.cnt = len(collision[0]) + } else { + f.cnt = 0 + } + if f.cnt == 0 { + return nil + } + // generate codition used in background SQL if !f.onlyInsertHidden { if !f.isCompound { @@ -218,11 +229,18 @@ func (f *fuzzyCheck) firstlyCheck(ctx context.Context, toCheck *vector.Vector) e if err != nil { return err } - for _, k := range pkey { + for i, k := range pkey { + // SQL standard: NULL != NULL, skip NULLs from duplicate check + if toCheck.GetNulls().Contains(uint64(i)) { + continue + } kcnt[k]++ } } else { for i := 0; i < toCheck.Length(); i++ { + if toCheck.GetNulls().Contains(uint64(i)) { + continue + } b := toCheck.GetRawBytesAt(i) t, err := types.Unpack(b) if err != nil { @@ -270,13 +288,21 @@ func (f *fuzzyCheck) genCollsionKeys(toCheck *vector.Vector) ([][]string, error) if err != nil { return nil, err } - keys[0] = pkey + // Skip NULL values - they cannot be duplicates per SQL standard + for i, k := range pkey { + if !toCheck.GetNulls().Contains(uint64(i)) { + keys[0] = append(keys[0], k) + } + } } else { scales := make([]int32, len(f.compoundCols)) for i, 
c := range f.compoundCols { scales[i] = c.Typ.Scale } for i := 0; i < toCheck.Length(); i++ { + if toCheck.GetNulls().Contains(uint64(i)) { + continue + } b := toCheck.GetRawBytesAt(i) t, err := types.Unpack(b) if err != nil { @@ -423,7 +449,7 @@ func (f *fuzzyCheck) format(toCheck *vector.Vector) ([]string, error) { } func vectorToString(vec *vector.Vector, rowIndex int) (string, error) { - if nulls.Any(vec.GetNulls()) { + if vec.GetNulls().Contains(uint64(rowIndex)) { return "", nil } switch vec.GetType().Oid { @@ -491,7 +517,7 @@ func vectorToString(vec *vector.Vector, rowIndex int) (string, error) { val := vector.GetFixedAtNoTypeCheck[types.MoYear](vec, rowIndex) return val.String(), nil case types.T_enum: - return fmt.Sprintf("%v", vector.GetFixedAtNoTypeCheck[uint16](vec, rowIndex)), nil + return fmt.Sprintf("%v", vector.GetFixedAtNoTypeCheck[types.Enum](vec, rowIndex)), nil default: return "", moerr.NewInternalErrorNoCtxf("fuzzy filter can not parse correct string for type id : %d", vec.GetType().Oid) } diff --git a/pkg/sql/compile/fuzzyCheck_test.go b/pkg/sql/compile/fuzzyCheck_test.go new file mode 100644 index 0000000000000..b39cc4e5409f8 --- /dev/null +++ b/pkg/sql/compile/fuzzyCheck_test.go @@ -0,0 +1,836 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package compile + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/matrixorigin/matrixone/pkg/common/mpool" + "github.com/matrixorigin/matrixone/pkg/container/batch" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/stretchr/testify/require" +) + +func TestVectorToStringNullHandling(t *testing.T) { + mp, err := mpool.NewMPool("test_vectorToString", 0, mpool.NoFixed) + require.NoError(t, err) + defer mpool.DeleteMPool(mp) + + t.Run("per_row_null_check", func(t *testing.T) { + // Vector with mix of NULLs and values + vec := vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(vec, int64(42), false, mp)) + require.NoError(t, vector.AppendFixed(vec, int64(0), true, mp)) // NULL + require.NoError(t, vector.AppendFixed(vec, int64(99), false, mp)) + defer vec.Free(mp) + + // Non-NULL row should return value + s, err := vectorToString(vec, 0) + require.NoError(t, err) + require.Equal(t, "42", s) + + // NULL row should return empty string + s, err = vectorToString(vec, 1) + require.NoError(t, err) + require.Equal(t, "", s) + + // Non-NULL row after NULL should still return value + s, err = vectorToString(vec, 2) + require.NoError(t, err) + require.Equal(t, "99", s) + }) + + t.Run("all_nulls", func(t *testing.T) { + vec := vector.NewVec(types.T_varchar.ToType()) + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + defer vec.Free(mp) + + s, err := vectorToString(vec, 0) + require.NoError(t, err) + require.Equal(t, "", s) + + s, err = vectorToString(vec, 1) + require.NoError(t, err) + require.Equal(t, "", s) + }) + + t.Run("varchar_values", func(t *testing.T) { + vec := vector.NewVec(types.T_varchar.ToType()) + require.NoError(t, vector.AppendBytes(vec, []byte("hello"), false, mp)) + require.NoError(t, vector.AppendBytes(vec, nil, 
true, mp)) // NULL + defer vec.Free(mp) + + s, err := vectorToString(vec, 0) + require.NoError(t, err) + require.Equal(t, "hello", s) + + s, err = vectorToString(vec, 1) + require.NoError(t, err) + require.Equal(t, "", s) + }) + + // Cover more type branches in vectorToString + t.Run("bool", func(t *testing.T) { + vec := vector.NewVec(types.T_bool.ToType()) + require.NoError(t, vector.AppendFixed(vec, true, false, mp)) + require.NoError(t, vector.AppendFixed(vec, false, true, mp)) // NULL + require.NoError(t, vector.AppendFixed(vec, false, false, mp)) + defer vec.Free(mp) + + s, _ := vectorToString(vec, 0) + require.Equal(t, "true", s) + s, _ = vectorToString(vec, 1) + require.Equal(t, "", s) // NULL + s, _ = vectorToString(vec, 2) + require.Equal(t, "false", s) + }) + + t.Run("int_types", func(t *testing.T) { + // int8 + vec8 := vector.NewVec(types.T_int8.ToType()) + require.NoError(t, vector.AppendFixed(vec8, int8(7), false, mp)) + require.NoError(t, vector.AppendFixed(vec8, int8(0), true, mp)) + defer vec8.Free(mp) + s, _ := vectorToString(vec8, 0) + require.Equal(t, "7", s) + s, _ = vectorToString(vec8, 1) + require.Equal(t, "", s) + + // int16 + vec16 := vector.NewVec(types.T_int16.ToType()) + require.NoError(t, vector.AppendFixed(vec16, int16(16), false, mp)) + require.NoError(t, vector.AppendFixed(vec16, int16(0), true, mp)) + defer vec16.Free(mp) + s, _ = vectorToString(vec16, 0) + require.Equal(t, "16", s) + s, _ = vectorToString(vec16, 1) + require.Equal(t, "", s) + + // int32 + vec32 := vector.NewVec(types.T_int32.ToType()) + require.NoError(t, vector.AppendFixed(vec32, int32(32), false, mp)) + require.NoError(t, vector.AppendFixed(vec32, int32(0), true, mp)) + defer vec32.Free(mp) + s, _ = vectorToString(vec32, 0) + require.Equal(t, "32", s) + s, _ = vectorToString(vec32, 1) + require.Equal(t, "", s) + }) + + t.Run("uint_types", func(t *testing.T) { + vec8 := vector.NewVec(types.T_uint8.ToType()) + require.NoError(t, vector.AppendFixed(vec8, uint8(8), 
false, mp)) + require.NoError(t, vector.AppendFixed(vec8, uint8(0), true, mp)) + defer vec8.Free(mp) + s, _ := vectorToString(vec8, 0) + require.Equal(t, "8", s) + s, _ = vectorToString(vec8, 1) + require.Equal(t, "", s) + + vec16 := vector.NewVec(types.T_uint16.ToType()) + require.NoError(t, vector.AppendFixed(vec16, uint16(16), false, mp)) + require.NoError(t, vector.AppendFixed(vec16, uint16(0), true, mp)) + defer vec16.Free(mp) + s, _ = vectorToString(vec16, 0) + require.Equal(t, "16", s) + + vec32 := vector.NewVec(types.T_uint32.ToType()) + require.NoError(t, vector.AppendFixed(vec32, uint32(32), false, mp)) + require.NoError(t, vector.AppendFixed(vec32, uint32(0), true, mp)) + defer vec32.Free(mp) + s, _ = vectorToString(vec32, 0) + require.Equal(t, "32", s) + + vec64 := vector.NewVec(types.T_uint64.ToType()) + require.NoError(t, vector.AppendFixed(vec64, uint64(64), false, mp)) + require.NoError(t, vector.AppendFixed(vec64, uint64(0), true, mp)) + defer vec64.Free(mp) + s, _ = vectorToString(vec64, 0) + require.Equal(t, "64", s) + }) + + t.Run("float_types", func(t *testing.T) { + vec32 := vector.NewVec(types.T_float32.ToType()) + require.NoError(t, vector.AppendFixed(vec32, float32(1.5), false, mp)) + require.NoError(t, vector.AppendFixed(vec32, float32(0), true, mp)) + defer vec32.Free(mp) + s, _ := vectorToString(vec32, 0) + require.Equal(t, "1.5", s) + s, _ = vectorToString(vec32, 1) + require.Equal(t, "", s) + + vec64 := vector.NewVec(types.T_float64.ToType()) + require.NoError(t, vector.AppendFixed(vec64, float64(2.5), false, mp)) + require.NoError(t, vector.AppendFixed(vec64, float64(0), true, mp)) + defer vec64.Free(mp) + s, _ = vectorToString(vec64, 0) + require.Equal(t, "2.5", s) + s, _ = vectorToString(vec64, 1) + require.Equal(t, "", s) + }) + + t.Run("decimal_types", func(t *testing.T) { + dec64 := vector.NewVec(types.New(types.T_decimal64, 10, 2)) + d64, _ := types.Decimal64FromFloat64(3.14, 64, 2) + require.NoError(t, vector.AppendFixed(dec64, 
d64, false, mp)) + require.NoError(t, vector.AppendFixed(dec64, d64, true, mp)) // NULL + defer dec64.Free(mp) + s, _ := vectorToString(dec64, 0) + require.NotEmpty(t, s) + s, _ = vectorToString(dec64, 1) + require.Equal(t, "", s) + + dec128 := vector.NewVec(types.New(types.T_decimal128, 20, 2)) + d128, _ := types.Decimal128FromFloat64(6.28, 128, 2) + require.NoError(t, vector.AppendFixed(dec128, d128, false, mp)) + require.NoError(t, vector.AppendFixed(dec128, d128, true, mp)) + defer dec128.Free(mp) + s, _ = vectorToString(dec128, 0) + require.NotEmpty(t, s) + s, _ = vectorToString(dec128, 1) + require.Equal(t, "", s) + }) + + t.Run("bit_type", func(t *testing.T) { + vec := vector.NewVec(types.T_bit.ToType()) + require.NoError(t, vector.AppendFixed(vec, uint64(0xFF), false, mp)) + require.NoError(t, vector.AppendFixed(vec, uint64(0), true, mp)) + defer vec.Free(mp) + s, _ := vectorToString(vec, 0) + require.Equal(t, "255", s) + s, _ = vectorToString(vec, 1) + require.Equal(t, "", s) + }) + + t.Run("char_text_blob", func(t *testing.T) { + for _, oid := range []types.T{types.T_char, types.T_text, types.T_blob, types.T_binary, types.T_varbinary} { + vec := vector.NewVec(oid.ToType()) + require.NoError(t, vector.AppendBytes(vec, []byte("data"), false, mp)) + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + defer vec.Free(mp) + s, _ := vectorToString(vec, 0) + require.Equal(t, "data", s, "type %v", oid) + s, _ = vectorToString(vec, 1) + require.Equal(t, "", s, "NULL for type %v", oid) + } + }) + + t.Run("uuid", func(t *testing.T) { + vec := vector.NewVec(types.T_uuid.ToType()) + u, _ := types.ParseUuid("12345678-1234-1234-1234-123456789abc") + require.NoError(t, vector.AppendFixed(vec, u, false, mp)) + require.NoError(t, vector.AppendFixed(vec, u, true, mp)) + defer vec.Free(mp) + s, _ := vectorToString(vec, 0) + require.NotEmpty(t, s) + s, _ = vectorToString(vec, 1) + require.Equal(t, "", s) + }) + + t.Run("date_time_types", func(t *testing.T) { + // date 
+ dv := vector.NewVec(types.T_date.ToType()) + d, _ := types.ParseDateCast("2024-01-15") + require.NoError(t, vector.AppendFixed(dv, d, false, mp)) + require.NoError(t, vector.AppendFixed(dv, d, true, mp)) + defer dv.Free(mp) + s, _ := vectorToString(dv, 0) + require.Contains(t, s, "2024") + s, _ = vectorToString(dv, 1) + require.Equal(t, "", s) + + // time + tv := vector.NewVec(types.T_time.ToType()) + tm, _ := types.ParseTime("12:30:45", 0) + require.NoError(t, vector.AppendFixed(tv, tm, false, mp)) + require.NoError(t, vector.AppendFixed(tv, tm, true, mp)) + defer tv.Free(mp) + s, _ = vectorToString(tv, 0) + require.Contains(t, s, "12") + s, _ = vectorToString(tv, 1) + require.Equal(t, "", s) + + // datetime + dtv := vector.NewVec(types.New(types.T_datetime, 0, 0)) + dt, _ := types.ParseDatetime("2024-01-15 12:30:45", 0) + require.NoError(t, vector.AppendFixed(dtv, dt, false, mp)) + require.NoError(t, vector.AppendFixed(dtv, dt, true, mp)) + defer dtv.Free(mp) + s, _ = vectorToString(dtv, 0) + require.Contains(t, s, "2024") + s, _ = vectorToString(dtv, 1) + require.Equal(t, "", s) + + // timestamp + tsv := vector.NewVec(types.New(types.T_timestamp, 0, 0)) + ts, _ := types.ParseTimestamp(time.Local, "2024-01-15 12:30:45", 0) + require.NoError(t, vector.AppendFixed(tsv, ts, false, mp)) + require.NoError(t, vector.AppendFixed(tsv, ts, true, mp)) + defer tsv.Free(mp) + s, _ = vectorToString(tsv, 0) + require.Contains(t, s, "2024") + s, _ = vectorToString(tsv, 1) + require.Equal(t, "", s) + + // year + yv := vector.NewVec(types.T_year.ToType()) + require.NoError(t, vector.AppendFixed(yv, types.MoYear(2024), false, mp)) + require.NoError(t, vector.AppendFixed(yv, types.MoYear(0), true, mp)) + defer yv.Free(mp) + s, _ = vectorToString(yv, 0) + require.Contains(t, s, "2024") + s, _ = vectorToString(yv, 1) + require.Equal(t, "", s) + }) + + t.Run("enum_type", func(t *testing.T) { + vec := vector.NewVec(types.T_enum.ToType()) + require.NoError(t, vector.AppendFixed(vec, 
types.Enum(3), false, mp)) + require.NoError(t, vector.AppendFixed(vec, types.Enum(0), true, mp)) + defer vec.Free(mp) + s, _ := vectorToString(vec, 0) + require.Equal(t, "3", s) + s, _ = vectorToString(vec, 1) + require.Equal(t, "", s) + }) + + t.Run("json_type", func(t *testing.T) { + vec := vector.NewVec(types.T_json.ToType()) + bj, _ := types.ParseStringToByteJson(`{"key":"val"}`) + bjBytes, _ := bj.Marshal() + require.NoError(t, vector.AppendBytes(vec, bjBytes, false, mp)) + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + defer vec.Free(mp) + s, _ := vectorToString(vec, 0) + require.Contains(t, s, "key") + s, _ = vectorToString(vec, 1) + require.Equal(t, "", s) + }) +} + +func TestFirstlyCheckSkipsNulls(t *testing.T) { + mp, err := mpool.NewMPool("test_firstlyCheck", 0, mpool.NoFixed) + require.NoError(t, err) + defer mpool.DeleteMPool(mp) + + t.Run("all_nulls_no_error", func(t *testing.T) { + vec := vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(vec, int64(0), true, mp)) + require.NoError(t, vector.AppendFixed(vec, int64(0), true, mp)) + require.NoError(t, vector.AppendFixed(vec, int64(0), true, mp)) + defer vec.Free(mp) + + f := &fuzzyCheck{attr: "pk"} + err := f.firstlyCheck(context.Background(), vec) + require.NoError(t, err, "all-NULL rows must not trigger duplicate error") + }) + + t.Run("nulls_mixed_with_distinct_values", func(t *testing.T) { + vec := vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(vec, int64(1), false, mp)) + require.NoError(t, vector.AppendFixed(vec, int64(0), true, mp)) // NULL + require.NoError(t, vector.AppendFixed(vec, int64(2), false, mp)) + require.NoError(t, vector.AppendFixed(vec, int64(0), true, mp)) // NULL + defer vec.Free(mp) + + f := &fuzzyCheck{attr: "pk"} + err := f.firstlyCheck(context.Background(), vec) + require.NoError(t, err, "NULLs should be skipped, 1 and 2 are distinct") + }) + + t.Run("real_dup_still_caught", func(t *testing.T) { + vec := 
vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(vec, int64(5), false, mp)) + require.NoError(t, vector.AppendFixed(vec, int64(0), true, mp)) // NULL + require.NoError(t, vector.AppendFixed(vec, int64(5), false, mp)) // dup! + defer vec.Free(mp) + + f := &fuzzyCheck{attr: "pk"} + err := f.firstlyCheck(context.Background(), vec) + require.Error(t, err, "real duplicate must be caught") + require.Contains(t, err.Error(), "Duplicate entry") + }) + + t.Run("compound_all_nulls_no_error", func(t *testing.T) { + // Compound key: all rows are NULL (serial() propagated) + vec := vector.NewVec(types.T_varchar.ToType()) + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + defer vec.Free(mp) + + f := &fuzzyCheck{ + attr: "__mo_cpkey_col", + isCompound: true, + compoundCols: []*plan.ColDef{ + {Name: "a", Typ: plan.Type{Id: int32(types.T_int64)}}, + {Name: "b", Typ: plan.Type{Id: int32(types.T_int64)}}, + }, + } + err := f.firstlyCheck(context.Background(), vec) + require.NoError(t, err, "compound all-NULL rows must not trigger duplicate error") + }) + + t.Run("compound_mixed_nulls_distinct", func(t *testing.T) { + packer := types.NewPacker() + defer packer.Close() + + vec := vector.NewVec(types.T_varchar.ToType()) + // Row 0: packed (1, 10) — non-NULL + packer.Reset() + packer.EncodeInt64(1) + packer.EncodeInt64(10) + require.NoError(t, vector.AppendBytes(vec, packer.GetBuf(), false, mp)) + // Row 1: NULL + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + // Row 2: packed (2, 20) — non-NULL + packer.Reset() + packer.EncodeInt64(2) + packer.EncodeInt64(20) + require.NoError(t, vector.AppendBytes(vec, packer.GetBuf(), false, mp)) + defer vec.Free(mp) + + f := &fuzzyCheck{ + attr: "__mo_cpkey_col", + isCompound: true, + compoundCols: []*plan.ColDef{ + {Name: "a", Typ: plan.Type{Id: int32(types.T_int64)}}, + {Name: "b", Typ: plan.Type{Id: int32(types.T_int64)}}, + }, + } + 
err := f.firstlyCheck(context.Background(), vec) + require.NoError(t, err, "NULLs should be skipped, (1,10) and (2,20) are distinct") + }) + + t.Run("compound_dup_caught", func(t *testing.T) { + packer := types.NewPacker() + defer packer.Close() + + vec := vector.NewVec(types.T_varchar.ToType()) + // Row 0: packed (1, 10) + packer.Reset() + packer.EncodeInt64(1) + packer.EncodeInt64(10) + require.NoError(t, vector.AppendBytes(vec, packer.GetBuf(), false, mp)) + // Row 1: NULL + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + // Row 2: packed (1, 10) — duplicate! + packer.Reset() + packer.EncodeInt64(1) + packer.EncodeInt64(10) + require.NoError(t, vector.AppendBytes(vec, packer.GetBuf(), false, mp)) + defer vec.Free(mp) + + f := &fuzzyCheck{ + attr: "__mo_cpkey_col", + isCompound: true, + compoundCols: []*plan.ColDef{ + {Name: "a", Typ: plan.Type{Id: int32(types.T_int64)}}, + {Name: "b", Typ: plan.Type{Id: int32(types.T_int64)}}, + }, + } + err := f.firstlyCheck(context.Background(), vec) + require.Error(t, err, "compound duplicate must be caught") + require.Contains(t, err.Error(), "Duplicate entry") + }) +} + +func TestGenCollsionKeysSkipsNulls(t *testing.T) { + mp, err := mpool.NewMPool("test_genCollsionKeys", 0, mpool.NoFixed) + require.NoError(t, err) + defer mpool.DeleteMPool(mp) + + t.Run("non_compound_filters_nulls", func(t *testing.T) { + vec := vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(vec, int64(1), false, mp)) + require.NoError(t, vector.AppendFixed(vec, int64(0), true, mp)) // NULL + require.NoError(t, vector.AppendFixed(vec, int64(3), false, mp)) + defer vec.Free(mp) + + f := &fuzzyCheck{attr: "pk"} + keys, err := f.genCollsionKeys(vec) + require.NoError(t, err) + require.Len(t, keys[0], 2, "should only have 2 non-NULL keys") + }) + + t.Run("non_compound_all_nulls", func(t *testing.T) { + vec := vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(vec, int64(0), true, mp)) + 
require.NoError(t, vector.AppendFixed(vec, int64(0), true, mp)) + defer vec.Free(mp) + + f := &fuzzyCheck{attr: "pk"} + keys, err := f.genCollsionKeys(vec) + require.NoError(t, err) + require.Len(t, keys[0], 0, "all NULLs should produce empty keys") + }) + + t.Run("compound_filters_nulls", func(t *testing.T) { + // For compound keys, the packed tuple is a Varlena vector. + // If ANY component column is NULL, serial() marks the tuple NULL. + packer := types.NewPacker() + defer packer.Close() + + vec := vector.NewVec(types.T_varchar.ToType()) + // Row 0: non-NULL packed tuple + packer.Reset() + packer.EncodeInt64(1) + packer.EncodeInt64(10) + require.NoError(t, vector.AppendBytes(vec, packer.GetBuf(), false, mp)) + // Row 1: NULL (component column was NULL) + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + // Row 2: non-NULL packed tuple + packer.Reset() + packer.EncodeInt64(2) + packer.EncodeInt64(20) + require.NoError(t, vector.AppendBytes(vec, packer.GetBuf(), false, mp)) + defer vec.Free(mp) + + f := &fuzzyCheck{ + attr: "__mo_cpkey_col", + isCompound: true, + compoundCols: []*plan.ColDef{ + {Name: "a", Typ: plan.Type{Id: int32(types.T_int64)}}, + {Name: "b", Typ: plan.Type{Id: int32(types.T_int64)}}, + }, + } + keys, err := f.genCollsionKeys(vec) + require.NoError(t, err) + // 2 columns, each should have 2 non-NULL entries + require.Len(t, keys, 2) + require.Len(t, keys[0], 2, "column a should have 2 non-NULL keys") + require.Len(t, keys[1], 2, "column b should have 2 non-NULL keys") + }) +} + +func TestFillAllNullsSkipsBackgroundCheck(t *testing.T) { + mp, err := mpool.NewMPool("test_fill", 0, mpool.NoFixed) + require.NoError(t, err) + defer mpool.DeleteMPool(mp) + + t.Run("non_compound_all_nulls", func(t *testing.T) { + vec := vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(vec, int64(0), true, mp)) + require.NoError(t, vector.AppendFixed(vec, int64(0), true, mp)) + + bat := batch.NewWithSize(1) + bat.Vecs[0] = vec + 
bat.SetRowCount(2) + defer func() { + vec.Free(mp) + bat.Clean(mp) + }() + + f := &fuzzyCheck{attr: "pk"} + err := f.fill(context.Background(), bat) + require.NoError(t, err) + require.Equal(t, 0, f.cnt, "cnt should be 0 when all keys are NULL") + require.Equal(t, "", f.condition, "condition should be empty") + }) + + t.Run("non_compound_mixed", func(t *testing.T) { + vec := vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(vec, int64(42), false, mp)) + require.NoError(t, vector.AppendFixed(vec, int64(0), true, mp)) // NULL + require.NoError(t, vector.AppendFixed(vec, int64(99), false, mp)) + + bat := batch.NewWithSize(1) + bat.Vecs[0] = vec + bat.SetRowCount(3) + defer func() { + vec.Free(mp) + bat.Clean(mp) + }() + + f := &fuzzyCheck{attr: "pk"} + err := f.fill(context.Background(), bat) + require.NoError(t, err) + require.Equal(t, 2, f.cnt, "cnt should only count non-NULL keys") + require.NotEmpty(t, f.condition, "condition should be generated for non-NULL keys") + // condition should contain the two non-NULL values + require.Contains(t, f.condition, "42") + require.Contains(t, f.condition, "99") + }) + + t.Run("compound_all_nulls", func(t *testing.T) { + vec := vector.NewVec(types.T_varchar.ToType()) + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + + bat := batch.NewWithSize(1) + bat.Vecs[0] = vec + bat.SetRowCount(2) + defer func() { + vec.Free(mp) + bat.Clean(mp) + }() + + f := &fuzzyCheck{ + attr: "__mo_cpkey_col", + isCompound: true, + compoundCols: []*plan.ColDef{ + {Name: "a", Typ: plan.Type{Id: int32(types.T_int64)}}, + {Name: "b", Typ: plan.Type{Id: int32(types.T_int64)}}, + }, + } + err := f.fill(context.Background(), bat) + require.NoError(t, err) + require.Equal(t, 0, f.cnt, "compound all-NULLs should have cnt=0") + require.Equal(t, "", f.condition) + }) + + t.Run("compound_mixed_nulls", func(t *testing.T) { + packer := types.NewPacker() + defer 
packer.Close() + + vec := vector.NewVec(types.T_varchar.ToType()) + // Row 0: packed (1, 10) — non-NULL + packer.Reset() + packer.EncodeInt64(1) + packer.EncodeInt64(10) + require.NoError(t, vector.AppendBytes(vec, packer.GetBuf(), false, mp)) + // Row 1: NULL + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + // Row 2: packed (2, 20) — non-NULL + packer.Reset() + packer.EncodeInt64(2) + packer.EncodeInt64(20) + require.NoError(t, vector.AppendBytes(vec, packer.GetBuf(), false, mp)) + + bat := batch.NewWithSize(1) + bat.Vecs[0] = vec + bat.SetRowCount(3) + defer func() { + vec.Free(mp) + bat.Clean(mp) + }() + + f := &fuzzyCheck{ + attr: "__mo_cpkey_col", + isCompound: true, + compoundCols: []*plan.ColDef{ + {Name: "a", Typ: plan.Type{Id: int32(types.T_int64)}}, + {Name: "b", Typ: plan.Type{Id: int32(types.T_int64)}}, + }, + } + err := f.fill(context.Background(), bat) + require.NoError(t, err) + require.Equal(t, 2, f.cnt, "compound mixed should count only 2 non-NULL keys") + require.NotEmpty(t, f.condition) + }) +} + +// TestFillOnlyInsertHidden tests the hidden unique index path (onlyInsertHidden=true). +// This covers the backfill/CREATE UNIQUE INDEX path where the hidden table receives +// only the unique column values. 
+func TestFillOnlyInsertHidden(t *testing.T) { + mp, err := mpool.NewMPool("test_fill_hidden", 0, mpool.NoFixed) + require.NoError(t, err) + defer mpool.DeleteMPool(mp) + + t.Run("hidden_with_values", func(t *testing.T) { + vec := vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(vec, int64(10), false, mp)) + require.NoError(t, vector.AppendFixed(vec, int64(20), false, mp)) + + bat := batch.NewWithSize(1) + bat.Vecs[0] = vec + bat.SetRowCount(2) + defer func() { + vec.Free(mp) + bat.Clean(mp) + }() + + f := &fuzzyCheck{ + attr: "pk", + onlyInsertHidden: true, + } + err := f.fill(context.Background(), bat) + require.NoError(t, err) + require.Equal(t, 2, f.cnt) + require.Contains(t, f.condition, "10") + require.Contains(t, f.condition, "20") + }) + + t.Run("hidden_dup_caught_by_firstly_check_skipped", func(t *testing.T) { + // onlyInsertHidden skips firstlyCheck — duplicates in the hidden table + // are caught by backgroundSQLCheck, not the in-batch check. + vec := vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(vec, int64(5), false, mp)) + require.NoError(t, vector.AppendFixed(vec, int64(5), false, mp)) // dup value + + bat := batch.NewWithSize(1) + bat.Vecs[0] = vec + bat.SetRowCount(2) + defer func() { + vec.Free(mp) + bat.Clean(mp) + }() + + f := &fuzzyCheck{ + attr: "pk", + onlyInsertHidden: true, + } + // fill should NOT error — firstlyCheck is skipped for hidden tables + err := f.fill(context.Background(), bat) + require.NoError(t, err) + require.Equal(t, 2, f.cnt) + }) +} + +// TestFillEndToEndConditionGeneration exercises the full fill() pipeline: +// fill() → firstlyCheck → genCollsionKeys → condition string generation +// and verifies the generated SQL condition is valid for backgroundSQLCheck. 
+func TestFillEndToEndConditionGeneration(t *testing.T) { + mp, err := mpool.NewMPool("test_e2e_fill", 0, mpool.NoFixed) + require.NoError(t, err) + defer mpool.DeleteMPool(mp) + + t.Run("non_compound_nulls_skipped_real_dup_caught", func(t *testing.T) { + // Multiple NULLs + a real duplicate value. firstlyCheck should catch the dup. + vec := vector.NewVec(types.T_int64.ToType()) + require.NoError(t, vector.AppendFixed(vec, int64(7), false, mp)) + require.NoError(t, vector.AppendFixed(vec, int64(0), true, mp)) // NULL + require.NoError(t, vector.AppendFixed(vec, int64(0), true, mp)) // NULL + require.NoError(t, vector.AppendFixed(vec, int64(7), false, mp)) // dup! + + bat := batch.NewWithSize(1) + bat.Vecs[0] = vec + bat.SetRowCount(4) + defer func() { + vec.Free(mp) + bat.Clean(mp) + }() + + f := &fuzzyCheck{attr: "pk"} + err := f.fill(context.Background(), bat) + require.Error(t, err, "real duplicate should be caught") + require.Contains(t, err.Error(), "Duplicate entry") + }) + + t.Run("non_compound_nulls_only_skips_background_sql", func(t *testing.T) { + // All NULLs → cnt=0, condition="" → backgroundSQLCheck would be skipped + vec := vector.NewVec(types.T_varchar.ToType()) + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + + bat := batch.NewWithSize(1) + bat.Vecs[0] = vec + bat.SetRowCount(3) + defer func() { + vec.Free(mp) + bat.Clean(mp) + }() + + f := &fuzzyCheck{attr: "name"} + err := f.fill(context.Background(), bat) + require.NoError(t, err) + require.Equal(t, 0, f.cnt, "all NULLs → cnt=0 → backgroundSQLCheck skipped") + require.Equal(t, "", f.condition) + }) + + t.Run("compound_mixed_generates_valid_condition", func(t *testing.T) { + packer := types.NewPacker() + defer packer.Close() + + vec := vector.NewVec(types.T_varchar.ToType()) + // Row 0: packed (10, 100) — non-NULL + packer.Reset() + packer.EncodeInt64(10) + 
packer.EncodeInt64(100) + require.NoError(t, vector.AppendBytes(vec, packer.GetBuf(), false, mp)) + // Row 1: NULL + require.NoError(t, vector.AppendBytes(vec, nil, true, mp)) + // Row 2: packed (20, 200) — non-NULL + packer.Reset() + packer.EncodeInt64(20) + packer.EncodeInt64(200) + require.NoError(t, vector.AppendBytes(vec, packer.GetBuf(), false, mp)) + + bat := batch.NewWithSize(1) + bat.Vecs[0] = vec + bat.SetRowCount(3) + defer func() { + vec.Free(mp) + bat.Clean(mp) + }() + + f := &fuzzyCheck{ + attr: "__mo_cpkey_col", + isCompound: true, + compoundCols: []*plan.ColDef{ + {Name: "a", Typ: plan.Type{Id: int32(types.T_int64)}}, + {Name: "b", Typ: plan.Type{Id: int32(types.T_int64)}}, + }, + } + err := f.fill(context.Background(), bat) + require.NoError(t, err) + require.Equal(t, 2, f.cnt) + // Condition should contain both column names + require.Contains(t, f.condition, "a =") + require.Contains(t, f.condition, "b =") + }) +} + +// TestConcurrentFirstlyCheck verifies that firstlyCheck is safe to call +// concurrently with independent fuzzyCheck instances (simulating concurrent INSERTs). 
+func TestConcurrentFirstlyCheck(t *testing.T) { + mp, err := mpool.NewMPool("test_concurrent_fc", 0, mpool.NoFixed) + require.NoError(t, err) + defer mpool.DeleteMPool(mp) + + const numGoroutines = 8 + const numRows = 100 + + var wg sync.WaitGroup + errors := make(chan error, numGoroutines) + + for g := 0; g < numGoroutines; g++ { + wg.Add(1) + go func(goroutineID int) { + defer wg.Done() + // Each goroutine creates its own vector and fuzzyCheck + vec := vector.NewVec(types.T_int64.ToType()) + for i := 0; i < numRows; i++ { + isNull := (i % 3) == 0 // every 3rd row is NULL + if isNull { + vector.AppendFixed(vec, int64(0), true, mp) + } else { + // Use goroutineID*numRows+i to ensure unique values across goroutines + vector.AppendFixed(vec, int64(goroutineID*numRows+i), false, mp) + } + } + defer vec.Free(mp) + + f := &fuzzyCheck{attr: "pk"} + if err := f.firstlyCheck(context.Background(), vec); err != nil { + errors <- err + } + }(g) + } + + wg.Wait() + close(errors) + + for err := range errors { + t.Errorf("unexpected error: %v", err) + } +} diff --git a/pkg/vm/engine/disttae/txn.go b/pkg/vm/engine/disttae/txn.go index ed700d301eca8..114dff11beaba 100644 --- a/pkg/vm/engine/disttae/txn.go +++ b/pkg/vm/engine/disttae/txn.go @@ -256,9 +256,15 @@ func (txn *Transaction) dumpBatch(ctx context.Context, offset int) error { func checkPKDupGeneric[T comparable]( mp map[any]bool, t *types.Type, + pk *vector.Vector, vals []T, start, count int) (bool, string) { - for _, v := range vals[start : start+count] { + nsp := pk.GetNulls() + for i, v := range vals[start : start+count] { + // SQL standard: NULL != NULL, skip NULLs from duplicate check + if nsp.Contains(uint64(start + i)) { + continue + } if _, ok := mp[v]; ok { entry := common.TypeStringValue(*t, v, false) return true, entry @@ -276,76 +282,80 @@ func checkPKDup( switch colType.Oid { case types.T_bool: vs := vector.MustFixedColNoTypeCheck[bool](pk) - return checkPKDupGeneric[bool](mp, colType, vs, start, count) + 
return checkPKDupGeneric[bool](mp, colType, pk, vs, start, count) case types.T_bit: vs := vector.MustFixedColNoTypeCheck[uint64](pk) - return checkPKDupGeneric[uint64](mp, colType, vs, start, count) + return checkPKDupGeneric[uint64](mp, colType, pk, vs, start, count) case types.T_int8: vs := vector.MustFixedColNoTypeCheck[int8](pk) - return checkPKDupGeneric[int8](mp, colType, vs, start, count) + return checkPKDupGeneric[int8](mp, colType, pk, vs, start, count) case types.T_int16: vs := vector.MustFixedColNoTypeCheck[int16](pk) - return checkPKDupGeneric[int16](mp, colType, vs, start, count) + return checkPKDupGeneric[int16](mp, colType, pk, vs, start, count) case types.T_int32: vs := vector.MustFixedColNoTypeCheck[int32](pk) - return checkPKDupGeneric[int32](mp, colType, vs, start, count) + return checkPKDupGeneric[int32](mp, colType, pk, vs, start, count) case types.T_int64: vs := vector.MustFixedColNoTypeCheck[int64](pk) - return checkPKDupGeneric[int64](mp, colType, vs, start, count) + return checkPKDupGeneric[int64](mp, colType, pk, vs, start, count) case types.T_uint8: vs := vector.MustFixedColNoTypeCheck[uint8](pk) - return checkPKDupGeneric[uint8](mp, colType, vs, start, count) + return checkPKDupGeneric[uint8](mp, colType, pk, vs, start, count) case types.T_uint16: vs := vector.MustFixedColNoTypeCheck[uint16](pk) - return checkPKDupGeneric[uint16](mp, colType, vs, start, count) + return checkPKDupGeneric[uint16](mp, colType, pk, vs, start, count) case types.T_uint32: vs := vector.MustFixedColNoTypeCheck[uint32](pk) - return checkPKDupGeneric[uint32](mp, colType, vs, start, count) + return checkPKDupGeneric[uint32](mp, colType, pk, vs, start, count) case types.T_uint64: vs := vector.MustFixedColNoTypeCheck[uint64](pk) - return checkPKDupGeneric[uint64](mp, colType, vs, start, count) + return checkPKDupGeneric[uint64](mp, colType, pk, vs, start, count) case types.T_decimal64: vs := vector.MustFixedColNoTypeCheck[types.Decimal64](pk) - return 
checkPKDupGeneric[types.Decimal64](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Decimal64](mp, colType, pk, vs, start, count) case types.T_decimal128: vs := vector.MustFixedColNoTypeCheck[types.Decimal128](pk) - return checkPKDupGeneric[types.Decimal128](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Decimal128](mp, colType, pk, vs, start, count) case types.T_uuid: vs := vector.MustFixedColNoTypeCheck[types.Uuid](pk) - return checkPKDupGeneric[types.Uuid](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Uuid](mp, colType, pk, vs, start, count) case types.T_float32: vs := vector.MustFixedColNoTypeCheck[float32](pk) - return checkPKDupGeneric[float32](mp, colType, vs, start, count) + return checkPKDupGeneric[float32](mp, colType, pk, vs, start, count) case types.T_float64: vs := vector.MustFixedColNoTypeCheck[float64](pk) - return checkPKDupGeneric[float64](mp, colType, vs, start, count) + return checkPKDupGeneric[float64](mp, colType, pk, vs, start, count) case types.T_date: vs := vector.MustFixedColNoTypeCheck[types.Date](pk) - return checkPKDupGeneric[types.Date](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Date](mp, colType, pk, vs, start, count) case types.T_timestamp: vs := vector.MustFixedColNoTypeCheck[types.Timestamp](pk) - return checkPKDupGeneric[types.Timestamp](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Timestamp](mp, colType, pk, vs, start, count) case types.T_time: vs := vector.MustFixedColNoTypeCheck[types.Time](pk) - return checkPKDupGeneric[types.Time](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Time](mp, colType, pk, vs, start, count) case types.T_datetime: vs := vector.MustFixedColNoTypeCheck[types.Datetime](pk) - return checkPKDupGeneric[types.Datetime](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Datetime](mp, colType, pk, vs, start, count) case types.T_enum: vs := vector.MustFixedColNoTypeCheck[types.Enum](pk) - 
return checkPKDupGeneric[types.Enum](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Enum](mp, colType, pk, vs, start, count) case types.T_TS: vs := vector.MustFixedColNoTypeCheck[types.TS](pk) - return checkPKDupGeneric[types.TS](mp, colType, vs, start, count) + return checkPKDupGeneric[types.TS](mp, colType, pk, vs, start, count) case types.T_Rowid: vs := vector.MustFixedColNoTypeCheck[types.Rowid](pk) - return checkPKDupGeneric[types.Rowid](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Rowid](mp, colType, pk, vs, start, count) case types.T_Blockid: vs := vector.MustFixedColNoTypeCheck[types.Blockid](pk) - return checkPKDupGeneric[types.Blockid](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Blockid](mp, colType, pk, vs, start, count) case types.T_char, types.T_varchar, types.T_json, types.T_binary, types.T_varbinary, types.T_blob, types.T_text, types.T_datalink: + nsp := pk.GetNulls() for i := start; i < start+count; i++ { + if nsp.Contains(uint64(i)) { + continue + } v := pk.UnsafeGetStringAt(i) if _, ok := mp[v]; ok { entry := common.TypeStringValue(*colType, []byte(v), false) @@ -354,7 +364,11 @@ func checkPKDup( mp[v] = true } case types.T_array_float32: + nsp := pk.GetNulls() for i := start; i < start+count; i++ { + if nsp.Contains(uint64(i)) { + continue + } v := types.ArrayToString[float32](vector.GetArrayAt[float32](pk, i)) if _, ok := mp[v]; ok { entry := common.TypeStringValue(*colType, pk.GetBytesAt(i), false) @@ -363,7 +377,11 @@ func checkPKDup( mp[v] = true } case types.T_array_float64: + nsp := pk.GetNulls() for i := start; i < start+count; i++ { + if nsp.Contains(uint64(i)) { + continue + } v := types.ArrayToString[float64](vector.GetArrayAt[float64](pk, i)) if _, ok := mp[v]; ok { entry := common.TypeStringValue(*colType, pk.GetBytesAt(i), false) diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index 4c6123721c492..19328b2f82d2c 100644 --- 
a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -2674,6 +2674,19 @@ func (tbl *txnTable) primaryKeysMayBeChanged( v2.TxnPKMayBeChangedTotalCounter.Inc() + // SQL standard: NULL != NULL, filter out NULLs before duplicate checking. + // Also creates a owned copy so InplaceSort in PKPersistedBetween won't + // corrupt the caller's batch vector or its null bitmap. + mp := tbl.proc.Load().Mp() + keysVector, err := dupVectorWithoutNulls(keysVector, mp) + if err != nil { + return false, err + } + defer keysVector.Free(mp) + if keysVector.Length() == 0 { + return false, nil + } + if tbl.db.op.IsSnapOp() { return false, moerr.NewInternalErrorNoCtx("primary key modification is not allowed in snapshot transaction") @@ -3080,3 +3093,22 @@ func (tbl *txnTable) getCommittedRows( func (tbl *txnTable) GetExtraInfo() *api.SchemaExtra { return tbl.extraInfo } + +// dupVectorWithoutNulls returns an owned copy of v with NULL rows removed. +// If v has no NULLs it returns Dup(v). The caller must Free the result. +func dupVectorWithoutNulls(v *vector.Vector, mp *mpool.MPool) (*vector.Vector, error) { + if !v.HasNull() { + return v.Dup(mp) + } + filtered := vector.NewVec(*v.GetType()) + nsp := v.GetNulls() + for i := 0; i < v.Length(); i++ { + if !nsp.Contains(uint64(i)) { + if err := filtered.UnionOne(v, int64(i), mp); err != nil { + filtered.Free(mp) + return nil, err + } + } + } + return filtered, nil +} diff --git a/pkg/vm/engine/disttae/txn_test.go b/pkg/vm/engine/disttae/txn_test.go index 28528109cd2fd..55161926cca8f 100644 --- a/pkg/vm/engine/disttae/txn_test.go +++ b/pkg/vm/engine/disttae/txn_test.go @@ -551,6 +551,454 @@ func newTransactionWithActivePKTableForTest( return txn } +// TestCheckPKDupSkipsNulls verifies that checkPKDup correctly skips NULL +// values per SQL standard (NULL != NULL), preventing false duplicate errors. 
+func TestCheckPKDupSkipsNulls(t *testing.T) {
+	proc := testutil.NewProc(t)
+	mp := proc.Mp()
+
+	// Every subtest defers pk.Free so the vector is released even when a
+	// require assertion aborts the subtest early.
+
+	t.Run("int64_all_nulls_no_dup", func(t *testing.T) {
+		// All NULLs should never produce a duplicate
+		pk := vector.NewVec(types.T_int64.ToType())
+		defer pk.Free(mp)
+		require.NoError(t, vector.AppendFixed(pk, int64(0), true, mp))
+		require.NoError(t, vector.AppendFixed(pk, int64(0), true, mp))
+		require.NoError(t, vector.AppendFixed(pk, int64(0), true, mp))
+
+		m := make(map[any]bool)
+		dup, _ := checkPKDup(m, pk, 0, 3)
+		require.False(t, dup, "all-NULL rows must not report duplicate")
+		require.Empty(t, m, "NULL rows must not be added to the map")
+	})
+
+	t.Run("int64_mixed_nulls_and_values", func(t *testing.T) {
+		// Two NULLs + two distinct values: no duplicate
+		pk := vector.NewVec(types.T_int64.ToType())
+		defer pk.Free(mp)
+		require.NoError(t, vector.AppendFixed(pk, int64(1), false, mp))
+		require.NoError(t, vector.AppendFixed(pk, int64(0), true, mp)) // NULL
+		require.NoError(t, vector.AppendFixed(pk, int64(2), false, mp))
+		require.NoError(t, vector.AppendFixed(pk, int64(0), true, mp)) // NULL
+
+		m := make(map[any]bool)
+		dup, _ := checkPKDup(m, pk, 0, 4)
+		require.False(t, dup, "NULLs should be skipped, 1 and 2 are distinct")
+		require.Len(t, m, 2, "only non-NULL values should be in the map")
+	})
+
+	t.Run("int64_real_dup_among_nulls", func(t *testing.T) {
+		// Real duplicate among NULLs should still be caught
+		pk := vector.NewVec(types.T_int64.ToType())
+		defer pk.Free(mp)
+		require.NoError(t, vector.AppendFixed(pk, int64(1), false, mp))
+		require.NoError(t, vector.AppendFixed(pk, int64(0), true, mp))  // NULL
+		require.NoError(t, vector.AppendFixed(pk, int64(1), false, mp)) // dup!
+
+		m := make(map[any]bool)
+		dup, entry := checkPKDup(m, pk, 0, 3)
+		require.True(t, dup, "real duplicate 1 must be caught")
+		require.Contains(t, entry, "1")
+	})
+
+	t.Run("varchar_nulls_no_dup", func(t *testing.T) {
+		// String type NULLs should be skipped
+		pk := vector.NewVec(types.T_varchar.ToType())
+		defer pk.Free(mp)
+		require.NoError(t, vector.AppendBytes(pk, []byte("hello"), false, mp))
+		require.NoError(t, vector.AppendBytes(pk, nil, true, mp)) // NULL
+		require.NoError(t, vector.AppendBytes(pk, nil, true, mp)) // NULL
+		require.NoError(t, vector.AppendBytes(pk, []byte("world"), false, mp))
+
+		m := make(map[any]bool)
+		dup, _ := checkPKDup(m, pk, 0, 4)
+		require.False(t, dup, "NULLs should be skipped for varchar")
+		require.Len(t, m, 2)
+	})
+
+	t.Run("varchar_null_empty_string_no_collision", func(t *testing.T) {
+		// NULL and empty string "" are different: NULL is skipped, "" is a value
+		pk := vector.NewVec(types.T_varchar.ToType())
+		defer pk.Free(mp)
+		require.NoError(t, vector.AppendBytes(pk, nil, true, mp))         // NULL
+		require.NoError(t, vector.AppendBytes(pk, []byte(""), false, mp)) // empty string
+		require.NoError(t, vector.AppendBytes(pk, nil, true, mp))         // NULL
+
+		m := make(map[any]bool)
+		dup, _ := checkPKDup(m, pk, 0, 3)
+		require.False(t, dup, "NULL and empty string must not collide")
+		require.Len(t, m, 1, "only the empty string should be in the map")
+	})
+
+	t.Run("partial_range_with_nulls", func(t *testing.T) {
+		// Test start/count range with NULLs
+		pk := vector.NewVec(types.T_int64.ToType())
+		defer pk.Free(mp)
+		require.NoError(t, vector.AppendFixed(pk, int64(10), false, mp))
+		require.NoError(t, vector.AppendFixed(pk, int64(0), true, mp)) // NULL at pos 1
+		require.NoError(t, vector.AppendFixed(pk, int64(20), false, mp))
+		require.NoError(t, vector.AppendFixed(pk, int64(0), true, mp)) // NULL at pos 3
+
+		// Check only range [1,3) — NULL at 1, value 20 at 2
+		m := make(map[any]bool)
+		dup, _ := checkPKDup(m, pk, 1, 2)
+		require.False(t, dup)
+		require.Len(t, m, 1, "only pos 2 (value 20) should be in map")
+	})
+
+	t.Run("array_float32_nulls", func(t *testing.T) {
+		pk := vector.NewVec(types.T_array_float32.ToType())
+		defer pk.Free(mp)
+		require.NoError(t, vector.AppendArray(pk, []float32{1.0, 2.0}, false, mp))
+		require.NoError(t, vector.AppendArray(pk, []float32{0}, true, mp))         // NULL
+		require.NoError(t, vector.AppendArray(pk, []float32{1.0, 2.0}, false, mp)) // dup!
+
+		m := make(map[any]bool)
+		dup, _ := checkPKDup(m, pk, 0, 3)
+		require.True(t, dup, "real duplicate array should be caught")
+	})
+
+	t.Run("array_float32_all_nulls", func(t *testing.T) {
+		pk := vector.NewVec(types.T_array_float32.ToType())
+		defer pk.Free(mp)
+		require.NoError(t, vector.AppendArray(pk, []float32{0}, true, mp))
+		require.NoError(t, vector.AppendArray(pk, []float32{0}, true, mp))
+
+		m := make(map[any]bool)
+		dup, _ := checkPKDup(m, pk, 0, 2)
+		require.False(t, dup, "all-NULL arrays must not report duplicate")
+		require.Empty(t, m)
+	})
+
+	t.Run("array_float64_nulls", func(t *testing.T) {
+		pk := vector.NewVec(types.T_array_float64.ToType())
+		defer pk.Free(mp)
+		require.NoError(t, vector.AppendArray(pk, []float64{1.0}, false, mp))
+		require.NoError(t, vector.AppendArray(pk, []float64{0}, true, mp)) // NULL
+		require.NoError(t, vector.AppendArray(pk, []float64{2.0}, false, mp))
+
+		m := make(map[any]bool)
+		dup, _ := checkPKDup(m, pk, 0, 3)
+		require.False(t, dup, "NULLs should be skipped for float64 arrays")
+		require.Len(t, m, 2)
+	})
+
+	// The decimal cases use a non-zero scale.
+	dec64 := types.T_decimal64.ToType()
+	dec64.Scale = 2
+	dec128 := types.T_decimal128.ToType()
+	dec128.Scale = 2
+
+	// Fixed-width types all follow the same shape: [value, NULL, value] must
+	// yield no duplicate and exactly two map entries.
+	fixedCases := []struct {
+		name string
+		run  func(t *testing.T)
+	}{
+		{"bool_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_bool.ToType(), true, false, false)
+		}},
+		{"int8_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_int8.ToType(), int8(1), int8(0), int8(2))
+		}},
+		{"int16_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_int16.ToType(), int16(1), int16(0), int16(2))
+		}},
+		{"int32_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_int32.ToType(), int32(1), int32(0), int32(2))
+		}},
+		{"uint8_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_uint8.ToType(), uint8(1), uint8(0), uint8(2))
+		}},
+		{"uint16_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_uint16.ToType(), uint16(1), uint16(0), uint16(2))
+		}},
+		{"uint32_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_uint32.ToType(), uint32(1), uint32(0), uint32(2))
+		}},
+		{"uint64_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_uint64.ToType(), uint64(1), uint64(0), uint64(2))
+		}},
+		{"float32_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_float32.ToType(), float32(1.0), float32(0), float32(2.0))
+		}},
+		{"float64_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_float64.ToType(), float64(1.0), float64(0), float64(2.0))
+		}},
+		{"date_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_date.ToType(), types.Date(1), types.Date(0), types.Date(2))
+		}},
+		{"datetime_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_datetime.ToType(), types.Datetime(100), types.Datetime(0), types.Datetime(200))
+		}},
+		{"uuid_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_uuid.ToType(), types.Uuid{1}, types.Uuid{}, types.Uuid{2})
+		}},
+		{"decimal64_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, dec64, types.Decimal64(100), types.Decimal64(0), types.Decimal64(200))
+		}},
+		{"decimal128_nulls", func(t *testing.T) {
+			d1 := types.Decimal128{B0_63: 100, B64_127: 0}
+			d2 := types.Decimal128{B0_63: 200, B64_127: 0}
+			checkFixedPKSkipsNull(t, proc, dec128, d1, types.Decimal128{}, d2)
+		}},
+		{"timestamp_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_timestamp.ToType(), types.Timestamp(100), types.Timestamp(0), types.Timestamp(200))
+		}},
+		{"time_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_time.ToType(), types.Time(100), types.Time(0), types.Time(200))
+		}},
+		{"enum_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_enum.ToType(), types.Enum(1), types.Enum(0), types.Enum(2))
+		}},
+		{"bit_nulls", func(t *testing.T) {
+			checkFixedPKSkipsNull(t, proc, types.T_bit.ToType(), uint64(1), uint64(0), uint64(2))
+		}},
+	}
+	for _, tc := range fixedCases {
+		t.Run(tc.name, tc.run)
+	}
+}
+
+// checkFixedPKSkipsNull appends the rows [first, NULL, second] to a fresh
+// vector of type typ and asserts that checkPKDup over all three rows reports
+// no duplicate and records exactly the two non-NULL values. nullVal is the
+// placeholder payload stored under the NULL row.
+func checkFixedPKSkipsNull[T any](
+	t *testing.T,
+	proc *process.Process,
+	typ types.Type,
+	first, nullVal, second T,
+) {
+	t.Helper()
+	mp := proc.Mp()
+
+	pk := vector.NewVec(typ)
+	defer pk.Free(mp)
+	require.NoError(t, vector.AppendFixed(pk, first, false, mp))
+	require.NoError(t, vector.AppendFixed(pk, nullVal, true, mp)) // NULL
+	require.NoError(t, vector.AppendFixed(pk, second, false, mp))
+
+	m := make(map[any]bool)
+	dup, _ := checkPKDup(m, pk, 0, 3)
+	require.False(t, dup)
+	require.Len(t, m, 2)
+}
+
+// TestDupVectorWithoutNulls tests the extracted helper that filters NULLs
+// and duplicates the vector for safe InplaceSort. Besides the resulting
+// length it also checks that the surviving values keep their original order.
+func TestDupVectorWithoutNulls(t *testing.T) {
+	proc := testutil.NewProc(t)
+	mp := proc.Mp()
+
+	t.Run("no_nulls", func(t *testing.T) {
+		v := vector.NewVec(types.T_int64.ToType())
+		defer v.Free(mp)
+		require.NoError(t, vector.AppendFixed(v, int64(1), false, mp))
+		require.NoError(t, vector.AppendFixed(v, int64(2), false, mp))
+
+		out, err := dupVectorWithoutNulls(v, mp)
+		require.NoError(t, err)
+		defer out.Free(mp)
+		require.Equal(t, 2, out.Length())
+		require.False(t, out.HasNull())
+		vals := vector.MustFixedColNoTypeCheck[int64](out)
+		require.Equal(t, int64(1), vals[0])
+		require.Equal(t, int64(2), vals[1])
+	})
+
+	t.Run("some_nulls", func(t *testing.T) {
+		v := vector.NewVec(types.T_int64.ToType())
+		defer v.Free(mp)
+		require.NoError(t, vector.AppendFixed(v, int64(1), false, mp))
+		require.NoError(t, vector.AppendFixed(v, int64(0), true, mp)) // NULL
+		require.NoError(t, vector.AppendFixed(v, int64(2), false, mp))
+		require.NoError(t, vector.AppendFixed(v, int64(0), true, mp)) // NULL
+
+		out, err := dupVectorWithoutNulls(v, mp)
+		require.NoError(t, err)
+		defer out.Free(mp)
+		require.Equal(t, 2, out.Length())
+		require.False(t, out.HasNull())
+		vals := vector.MustFixedColNoTypeCheck[int64](out)
+		require.Equal(t, int64(1), vals[0])
+		require.Equal(t, int64(2), vals[1])
+	})
+
+	t.Run("all_nulls", func(t *testing.T) {
+		v := vector.NewVec(types.T_int64.ToType())
+		defer v.Free(mp)
+		require.NoError(t, vector.AppendFixed(v, int64(0), true, mp))
+		require.NoError(t, vector.AppendFixed(v, int64(0), true, mp))
+
+		out, err := dupVectorWithoutNulls(v, mp)
+		require.NoError(t, err)
+		defer out.Free(mp)
+		require.Equal(t, 0, out.Length())
+	})
+
+	t.Run("varchar_with_nulls", func(t *testing.T) {
+		v := vector.NewVec(types.T_varchar.ToType())
+		defer v.Free(mp)
+		require.NoError(t, vector.AppendBytes(v, []byte("hello"), false, mp))
+		require.NoError(t, vector.AppendBytes(v, nil, true, mp)) // NULL
+		require.NoError(t, vector.AppendBytes(v, []byte("world"), false, mp))
+
+		out, err := dupVectorWithoutNulls(v, mp)
+		require.NoError(t, err)
+		defer out.Free(mp)
+		require.Equal(t, 2, out.Length())
+		require.False(t, out.HasNull())
+		require.Equal(t, "hello", out.UnsafeGetStringAt(0))
+		require.Equal(t, "world", out.UnsafeGetStringAt(1))
+	})
+}
+
 func newInt64BatchForTest(
 	t *testing.T,
 	proc *process.Process,
@@ -620,3 +1068,139 @@ func newInsertBatchWithRowIDForTest(
 	bat.SetRowCount(len(pks))
 	return bat
 }
+
+// TestConcurrentCheckPKDup verifies that checkPKDup works correctly when
+// called concurrently with different vectors, each containing NULLs.
+// This simulates the real production path where multiple INSERT txns
+// perform PK duplicate checking concurrently.
+func TestConcurrentCheckPKDup(t *testing.T) {
+	proc := testutil.NewProc(t)
+	mp := proc.Mp()
+
+	const numGoroutines = 8
+	const rowsPerGoroutine = 50
+
+	var wg sync.WaitGroup
+	// Each goroutine sends at most one error and at most one result, so
+	// neither buffered channel can block a sender.
+	errCh := make(chan error, numGoroutines)
+	results := make(chan bool, numGoroutines)
+
+	for g := 0; g < numGoroutines; g++ {
+		wg.Add(1)
+		go func(goroutineID int) {
+			defer wg.Done()
+
+			pk := vector.NewVec(types.T_int64.ToType())
+			// Registered before the fill loop so an early return still frees it.
+			defer pk.Free(mp)
+
+			for i := 0; i < rowsPerGoroutine; i++ {
+				isNull := i%5 == 0 // 20% NULLs
+				val := int64(0)
+				if !isNull {
+					// Unique values per goroutine
+					val = int64(goroutineID*1000 + i)
+				}
+				if err := vector.AppendFixed(pk, val, isNull, mp); err != nil {
+					errCh <- err
+					return
+				}
+			}
+
+			m := make(map[any]bool)
+			dup, _ := checkPKDup(m, pk, 0, rowsPerGoroutine)
+			results <- dup
+			if dup {
+				errCh <- moerr.NewInternalErrorNoCtxf("unexpected duplicate in goroutine %d", goroutineID)
+			}
+		}(g)
+	}
+
+	wg.Wait()
+	close(errCh)
+	close(results)
+
+	for err := range errCh {
+		t.Errorf("concurrent checkPKDup error: %v", err)
+	}
+
+	// All goroutines should report no duplicates
+	reported := 0
+	for dup := range results {
+		reported++
+		require.False(t, dup, "no duplicates expected with unique values per goroutine")
+	}
+	// A goroutine that bailed out on an append error sends no result.
+	require.Equal(t, numGoroutines, reported, "every goroutine must report a result")
+}
+
+// TestConcurrentCheckPKDup_RealDupWithNulls ensures that concurrent
+// checkPKDup calls correctly detect real duplicates even when NULLs
+// are present, but never flag NULLs as duplicates of each other.
+func TestConcurrentCheckPKDup_RealDupWithNulls(t *testing.T) {
+	proc := testutil.NewProc(t)
+	mp := proc.Mp()
+
+	// [1, NULL, 2, NULL, 1] — contains real dup (1 appears twice)
+	rows := []struct {
+		val    int64
+		isNull bool
+	}{
+		{1, false},
+		{0, true},
+		{2, false},
+		{0, true},
+		{1, false}, // dup!
+	}
+
+	const numGoroutines = 4
+	var wg sync.WaitGroup
+	dupDetected := make(chan bool, numGoroutines)
+
+	for g := 0; g < numGoroutines; g++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			pk := vector.NewVec(types.T_int64.ToType())
+			// Registered before the appends so an early return still frees it.
+			defer pk.Free(mp)
+
+			for _, r := range rows {
+				if err := vector.AppendFixed(pk, r.val, r.isNull, mp); err != nil {
+					// t.Errorf may be called off the test goroutine; the
+					// require helpers (t.FailNow) may not.
+					t.Errorf("append row: %v", err)
+					return
+				}
+			}
+
+			m := make(map[any]bool)
+			dup, _ := checkPKDup(m, pk, 0, len(rows))
+			dupDetected <- dup
+		}()
+	}
+
+	wg.Wait()
+	close(dupDetected)
+
+	reported := 0
+	for dup := range dupDetected {
+		reported++
+		require.True(t, dup, "real duplicate (value=1) must be detected even with NULLs present")
+	}
+	// A goroutine that bailed out on an append error sends no result.
+	require.Equal(t, numGoroutines, reported, "every goroutine must report a result")
+}
+
+// TestDupVectorWithoutNulls_ConcurrentSafety verifies that dupVectorWithoutNulls
+// produces independent copies safe for concurrent InplaceSort.
+func TestDupVectorWithoutNulls_ConcurrentSafety(t *testing.T) {
+	proc := testutil.NewProc(t)
+	mp := proc.Mp()
+
+	// Original vector with NULLs: [30, NULL, 10, NULL, 20]
+	orig := vector.NewVec(types.T_int64.ToType())
+	defer orig.Free(mp)
+	require.NoError(t, vector.AppendFixed(orig, int64(30), false, mp))
+	require.NoError(t, vector.AppendFixed(orig, int64(0), true, mp))
+	require.NoError(t, vector.AppendFixed(orig, int64(10), false, mp))
+	require.NoError(t, vector.AppendFixed(orig, int64(0), true, mp))
+	require.NoError(t, vector.AppendFixed(orig, int64(20), false, mp))
+
+	const numGoroutines = 4
+	var wg sync.WaitGroup
+
+	for g := 0; g < numGoroutines; g++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			// The require helpers call t.FailNow, which must run on the test
+			// goroutine, so failures in here are reported with t.Errorf.
+			cp, err := dupVectorWithoutNulls(orig, mp)
+			if err != nil {
+				t.Errorf("dupVectorWithoutNulls: %v", err)
+				return
+			}
+			defer cp.Free(mp)
+
+			if got := cp.Length(); got != 3 {
+				t.Errorf("should have 3 non-NULL values, got %d", got)
+			}
+			if cp.HasNull() {
+				t.Error("filtered vector should have no NULLs")
+			}
+
+			// InplaceSort on the copy should not corrupt original
+			cp.InplaceSort()
+		}()
+	}
+
+	wg.Wait()
+
+	// Original should be unchanged: same length, same NULL positions and the
+	// non-NULL values still in insertion order.
+	require.Equal(t, 5, orig.Length())
+	require.True(t, orig.HasNull())
+
+	nsp := orig.GetNulls()
+	vals := vector.MustFixedColNoTypeCheck[int64](orig)
+	require.True(t, nsp.Contains(uint64(1)), "row 1 must still be NULL")
+	require.True(t, nsp.Contains(uint64(3)), "row 3 must still be NULL")
+	require.Equal(t, int64(30), vals[0])
+	require.Equal(t, int64(10), vals[2])
+	require.Equal(t, int64(20), vals[4])
+}