Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2302,12 +2302,6 @@ func checkModify(plan0 *plan.Plan, resolveFn func(string, string, *plan2.Snapsho
return true, err
}
}
if ctx := p.Query.Nodes[i].ReplaceCtx; ctx != nil {
flag, err := checkFn(ctx.Ref, ctx.TableDef)
if err != nil || flag {
return true, err
}
}
if ctx := p.Query.Nodes[i].DeleteCtx; ctx != nil {
flag, err := checkFn(ctx.Ref, ctx.TableDef)
if err != nil || flag {
Expand Down
7 changes: 0 additions & 7 deletions pkg/frontend/mysql_cmd_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2445,13 +2445,6 @@ func Test_checkModify(t *testing.T) {
expected_flag: true,
expected_err: false,
},
{
node: &plan.Node{
ReplaceCtx: &plan0.ReplaceCtx{},
},
expected_flag: true,
expected_err: false,
},
{
node: &plan.Node{
DeleteCtx: &plan0.DeleteCtx{},
Expand Down
40 changes: 29 additions & 11 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,6 @@ func (c *Compile) run(s *Scope) error {
return s.DropIndex(c)
case TruncateTable:
return s.TruncateTable(c)
case Replace:
return s.replace(c)
case TableClone:
return s.TableClone(c)
}
Expand Down Expand Up @@ -501,6 +499,26 @@ func (c *Compile) runOnce() (err error) {
c.proc.Base.StageCache.Clear()
}()

// Pre-check: REPLACE parent→child FK RESTRICT constraints must be
// verified before the REPLACE execution modifies any rows.
query := c.pn.GetQuery()
if query != nil && query.StmtType == plan.Query_INSERT && len(query.GetDetectSqls()) != 0 {
for _, sql := range query.DetectSqls {
if strings.HasPrefix(sql, "REPLACE_PARENT_CHK:") {
if err = runDetectSql(c, strings.TrimPrefix(sql, "REPLACE_PARENT_CHK:")); err != nil {
// Only translate the "check returned false" signal into the
// parent-row-referenced error; pass through real execution
// errors (syntax, permissions, network, txn conflicts) so
// they are not masked.
if moerr.IsMoErrCode(err, moerr.ErrFKNoReferencedRow2) {
return moerr.NewErrFKRowIsReferenced(c.proc.Ctx)
}
return err
}
}
}
Comment thread
ck89119 marked this conversation as resolved.
}

if c.IsTpQuery() && len(c.scopes) == 1 {
if err = c.run(c.scopes[0]); err != nil {
return err
Expand Down Expand Up @@ -578,10 +596,17 @@ func (c *Compile) runOnce() (err error) {

//detect fk self refer
//update, insert
query := c.pn.GetQuery()
query = c.pn.GetQuery()
if query != nil && (query.StmtType == plan.Query_INSERT ||
query.StmtType == plan.Query_UPDATE) && len(query.GetDetectSqls()) != 0 {
err = detectFkSelfRefer(c, query.DetectSqls)
// Filter out pre-check SQLs (already executed before the main operation)
var postCheckSqls []string
for _, sql := range query.DetectSqls {
if !strings.HasPrefix(sql, "REPLACE_PARENT_CHK:") {
postCheckSqls = append(postCheckSqls, sql)
}
}
err = detectFkSelfRefer(c, postCheckSqls)
}
//alter table ... add/drop foreign key
if err == nil && c.pn.GetDdl() != nil {
Expand All @@ -607,13 +632,6 @@ func (c *Compile) compileScope(pn *plan.Plan) ([]*Scope, error) {
}()
switch qry := pn.Plan.(type) {
case *plan.Plan_Query:
switch qry.Query.StmtType {
case plan.Query_REPLACE:
return []*Scope{
newScope(Replace).
withPlan(pn),
}, nil
}
scopes, err := c.compileQuery(qry.Query)
if err != nil {
return nil, err
Expand Down
6 changes: 2 additions & 4 deletions pkg/sql/compile/compile2.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,11 +669,9 @@ func setContextForParallelScope(parallelScope *Scope, originalContext context.Co
}

func (c *Compile) AnalyzeExecPlan(runC *Compile, queryResult *util2.RunResult, stats *statistic.StatsInfo, isExplainPhy bool, option *ExplainOption) {
switch planType := c.pn.Plan.(type) {
switch c.pn.Plan.(type) {
case *plan.Plan_Query:
if planType.Query.StmtType != plan.Query_REPLACE {
c.handleQueryPlanAnalyze(runC, queryResult, stats, isExplainPhy, option)
}
c.handleQueryPlanAnalyze(runC, queryResult, stats, isExplainPhy, option)
case *plan.Plan_Ddl:
handleDdlPlanAnalyze(runC, stats)
}
Expand Down
46 changes: 0 additions & 46 deletions pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ package compile

import (
"context"
"fmt"
"slices"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -900,50 +898,6 @@ func receiveMsgAndForward(sender *messageSenderOnClient, forwardCh chan process.
}
}

func (s *Scope) replace(c *Compile) error {
dbName := s.Plan.GetQuery().Nodes[0].ReplaceCtx.TableDef.DbName
tblName := s.Plan.GetQuery().Nodes[0].ReplaceCtx.TableDef.Name
deleteCond := s.Plan.GetQuery().Nodes[0].ReplaceCtx.DeleteCond
rewriteFromOnDuplicateKey := s.Plan.GetQuery().Nodes[0].ReplaceCtx.RewriteFromOnDuplicateKey

delAffectedRows := uint64(0)
if deleteCond != "" {
result, err := c.runSqlWithResult(fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE %s", dbName, tblName, deleteCond), NoAccountId)
if err != nil {
return err
}
delAffectedRows = result.AffectedRows
}
var sql string
if rewriteFromOnDuplicateKey {
idx := strings.Index(strings.ToLower(c.sql), "on duplicate key update")
sql = c.sql[:idx]
} else {
removed := removeStringBetween(c.sql, "/*", "*/")
sql = "insert " + strings.TrimSpace(removed)[7:]
}
result, err := c.runSqlWithResult(sql, NoAccountId)
if err != nil {
return err
}
c.addAffectedRows(result.AffectedRows + delAffectedRows)
return nil
}

func removeStringBetween(s, start, end string) string {
startIndex := strings.Index(s, start)
for startIndex != -1 {
endIndex := strings.Index(s, end)
if endIndex == -1 || startIndex > endIndex {
return s
}

s = s[:startIndex] + s[endIndex+len(end):]
startIndex = strings.Index(s, start)
}
return s
}

// defaultStarCountTombstoneThreshold: when estimated tombstone rows exceed this, skip StarCount
// and use per-block aggOptimize (only scan blocks with tombstones). Avoids CollectTombstoneStats
// Merge path taking seconds for large tombstone sets.
Expand Down
19 changes: 0 additions & 19 deletions pkg/sql/compile/scope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,25 +534,6 @@ func getReverseList2(rootOp vm.Operator, stack []vm.OpType) []vm.OpType {
return stack
}

func TestRemoveStringBetween(t *testing.T) {
cases := []struct {
input, output string
}{
{
input: "/* comment */ replace into t1 values (1);",
output: " replace into t1 values (1);",
},
{
input: "/* comment */ replace /* replace */ into t1 values (1);",
output: " replace into t1 values (1);",
},
}

for _, c := range cases {
require.Equal(t, c.output, removeStringBetween(c.input, "/*", "*/"))
}
}

type fakeStreamSender2 struct {
morpc.Stream
number int
Expand Down
Loading
Loading