Skip to content

Commit dcc1e24

Browse files
authored
Fixed the poa and pos consensus stopped producing blocks after operations such as proposal changes (#324)
1 parent 52b1682 commit dcc1e24

7 files changed

Lines changed: 62 additions & 24 deletions

File tree

bcs/consensus/tdpos/schedule.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,10 +210,11 @@ func (s *tdposSchedule) UpdateProposers(height int64) bool {
210210
return false
211211
}
212212
if !common.AddressEqual(nextProposers, s.validators) {
213-
s.log.Debug("tdpos::UpdateProposers", "origin", s.validators, "proposers", nextProposers)
213+
s.log.Debug("tdpos::UpdateProposers", "origin", s.validators, "proposers", nextProposers, "height", height)
214214
s.validators = nextProposers
215215
return true
216216
}
217+
s.log.Debug("tdpos::UpdateProposers", "origin", s.validators, "height", height)
217218
return false
218219
}
219220

@@ -352,7 +353,7 @@ func (s *tdposSchedule) calHisValidators(height int64) ([]string, error) {
352353
term, pos, blockPos := s.minerScheduling(block.GetTimestamp())
353354
// 往前回溯的最远距离为internal,即该轮term之前最多生产过多少个区块
354355
internal := pos*s.blockNum + blockPos
355-
begin := block.GetHeight() - internal
356+
begin := block.GetHeight() - internal - 1
356357
if begin <= s.startHeight {
357358
begin = s.startHeight
358359
}
@@ -361,7 +362,9 @@ func (s *tdposSchedule) calHisValidators(height int64) ([]string, error) {
361362
if err != nil {
362363
return nil, err
363364
}
364-
s.log.Debug("tdpos::CalculateProposers::target height.", "height", height, "targetHeight", targetHeight, "term", term)
365+
s.log.Debug("tdpos::CalculateProposers::target height.", "inputHeight", height, "targetHeight", targetHeight,
366+
"begin", begin, "end", block.GetHeight(), "term", term, "pos", pos, "blockPos", blockPos, "internal", internal,
367+
"blockNum", s.blockNum, "block.Timestamp", block.GetTimestamp())
365368
return s.calTopKNominator(targetHeight)
366369
}
367370

@@ -378,7 +381,7 @@ func (s *tdposSchedule) binarySearch(begin int64, end int64, term int64) (int64,
378381
return -1, err
379382
}
380383
if midTerm < term && nextMidTerm == term {
381-
return mid + 1, nil
384+
return mid, nil
382385
}
383386
if midTerm < term {
384387
begin = mid + 1

bcs/consensus/tdpos/schedule_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,12 +178,12 @@ func TestCalHisValidators(t *testing.T) {
178178
return
179179
}
180180
target, _ = s.binarySearch(int64(1), int64(5), int64(2))
181-
if target != 4 {
181+
if target != 3 {
182182
t.Error("binarySearch cal err2.", "target", target)
183183
return
184184
}
185185
target, _ = s.binarySearch(int64(5), int64(6), int64(3))
186-
if target != 6 {
186+
if target != 5 {
187187
t.Error("binarySearch cal err.", "target", target)
188188
return
189189
}
@@ -198,7 +198,7 @@ func TestCalHisValidators(t *testing.T) {
198198
return
199199
}
200200
target, _ = s.binarySearch(int64(5), int64(11), int64(5))
201-
if target != 8 {
201+
if target != 7 {
202202
t.Error("binarySearch cal err.", "target", target)
203203
return
204204
}

bcs/consensus/tdpos/tdpos.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,9 @@ func (tp *tdposConsensus) CheckMinerMatch(ctx xcontext.XContext, block cctx.Bloc
177177
return false, err
178178
}
179179
if wantProposers[pos] != string(block.GetProposer()) {
180-
tp.log.Error("consensus:tdpos:CheckMinerMatch: invalid proposer", "want", wantProposers[pos], "have", string(block.GetProposer()))
180+
tp.log.Error("consensus:tdpos:CheckMinerMatch: invalid proposer",
181+
"want", wantProposers[pos], "have", string(block.GetProposer()),
182+
"wantProposers", wantProposers, "pos", pos)
181183
return false, ErrInvalidProposer
182184
}
183185

kernel/consensus/base/driver/chained-bft/saftyrules.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ func (s *DefaultSaftyRules) CheckProposal(proposal, parent storage.QuorumCertInt
145145

146146
// 检查justify的所有vote签名
147147
justifySigns := parent.GetSignsInfo()
148+
s.Log.Debug("DefaultSaftyRules::CheckProposal", "parent", parent, "justifyValidators", justifyValidators)
148149
validCnt := 0
149150
for _, v := range justifySigns {
150151
if !isInSlice(v.GetAddress(), justifyValidators) {

kernel/consensus/base/driver/chained-bft/smr.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,12 @@ func (s *Smr) ProcessProposal(viewNumber int64, proposalID []byte, parentID []by
379379
s.log.Error("smr::ProcessProposal::NewMessage error")
380380
return ErrP2PInternalErr
381381
}
382+
382383
go s.p2p.SendMessage(createNewBCtx(), netMsg, p2p.WithAccounts(s.removeLocalValidator(validatesIpInfo)))
384+
s.log.Debug("smr::ProcessProposal::proposal", "localAddress", s.address, "validatesIpInfo", validatesIpInfo,
385+
"ProposalView", proposal.ProposalView, "ProposalId", utils.F(proposal.ProposalId),
386+
"Timestamp", proposal.Timestamp, "JustifyQC", proposal.JustifyQC)
387+
383388
s.localProposal.Store(utils.F(proposalID), proposal.Timestamp)
384389
// 若为单候选人情况,则此处需要特殊处理,矿工需要给自己提前签名
385390
if len(validatesIpInfo) == 1 {
@@ -518,6 +523,9 @@ func (s *Smr) handleReceivedProposal(msg *xuperp2p.XuperMessage) {
518523
// 此处如果失败,仍会执行下层逻辑,因为是多个节点通知该轮Leader,因此若发不出去仍可继续运行
519524
if leader != "" && netMsg != nil && leader != s.address {
520525
go s.p2p.SendMessage(createNewBCtx(), netMsg, p2p.WithAccounts([]string{leader}))
526+
s.log.Debug("smr::handleReceivedProposal::proposal", "localAddress", s.address, "leader", leader,
527+
"ProposalView", newProposalMsg.ProposalView, "ProposalId", utils.F(newProposalMsg.ProposalId),
528+
"Timestamp", newProposalMsg.Timestamp, "JustifyQC", newProposalMsg.JustifyQC)
521529
}
522530
}
523531

@@ -556,7 +564,7 @@ func (s *Smr) handleReceivedProposal(msg *xuperp2p.XuperMessage) {
556564
// 6.发送一个vote消息给下一个Leader
557565
nextLeader := s.election.GetLeader(s.pacemaker.GetCurrentView() + 1)
558566
if nextLeader == "" {
559-
s.log.Debug("smr::handleReceivedProposal::empty next leader", "next round", s.pacemaker.GetCurrentView()+1)
567+
s.log.Warn("smr::handleReceivedProposal::empty next leader", "next round", s.pacemaker.GetCurrentView()+1)
560568
return
561569
}
562570
s.voteProposal(newProposalMsg.GetProposalId(), newVote, newLedgerInfo, nextLeader)

kernel/consensus/base/driver/chained-bft/smr_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ func NewSMR(node string, log logs.Logger, p2p network.Network, t *testing.T) *Sm
108108
saftyrules := &DefaultSaftyRules{
109109
Crypto: cryptoClient,
110110
QcTree: q,
111+
Log: log,
111112
}
112113
election := &ElectionA{
113114
addrs: []string{NodeA, NodeB, NodeC},

kernel/engines/xuperos/miner/miner.go

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424

2525
const (
2626
tickOnCalcBlock = time.Second
27-
syncOnstatusChangeTimeout = 1 * time.Minute
27+
syncOnStatusChangeTimeout = 1 * time.Minute
2828

2929
statusFollowing = 0
3030
statusMining = 1
@@ -65,21 +65,26 @@ func (t *Miner) ProcBlock(ctx xctx.XContext, block *lpb.InternalBlock) error {
6565
return nil
6666
}
6767

68+
// Start
6869
// 启动矿工,周期检查矿工身份
69-
// 同一时间,矿工状态是唯一的。0:休眠中 1:同步区块中 2:打包区块中
70+
// 同一时间,矿工状态是唯一的
71+
// 0:休眠中 1:同步区块中 2:打包区块中
7072
func (t *Miner) Start() {
73+
var err error
74+
7175
// 用于监测退出
7276
t.exitWG.Add(1)
7377
defer t.exitWG.Done()
7478

75-
var err error
79+
// 节点初始状态为同步节点
7680
t.status = statusFollowing
7781

82+
// 开启挖矿前先同步区块
7883
ctx := &xctx.BaseCtx{
7984
XLog: t.log,
8085
Timer: timer.NewXTimer(),
8186
}
82-
t.syncWithNeighbors(ctx)
87+
_ = t.syncWithNeighbors(ctx)
8388

8489
// 启动矿工循环
8590
for !t.IsExit() {
@@ -124,6 +129,7 @@ func (t *Miner) step() error {
124129
Timer: timer.NewXTimer(),
125130
}
126131

132+
// 账本和状态机最新区块id不一致,需要进行一次同步
127133
if !bytes.Equal(ledgerTipId, stateTipId) {
128134
err := t.ctx.State.Walk(ledgerTipId, false)
129135
if err != nil {
@@ -136,37 +142,51 @@ func (t *Miner) step() error {
136142
ctx.GetLog().Trace("miner step", "ledgerTipHeight", ledgerTipHeight, "ledgerTipId",
137143
utils.F(ledgerTipId), "stateTipId", utils.F(stateTipId))
138144

145+
// 如果上次角色是非矿工,则尝试同步网络最新区块
146+
// 注意:这里出现错误也要继续执行,防止恶意节点错误出块导致流程无法继续执行
147+
if t.status == statusFollowing {
148+
err := t.syncWithValidators(ctx, syncOnStatusChangeTimeout)
149+
ctx.GetLog().Trace("miner syncWithValidators before CompeteMaster", "originTipHeight", ledgerTipHeight,
150+
"currentLedgerHeight", t.ctx.Ledger.GetMeta().TrunkHeight, "err", err)
151+
trace("syncUpValidators")
152+
}
153+
139154
// 通过共识检查矿工身份
140155
isMiner, isSync, err := t.ctx.Consensus.CompeteMaster(ledgerTipHeight + 1)
141156
trace("competeMaster")
142157
ctx.GetLog().Trace("compete master result", "height", ledgerTipHeight+1, "isMiner", isMiner, "isSync", isSync, "err", err)
143158
if err != nil {
144159
return err
145160
}
146-
// 如需要同步,尝试同步网络最新区块
147-
if isMiner && isSync {
148-
err = t.syncWithValidators(ctx, syncOnstatusChangeTimeout)
149-
if err != nil {
150-
return err
151-
}
152-
}
153-
trace("syncUpValidators")
154161

155162
// 如果是矿工,出块
156163
if isMiner {
157-
if t.status == statusFollowing {
164+
if t.status == statusFollowing || isSync {
158165
ctx.GetLog().Info("miner change follow=>miner",
159166
"miner", t.ctx.Address.Address,
160167
"netAddr", t.ctx.EngCtx.Net.PeerInfo().Id,
161168
"height", t.ctx.Ledger.GetMeta().GetTrunkHeight(),
162169
)
170+
163171
// 在由非矿工向矿工切换的这次"边沿触发",主动向所有的验证集合的最长链进行一次区块同步
164-
err = t.syncWithValidators(ctx, syncOnstatusChangeTimeout)
172+
err = t.syncWithValidators(ctx, syncOnStatusChangeTimeout)
165173
if err != nil {
174+
ctx.GetLog().Error("miner change follow=>miner syncWithValidators failed", "err", err)
166175
return err
167176
}
177+
178+
// 由于同步了最长链,所以这里需要检查链是否增长
179+
// 由于pos和poa类共识依赖账本高度来判断状态,如果链发生变化则表明CompeteMaster的结果需要重新根据当前最新高度计算
180+
if ledgerTipHeight != t.ctx.Ledger.GetMeta().TrunkHeight {
181+
ctx.GetLog().Trace("miner change follow=>miner", "originTipHeight", ledgerTipHeight, "currentLedgerHeight",
182+
t.ctx.Ledger.GetMeta().TrunkHeight, "isMiner", isMiner, "isSync", isSync)
183+
return nil
184+
}
185+
trace("syncUpValidators")
168186
}
169187
t.status = statusMining
188+
189+
// 开始挖矿
170190
err = t.mining(ctx)
171191
if err != nil {
172192
return err
@@ -192,7 +212,7 @@ func (t *Miner) step() error {
192212
return nil
193213
}
194214

195-
// 挖矿生产区块
215+
// mining 挖矿生产区块
196216
func (t *Miner) mining(ctx xctx.XContext) error {
197217
ctx.GetLog().Debug("mining start.")
198218

@@ -213,6 +233,7 @@ func (t *Miner) mining(ctx xctx.XContext) error {
213233
}
214234
// 重置高度
215235
height = t.ctx.Ledger.GetMeta().TrunkHeight + 1
236+
ctx.GetLog().Debug("truncateTarget result", "newHeight", height)
216237
}
217238

218239
// 2.打包区块
@@ -249,6 +270,8 @@ func (t *Miner) mining(ctx xctx.XContext) error {
249270
"blockId", utils.F(block.GetBlockid()))
250271
return err
251272
}
273+
274+
// 5.可插拔共识,根据区块高度确认是否需要切换升级共识实例
252275
err = t.ctx.Consensus.SwitchConsensus(block.Height)
253276
if err != nil {
254277
ctx.GetLog().Warn("SwitchConsensus failed", "bcname", t.ctx.BCName,

0 commit comments

Comments
 (0)