From 573aef0de356037a7424c078a28b7525331fd437 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 15 Apr 2026 12:22:44 +0300 Subject: [PATCH 01/11] optimistic sig share creation for managed keys --- consensus/spos/bls/v2/export_test.go | 5 + consensus/spos/bls/v2/subroundSignature.go | 106 ++++++++- .../spos/bls/v2/subroundSignature_test.go | 211 ++++++++++++++++++ 3 files changed, 313 insertions(+), 9 deletions(-) diff --git a/consensus/spos/bls/v2/export_test.go b/consensus/spos/bls/v2/export_test.go index 58988e0d82f..ea69ffa77e0 100644 --- a/consensus/spos/bls/v2/export_test.go +++ b/consensus/spos/bls/v2/export_test.go @@ -346,6 +346,11 @@ func (sr *subroundSignature) DoSignatureJobForManagedKeys(ctx context.Context) b return sr.doSignatureJobForManagedKeys(ctx) } +// CreateSignaturesForManagedKeys - +func (sr *subroundSignature) CreateSignaturesForManagedKeys(ctx context.Context) bool { + return sr.createSignaturesForManagedKeys(ctx) +} + // WaitIfCompetingBlock calls the unexported waitIfCompetingBlock function func (sr *subroundSignature) WaitIfCompetingBlock(ctx context.Context, pkBytes []byte, nonce uint64, currentHash []byte) bool { return sr.waitIfCompetingBlock(ctx, pkBytes, nonce, currentHash) diff --git a/consensus/spos/bls/v2/subroundSignature.go b/consensus/spos/bls/v2/subroundSignature.go index 4f7b864d687..176f2c12dde 100644 --- a/consensus/spos/bls/v2/subroundSignature.go +++ b/consensus/spos/bls/v2/subroundSignature.go @@ -108,9 +108,20 @@ func (sr *subroundSignature) doSignatureJob(ctx context.Context) bool { return false } - // Wait once for the entire node if competing block detected nonce := sr.GetHeader().GetNonce() currentHash := sr.GetData() + + shouldConsiderCompetingBlock := sr.waitForCompetingBlockEarlyChecks(nonce, currentHash) + if shouldConsiderCompetingBlock { + // create signature shares optimistically only if there might be a competing block to be analyzed + // otherwise create signatures only before sending, to avoid goroutines overhead + if !sr.createSignaturesForManagedKeys(ctx) { + log.Debug("step 2: subround cannot proceed, cannot create signatures for managed keys") + return false + } + } + + // Wait once for the entire node if competing block detected shouldAbort := sr.waitIfCompetingBlockForNode(ctx, nonce, currentHash) if shouldAbort { return false @@ -213,6 +224,59 @@ func (sr *subroundSignature) doSignatureConsensusCheck() bool { return false } +func (sr *subroundSignature) createSignaturesForManagedKeys(ctx context.Context) bool { + numMultiKeysSignaturesCreated := int32(0) + + wg := sync.WaitGroup{} + + for idx, pk := range sr.ConsensusGroup() { + pkBytes := []byte(pk) + if !sr.IsKeyManagedBySelf(pkBytes) { + continue + } + + if sr.IsJobDone(pk, sr.Current()) { + continue + } + + err := sr.checkGoRoutinesThrottler(ctx) + if err != nil { + return false + } + sr.signatureThrottler.StartProcessing() + wg.Add(1) + + go func(ctx context.Context, idx int, pk string) { + defer sr.signatureThrottler.EndProcessing() + + pkBytes := []byte(pk) + currentHash := sr.GetData() + + _, err := sr.SigningHandler().CreateSignatureShareForPublicKey( + currentHash, + uint16(idx), + sr.GetHeader().GetEpoch(), + pkBytes, + ) + if err != nil { + log.Debug("createSignaturesForManagedKeys.CreateSignatureShareForPublicKey", "error", err.Error()) + } else { + atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) + } + + wg.Done() + }(ctx, idx, pk) + } + + wg.Wait() + + if numMultiKeysSignaturesCreated > 0 { + log.Debug("step 2: multi keys signatures have been created", "num", numMultiKeysSignaturesCreated) + } + + return true +} + func (sr *subroundSignature) doSignatureJobForManagedKeys(ctx context.Context) bool { numMultiKeysSignaturesSent := int32(0) sentSigForAllKeys := atomicCore.Flag{} @@ -264,15 +328,21 @@ func (sr *subroundSignature) sendSignatureForManagedKey(_ context.Context, idx i nonce := sr.GetHeader().GetNonce() currentHash := sr.GetData() - signatureShare, err := sr.SigningHandler().CreateSignatureShareForPublicKey( - currentHash, - uint16(idx), - sr.GetHeader().GetEpoch(), - pkBytes, - ) + signatureShare, err := sr.SigningHandler().SignatureShare(uint16(idx)) if err != nil { - log.Debug("sendSignatureForManagedKey.CreateSignatureShareForPublicKey", "error", err.Error()) - return false + // signature share not found (optimistic signature share creation was not triggered) + // will try to create it + + signatureShare, err = sr.SigningHandler().CreateSignatureShareForPublicKey( + currentHash, + uint16(idx), + sr.GetHeader().GetEpoch(), + pkBytes, + ) + if err != nil { + log.Debug("sendSignatureForManagedKey.CreateSignatureShareForPublicKey", "error", err.Error()) + return false + } } // Record the signed nonce before broadcast so competing block detection works @@ -339,6 +409,24 @@ func (sr *subroundSignature) doSignatureJobForSingleKey(_ context.Context) bool return sr.completeSignatureSubRound(sr.SelfPubKey()) } +func (sr *subroundSignature) waitForCompetingBlockEarlyChecks(nonce uint64, currentHash []byte) bool { + // do not check for self key, no need for optimistic signature creation if single-key node + + // check managed keys + for _, pk := range sr.ConsensusGroup() { + pkBytes := []byte(pk) + if !sr.IsKeyManagedBySelf(pkBytes) { + continue + } + previousHash, exists := sr.sentSignatureTracker.GetSignedHash(pkBytes, nonce) + if exists && !bytes.Equal(previousHash, currentHash) { + return true + } + } + + return false +} + // waitIfCompetingBlockForNode checks if any key managed by this node previously signed a different // hash for the given nonce. If found, waits once for the entire node instead of per-key. func (sr *subroundSignature) waitIfCompetingBlockForNode(ctx context.Context, nonce uint64, currentHash []byte) bool { diff --git a/consensus/spos/bls/v2/subroundSignature_test.go b/consensus/spos/bls/v2/subroundSignature_test.go index 05c26afd5f6..44c0fbf323d 100644 --- a/consensus/spos/bls/v2/subroundSignature_test.go +++ b/consensus/spos/bls/v2/subroundSignature_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "testing" "github.com/multiversx/mx-chain-core-go/core" @@ -784,6 +785,80 @@ func TestSubroundSignature_SendSignature(t *testing.T) { assert.True(t, varCalled) }) + + t.Run("if sig share already available, should not create it", func(t *testing.T) { + t.Parallel() + + container := consensusMocks.InitConsensusCore() + container.SetSigningHandler(&consensusMocks.SigningHandlerStub{ + CreateSignatureShareForPublicKeyCalled: func(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + assert.Fail(t, "should not have been called") + return []byte(""), nil + }, + SignatureShareCalled: func(index uint16) ([]byte, error) { + return []byte("SIG"), nil + }, + }) + + enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.AndromedaFlag + }, + } + container.SetEnableEpochsHandler(enableEpochsHandler) + + container.SetBroadcastMessenger(&consensusMocks.BroadcastMessengerMock{ + BroadcastConsensusMessageCalled: func(message *consensus.Message) error { + return nil + }, + }) + consensusState := initializers.InitConsensusStateWithKeysHandler( + &testscommon.KeysHandlerStub{ + IsKeyManagedByCurrentNodeCalled: func(pkBytes []byte) bool { + return true + }, + }, + ) + + ch := make(chan bool, 1) + + sr, _ := spos.NewSubround( + bls.SrBlock, + bls.SrSignature, + bls.SrEndRound, + roundTimeDuration, + 0.7, + 0.85, + "(SIGNATURE)", + consensusState, + ch, + executeStoredMessages, + container, + chainID, + currentPid, + &statusHandler.AppStatusHandlerStub{}, + ) + sr.SetHeader(&block.Header{}) + + signatureSentForPks := make(map[string]struct{}) + varCalled := false + srSignature, _ := v2.NewSubroundSignature( + sr, + &statusHandler.AppStatusHandlerStub{}, + &testscommon.SentSignatureTrackerStub{ + SignatureSentCalled: func(pkBytes []byte) { + signatureSentForPks[string(pkBytes)] = struct{}{} + varCalled = true + }, + }, + &consensusMocks.SposWorkerMock{}, + &dataRetrieverMock.ThrottlerStub{}, + ) + + _ = srSignature.SendSignatureForManagedKey(context.Background(), 1, "a") + + assert.True(t, varCalled) + }) } func TestSubroundSignature_DoSignatureJobForManagedKeys(t *testing.T) { @@ -932,6 +1007,142 @@ func TestSubroundSignature_DoSignatureJobForManagedKeys(t *testing.T) { }) } +func TestSubroundSignature_CreateSignaturesForManagedKeys(t *testing.T) { + t.Parallel() + + t.Run("should work", func(t *testing.T) { + t.Parallel() + + container := consensusMocks.InitConsensusCore() + enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.AndromedaFlag + }, + } + container.SetEnableEpochsHandler(enableEpochsHandler) + + numMultiKeysSignaturesCreated := int32(0) + + signingHandler := &consensusMocks.SigningHandlerStub{ + CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) + return []byte("SIG"), nil + }, + } + container.SetSigningHandler(signingHandler) + consensusState := initializers.InitConsensusStateWithKeysHandler( + &testscommon.KeysHandlerStub{ + IsKeyManagedByCurrentNodeCalled: func(pkBytes []byte) bool { + return true + }, + }, + ) + ch := make(chan bool, 1) + + sr, _ := spos.NewSubround( + bls.SrBlock, + bls.SrSignature, + bls.SrEndRound, + roundTimeDuration, + 0.7, + 0.85, + "(SIGNATURE)", + consensusState, + ch, + executeStoredMessages, + container, + chainID, + currentPid, + &statusHandler.AppStatusHandlerStub{}, + ) + + srSignature, _ := v2.NewSubroundSignature( + sr, + &statusHandler.AppStatusHandlerStub{}, + &testscommon.SentSignatureTrackerStub{}, + &consensusMocks.SposWorkerMock{}, + &dataRetrieverMock.ThrottlerStub{}, + ) + + sr.SetHeader(&block.Header{}) + sr.SetSelfPubKey("OTHER") + + r := srSignature.CreateSignaturesForManagedKeys(context.TODO()) + assert.True(t, r) + + assert.Equal(t, int32(9), atomic.LoadInt32(&numMultiKeysSignaturesCreated)) // there are 9 keys in default consensus group config + }) + + t.Run("should fail", func(t *testing.T) { + t.Parallel() + + container := consensusMocks.InitConsensusCore() + enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.AndromedaFlag + }, + } + container.SetEnableEpochsHandler(enableEpochsHandler) + + numMultiKeysSignaturesCreated := int32(0) + + signingHandler := &consensusMocks.SigningHandlerStub{ + CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) + return []byte("SIG"), nil + }, + } + container.SetSigningHandler(signingHandler) + consensusState := initializers.InitConsensusStateWithKeysHandler( + &testscommon.KeysHandlerStub{ + IsKeyManagedByCurrentNodeCalled: func(pkBytes []byte) bool { + return true + }, + }, + ) + ch := make(chan bool, 1) + + sr, _ := spos.NewSubround( + bls.SrBlock, + bls.SrSignature, + bls.SrEndRound, + roundTimeDuration, + 0.7, + 0.85, + "(SIGNATURE)", + consensusState, + ch, + executeStoredMessages, + container, + chainID, + currentPid, + &statusHandler.AppStatusHandlerStub{}, + ) + + srSignature, _ := v2.NewSubroundSignature( + sr, + &statusHandler.AppStatusHandlerStub{}, + &testscommon.SentSignatureTrackerStub{}, + &consensusMocks.SposWorkerMock{}, + &dataRetrieverMock.ThrottlerStub{ + CanProcessCalled: func() bool { + return false + }, + }, + ) + + sr.SetHeader(&block.Header{}) + sr.SetSelfPubKey("OTHER") + + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + r := srSignature.CreateSignaturesForManagedKeys(ctx) + assert.False(t, r) + + assert.Equal(t, int32(0), atomic.LoadInt32(&numMultiKeysSignaturesCreated)) + }) +} + func TestSubroundSignature_DoSignatureConsensusCheck(t *testing.T) { t.Parallel() From faebd8f731d66c5bbc21bcd6e36c6c066d1079c0 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 15 Apr 2026 17:10:16 +0300 Subject: [PATCH 02/11] fixes after review --- consensus/spos/bls/v2/subroundSignature.go | 41 ++++++++++++++-------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/consensus/spos/bls/v2/subroundSignature.go b/consensus/spos/bls/v2/subroundSignature.go index 176f2c12dde..1c2ba18ac1f 100644 --- a/consensus/spos/bls/v2/subroundSignature.go +++ b/consensus/spos/bls/v2/subroundSignature.go @@ -111,22 +111,23 @@ func (sr *subroundSignature) doSignatureJob(ctx context.Context) bool { nonce := sr.GetHeader().GetNonce() currentHash := sr.GetData() - shouldConsiderCompetingBlock := sr.waitForCompetingBlockEarlyChecks(nonce, currentHash) - if shouldConsiderCompetingBlock { + pkBytes := sr.getPkForCompetingBlock(nonce, currentHash) + hasCompetingBlockForPk := len(pkBytes) != 0 + if hasCompetingBlockForPk { // create signature shares optimistically only if there might be a competing block to be analyzed // otherwise create signatures only before sending, to avoid goroutines overhead if !sr.createSignaturesForManagedKeys(ctx) { log.Debug("step 2: subround cannot proceed, cannot create signatures for managed keys") return false } - } - // Wait once for the entire node if competing block detected - shouldAbort := sr.waitIfCompetingBlockForNode(ctx, nonce, currentHash) - if shouldAbort { - return false + shouldAbort := sr.waitIfCompetingBlock(ctx, pkBytes, nonce, currentHash) + if shouldAbort { + return false + } } + // Wait once for the entire node if competing block detected isSelfSingleKeyInConsensusGroup := sr.IsNodeInConsensusGroup(sr.SelfPubKey()) && commonConsensus.ShouldConsiderSelfKeyInConsensus(sr.NodeRedundancyHandler()) if isSelfSingleKeyInConsensusGroup { if !sr.doSignatureJobForSingleKey(ctx) { @@ -248,6 +249,13 @@ func (sr *subroundSignature) createSignaturesForManagedKeys(ctx context.Context) go func(ctx context.Context, idx int, pk string) { defer sr.signatureThrottler.EndProcessing() + defer wg.Done() + + select { + case <-ctx.Done(): + return + default: + } pkBytes := []byte(pk) currentHash := sr.GetData() @@ -260,11 +268,10 @@ func (sr *subroundSignature) createSignaturesForManagedKeys(ctx context.Context) ) if err != nil { log.Debug("createSignaturesForManagedKeys.CreateSignatureShareForPublicKey", "error", err.Error()) - } else { - atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) + return } - wg.Done() + atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) }(ctx, idx, pk) } @@ -409,8 +416,13 @@ func (sr *subroundSignature) doSignatureJobForSingleKey(_ context.Context) bool return sr.completeSignatureSubRound(sr.SelfPubKey()) } -func (sr *subroundSignature) waitForCompetingBlockEarlyChecks(nonce uint64, currentHash []byte) bool { - // do not check for self key, no need for optimistic signature creation if single-key node +func (sr *subroundSignature) getPkForCompetingBlock(nonce uint64, currentHash []byte) []byte { + // check self key + selfPk := []byte(sr.SelfPubKey()) + previousHash, exists := sr.sentSignatureTracker.GetSignedHash(selfPk, nonce) + if exists && !bytes.Equal(previousHash, currentHash) { + return selfPk + } // check managed keys for _, pk := range sr.ConsensusGroup() { @@ -418,13 +430,14 @@ func (sr *subroundSignature) waitForCompetingBlockEarlyChecks(nonce uint64, curr if !sr.IsKeyManagedBySelf(pkBytes) { continue } + previousHash, exists := sr.sentSignatureTracker.GetSignedHash(pkBytes, nonce) if exists && !bytes.Equal(previousHash, currentHash) { - return true + return pkBytes } } - return false + return nil } // waitIfCompetingBlockForNode checks if any key managed by this node previously signed a different From 4800fee2c63cecd907e6074405e911d07cc983e1 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Fri, 24 Apr 2026 17:58:26 +0300 Subject: [PATCH 03/11] trigger sigs creation from block subround --- consensus/spos/bls/v2/blsSubroundsFactory.go | 1 + consensus/spos/bls/v2/common.go | 30 ++++ consensus/spos/bls/v2/export_test.go | 12 +- consensus/spos/bls/v2/subroundBlock.go | 74 ++++++++- consensus/spos/bls/v2/subroundBlock_test.go | 151 ++++++++++++++++++ consensus/spos/bls/v2/subroundSignature.go | 96 +++-------- .../spos/bls/v2/subroundSignature_test.go | 137 ---------------- consensus/spos/consensusState.go | 8 + consensus/spos/interface.go | 2 + 9 files changed, 287 insertions(+), 224 deletions(-) create mode 100644 consensus/spos/bls/v2/common.go diff --git a/consensus/spos/bls/v2/blsSubroundsFactory.go b/consensus/spos/bls/v2/blsSubroundsFactory.go index 6c76df939c1..48a0d4e4989 100644 --- a/consensus/spos/bls/v2/blsSubroundsFactory.go +++ b/consensus/spos/bls/v2/blsSubroundsFactory.go @@ -218,6 +218,7 @@ func (fct *factory) generateBlockSubround() error { processingThresholdPercent, fct.worker, syncController, + fct.signatureThrottler, ) if err != nil { return err diff --git a/consensus/spos/bls/v2/common.go b/consensus/spos/bls/v2/common.go new file mode 100644 index 00000000000..5d6e55cfd90 --- /dev/null +++ b/consensus/spos/bls/v2/common.go @@ -0,0 +1,30 @@ +package v2 + +import ( + "context" + "fmt" + "time" + + "github.com/multiversx/mx-chain-core-go/core" + "github.com/multiversx/mx-chain-go/consensus/spos" +) + +func checkGoRoutinesThrottler( + ctx context.Context, + signatureThrottler core.Throttler, +) error { + for { + if signatureThrottler.CanProcess() { + break + } + + select { + case <-time.After(timeSpentBetweenChecks): + continue + case <-ctx.Done(): + return fmt.Errorf("%w while checking the throttler", spos.ErrTimeIsOut) + } + } + + return nil +} diff --git a/consensus/spos/bls/v2/export_test.go b/consensus/spos/bls/v2/export_test.go index ea69ffa77e0..24b60861d88 100644 --- a/consensus/spos/bls/v2/export_test.go +++ b/consensus/spos/bls/v2/export_test.go @@ -203,7 +203,7 @@ func (sr *subroundBlock) SendBlockBody(body data.BodyHandler, marshalizedBody [] // SendBlockHeader method sends the proposed block header in the subround Block func (sr *subroundBlock) SendBlockHeader(header data.HeaderHandler, marshalizedHeader []byte) bool { - return sr.sendBlockHeader(header, marshalizedHeader) + return sr.sendBlockHeader(context.TODO(), header, marshalizedHeader) } // ComputeSubroundProcessingMetric computes processing metric related to the subround Block @@ -346,11 +346,6 @@ func (sr *subroundSignature) DoSignatureJobForManagedKeys(ctx context.Context) b return sr.doSignatureJobForManagedKeys(ctx) } -// CreateSignaturesForManagedKeys - -func (sr *subroundSignature) CreateSignaturesForManagedKeys(ctx context.Context) bool { - return sr.createSignaturesForManagedKeys(ctx) -} - // WaitIfCompetingBlock calls the unexported waitIfCompetingBlock function func (sr *subroundSignature) WaitIfCompetingBlock(ctx context.Context, pkBytes []byte, nonce uint64, currentHash []byte) bool { return sr.waitIfCompetingBlock(ctx, pkBytes, nonce, currentHash) @@ -400,3 +395,8 @@ func (sr *subroundEndRound) UpdateNonceDeltaMetrics() { func (sr *subroundBlock) PrepareBlockForExecution(header data.HeaderHandler, body data.BodyHandler) error { return sr.prepareBlockForExecution(header, body) } + +// TriggerCreateSignaturesForManagedKeys - +func (sr *subroundBlock) TriggerCreateSignaturesForManagedKeys(ctx context.Context) bool { + return sr.triggerCreateSignaturesForManagedKeys(ctx) +} diff --git a/consensus/spos/bls/v2/subroundBlock.go b/consensus/spos/bls/v2/subroundBlock.go index 9c6ebeaae06..6fdbde99616 100644 --- a/consensus/spos/bls/v2/subroundBlock.go +++ b/consensus/spos/bls/v2/subroundBlock.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "sync" + "sync/atomic" "time" "github.com/multiversx/mx-chain-core-go/core" @@ -29,6 +30,7 @@ type subroundBlock struct { worker spos.WorkerHandler mutBlockProcessing sync.Mutex syncController spos.NtpSyncControllerHandler + signatureThrottler core.Throttler } // NewSubroundBlock creates a subroundBlock object @@ -37,6 +39,7 @@ func NewSubroundBlock( processingThresholdPercentage int, worker spos.WorkerHandler, syncController spos.NtpSyncControllerHandler, + signatureThrottler core.Throttler, ) (*subroundBlock, error) { err := checkNewSubroundBlockParams(baseSubround) if err != nil { @@ -49,12 +52,16 @@ func NewSubroundBlock( if check.IfNil(syncController) { return nil, ErrNilRoundSyncController } + if check.IfNil(signatureThrottler) { + return nil, spos.ErrNilThrottler + } srBlock := subroundBlock{ Subround: baseSubround, processingThresholdPercentage: processingThresholdPercentage, worker: worker, syncController: syncController, + signatureThrottler: signatureThrottler, } srBlock.Job = srBlock.doBlockJob @@ -142,7 +149,7 @@ func (sr *subroundBlock) doBlockJob(ctx context.Context) bool { return false } - sentWithSuccess := sr.sendBlock(header, body, leader) + sentWithSuccess := sr.sendBlock(ctx, header, body, leader) if !sentWithSuccess { return false } @@ -220,7 +227,12 @@ func printLogMessage(ctx context.Context, baseMessage string, err error) { log.Debug(baseMessage, "error", err.Error()) } -func (sr *subroundBlock) sendBlock(header data.HeaderHandler, body data.BodyHandler, leader string) bool { +func (sr *subroundBlock) sendBlock( + ctx context.Context, + header data.HeaderHandler, + body data.BodyHandler, + leader string, +) bool { marshalledBody, err := sr.Marshalizer().Marshal(body) if err != nil { log.Debug("sendBlock.Marshal: body", "error", err.Error()) @@ -236,7 +248,7 @@ func (sr *subroundBlock) sendBlock(header data.HeaderHandler, body data.BodyHand sr.logBlockSize(marshalledBody, marshalledHeader) headerHash := sr.Hasher().Compute(string(marshalledHeader)) - if !sr.sendBlockBody(body, marshalledBody) || !sr.sendBlockHeader(header, headerHash) { + if !sr.sendBlockBody(body, marshalledBody) || !sr.sendBlockHeader(ctx, header, headerHash) { return false } @@ -313,6 +325,7 @@ func (sr *subroundBlock) sendBlockBody( // sendBlockHeader method sends the proposed block header in the subround Block func (sr *subroundBlock) sendBlockHeader( + ctx context.Context, headerHandler data.HeaderHandler, headerHash []byte, ) bool { @@ -336,6 +349,8 @@ func (sr *subroundBlock) sendBlockHeader( sr.SetData(headerHash) sr.SetHeader(headerHandler) + sr.triggerCreateSignaturesForManagedKeys(ctx) + // log the header output for debugging purposes headerOutput, err := common.PrettifyStruct(headerHandler) if err == nil { @@ -345,6 +360,57 @@ func (sr *subroundBlock) sendBlockHeader( return true } +func (sr *subroundBlock) triggerCreateSignaturesForManagedKeys(ctx context.Context) bool { + numMultiKeysSignaturesCreated := int32(0) + + for idx, pk := range sr.ConsensusGroup() { + pkBytes := []byte(pk) + if !sr.IsKeyManagedBySelf(pkBytes) { + continue + } + + err := checkGoRoutinesThrottler(ctx, sr.signatureThrottler) + if err != nil { + return false + } + sr.signatureThrottler.StartProcessing() + sr.SignaturesWaitGroup().Add(1) + + go func(ctx context.Context, idx int, pk string) { + defer sr.signatureThrottler.EndProcessing() + defer sr.SignaturesWaitGroup().Done() + + select { + case <-ctx.Done(): + return + default: + } + + pkBytes := []byte(pk) + currentHash := sr.GetData() + + _, err := sr.SigningHandler().CreateSignatureShareForPublicKey( + currentHash, + uint16(idx), + sr.GetHeader().GetEpoch(), + pkBytes, + ) + if err != nil { + log.Debug("createSignaturesForManagedKeys.CreateSignatureShareForPublicKey", "error", err.Error()) + return + } + + atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) + }(ctx, idx, pk) + } + + if numMultiKeysSignaturesCreated > 0 { + log.Debug("step 1: multi keys signatures creation has been triggered", "num", numMultiKeysSignaturesCreated) + } + + return true +} + func (sr *subroundBlock) sendDirectSentTransactions( header data.HeaderHandler, body data.BodyHandler, @@ -646,6 +712,8 @@ func (sr *subroundBlock) receivedBlockHeader(headerHandler data.HeaderHandler) { ctx, cancel := context.WithTimeout(context.Background(), sr.RoundHandler().TimeDuration()) defer cancel() + sr.triggerCreateSignaturesForManagedKeys(ctx) + _ = sr.processReceivedBlock(ctx, int64(headerHandler.GetRound()), []byte(sr.Leader())) sr.PeerHonestyHandler().ChangeScore( sr.Leader(), diff --git a/consensus/spos/bls/v2/subroundBlock_test.go b/consensus/spos/bls/v2/subroundBlock_test.go index ef0e5bddafc..e8fe8af9fb6 100644 --- a/consensus/spos/bls/v2/subroundBlock_test.go +++ b/consensus/spos/bls/v2/subroundBlock_test.go @@ -1,9 +1,11 @@ package v2_test import ( + "context" "errors" "fmt" "math/big" + "sync/atomic" "testing" "time" @@ -19,6 +21,7 @@ import ( "github.com/multiversx/mx-chain-go/consensus/spos" "github.com/multiversx/mx-chain-go/consensus/spos/bls" v2 "github.com/multiversx/mx-chain-go/consensus/spos/bls/v2" + dataRetrieverMock "github.com/multiversx/mx-chain-go/dataRetriever/mock" "github.com/multiversx/mx-chain-go/process/asyncExecution/cache" "github.com/multiversx/mx-chain-go/sharding/nodesCoordinator" "github.com/multiversx/mx-chain-go/testscommon" @@ -80,6 +83,7 @@ func defaultSubroundBlockFromSubround(sr *spos.Subround) (v2.SubroundBlock, erro }, }, &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, ) return srBlock, err @@ -97,6 +101,7 @@ func defaultSubroundBlockWithoutErrorFromSubround(sr *spos.Subround) v2.Subround }, }, &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, ) return srBlock @@ -179,6 +184,7 @@ func TestSubroundBlock_NewSubroundBlockNilSubroundShouldFail(t *testing.T) { v2.ProcessingThresholdPercent, &consensusMocks.SposWorkerMock{}, &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, ) assert.Nil(t, srBlock) assert.Equal(t, spos.ErrNilSubround, err) @@ -334,6 +340,7 @@ func TestSubroundBlock_NewSubroundBlockNilWorkerShouldFail(t *testing.T) { v2.ProcessingThresholdPercent, nil, &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, ) assert.Nil(t, srBlock) assert.Equal(t, spos.ErrNilWorker, err) @@ -351,6 +358,7 @@ func TestSubroundBlock_NewSubroundBlockNilRoundSyncController(t *testing.T) { v2.ProcessingThresholdPercent, &consensusMocks.SposWorkerMock{}, nil, + &dataRetrieverMock.ThrottlerStub{}, ) require.Nil(t, srBlock) require.Equal(t, v2.ErrNilRoundSyncController, err) @@ -610,6 +618,7 @@ func TestSubroundBlock_DoBlockJob(t *testing.T) { }, }, &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, ) providedLeaderSignature := []byte("leader signature") @@ -713,6 +722,7 @@ func TestSubroundBlock_DoBlockJob(t *testing.T) { }, }, &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, ) providedLeaderSignature := []byte("leader signature") @@ -1615,6 +1625,7 @@ func TestSubroundBlock_UpdateConsensusMetrics(t *testing.T) { }, }, &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, ) consensusMetrics.ResetInstanceValues() @@ -1827,6 +1838,146 @@ func TestSubroundBlock_prepareBlockForExecution(t *testing.T) { }) } +func TestSubroundBlock_TriggerCreateSignaturesForManagedKeys(t *testing.T) { + t.Parallel() + + t.Run("should work", func(t *testing.T) { + t.Parallel() + + container := consensusMocks.InitConsensusCore() + enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.AndromedaFlag + }, + } + container.SetEnableEpochsHandler(enableEpochsHandler) + + numMultiKeysSignaturesCreated := int32(0) + + signingHandler := &consensusMocks.SigningHandlerStub{ + CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) + return []byte("SIG"), nil + }, + } + container.SetSigningHandler(signingHandler) + consensusState := initializers.InitConsensusStateWithKeysHandler( + &testscommon.KeysHandlerStub{ + IsKeyManagedByCurrentNodeCalled: func(pkBytes []byte) bool { + return true + }, + }, + ) + ch := make(chan bool, 1) + + sr, _ := spos.NewSubround( + bls.SrBlock, + bls.SrSignature, + bls.SrEndRound, + roundTimeDuration, + 0.7, + 0.85, + "(SIGNATURE)", + consensusState, + ch, + executeStoredMessages, + container, + chainID, + currentPid, + &statusHandler.AppStatusHandlerStub{}, + ) + + srBlock, _ := v2.NewSubroundBlock( + sr, + v2.ProcessingThresholdPercent, + &consensusMocks.SposWorkerMock{}, + &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{}, + ) + + sr.SetHeader(&block.Header{}) + sr.SetSelfPubKey("OTHER") + + r := srBlock.TriggerCreateSignaturesForManagedKeys(context.TODO()) + assert.True(t, r) + + srBlock.SignaturesWaitGroup().Wait() + + assert.Equal(t, int32(9), atomic.LoadInt32(&numMultiKeysSignaturesCreated)) // there are 9 keys in default consensus group config + }) + + t.Run("should fail", func(t *testing.T) { + t.Parallel() + + container := consensusMocks.InitConsensusCore() + enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{ + IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { + return flag == common.AndromedaFlag + }, + } + container.SetEnableEpochsHandler(enableEpochsHandler) + + numMultiKeysSignaturesCreated := int32(0) + + signingHandler := &consensusMocks.SigningHandlerStub{ + CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) + return []byte("SIG"), nil + }, + } + container.SetSigningHandler(signingHandler) + consensusState := initializers.InitConsensusStateWithKeysHandler( + &testscommon.KeysHandlerStub{ + IsKeyManagedByCurrentNodeCalled: func(pkBytes []byte) bool { + return true + }, + }, + ) + ch := make(chan bool, 1) + + sr, _ := spos.NewSubround( + bls.SrBlock, + bls.SrSignature, + bls.SrEndRound, + roundTimeDuration, + 0.7, + 0.85, + "(SIGNATURE)", + consensusState, + ch, + executeStoredMessages, + container, + chainID, + currentPid, + &statusHandler.AppStatusHandlerStub{}, + ) + + srBlock, _ := v2.NewSubroundBlock( + sr, + v2.ProcessingThresholdPercent, + &consensusMocks.SposWorkerMock{}, + &consensusMocks.NtpSyncControllerMock{}, + &dataRetrieverMock.ThrottlerStub{ + CanProcessCalled: func() bool { + return false + }, + }, + ) + + sr.SetHeader(&block.Header{}) + sr.SetSelfPubKey("OTHER") + + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + r := srBlock.TriggerCreateSignaturesForManagedKeys(ctx) + assert.False(t, r) + + srBlock.SignaturesWaitGroup().Wait() + + assert.Equal(t, int32(0), atomic.LoadInt32(&numMultiKeysSignaturesCreated)) + }) +} + func TestSubroundBlock_IsInterfaceNil(t *testing.T) { t.Parallel() diff --git a/consensus/spos/bls/v2/subroundSignature.go b/consensus/spos/bls/v2/subroundSignature.go index 1c2ba18ac1f..0d688267b10 100644 --- a/consensus/spos/bls/v2/subroundSignature.go +++ b/consensus/spos/bls/v2/subroundSignature.go @@ -3,7 +3,6 @@ package v2 import ( "bytes" "context" - "fmt" "sync" "sync/atomic" "time" @@ -114,13 +113,6 @@ func (sr *subroundSignature) doSignatureJob(ctx context.Context) bool { pkBytes := sr.getPkForCompetingBlock(nonce, currentHash) hasCompetingBlockForPk := len(pkBytes) != 0 if hasCompetingBlockForPk { - // create signature shares optimistically only if there might be a competing block to be analyzed - // otherwise create signatures only before sending, to avoid goroutines overhead - if !sr.createSignaturesForManagedKeys(ctx) { - log.Debug("step 2: subround cannot proceed, cannot create signatures for managed keys") - return false - } - shouldAbort := sr.waitIfCompetingBlock(ctx, pkBytes, nonce, currentHash) if shouldAbort { return false @@ -225,66 +217,29 @@ func (sr *subroundSignature) doSignatureConsensusCheck() bool { return false } -func (sr *subroundSignature) createSignaturesForManagedKeys(ctx context.Context) bool { - numMultiKeysSignaturesCreated := int32(0) - - wg := sync.WaitGroup{} - - for idx, pk := range sr.ConsensusGroup() { - pkBytes := []byte(pk) - if !sr.IsKeyManagedBySelf(pkBytes) { - continue - } +func (sr *subroundSignature) waitForSingatures() { + done := make(chan struct{}) + go func() { + sr.SignaturesWaitGroup().Wait() + close(done) + }() - if sr.IsJobDone(pk, sr.Current()) { - continue - } - - err := sr.checkGoRoutinesThrottler(ctx) - if err != nil { - return false - } - sr.signatureThrottler.StartProcessing() - wg.Add(1) - - go func(ctx context.Context, idx int, pk string) { - defer sr.signatureThrottler.EndProcessing() - defer wg.Done() + // TODO: analyse this more + timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), sr.RoundHandler().TimeDuration()) - select { - case <-ctx.Done(): - return - default: - } - - pkBytes := []byte(pk) - currentHash := sr.GetData() - - _, err := sr.SigningHandler().CreateSignatureShareForPublicKey( - currentHash, - uint16(idx), - sr.GetHeader().GetEpoch(), - pkBytes, - ) - if err != nil { - log.Debug("createSignaturesForManagedKeys.CreateSignatureShareForPublicKey", "error", err.Error()) - return - } - - atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) - }(ctx, idx, pk) + select { + case <-done: + return + case <-time.After(timeLeft): + log.Debug("timeout while waiting for signatures to be created") + return } - - wg.Wait() - - if numMultiKeysSignaturesCreated > 0 { - log.Debug("step 2: multi keys signatures have been created", "num", numMultiKeysSignaturesCreated) - } - - return true } func (sr *subroundSignature) doSignatureJobForManagedKeys(ctx context.Context) bool { + // wait for optimistic signatures creation to finish + sr.waitForSingatures() + numMultiKeysSignaturesSent := int32(0) sentSigForAllKeys := atomicCore.Flag{} sentSigForAllKeys.SetValue(true) @@ -301,7 +256,7 @@ func (sr *subroundSignature) doSignatureJobForManagedKeys(ctx context.Context) b continue } - err := sr.checkGoRoutinesThrottler(ctx) + err := checkGoRoutinesThrottler(ctx, sr.signatureThrottler) if err != nil { return false } @@ -366,21 +321,6 @@ func (sr *subroundSignature) sendSignatureForManagedKey(_ context.Context, idx i return sr.completeSignatureSubRound(pk) } -func (sr *subroundSignature) checkGoRoutinesThrottler(ctx context.Context) error { - for { - if sr.signatureThrottler.CanProcess() { - break - } - select { - case <-time.After(timeSpentBetweenChecks): - continue - case <-ctx.Done(): - return fmt.Errorf("%w while checking the throttler", spos.ErrTimeIsOut) - } - } - return nil -} - func (sr *subroundSignature) doSignatureJobForSingleKey(_ context.Context) bool { pkBytes := []byte(sr.SelfPubKey()) nonce := sr.GetHeader().GetNonce() diff --git a/consensus/spos/bls/v2/subroundSignature_test.go b/consensus/spos/bls/v2/subroundSignature_test.go index 44c0fbf323d..a0fd76fe851 100644 --- a/consensus/spos/bls/v2/subroundSignature_test.go +++ b/consensus/spos/bls/v2/subroundSignature_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "sync" - "sync/atomic" "testing" "github.com/multiversx/mx-chain-core-go/core" @@ -1007,142 +1006,6 @@ func TestSubroundSignature_DoSignatureJobForManagedKeys(t *testing.T) { }) } -func TestSubroundSignature_CreateSignaturesForManagedKeys(t *testing.T) { - t.Parallel() - - t.Run("should work", func(t *testing.T) { - t.Parallel() - - container := consensusMocks.InitConsensusCore() - enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{ - IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { - return flag == common.AndromedaFlag - }, - } - container.SetEnableEpochsHandler(enableEpochsHandler) - - numMultiKeysSignaturesCreated := int32(0) - - signingHandler := &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { - atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) - return []byte("SIG"), nil - }, - } - container.SetSigningHandler(signingHandler) - consensusState := initializers.InitConsensusStateWithKeysHandler( - &testscommon.KeysHandlerStub{ - IsKeyManagedByCurrentNodeCalled: func(pkBytes []byte) bool { - return true - }, - }, - ) - ch := make(chan bool, 1) - - sr, _ := spos.NewSubround( - bls.SrBlock, - bls.SrSignature, - bls.SrEndRound, - roundTimeDuration, - 0.7, - 0.85, - "(SIGNATURE)", - consensusState, - ch, - executeStoredMessages, - container, - chainID, - currentPid, - &statusHandler.AppStatusHandlerStub{}, - ) - - srSignature, _ := v2.NewSubroundSignature( - sr, - &statusHandler.AppStatusHandlerStub{}, - &testscommon.SentSignatureTrackerStub{}, - &consensusMocks.SposWorkerMock{}, - &dataRetrieverMock.ThrottlerStub{}, - ) - - sr.SetHeader(&block.Header{}) - sr.SetSelfPubKey("OTHER") - - r := srSignature.CreateSignaturesForManagedKeys(context.TODO()) - assert.True(t, r) - - assert.Equal(t, int32(9), atomic.LoadInt32(&numMultiKeysSignaturesCreated)) // there are 9 keys in default consensus group config - }) - - t.Run("should fail", func(t *testing.T) { - t.Parallel() - - container := consensusMocks.InitConsensusCore() - enableEpochsHandler := &enableEpochsHandlerMock.EnableEpochsHandlerStub{ - IsFlagEnabledInEpochCalled: func(flag core.EnableEpochFlag, epoch uint32) bool { - return flag == common.AndromedaFlag - }, - } - container.SetEnableEpochsHandler(enableEpochsHandler) - - numMultiKeysSignaturesCreated := int32(0) - - signingHandler := &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { - atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) - return []byte("SIG"), nil - }, - } - container.SetSigningHandler(signingHandler) - consensusState := initializers.InitConsensusStateWithKeysHandler( - &testscommon.KeysHandlerStub{ - IsKeyManagedByCurrentNodeCalled: func(pkBytes []byte) bool { - return true - }, - }, - ) - ch := make(chan bool, 1) - - sr, _ := spos.NewSubround( - bls.SrBlock, - bls.SrSignature, - bls.SrEndRound, - roundTimeDuration, - 0.7, - 0.85, - "(SIGNATURE)", - consensusState, - ch, - executeStoredMessages, - container, - chainID, - currentPid, - &statusHandler.AppStatusHandlerStub{}, - ) - - srSignature, _ := v2.NewSubroundSignature( - sr, - &statusHandler.AppStatusHandlerStub{}, - &testscommon.SentSignatureTrackerStub{}, - &consensusMocks.SposWorkerMock{}, - &dataRetrieverMock.ThrottlerStub{ - CanProcessCalled: func() bool { - return false - }, - }, - ) - - sr.SetHeader(&block.Header{}) - sr.SetSelfPubKey("OTHER") - - ctx, cancel := context.WithCancel(context.TODO()) - cancel() - r := srSignature.CreateSignaturesForManagedKeys(ctx) - assert.False(t, r) - - assert.Equal(t, int32(0), atomic.LoadInt32(&numMultiKeysSignaturesCreated)) - }) -} - func TestSubroundSignature_DoSignatureConsensusCheck(t *testing.T) { t.Parallel() diff --git a/consensus/spos/consensusState.go b/consensus/spos/consensusState.go index 7c136ae8be4..01e9845e203 100644 --- a/consensus/spos/consensusState.go +++ b/consensus/spos/consensusState.go @@ -44,6 +44,8 @@ type ConsensusState struct { processingBlock bool mutProcessingBlock sync.RWMutex + signaturesWaitGroup *sync.WaitGroup + *roundConsensus *roundThreshold *roundStatus @@ -79,6 +81,7 @@ func (cns *ConsensusState) ResetConsensusRoundState() { cns.roundCanceled = false cns.extendedCalled = false cns.waitingAllSignaturesTimeOut = false + cns.signaturesWaitGroup = &sync.WaitGroup{} cns.mutState.Unlock() cns.ResetRoundStatus() @@ -520,6 +523,11 @@ func (cns *ConsensusState) SetWaitingAllSignaturesTimeOut(waitingAllSignaturesTi cns.waitingAllSignaturesTimeOut = waitingAllSignaturesTimeOut } +// SignaturesWaitGroup returns wait group for optimistic signatures handling +func (cns *ConsensusState) SignaturesWaitGroup() *sync.WaitGroup { + return cns.signaturesWaitGroup +} + // IsInterfaceNil returns true if there is no value under the interface func (cns *ConsensusState) IsInterfaceNil() bool { return cns == nil diff --git a/consensus/spos/interface.go b/consensus/spos/interface.go index 4c170171bfc..8733a4265b4 100644 --- a/consensus/spos/interface.go +++ b/consensus/spos/interface.go @@ -2,6 +2,7 @@ package spos import ( "context" + "sync" "time" "github.com/multiversx/mx-chain-core-go/core" @@ -231,6 +232,7 @@ type ConsensusStateHandler interface { SetHeader(header data.HeaderHandler) GetWaitingAllSignaturesTimeOut() bool SetWaitingAllSignaturesTimeOut(bool) + SignaturesWaitGroup() *sync.WaitGroup RoundConsensusHandler RoundStatusHandler RoundThresholdHandler From 170d42a20d6aa2d77fe5bcfa8033189e5b1edcd8 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Mon, 27 Apr 2026 12:38:49 +0300 Subject: [PATCH 04/11] fix race in tests --- consensus/spos/bls/v2/subroundBlock.go | 4 ++-- testscommon/consensus/consensusStateMock.go | 11 +++++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/consensus/spos/bls/v2/subroundBlock.go b/consensus/spos/bls/v2/subroundBlock.go index 6fdbde99616..b5754ea58c7 100644 --- a/consensus/spos/bls/v2/subroundBlock.go +++ b/consensus/spos/bls/v2/subroundBlock.go @@ -404,8 +404,8 @@ func (sr *subroundBlock) triggerCreateSignaturesForManagedKeys(ctx context.Conte }(ctx, idx, pk) } - if numMultiKeysSignaturesCreated > 0 { - log.Debug("step 1: multi keys signatures creation has been triggered", "num", numMultiKeysSignaturesCreated) + if atomic.LoadInt32(&numMultiKeysSignaturesCreated) > 0 { + log.Debug("step 1: multi keys signatures creation has been triggered", "num", atomic.LoadInt32(&numMultiKeysSignaturesCreated)) } return true diff --git a/testscommon/consensus/consensusStateMock.go b/testscommon/consensus/consensusStateMock.go index 587abc6b5d8..1c73854e195 100644 --- a/testscommon/consensus/consensusStateMock.go +++ b/testscommon/consensus/consensusStateMock.go @@ -1,6 +1,7 @@ package consensus import ( + "sync" "time" "github.com/multiversx/mx-chain-core-go/core" @@ -86,6 +87,7 @@ type ConsensusStateMock struct { FallbackThresholdCalled func(subroundId int) int SetFallbackThresholdCalled func(subroundId int, threshold int) ResetConsensusRoundStateCalled func() + SignaturesWaitGroupCalled func() *sync.WaitGroup } // AddReceivedHeader - @@ -654,6 +656,15 @@ func (cnsm *ConsensusStateMock) SetThreshold(subroundId int, threshold int) { } } +// SignaturesWaitGroup - +func (cnsm *ConsensusStateMock) SignaturesWaitGroup() *sync.WaitGroup { + if cnsm.SignaturesWaitGroupCalled != nil { + return cnsm.SignaturesWaitGroupCalled() + } + + return nil +} + // IsInterfaceNil returns true if there is no value under the interface func (cnsm *ConsensusStateMock) IsInterfaceNil() bool { return cnsm == nil From 7b76320971d6429ba4d907cf5dd6c8f5aa389949 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Mon, 27 Apr 2026 13:35:52 +0300 Subject: [PATCH 05/11] update context timeout --- consensus/spos/bls/v2/subroundBlock.go | 12 +++++++++--- consensus/spos/bls/v2/subroundSignature.go | 1 - 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/consensus/spos/bls/v2/subroundBlock.go b/consensus/spos/bls/v2/subroundBlock.go index b5754ea58c7..32098e9333a 100644 --- a/consensus/spos/bls/v2/subroundBlock.go +++ b/consensus/spos/bls/v2/subroundBlock.go @@ -349,7 +349,10 @@ func (sr *subroundBlock) sendBlockHeader( sr.SetData(headerHash) sr.SetHeader(headerHandler) - sr.triggerCreateSignaturesForManagedKeys(ctx) + timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), sr.RoundHandler().TimeDuration()) + sigsCtx, cancel := context.WithTimeout(context.Background(), timeLeft) + defer cancel() + sr.triggerCreateSignaturesForManagedKeys(sigsCtx) // log the header output for debugging purposes headerOutput, err := common.PrettifyStruct(headerHandler) @@ -709,10 +712,13 @@ func (sr *subroundBlock) receivedBlockHeader(headerHandler data.HeaderHandler) { sr.AddReceivedHeader(headerHandler) - ctx, cancel := context.WithTimeout(context.Background(), sr.RoundHandler().TimeDuration()) + timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), sr.RoundHandler().TimeDuration()) + sigsCtx, cancel := context.WithTimeout(context.Background(), timeLeft) defer cancel() + sr.triggerCreateSignaturesForManagedKeys(sigsCtx) - sr.triggerCreateSignaturesForManagedKeys(ctx) + ctx, cancel := context.WithTimeout(context.Background(), sr.RoundHandler().TimeDuration()) + defer cancel() _ = sr.processReceivedBlock(ctx, int64(headerHandler.GetRound()), []byte(sr.Leader())) sr.PeerHonestyHandler().ChangeScore( diff --git a/consensus/spos/bls/v2/subroundSignature.go b/consensus/spos/bls/v2/subroundSignature.go index 0d688267b10..843ea8d9c50 100644 --- a/consensus/spos/bls/v2/subroundSignature.go +++ b/consensus/spos/bls/v2/subroundSignature.go @@ -224,7 +224,6 @@ func (sr *subroundSignature) waitForSingatures() { close(done) }() - // TODO: analyse this more timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), sr.RoundHandler().TimeDuration()) select { From 4bbc8eae65e25c08aa382c4ea28ab19d35b8477a Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 28 Apr 2026 15:15:01 +0300 Subject: [PATCH 06/11] trigger sigs creation from block subround - without timeout ctx --- consensus/spos/bls/v2/subroundBlock.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/consensus/spos/bls/v2/subroundBlock.go b/consensus/spos/bls/v2/subroundBlock.go index 32098e9333a..5508b0ccb6d 100644 --- a/consensus/spos/bls/v2/subroundBlock.go +++ b/consensus/spos/bls/v2/subroundBlock.go @@ -349,10 +349,7 @@ func (sr *subroundBlock) sendBlockHeader( sr.SetData(headerHash) sr.SetHeader(headerHandler) - timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), sr.RoundHandler().TimeDuration()) - sigsCtx, cancel := context.WithTimeout(context.Background(), timeLeft) - defer cancel() - sr.triggerCreateSignaturesForManagedKeys(sigsCtx) + sr.triggerCreateSignaturesForManagedKeys(ctx) // log the header output for debugging purposes headerOutput, err := common.PrettifyStruct(headerHandler) @@ -712,10 +709,7 @@ func (sr *subroundBlock) receivedBlockHeader(headerHandler data.HeaderHandler) { sr.AddReceivedHeader(headerHandler) - timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), sr.RoundHandler().TimeDuration()) - sigsCtx, cancel := context.WithTimeout(context.Background(), timeLeft) - defer cancel() - sr.triggerCreateSignaturesForManagedKeys(sigsCtx) + sr.triggerCreateSignaturesForManagedKeys(context.Background()) ctx, cancel := context.WithTimeout(context.Background(), sr.RoundHandler().TimeDuration()) defer cancel() From ab81017b09e3bc59633b6baf77e3cdd344487fa8 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 28 Apr 2026 17:10:40 +0300 Subject: [PATCH 07/11] add context in singing handler for create sig --- consensus/interface.go | 2 +- consensus/spos/bls/v1/subroundSignature.go | 8 ++++--- .../spos/bls/v1/subroundSignature_test.go | 9 ++++---- .../spos/bls/v2/benchmark_send_proof_test.go | 5 +++-- .../v2/benchmark_verify_signatures_test.go | 2 +- consensus/spos/bls/v2/subroundBlock.go | 11 +++++++--- consensus/spos/bls/v2/subroundBlock_test.go | 4 ++-- consensus/spos/bls/v2/subroundSignature.go | 7 ++++-- .../subroundSignatureCompetingBlock_test.go | 2 +- .../spos/bls/v2/subroundSignature_test.go | 12 +++++----- consensus/spos/consensusState.go | 22 ++++++++++++++++++- consensus/spos/interface.go | 2 ++ consensus/spos/worker.go | 2 ++ factory/crypto/errors.go | 3 +++ factory/crypto/signingHandler.go | 21 +++++++++++++++++- factory/crypto/signingHandler_test.go | 9 ++++---- node/chainSimulator/process/processor.go | 3 ++- testscommon/consensus/consensusStateMock.go | 17 ++++++++++++++ testscommon/consensus/signingHandlerStub.go | 12 ++++++---- 19 files changed, 117 insertions(+), 36 deletions(-) diff --git a/consensus/interface.go b/consensus/interface.go index 266d2ff296f..7c6c4ad5714 100644 --- a/consensus/interface.go +++ b/consensus/interface.go @@ -189,7 +189,7 @@ type PeerBlacklistHandler interface { // SigningHandler defines the behaviour of a component that handles multi and single signatures used in consensus operations type SigningHandler interface { Reset(pubKeys []string) error - CreateSignatureShareForPublicKey(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) + CreateSignatureShareForPublicKey(ctx context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) CreateSignatureForPublicKey(message []byte, publicKeyBytes []byte) ([]byte, error) VerifySingleSignature(publicKeyBytes []byte, message []byte, signature []byte) error StoreSignatureShare(index uint16, sig []byte) error diff --git a/consensus/spos/bls/v1/subroundSignature.go b/consensus/spos/bls/v1/subroundSignature.go index 360aba6e48a..d566ff163e4 100644 --- a/consensus/spos/bls/v1/subroundSignature.go +++ b/consensus/spos/bls/v1/subroundSignature.go @@ -73,7 +73,7 @@ func checkNewSubroundSignatureParams( } // doSignatureJob method does the job of the subround Signature -func (sr *subroundSignature) doSignatureJob(_ context.Context) bool { +func (sr *subroundSignature) doSignatureJob(ctx context.Context) bool { if !sr.CanDoSubroundJob(sr.Current()) { return false } @@ -93,6 +93,7 @@ func (sr *subroundSignature) doSignatureJob(_ context.Context) bool { } signatureShare, err := sr.SigningHandler().CreateSignatureShareForPublicKey( + ctx, sr.GetData(), uint16(selfIndex), sr.GetHeader().GetEpoch(), @@ -116,7 +117,7 @@ func (sr *subroundSignature) doSignatureJob(_ context.Context) bool { } } - return sr.doSignatureJobForManagedKeys() + return sr.doSignatureJobForManagedKeys(ctx) } func (sr *subroundSignature) createAndSendSignatureMessage(signatureShare []byte, pkBytes []byte) bool { @@ -351,7 +352,7 @@ func (sr *subroundSignature) remainingTime() time.Duration { return remainigTime } -func (sr *subroundSignature) doSignatureJobForManagedKeys() bool { +func (sr *subroundSignature) doSignatureJobForManagedKeys(ctx context.Context) bool { isMultiKeyLeader := sr.IsMultiKeyLeaderInCurrentRound() numMultiKeysSignaturesSent := 0 @@ -371,6 +372,7 @@ func (sr *subroundSignature) doSignatureJobForManagedKeys() bool { } signatureShare, err := sr.SigningHandler().CreateSignatureShareForPublicKey( + ctx, sr.GetData(), uint16(selfIndex), sr.GetHeader().GetEpoch(), diff --git a/consensus/spos/bls/v1/subroundSignature_test.go b/consensus/spos/bls/v1/subroundSignature_test.go index 03e5101cd5f..79ca52835ce 100644 --- a/consensus/spos/bls/v1/subroundSignature_test.go +++ b/consensus/spos/bls/v1/subroundSignature_test.go @@ -1,6 +1,7 @@ package v1_test import ( + "context" "testing" "github.com/multiversx/mx-chain-core-go/core/check" @@ -360,7 +361,7 @@ func TestSubroundSignature_DoSignatureJob(t *testing.T) { err := errors.New("create signature share error") signingHandler := &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return nil, err }, } @@ -370,7 +371,7 @@ func TestSubroundSignature_DoSignatureJob(t *testing.T) { assert.False(t, r) signingHandler = &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return []byte("SIG"), nil }, } @@ -441,7 +442,7 @@ func TestSubroundSignature_DoSignatureJobWithMultikey(t *testing.T) { err := errors.New("create signature share error") signingHandler := &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return nil, err }, } @@ -451,7 +452,7 @@ func TestSubroundSignature_DoSignatureJobWithMultikey(t *testing.T) { assert.False(t, r) signingHandler = &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return []byte("SIG"), nil }, } diff --git a/consensus/spos/bls/v2/benchmark_send_proof_test.go b/consensus/spos/bls/v2/benchmark_send_proof_test.go index a589e4ae683..33e68e4e4a8 100644 --- a/consensus/spos/bls/v2/benchmark_send_proof_test.go +++ b/consensus/spos/bls/v2/benchmark_send_proof_test.go @@ -1,6 +1,7 @@ package v2_test import ( + "context" "sort" "testing" @@ -171,7 +172,7 @@ func benchmarkSendProof(b *testing.B, numberOfKeys int) { // Create signature shares for all validators (simulating that all have signed) for i := 0; i < len(keys); i++ { - _, err := signingHandler.CreateSignatureShareForPublicKey(dataToBeSigned, uint16(i), 0, []byte(keys[i])) + _, err := signingHandler.CreateSignatureShareForPublicKey(context.TODO(), dataToBeSigned, uint16(i), 0, []byte(keys[i])) require.Nil(b, err) err = srEndRound.SetJobDone(keys[i], bls.SrSignature, true) require.Nil(b, err) @@ -183,7 +184,7 @@ func benchmarkSendProof(b *testing.B, numberOfKeys int) { for i := 0; i < b.N; i++ { // Reset signature state for each iteration for j := 0; j < len(keys); j++ { - _, _ = signingHandler.CreateSignatureShareForPublicKey(dataToBeSigned, uint16(j), 0, []byte(keys[j])) + _, _ = signingHandler.CreateSignatureShareForPublicKey(context.TODO(), dataToBeSigned, uint16(j), 0, []byte(keys[j])) } b.StartTimer() diff --git a/consensus/spos/bls/v2/benchmark_verify_signatures_test.go b/consensus/spos/bls/v2/benchmark_verify_signatures_test.go index 505a83dbcae..dc95dfbd818 100644 --- a/consensus/spos/bls/v2/benchmark_verify_signatures_test.go +++ b/consensus/spos/bls/v2/benchmark_verify_signatures_test.go @@ -114,7 +114,7 @@ func BenchmarkSubroundEndRound_VerifyNodesOnAggSigFailTime(b *testing.B) { sr := initSubroundEndRoundWithContainerAndConsensusState(container, &statusHandler.AppStatusHandlerStub{}, consensusState) for i := 0; i < len(sr.ConsensusGroup()); i++ { - _, err := sr.SigningHandler().CreateSignatureShareForPublicKey(dataToBeSigned, uint16(i), sr.EnableEpochsHandler().GetCurrentEpoch(), []byte(keys[i])) + _, err := sr.SigningHandler().CreateSignatureShareForPublicKey(context.TODO(), dataToBeSigned, uint16(i), sr.EnableEpochsHandler().GetCurrentEpoch(), []byte(keys[i])) require.Nil(b, err) _ = sr.SetJobDone(keys[i], bls.SrSignature, true) } diff --git a/consensus/spos/bls/v2/subroundBlock.go b/consensus/spos/bls/v2/subroundBlock.go index 5508b0ccb6d..293661a090e 100644 --- a/consensus/spos/bls/v2/subroundBlock.go +++ b/consensus/spos/bls/v2/subroundBlock.go @@ -361,6 +361,10 @@ func (sr *subroundBlock) sendBlockHeader( } func (sr *subroundBlock) triggerCreateSignaturesForManagedKeys(ctx context.Context) bool { + timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), sr.RoundHandler().TimeDuration()) + sigCtx, cancel := context.WithTimeout(ctx, timeLeft) + sr.SetSignaturesCtxCancelFunc(cancel) + numMultiKeysSignaturesCreated := int32(0) for idx, pk := range sr.ConsensusGroup() { @@ -376,12 +380,12 @@ func (sr *subroundBlock) triggerCreateSignaturesForManagedKeys(ctx context.Conte sr.signatureThrottler.StartProcessing() sr.SignaturesWaitGroup().Add(1) - go func(ctx context.Context, idx int, pk string) { + go func(sigCtx context.Context, idx int, pk string) { defer sr.signatureThrottler.EndProcessing() defer sr.SignaturesWaitGroup().Done() select { - case <-ctx.Done(): + case <-sigCtx.Done(): return default: } @@ -390,6 +394,7 @@ func (sr *subroundBlock) triggerCreateSignaturesForManagedKeys(ctx context.Conte currentHash := sr.GetData() _, err := sr.SigningHandler().CreateSignatureShareForPublicKey( + sigCtx, currentHash, uint16(idx), sr.GetHeader().GetEpoch(), @@ -401,7 +406,7 @@ func (sr *subroundBlock) triggerCreateSignaturesForManagedKeys(ctx context.Conte } atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) - }(ctx, idx, pk) + }(sigCtx, idx, pk) } if atomic.LoadInt32(&numMultiKeysSignaturesCreated) > 0 { diff --git a/consensus/spos/bls/v2/subroundBlock_test.go b/consensus/spos/bls/v2/subroundBlock_test.go index e8fe8af9fb6..7458b508989 100644 --- a/consensus/spos/bls/v2/subroundBlock_test.go +++ b/consensus/spos/bls/v2/subroundBlock_test.go @@ -1855,7 +1855,7 @@ func TestSubroundBlock_TriggerCreateSignaturesForManagedKeys(t *testing.T) { numMultiKeysSignaturesCreated := int32(0) signingHandler := &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) return []byte("SIG"), nil }, @@ -1920,7 +1920,7 @@ func TestSubroundBlock_TriggerCreateSignaturesForManagedKeys(t *testing.T) { numMultiKeysSignaturesCreated := int32(0) signingHandler := &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) return []byte("SIG"), nil }, diff --git a/consensus/spos/bls/v2/subroundSignature.go b/consensus/spos/bls/v2/subroundSignature.go index 843ea8d9c50..8b8ef973bb2 100644 --- a/consensus/spos/bls/v2/subroundSignature.go +++ b/consensus/spos/bls/v2/subroundSignature.go @@ -228,6 +228,7 @@ func (sr *subroundSignature) waitForSingatures() { select { case <-done: + sr.SignaturesCtxCancel() return case <-time.After(timeLeft): log.Debug("timeout while waiting for signatures to be created") @@ -284,7 +285,7 @@ func (sr *subroundSignature) doSignatureJobForManagedKeys(ctx context.Context) b return sentSigForAllKeys.IsSet() } -func (sr *subroundSignature) sendSignatureForManagedKey(_ context.Context, idx int, pk string) bool { +func (sr *subroundSignature) sendSignatureForManagedKey(ctx context.Context, idx int, pk string) bool { pkBytes := []byte(pk) nonce := sr.GetHeader().GetNonce() currentHash := sr.GetData() @@ -295,6 +296,7 @@ func (sr *subroundSignature) sendSignatureForManagedKey(_ context.Context, idx i // will try to create it signatureShare, err = sr.SigningHandler().CreateSignatureShareForPublicKey( + ctx, currentHash, uint16(idx), sr.GetHeader().GetEpoch(), @@ -320,7 +322,7 @@ func (sr *subroundSignature) sendSignatureForManagedKey(_ context.Context, idx i return sr.completeSignatureSubRound(pk) } -func (sr *subroundSignature) doSignatureJobForSingleKey(_ context.Context) bool { +func (sr *subroundSignature) doSignatureJobForSingleKey(ctx context.Context) bool { pkBytes := []byte(sr.SelfPubKey()) nonce := sr.GetHeader().GetNonce() currentHash := sr.GetData() @@ -332,6 +334,7 @@ func (sr *subroundSignature) doSignatureJobForSingleKey(_ context.Context) bool } signatureShare, err := sr.SigningHandler().CreateSignatureShareForPublicKey( + ctx, currentHash, uint16(selfIndex), sr.GetHeader().GetEpoch(), diff --git a/consensus/spos/bls/v2/subroundSignatureCompetingBlock_test.go b/consensus/spos/bls/v2/subroundSignatureCompetingBlock_test.go index 96ad2f61953..cdbcc6b3d64 100644 --- a/consensus/spos/bls/v2/subroundSignatureCompetingBlock_test.go +++ b/consensus/spos/bls/v2/subroundSignatureCompetingBlock_test.go @@ -315,7 +315,7 @@ func TestWaitIfCompetingBlock_RecordSignedNonceCalledBeforeBroadcast(t *testing. container := consensusMocks.InitConsensusCore() container.SetSigningHandler(&consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return []byte("SIG"), nil }, }) diff --git a/consensus/spos/bls/v2/subroundSignature_test.go b/consensus/spos/bls/v2/subroundSignature_test.go index a0fd76fe851..76bd07bf1f8 100644 --- a/consensus/spos/bls/v2/subroundSignature_test.go +++ b/consensus/spos/bls/v2/subroundSignature_test.go @@ -507,7 +507,7 @@ func TestSubroundSignature_DoSignatureJob(t *testing.T) { container := consensusMocks.InitConsensusCore() signingHandler := &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return []byte("SIG"), nil }, } @@ -597,7 +597,7 @@ func TestSubroundSignature_SendSignature(t *testing.T) { container := consensusMocks.InitConsensusCore() container.SetSigningHandler(&consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return make([]byte, 0), expErr }, }) @@ -652,7 +652,7 @@ func TestSubroundSignature_SendSignature(t *testing.T) { container := consensusMocks.InitConsensusCore() container.SetSigningHandler(&consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return []byte("SIG"), nil }, }) @@ -720,7 +720,7 @@ func TestSubroundSignature_SendSignature(t *testing.T) { container := consensusMocks.InitConsensusCore() container.SetSigningHandler(&consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return []byte("SIG"), nil }, }) @@ -790,7 +790,7 @@ func TestSubroundSignature_SendSignature(t *testing.T) { container := consensusMocks.InitConsensusCore() container.SetSigningHandler(&consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { assert.Fail(t, "should not have been called") return []byte(""), nil }, @@ -874,7 +874,7 @@ func TestSubroundSignature_DoSignatureJobForManagedKeys(t *testing.T) { container.SetEnableEpochsHandler(enableEpochsHandler) signingHandler := &consensusMocks.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return []byte("SIG"), nil }, } diff --git a/consensus/spos/consensusState.go b/consensus/spos/consensusState.go index 01e9845e203..9ba7c0bfc88 100644 --- a/consensus/spos/consensusState.go +++ b/consensus/spos/consensusState.go @@ -2,6 +2,7 @@ package spos import ( "bytes" + "context" "sync" "time" @@ -44,7 +45,8 @@ type ConsensusState struct { processingBlock bool mutProcessingBlock sync.RWMutex - signaturesWaitGroup *sync.WaitGroup + signaturesWaitGroup *sync.WaitGroup + signaturesTimeoutCtxCancel context.CancelFunc *roundConsensus *roundThreshold @@ -528,6 +530,24 @@ func (cns *ConsensusState) SignaturesWaitGroup() *sync.WaitGroup { return cns.signaturesWaitGroup } +// SetSignaturesCtxCancelFunc will set signatures context cancel function +func (cns *ConsensusState) SetSignaturesCtxCancelFunc(cancelFunc context.CancelFunc) { + cns.mutState.Lock() + defer cns.mutState.Unlock() + + cns.signaturesTimeoutCtxCancel = cancelFunc +} + +// SignaturesCtxCancel will cancel signatures context +func (cns *ConsensusState) SignaturesCtxCancel() { + cns.mutState.RLock() + defer cns.mutState.RUnlock() + + if cns.signaturesTimeoutCtxCancel != nil { + cns.signaturesTimeoutCtxCancel() + } +} + // IsInterfaceNil returns true if there is no value under the interface func (cns *ConsensusState) IsInterfaceNil() bool { return cns == nil diff --git a/consensus/spos/interface.go b/consensus/spos/interface.go index 8733a4265b4..78d4576e2c2 100644 --- a/consensus/spos/interface.go +++ b/consensus/spos/interface.go @@ -233,6 +233,8 @@ type ConsensusStateHandler interface { GetWaitingAllSignaturesTimeOut() bool SetWaitingAllSignaturesTimeOut(bool) SignaturesWaitGroup() *sync.WaitGroup + SetSignaturesCtxCancelFunc(cancelFunc context.CancelFunc) + SignaturesCtxCancel() RoundConsensusHandler RoundStatusHandler RoundThresholdHandler diff --git a/consensus/spos/worker.go b/consensus/spos/worker.go index 20556782702..0fd4f99e070 100644 --- a/consensus/spos/worker.go +++ b/consensus/spos/worker.go @@ -856,6 +856,8 @@ func (wrk *Worker) callReceivedHeaderCallbacks(message *consensus.Message) { // Extend does an extension for the subround with subroundId func (wrk *Worker) Extend(subroundId int) { wrk.consensusState.SetExtendedCalled(true) + wrk.consensusState.SignaturesCtxCancel() + log.Debug("extend function is called", "subround", wrk.consensusService.GetSubroundName(subroundId)) diff --git a/factory/crypto/errors.go b/factory/crypto/errors.go index 513e7f09e88..689e9ee2a33 100644 --- a/factory/crypto/errors.go +++ b/factory/crypto/errors.go @@ -43,3 +43,6 @@ var ErrNilMessage = errors.New("message to be signed or to be verified is nil") // ErrBitmapMismatch is raised when an invalid bitmap is passed to the multisigner var ErrBitmapMismatch = errors.New("multi signer reported a mismatch in used bitmap") + +// ErrTimeIsOut is raised when time is out for signing operation +var ErrTimeIsOut = errors.New("timeout while handling signatures") diff --git a/factory/crypto/signingHandler.go b/factory/crypto/signingHandler.go index f2a30671f43..bc16f159c6b 100644 --- a/factory/crypto/signingHandler.go +++ b/factory/crypto/signingHandler.go @@ -1,6 +1,7 @@ package crypto import ( + "context" "sync" "github.com/multiversx/mx-chain-core-go/core/check" @@ -135,11 +136,23 @@ func (sh *signingHandler) Reset(pubKeys []string) error { // CreateSignatureShareForPublicKey returns a signature over a message using the managed private key that was selected based on the provided // publicKeyBytes argument -func (sh *signingHandler) CreateSignatureShareForPublicKey(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { +func (sh *signingHandler) CreateSignatureShareForPublicKey( + ctx context.Context, + message []byte, + index uint16, + epoch uint32, + publicKeyBytes []byte, +) ([]byte, error) { if message == nil { return nil, ErrNilMessage } + select { + case <-ctx.Done(): + return nil, ErrTimeIsOut + default: + } + privateKey := sh.keysHandler.GetHandledPrivateKey(publicKeyBytes) privateKeyBytes, err := privateKey.ToByteArray() if err != nil { @@ -159,6 +172,12 @@ func (sh *signingHandler) CreateSignatureShareForPublicKey(message []byte, index return nil, err } + select { + case <-ctx.Done(): + return nil, ErrTimeIsOut + default: + } + sh.data.sigShares[index] = sigShareBytes return sigShareBytes, nil diff --git a/factory/crypto/signingHandler_test.go b/factory/crypto/signingHandler_test.go index 281a842e3bf..6fcd4999749 100644 --- a/factory/crypto/signingHandler_test.go +++ b/factory/crypto/signingHandler_test.go @@ -1,6 +1,7 @@ package crypto_test import ( + "context" "errors" "testing" @@ -164,7 +165,7 @@ func TestSigningHandler_CreateSignatureShareForPublicKey(t *testing.T) { t.Parallel() signer, _ := cryptoFactory.NewSigningHandler(createMockArgsSigningHandler()) - sigShare, err := signer.CreateSignatureShareForPublicKey(nil, selfIndex, epoch, pkBytes) + sigShare, err := signer.CreateSignatureShareForPublicKey(context.TODO(), nil, selfIndex, epoch, pkBytes) require.Nil(t, sigShare) require.Equal(t, cryptoFactory.ErrNilMessage, err) }) @@ -182,7 +183,7 @@ func TestSigningHandler_CreateSignatureShareForPublicKey(t *testing.T) { args.MultiSignerContainer = cryptoMocks.NewMultiSignerContainerMock(multiSigner) signer, _ := cryptoFactory.NewSigningHandler(args) - sigShare, err := signer.CreateSignatureShareForPublicKey([]byte("msg1"), selfIndex, epoch, pkBytes) + sigShare, err := signer.CreateSignatureShareForPublicKey(context.TODO(), []byte("msg1"), selfIndex, epoch, pkBytes) require.Nil(t, sigShare) require.Equal(t, expectedErr, err) }) @@ -200,7 +201,7 @@ func TestSigningHandler_CreateSignatureShareForPublicKey(t *testing.T) { signer, _ := cryptoFactory.NewSigningHandler(args) - sigShare, err := signer.CreateSignatureShareForPublicKey([]byte("message"), uint16(0), epoch, pkBytes) + sigShare, err := signer.CreateSignatureShareForPublicKey(context.TODO(), []byte("message"), uint16(0), epoch, pkBytes) require.Nil(t, sigShare) require.Equal(t, expectedErr, err) }) @@ -227,7 +228,7 @@ func TestSigningHandler_CreateSignatureShareForPublicKey(t *testing.T) { args.MultiSignerContainer = cryptoMocks.NewMultiSignerContainerMock(multiSigner) signer, _ := cryptoFactory.NewSigningHandler(args) - sigShare, err := signer.CreateSignatureShareForPublicKey([]byte("msg1"), selfIndex, epoch, pkBytes) + sigShare, err := signer.CreateSignatureShareForPublicKey(context.TODO(), []byte("msg1"), selfIndex, epoch, pkBytes) require.Nil(t, err) require.Equal(t, expectedSigShare, sigShare) assert.True(t, getHandledPrivateKeyCalled) diff --git a/node/chainSimulator/process/processor.go b/node/chainSimulator/process/processor.go index 9612c4cc22e..7a459f9bd7a 100644 --- a/node/chainSimulator/process/processor.go +++ b/node/chainSimulator/process/processor.go @@ -1,6 +1,7 @@ package process import ( + "context" "time" "github.com/multiversx/mx-chain-core-go/core" @@ -476,7 +477,7 @@ func (creator *blocksCreator) generateAggregatedSignature(headerHash []byte, epo } totalKey++ - if _, err = signingHandler.CreateSignatureShareForPublicKey(headerHash, uint16(idx), epoch, []byte(pubKey)); err != nil { + if _, err = signingHandler.CreateSignatureShareForPublicKey(context.TODO(), headerHash, uint16(idx), epoch, []byte(pubKey)); err != nil { return nil, err } } diff --git a/testscommon/consensus/consensusStateMock.go b/testscommon/consensus/consensusStateMock.go index 1c73854e195..a3fa8523a8c 100644 --- a/testscommon/consensus/consensusStateMock.go +++ b/testscommon/consensus/consensusStateMock.go @@ -1,6 +1,7 @@ package consensus import ( + "context" "sync" "time" @@ -88,6 +89,8 @@ type ConsensusStateMock struct { SetFallbackThresholdCalled func(subroundId int, threshold int) ResetConsensusRoundStateCalled func() SignaturesWaitGroupCalled func() *sync.WaitGroup + SetSignaturesCtxCancelFuncCalled func(cancelFunc context.CancelFunc) + SignaturesCtxCancelCalled func() } // AddReceivedHeader - @@ -665,6 +668,20 @@ func (cnsm *ConsensusStateMock) SignaturesWaitGroup() *sync.WaitGroup { return nil } +// SetSignaturesCtxCancelFunc - +func (cnsm *ConsensusStateMock) SetSignaturesCtxCancelFunc(cancelFunc context.CancelFunc) { + if cnsm.SetSignaturesCtxCancelFuncCalled != nil { + cnsm.SetSignaturesCtxCancelFuncCalled(cancelFunc) + } +} + +// SignaturesCtxCancel - +func (cnsm *ConsensusStateMock) SignaturesCtxCancel() { + if cnsm.SignaturesCtxCancelCalled != nil { + cnsm.SignaturesCtxCancelCalled() + } +} + // IsInterfaceNil returns true if there is no value under the interface func (cnsm *ConsensusStateMock) IsInterfaceNil() bool { return cnsm == nil diff --git a/testscommon/consensus/signingHandlerStub.go b/testscommon/consensus/signingHandlerStub.go index 79127a17797..8e9ca4a9b2d 100644 --- a/testscommon/consensus/signingHandlerStub.go +++ b/testscommon/consensus/signingHandlerStub.go @@ -1,11 +1,15 @@ package consensus -import crypto "github.com/multiversx/mx-chain-crypto-go" +import ( + "context" + + crypto "github.com/multiversx/mx-chain-crypto-go" +) // SigningHandlerStub implements SigningHandler interface type SigningHandlerStub struct { ResetCalled func(pubKeys []string) error - CreateSignatureShareForPublicKeyCalled func(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) + CreateSignatureShareForPublicKeyCalled func(ctx context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) CreateSignatureForPublicKeyCalled func(message []byte, publicKeyBytes []byte) ([]byte, error) VerifySingleSignatureCalled func(publicKeyBytes []byte, message []byte, signature []byte) error StoreSignatureShareCalled func(index uint16, sig []byte) error @@ -27,9 +31,9 @@ func (stub *SigningHandlerStub) Reset(pubKeys []string) error { } // CreateSignatureShareForPublicKey - -func (stub *SigningHandlerStub) CreateSignatureShareForPublicKey(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { +func (stub *SigningHandlerStub) CreateSignatureShareForPublicKey(ctx context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { if stub.CreateSignatureShareForPublicKeyCalled != nil { - return stub.CreateSignatureShareForPublicKeyCalled(message, index, epoch, publicKeyBytes) + return stub.CreateSignatureShareForPublicKeyCalled(ctx, message, index, epoch, publicKeyBytes) } return make([]byte, 0), nil From f88f1e5a6776a2292c8170c20ecfa887932da171 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 28 Apr 2026 17:24:53 +0300 Subject: [PATCH 08/11] fix test --- node/chainSimulator/process/processor_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/node/chainSimulator/process/processor_test.go b/node/chainSimulator/process/processor_test.go index d5c637aef8c..1d22e3a2d42 100644 --- a/node/chainSimulator/process/processor_test.go +++ b/node/chainSimulator/process/processor_test.go @@ -1,6 +1,7 @@ package process_test import ( + "context" "errors" "testing" "time" @@ -390,7 +391,7 @@ func TestBlocksCreator_CreateNewBlock(t *testing.T) { return &mock.CryptoComponentsStub{ KeysHandlerField: kh, SigHandler: &testsConsensus.SigningHandlerStub{ - CreateSignatureShareForPublicKeyCalled: func(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { + CreateSignatureShareForPublicKeyCalled: func(_ context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) { return nil, expectedErr }, }, From fb7b9b71f7ac554206dcf1ced00d38798c44fdfd Mon Sep 17 00:00:00 2001 From: ssd04 Date: Mon, 4 May 2026 18:47:04 +0300 Subject: [PATCH 09/11] extra logging --- consensus/spos/bls/v2/subroundBlock.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/consensus/spos/bls/v2/subroundBlock.go b/consensus/spos/bls/v2/subroundBlock.go index 293661a090e..eb14a64566d 100644 --- a/consensus/spos/bls/v2/subroundBlock.go +++ b/consensus/spos/bls/v2/subroundBlock.go @@ -375,6 +375,7 @@ func (sr *subroundBlock) triggerCreateSignaturesForManagedKeys(ctx context.Conte err := checkGoRoutinesThrottler(ctx, sr.signatureThrottler) if err != nil { + log.Debug("triggerCreateSignaturesForManagedKeys.checkGoRoutinesThrottler", "err", err) return false } sr.signatureThrottler.StartProcessing() @@ -386,6 +387,7 @@ func (sr *subroundBlock) triggerCreateSignaturesForManagedKeys(ctx context.Conte select { case <-sigCtx.Done(): + log.Debug("triggerCreateSignaturesForManagedKeys: context done", "timeLeft", timeLeft) return default: } @@ -401,7 +403,7 @@ func (sr *subroundBlock) triggerCreateSignaturesForManagedKeys(ctx context.Conte pkBytes, ) if err != nil { - log.Debug("createSignaturesForManagedKeys.CreateSignatureShareForPublicKey", "error", err.Error()) + log.Debug("triggerCreateSignaturesForManagedKeys.CreateSignatureShareForPublicKey", "error", err.Error()) return } From 40875b512821b5c64991730e7d94da5c46d53d47 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 5 May 2026 12:36:35 +0300 Subject: [PATCH 10/11] adjust time left --- consensus/spos/bls/v2/subroundBlock.go | 4 +++- consensus/spos/bls/v2/subroundSignature.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/consensus/spos/bls/v2/subroundBlock.go b/consensus/spos/bls/v2/subroundBlock.go index eb14a64566d..3b82bc7bde3 100644 --- a/consensus/spos/bls/v2/subroundBlock.go +++ b/consensus/spos/bls/v2/subroundBlock.go @@ -361,7 +361,9 @@ func (sr *subroundBlock) sendBlockHeader( } func (sr *subroundBlock) triggerCreateSignaturesForManagedKeys(ctx context.Context) bool { - timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), sr.RoundHandler().TimeDuration()) + sigSubroundEndTime := time.Duration(float64(sr.RoundHandler().TimeDuration()) * srSignatureEndTime) + timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), sigSubroundEndTime) + sigCtx, cancel := context.WithTimeout(ctx, timeLeft) sr.SetSignaturesCtxCancelFunc(cancel) diff --git a/consensus/spos/bls/v2/subroundSignature.go b/consensus/spos/bls/v2/subroundSignature.go index 8b8ef973bb2..69873cce8cf 100644 --- a/consensus/spos/bls/v2/subroundSignature.go +++ b/consensus/spos/bls/v2/subroundSignature.go @@ -224,7 +224,7 @@ func (sr *subroundSignature) waitForSingatures() { close(done) }() - timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), sr.RoundHandler().TimeDuration()) + timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), time.Duration(sr.EndTime())) select { case <-done: From 099ce93cb8a0fe476b96df486b875cdca89554e4 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 5 May 2026 16:08:39 +0300 Subject: [PATCH 11/11] update logging for triggered optimistic sigs --- consensus/spos/bls/v2/subroundBlock.go | 9 +-------- consensus/spos/bls/v2/subroundSignature.go | 1 + 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/consensus/spos/bls/v2/subroundBlock.go b/consensus/spos/bls/v2/subroundBlock.go index 3b82bc7bde3..7f41fa86c5e 100644 --- a/consensus/spos/bls/v2/subroundBlock.go +++ b/consensus/spos/bls/v2/subroundBlock.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "sync" - "sync/atomic" "time" "github.com/multiversx/mx-chain-core-go/core" @@ -367,8 +366,6 @@ func (sr *subroundBlock) triggerCreateSignaturesForManagedKeys(ctx context.Conte sigCtx, cancel := context.WithTimeout(ctx, timeLeft) sr.SetSignaturesCtxCancelFunc(cancel) - numMultiKeysSignaturesCreated := int32(0) - for idx, pk := range sr.ConsensusGroup() { pkBytes := []byte(pk) if !sr.IsKeyManagedBySelf(pkBytes) { @@ -408,14 +405,10 @@ func (sr *subroundBlock) triggerCreateSignaturesForManagedKeys(ctx context.Conte log.Debug("triggerCreateSignaturesForManagedKeys.CreateSignatureShareForPublicKey", "error", err.Error()) return } - - atomic.AddInt32(&numMultiKeysSignaturesCreated, 1) }(sigCtx, idx, pk) } - if atomic.LoadInt32(&numMultiKeysSignaturesCreated) > 0 { - log.Debug("step 1: multi keys signatures creation has been triggered", "num", atomic.LoadInt32(&numMultiKeysSignaturesCreated)) - } + log.Debug("step 1: multi keys signatures creation has been triggered") return true } diff --git a/consensus/spos/bls/v2/subroundSignature.go b/consensus/spos/bls/v2/subroundSignature.go index 69873cce8cf..e997741d1d1 100644 --- a/consensus/spos/bls/v2/subroundSignature.go +++ b/consensus/spos/bls/v2/subroundSignature.go @@ -294,6 +294,7 @@ func (sr *subroundSignature) sendSignatureForManagedKey(ctx context.Context, idx if err != nil { // signature share not found (optimistic signature share creation was not triggered) // will try to create it + log.Debug("sendSignatureForManagedKey.SignatureShare: sig not already created, will try to create it", "error", err) signatureShare, err = sr.SigningHandler().CreateSignatureShareForPublicKey( ctx,