-
Notifications
You must be signed in to change notification settings - Fork 296
fix: false duplicate entry error on concurrent INSERT with NULL uniqu… #24046
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
86ae08a
661744a
551c881
3e5154f
75b5b4a
e15ac4e
d4b3fae
2f7604a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -168,6 +168,15 @@ func (n *Bitmap) Len() int64 { | |
| return n.len | ||
| } | ||
|
|
||
| // RecalculateCount recounts the number of set bits from the data array. | ||
| // This is used to fix count after concurrent bitmap corruption. | ||
| func (n *Bitmap) RecalculateCount() { | ||
| n.count = 0 | ||
| for _, w := range n.data { | ||
| n.count += int64(bits.OnesCount64(w)) | ||
| } | ||
| } | ||
|
|
||
| // Size return number of bytes in n.data | ||
| // XXX WTF Note that this size is not the same as InitWithSize. | ||
| func (n *Bitmap) Size() int { | ||
|
|
@@ -349,7 +358,13 @@ func (n *Bitmap) TryExpandWithSize(size int) { | |
| return | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ✅ TryExpandWithSize zeroing — correct fix. This is a legitimate bug fix. When Note: the |
||
| } | ||
| if len(n.data) < newCap { | ||
| oldLen := len(n.data) | ||
| n.data = n.data[:newCap] | ||
| // Zero out newly exposed slots to avoid reading stale data | ||
| // left over from a previous use of the same backing array. | ||
| for i := oldLen; i < newCap; i++ { | ||
| n.data[i] = 0 | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -135,7 +135,7 @@ func TryExpand(nsp *Nulls, size int) { | |
|
|
||
| // Contains returns true if the integer is contained in the Nulls | ||
| func (nsp *Nulls) Contains(row uint64) bool { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🔴 Removing
|
||
| return nsp != nil && !nsp.np.EmptyByFlag() && nsp.np.Contains(row) | ||
| return nsp != nil && nsp.np.Contains(row) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 1. Bitmap contains race panic. Nulls.Contains now always calls bitmap.Bitmap.Contains, and some union paths rely on IsEmpty() instead of EmptyByFlag(), increasing read access to bitmaps during concurrent expansion. Since Bitmap.TryExpandWithSize sets len before resizing data, a concurrent Contains() can index past data and panic. Agent Prompt
|
||
| } | ||
|
|
||
| func Contains(nsp *Nulls, row uint64) bool { | ||
|
|
@@ -226,8 +226,13 @@ func Range(nsp *Nulls, start, end, bias uint64, b *Nulls) { | |
| } | ||
|
|
||
| b.np.InitWithSize(int64(end + 1 - bias)) | ||
|
|
||
| // Take a snapshot of the source bitmap to prevent reading inconsistent | ||
| // state when the source is being concurrently modified by a parallel | ||
| // Prepare call on the same operator chain. | ||
| snap := nsp.Clone() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🔴 Clone() on every Range() call — severe hot-path performance regression
This also only provides a snapshot — if the underlying concurrency bug remains, the snapshot may still be taken from an inconsistent state (torn read). |
||
| for ; start < end; start++ { | ||
| if nsp.np.Contains(start) { | ||
| if snap.np.Contains(start) { | ||
| b.np.Add(start - bias) | ||
| } | ||
| } | ||
|
Comment on lines
226
to
238
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 2. Range skips corrupted nulls. nulls.Range still returns early based on EmptyByFlag() (which is count==0), so if count is corrupted to 0 while data has set bits, Range will drop NULLs entirely. This can make Vector.Window/CloneWindowTo silently lose NULL markers and produce incorrect results. Agent Prompt
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1329,6 +1329,9 @@ func (v *Vector) Copy(w *Vector, vi, wi int64, mp *mpool.MPool) error { | |
| if w.GetNulls().Contains(uint64(wi)) { | ||
| v.GetNulls().Set(uint64(vi)) | ||
| } else { | ||
| // Ensure bitmap len covers vi so that Contains() works correctly | ||
| // for all rows. TryExpand is needed because Unset doesn't expand. | ||
| nulls.TryExpand(v.GetNulls(), int(vi)+1) | ||
| v.GetNulls().Unset(uint64(vi)) | ||
| } | ||
| return nil | ||
|
|
@@ -1343,7 +1346,7 @@ func GetUnionAllFunction(typ types.Type, mp *mpool.MPool) func(v, w *Vector) err | |
| u64Length := uint64(moreLength) | ||
|
|
||
| moreNp := more.GetBitmap() | ||
| if moreNp == nil || moreNp.EmptyByFlag() || moreLength == 0 { | ||
| if moreNp == nil || moreNp.IsEmpty() || moreLength == 0 { | ||
| return | ||
| } | ||
|
|
||
|
|
@@ -1355,6 +1358,9 @@ func GetUnionAllFunction(typ types.Type, mp *mpool.MPool) func(v, w *Vector) err | |
| if moreNp.Contains(0) { | ||
| dst.Set(u64offset) | ||
| } | ||
| // Ensure bitmap len covers the full range so Contains() works | ||
| // correctly for all rows, even trailing non-null rows. | ||
| nulls.TryExpand(dst, oldLength+moreLength) | ||
| } | ||
|
|
||
| switch typ.Oid { | ||
|
|
@@ -2085,30 +2091,25 @@ func GetUnionAllFunction(typ types.Type, mp *mpool.MPool) func(v, w *Vector) err | |
| var err error | ||
| vs := toSliceOfLengthNoTypeCheck[types.Varlena](v, v.length+w.length) | ||
|
|
||
| bm := w.nsp.GetBitmap() | ||
| if bm != nil && !bm.EmptyByFlag() { | ||
| for i := range ws { | ||
| if w.gsp.Contains(uint64(i)) { | ||
| nulls.Add(&v.gsp, uint64(v.length)) | ||
| } | ||
| if bm.Contains(uint64(i)) { | ||
| nulls.Add(&v.nsp, uint64(v.length)) | ||
| } else { | ||
| err = BuildVarlenaFromVarlena(v, &vs[v.length], &ws[i], &w.area, mp) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| v.length++ | ||
| // Always use null-aware path to prevent losing null information | ||
| // when bitmap state is inconsistent due to concurrent access. | ||
| for i := range ws { | ||
| if w.gsp.Contains(uint64(i)) { | ||
| nulls.Add(&v.gsp, uint64(v.length)) | ||
| } | ||
| } else { | ||
| for i := range ws { | ||
| if w.nsp.Contains(uint64(i)) { | ||
| nulls.Add(&v.nsp, uint64(v.length)) | ||
| } else { | ||
| err = BuildVarlenaFromVarlena(v, &vs[v.length], &ws[i], &w.area, mp) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| v.length++ | ||
| } | ||
| v.length++ | ||
| } | ||
| // Ensure bitmap len covers all rows so Contains() works correctly | ||
| if v.nsp.Count() > 0 { | ||
| nulls.TryExpand(&v.nsp, v.length) | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟡 Removing Varlena fast path — performance regression. The old code had a fast path for vectors with no nulls ( For large batches with no nulls (the common case), this adds an unnecessary |
||
| return nil | ||
| } | ||
|
|
@@ -2647,7 +2648,7 @@ func unionT[T int32 | int64](v, w *Vector, sels []T, mp *mpool.MPool) error { | |
| } | ||
| } else { | ||
| tlen := v.GetType().TypeSize() | ||
| if !w.nsp.EmptyByFlag() { | ||
| if !w.nsp.IsEmpty() { | ||
| for i, sel := range sels { | ||
| if w.gsp.Contains(uint64(sel)) { | ||
| nulls.Add(&v.gsp, uint64(oldLen+i)) | ||
|
|
@@ -2750,7 +2751,7 @@ func (v *Vector) UnionBatch(w *Vector, offset int64, cnt int, flags []uint8, mp | |
| vCol = toSliceOfLengthNoTypeCheck[types.Varlena](v, v.length+addCnt) | ||
| ToSliceNoTypeCheck(w, &wCol) | ||
|
|
||
| if !w.nsp.EmptyByFlag() { | ||
| if !w.nsp.IsEmpty() { | ||
| if flags == nil { | ||
| for i := 0; i < cnt; i++ { | ||
| if w.gsp.Contains(uint64(offset) + uint64(i)) { | ||
|
|
@@ -2785,6 +2786,8 @@ func (v *Vector) UnionBatch(w *Vector, offset int64, cnt int, flags []uint8, mp | |
| v.length++ | ||
| } | ||
| } | ||
| // Ensure bitmap len covers all rows after union | ||
| nulls.TryExpand(&v.nsp, v.length) | ||
| } else { | ||
| if flags == nil { | ||
| for i := 0; i < cnt; i++ { | ||
|
|
@@ -2815,7 +2818,7 @@ func (v *Vector) UnionBatch(w *Vector, offset int64, cnt int, flags []uint8, mp | |
| } | ||
| } else { | ||
| tlen := v.GetType().TypeSize() | ||
| if !w.nsp.EmptyByFlag() { | ||
| if !w.nsp.IsEmpty() { | ||
| if flags == nil { | ||
| for i := 0; i < cnt; i++ { | ||
| if w.gsp.Contains(uint64(offset) + uint64(i)) { | ||
|
|
@@ -3425,6 +3428,11 @@ func appendOneFixed[T any](vec *Vector, val T, isNull bool, mp *mpool.MPool) err | |
| if isNull { | ||
| nulls.Add(&vec.nsp, uint64(length)) | ||
| } else { | ||
| // Ensure bitmap len covers the new position so Contains() works | ||
| // correctly even when the last appended rows are non-null. | ||
| if vec.nsp.Count() > 0 { | ||
| nulls.TryExpand(&vec.nsp, vec.length) | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟡 TryExpand on every non-null appendOneFixed — hot path cost:
if vec.nsp.Count() > 0 {
nulls.TryExpand(&vec.nsp, vec.length)
} This runs on every non-null append. Consider doing this once after the batch is fully built, or tracking whether expansion is needed with a flag. |
||
| var col []T | ||
| ToSliceNoTypeCheck(vec, &col) | ||
| col[length] = val | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -139,9 +139,8 @@ func (preInsert *PreInsert) constructColBuf(proc *proc, bat *batch.Batch, first | |
| return err | ||
| } | ||
| } else { | ||
| preInsert.ctr.canFreeVecIdx[idx] = true | ||
| if bat.Vecs[idx].IsConst() { | ||
| preInsert.ctr.canFreeVecIdx[idx] = true | ||
| // expand const vector | ||
| typ := bat.Vecs[idx].GetType() | ||
| tmpVec := vector.NewOffHeapVecWithType(*typ) | ||
| if err = vector.GetUnionAllFunction(*typ, proc.Mp())(tmpVec, bat.Vecs[idx]); err != nil { | ||
|
|
@@ -150,7 +149,11 @@ func (preInsert *PreInsert) constructColBuf(proc *proc, bat *batch.Batch, first | |
| } | ||
| preInsert.ctr.buf.Vecs[idx] = tmpVec | ||
| } else { | ||
| preInsert.ctr.buf.SetVector(int32(idx), bat.Vecs[idx]) | ||
| dupVec, dupErr := bat.Vecs[idx].Dup(proc.Mp()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟡 Dup() for every non-const column — memory regression. The old code used If the issue is that a downstream operator modifies the shared vector (causing the bitmap corruption), the proper fix is either:
Rather than copying all columns unconditionally. |
||
| if dupErr != nil { | ||
| return dupErr | ||
| } | ||
| preInsert.ctr.buf.Vecs[idx] = dupVec | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -67,6 +67,13 @@ func (valueScan *ValueScan) makeValueScanBatch(proc *process.Process) (err error | |
| // select * from (values row(1,1), row(2,2), row(3,3)) a; | ||
| bat := valueScan.Batchs[0] | ||
|
|
||
| // Skip evalRowsetData if already done (prevents concurrent bitmap corruption | ||
| // when the same scope is started multiple times by nested MergeRun) | ||
| if valueScan.runningCtx.prepared { | ||
| return nil | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ��
|
||
| } | ||
| valueScan.runningCtx.prepared = true | ||
|
|
||
| for i := 0; i < valueScan.ColCount; i++ { | ||
| exprList = valueScan.ExprExecLists[i] | ||
| if len(exprList) == 0 { | ||
|
|
@@ -78,6 +85,14 @@ func (valueScan *ValueScan) makeValueScanBatch(proc *process.Process) (err error | |
| } | ||
| } | ||
|
|
||
| // Fix bitmap count/data inconsistency that can occur when the same | ||
| // operator chain is started multiple times by nested MergeRun. | ||
| for _, vec := range bat.Vecs { | ||
| if vec != nil && !vec.IsConst() && vec.Length() > 0 { | ||
| vec.GetNulls().GetBitmap().RecalculateCount() | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 RecalculateCount() treats the symptom, not the cause
This function exists to fix count/data inconsistency after "concurrent bitmap corruption" (per the comment). But if concurrent access can corrupt the count, it can also corrupt the data bits themselves. Recalculating count from (possibly corrupted) data only makes the count consistent with whatever garbage is in data.
The real fix should prevent the concurrent corruption in the first place. Consider this a temporary band-aid and track the root cause separately.