Skip to content

Commit 18cb32c

Browse files
authored
Merge pull request #30 from CortexFoundation/monitor-b
optimized
2 parents 4a02e08 + 04f4b6b commit 18cb32c

1 file changed

Lines changed: 70 additions & 59 deletions

File tree

monitor.go

Lines changed: 70 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -549,117 +549,128 @@ func (m *Monitor) skip(i uint64) bool {
549549
}*/
550550

551551
func (m *Monitor) syncLastBlock() uint64 {
552-
/*currentNumber, err := m.currentBlock()
553-
if err != nil {
554-
return 0
555-
}*/
556-
557552
currentNumber := m.currentNumber.Load()
553+
lastNumber := m.lastNumber.Load()
558554

559-
if currentNumber < m.lastNumber.Load() {
560-
log.Warn("Fs sync rollback", "current", currentNumber, "last", m.lastNumber.Load(), "offset", m.lastNumber.Load()-currentNumber)
555+
// Step 1: Handle rollback logic if current block number is less than the last processed number.
556+
if currentNumber < lastNumber {
557+
log.Warn("Fs sync rollback detected", "current", currentNumber, "last", lastNumber, "offset", lastNumber-currentNumber)
558+
rollbackNumber := uint64(0)
561559
if currentNumber > 65536 {
562-
m.lastNumber.Store(currentNumber - 65536)
563-
} else {
564-
m.lastNumber.Store(0)
560+
rollbackNumber = currentNumber - 65536
565561
}
566-
m.startNumber.Store(m.lastNumber.Load())
562+
m.lastNumber.Store(rollbackNumber)
563+
m.startNumber.Store(rollbackNumber)
567564
}
568565

566+
// Step 2: Determine the block range for this sync batch.
569567
minNumber := m.lastNumber.Load() + 1
570-
maxNumber := uint64(0)
568+
maxNumber := currentNumber
571569
if currentNumber > delay {
572570
maxNumber = currentNumber - delay
573-
//maxNumber = currentNumber
574571
}
575572

573+
// If the last processed block is unexpectedly higher than the current block (after rollback check),
574+
// this indicates a need to sync backward from the last number.
576575
if m.lastNumber.Load() > currentNumber {
577-
if m.lastNumber.Load() > batch {
578-
minNumber = m.lastNumber.Load() - batch
576+
minNumber = m.lastNumber.Load() - batch
577+
if m.lastNumber.Load() < batch {
578+
minNumber = 0 // Avoids underflow if lastNumber is smaller than batch
579579
}
580580
}
581581

582-
if maxNumber > batch+minNumber {
582+
// Adjust maxNumber to not exceed the batch size.
583+
if maxNumber > minNumber+batch {
583584
maxNumber = minNumber + batch
584585
}
585586

586-
// replay
587-
if minNumber >= delay {
588-
//minNumber = minNumber - delay
589-
}
590-
591587
if maxNumber < minNumber {
592588
return 0
593589
}
594590

595-
//if m.start == 0 {
596591
m.start = mclock.Now()
597-
//}
598592

599-
counter := 0
600-
for i := minNumber; i <= maxNumber; { // i++ {
593+
processedCount := 0
594+
for i := minNumber; i <= maxNumber; {
601595
if m.terminated.Load() {
602-
log.Warn("Fs scan terminated", "number", i)
596+
log.Warn("Fs scan terminated by signal", "lastProcessed", i-1)
603597
m.lastNumber.Store(i - 1)
604598
return 0
605599
}
606-
//if maxNumber > minNumber && i%2048 == 0 {
607-
// log.Info("Running", "min", minNumber, "max", maxNumber, "cur", currentNumber, "last", m.lastNumber.Load(), "batch", batch, "i", i, "srv", m.srv.Load(), "count", maxNumber-minNumber, "progress", float64(i)/float64(currentNumber))
608-
//}
600+
609601
if m.ckp != nil && m.skip(i) {
610602
i++
611603
continue
612604
}
613605

614-
if maxNumber-i >= m.scope {
615-
blocks, rpcErr := m.rpcBatchBlockByNumber(i, i+m.scope)
616-
if rpcErr != nil {
617-
log.Error("Batch sync old block failed", "current", maxNumber, "number", i, "batch", m.scope, "error", rpcErr)
618-
m.lastNumber.Store(i - 1)
619-
return 0
606+
// Step 3: Fetch blocks in a batch or individually based on remaining scope.
607+
remainingScope := maxNumber - i
608+
if remainingScope >= m.scope {
609+
// Process a batch of blocks
610+
blocks, err := m.rpcBatchBlockByNumber(i, i+m.scope)
611+
if err != nil {
612+
return m.handleSyncError("Batch sync old block failed", err, i-1)
620613
}
621614

622-
// batch blocks operation according service category
623-
for _, rpcBlock := range blocks {
624-
m.taskCh <- rpcBlock
615+
// Send blocks to the processing channel
616+
for _, block := range blocks {
617+
m.taskCh <- block
625618
}
626619

627-
size := len(blocks)
628-
for n := 0; n < size; n++ {
620+
// Wait for the processing results for the entire batch
621+
for range blocks {
629622
select {
630623
case err := <-m.errCh:
631624
if err != nil {
632-
m.lastNumber.Store(i - 1)
633-
log.Error("solve err", "err", err, "last", m.lastNumber.Load(), "i", i, "scope", m.scope, "min", minNumber, "max", maxNumber, "cur", currentNumber)
634-
return 0
625+
return m.handleSyncError("Processing error", err, i-1)
635626
}
636627
case <-m.exitCh:
628+
log.Info("Task checker quit signal received")
637629
m.lastNumber.Store(i - 1)
638-
log.Info("Task checker quit")
639630
return 0
640631
}
641632
}
642-
i += uint64(size)
643-
counter += size
633+
634+
i += uint64(len(blocks))
635+
processedCount += len(blocks)
644636
} else {
645-
rpcBlock, rpcErr := m.rpcBlockByNumber(i)
646-
if rpcErr != nil {
647-
log.Error("Sync old block failed", "number", i, "error", rpcErr)
648-
m.lastNumber.Store(i - 1)
649-
return 0
637+
// Process a single block
638+
block, err := m.rpcBlockByNumber(i)
639+
if err != nil {
640+
return m.handleSyncError("Sync old block failed", err, i-1)
650641
}
651-
if err := m.solve(rpcBlock); err != nil {
652-
log.Error("solve err", "err", err)
653-
m.lastNumber.Store(i - 1)
654-
return 0
642+
643+
if err := m.solve(block); err != nil {
644+
return m.handleSyncError("solve err", err, i-1)
655645
}
646+
656647
i++
657-
counter++
648+
processedCount++
658649
}
659650
}
660-
//log.Debug("Last number changed", "min", minNumber, "max", maxNumber, "cur", currentNumber, "last", m.lastNumber.Load(), "batch", batch)
651+
652+
// Step 4: Finalize the sync operation.
661653
m.lastNumber.Store(maxNumber)
662-
elapsedA := time.Duration(mclock.Now()) - time.Duration(m.start)
663-
log.Debug("Chain segment frozen", "from", minNumber, "to", maxNumber, "range", uint64(maxNumber-minNumber+1), "counter", counter, "scope", m.scope, "current", m.CurrentNumber(), "prog", float64(maxNumber)/float64(m.CurrentNumber()), "last", m.lastNumber.Load(), "bps", float64(counter)*1000*1000*1000/float64(elapsedA), "elapsed", common.PrettyDuration(elapsedA))
654+
elapsed := time.Duration(mclock.Now()) - time.Duration(m.start)
655+
656+
log.Debug("Chain segment frozen",
657+
"from", minNumber,
658+
"to", maxNumber,
659+
"totalBlocks", uint64(maxNumber-minNumber+1),
660+
"processed", processedCount,
661+
"scope", m.scope,
662+
"current", m.CurrentNumber(),
663+
"last", m.lastNumber.Load(),
664+
"elapsed", common.PrettyDuration(elapsed),
665+
"bps", float64(processedCount)/elapsed.Seconds(),
666+
)
667+
664668
return uint64(maxNumber - minNumber)
665669
}
670+
671+
// handleSyncError is a helper function to log an error, update the last processed block number, and return 0.
672+
func (m *Monitor) handleSyncError(msg string, err error, lastBlock uint64) uint64 {
673+
log.Error(msg, "error", err, "lastProcessed", lastBlock)
674+
m.lastNumber.Store(lastBlock)
675+
return 0
676+
}

0 commit comments

Comments
 (0)