Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/frontend/status_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package frontend

import (
"math"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand Down Expand Up @@ -285,6 +286,12 @@ func (resper *MysqlResp) respStatus(ses *Session,
isIssue3482 = true
localFileName = st.Param.Filepath
}
if w := execCtx.proc.Base.Warnings.Load(); w > 0 {
if w > math.MaxUint16 {
w = math.MaxUint16
}
res.warnings = uint16(w)
}
Comment thread
ck89119 marked this conversation as resolved.
}

if err2 := resper.mysqlRrWr.WriteResponse(execCtx.reqCtx, res); err2 != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/pb/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (m *Message) SetProcData(data []byte) {
m.ProcInfoData = data
}

// SetWarningCount stores n in the message's WarningCount field.
func (m *Message) SetWarningCount(n uint32) {
m.WarningCount = n
}

func (m *Message) DebugString() string {
errInfo := "none"
if len(m.Err) > 0 {
Expand Down
811 changes: 444 additions & 367 deletions pkg/pb/pipeline/pipeline.pb.go

Large diffs are not rendered by default.

16 changes: 12 additions & 4 deletions pkg/sql/colexec/external/reader_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (r *CsvReader) makeBatchRows(proc *process.Process, bat *batch.Batch) (file
var finish bool
var row []csvparser.Field
var unexpectEOF bool
ignoreError := param.IgnoreError

for i := 0; i < OneBatchMaxRow; i++ {
select {
Expand All @@ -135,6 +136,14 @@ func (r *CsvReader) makeBatchRows(proc *process.Process, bat *batch.Batch) (file
if err != nil {
if err == io.EOF {
finish = true
} else if ignoreError {
if recoverErr := csvReader.RecoverAfterReadError(); recoverErr != nil {
return false, recoverErr
}
logutil.Warnf("load data ignore error: read csv row failed: %v", err)
proc.Base.Warnings.Add(1)
i--
continue
Comment thread
ck89119 marked this conversation as resolved.
Comment thread
ck89119 marked this conversation as resolved.
} else {
return false, err
}
Expand All @@ -161,10 +170,6 @@ func (r *CsvReader) makeBatchRows(proc *process.Process, bat *batch.Batch) (file
}
}

for j := 0; j < len(row); j++ {
curBatchSize += uint64(len(row[j].Val))
}

rowIdx := i
if param.Extern.Format == tree.JSONLINE {
row, err = r.transJson2Lines(proc.Ctx, row[0].Val, param.Attrs, param.Cols, param.Extern.JsonData)
Expand All @@ -181,6 +186,9 @@ func (r *CsvReader) makeBatchRows(proc *process.Process, bat *batch.Batch) (file
if err = getOneRowData(proc, bat, row, rowIdx, param); err != nil {
return false, err
}
for j := 0; j < len(row); j++ {
curBatchSize += uint64(len(row[j].Val))
}

if curBatchSize >= param.maxBatchSize {
break
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/external/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type ExternalParam struct {
type ExParamConst struct {
ParallelLoad bool
StrictSqlMode bool
IgnoreError bool
Close byte
maxBatchSize uint64
Idx int
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/compile/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ func dupOperator(sourceOp vm.Operator, index int, maxParallel int) vm.Operator {
FileOffsetTotal: t.Es.FileOffsetTotal,
Extern: t.Es.Extern,
StrictSqlMode: t.Es.StrictSqlMode,
IgnoreError: t.Es.IgnoreError,
},
ExParam: external.ExParam{
Filter: &external.FilterParam{
Expand Down Expand Up @@ -898,6 +899,7 @@ func constructExternal(node *plan.Node, param *tree.ExternParam, ctx context.Con
FileSize: FileSize,
ClusterTable: node.GetClusterTable(),
StrictSqlMode: strictSqlMode,
IgnoreError: param.IgnoreError,
},
ExParam: external.ExParam{
Fileparam: new(external.ExFileparam),
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/compile/remoterun.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ func convertToPipelineInstruction(op vm.Operator, proc *process.Process, ctx *sc
FileList: t.Es.FileList,
Filter: t.Es.Filter.FilterExpr,
StrictSqlMode: t.Es.StrictSqlMode,
IgnoreError: t.Es.IgnoreError,
}
in.ProjectList = t.ProjectList
case *source.Source:
Expand Down Expand Up @@ -1104,6 +1105,7 @@ func convertToVmOperator(opr *pipeline.Instruction, ctx *scopeContext, eng engin
CreateSql: t.CreateSql,
FileList: t.FileList,
StrictSqlMode: t.StrictSqlMode,
IgnoreError: t.IgnoreError,
},
ExParam: external.ExParam{
Fileparam: new(external.ExFileparam),
Expand Down
30 changes: 27 additions & 3 deletions pkg/sql/compile/remoterunClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,13 @@ func receiveMessageFromCnServerIfOnlyRun(s *Scope, sender *messageSenderOnClient
// However, I have used a loop here to ensure that the query can still be executed normally even if this situation occurs.
for {
bat, end, err = sender.receiveBatch()
if err != nil || end || bat == nil {
if err != nil {
return err
}
if end || bat == nil {
sender.mergeRemoteWarnings(s.Proc)
return nil
}
bat.Clean(mp)
}
}
Expand All @@ -244,9 +248,13 @@ func receiveMessageFromCnServerIfConnector(s *Scope, sender *messageSenderOnClie
nextChannel := s.RootOp.(*connector.Connector).Reg.Ch2
for {
bat, end, err = sender.receiveBatch()
if err != nil || end || bat == nil {
if err != nil {
return err
}
if end || bat == nil {
sender.mergeRemoteWarnings(s.Proc)
return nil
}
connectorAnalyze.Network(bat)

if err = forwardRemoteBatchWithContext(sender, nextChannel, bat, mp); err != nil {
Expand Down Expand Up @@ -282,9 +290,13 @@ func receiveMessageFromCnServerIfDispatch(s *Scope, sender *messageSenderOnClien
mp := s.Proc.Mp()
for {
bat, end, err = sender.receiveBatch()
if err != nil || end || bat == nil {
if err != nil {
return err
}
if end || bat == nil {
sender.mergeRemoteWarnings(s.Proc)
return nil
}

dispatchAnalyze.Network(bat)
fakeValueScanOperator.Batchs = append(fakeValueScanOperator.Batchs, bat)
Expand Down Expand Up @@ -363,6 +375,9 @@ type messageSenderOnClient struct {
// gaugeDecOnce ensures PipelineMessageSenderGauge.Dec() is called at most once when close() runs
// (including when close() returns early because alreadyClose is true, so the gauge still decrements).
gaugeDecOnce sync.Once

// remoteWarningCount accumulates warnings returned by remote MessageEnd packets.
remoteWarningCount uint32
}

func newMessageSenderOnClient(
Expand Down Expand Up @@ -484,6 +499,7 @@ func (sender *messageSenderOnClient) receiveBatch() (bat *batch.Batch, over bool
}
if m.IsEndMessage() {
sender.safeToClose = true
sender.remoteWarningCount += m.GetWarningCount()

anaData := m.GetAnalyse()
if len(anaData) > 0 {
Expand Down Expand Up @@ -518,6 +534,14 @@ func (sender *messageSenderOnClient) receiveBatch() (bat *batch.Batch, over bool
}
}

// mergeRemoteWarnings adds the sender's pending remote warning count to
// proc's warning counter and then clears the pending count, so calling it
// again without new remote warnings adds nothing. It is a no-op when proc is
// nil or when no warnings are pending.
func (sender *messageSenderOnClient) mergeRemoteWarnings(proc *process.Process) {
	if pending := sender.remoteWarningCount; pending != 0 && proc != nil {
		proc.Base.Warnings.Add(pending)
		sender.remoteWarningCount = 0
	}
}

func (sender *messageSenderOnClient) contextDoneError() error {
if sender.ctx == nil {
return nil
Expand Down
34 changes: 34 additions & 0 deletions pkg/sql/compile/remoterunClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/pb/pipeline"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/connector"
"github.com/matrixorigin/matrixone/pkg/testutil"
Expand Down Expand Up @@ -93,6 +94,39 @@ func Test_newMessageSenderOnClient(t *testing.T) {
client.waitingTheStopResponse()
}

func TestMessageSenderMergeRemoteWarnings(t *testing.T) {
proc := testutil.NewProcess(t)
sender := &messageSenderOnClient{remoteWarningCount: 7}

sender.mergeRemoteWarnings(proc)
assert.Equal(t, uint32(7), proc.Base.Warnings.Load())
assert.Equal(t, uint32(0), sender.remoteWarningCount)

// idempotent when no remote warnings left.
sender.mergeRemoteWarnings(proc)
assert.Equal(t, uint32(7), proc.Base.Warnings.Load())
}

func TestMessageSenderReceiveBatchCapturesWarningCount(t *testing.T) {
ctx := context.Background()
sender := &messageSenderOnClient{
ctx: ctx,
receiveCh: make(chan morpc.Message, 1),
mp: mpool.MustNewZero(),
}

endMsg := &pipeline.Message{}
endMsg.SetSid(pipeline.Status_MessageEnd)
endMsg.SetWarningCount(5)
sender.receiveCh <- endMsg

bat, over, err := sender.receiveBatch()
assert.NoError(t, err)
assert.Nil(t, bat)
assert.True(t, over)
assert.Equal(t, uint32(5), sender.remoteWarningCount)
}

func TestRemoteRun(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/compile/remoterunServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func handlePipelineMessage(receiver *messageReceiverOnServer) error {
if err == nil {
runCompile.GenPhyPlan(runCompile)
receiver.phyPlan = runCompile.anal.GetPhyPlan()
receiver.warningCount = runCompile.proc.Base.Warnings.Load()
}
return err

Expand Down Expand Up @@ -299,6 +300,9 @@ type messageReceiverOnServer struct {

// result.
phyPlan *models.PhyPlan

// warningCount records warnings generated during remote execution.
warningCount uint32
}

func newMessageReceiverOnServer(
Expand Down Expand Up @@ -488,6 +492,7 @@ func (receiver *messageReceiverOnServer) sendEndMessage() error {
message.SetSid(pipeline.Status_MessageEnd)
message.SetID(receiver.messageId)
message.SetMessageType(receiver.messageTyp)
message.SetWarningCount(receiver.warningCount)

jsonData, err := json.MarshalIndent(receiver.phyPlan, "", " ")
if err != nil {
Expand Down
21 changes: 20 additions & 1 deletion pkg/sql/compile/remoterun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ func TestReceiveMsgAndForward_ReturnsOnBlockedReceiverCancel(t *testing.T) {

done := make(chan error, 1)
go func() {
done <- receiveMsgAndForward(sender, forwardCh)
done <- receiveMsgAndForward(sender, forwardCh, nil)
}()

cancel()
Expand All @@ -881,6 +881,25 @@ func TestReceiveMsgAndForward_ReturnsOnBlockedReceiverCancel(t *testing.T) {
}
}

// TestReceiveMsgAndForward_MergesRemoteWarningsOnEnd verifies that when the
// only incoming message is an end message, receiveMsgAndForward returns without
// error and the process warning counter equals the count carried by that message.
func TestReceiveMsgAndForward_MergesRemoteWarningsOnEnd(t *testing.T) {
	const remoteWarnings uint32 = 6

	p := testutil.NewProcess(t)

	// Pre-load the receive channel with one end-of-stream message.
	end := &pipeline.Message{}
	end.SetSid(pipeline.Status_MessageEnd)
	end.SetWarningCount(remoteWarnings)

	inbox := make(chan morpc.Message, 1)
	inbox <- end

	client := &messageSenderOnClient{
		ctx:       context.Background(),
		mp:        p.Mp(),
		receiveCh: inbox,
	}
	out := make(chan process.PipelineSignal, 1)

	require.NoError(t, receiveMsgAndForward(client, out, p))
	require.Equal(t, remoteWarnings, p.Base.Warnings.Load())
}

func TestReceiveMessageFromCnServerIfDispatch_PreservesCleanupOnOriginalRoot(t *testing.T) {
proc := testutil.NewProcess(t)

Expand Down
14 changes: 11 additions & 3 deletions pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ func (s *Scope) sendNotifyMessage(wg *sync.WaitGroup, resultChan chan notifyMess
sender.safeToClose = false
sender.alreadyClose = false

err = receiveMsgAndForward(sender, s.Proc.Reg.MergeReceivers[receiverIdx].Ch2)
err = receiveMsgAndForward(sender, s.Proc.Reg.MergeReceivers[receiverIdx].Ch2, s.Proc)
closeWithError(err, s.Proc.Reg.MergeReceivers[receiverIdx], sender)
},
)
Expand All @@ -885,12 +885,20 @@ func suppressRemoteRunCancelError(procCtx context.Context, err error) error {
return err
}

func receiveMsgAndForward(sender *messageSenderOnClient, forwardCh chan process.PipelineSignal) error {
func receiveMsgAndForward(
sender *messageSenderOnClient,
forwardCh chan process.PipelineSignal,
proc *process.Process,
) error {
for {
bat, end, err := sender.receiveBatch()
if err != nil || end || bat == nil {
if err != nil {
return err
}
if end || bat == nil {
sender.mergeRemoteWarnings(proc)
return nil
}

if err = forwardRemoteBatchWithContext(sender, forwardCh, bat, sender.mp); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/parsers/tree/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ type ExParam struct {
Local bool
Parallel bool
Strict bool
IgnoreError bool
}

type S3Parameter struct {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/plan/bind_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func (builder *QueryBuilder) bindExternalScan(

ctx := builder.compCtx
stmt.Param.Local = stmt.Local
if _, ok := stmt.DuplicateHandling.(*tree.DuplicateKeyIgnore); ok {
stmt.Param.IgnoreError = true
}
fileName, err := checkFileExist(stmt.Param, ctx)
if err != nil {
return -1, nil, err
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/plan/build_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ func buildLoad(stmt *tree.Load, ctx CompilerContext, isPrepareStmt bool) (*Plan,
}

stmt.Param.Local = stmt.Local
if _, ok := stmt.DuplicateHandling.(*tree.DuplicateKeyIgnore); ok {
stmt.Param.IgnoreError = true
}
fileName, err := checkFileExist(stmt.Param, ctx)
if err != nil {
return nil, err
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/util/csvparser/csv_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,20 @@ func (parser *CSVParser) Read(lastRow []Field) (row []Field, err error) {
return row, err
}

// RecoverAfterReadError prepares the parser to continue after a failed Read.
// It truncates the record buffer and the per-field bookkeeping slices, then
// calls readUntilTerminator and throws away what it returns, with the intent
// that the following Read begins on a new physical line.
//
// Reaching io.EOF during the skip is treated as success and yields nil; any
// other error from readUntilTerminator is returned unchanged.
func (parser *CSVParser) RecoverAfterReadError() error {
	// Forget whatever was collected for the broken record.
	parser.fieldIsQuoted = parser.fieldIsQuoted[:0]
	parser.fieldIndexes = parser.fieldIndexes[:0]
	parser.recordBuffer = parser.recordBuffer[:0]

	if _, _, err := parser.readUntilTerminator(); err != nil && err != io.EOF {
		return err
	}
	return nil
}

func (parser *CSVParser) Pos() int64 {
return parser.pos
}
Expand Down
Loading
Loading