Skip to content

Commit 9d6f466

Browse files
authored
Merge pull request #3873 from oasisprotocol/kostko/stable/20.12.x/cons-graceful-halt
[BACKPORT/20.12.x] go/consensus: Gracefully handle halt
2 parents 76f75f2 + 2098172 commit 9d6f466

15 files changed

Lines changed: 158 additions & 50 deletions

File tree

.changelog/3755.bugfix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
go/consensus: Move upgrade logic to governance, gracefully handle halt

.changelog/3873.bugfix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
go/oasis-node: Dump correct block height on halt

.changelog/3877.internal.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
go: Ignore jwt-go vulns since we're not using the features

go/.nancy-ignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ CVE-2018-17848
1313
CVE-2020-15114
1414
CVE-2020-15136
1515
CVE-2020-15115
16+
CVE-2020-26160 # Until viper and etcd/prometheus are upgraded to not need jwt-go.

go/common/grpc/grpc.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ const (
3636

3737
maxRecvMsgSize = 104857600 // 100 MiB
3838
maxSendMsgSize = 104857600 // 100 MiB
39+
40+
gracefulStopWaitPeriod = 5 * time.Second
3941
)
4042

4143
var (
@@ -506,7 +508,19 @@ func (s *Server) Stop() {
506508
}
507509
default:
508510
}
509-
s.server.GracefulStop() // Repeated calls are ok.
511+
512+
// Attempt to stop gracefully, but if that doesn't work, stop forcibly.
513+
gracefulCh := make(chan struct{})
514+
go func() {
515+
s.server.GracefulStop()
516+
close(gracefulCh)
517+
}()
518+
select {
519+
case <-gracefulCh:
520+
case <-time.After(gracefulStopWaitPeriod):
521+
s.Logger.Warn("graceful stop failed, forcing stop")
522+
s.server.Stop()
523+
}
510524
s.server = nil
511525
}
512526
}

go/consensus/api/api.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,16 +210,18 @@ type Backend interface {
210210
GetAddresses() ([]node.ConsensusAddress, error)
211211
}
212212

213+
// HaltHook is a function that gets called when consensus needs to halt for some reason.
214+
type HaltHook func(ctx context.Context, blockHeight int64, epoch epochtime.EpochTime, err error)
215+
213216
// ServicesBackend is an interface for consensus backends which indicate support for
214217
// communicating with consensus services.
215218
//
216219
// In case the feature is absent, these methods may return nil or ErrUnsupported.
217220
type ServicesBackend interface {
218221
ClientBackend
219222

220-
// RegisterHaltHook registers a function to be called when the
221-
// consensus Halt epoch height is reached.
222-
RegisterHaltHook(func(ctx context.Context, blockHeight int64, epoch epochtime.EpochTime))
223+
// RegisterHaltHook registers a function to be called when the consensus needs to halt.
224+
RegisterHaltHook(hook HaltHook)
223225

224226
// SubmissionManager returns the transaction submission manager.
225227
SubmissionManager() SubmissionManager

go/consensus/tendermint/abci/mux.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (a *ApplicationServer) Register(app api.Application) error {
133133

134134
// RegisterHaltHook registers a function to be called when the
135135
// consensus Halt epoch height is reached.
136-
func (a *ApplicationServer) RegisterHaltHook(hook func(ctx context.Context, blockHeight int64, epoch epochtime.EpochTime)) {
136+
func (a *ApplicationServer) RegisterHaltHook(hook consensus.HaltHook) {
137137
a.mux.registerHaltHook(hook)
138138
}
139139

@@ -216,7 +216,7 @@ type abciMux struct {
216216
lastBeginBlock int64
217217
currentTime time.Time
218218

219-
haltHooks []func(context.Context, int64, epochtime.EpochTime)
219+
haltHooks []consensus.HaltHook
220220

221221
// invalidatedTxs maps transaction hashes (hash.Hash) to a subscriber
222222
// waiting for that transaction to become invalid.
@@ -255,7 +255,7 @@ func (mux *abciMux) watchInvalidatedTx(txHash hash.Hash) (<-chan error, pubsub.C
255255
return resultCh, sub, nil
256256
}
257257

258-
func (mux *abciMux) registerHaltHook(hook func(context.Context, int64, epochtime.EpochTime)) {
258+
func (mux *abciMux) registerHaltHook(hook consensus.HaltHook) {
259259
mux.Lock()
260260
defer mux.Unlock()
261261

@@ -358,6 +358,13 @@ func (mux *abciMux) InitChain(req types.RequestInitChain) types.ResponseInitChai
358358
return resp
359359
}
360360

361+
func (mux *abciMux) dispatchHaltHooks(blockHeight int64, currentEpoch epochtime.EpochTime, err error) {
362+
for _, hook := range mux.haltHooks {
363+
hook(mux.state.ctx, blockHeight, currentEpoch, err)
364+
}
365+
mux.logger.Debug("halt hook dispatch complete")
366+
}
367+
361368
func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginBlock {
362369
blockHeight := mux.state.BlockHeight()
363370

@@ -396,14 +403,10 @@ func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginB
396403
case nil:
397404
// Everything ok.
398405
case upgrade.ErrStopForUpgrade:
399-
// Stop for upgrade -- but dispatch halt hooks first.
400-
mux.logger.Debug("dispatching halt hooks before stopping for upgrade")
401-
for _, hook := range mux.haltHooks {
402-
hook(mux.state.ctx, blockHeight, currentEpoch)
403-
}
404-
mux.logger.Debug("halt hook dispatch complete")
405-
406-
panic("mux: reached upgrade epoch")
406+
// Signal graceful stop for upgrade.
407+
mux.logger.Debug("dispatching halt hooks for upgrade")
408+
mux.dispatchHaltHooks(blockHeight, currentEpoch, err)
409+
panic(err)
407410
default:
408411
panic(fmt.Sprintf("mux: error while trying to perform consensus upgrade: %v", err))
409412
}
@@ -418,11 +421,8 @@ func (mux *abciMux) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginB
418421
"block_height", blockHeight,
419422
"epoch", mux.state.haltEpochHeight,
420423
)
421-
mux.logger.Debug("Dispatching halt hooks")
422-
for _, hook := range mux.haltHooks {
423-
hook(mux.state.ctx, blockHeight, mux.state.haltEpochHeight)
424-
}
425-
mux.logger.Debug("Halt hook dispatch complete")
424+
mux.logger.Debug("dispatching halt hooks before halt epoch")
425+
mux.dispatchHaltHooks(blockHeight, currentEpoch, nil)
426426
return types.ResponseBeginBlock{}
427427
case true:
428428
if !mux.state.afterHaltEpoch(ctx) {

go/consensus/tendermint/full/full.go

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"bytes"
66
"context"
77
"fmt"
8+
"math/rand"
89
"path/filepath"
910
"strings"
1011
"sync"
@@ -132,6 +133,9 @@ const (
132133
// NOTE: this is only used during the initial sync.
133134
syncWorkerLastBlockTimeDiffThreshold = 1 * time.Minute
134135

136+
minUpgradeStopWaitPeriod = 5 * time.Second
137+
upgradeStopDelay = 30 * time.Second
138+
135139
// tmSubscriberID is the subscriber identifier used for all internal Tendermint pubsub
136140
// subscriptions. If any other subscriber IDs need to be derived they will be under this prefix.
137141
tmSubscriberID = "oasis-core"
@@ -181,8 +185,10 @@ type fullService struct { // nolint: maligned
181185
isInitialized, isStarted bool
182186
startedCh chan struct{}
183187
syncedCh chan struct{}
188+
quitCh chan struct{}
184189

185-
startFn func() error
190+
startFn func() error
191+
stopOnce sync.Once
186192

187193
nextSubscriberID uint64
188194
}
@@ -219,6 +225,19 @@ func (t *fullService) Start() error {
219225
return fmt.Errorf("tendermint: failed to start service: %w", err)
220226
}
221227

228+
// Make sure the quit channel is closed when the node shuts down.
229+
go func() {
230+
select {
231+
case <-t.quitCh:
232+
case <-t.node.Quit():
233+
select {
234+
case <-t.quitCh:
235+
default:
236+
close(t.quitCh)
237+
}
238+
}
239+
}()
240+
222241
// Start event dispatchers for all the service clients.
223242
t.serviceClientsWg.Add(len(t.serviceClients))
224243
for _, svc := range t.serviceClients {
@@ -247,11 +266,7 @@ func (t *fullService) Start() error {
247266

248267
// Implements service.BackgroundService.
249268
func (t *fullService) Quit() <-chan struct{} {
250-
if !t.started() {
251-
return make(chan struct{})
252-
}
253-
254-
return t.node.Quit()
269+
return t.quitCh
255270
}
256271

257272
// Implements service.BackgroundService.
@@ -266,14 +281,15 @@ func (t *fullService) Stop() {
266281
return
267282
}
268283

269-
t.failMonitor.markCleanShutdown()
270-
if err := t.node.Stop(); err != nil {
271-
t.Logger.Error("Error on stopping node", err)
272-
}
284+
t.stopOnce.Do(func() {
285+
t.failMonitor.markCleanShutdown()
286+
if err := t.node.Stop(); err != nil {
287+
t.Logger.Error("Error on stopping node", err)
288+
}
273289

274-
t.svcMgr.Stop()
275-
t.mux.Stop()
276-
t.node.Wait()
290+
t.svcMgr.Stop()
291+
t.mux.Stop()
292+
})
277293
}
278294

279295
func (t *fullService) Started() <-chan struct{} {
@@ -401,7 +417,7 @@ func (t *fullService) GetGenesisDocument(ctx context.Context) (*genesisAPI.Docum
401417
return t.genesis, nil
402418
}
403419

404-
func (t *fullService) RegisterHaltHook(hook func(context.Context, int64, epochtimeAPI.EpochTime)) {
420+
func (t *fullService) RegisterHaltHook(hook consensusAPI.HaltHook) {
405421
if !t.initialized() {
406422
return
407423
}
@@ -1301,6 +1317,41 @@ func (t *fullService) lazyInit() error {
13011317
t.client = tmcli.New(t.node)
13021318
t.failMonitor = newFailMonitor(t.ctx, t.Logger, t.node.ConsensusState().Wait)
13031319

1320+
// Register a halt hook that handles upgrades gracefully.
1321+
t.RegisterHaltHook(func(ctx context.Context, blockHeight int64, epoch epochtimeAPI.EpochTime, err error) {
1322+
if !errors.Is(err, upgradeAPI.ErrStopForUpgrade) {
1323+
return
1324+
}
1325+
1326+
// Mark this as a clean shutdown and request the node to stop gracefully.
1327+
t.failMonitor.markCleanShutdown()
1328+
1329+
// Wait before stopping to give time for P2P messages to propagate. Sleep for at least
1330+
// minUpgradeStopWaitPeriod or the configured commit timeout.
1331+
t.Logger.Info("waiting a bit before stopping the node for upgrade")
1332+
waitPeriod := minUpgradeStopWaitPeriod
1333+
if tc := t.genesis.Consensus.Parameters.TimeoutCommit; tc > waitPeriod {
1334+
waitPeriod = tc
1335+
}
1336+
time.Sleep(waitPeriod)
1337+
1338+
go func() {
1339+
// Sleep another period so there is some time between when consensus shuts down and
1340+
// when all the other services start shutting down.
1341+
//
1342+
// Randomize the period so that not all nodes shut down at the same time.
1343+
delay := getRandomValueFromInterval(0.5, rand.Float64(), upgradeStopDelay)
1344+
time.Sleep(delay)
1345+
1346+
t.Logger.Info("stopping the node for upgrade")
1347+
t.Stop()
1348+
1349+
// Close the quit channel early to force the node to stop. This is needed because
1350+
// the Tendermint node will otherwise never quit.
1351+
close(t.quitCh)
1352+
}()
1353+
})
1354+
13041355
return nil
13051356
}
13061357

@@ -1462,6 +1513,7 @@ func New(
14621513
dataDir: dataDir,
14631514
startedCh: make(chan struct{}),
14641515
syncedCh: make(chan struct{}),
1516+
quitCh: make(chan struct{}),
14651517
}
14661518

14671519
t.Logger.Info("starting a full consensus node")
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package full
2+
3+
import "time"
4+
5+
// Borrowed from https://github.com/cenkalti/backoff.
6+
7+
// Returns a random value from the following interval:
8+
// [currentInterval - randomizationFactor * currentInterval, currentInterval + randomizationFactor * currentInterval].
9+
func getRandomValueFromInterval(randomizationFactor, random float64, currentInterval time.Duration) time.Duration {
10+
delta := randomizationFactor * float64(currentInterval)
11+
minInterval := float64(currentInterval) - delta
12+
maxInterval := float64(currentInterval) + delta
13+
14+
// Get a random value from the range [minInterval, maxInterval].
15+
// The formula used below has a +1 because if the minInterval is 1 and the maxInterval is 3 then
16+
// we want a 33% chance for selecting either 1, 2 or 3.
17+
return time.Duration(minInterval + (random * (maxInterval - minInterval + 1)))
18+
}

go/consensus/tendermint/seed/seed.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ func (srv *seedService) SubmitTxNoWait(ctx context.Context, tx *transaction.Sign
254254
}
255255

256256
// Implements Backend.
257-
func (srv *seedService) RegisterHaltHook(func(ctx context.Context, blockHeight int64, epoch epochtime.EpochTime)) {
257+
func (srv *seedService) RegisterHaltHook(consensus.HaltHook) {
258258
panic(consensus.ErrUnsupported)
259259
}
260260

0 commit comments

Comments
 (0)