Skip to content

Commit 71475a7

Browse files
authored
Merge branch 'main' into mergify/configuration-deprecated-update
2 parents c07cd6f + c829bb1 commit 71475a7

3 files changed

Lines changed: 178 additions & 48 deletions

File tree

pkg/common/morpc/client_test.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -417,19 +417,11 @@ func TestCloseIdleBackends(t *testing.T) {
417417
tb.activeTime = time.Time{}
418418
tb.Unlock()
419419

420-
go func() {
421-
ctx, cancel := context.WithTimeout(context.TODO(), 2*time.Second)
422-
defer cancel()
423-
st, err := activeBackend.NewStream(false)
424-
assert.NoError(t, err)
425-
for i := 0; i < 50; i++ {
426-
_ = st.Send(ctx, newTestMessage(1))
427-
runtime.Gosched()
428-
}
429-
}()
430-
431420
gcDeadline := time.Now().Add(10 * time.Second)
432421
for time.Now().Before(gcDeadline) {
422+
// Refresh the non-idle backend inline so GC deterministically closes only the
423+
// backend we explicitly marked idle, without relying on goroutine scheduling.
424+
activeBackend.(*testBackend).active()
433425
globalClientGC.doGCIdle()
434426
runtime.Gosched()
435427
c.mu.Lock()

pkg/sql/colexec/onduplicatekey/on_duplicate_key.go

Lines changed: 162 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package onduplicatekey
1717
import (
1818
"bytes"
1919
"fmt"
20+
"math"
2021

2122
"github.com/matrixorigin/matrixone/pkg/common/moerr"
2223
"github.com/matrixorigin/matrixone/pkg/container/batch"
@@ -112,23 +113,29 @@ func resetInsertBatchForOnduplicateKey(proc *process.Process, originBatch *batch
112113
insertArg.ctr.rbat = batch.NewWithSize(len(insertArg.Attrs))
113114
insertArg.ctr.rbat.Attrs = insertArg.Attrs
114115

115-
insertArg.ctr.checkConflictBat = batch.NewWithSize(len(insertArg.Attrs))
116-
insertArg.ctr.checkConflictBat.Attrs = append(insertArg.ctr.checkConflictBat.Attrs, insertArg.Attrs...)
117-
118116
for i, v := range originBatch.Vecs {
119117
newVec := vector.NewVec(*v.GetType())
120118
insertArg.ctr.rbat.SetVector(int32(i), newVec)
119+
}
121120

122-
ckVec := vector.NewVec(*v.GetType())
123-
insertArg.ctr.checkConflictBat.SetVector(int32(i), ckVec)
121+
// Initialize hash-based conflict detection
122+
insertArg.ctr.uniqueKeyColIndices = make([][]int32, len(insertArg.UniqueColCheckExpr))
123+
insertArg.ctr.conflictMaps = make([]map[string]int, len(insertArg.UniqueColCheckExpr))
124+
for i, expr := range insertArg.UniqueColCheckExpr {
125+
insertArg.ctr.uniqueKeyColIndices[i] = extractColIndicesFromExpr(expr)
126+
if len(insertArg.ctr.uniqueKeyColIndices[i]) == 0 {
127+
return moerr.NewInternalErrorf(proc.Ctx, "failed to extract column indices from unique constraint expression %d", i)
128+
}
129+
insertArg.ctr.conflictMaps[i] = make(map[string]int)
124130
}
125131
} else {
126132
insertArg.ctr.rbat.CleanOnlyData()
127-
insertArg.ctr.checkConflictBat.CleanOnlyData()
133+
for i := range insertArg.ctr.conflictMaps {
134+
clear(insertArg.ctr.conflictMaps[i])
135+
}
128136
}
129137

130138
insertBatch := insertArg.ctr.rbat
131-
checkConflictBatch := insertArg.ctr.checkConflictBat
132139
attrs := make([]string, len(insertBatch.Attrs))
133140
copy(attrs, insertBatch.Attrs)
134141

@@ -140,12 +147,10 @@ func resetInsertBatchForOnduplicateKey(proc *process.Process, originBatch *batch
140147
return err
141148
}
142149

143-
// check if uniqueness conflict found in checkConflictBatch
144-
oldConflictRowIdx, conflictMsg, err := checkConflict(proc, newBatch, checkConflictBatch, insertArg.ctr.uniqueCheckExes, insertArg.UniqueCols, insertColCount)
145-
if err != nil {
146-
newBatch.Clean(proc.GetMPool())
147-
return err
148-
}
150+
// O(1) hash map conflict check instead of O(N) linear scan
151+
oldConflictRowIdx, conflictMsg := findConflictByHashMap(
152+
&insertArg.ctr.keyBuf, newBatch.Vecs, insertArg.ctr.uniqueKeyColIndices,
153+
insertArg.ctr.conflictMaps, insertArg.UniqueCols, 0)
149154
if oldConflictRowIdx > -1 {
150155

151156
if insertArg.IsIgnore {
@@ -177,6 +182,13 @@ func resetInsertBatchForOnduplicateKey(proc *process.Process, originBatch *batch
177182
newBatch.Clean(proc.GetMPool())
178183
return err
179184
}
185+
186+
// Save old keys before in-place update (in case update changes unique columns)
187+
oldKeys := make([]string, len(insertArg.ctr.uniqueKeyColIndices))
188+
for k, colIndices := range insertArg.ctr.uniqueKeyColIndices {
189+
oldKeys[k] = serializeUniqueKey(&insertArg.ctr.keyBuf, insertBatch.Vecs, colIndices, oldConflictRowIdx)
190+
}
191+
180192
// update the oldConflictRowIdx of insertBatch by newBatch
181193
for j := 0; j < insertColCount; j++ {
182194
fromVec := tmpBatch.Vecs[j]
@@ -187,15 +199,19 @@ func resetInsertBatchForOnduplicateKey(proc *process.Process, originBatch *batch
187199
newBatch.Clean(proc.GetMPool())
188200
return err
189201
}
202+
}
190203

191-
toVec2 := checkConflictBatch.Vecs[j]
192-
err = toVec2.Copy(fromVec, int64(oldConflictRowIdx), 0, proc.Mp())
193-
if err != nil {
194-
tmpBatch.Clean(proc.GetMPool())
195-
newBatch.Clean(proc.GetMPool())
196-
return err
204+
// Update hash maps after in-place modification
205+
for k, colIndices := range insertArg.ctr.uniqueKeyColIndices {
206+
if oldKeys[k] != "" {
207+
delete(insertArg.ctr.conflictMaps[k], oldKeys[k])
208+
}
209+
newKey := serializeUniqueKey(&insertArg.ctr.keyBuf, insertBatch.Vecs, colIndices, oldConflictRowIdx)
210+
if newKey != "" {
211+
insertArg.ctr.conflictMaps[k][newKey] = oldConflictRowIdx
197212
}
198213
}
214+
199215
tmpBatch.Clean(proc.GetMPool())
200216
} else {
201217
// row id is null: means no uniqueness conflict found in origin rows
@@ -205,11 +221,8 @@ func resetInsertBatchForOnduplicateKey(proc *process.Process, originBatch *batch
205221
newBatch.Clean(proc.GetMPool())
206222
return err
207223
}
208-
_, err = checkConflictBatch.Append(proc.Ctx, proc.Mp(), newBatch)
209-
if err != nil {
210-
newBatch.Clean(proc.GetMPool())
211-
return err
212-
}
224+
addToConflictMaps(&insertArg.ctr.keyBuf, insertBatch.Vecs, insertArg.ctr.uniqueKeyColIndices,
225+
insertArg.ctr.conflictMaps, insertBatch.RowCount()-1)
213226
} else {
214227

215228
if insertArg.IsIgnore {
@@ -222,12 +235,9 @@ func resetInsertBatchForOnduplicateKey(proc *process.Process, originBatch *batch
222235
newBatch.Clean(proc.GetMPool())
223236
return err
224237
}
225-
conflictRowIdx, conflictMsg, err := checkConflict(proc, tmpBatch, checkConflictBatch, insertArg.ctr.uniqueCheckExes, insertArg.UniqueCols, insertColCount)
226-
if err != nil {
227-
tmpBatch.Clean(proc.GetMPool())
228-
newBatch.Clean(proc.GetMPool())
229-
return err
230-
}
238+
conflictRowIdx, conflictMsg := findConflictByHashMap(
239+
&insertArg.ctr.keyBuf, tmpBatch.Vecs, insertArg.ctr.uniqueKeyColIndices,
240+
insertArg.ctr.conflictMaps, insertArg.UniqueCols, 0)
231241
if conflictRowIdx > -1 {
232242
tmpBatch.Clean(proc.GetMPool())
233243
newBatch.Clean(proc.GetMPool())
@@ -240,12 +250,8 @@ func resetInsertBatchForOnduplicateKey(proc *process.Process, originBatch *batch
240250
newBatch.Clean(proc.GetMPool())
241251
return err
242252
}
243-
_, err = checkConflictBatch.Append(proc.Ctx, proc.Mp(), tmpBatch)
244-
if err != nil {
245-
tmpBatch.Clean(proc.GetMPool())
246-
newBatch.Clean(proc.GetMPool())
247-
return err
248-
}
253+
addToConflictMaps(&insertArg.ctr.keyBuf, insertBatch.Vecs, insertArg.ctr.uniqueKeyColIndices,
254+
insertArg.ctr.conflictMaps, insertBatch.RowCount()-1)
249255
}
250256
tmpBatch.Clean(proc.GetMPool())
251257
}
@@ -269,6 +275,123 @@ func resetColPos(e *plan.Expr, columnCount int) {
269275
}
270276
}
271277

278+
// extractColIndicesFromExpr extracts the left-side column indices from a unique check expression.
279+
// For a single-column unique key: "col_i = col_j" → [i]
280+
// For a composite unique key: "(col_i = col_j AND col_k = col_l)" → [i, k]
281+
func extractColIndicesFromExpr(expr *plan.Expr) []int32 {
282+
switch e := expr.Expr.(type) {
283+
case *plan.Expr_F:
284+
if e.F.Func.ObjName == "=" {
285+
if col := extractColRefFromExpr(e.F.Args[0]); col != nil {
286+
return []int32{col.Col.ColPos}
287+
}
288+
} else if e.F.Func.ObjName == "and" {
289+
left := extractColIndicesFromExpr(e.F.Args[0])
290+
right := extractColIndicesFromExpr(e.F.Args[1])
291+
return append(left, right...)
292+
}
293+
}
294+
return nil
295+
}
296+
297+
// extractColRefFromExpr recursively unwraps cast/type expressions to find the underlying column reference.
298+
func extractColRefFromExpr(expr *plan.Expr) *plan.Expr_Col {
299+
switch e := expr.Expr.(type) {
300+
case *plan.Expr_Col:
301+
return e
302+
case *plan.Expr_F:
303+
// Handle cast-like functions: try first argument
304+
if len(e.F.Args) > 0 {
305+
return extractColRefFromExpr(e.F.Args[0])
306+
}
307+
}
308+
return nil
309+
}
310+
311+
// serializeUniqueKey serializes unique key column values into a string for hash map lookup.
312+
// Returns empty string if any column is NULL (NULL never conflicts per SQL semantics).
313+
// The caller-provided buf is reset and reused to avoid per-call allocations.
314+
// Float types are canonicalized to match SQL '=' semantics (scale-based rounding, -0/+0 normalization).
315+
func serializeUniqueKey(buf *bytes.Buffer, vecs []*vector.Vector, colIndices []int32, row int) string {
316+
buf.Reset()
317+
for _, colIdx := range colIndices {
318+
v := vecs[colIdx]
319+
if v.GetNulls().Contains(uint64(row)) {
320+
return ""
321+
}
322+
typ := v.GetType()
323+
switch typ.Oid {
324+
case types.T_float32:
325+
val := vector.MustFixedColWithTypeCheck[float32](v)[row]
326+
if typ.Scale > 0 {
327+
pow := math.Pow10(int(typ.Scale))
328+
val = float32(math.Round(float64(val)*pow) / pow)
329+
}
330+
if val == 0 {
331+
val = 0
332+
}
333+
bits := math.Float32bits(val)
334+
buf.Write([]byte{0, 0, 0, 4, byte(bits >> 24), byte(bits >> 16), byte(bits >> 8), byte(bits)})
335+
case types.T_float64:
336+
val := vector.MustFixedColWithTypeCheck[float64](v)[row]
337+
if val == 0 {
338+
val = 0
339+
}
340+
bits := math.Float64bits(val)
341+
buf.Write([]byte{0, 0, 0, 8,
342+
byte(bits >> 56), byte(bits >> 48), byte(bits >> 40), byte(bits >> 32),
343+
byte(bits >> 24), byte(bits >> 16), byte(bits >> 8), byte(bits)})
344+
default:
345+
b := v.GetRawBytesAt(row)
346+
l := len(b)
347+
buf.WriteByte(byte(l >> 24))
348+
buf.WriteByte(byte(l >> 16))
349+
buf.WriteByte(byte(l >> 8))
350+
buf.WriteByte(byte(l))
351+
buf.Write(b)
352+
}
353+
}
354+
return buf.String()
355+
}
356+
357+
// findConflictByHashMap checks if a row conflicts with any existing row using hash maps.
358+
// Returns the conflicting row index and message, or (-1, "") if no conflict.
359+
func findConflictByHashMap(
360+
buf *bytes.Buffer,
361+
vecs []*vector.Vector,
362+
uniqueKeyColIndices [][]int32,
363+
conflictMaps []map[string]int,
364+
uniqueCols []string,
365+
row int,
366+
) (int, string) {
367+
for i, colIndices := range uniqueKeyColIndices {
368+
key := serializeUniqueKey(buf, vecs, colIndices, row)
369+
if key == "" {
370+
continue
371+
}
372+
if idx, exists := conflictMaps[i][key]; exists {
373+
return idx, fmt.Sprintf("Duplicate entry for key '%s'", uniqueCols[i])
374+
}
375+
}
376+
return -1, ""
377+
}
378+
379+
// addToConflictMaps adds a row's unique key values to all conflict hash maps.
380+
func addToConflictMaps(
381+
buf *bytes.Buffer,
382+
vecs []*vector.Vector,
383+
uniqueKeyColIndices [][]int32,
384+
conflictMaps []map[string]int,
385+
rowIdx int,
386+
) {
387+
for i, colIndices := range uniqueKeyColIndices {
388+
key := serializeUniqueKey(buf, vecs, colIndices, rowIdx)
389+
if key != "" {
390+
conflictMaps[i][key] = rowIdx
391+
}
392+
}
393+
}
394+
272395
func fetchOneRowAsBatch(idx int, originBatch *batch.Batch, proc *process.Process, attrs []string) (*batch.Batch, error) {
273396
newBatch := batch.NewWithSize(len(attrs))
274397
newBatch.Attrs = attrs
@@ -329,6 +452,9 @@ func updateOldBatch(evalBatch *batch.Batch, updateExpr map[string]*plan.Expr, pr
329452
return newBatch, nil
330453
}
331454

455+
// checkConflict uses expression evaluation to detect conflicts in checkConflictBatch.
456+
// This is the legacy O(N) per-call approach, kept for testing purposes.
457+
// The hot path now uses findConflictByHashMap for O(1) lookups instead.
332458
func checkConflict(proc *process.Process, newBatch *batch.Batch, checkConflictBatch *batch.Batch,
333459
checkExpressionExecutor []colexec.ExpressionExecutor, uniqueCols []string, colCount int) (int, string, error) {
334460
if checkConflictBatch.RowCount() == 0 {

pkg/sql/colexec/onduplicatekey/types.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package onduplicatekey
1616

1717
import (
18+
"bytes"
19+
1820
"github.com/matrixorigin/matrixone/pkg/common/reuse"
1921
"github.com/matrixorigin/matrixone/pkg/container/batch"
2022
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
@@ -33,9 +35,14 @@ const (
3335

3436
type container struct {
3537
state int
36-
checkConflictBat *batch.Batch // batch to check conflict
38+
checkConflictBat *batch.Batch // deprecated: kept for cleanup safety
3739
rbat *batch.Batch // return batch
3840
uniqueCheckExes []colexec.ExpressionExecutor
41+
42+
// Hash-based conflict detection (replaces O(N²) linear scan)
43+
uniqueKeyColIndices [][]int32 // column indices for each unique constraint
44+
conflictMaps []map[string]int // serialized_unique_key → row_index in rbat
45+
keyBuf bytes.Buffer // reusable buffer for serializeUniqueKey
3946
}
4047

4148
type OnDuplicatekey struct {
@@ -102,6 +109,9 @@ func (onDuplicatekey *OnDuplicatekey) Reset(proc *process.Process, pipelineFaile
102109
exe.ResetForNextQuery()
103110
}
104111
}
112+
for i := range onDuplicatekey.ctr.conflictMaps {
113+
clear(onDuplicatekey.ctr.conflictMaps[i])
114+
}
105115
onDuplicatekey.ctr.state = Build
106116
}
107117

@@ -120,6 +130,8 @@ func (onDuplicatekey *OnDuplicatekey) Free(proc *process.Process, pipelineFailed
120130
}
121131
}
122132
onDuplicatekey.ctr.uniqueCheckExes = nil
133+
onDuplicatekey.ctr.conflictMaps = nil
134+
onDuplicatekey.ctr.uniqueKeyColIndices = nil
123135
}
124136

125137
func (onDuplicatekey *OnDuplicatekey) ExecProjection(proc *process.Process, input *batch.Batch) (*batch.Batch, error) {

0 commit comments

Comments
 (0)