Skip to content

Commit dadd739

Browse files
authored
fix data race in bft deliverer (#5464)
Signed-off-by: Fedor Partanskiy <fredprtnsk@gmail.com>
1 parent 6a67397 commit dadd739

2 files changed

Lines changed: 97 additions & 90 deletions

File tree

common/deliverclient/blocksprovider/bft_deliverer.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (d *BFTDeliverer) DeliverBlocks() {
177177
// waiting for it to be consumed. A block receiver is created within.
178178
d.fetchErrorsC = make(chan error, 1)
179179
source := d.fetchSources[d.fetchSourceIndex]
180-
go d.FetchBlocks(source)
180+
go d.FetchBlocks(source, d.fetchErrorsC)
181181

182182
// Create and start a censorship monitor.
183183
d.censorshipMonitor = d.CensorshipDetectorFactory.Create(
@@ -326,28 +326,28 @@ func (d *BFTDeliverer) Stop() {
326326
d.blockReceiver.Stop()
327327
}
328328

329-
func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint) {
329+
func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint, fetchErrorsC chan<- error) {
330330
d.Logger.Debugf("Trying to fetch blocks from orderer: %s", source.Address)
331331

332332
for {
333333
select {
334334
case <-d.DoneC:
335-
d.fetchErrorsC <- &ErrStopping{Message: "stopping"}
335+
fetchErrorsC <- &ErrStopping{Message: "stopping"}
336336
return
337337
default:
338338
}
339339

340340
seekInfoEnv, err := d.requester.SeekInfoBlocksFrom(d.getNextBlockNumber())
341341
if err != nil {
342342
d.Logger.Errorf("Could not create a signed Deliver SeekInfo message, something is critically wrong: %s", err)
343-
d.fetchErrorsC <- &ErrFatal{Message: fmt.Sprintf("could not create a signed Deliver SeekInfo message: %s", err)}
343+
fetchErrorsC <- &ErrFatal{Message: fmt.Sprintf("could not create a signed Deliver SeekInfo message: %s", err)}
344344
return
345345
}
346346

347347
deliverClient, cancel, err := d.requester.Connect(seekInfoEnv, source)
348348
if err != nil {
349349
d.Logger.Warningf("Could not connect to ordering service: %s", err)
350-
d.fetchErrorsC <- errors.Wrapf(err, "could not connect to ordering service, orderer-address: %s", source.Address)
350+
fetchErrorsC <- errors.Wrapf(err, "could not connect to ordering service, orderer-address: %s", source.Address)
351351
return
352352
}
353353

@@ -364,6 +364,9 @@ func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint) {
364364
}
365365

366366
d.mutex.Lock()
367+
if d.blockReceiver != nil {
368+
d.blockReceiver.Stop()
369+
}
367370
d.blockReceiver = blockRcv
368371
d.mutex.Unlock()
369372

@@ -378,10 +381,10 @@ func (d *BFTDeliverer) FetchBlocks(source *orderers.Endpoint) {
378381
d.Logger.Debugf("BlockReceiver stopped while processing incoming blocks: %s", errProc)
379382
case *errRefreshEndpoint:
380383
d.Logger.Infof("Endpoint refreshed while processing incoming blocks: %s", errProc)
381-
d.fetchErrorsC <- errProc
384+
fetchErrorsC <- errProc
382385
default:
383386
d.Logger.Warningf("Failure while processing incoming blocks: %s", errProc)
384-
d.fetchErrorsC <- errProc
387+
fetchErrorsC <- errProc
385388
}
386389

387390
return

0 commit comments

Comments
 (0)