@@ -38,8 +38,8 @@ import (
3838 . "github.com/onsi/gomega"
3939 "github.com/onsi/gomega/types"
4040 "github.com/pkg/errors"
41- raft "go.etcd.io/etcd/ raft/v3"
42- "go.etcd.io/etcd/ raft/v3/raftpb"
41+ raft "go.etcd.io/raft/v3"
42+ "go.etcd.io/raft/v3/raftpb"
4343 "go.uber.org/zap"
4444)
4545
@@ -152,7 +152,7 @@ var _ = Describe("Chain", func() {
152152 walDir = path .Join (dataDir , "wal" )
153153 snapDir = path .Join (dataDir , "snapshot" )
154154
155- observeC = make (chan raft.SoftState , 1 )
155+ observeC = make (chan raft.SoftState , 3 )
156156
157157 support = & consensusmocks.FakeConsenterSupport {}
158158 support .ChannelIDReturns (channelID )
@@ -377,7 +377,7 @@ var _ = Describe("Chain", func() {
377377 // 2. a SoftState and an associated increase of term in the HardState due to the node being elected leader
378378 // 3. a block being committed
379379 // The duration being emitted is zero since we don't tick the fake clock during this time
380- Expect (fakeFields .fakeDataPersistDuration .ObserveCallCount ()).Should (Equal ( 3 ))
380+ Expect (fakeFields .fakeDataPersistDuration .ObserveCallCount ()).Should (BeNumerically ( ">=" , 3 ))
381381 Expect (fakeFields .fakeDataPersistDuration .ObserveArgsForCall (0 )).Should (Equal (float64 (0 )))
382382 Expect (fakeFields .fakeDataPersistDuration .ObserveArgsForCall (1 )).Should (Equal (float64 (0 )))
383383 Expect (fakeFields .fakeDataPersistDuration .ObserveArgsForCall (2 )).Should (Equal (float64 (0 )))
@@ -395,7 +395,7 @@ var _ = Describe("Chain", func() {
395395 Eventually (support .WriteBlockCallCount , LongEventualTimeout ).Should (Equal (2 ))
396396 Expect (fakeFields .fakeCommittedBlockNumber .SetCallCount ()).Should (Equal (3 )) // incl. initial call
397397 Expect (fakeFields .fakeCommittedBlockNumber .SetArgsForCall (2 )).Should (Equal (float64 (2 )))
398- Expect (fakeFields .fakeDataPersistDuration .ObserveCallCount ()).Should (Equal ( 4 ))
398+ Expect (fakeFields .fakeDataPersistDuration .ObserveCallCount ()).Should (BeNumerically ( ">=" , 4 ))
399399 Expect (fakeFields .fakeDataPersistDuration .ObserveArgsForCall (3 )).Should (Equal (float64 (0 )))
400400 })
401401
@@ -769,7 +769,7 @@ var _ = Describe("Chain", func() {
769769 })
770770
771771 It ("replays blocks from committed entries" , func () {
772- c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil )
772+ c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil , logger )
773773 c .init ()
774774 c .Start ()
775775 defer c .Halt ()
@@ -798,7 +798,7 @@ var _ = Describe("Chain", func() {
798798
799799 It ("only replays blocks after Applied index" , func () {
800800 raftMetadata .RaftIndex = m1 .RaftIndex
801- c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil )
801+ c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil , logger )
802802 c .support .WriteBlock (support .WriteBlockArgsForCall (0 ))
803803
804804 c .init ()
@@ -824,7 +824,7 @@ var _ = Describe("Chain", func() {
824824
825825 It ("does not replay any block if already in sync" , func () {
826826 raftMetadata .RaftIndex = m2 .RaftIndex
827- c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil )
827+ c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil , logger )
828828 c .init ()
829829 c .Start ()
830830 defer c .Halt ()
@@ -956,7 +956,7 @@ var _ = Describe("Chain", func() {
956956
957957 chain .Halt ()
958958
959- c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil )
959+ c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil , logger )
960960 c .init ()
961961
962962 signal := make (chan struct {})
@@ -1008,7 +1008,7 @@ var _ = Describe("Chain", func() {
10081008
10091009 chain .Halt ()
10101010
1011- c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil )
1011+ c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil , logger )
10121012 c .init ()
10131013 c .Start ()
10141014 defer c .Halt ()
@@ -1043,7 +1043,7 @@ var _ = Describe("Chain", func() {
10431043 chain .Halt ()
10441044
10451045 raftMetadata .RaftIndex = m .RaftIndex
1046- c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil )
1046+ c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil , logger )
10471047 c .opts .SnapshotIntervalSize = 1
10481048
10491049 c .init ()
@@ -1072,7 +1072,7 @@ var _ = Describe("Chain", func() {
10721072 m = & raftprotos.BlockMetadata {}
10731073 proto .Unmarshal (metadata , m )
10741074 raftMetadata .RaftIndex = m .RaftIndex
1075- cx := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil )
1075+ cx := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil , logger )
10761076
10771077 cx .init ()
10781078 cx .Start ()
@@ -1136,7 +1136,7 @@ var _ = Describe("Chain", func() {
11361136 chain .Halt ()
11371137
11381138 raftMetadata .RaftIndex = m .RaftIndex
1139- c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil )
1139+ c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil , logger )
11401140 cnt := support .WriteBlockCallCount ()
11411141 for i := range cnt {
11421142 c .support .WriteBlock (support .WriteBlockArgsForCall (i ))
@@ -1194,7 +1194,7 @@ var _ = Describe("Chain", func() {
11941194 chain .Halt ()
11951195
11961196 raftMetadata .RaftIndex = m .RaftIndex
1197- c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil )
1197+ c := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil , logger )
11981198 // replay block 1&2
11991199 c .support .WriteBlock (support .WriteBlockArgsForCall (0 ))
12001200 c .support .WriteBlock (support .WriteBlockArgsForCall (1 ))
@@ -1241,7 +1241,7 @@ var _ = Describe("Chain", func() {
12411241 chain .Halt ()
12421242
12431243 raftMetadata .RaftIndex = m .RaftIndex
1244- c1 := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil )
1244+ c1 := newChain (10 * time .Second , channelID , dataDir , 1 , raftMetadata , consenters , cryptoProvider , nil , nil , logger )
12451245 cnt := support .WriteBlockCallCount ()
12461246 for i := range cnt {
12471247 c1 .support .WriteBlock (support .WriteBlockArgsForCall (i ))
@@ -1433,7 +1433,7 @@ var _ = Describe("Chain", func() {
14331433 configEnv = newConfigEnv (channelID , common .HeaderType_CONFIG , newConfigUpdateEnv (channelID , nil , value ))
14341434
14351435 fakeHaltCallbacker = & mocks.HaltCallbacker {}
1436- network = createNetwork (timeout , channelID , dataDir , raftMetadata , consenters , cryptoProvider , tlsCA , fakeHaltCallbacker .HaltCallback )
1436+ network = createNetwork (timeout , channelID , dataDir , raftMetadata , consenters , cryptoProvider , tlsCA , fakeHaltCallbacker .HaltCallback , logger )
14371437 c1 , c2 = network .chains [1 ], network .chains [2 ]
14381438 c1 .cutter .CutNext = true
14391439 network .init ()
@@ -1652,7 +1652,7 @@ var _ = Describe("Chain", func() {
16521652 },
16531653 }
16541654
1655- network = createNetwork (timeout , channelID , dataDir , raftMetadata , consenters , cryptoProvider , tlsCA , nil )
1655+ network = createNetwork (timeout , channelID , dataDir , raftMetadata , consenters , cryptoProvider , tlsCA , nil , logger )
16561656 c1 = network .chains [1 ]
16571657 c2 = network .chains [2 ]
16581658 c3 = network .chains [3 ]
@@ -2020,7 +2020,7 @@ var _ = Describe("Chain", func() {
20202020 raftmeta , err := etcdraft .ReadBlockMetadata (meta , nil )
20212021 Expect (err ).NotTo (HaveOccurred ())
20222022
2023- c4 := newChain (timeout , channelID , dataDir , 4 , raftmeta , consenters , cryptoProvider , nil , nil )
2023+ c4 := newChain (timeout , channelID , dataDir , 4 , raftmeta , consenters , cryptoProvider , nil , nil , logger )
20242024 // if we join a node to existing network, it MUST already obtained blocks
20252025 // till the config block that adds this node to cluster.
20262026 c4 .support .WriteBlock (c1 .support .WriteBlockArgsForCall (0 ))
@@ -2143,7 +2143,7 @@ var _ = Describe("Chain", func() {
21432143 raftmeta , err := etcdraft .ReadBlockMetadata (meta , nil )
21442144 Expect (err ).NotTo (HaveOccurred ())
21452145
2146- c4 := newChain (timeout , channelID , dataDir , 4 , raftmeta , consenters , cryptoProvider , nil , nil )
2146+ c4 := newChain (timeout , channelID , dataDir , 4 , raftmeta , consenters , cryptoProvider , nil , nil , logger )
21472147 // if we join a node to existing network, it MUST already obtained blocks
21482148 // till the config block that adds this node to cluster.
21492149 c4 .support .WriteBlock (c1 .support .WriteBlockArgsForCall (0 ))
@@ -2218,7 +2218,7 @@ var _ = Describe("Chain", func() {
22182218 raftmeta , err := etcdraft .ReadBlockMetadata (meta , nil )
22192219 Expect (err ).NotTo (HaveOccurred ())
22202220
2221- c4 := newChain (timeout , channelID , dataDir , 4 , raftmeta , consenters , cryptoProvider , nil , nil )
2221+ c4 := newChain (timeout , channelID , dataDir , 4 , raftmeta , consenters , cryptoProvider , nil , nil , logger )
22222222 // if we join a node to existing network, it MUST already obtained blocks
22232223 // till the config block that adds this node to cluster.
22242224 c4 .support .WriteBlock (c1 .support .WriteBlockArgsForCall (0 ))
@@ -2366,7 +2366,7 @@ var _ = Describe("Chain", func() {
23662366 raftmeta , err := etcdraft .ReadBlockMetadata (meta , nil )
23672367 Expect (err ).NotTo (HaveOccurred ())
23682368
2369- c4 := newChain (timeout , channelID , dataDir , 4 , raftmeta , consenters , cryptoProvider , nil , nil )
2369+ c4 := newChain (timeout , channelID , dataDir , 4 , raftmeta , consenters , cryptoProvider , nil , nil , logger )
23702370 // if we join a node to existing network, it MUST already obtained blocks
23712371 // till the config block that adds this node to cluster.
23722372 c4 .support .WriteBlock (c1 .support .WriteBlockArgsForCall (0 ))
@@ -3380,6 +3380,7 @@ func newChain(
33803380 cryptoProvider bccsp.BCCSP ,
33813381 support * consensusmocks.FakeConsenterSupport ,
33823382 haltCallback func (),
3383+ logger * flogging.FabricLogger ,
33833384) * chain {
33843385 rpc := & mocks.FakeRPC {}
33853386 clock := fakeclock .NewFakeClock (time .Now ())
@@ -3399,7 +3400,7 @@ func newChain(
33993400 BlockMetadata : raftMetadata ,
34003401 LeaderCheckInterval : 500 * time .Millisecond ,
34013402 Consenters : consenters ,
3402- Logger : flogging . NewFabricLogger ( zap . NewExample ()) ,
3403+ Logger : logger ,
34033404 MemoryStorage : storage ,
34043405 WALDir : path .Join (dataDir , "wal" ),
34053406 SnapDir : path .Join (dataDir , "snapshot" ),
@@ -3416,9 +3417,9 @@ func newChain(
34163417 support .BlockCutterReturns (cutter )
34173418
34183419 // upon leader change, lead is reset to 0 before set to actual
3419- // new leader, i.e. 1 -> 0 -> 2. Therefore 2 numbers will be
3420- // sent on this chan, so we need size to be 2
3421- observe := make (chan raft.SoftState , 2 )
3420+ // new leader, i.e. 1 -> 0 -> 2. Therefore 3 numbers will be
3421+ // sent on this chan, so we need size to be 3
3422+ observe := make (chan raft.SoftState , 3 )
34223423
34233424 configurator := & mocks.FakeConfigurator {}
34243425 puller := & mocks.FakeBlockPuller {}
@@ -3708,6 +3709,7 @@ func createNetwork(
37083709 cryptoProvider bccsp.BCCSP ,
37093710 tlsCA tlsgen.CA ,
37103711 haltCallback func (),
3712+ logger * flogging.FabricLogger ,
37113713) * network {
37123714 n := & network {
37133715 chains : make (map [uint64 ]* chain ),
@@ -3725,7 +3727,7 @@ func createNetwork(
37253727 support .SharedConfigReturns (mockOrdererWithBatchTimeout (timeout , nil ))
37263728 mockOrdererConfig := mockOrdererWithTLSRootCert (timeout , nil , tlsCA )
37273729 support .SharedConfigReturns (mockOrdererConfig )
3728- n .addChain (newChain (timeout , channel , dir , nodeID , m , consenters , cryptoProvider , support , haltCallback ))
3730+ n .addChain (newChain (timeout , channel , dir , nodeID , m , consenters , cryptoProvider , support , haltCallback , logger ))
37293731 }
37303732
37313733 return n
@@ -3870,8 +3872,10 @@ func (n *network) elect(id uint64) {
38703872
38713873 // Send node an artificial MsgTimeoutNow to emulate leadership transfer.
38723874 fmt .Fprintf (GinkgoWriter , "Send artificial MsgTimeoutNow to elect node %d\n " , id )
3873- candidate .Consensus (& orderer.ConsensusRequest {Payload : protoutil .MarshalOrPanic (& raftpb.Message {Type : raftpb .MsgTimeoutNow , To : id })}, 0 )
3874- Eventually (candidate .observe , LongEventualTimeout ).Should (Receive (StateEqual (id , raft .StateLeader )))
3875+ Eventually (func () <- chan raft.SoftState {
3876+ candidate .Consensus (& orderer.ConsensusRequest {Payload : protoutil .MarshalOrPanic (& raftpb.Message {Type : raftpb .MsgTimeoutNow , To : id })}, 0 )
3877+ return candidate .observe
3878+ }).WithTimeout (LongEventualTimeout ).Should (Receive (StateEqual (id , raft .StateLeader )))
38753879
38763880 n .Lock ()
38773881 n .leader = id
0 commit comments