Skip to content

Commit 19a6ffe

Browse files
committed
Merge branch 'master' of github.com:CortexFoundation/robot
2 parents 1e2b0fb + 18cb32c commit 19a6ffe

1 file changed

Lines changed: 70 additions & 59 deletions

File tree

monitor.go

Lines changed: 70 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -548,117 +548,128 @@ func (m *Monitor) skip(i uint64) bool {
548548
}*/
549549

550550
func (m *Monitor) syncLastBlock() uint64 {
551-
/*currentNumber, err := m.currentBlock()
552-
if err != nil {
553-
return 0
554-
}*/
555-
556551
currentNumber := m.currentNumber.Load()
552+
lastNumber := m.lastNumber.Load()
557553

558-
if currentNumber < m.lastNumber.Load() {
559-
log.Warn("Fs sync rollback", "current", currentNumber, "last", m.lastNumber.Load(), "offset", m.lastNumber.Load()-currentNumber)
554+
// Step 1: Handle rollback logic if current block number is less than the last processed number.
555+
if currentNumber < lastNumber {
556+
log.Warn("Fs sync rollback detected", "current", currentNumber, "last", lastNumber, "offset", lastNumber-currentNumber)
557+
rollbackNumber := uint64(0)
560558
if currentNumber > 65536 {
561-
m.lastNumber.Store(currentNumber - 65536)
562-
} else {
563-
m.lastNumber.Store(0)
559+
rollbackNumber = currentNumber - 65536
564560
}
565-
m.startNumber.Store(m.lastNumber.Load())
561+
m.lastNumber.Store(rollbackNumber)
562+
m.startNumber.Store(rollbackNumber)
566563
}
567564

565+
// Step 2: Determine the block range for this sync batch.
568566
minNumber := m.lastNumber.Load() + 1
569-
maxNumber := uint64(0)
567+
maxNumber := currentNumber
570568
if currentNumber > delay {
571569
maxNumber = currentNumber - delay
572-
//maxNumber = currentNumber
573570
}
574571

572+
// If the last processed block is unexpectedly higher than the current block (after rollback check),
573+
// this indicates a need to sync backward from the last number.
575574
if m.lastNumber.Load() > currentNumber {
576-
if m.lastNumber.Load() > batch {
577-
minNumber = m.lastNumber.Load() - batch
575+
minNumber = m.lastNumber.Load() - batch
576+
if m.lastNumber.Load() < batch {
577+
minNumber = 0 // Avoids underflow if lastNumber is smaller than batch
578578
}
579579
}
580580

581-
if maxNumber > batch+minNumber {
581+
// Adjust maxNumber to not exceed the batch size.
582+
if maxNumber > minNumber+batch {
582583
maxNumber = minNumber + batch
583584
}
584585

585-
// replay
586-
if minNumber >= delay {
587-
//minNumber = minNumber - delay
588-
}
589-
590586
if maxNumber < minNumber {
591587
return 0
592588
}
593589

594-
//if m.start == 0 {
595590
m.start = mclock.Now()
596-
//}
597591

598-
counter := 0
599-
for i := minNumber; i <= maxNumber; { // i++ {
592+
processedCount := 0
593+
for i := minNumber; i <= maxNumber; {
600594
if m.terminated.Load() {
601-
log.Warn("Fs scan terminated", "number", i)
595+
log.Warn("Fs scan terminated by signal", "lastProcessed", i-1)
602596
m.lastNumber.Store(i - 1)
603597
return 0
604598
}
605-
//if maxNumber > minNumber && i%2048 == 0 {
606-
// log.Info("Running", "min", minNumber, "max", maxNumber, "cur", currentNumber, "last", m.lastNumber.Load(), "batch", batch, "i", i, "srv", m.srv.Load(), "count", maxNumber-minNumber, "progress", float64(i)/float64(currentNumber))
607-
//}
599+
608600
if m.ckp != nil && m.skip(i) {
609601
i++
610602
continue
611603
}
612604

613-
if maxNumber-i >= m.scope {
614-
blocks, rpcErr := m.rpcBatchBlockByNumber(i, i+m.scope)
615-
if rpcErr != nil {
616-
log.Error("Batch sync old block failed", "current", maxNumber, "number", i, "batch", m.scope, "error", rpcErr)
617-
m.lastNumber.Store(i - 1)
618-
return 0
605+
// Step 3: Fetch blocks in a batch or individually based on remaining scope.
606+
remainingScope := maxNumber - i
607+
if remainingScope >= m.scope {
608+
// Process a batch of blocks
609+
blocks, err := m.rpcBatchBlockByNumber(i, i+m.scope)
610+
if err != nil {
611+
return m.handleSyncError("Batch sync old block failed", err, i-1)
619612
}
620613

621-
// batch blocks operation according service category
622-
for _, rpcBlock := range blocks {
623-
m.taskCh <- rpcBlock
614+
// Send blocks to the processing channel
615+
for _, block := range blocks {
616+
m.taskCh <- block
624617
}
625618

626-
size := len(blocks)
627-
for n := 0; n < size; n++ {
619+
// Wait for the processing results for the entire batch
620+
for range blocks {
628621
select {
629622
case err := <-m.errCh:
630623
if err != nil {
631-
m.lastNumber.Store(i - 1)
632-
log.Error("solve err", "err", err, "last", m.lastNumber.Load(), "i", i, "scope", m.scope, "min", minNumber, "max", maxNumber, "cur", currentNumber)
633-
return 0
624+
return m.handleSyncError("Processing error", err, i-1)
634625
}
635626
case <-m.exitCh:
627+
log.Info("Task checker quit signal received")
636628
m.lastNumber.Store(i - 1)
637-
log.Info("Task checker quit")
638629
return 0
639630
}
640631
}
641-
i += uint64(size)
642-
counter += size
632+
633+
i += uint64(len(blocks))
634+
processedCount += len(blocks)
643635
} else {
644-
rpcBlock, rpcErr := m.rpcBlockByNumber(i)
645-
if rpcErr != nil {
646-
log.Error("Sync old block failed", "number", i, "error", rpcErr)
647-
m.lastNumber.Store(i - 1)
648-
return 0
636+
// Process a single block
637+
block, err := m.rpcBlockByNumber(i)
638+
if err != nil {
639+
return m.handleSyncError("Sync old block failed", err, i-1)
649640
}
650-
if err := m.solve(rpcBlock); err != nil {
651-
log.Error("solve err", "err", err)
652-
m.lastNumber.Store(i - 1)
653-
return 0
641+
642+
if err := m.solve(block); err != nil {
643+
return m.handleSyncError("solve err", err, i-1)
654644
}
645+
655646
i++
656-
counter++
647+
processedCount++
657648
}
658649
}
659-
//log.Debug("Last number changed", "min", minNumber, "max", maxNumber, "cur", currentNumber, "last", m.lastNumber.Load(), "batch", batch)
650+
651+
// Step 4: Finalize the sync operation.
660652
m.lastNumber.Store(maxNumber)
661-
elapsedA := time.Duration(mclock.Now()) - time.Duration(m.start)
662-
log.Debug("Chain segment frozen", "from", minNumber, "to", maxNumber, "range", uint64(maxNumber-minNumber+1), "counter", counter, "scope", m.scope, "current", m.CurrentNumber(), "prog", float64(maxNumber)/float64(m.CurrentNumber()), "last", m.lastNumber.Load(), "bps", float64(counter)*1000*1000*1000/float64(elapsedA), "elapsed", common.PrettyDuration(elapsedA))
653+
elapsed := time.Duration(mclock.Now()) - time.Duration(m.start)
654+
655+
log.Debug("Chain segment frozen",
656+
"from", minNumber,
657+
"to", maxNumber,
658+
"totalBlocks", uint64(maxNumber-minNumber+1),
659+
"processed", processedCount,
660+
"scope", m.scope,
661+
"current", m.CurrentNumber(),
662+
"last", m.lastNumber.Load(),
663+
"elapsed", common.PrettyDuration(elapsed),
664+
"bps", float64(processedCount)/elapsed.Seconds(),
665+
)
666+
663667
return uint64(maxNumber - minNumber)
664668
}
669+
670+
// handleSyncError is a helper function to log an error, update the last processed block number, and return 0.
671+
func (m *Monitor) handleSyncError(msg string, err error, lastBlock uint64) uint64 {
672+
log.Error(msg, "error", err, "lastProcessed", lastBlock)
673+
m.lastNumber.Store(lastBlock)
674+
return 0
675+
}

0 commit comments

Comments
 (0)