diff --git a/consensus/interface.go b/consensus/interface.go index 266d2ff296f..7c6c4ad5714 100644 --- a/consensus/interface.go +++ b/consensus/interface.go @@ -189,7 +189,7 @@ type PeerBlacklistHandler interface { // SigningHandler defines the behaviour of a component that handles multi and single signatures used in consensus operations type SigningHandler interface { Reset(pubKeys []string) error - CreateSignatureShareForPublicKey(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) + CreateSignatureShareForPublicKey(ctx context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) CreateSignatureForPublicKey(message []byte, publicKeyBytes []byte) ([]byte, error) VerifySingleSignature(publicKeyBytes []byte, message []byte, signature []byte) error StoreSignatureShare(index uint16, sig []byte) error diff --git a/consensus/spos/bls/v1/subroundSignature.go b/consensus/spos/bls/v1/subroundSignature.go index 360aba6e48a..d566ff163e4 100644 --- a/consensus/spos/bls/v1/subroundSignature.go +++ b/consensus/spos/bls/v1/subroundSignature.go @@ -73,7 +73,7 @@ func checkNewSubroundSignatureParams( } // doSignatureJob method does the job of the subround Signature -func (sr *subroundSignature) doSignatureJob(_ context.Context) bool { +func (sr *subroundSignature) doSignatureJob(ctx context.Context) bool { if !sr.CanDoSubroundJob(sr.Current()) { return false } @@ -93,6 +93,7 @@ func (sr *subroundSignature) doSignatureJob(_ context.Context) bool { } signatureShare, err := sr.SigningHandler().CreateSignatureShareForPublicKey( + ctx, sr.GetData(), uint16(selfIndex), sr.GetHeader().GetEpoch(), @@ -116,7 +117,7 @@ func (sr *subroundSignature) doSignatureJob(_ context.Context) bool { } } - return sr.doSignatureJobForManagedKeys() + return sr.doSignatureJobForManagedKeys(ctx) } func (sr *subroundSignature) createAndSendSignatureMessage(signatureShare []byte, pkBytes []byte) bool { @@ -351,7 +352,7 @@ func (sr *subroundSignature) remainingTime() time.Duration { return remainigTime } -func (sr *subroundSignature) doSignatureJobForManagedKeys() bool { +func (sr *subroundSignature) doSignatureJobForManagedKeys(ctx context.Context) bool { isMultiKeyLeader := sr.IsMultiKeyLeaderInCurrentRound() numMultiKeysSignaturesSent := 0 @@ -371,6 +372,7 @@ func (sr *subroundSignature) doSignatureJobForManagedKeys() bool { } signatureShare, err := sr.SigningHandler().CreateSignatureShareForPublicKey( + ctx, sr.GetData(), uint16(selfIndex), sr.GetHeader().GetEpoch(), diff --git a/consensus/spos/bls/v1/subroundSignature_test.go b/consensus/spos/bls/v1/subroundSignature_test.go index 03e5101cd5f..79ca52835ce 100644 --- a/consensus/spos/bls/v1/subroundSignature_test.go +++ b/consensus/spos/bls/v1/subroundSignature_test.go @@ -1,6 +1,7 @@ package v1_test import ( + "context" "testing" "github.com/multiversx/mx-chain-core-go/core/check" @@ -360,7 +361,7 @@ func TestSubroundSignature_DoSignatureJob(t *testing.T) { err := errors.New("create signature share error") signingHandler := &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return nil, err }, } @@ -370,7 +371,7 @@ func TestSubroundSignature_DoSignatureJob(t *testing.T) { assert.False(t, r) signingHandler = &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return []byte("SIG"), nil }, } @@ -441,7 +442,7 @@ func TestSubroundSignature_DoSignatureJobWithMultikey(t *testing.T) { err := errors.New("create signature share error") signingHandler := &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return nil, err }, } @@ -451,7 +452,7 @@ func TestSubroundSignature_DoSignatureJobWithMultikey(t *testing.T) { assert.False(t, r) signingHandler = &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return []byte("SIG"), nil }, } diff --git a/consensus/spos/bls/v2/benchmark_send_proof_test.go b/consensus/spos/bls/v2/benchmark_send_proof_test.go index a589e4ae683..33e68e4e4a8 100644 --- a/consensus/spos/bls/v2/benchmark_send_proof_test.go +++ b/consensus/spos/bls/v2/benchmark_send_proof_test.go @@ -1,6 +1,7 @@ package v2_test import ( + "context" "sort" "testing" @@ -171,7 +172,7 @@ func benchmarkSendProof(b *testing.B, numberOfKeys int) { // Create signature shares for all validators (simulating that all have signed) for i := 0; i < len(keys); i++ { - _, err := signingHandler.CreateSignatureShareForPublicKey(dataToBeSigned, uint16(i), 0, []byte(keys[i])) + _, err := signingHandler.CreateSignatureShareForPublicKey(context.TODO(), dataToBeSigned, uint16(i), 0, []byte(keys[i])) require.Nil(b, err) err = srEndRound.SetJobDone(keys[i], bls.SrSignature, true) require.Nil(b, err) @@ -183,7 +184,7 @@ func benchmarkSendProof(b *testing.B, numberOfKeys int) { for i := 0; i < b.N; i++ { // Reset signature state for each iteration for j := 0; j < len(keys); j++ { - _, _ = signingHandler.CreateSignatureShareForPublicKey(dataToBeSigned, uint16(j), 0, []byte(keys[j])) + _, _ = signingHandler.CreateSignatureShareForPublicKey(context.TODO(), dataToBeSigned, uint16(j), 0, []byte(keys[j])) } b.StartTimer() diff --git a/consensus/spos/bls/v2/benchmark_verify_signatures_test.go b/consensus/spos/bls/v2/benchmark_verify_signatures_test.go index 505a83dbcae..dc95dfbd818 100644 --- a/consensus/spos/bls/v2/benchmark_verify_signatures_test.go +++ b/consensus/spos/bls/v2/benchmark_verify_signatures_test.go @@ -114,7 +114,7 @@ func BenchmarkSubroundEndRound_VerifyNodesOnAggSigFailTime(b *testing.B) { sr := initSubroundEndRoundWithContainerAndConsensusState(container, &statusHandler.AppStatusHandlerStub{}, consensusState) for i := 0; i < len(sr.ConsensusGroup()); i++ { - _, err := sr.SigningHandler().CreateSignatureShareForPublicKey(dataToBeSigned, uint16(i), sr.EnableEpochsHandler().GetCurrentEpoch(), []byte(keys[i])) + _, err := sr.SigningHandler().CreateSignatureShareForPublicKey(context.TODO(), dataToBeSigned, uint16(i), sr.EnableEpochsHandler().GetCurrentEpoch(), []byte(keys[i])) require.Nil(b, err) _ = sr.SetJobDone(keys[i], bls.SrSignature, true) } diff --git a/consensus/spos/bls/v2/blsSubroundsFactory.go b/consensus/spos/bls/v2/blsSubroundsFactory.go index 6c76df939c1..48a0d4e4989 100644 --- a/consensus/spos/bls/v2/blsSubroundsFactory.go +++ b/consensus/spos/bls/v2/blsSubroundsFactory.go @@ -218,6 +218,7 @@ func (fct *factory) generateBlockSubround() error { processingThresholdPercent, fct.worker, syncController, + fct.signatureThrottler, ) if err != nil { return err diff --git a/consensus/spos/bls/v2/common.go b/consensus/spos/bls/v2/common.go new file mode 100644 index 00000000000..5d6e55cfd90 --- /dev/null +++ b/consensus/spos/bls/v2/common.go @@ -0,0 +1,30 @@ +package v2 + +import ( + "context" + "fmt" + "time" + + "github.com/multiversx/mx-chain-core-go/core" + "github.com/multiversx/mx-chain-go/consensus/spos" +) + +func checkGoRoutinesThrottler( + ctx context.Context, + signatureThrottler core.Throttler, +) error { + for { + if signatureThrottler.CanProcess() { + break + } + + select { + case <-time.After(timeSpentBetweenChecks): + continue + case <-ctx.Done(): + return fmt.Errorf("%w while checking the throttler", spos.ErrTimeIsOut) + } + } + + return nil +} diff --git a/consensus/spos/bls/v2/export_test.go b/consensus/spos/bls/v2/export_test.go index 58988e0d82f..24b60861d88 100644 --- a/consensus/spos/bls/v2/export_test.go +++ b/consensus/spos/bls/v2/export_test.go @@ -203,7 +203,7 @@ func (sr *subroundBlock) SendBlockBody(body data.BodyHandler, marshalizedBody [] // SendBlockHeader method sends the proposed block header in the subround Block func (sr *subroundBlock) SendBlockHeader(header data.HeaderHandler, marshalizedHeader []byte) bool { - return sr.sendBlockHeader(header, marshalizedHeader) + return sr.sendBlockHeader(context.TODO(), header, marshalizedHeader) } // ComputeSubroundProcessingMetric computes processing metric related to the subround Block @@ -395,3 +395,8 @@ func (sr *subroundEndRound) UpdateNonceDeltaMetrics() { func (sr *subroundBlock) PrepareBlockForExecution(header data.HeaderHandler, body data.BodyHandler) error { return sr.prepareBlockForExecution(header, body) } + +// TriggerCreateSignaturesForManagedKeys - +func (sr *subroundBlock) TriggerCreateSignaturesForManagedKeys(ctx context.Context) bool { + return sr.triggerCreateSignaturesForManagedKeys(ctx) +} diff --git a/consensus/spos/bls/v2/subroundBlock.go b/consensus/spos/bls/v2/subroundBlock.go index 9c6ebeaae06..7f41fa86c5e 100644 --- a/consensus/spos/bls/v2/subroundBlock.go +++ b/consensus/spos/bls/v2/subroundBlock.go @@ -29,6 +29,7 @@ type subroundBlock struct { worker spos.WorkerHandler mutBlockProcessing sync.Mutex syncController spos.NtpSyncControllerHandler + signatureThrottler core.Throttler } // NewSubroundBlock creates a subroundBlock object @@ -37,6 +38,7 @@ func NewSubroundBlock( processingThresholdPercentage int, worker spos.WorkerHandler, syncController spos.NtpSyncControllerHandler, + signatureThrottler core.Throttler, ) (*subroundBlock, error) { err := checkNewSubroundBlockParams(baseSubround) if err != nil { @@ -49,12 +51,16 @@ func NewSubroundBlock( if check.IfNil(syncController) { return nil, ErrNilRoundSyncController } + if check.IfNil(signatureThrottler) { + return nil, spos.ErrNilThrottler + } srBlock := subroundBlock{ Subround: baseSubround, processingThresholdPercentage: processingThresholdPercentage, worker: worker, syncController: syncController, + signatureThrottler: signatureThrottler, } srBlock.Job = srBlock.doBlockJob @@ -142,7 +148,7 @@ func (sr *subroundBlock) doBlockJob(ctx context.Context) bool { return false } - sentWithSuccess := sr.sendBlock(header, body, leader) + sentWithSuccess := sr.sendBlock(ctx, header, body, leader) if !sentWithSuccess { return false } @@ -220,7 +226,12 @@ func printLogMessage(ctx context.Context, baseMessage string, err error) { log.Debug(baseMessage, "error", err.Error()) } -func (sr *subroundBlock) sendBlock(header data.HeaderHandler, body data.BodyHandler, leader string) bool { +func (sr *subroundBlock) sendBlock( + ctx context.Context, + header data.HeaderHandler, + body data.BodyHandler, + leader string, +) bool { marshalledBody, err := sr.Marshalizer().Marshal(body) if err != nil { log.Debug("sendBlock.Marshal: body", "error", err.Error()) @@ -236,7 +247,7 @@ func (sr *subroundBlock) sendBlock(header data.HeaderHandler, body data.BodyHand sr.logBlockSize(marshalledBody, marshalledHeader) headerHash := sr.Hasher().Compute(string(marshalledHeader)) - if !sr.sendBlockBody(body, marshalledBody) || !sr.sendBlockHeader(header, headerHash) { + if !sr.sendBlockBody(body, marshalledBody) || !sr.sendBlockHeader(ctx, header, headerHash) { return false } @@ -313,6 +324,7 @@ func (sr *subroundBlock) sendBlockBody( // sendBlockHeader method sends the proposed block header in the subround Block func (sr *subroundBlock) sendBlockHeader( + ctx context.Context, headerHandler data.HeaderHandler, headerHash []byte, ) bool { @@ -336,6 +348,8 @@ func (sr *subroundBlock) sendBlockHeader( sr.SetData(headerHash) sr.SetHeader(headerHandler) + sr.triggerCreateSignaturesForManagedKeys(ctx) + // log the header output for debugging purposes headerOutput, err := common.PrettifyStruct(headerHandler) if err == nil { @@ -345,6 +359,60 @@ func (sr *subroundBlock) sendBlockHeader( return true } +func (sr *subroundBlock) triggerCreateSignaturesForManagedKeys(ctx context.Context) bool { + sigSubroundEndTime := time.Duration(float64(sr.RoundHandler().TimeDuration()) * srSignatureEndTime) + timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), sigSubroundEndTime) + + sigCtx, cancel := context.WithTimeout(ctx, timeLeft) + sr.SetSignaturesCtxCancelFunc(cancel) + + for idx, pk := range sr.ConsensusGroup() { + pkBytes := []byte(pk) + if !sr.IsKeyManagedBySelf(pkBytes) { + continue + } + + err := checkGoRoutinesThrottler(ctx, sr.signatureThrottler) + if err != nil { + log.Debug("triggerCreateSignaturesForManagedKeys.checkGoRoutinesThrottler", "err", err) + return false + } + sr.signatureThrottler.StartProcessing() + sr.SignaturesWaitGroup().Add(1) + + go func(sigCtx context.Context, idx int, pk string) { + defer sr.signatureThrottler.EndProcessing() + defer sr.SignaturesWaitGroup().Done() + + select { + case <-sigCtx.Done(): + log.Debug("triggerCreateSignaturesForManagedKeys: context done", "timeLeft", timeLeft) + return + default: + } + + pkBytes := []byte(pk) + currentHash := sr.GetData() + + _, err := sr.SigningHandler().CreateSignatureShareForPublicKey( + sigCtx, + currentHash, + uint16(idx), + sr.GetHeader().GetEpoch(), + pkBytes, + ) + if err != nil { + log.Debug("triggerCreateSignaturesForManagedKeys.CreateSignatureShareForPublicKey", "error", err.Error()) + return + } + }(sigCtx, idx, pk) + } + + log.Debug("step 1: multi keys signatures creation has been triggered") + + return true +} + func (sr *subroundBlock) sendDirectSentTransactions( header data.HeaderHandler, body data.BodyHandler, @@ -643,6 +711,8 @@ func (sr *subroundBlock) receivedBlockHeader(headerHandler data.HeaderHandler) { sr.AddReceivedHeader(headerHandler) + sr.triggerCreateSignaturesForManagedKeys(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), sr.RoundHandler().TimeDuration()) defer cancel() diff --git a/consensus/spos/bls/v2/subroundBlock_test.go b/consensus/spos/bls/v2/subroundBlock_test.go index ef0e5bddafc..7458b508989 100644 --- a/consensus/spos/bls/v2/subroundBlock_test.go +++ b/consensus/spos/bls/v2/subroundBlock_test.go @@ -1,9 +1,11 @@ package v2_test import ( + "context" "errors" "fmt" "math/big" + "sync/atomic" "testing" "time" @@ -19,6 +21,7 @@ import ( "github.com/multiversx/mx-chain-go/consensus/spos" "github.com/multiversx/mx-chain-go/consensus/spos/bls" v2 "github.com/multiversx/mx-chain-go/consensus/spos/bls/v2" + dataRetrieverMock "github.com/multiversx/mx-chain-go/dataRetriever/mock" "github.com/multiversx/mx-chain-go/process/asyncExecution/cache" "github.com/multiversx/mx-chain-go/sharding/nodesCoordinator" "github.com/multiversx/mx-chain-go/testscommon" @@ -80,6 +83,7 @@ func defaultSubroundBlockFromSubround(sr *spos.Subround) (v2.SubroundBlock, erro }, }, &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, ) return srBlock, err @@ -97,6 +101,7 @@ func defaultSubroundBlockWithoutErrorFromSubround(sr *spos.Subround) v2.Subround }, }, &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, ) return srBlock @@ -179,6 +184,7 @@ func TestSubroundBlock_NewSubroundBlockNilSubroundShouldFail(t *testing.T) { v2.ProcessingThresholdPercent, &consensusMocks.SposWorkerMock{}, &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, ) assert.Nil(t, srBlock) assert.Equal(t, spos.ErrNilSubround, err) @@ -334,6 +340,7 @@ func TestSubroundBlock_NewSubroundBlockNilWorkerShouldFail(t *testing.T) { v2.ProcessingThresholdPercent, nil, &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, ) assert.Nil(t, srBlock) assert.Equal(t, spos.ErrNilWorker, err) @@ -351,6 +358,7 @@ func TestSubroundBlock_NewSubroundBlockNilRoundSyncController(t *testing.T) { v2.ProcessingThresholdPercent, &consensusMocks.SposWorkerMock{}, nil, + &dataRetrieverMock.ThrottlerStub{}, ) require.Nil(t, srBlock) require.Equal(t, v2.ErrNilRoundSyncController, err) @@ -610,6 +618,7 @@ func TestSubroundBlock_DoBlockJob(t *testing.T) { }, }, &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, ) providedLeaderSignature := []byte("leader signature") @@ -713,6 +722,7 @@ func TestSubroundBlock_DoBlockJob(t *testing.T) { }, }, &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, ) providedLeaderSignature := []byte("leader signature") @@ -1615,6 +1625,7 @@ func TestSubroundBlock_UpdateConsensusMetrics(t *testing.T) { }, }, &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, ) consensusMetrics.ResetInstanceValues() @@ -1827,6 +1838,146 @@ func TestSubroundBlock_prepareBlockForExecution(t *testing.T) { }) } +func TestSubroundBlock_TriggerCreateSignaturesForManagedKeys(t *testing.T) { + t.Parallel() + + t.Run("should work", func(t *testing.T) { + t.Parallel() + + container := consensusMocks.InitConsensusCore() + enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.AndromedaFlag + }, + } + container.SetEnableEpochsHandler(enableEpochsHandler) + + numMultiKeysSignaturesCreated := int32(0) + + signingHandler := &consensusMocks.SigningHandlerStub{ + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) + return []byte("SIG"), nil + }, + } + container.SetSigningHandler(signingHandler) + consensusState := initializers.InitConsensusStateWithKeysHandler( + &testscommon.KeysHandlerStub{ + IsKeyManagedByCurrentNodeCalled: func(pkBytes []byte) bool { + return true + }, + }, + ) + ch := make(chan bool, 1) + + sr, _ := spos.NewSubround( + bls.SrBlock, + bls.SrSignature, + bls.SrEndRound, + roundTimeDuration, + 0.7, + 0.85, + "(SIGNATURE)", + consensusState, + ch, + executeStoredMessages, + container, + chainID, + currentPid, + &statusHandler.AppStatusHandlerStub{}, + ) + + srBlock, _ := v2.NewSubroundBlock( + sr, + v2.ProcessingThresholdPercent, + &consensusMocks.SposWorkerMock{}, + &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, + ) + + sr.SetHeader(&block.Header{}) + sr.SetSelfPubKey("OTHER") + + r := srBlock.TriggerCreateSignaturesForManagedKeys(context.TODO()) + assert.True(t, r) + + srBlock.SignaturesWaitGroup().Wait() + + assert.Equal(t, int32(9), atomic.LoadInt32(&numMultiKeysSignaturesCreated)) // there are 9 keys in default consensus group config + }) + + t.Run("should fail", func(t *testing.T) { + t.Parallel() + + container := consensusMocks.InitConsensusCore() + enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.AndromedaFlag + }, + } + container.SetEnableEpochsHandler(enableEpochsHandler) + + numMultiKeysSignaturesCreated := int32(0) + + signingHandler := &consensusMocks.SigningHandlerStub{ + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) + return []byte("SIG"), nil + }, + } + container.SetSigningHandler(signingHandler) + consensusState := initializers.InitConsensusStateWithKeysHandler( + &testscommon.KeysHandlerStub{ + IsKeyManagedByCurrentNodeCalled: func(pkBytes []byte) bool { + return true + }, + }, + ) + ch := make(chan bool, 1) + + sr, _ := spos.NewSubround( + bls.SrBlock, + bls.SrSignature, + bls.SrEndRound, + roundTimeDuration, + 0.7, + 0.85, + "(SIGNATURE)", + consensusState, + ch, + executeStoredMessages, + container, + chainID, + currentPid, + &statusHandler.AppStatusHandlerStub{}, + ) + + srBlock, _ := v2.NewSubroundBlock( + sr, + v2.ProcessingThresholdPercent, + &consensusMocks.SposWorkerMock{}, + &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{ + CanProcessCalled: func() bool { + return false + }, + }, + ) + + sr.SetHeader(&block.Header{}) + sr.SetSelfPubKey("OTHER") + + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + r := srBlock.TriggerCreateSignaturesForManagedKeys(ctx) + assert.False(t, r) + + srBlock.SignaturesWaitGroup().Wait() + + assert.Equal(t, int32(0), atomic.LoadInt32(&numMultiKeysSignaturesCreated)) + }) +} + func TestSubroundBlock_IsInterfaceNil(t *testing.T) { t.Parallel() diff --git a/consensus/spos/bls/v2/subroundSignature.go b/consensus/spos/bls/v2/subroundSignature.go index 4f7b864d687..e997741d1d1 100644 --- a/consensus/spos/bls/v2/subroundSignature.go +++ b/consensus/spos/bls/v2/subroundSignature.go @@ -3,7 +3,6 @@ package v2 import ( "bytes" "context" - "fmt" "sync" "sync/atomic" "time" @@ -108,14 +107,19 @@ func (sr *subroundSignature) doSignatureJob(ctx context.Context) bool { return false } - // Wait once for the entire node if competing block detected nonce := sr.GetHeader().GetNonce() currentHash := sr.GetData() - shouldAbort := sr.waitIfCompetingBlockForNode(ctx, nonce, currentHash) - if shouldAbort { - return false + + pkBytes := sr.getPkForCompetingBlock(nonce, currentHash) + hasCompetingBlockForPk := len(pkBytes) != 0 + if hasCompetingBlockForPk { + shouldAbort := sr.waitIfCompetingBlock(ctx, pkBytes, nonce, currentHash) + if shouldAbort { + return false + } } + // Wait once for the entire node if competing block detected isSelfSingleKeyInConsensusGroup := sr.IsNodeInConsensusGroup(sr.SelfPubKey()) && commonConsensus.ShouldConsiderSelfKeyInConsensus(sr.NodeRedundancyHandler()) if isSelfSingleKeyInConsensusGroup { if !sr.doSignatureJobForSingleKey(ctx) { @@ -213,7 +217,29 @@ func (sr *subroundSignature) doSignatureConsensusCheck() bool { return false } +func (sr *subroundSignature) waitForSingatures() { + done := make(chan struct{}) + go func() { + sr.SignaturesWaitGroup().Wait() + close(done) + }() + + timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), time.Duration(sr.EndTime())) + + select { + case <-done: + sr.SignaturesCtxCancel() + return + case <-time.After(timeLeft): + log.Debug("timeout while waiting for signatures to be created") + return + } +} + func (sr *subroundSignature) doSignatureJobForManagedKeys(ctx context.Context) bool { + // wait for optimistic signatures creation to finish + sr.waitForSingatures() + numMultiKeysSignaturesSent := int32(0) sentSigForAllKeys := atomicCore.Flag{} sentSigForAllKeys.SetValue(true) @@ -230,7 +256,7 @@ func (sr *subroundSignature) doSignatureJobForManagedKeys(ctx context.Context) b continue } - err := sr.checkGoRoutinesThrottler(ctx) + err := checkGoRoutinesThrottler(ctx, sr.signatureThrottler) if err != nil { return false } @@ -259,20 +285,28 @@ func (sr *subroundSignature) doSignatureJobForManagedKeys(ctx context.Context) b return sentSigForAllKeys.IsSet() } -func (sr *subroundSignature) sendSignatureForManagedKey(_ context.Context, idx int, pk string) bool { +func (sr *subroundSignature) sendSignatureForManagedKey(ctx context.Context, idx int, pk string) bool { pkBytes := []byte(pk) nonce := sr.GetHeader().GetNonce() currentHash := sr.GetData() - signatureShare, err := sr.SigningHandler().CreateSignatureShareForPublicKey( - currentHash, - uint16(idx), - sr.GetHeader().GetEpoch(), - pkBytes, - ) + signatureShare, err := sr.SigningHandler().SignatureShare(uint16(idx)) if err != nil { - log.Debug("sendSignatureForManagedKey.CreateSignatureShareForPublicKey", "error", err.Error()) - return false + // signature share not found (optimistic signature share creation was not triggered) + // will try to create it + log.Debug("sendSignatureForManagedKey.SignatureShare: sig not already created, will try to create it", "error", err) + + signatureShare, err = sr.SigningHandler().CreateSignatureShareForPublicKey( + ctx, + currentHash, + uint16(idx), + sr.GetHeader().GetEpoch(), + pkBytes, + ) + if err != nil { + log.Debug("sendSignatureForManagedKey.CreateSignatureShareForPublicKey", "error", err.Error()) + return false + } } // Record the signed nonce before broadcast so competing block detection works @@ -289,22 +323,7 @@ func (sr *subroundSignature) sendSignatureForManagedKey(_ context.Context, idx i return sr.completeSignatureSubRound(pk) } -func (sr *subroundSignature) checkGoRoutinesThrottler(ctx context.Context) error { - for { - if sr.signatureThrottler.CanProcess() { - break - } - select { - case <-time.After(timeSpentBetweenChecks): - continue - case <-ctx.Done(): - return fmt.Errorf("%w while checking the throttler", spos.ErrTimeIsOut) - } - } - return nil -} - -func (sr *subroundSignature) doSignatureJobForSingleKey(_ context.Context) bool { +func (sr *subroundSignature) doSignatureJobForSingleKey(ctx context.Context) bool { pkBytes := []byte(sr.SelfPubKey()) nonce := sr.GetHeader().GetNonce() currentHash := sr.GetData() @@ -316,6 +335,7 @@ func (sr *subroundSignature) doSignatureJobForSingleKey(_ context.Context) bool } signatureShare, err := sr.SigningHandler().CreateSignatureShareForPublicKey( + ctx, currentHash, uint16(selfIndex), sr.GetHeader().GetEpoch(), @@ -339,6 +359,30 @@ func (sr *subroundSignature) doSignatureJobForSingleKey(_ context.Context) bool return sr.completeSignatureSubRound(sr.SelfPubKey()) } +func (sr *subroundSignature) getPkForCompetingBlock(nonce uint64, currentHash []byte) []byte { + // check self key + selfPk := []byte(sr.SelfPubKey()) + previousHash, exists := sr.sentSignatureTracker.GetSignedHash(selfPk, nonce) + if exists && !bytes.Equal(previousHash, currentHash) { + return selfPk + } + + // check managed keys + for _, pk := range sr.ConsensusGroup() { + pkBytes := []byte(pk) + if !sr.IsKeyManagedBySelf(pkBytes) { + continue + } + + previousHash, exists := sr.sentSignatureTracker.GetSignedHash(pkBytes, nonce) + if exists && !bytes.Equal(previousHash, currentHash) { + return pkBytes + } + } + + return nil +} + // waitIfCompetingBlockForNode checks if any key managed by this node previously signed a different // hash for the given nonce. If found, waits once for the entire node instead of per-key. func (sr *subroundSignature) waitIfCompetingBlockForNode(ctx context.Context, nonce uint64, currentHash []byte) bool { diff --git a/consensus/spos/bls/v2/subroundSignatureCompetingBlock_test.go b/consensus/spos/bls/v2/subroundSignatureCompetingBlock_test.go index 96ad2f61953..cdbcc6b3d64 100644 --- a/consensus/spos/bls/v2/subroundSignatureCompetingBlock_test.go +++ b/consensus/spos/bls/v2/subroundSignatureCompetingBlock_test.go @@ -315,7 +315,7 @@ func TestWaitIfCompetingBlock_RecordSignedNonceCalledBeforeBroadcast(t *testing. container := consensusMocks.InitConsensusCore() container.SetSigningHandler(&consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return []byte("SIG"), nil }, }) diff --git a/consensus/spos/bls/v2/subroundSignature_test.go b/consensus/spos/bls/v2/subroundSignature_test.go index 05c26afd5f6..76bd07bf1f8 100644 --- a/consensus/spos/bls/v2/subroundSignature_test.go +++ b/consensus/spos/bls/v2/subroundSignature_test.go @@ -507,7 +507,7 @@ func TestSubroundSignature_DoSignatureJob(t *testing.T) { container := consensusMocks.InitConsensusCore() signingHandler := &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return []byte("SIG"), nil }, } @@ -597,7 +597,7 @@ func TestSubroundSignature_SendSignature(t *testing.T) { container := consensusMocks.InitConsensusCore() container.SetSigningHandler(&consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return make([]byte, 0), expErr }, }) @@ -652,7 +652,7 @@ func TestSubroundSignature_SendSignature(t *testing.T) { container := consensusMocks.InitConsensusCore() container.SetSigningHandler(&consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return []byte("SIG"), nil }, }) @@ -720,7 +720,81 @@ func TestSubroundSignature_SendSignature(t *testing.T) { container := consensusMocks.InitConsensusCore() container.SetSigningHandler(&consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + return []byte("SIG"), nil + }, + }) + + enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.AndromedaFlag + }, + } + container.SetEnableEpochsHandler(enableEpochsHandler) + + container.SetBroadcastMessenger(&consensusMocks.BroadcastMessengerMock{ + BroadcastConsensusMessageCalled: func(message *consensus.Message) error { + return nil + }, + }) + consensusState := initializers.InitConsensusStateWithKeysHandler( + &testscommon.KeysHandlerStub{ + IsKeyManagedByCurrentNodeCalled: func(pkBytes []byte) bool { + return true + }, + }, + ) + + ch := make(chan bool, 1) + + sr, _ := spos.NewSubround( + bls.SrBlock, + bls.SrSignature, + bls.SrEndRound, + roundTimeDuration, + 0.7, + 0.85, + "(SIGNATURE)", + consensusState, + ch, + executeStoredMessages, + container, + chainID, + currentPid, + &statusHandler.AppStatusHandlerStub{}, + ) + sr.SetHeader(&block.Header{}) + + signatureSentForPks := make(map[string]struct{}) + varCalled := false + srSignature, _ := v2.NewSubroundSignature( + sr, + &statusHandler.AppStatusHandlerStub{}, + &testscommon.SentSignatureTrackerStub{ + SignatureSentCalled: func(pkBytes []byte) { + signatureSentForPks[string(pkBytes)] = struct{}{} + varCalled = true + }, + }, + &consensusMocks.SposWorkerMock{}, + &dataRetrieverMock.ThrottlerStub{}, + ) + + _ = srSignature.SendSignatureForManagedKey(context.Background(), 1, "a") + + assert.True(t, varCalled) + }) + + t.Run("if sig share already available, should not create it", func(t *testing.T) { + t.Parallel() + + container := consensusMocks.InitConsensusCore() + container.SetSigningHandler(&consensusMocks.SigningHandlerStub{ + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + assert.Fail(t, "should not have been called") + return []byte(""), nil + }, + SignatureShareCalled: func(index uint16) ([]byte, error) { return []byte("SIG"), nil }, }) @@ -800,7 +874,7 @@ func TestSubroundSignature_DoSignatureJobForManagedKeys(t *testing.T) { container.SetEnableEpochsHandler(enableEpochsHandler) signingHandler := &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return []byte("SIG"), nil }, } diff --git a/consensus/spos/consensusState.go b/consensus/spos/consensusState.go index 7c136ae8be4..9ba7c0bfc88 100644 --- a/consensus/spos/consensusState.go +++ b/consensus/spos/consensusState.go @@ -2,6 +2,7 @@ package spos import ( "bytes" + "context" "sync" "time" @@ -44,6 +45,9 @@ type ConsensusState struct { processingBlock bool mutProcessingBlock sync.RWMutex + signaturesWaitGroup *sync.WaitGroup + signaturesTimeoutCtxCancel context.CancelFunc + *roundConsensus *roundThreshold *roundStatus @@ -79,6 +83,7 @@ func (cns *ConsensusState) ResetConsensusRoundState() { cns.roundCanceled = false cns.extendedCalled = false cns.waitingAllSignaturesTimeOut = false + cns.signaturesWaitGroup = &sync.WaitGroup{} cns.mutState.Unlock() cns.ResetRoundStatus() @@ -520,6 +525,29 @@ func (cns *ConsensusState) SetWaitingAllSignaturesTimeOut(waitingAllSignaturesTi cns.waitingAllSignaturesTimeOut = waitingAllSignaturesTimeOut } +// SignaturesWaitGroup returns wait group for optimistic signatures handling +func (cns *ConsensusState) SignaturesWaitGroup() *sync.WaitGroup { + return cns.signaturesWaitGroup +} + +// SetSignaturesCtxCancelFunc will set signatures context cancel function +func (cns *ConsensusState) SetSignaturesCtxCancelFunc(cancelFunc context.CancelFunc) { + cns.mutState.Lock() + defer cns.mutState.Unlock() + + cns.signaturesTimeoutCtxCancel = cancelFunc +} + +// SignaturesCtxCancel will cancel signatures context +func (cns *ConsensusState) SignaturesCtxCancel() { + cns.mutState.RLock() + defer cns.mutState.RUnlock() + + if cns.signaturesTimeoutCtxCancel != nil { + cns.signaturesTimeoutCtxCancel() + } +} + // IsInterfaceNil returns true if there is no value under the interface func (cns *ConsensusState) IsInterfaceNil() bool { return cns == nil diff --git a/consensus/spos/interface.go b/consensus/spos/interface.go index 4c170171bfc..78d4576e2c2 100644 --- a/consensus/spos/interface.go +++ b/consensus/spos/interface.go @@ -2,6 +2,7 @@ package spos import ( "context" + "sync" "time" "github.com/multiversx/mx-chain-core-go/core" @@ -231,6 +232,9 @@ type ConsensusStateHandler interface { SetHeader(header data.HeaderHandler) GetWaitingAllSignaturesTimeOut() bool SetWaitingAllSignaturesTimeOut(bool) + SignaturesWaitGroup() *sync.WaitGroup + SetSignaturesCtxCancelFunc(cancelFunc context.CancelFunc) + SignaturesCtxCancel() RoundConsensusHandler RoundStatusHandler RoundThresholdHandler diff --git a/consensus/spos/worker.go b/consensus/spos/worker.go index 20556782702..0fd4f99e070 100644 --- a/consensus/spos/worker.go +++ b/consensus/spos/worker.go @@ -856,6 +856,8 @@ func (wrk *Worker) callReceivedHeaderCallbacks(message *consensus.Message) { // Extend does an extension for the subround with subroundId func (wrk *Worker) Extend(subroundId int) { wrk.consensusState.SetExtendedCalled(true) + wrk.consensusState.SignaturesCtxCancel() + log.Debug("extend function is called", "subround", wrk.consensusService.GetSubroundName(subroundId)) diff --git a/factory/crypto/errors.go b/factory/crypto/errors.go index 513e7f09e88..689e9ee2a33 100644 --- a/factory/crypto/errors.go +++ b/factory/crypto/errors.go @@ -43,3 +43,6 @@ var ErrNilMessage = errors.New("message to be signed or to be verified is nil") // ErrBitmapMismatch is raised when an invalid bitmap is passed to the multisigner var ErrBitmapMismatch = errors.New("multi signer reported a mismatch in used bitmap") + +// ErrTimeIsOut is raised when time is out for signing operation +var ErrTimeIsOut = errors.New("timeout while handling signatures") diff --git a/factory/crypto/signingHandler.go b/factory/crypto/signingHandler.go index f2a30671f43..bc16f159c6b 100644 --- a/factory/crypto/signingHandler.go +++ b/factory/crypto/signingHandler.go @@ -1,6 +1,7 @@ package crypto import ( + "context" "sync" "github.com/multiversx/mx-chain-core-go/core/check" @@ -135,11 +136,23 @@ func (sh *signingHandler) Reset(pubKeys []string) error { // CreateSignatureShareForPublicKey returns a signature over a message using the managed private key that was selected based on the provided // publicKeyBytes argument -func (sh *signingHandler) CreateSignatureShareForPublicKey(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { +func (sh *signingHandler) CreateSignatureShareForPublicKey( + ctx context.Context, + message []byte, + index uint16, + epoch uint32, + publicKeyBytes []byte, +) ([]byte, error) { if message == nil { return nil, ErrNilMessage } + select { + case <-ctx.Done(): + return nil, ErrTimeIsOut + default: + } + privateKey := sh.keysHandler.GetHandledPrivateKey(publicKeyBytes) privateKeyBytes, err := privateKey.ToByteArray() if err != nil { @@ -159,6 +172,12 @@ func (sh *signingHandler) CreateSignatureShareForPublicKey(message []byte, index return nil, err } + select { + case <-ctx.Done(): + return nil, ErrTimeIsOut + default: + } + sh.data.sigShares[index] = sigShareBytes return sigShareBytes, nil diff --git a/factory/crypto/signingHandler_test.go b/factory/crypto/signingHandler_test.go index 281a842e3bf..6fcd4999749 100644 --- a/factory/crypto/signingHandler_test.go +++ b/factory/crypto/signingHandler_test.go @@ -1,6 +1,7 @@ package crypto_test import ( + "context" "errors" "testing" @@ -164,7 +165,7 @@ func TestSigningHandler_CreateSignatureShareForPublicKey(t *testing.T) { t.Parallel() signer, _ := cryptoFactory.NewSigningHandler(createMockArgsSigningHandler()) - sigShare, err := signer.CreateSignatureShareForPublicKey(nil, selfIndex, epoch, pkBytes) + sigShare, err := signer.CreateSignatureShareForPublicKey(context.TODO(), nil, selfIndex, epoch, pkBytes) require.Nil(t, sigShare) require.Equal(t, cryptoFactory.ErrNilMessage, err) }) @@ -182,7 +183,7 @@ func TestSigningHandler_CreateSignatureShareForPublicKey(t *testing.T) { args.MultiSignerContainer = cryptoMocks.NewMultiSignerContainerMock(multiSigner) signer, _ := cryptoFactory.NewSigningHandler(args) - sigShare, err := signer.CreateSignatureShareForPublicKey([]byte("msg1"), selfIndex, epoch, pkBytes) + sigShare, err := signer.CreateSignatureShareForPublicKey(context.TODO(), []byte("msg1"), selfIndex, epoch, pkBytes) require.Nil(t, sigShare) require.Equal(t, expectedErr, err) }) @@ -200,7 +201,7 @@ func TestSigningHandler_CreateSignatureShareForPublicKey(t *testing.T) { signer, _ := cryptoFactory.NewSigningHandler(args) - sigShare, err := signer.CreateSignatureShareForPublicKey([]byte("message"), uint16(0), epoch, pkBytes) + sigShare, err := signer.CreateSignatureShareForPublicKey(context.TODO(), []byte("message"), uint16(0), epoch, pkBytes) require.Nil(t, sigShare) require.Equal(t, expectedErr, err) }) @@ -227,7 +228,7 @@ func TestSigningHandler_CreateSignatureShareForPublicKey(t *testing.T) { args.MultiSignerContainer = cryptoMocks.NewMultiSignerContainerMock(multiSigner) signer, _ := cryptoFactory.NewSigningHandler(args) - sigShare, err := signer.CreateSignatureShareForPublicKey([]byte("msg1"), selfIndex, epoch, pkBytes) + sigShare, err := signer.CreateSignatureShareForPublicKey(context.TODO(), []byte("msg1"), selfIndex, epoch, pkBytes) require.Nil(t, err) require.Equal(t, expectedSigShare, sigShare) assert.True(t, getHandledPrivateKeyCalled) diff --git a/node/chainSimulator/process/processor.go b/node/chainSimulator/process/processor.go index 9612c4cc22e..7a459f9bd7a 100644 --- a/node/chainSimulator/process/processor.go +++ b/node/chainSimulator/process/processor.go @@ -1,6 +1,7 @@ package process import ( + "context" "time" "github.com/multiversx/mx-chain-core-go/core" @@ -476,7 +477,7 @@ func (creator *blocksCreator) generateAggregatedSignature(headerHash []byte, epo } totalKey++ - if _, err = signingHandler.CreateSignatureShareForPublicKey(headerHash, uint16(idx), epoch, []byte(pubKey)); err != nil { + if _, err = signingHandler.CreateSignatureShareForPublicKey(context.TODO(), headerHash, uint16(idx), epoch, []byte(pubKey)); err != nil { return nil, err } } diff --git a/node/chainSimulator/process/processor_test.go b/node/chainSimulator/process/processor_test.go index d5c637aef8c..1d22e3a2d42 100644 --- a/node/chainSimulator/process/processor_test.go +++ b/node/chainSimulator/process/processor_test.go @@ -1,6 +1,7 @@ package process_test import ( + "context" "errors" "testing" "time" @@ -390,7 +391,7 @@ func TestBlocksCreator_CreateNewBlock(t *testing.T) { return &mock.CryptoComponentsStub{ KeysHandlerField: kh, SigHandler: &testsConsensus.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return nil, expectedErr }, }, diff --git a/testscommon/consensus/consensusStateMock.go b/testscommon/consensus/consensusStateMock.go index 587abc6b5d8..a3fa8523a8c 100644 --- a/testscommon/consensus/consensusStateMock.go +++ b/testscommon/consensus/consensusStateMock.go @@ -1,6 +1,8 @@ package consensus import ( + "context" + "sync" "time" "github.com/multiversx/mx-chain-core-go/core" @@ -86,6 +88,9 @@ type ConsensusStateMock struct { FallbackThresholdCalled func(subroundId int) int SetFallbackThresholdCalled func(subroundId int, threshold int) ResetConsensusRoundStateCalled func() + SignaturesWaitGroupCalled func() *sync.WaitGroup + SetSignaturesCtxCancelFuncCalled func(cancelFunc context.CancelFunc) + SignaturesCtxCancelCalled func() } // AddReceivedHeader - @@ -654,6 +659,29 @@ func (cnsm *ConsensusStateMock) SetThreshold(subroundId int, threshold int) { } } +// SignaturesWaitGroup - +func (cnsm *ConsensusStateMock) SignaturesWaitGroup() *sync.WaitGroup { + if cnsm.SignaturesWaitGroupCalled != nil { + return cnsm.SignaturesWaitGroupCalled() + } + + return nil +} + +// SetSignaturesCtxCancelFunc - +func (cnsm *ConsensusStateMock) SetSignaturesCtxCancelFunc(cancelFunc context.CancelFunc) { + if cnsm.SetSignaturesCtxCancelFuncCalled != nil { + cnsm.SetSignaturesCtxCancelFuncCalled(cancelFunc) + } +} + +// SignaturesCtxCancel - +func (cnsm *ConsensusStateMock) SignaturesCtxCancel() { + if cnsm.SignaturesCtxCancelCalled != nil { + cnsm.SignaturesCtxCancelCalled() + } +} + // IsInterfaceNil returns true if there is no value under the interface func (cnsm *ConsensusStateMock) IsInterfaceNil() bool { return cnsm == nil diff --git a/testscommon/consensus/signingHandlerStub.go b/testscommon/consensus/signingHandlerStub.go index 79127a17797..8e9ca4a9b2d 100644 --- a/testscommon/consensus/signingHandlerStub.go +++ b/testscommon/consensus/signingHandlerStub.go @@ -1,11 +1,15 @@ package consensus -import crypto "github.com/multiversx/mx-chain-crypto-go" +import ( + "context" + + crypto "github.com/multiversx/mx-chain-crypto-go" +) // SigningHandlerStub implements SigningHandler interface type SigningHandlerStub struct { ResetCalled func(pubKeys []string) error - CreateSignatureShareForPublicKeyCalled func(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) + CreateSignatureShareForPublicKeyCalled func(ctx context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) CreateSignatureForPublicKeyCalled func(message []byte, publicKeyBytes []byte) ([]byte, error) VerifySingleSignatureCalled func(publicKeyBytes []byte, message []byte, signature []byte) error StoreSignatureShareCalled func(index uint16, sig []byte) error @@ -27,9 +31,9 @@ func (stub *SigningHandlerStub) Reset(pubKeys []string) error { } // CreateSignatureShareForPublicKey - -func (stub *SigningHandlerStub) CreateSignatureShareForPublicKey(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { +func (stub *SigningHandlerStub) CreateSignatureShareForPublicKey(ctx context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { if stub.CreateSignatureShareForPublicKeyCalled != nil { - return stub.CreateSignatureShareForPublicKeyCalled(message, index, epoch, publicKeyBytes) + return stub.CreateSignatureShareForPublicKeyCalled(ctx, message, index, epoch, publicKeyBytes) } return make([]byte, 0), nil