diff --git a/pkg/common/bitmap/bitmap.go b/pkg/common/bitmap/bitmap.go index 80ed391dff5a0..a9983f854ed70 100644 --- a/pkg/common/bitmap/bitmap.go +++ b/pkg/common/bitmap/bitmap.go @@ -168,6 +168,15 @@ func (n *Bitmap) Len() int64 { return n.len } +// RecalculateCount recounts the number of set bits from the data array. +// This is used to fix count after concurrent bitmap corruption. +func (n *Bitmap) RecalculateCount() { + n.count = 0 + for _, w := range n.data { + n.count += int64(bits.OnesCount64(w)) + } +} + // Size return number of bytes in n.data // XXX WTF Note that this size is not the same as InitWithSize. func (n *Bitmap) Size() int { @@ -349,7 +358,13 @@ func (n *Bitmap) TryExpandWithSize(size int) { return } if len(n.data) < newCap { + oldLen := len(n.data) n.data = n.data[:newCap] + // Zero out newly exposed slots to avoid reading stale data + // left over from a previous use of the same backing array. + for i := oldLen; i < newCap; i++ { + n.data[i] = 0 + } } } diff --git a/pkg/container/nulls/nulls.go b/pkg/container/nulls/nulls.go index f6b11432c6d62..d948ad54d9a4e 100644 --- a/pkg/container/nulls/nulls.go +++ b/pkg/container/nulls/nulls.go @@ -135,7 +135,7 @@ func TryExpand(nsp *Nulls, size int) { // Contains returns true if the integer is contained in the Nulls func (nsp *Nulls) Contains(row uint64) bool { - return nsp != nil && !nsp.np.EmptyByFlag() && nsp.np.Contains(row) + return nsp != nil && nsp.np.Contains(row) } func Contains(nsp *Nulls, row uint64) bool { @@ -226,8 +226,13 @@ func Range(nsp *Nulls, start, end, bias uint64, b *Nulls) { } b.np.InitWithSize(int64(end + 1 - bias)) + + // Take a snapshot of the source bitmap to prevent reading inconsistent + // state when the source is being concurrently modified by a parallel + // Prepare call on the same operator chain. + snap := nsp.Clone() for ; start < end; start++ { - if nsp.np.Contains(start) { + if snap.np.Contains(start) { b.np.Add(start - bias) } } diff --git a/pkg/container/vector/vector.go b/pkg/container/vector/vector.go index 08a6e6e9e7fef..96e9ab2454bd8 100644 --- a/pkg/container/vector/vector.go +++ b/pkg/container/vector/vector.go @@ -1329,6 +1329,9 @@ func (v *Vector) Copy(w *Vector, vi, wi int64, mp *mpool.MPool) error { if w.GetNulls().Contains(uint64(wi)) { v.GetNulls().Set(uint64(vi)) } else { + // Ensure bitmap len covers vi so that Contains() works correctly + // for all rows. TryExpand is needed because Unset doesn't expand. + nulls.TryExpand(v.GetNulls(), int(vi)+1) v.GetNulls().Unset(uint64(vi)) } return nil @@ -1343,7 +1346,7 @@ func GetUnionAllFunction(typ types.Type, mp *mpool.MPool) func(v, w *Vector) err u64Length := uint64(moreLength) moreNp := more.GetBitmap() - if moreNp == nil || moreNp.EmptyByFlag() || moreLength == 0 { + if moreNp == nil || moreNp.IsEmpty() || moreLength == 0 { return } @@ -1355,6 +1358,9 @@ func GetUnionAllFunction(typ types.Type, mp *mpool.MPool) func(v, w *Vector) err if moreNp.Contains(0) { dst.Set(u64offset) } + // Ensure bitmap len covers the full range so Contains() works + // correctly for all rows, even trailing non-null rows. + nulls.TryExpand(dst, oldLength+moreLength) } switch typ.Oid { @@ -2085,30 +2091,25 @@ func GetUnionAllFunction(typ types.Type, mp *mpool.MPool) func(v, w *Vector) err var err error vs := toSliceOfLengthNoTypeCheck[types.Varlena](v, v.length+w.length) - bm := w.nsp.GetBitmap() - if bm != nil && !bm.EmptyByFlag() { - for i := range ws { - if w.gsp.Contains(uint64(i)) { - nulls.Add(&v.gsp, uint64(v.length)) - } - if bm.Contains(uint64(i)) { - nulls.Add(&v.nsp, uint64(v.length)) - } else { - err = BuildVarlenaFromVarlena(v, &vs[v.length], &ws[i], &w.area, mp) - if err != nil { - return err - } - } - v.length++ + // Always use null-aware path to prevent losing null information + // when bitmap state is inconsistent due to concurrent access. + for i := range ws { + if w.gsp.Contains(uint64(i)) { + nulls.Add(&v.gsp, uint64(v.length)) } - } else { - for i := range ws { + if w.nsp.Contains(uint64(i)) { + nulls.Add(&v.nsp, uint64(v.length)) + } else { err = BuildVarlenaFromVarlena(v, &vs[v.length], &ws[i], &w.area, mp) if err != nil { return err } - v.length++ } + v.length++ + } + // Ensure bitmap len covers all rows so Contains() works correctly + if v.nsp.Count() > 0 { + nulls.TryExpand(&v.nsp, v.length) } return nil } @@ -2647,7 +2648,7 @@ func unionT[T int32 | int64](v, w *Vector, sels []T, mp *mpool.MPool) error { } } else { tlen := v.GetType().TypeSize() - if !w.nsp.EmptyByFlag() { + if !w.nsp.IsEmpty() { for i, sel := range sels { if w.gsp.Contains(uint64(sel)) { nulls.Add(&v.gsp, uint64(oldLen+i)) @@ -2750,7 +2751,7 @@ func (v *Vector) UnionBatch(w *Vector, offset int64, cnt int, flags []uint8, mp vCol = toSliceOfLengthNoTypeCheck[types.Varlena](v, v.length+addCnt) ToSliceNoTypeCheck(w, &wCol) - if !w.nsp.EmptyByFlag() { + if !w.nsp.IsEmpty() { if flags == nil { for i := 0; i < cnt; i++ { if w.gsp.Contains(uint64(offset) + uint64(i)) { @@ -2785,6 +2786,8 @@ func (v *Vector) UnionBatch(w *Vector, offset int64, cnt int, flags []uint8, mp v.length++ } } + // Ensure bitmap len covers all rows after union + nulls.TryExpand(&v.nsp, v.length) } else { if flags == nil { for i := 0; i < cnt; i++ { @@ -2815,7 +2818,7 @@ func (v *Vector) UnionBatch(w *Vector, offset int64, cnt int, flags []uint8, mp } } else { tlen := v.GetType().TypeSize() - if !w.nsp.EmptyByFlag() { + if !w.nsp.IsEmpty() { if flags == nil { for i := 0; i < cnt; i++ { if w.gsp.Contains(uint64(offset) + uint64(i)) { @@ -3425,6 +3428,11 @@ func appendOneFixed[T any](vec *Vector, val T, isNull bool, mp *mpool.MPool) err if isNull { nulls.Add(&vec.nsp, uint64(length)) } else { + // Ensure bitmap len covers the new position so Contains() works + // correctly even when the last appended rows are non-null. + if vec.nsp.Count() > 0 { + nulls.TryExpand(&vec.nsp, vec.length) + } var col []T ToSliceNoTypeCheck(vec, &col) col[length] = val diff --git a/pkg/sql/colexec/lockop/lock_op.go b/pkg/sql/colexec/lockop/lock_op.go index 8ad88f4630775..ecca2c08e80e7 100644 --- a/pkg/sql/colexec/lockop/lock_op.go +++ b/pkg/sql/colexec/lockop/lock_op.go @@ -18,9 +18,6 @@ import ( "bytes" "context" "fmt" - "strings" - "time" - "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/morpc" "github.com/matrixorigin/matrixone/pkg/common/reuse" @@ -44,6 +41,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/process" "go.uber.org/zap" + "strings" + "time" ) var ( @@ -155,6 +154,7 @@ func callNonBlocking( } lockOp.ctr.lockCount += int64(result.Batch.RowCount()) + if err = performLock(result.Batch, proc, lockOp, analyzer, -1); err != nil { return result, err } @@ -243,6 +243,7 @@ func performLock( WithLockTable(target.lockTable, target.changeDef). WithHasNewVersionInRangeFunc(lockOp.ctr.hasNewVersionInRange), ) + if lockOp.logger.Enabled(zap.DebugLevel) { lockOp.logger.Debug("lock result", zap.Uint64("table", target.tableID), diff --git a/pkg/sql/colexec/preinsert/preinsert.go b/pkg/sql/colexec/preinsert/preinsert.go index c23074b099642..f1b788023e1ad 100644 --- a/pkg/sql/colexec/preinsert/preinsert.go +++ b/pkg/sql/colexec/preinsert/preinsert.go @@ -139,9 +139,8 @@ func (preInsert *PreInsert) constructColBuf(proc *proc, bat *batch.Batch, first return err } } else { + preInsert.ctr.canFreeVecIdx[idx] = true if bat.Vecs[idx].IsConst() { - preInsert.ctr.canFreeVecIdx[idx] = true - //expland const vector typ := bat.Vecs[idx].GetType() tmpVec := vector.NewOffHeapVecWithType(*typ) if err = vector.GetUnionAllFunction(*typ, proc.Mp())(tmpVec, bat.Vecs[idx]); err != nil { @@ -150,7 +149,11 @@ func (preInsert *PreInsert) constructColBuf(proc *proc, bat *batch.Batch, first } preInsert.ctr.buf.Vecs[idx] = tmpVec } else { - preInsert.ctr.buf.SetVector(int32(idx), bat.Vecs[idx]) + dupVec, dupErr := bat.Vecs[idx].Dup(proc.Mp()) + if dupErr != nil { + return dupErr + } + preInsert.ctr.buf.Vecs[idx] = dupVec } } } diff --git a/pkg/sql/colexec/projection/projection.go b/pkg/sql/colexec/projection/projection.go index 7793bbc37d194..95a52b4454287 100644 --- a/pkg/sql/colexec/projection/projection.go +++ b/pkg/sql/colexec/projection/projection.go @@ -79,9 +79,6 @@ func (projection *Projection) Call(proc *process.Process) (vm.CallResult, error) } // for projection operator, all Vectors of projectBat come from executor.Eval // and will not be modified within projection operator. so we can used the result of executor.Eval directly. - // (if operator will modify vector/agg of batch, you should make a copy) - // however, it should be noted that since they directly come from executor.Eval - // these vectors cannot be free by batch.Clean directly and must be handed over executor.Free projection.ctr.buf.Vecs[i] = vec } projection.maxAllocSize = max(projection.maxAllocSize, projection.ctr.buf.Size()) diff --git a/pkg/sql/colexec/value_scan/types.go b/pkg/sql/colexec/value_scan/types.go index db3fcb37a301c..ae9978c71809c 100644 --- a/pkg/sql/colexec/value_scan/types.go +++ b/pkg/sql/colexec/value_scan/types.go @@ -41,9 +41,10 @@ type ValueScan struct { type container struct { // nowIdx indicates which data should send to next operator now. - nowIdx int - start int - end int + nowIdx int + start int + end int + prepared bool } func init() { @@ -73,6 +74,7 @@ func (valueScan *ValueScan) Reset(proc *process.Process, _ bool, _ error) { valueScan.runningCtx.nowIdx = 0 valueScan.runningCtx.start = 0 valueScan.runningCtx.end = 0 + valueScan.runningCtx.prepared = false //for prepare stmt, valuescan batch vecs do not need to reset, when next execute, prepare just copy data to vecs, length is same to last execute for i := 0; i < valueScan.ColCount; i++ { diff --git a/pkg/sql/colexec/value_scan/value_scan.go b/pkg/sql/colexec/value_scan/value_scan.go index c89b8321c117c..82da1bf088f2f 100644 --- a/pkg/sql/colexec/value_scan/value_scan.go +++ b/pkg/sql/colexec/value_scan/value_scan.go @@ -67,6 +67,13 @@ func (valueScan *ValueScan) makeValueScanBatch(proc *process.Process) (err error // select * from (values row(1,1), row(2,2), row(3,3)) a; bat := valueScan.Batchs[0] + // Skip evalRowsetData if already done (prevents concurrent bitmap corruption + // when the same scope is started multiple times by nested MergeRun) + if valueScan.runningCtx.prepared { + return nil + } + valueScan.runningCtx.prepared = true + for i := 0; i < valueScan.ColCount; i++ { exprList = valueScan.ExprExecLists[i] if len(exprList) == 0 { @@ -78,6 +85,14 @@ func (valueScan *ValueScan) makeValueScanBatch(proc *process.Process) (err error } } + // Fix bitmap count/data inconsistency that can occur when the same + // operator chain is started multiple times by nested MergeRun. + for _, vec := range bat.Vecs { + if vec != nil && !vec.IsConst() && vec.Length() > 0 { + vec.GetNulls().GetBitmap().RecalculateCount() + } + } + return nil } diff --git a/pkg/sql/compile/analyze_module.go b/pkg/sql/compile/analyze_module.go index db1d81fe475f4..74d800f4f39c1 100644 --- a/pkg/sql/compile/analyze_module.go +++ b/pkg/sql/compile/analyze_module.go @@ -295,6 +295,9 @@ func (c *Compile) fillPlanNodeAnalyzeInfo(stats *statistic.StatsInfo) { //---------------------------------------------------------------------------------------------------------------------- func ConvertScopeToPhyScope(scope *Scope, receiverMap map[*process.WaitRegister]int) models.PhyScope { + if scope == nil { + return models.PhyScope{} + } phyScope := models.PhyScope{ Magic: scope.Magic.String(), Mcpu: int8(scope.NodeInfo.Mcpu), @@ -319,6 +322,9 @@ func ConvertScopeToPhyScope(scope *Scope, receiverMap map[*process.WaitRegister] } func UpdatePreparePhyScope(scope *Scope, phyScope models.PhyScope) bool { + if scope == nil { + return true + } res := UpdatePreparePhyOperator(scope.RootOp, phyScope.RootOperator) if !res { return false @@ -714,6 +720,9 @@ func explainScopes(scopes []*Scope, gap int, rmp map[*process.WaitRegister]int, // It includes header information of Scope, data source information, and pipeline tree information. // In addition, it recursively displays information from any PreScopes. func explainSingleScope(scope *Scope, index int, gap int, rmp map[*process.WaitRegister]int, option *ExplainOption, buffer *bytes.Buffer) { + if scope == nil { + return + } gapNextLine(gap, buffer) // Scope Header diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index 3189a33b013d1..675eeed6cebbb 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -228,6 +228,9 @@ func (c *Compile) Reset(proc *process.Process, startAt time.Time, fill func(*bat } func UpdateScopeTxnOffset(scope *Scope, txnOffset int) { + if scope == nil { + return + } scope.TxnOffset = txnOffset for i := range scope.PreScopes { UpdateScopeTxnOffset(scope.PreScopes[i], txnOffset) @@ -302,6 +305,9 @@ func (c *Compile) clear() { } func (c *Compile) addAllAffectedRows(s *Scope) { + if s == nil { + return + } for _, ps := range s.PreScopes { c.addAllAffectedRows(ps) } diff --git a/pkg/sql/compile/compile2.go b/pkg/sql/compile/compile2.go index 3b8cb0c87e3d0..76b4f5404abc0 100644 --- a/pkg/sql/compile/compile2.go +++ b/pkg/sql/compile/compile2.go @@ -647,6 +647,9 @@ func (c *Compile) InitPipelineContextToRetryQuery() { // buildContextFromParentCtx build the context for the pipeline tree. // the input parameter is the whole tree's parent context. func (s *Scope) buildContextFromParentCtx(parentCtx context.Context) { + if s == nil { + return + } receiverCtx := s.Proc.BuildPipelineContext(parentCtx) // build context for receiver. @@ -664,6 +667,9 @@ func setContextForParallelScope(parallelScope *Scope, originalContext context.Co // build context for data entry. for _, prePipeline := range parallelScope.PreScopes { + if prePipeline == nil { + continue + } prePipeline.buildContextFromParentCtx(parallelScope.Proc.Ctx) } } diff --git a/pkg/sql/compile/debugTools.go b/pkg/sql/compile/debugTools.go index 916d78874799b..e7a4a5d247e8d 100644 --- a/pkg/sql/compile/debugTools.go +++ b/pkg/sql/compile/debugTools.go @@ -122,6 +122,9 @@ func DebugShowScopes(ss []*Scope, level DebugLevel) string { // genReceiverMap recursively traverses the Scope tree and generates unique identifiers (integers) for // each WaitRegister in Scope. func genReceiverMap(s *Scope, mp map[*process.WaitRegister]int) { + if s == nil { + return + } for i := range s.PreScopes { genReceiverMap(s.PreScopes[i], mp) } @@ -147,6 +150,9 @@ func showScopes(scopes []*Scope, gap int, rmp map[*process.WaitRegister]int, lev // It includes header information of Scope, data source information, and pipeline tree information. // In addition, it recursively displays information from any PreScopes. func showSingleScope(scope *Scope, index int, gap int, rmp map[*process.WaitRegister]int, level DebugLevel, buffer *bytes.Buffer) { + if scope == nil { + return + } gapNextLine(gap, buffer) // Scope Header diff --git a/pkg/sql/compile/fuzzyCheck.go b/pkg/sql/compile/fuzzyCheck.go index efddab89c7a41..9782eb7d1e886 100644 --- a/pkg/sql/compile/fuzzyCheck.go +++ b/pkg/sql/compile/fuzzyCheck.go @@ -27,7 +27,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/common/reuse" "github.com/matrixorigin/matrixone/pkg/container/batch" - "github.com/matrixorigin/matrixone/pkg/container/nulls" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/pb/plan" @@ -218,11 +217,18 @@ func (f *fuzzyCheck) firstlyCheck(ctx context.Context, toCheck *vector.Vector) e if err != nil { return err } - for _, k := range pkey { + for i, k := range pkey { + // NULL != NULL in SQL standard, skip NULL values from duplicate check + if toCheck.GetNulls().Contains(uint64(i)) { + continue + } kcnt[k]++ } } else { for i := 0; i < toCheck.Length(); i++ { + if toCheck.GetNulls().Contains(uint64(i)) { + continue + } b := toCheck.GetRawBytesAt(i) t, err := types.Unpack(b) if err != nil { @@ -270,13 +276,21 @@ func (f *fuzzyCheck) genCollsionKeys(toCheck *vector.Vector) ([][]string, error) if err != nil { return nil, err } - keys[0] = pkey + // Skip NULL values - they cannot be duplicates + for i, k := range pkey { + if !toCheck.GetNulls().Contains(uint64(i)) { + keys[0] = append(keys[0], k) + } + } } else { scales := make([]int32, len(f.compoundCols)) for i, c := range f.compoundCols { scales[i] = c.Typ.Scale } for i := 0; i < toCheck.Length(); i++ { + if toCheck.GetNulls().Contains(uint64(i)) { + continue + } b := toCheck.GetRawBytesAt(i) t, err := types.Unpack(b) if err != nil { @@ -423,7 +437,7 @@ func (f *fuzzyCheck) format(toCheck *vector.Vector) ([]string, error) { } func vectorToString(vec *vector.Vector, rowIndex int) (string, error) { - if nulls.Any(vec.GetNulls()) { + if vec.GetNulls().Contains(uint64(rowIndex)) { return "", nil } switch vec.GetType().Oid { diff --git a/pkg/sql/compile/remoterun.go b/pkg/sql/compile/remoterun.go index b97e515b97d40..133a365fe4bd6 100644 --- a/pkg/sql/compile/remoterun.go +++ b/pkg/sql/compile/remoterun.go @@ -158,6 +158,9 @@ func fillPipeline(s *Scope) (*pipeline.Pipeline, error) { // generatePipeline generate a base pipeline.Pipeline structure without instructions // according to source scope. func generatePipeline(s *Scope, ctx *scopeContext, ctxId int32) (*pipeline.Pipeline, int32, error) { + if s == nil { + return nil, ctxId, nil + } var err error p := &pipeline.Pipeline{} @@ -254,6 +257,9 @@ func generatePipeline(s *Scope, ctx *scopeContext, ctxId int32) (*pipeline.Pipel // fillInstructionsForPipeline fills pipeline's instructions. func fillInstructionsForPipeline(s *Scope, ctx *scopeContext, p *pipeline.Pipeline, ctxId int32) (int32, error) { + if s == nil { + return ctxId, nil + } var err error for i := range s.PreScopes { diff --git a/pkg/sql/compile/remoterunClient.go b/pkg/sql/compile/remoterunClient.go index e2ae6f61c8cc4..a32a90cac4cf8 100644 --- a/pkg/sql/compile/remoterunClient.go +++ b/pkg/sql/compile/remoterunClient.go @@ -108,6 +108,9 @@ func (s *Scope) remoteRun(c *Compile) (sender *messageSenderOnClient, err error) // // it returns true if the pipeline has only the root operator capable of sending data to other outer pipeline. func checkPipelineStandaloneExecutableAtRemote(s *Scope) bool { + if s == nil { + return false + } var regs = make(map[*process.WaitRegister]struct{}) var toScan []*Scope // record which mergeReceivers this scope tree holds. @@ -117,10 +120,17 @@ func checkPipelineStandaloneExecutableAtRemote(s *Scope) bool { node := toScan[len(toScan)-1] toScan = toScan[:len(toScan)-1] + if node == nil { + continue + } + if len(node.PreScopes) > 0 { toScan = append(toScan, node.PreScopes...) } + if node.Proc == nil { + continue + } for i := range node.Proc.Reg.MergeReceivers { regs[node.Proc.Reg.MergeReceivers[i]] = struct{}{} } @@ -137,6 +147,10 @@ func checkPipelineStandaloneExecutableAtRemote(s *Scope) bool { node := toScan[len(toScan)-1] toScan = toScan[:len(toScan)-1] + if node == nil { + continue + } + if len(node.PreScopes) > 0 { toScan = append(toScan, node.PreScopes...) } diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index 0dd88de76c8c7..208b3d7bdfd8c 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -78,7 +78,9 @@ func (s *Scope) release() { return } for i := range s.PreScopes { - s.PreScopes[i].release() + if s.PreScopes[i] != nil { + s.PreScopes[i].release() + } } vm.HandleAllOp(s.RootOp, func(parentOp vm.Operator, op vm.Operator) error { op.Release() @@ -97,6 +99,9 @@ func (s *Scope) Reset(c *Compile) error { return err } for _, scope := range s.PreScopes { + if scope == nil { + continue + } if err = scope.Reset(c); err != nil { return err } @@ -215,6 +220,9 @@ func (s *Scope) Run(c *Compile) (err error) { func (s *Scope) FreeOperator(c *Compile) { for _, scope := range s.PreScopes { + if scope == nil { + continue + } scope.FreeOperator(c) } @@ -230,6 +238,9 @@ func (s *Scope) InitAllDataSource(c *Compile) error { return err } for _, scope := range s.PreScopes { + if scope == nil { + continue + } err := scope.InitAllDataSource(c) if err != nil { return err @@ -249,6 +260,9 @@ func (s *Scope) SetOperatorInfoRecursively(cb func() int32) { }) for _, scope := range s.PreScopes { + if scope == nil { + continue + } scope.SetOperatorInfoRecursively(cb) } } @@ -272,15 +286,39 @@ func (s *Scope) MergeRun(c *Compile) error { s.ScopeAnalyzer.Start() defer s.ScopeAnalyzer.Stop() - // specific case. - if c.IsTpQuery() && !c.hasMergeOp { + // For TP queries: run non-connector PreScopes sequentially first + // (e.g. hashbuild pipelines), then run connector PreScopes concurrently + // with the main pipeline. This prevents concurrent access to shared + // operator chains – in particular the bitmap data-race that occurs when a + // DEDUP-JOIN build side shares Window sub-batch vectors via SetVector. + // The original code also required !c.hasMergeOp, but that condition + // caused the sequential path to be skipped for INSERT with UNIQUE KEY + // (compileLock sets hasMergeOp=true via newMergeScope), leading to the + // bitmap corruption bug. + // + // After running each non-connector scope, we must perform the full cleanup + // sequence: FreeOperator (op.Free → frees expression executors) followed by + // release (op.Release + reuse.Free[Scope]). This mirrors the normal cleanup + // order in Compile.FreeOperator → Compile.clear. The entry is then nil'd so + // that later walks of PreScopes skip it (nil checks exist throughout). + if c.IsTpQuery() { + hasConnector := false for i := len(s.PreScopes) - 1; i >= 0; i-- { - err := s.PreScopes[i].MergeRun(c) - if err != nil { - return err + ps := s.PreScopes[i] + if ps != nil && ps.RootOp != nil && ps.RootOp.OpType() == vm.Connector { + hasConnector = true + } else if ps != nil { + if err := ps.MergeRun(c); err != nil { + return err + } + ps.FreeOperator(c) + ps.release() + s.PreScopes[i] = nil } } - return s.ParallelRun(c) + if !hasConnector { + return s.ParallelRun(c) + } } // Merge Run normally. @@ -288,10 +326,15 @@ func (s *Scope) MergeRun(c *Compile) error { preScopeResultReceiveChan := make(chan error, len(s.PreScopes)) // step 1. + concurrentPreScopes := 0 for i := range s.PreScopes { - wg.Add(1) - scope := s.PreScopes[i] + if scope == nil { + continue + } + + concurrentPreScopes++ + wg.Add(1) submitPreScope := ants.Submit( func() { @@ -343,7 +386,7 @@ func (s *Scope) MergeRun(c *Compile) error { } }() - preScopeCount := len(s.PreScopes) + preScopeCount := concurrentPreScopes remoteScopeCount := len(s.RemoteReceivRegInfos) //after parallelRun, prescope count may change. we need to save this before parallelRun diff --git a/pkg/sql/compile/types.go b/pkg/sql/compile/types.go index df9a36db99604..f85736f6a9d0e 100644 --- a/pkg/sql/compile/types.go +++ b/pkg/sql/compile/types.go @@ -225,6 +225,9 @@ func (s *Scope) holdAnyCannotRemoteOperator() error { } for _, pre := range s.PreScopes { + if pre == nil { + continue + } if err := pre.holdAnyCannotRemoteOperator(); err != nil { return err } diff --git a/pkg/sql/plan/stats.go b/pkg/sql/plan/stats.go index b58c4f189a7f5..0e0357c12bf68 100644 --- a/pkg/sql/plan/stats.go +++ b/pkg/sql/plan/stats.go @@ -1812,13 +1812,11 @@ func (builder *QueryBuilder) determineBuildAndProbeSide(nodeID int32, recursive } case plan.Node_DEDUP: - if node.OnDuplicateAction != plan.Node_FAIL || node.DedupJoinCtx != nil { - node.IsRightJoin = false - } else if builder.optimizerHints != nil && builder.optimizerHints.disableRightJoin != 0 { - node.IsRightJoin = false - } else if rightChild.Stats.Outcnt > 100 && leftChild.Stats.Outcnt < rightChild.Stats.Outcnt { - node.IsRightJoin = true - } + // Disable right join optimization for DEDUP joins. The right join path + // swaps children and reverses probe/build sides, which can cause incorrect + // NULL handling during unique key duplicate detection — leading to false + // "Duplicate entry ''" errors on concurrent INSERT with NULL unique keys. + node.IsRightJoin = false } if builder.hasRecursiveScan(builder.qry.Nodes[node.Children[1]]) { diff --git a/pkg/vm/engine/disttae/txn.go b/pkg/vm/engine/disttae/txn.go index ed700d301eca8..be4e79bba1446 100644 --- a/pkg/vm/engine/disttae/txn.go +++ b/pkg/vm/engine/disttae/txn.go @@ -256,9 +256,15 @@ func (txn *Transaction) dumpBatch(ctx context.Context, offset int) error { func checkPKDupGeneric[T comparable]( mp map[any]bool, t *types.Type, + pk *vector.Vector, vals []T, start, count int) (bool, string) { - for _, v := range vals[start : start+count] { + nsp := pk.GetNulls() + for i := start; i < start+count; i++ { + if nsp.Contains(uint64(i)) { + continue + } + v := vals[i] if _, ok := mp[v]; ok { entry := common.TypeStringValue(*t, v, false) return true, entry @@ -276,76 +282,79 @@ func checkPKDup( switch colType.Oid { case types.T_bool: vs := vector.MustFixedColNoTypeCheck[bool](pk) - return checkPKDupGeneric[bool](mp, colType, vs, start, count) + return checkPKDupGeneric[bool](mp, colType, pk, vs, start, count) case types.T_bit: vs := vector.MustFixedColNoTypeCheck[uint64](pk) - return checkPKDupGeneric[uint64](mp, colType, vs, start, count) + return checkPKDupGeneric[uint64](mp, colType, pk, vs, start, count) case types.T_int8: vs := vector.MustFixedColNoTypeCheck[int8](pk) - return checkPKDupGeneric[int8](mp, colType, vs, start, count) + return checkPKDupGeneric[int8](mp, colType, pk, vs, start, count) case types.T_int16: vs := vector.MustFixedColNoTypeCheck[int16](pk) - return checkPKDupGeneric[int16](mp, colType, vs, start, count) + return checkPKDupGeneric[int16](mp, colType, pk, vs, start, count) case types.T_int32: vs := vector.MustFixedColNoTypeCheck[int32](pk) - return checkPKDupGeneric[int32](mp, colType, vs, start, count) + return checkPKDupGeneric[int32](mp, colType, pk, vs, start, count) case types.T_int64: vs := vector.MustFixedColNoTypeCheck[int64](pk) - return checkPKDupGeneric[int64](mp, colType, vs, start, count) + return checkPKDupGeneric[int64](mp, colType, pk, vs, start, count) case types.T_uint8: vs := vector.MustFixedColNoTypeCheck[uint8](pk) - return checkPKDupGeneric[uint8](mp, colType, vs, start, count) + return checkPKDupGeneric[uint8](mp, colType, pk, vs, start, count) case types.T_uint16: vs := vector.MustFixedColNoTypeCheck[uint16](pk) - return checkPKDupGeneric[uint16](mp, colType, vs, start, count) + return checkPKDupGeneric[uint16](mp, colType, pk, vs, start, count) case types.T_uint32: vs := vector.MustFixedColNoTypeCheck[uint32](pk) - return checkPKDupGeneric[uint32](mp, colType, vs, start, count) + return checkPKDupGeneric[uint32](mp, colType, pk, vs, start, count) case types.T_uint64: vs := vector.MustFixedColNoTypeCheck[uint64](pk) - return checkPKDupGeneric[uint64](mp, colType, vs, start, count) + return checkPKDupGeneric[uint64](mp, colType, pk, vs, start, count) case types.T_decimal64: vs := vector.MustFixedColNoTypeCheck[types.Decimal64](pk) - return checkPKDupGeneric[types.Decimal64](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Decimal64](mp, colType, pk, vs, start, count) case types.T_decimal128: vs := vector.MustFixedColNoTypeCheck[types.Decimal128](pk) - return checkPKDupGeneric[types.Decimal128](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Decimal128](mp, colType, pk, vs, start, count) case types.T_uuid: vs := vector.MustFixedColNoTypeCheck[types.Uuid](pk) - return checkPKDupGeneric[types.Uuid](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Uuid](mp, colType, pk, vs, start, count) case types.T_float32: vs := vector.MustFixedColNoTypeCheck[float32](pk) - return checkPKDupGeneric[float32](mp, colType, vs, start, count) + return checkPKDupGeneric[float32](mp, colType, pk, vs, start, count) case types.T_float64: vs := vector.MustFixedColNoTypeCheck[float64](pk) - return checkPKDupGeneric[float64](mp, colType, vs, start, count) + return checkPKDupGeneric[float64](mp, colType, pk, vs, start, count) case types.T_date: vs := vector.MustFixedColNoTypeCheck[types.Date](pk) - return checkPKDupGeneric[types.Date](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Date](mp, colType, pk, vs, start, count) case types.T_timestamp: vs := vector.MustFixedColNoTypeCheck[types.Timestamp](pk) - return checkPKDupGeneric[types.Timestamp](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Timestamp](mp, colType, pk, vs, start, count) case types.T_time: vs := vector.MustFixedColNoTypeCheck[types.Time](pk) - return checkPKDupGeneric[types.Time](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Time](mp, colType, pk, vs, start, count) case types.T_datetime: vs := vector.MustFixedColNoTypeCheck[types.Datetime](pk) - return checkPKDupGeneric[types.Datetime](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Datetime](mp, colType, pk, vs, start, count) case types.T_enum: vs := vector.MustFixedColNoTypeCheck[types.Enum](pk) - return checkPKDupGeneric[types.Enum](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Enum](mp, colType, pk, vs, start, count) case types.T_TS: vs := vector.MustFixedColNoTypeCheck[types.TS](pk) - return checkPKDupGeneric[types.TS](mp, colType, vs, start, count) + return checkPKDupGeneric[types.TS](mp, colType, pk, vs, start, count) case types.T_Rowid: vs := vector.MustFixedColNoTypeCheck[types.Rowid](pk) - return checkPKDupGeneric[types.Rowid](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Rowid](mp, colType, pk, vs, start, count) case types.T_Blockid: vs := vector.MustFixedColNoTypeCheck[types.Blockid](pk) - return checkPKDupGeneric[types.Blockid](mp, colType, vs, start, count) + return checkPKDupGeneric[types.Blockid](mp, colType, pk, vs, start, count) case types.T_char, types.T_varchar, types.T_json, types.T_binary, types.T_varbinary, types.T_blob, types.T_text, types.T_datalink: for i := start; i < start+count; i++ { + if pk.GetNulls().Contains(uint64(i)) { + continue + } v := pk.UnsafeGetStringAt(i) if _, ok := mp[v]; ok { entry := common.TypeStringValue(*colType, []byte(v), false) @@ -355,6 +364,9 @@ func checkPKDup( } case types.T_array_float32: for i := start; i < start+count; i++ { + if pk.GetNulls().Contains(uint64(i)) { + continue + } v := types.ArrayToString[float32](vector.GetArrayAt[float32](pk, i)) if _, ok := mp[v]; ok { entry := common.TypeStringValue(*colType, pk.GetBytesAt(i), false) @@ -364,6 +376,9 @@ func checkPKDup( } case types.T_array_float64: for i := start; i < start+count; i++ { + if pk.GetNulls().Contains(uint64(i)) { + continue + } v := types.ArrayToString[float64](vector.GetArrayAt[float64](pk, i)) if _, ok := mp[v]; ok { entry := common.TypeStringValue(*colType, pk.GetBytesAt(i), false) diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go index 4c6123721c492..e4415fff929ef 100644 --- a/pkg/vm/engine/disttae/txn_table.go +++ b/pkg/vm/engine/disttae/txn_table.go @@ -2513,8 +2513,16 @@ func (tbl *txnTable) PKPersistedBetween( return true, err } - keys.InplaceSort() - bytes, _ := keys.MarshalBinary() + // Clone the keys vector before sorting to avoid corrupting the caller's batch. + // InplaceSort reorders Varlena entries but does NOT reorder the null bitmap, + // which would misalign nulls with data if done on the original batch vector. + sortedKeys, err := keys.Dup(tbl.proc.Load().Mp()) + if err != nil { + return false, err + } + defer sortedKeys.Free(tbl.proc.Load().Mp()) + sortedKeys.InplaceSort() + bytes, _ := sortedKeys.MarshalBinary() colExpr := readutil.NewColumnExpr(0, plan2.MakePlan2Type(keys.GetType()), tbl.tableDef.Pkey.PkeyColName) inExpr := plan2.MakeInExpr( tbl.proc.Load().Ctx, @@ -2637,27 +2645,58 @@ func tombstonePKExistsInRange( return false, nil } +// cloneNonNullKeys creates a deep copy of keysVector, filtering out NULL rows. +// This serves two purposes: +// 1. Avoids concurrent modification from pipeline operators reusing the batch +// 2. Removes NULL entries whose Varlena slots may contain garbage from buffer reuse +func cloneNonNullKeys(keysVector *vector.Vector, mp *mpool.MPool) (*vector.Vector, error) { + if !keysVector.HasNull() { + return keysVector.Dup(mp) + } + cloned := vector.NewVec(*keysVector.GetType()) + nsp := keysVector.GetNulls() + for i := 0; i < keysVector.Length(); i++ { + if !nsp.Contains(uint64(i)) { + if err := cloned.UnionOne(keysVector, int64(i), mp); err != nil { + cloned.Free(mp) + return nil, err + } + } + } + return cloned, nil +} + func (tbl *txnTable) PrimaryKeysMayBeUpserted( ctx context.Context, from types.TS, to types.TS, - batch *batch.Batch, + bat *batch.Batch, pkIndex int32, ) (bool, error) { - keysVector := batch.GetVector(pkIndex) - return tbl.primaryKeysMayBeChanged(ctx, from, to, keysVector, false) + mp := tbl.proc.Load().Mp() + clonedKeys, err := cloneNonNullKeys(bat.GetVector(pkIndex), mp) + if err != nil { + return false, err + } + defer clonedKeys.Free(mp) + return tbl.primaryKeysMayBeChanged(ctx, from, to, clonedKeys, false) } func (tbl *txnTable) PrimaryKeysMayBeModified( ctx context.Context, from types.TS, to types.TS, - batch *batch.Batch, + bat *batch.Batch, pkIndex int32, _ int32, ) (bool, error) { - keysVector := batch.GetVector(pkIndex) - return tbl.primaryKeysMayBeChanged(ctx, from, to, keysVector, true) + mp := tbl.proc.Load().Mp() + clonedKeys, err := cloneNonNullKeys(bat.GetVector(pkIndex), mp) + if err != nil { + return false, err + } + defer clonedKeys.Free(mp) + return tbl.primaryKeysMayBeChanged(ctx, from, to, clonedKeys, true) } func (tbl *txnTable) primaryKeysMayBeChanged( @@ -2715,11 +2754,14 @@ func (tbl *txnTable) primaryKeysMayBeChanged( //need check pk whether exist on S3 block. v2.TxnPKMayBeChangedPersistedCounter.Inc() - return tbl.PKPersistedBetween( + + changed, err2 := tbl.PKPersistedBetween( snap, from, to, keysVector, checkTombstone) + + return changed, err2 } func (tbl *txnTable) MergeObjects(