@@ -237,7 +237,8 @@ func (l *LogMinerStream) buildFilterSchemaTable() (filterSchemaTable string) {
237237 return
238238}
239239
240- func (l * LogMinerStream ) GetLogMinerRecord (startScn , endScn int64 , records chan * LogMinerRecord ) error {
240+ func (e * ExtractorOracle ) GetLogMinerRecord (startScn , endScn int64 , records chan * LogMinerRecord ) error {
241+ l := e .LogMinerStream
241242 // AND table_name IN ('%s')
242243 // strings.Join(sourceTableNames, `','`),
243244 query := fmt .Sprintf (`
@@ -292,7 +293,7 @@ WHERE
292293 }
293294 recordsNum := 0
294295 // var lrs []*LogMinerRecord
295- for rows .Next () {
296+ for rows .Next () && ! e . shutdown {
296297 lr , err := scan (rows )
297298 if err != nil {
298299 return err
@@ -745,7 +746,10 @@ func (e *ExtractorOracle) LoopLogminerRecord() error {
745746 }()
746747 for ! e .shutdown {
747748 select {
748- case r := <- records :
749+ case r , ok := <- records :
750+ if ! ok {
751+ continue
752+ }
749753 atomic .AddInt64 (& e .mysqlContext .DeltaEstimate , 1 )
750754 switch r .Operation {
751755 case OperationCodeStart :
@@ -798,11 +802,11 @@ func (e *ExtractorOracle) LoopLogminerRecord() error {
798802
799803 err = l .StartLogMinerBySCN2 (l .startScn , endScn )
800804 if err != nil {
801- fmt . Println ("StartLMBySCN " , err )
805+ l . logger . Error ("StartLMBySCN " , "err " , err )
802806 return err
803807 }
804808 l .logger .Info ("Get log miner record form" , "StartScn" , l .startScn , "EndScn" , endScn )
805- err = l .GetLogMinerRecord (l .startScn , endScn , records )
809+ err = e .GetLogMinerRecord (l .startScn , endScn , records )
806810 if err != nil {
807811 l .logger .Error ("GetLogMinerRecord " , "err" , err )
808812 return err
0 commit comments