Skip to content

Commit 6852ee3

Browse files
committed
atomic for cur and last num
1 parent 6914a61 commit 6852ee3

1 file changed

Lines changed: 29 additions & 29 deletions

File tree

monitor.go

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ type Monitor struct {
5959

6060
exitCh chan any
6161
terminated atomic.Bool
62-
lastNumber uint64
63-
startNumber uint64
62+
lastNumber atomic.Uint64
63+
startNumber atomic.Uint64
6464
scope uint64
6565
currentNumber atomic.Uint64
6666
wg sync.WaitGroup
@@ -110,9 +110,8 @@ func New(flag *params.Config, cache, compress, listen bool, callback chan any) (
110110
cl: nil,
111111
//fs: fs,
112112
//dl: tMana,
113-
exitCh: make(chan any),
114-
lastNumber: uint64(0),
115-
scope: uint64(math.Min(float64(runtime.NumCPU()), float64(8))),
113+
exitCh: make(chan any),
114+
scope: uint64(math.Min(float64(runtime.NumCPU()), float64(8))),
116115
//taskCh: make(chan *types.Block, batch),
117116
//start: mclock.Now(),
118117
}
@@ -122,6 +121,7 @@ func New(flag *params.Config, cache, compress, listen bool, callback chan any) (
122121
return nil, err
123122
}
124123
m.fs = fs_
124+
m.lastNumber.Store(0)
125125
m.currentNumber.Store(0)
126126
m.terminated.Store(false)
127127
m.blockCache, _ = lru.New(delay)
@@ -211,17 +211,17 @@ func (m *Monitor) indexCheck() error {
211211

212212
version := m.fs.GetRoot(checkpoint.TfsCheckPoint)
213213
if common.BytesToHash(version) != checkpoint.TfsRoot {
214-
m.lastNumber = 0
215-
m.startNumber = 0
216-
if m.lastNumber > checkpoint.TfsCheckPoint {
214+
m.lastNumber.Store(0)
215+
m.startNumber.Store(0)
216+
if m.lastNumber.Load() > checkpoint.TfsCheckPoint {
217217
//m.fs.LastListenBlockNumber = 0
218218
m.fs.Anchor(0)
219219
//m.lastNumber = 0
220220
//if err := m.fs.Reset(); err != nil {
221221
// return err
222222
//}
223223
}
224-
log.Warn("Fs storage is reloading ...", "name", m.ckp.Name, "number", checkpoint.TfsCheckPoint, "version", common.BytesToHash(version), "checkpoint", checkpoint.TfsRoot, "blocks", len(m.fs.Blocks()), "files", len(m.fs.Files()), "txs", m.fs.Txs(), "lastNumber", m.lastNumber, "last in db", m.fs.LastListenBlockNumber())
224+
log.Warn("Fs storage is reloading ...", "name", m.ckp.Name, "number", checkpoint.TfsCheckPoint, "version", common.BytesToHash(version), "checkpoint", checkpoint.TfsRoot, "blocks", len(m.fs.Blocks()), "files", len(m.fs.Files()), "txs", m.fs.Txs(), "lastNumber", m.lastNumber.Load(), "last in db", m.fs.LastListenBlockNumber())
225225
} else {
226226
log.Info("Fs storage version check passed", "name", m.ckp.Name, "number", checkpoint.TfsCheckPoint, "version", common.BytesToHash(version), "blocks", len(m.fs.Blocks()), "files", len(m.fs.Files()), "txs", m.fs.Txs())
227227
}
@@ -574,9 +574,9 @@ func (m *Monitor) run() error {
574574
}
575575
m.cl = rpcClient
576576

577-
m.lastNumber = m.fs.LastListenBlockNumber()
577+
m.lastNumber.Store(m.fs.LastListenBlockNumber())
578578
m.currentBlock()
579-
m.startNumber = uint64(math.Min(float64(m.fs.LastListenBlockNumber()), float64(m.currentNumber.Load()))) // ? m.currentNumber:m.fs.LastListenBlockNumber
579+
m.startNumber.Store(uint64(math.Min(float64(m.fs.LastListenBlockNumber()), float64(m.currentNumber.Load())))) // ? m.currentNumber:m.fs.LastListenBlockNumber
580580

581581
if err := m.indexCheck(); err != nil {
582582
return err
@@ -644,7 +644,7 @@ func (m *Monitor) syncLatestBlock() {
644644
m.fs.Flush()
645645
//go m.exit()
646646
elapsed := time.Duration(mclock.Now()) - time.Duration(m.start)
647-
log.Info("Finish sync, listener will be paused", "current", m.currentNumber.Load(), "elapsed", common.PrettyDuration(elapsed), "progress", progress, "end", end, "last", m.lastNumber)
647+
log.Info("Finish sync, listener will be paused", "current", m.currentNumber.Load(), "elapsed", common.PrettyDuration(elapsed), "progress", progress, "end", end, "last", m.lastNumber.Load())
648648
//return
649649
timer.Reset(time.Millisecond * 1000 * 180)
650650
end = false
@@ -655,7 +655,7 @@ func (m *Monitor) syncLatestBlock() {
655655
}
656656
counter++
657657
if counter%10 == 0 {
658-
log.Info("Monitor status", "blocks", progress, "current", m.CurrentNumber(), "latest", m.lastNumber, "end", end, "txs", m.fs.Txs(), "ckp", m.fs.CheckPoint(), "last", m.fs.LastListenBlockNumber())
658+
log.Info("Monitor status", "blocks", progress, "current", m.CurrentNumber(), "latest", m.lastNumber.Load(), "end", end, "txs", m.fs.Txs(), "ckp", m.fs.CheckPoint(), "last", m.fs.LastListenBlockNumber())
659659
counter = 0
660660
}
661661
m.fs.Flush()
@@ -701,25 +701,25 @@ func (m *Monitor) syncLastBlock() uint64 {
701701
return 0
702702
}
703703

704-
if currentNumber < m.lastNumber {
705-
log.Warn("Fs sync rollback", "current", currentNumber, "last", m.lastNumber, "offset", m.lastNumber-currentNumber)
704+
if currentNumber < m.lastNumber.Load() {
705+
log.Warn("Fs sync rollback", "current", currentNumber, "last", m.lastNumber.Load(), "offset", m.lastNumber.Load()-currentNumber)
706706
if currentNumber > 65536 {
707-
m.lastNumber = currentNumber - 65536
707+
m.lastNumber.Store(currentNumber - 65536)
708708
} else {
709-
m.lastNumber = 0
709+
m.lastNumber.Store(0)
710710
}
711-
m.startNumber = m.lastNumber
711+
m.startNumber.Store(m.lastNumber.Load())
712712
}
713713

714-
minNumber := m.lastNumber + 1
714+
minNumber := m.lastNumber.Load() + 1
715715
maxNumber := uint64(0)
716716
if currentNumber > delay {
717717
maxNumber = currentNumber - delay
718718
}
719719

720-
if m.lastNumber > currentNumber {
721-
if m.lastNumber > batch {
722-
minNumber = m.lastNumber - batch
720+
if m.lastNumber.Load() > currentNumber {
721+
if m.lastNumber.Load() > batch {
722+
minNumber = m.lastNumber.Load() - batch
723723
}
724724
}
725725

@@ -750,12 +750,12 @@ func (m *Monitor) syncLastBlock() uint64 {
750750
blocks, rpcErr := m.rpcBatchBlockByNumber(i, i+m.scope)
751751
if rpcErr != nil {
752752
log.Error("Sync old block failed", "number", i, "error", rpcErr)
753-
m.lastNumber = i - 1
753+
m.lastNumber.Store(i - 1)
754754
return 0
755755
}
756756
for _, rpcBlock := range blocks {
757757
if err := m.solve(rpcBlock); err != nil {
758-
m.lastNumber = i - 1
758+
m.lastNumber.Store(i - 1)
759759
return 0
760760
}
761761
i++
@@ -777,11 +777,11 @@ func (m *Monitor) syncLastBlock() uint64 {
777777
rpcBlock, rpcErr := m.rpcBlockByNumber(i)
778778
if rpcErr != nil {
779779
log.Error("Sync old block failed", "number", i, "error", rpcErr)
780-
m.lastNumber = i - 1
780+
m.lastNumber.Store(i - 1)
781781
return 0
782782
}
783783
if err := m.solve(rpcBlock); err != nil {
784-
m.lastNumber = i - 1
784+
m.lastNumber.Store(i - 1)
785785
return 0
786786
}
787787
i++
@@ -799,11 +799,11 @@ func (m *Monitor) syncLastBlock() uint64 {
799799
}*/
800800
}
801801
}
802-
m.lastNumber = maxNumber
802+
m.lastNumber.Store(maxNumber)
803803
//if maxNumber-minNumber > batch-1 {
804804
if maxNumber-minNumber > delay {
805805
elapsedA := time.Duration(mclock.Now()) - time.Duration(m.start)
806-
log.Debug("Chain segment frozen", "from", minNumber, "to", maxNumber, "range", uint64(maxNumber-minNumber), "current", uint64(m.CurrentNumber()), "progress", float64(maxNumber)/float64(m.CurrentNumber()), "last", m.lastNumber, "bps", float64(maxNumber)*1000*1000*1000/float64(elapsedA), "elapsed", common.PrettyDuration(elapsedA))
806+
log.Debug("Chain segment frozen", "from", minNumber, "to", maxNumber, "range", uint64(maxNumber-minNumber), "current", uint64(m.CurrentNumber()), "progress", float64(maxNumber)/float64(m.CurrentNumber()), "last", m.lastNumber.Load(), "bps", float64(maxNumber)*1000*1000*1000/float64(elapsedA), "elapsed", common.PrettyDuration(elapsedA))
807807
}
808808
return uint64(maxNumber - minNumber)
809809
}
@@ -813,7 +813,7 @@ func (m *Monitor) solve(block *types.Block) error {
813813
if i%65536 == 0 {
814814
defer func() {
815815
elapsedA := time.Duration(mclock.Now()) - time.Duration(m.start)
816-
log.Info("Nas monitor", "start", m.startNumber, "max", uint64(m.CurrentNumber()), "last", m.lastNumber, "cur", i, "bps", math.Abs(float64(i)-float64(m.startNumber))*1000*1000*1000/float64(elapsedA), "elapsed", common.PrettyDuration(elapsedA), "scope", m.scope, "db", common.PrettyDuration(m.fs.Metrics()), "blocks", len(m.fs.Blocks()), "txs", m.fs.Txs(), "files", len(m.fs.Files()), "root", m.fs.Root())
816+
log.Info("Nas monitor", "start", m.startNumber.Load(), "max", uint64(m.CurrentNumber()), "last", m.lastNumber.Load(), "cur", i, "bps", math.Abs(float64(i)-float64(m.startNumber.Load()))*1000*1000*1000/float64(elapsedA), "elapsed", common.PrettyDuration(elapsedA), "scope", m.scope, "db", common.PrettyDuration(m.fs.Metrics()), "blocks", len(m.fs.Blocks()), "txs", m.fs.Txs(), "files", len(m.fs.Files()), "root", m.fs.Root())
817817
m.fs.SkipPrint()
818818
}()
819819
}

0 commit comments

Comments
 (0)