Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion consensus/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ type PeerBlacklistHandler interface {
// SigningHandler defines the behaviour of a component that handles multi and single signatures used in consensus operations
type SigningHandler interface {
Reset(pubKeys []string) error
CreateSignatureShareForPublicKey(message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error)
CreateSignatureShareForPublicKey(ctx context.Context, message []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error)
CreateSignatureForPublicKey(message []byte, publicKeyBytes []byte) ([]byte, error)
VerifySingleSignature(publicKeyBytes []byte, message []byte, signature []byte) error
StoreSignatureShare(index uint16, sig []byte) error
Expand Down
8 changes: 5 additions & 3 deletions consensus/spos/bls/v1/subroundSignature.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func checkNewSubroundSignatureParams(
}

// doSignatureJob method does the job of the subround Signature
func (sr *subroundSignature) doSignatureJob(_ context.Context) bool {
func (sr *subroundSignature) doSignatureJob(ctx context.Context) bool {
if !sr.CanDoSubroundJob(sr.Current()) {
return false
}
Expand All @@ -93,6 +93,7 @@ func (sr *subroundSignature) doSignatureJob(_ context.Context) bool {
}

signatureShare, err := sr.SigningHandler().CreateSignatureShareForPublicKey(
ctx,
sr.GetData(),
uint16(selfIndex),
sr.GetHeader().GetEpoch(),
Expand All @@ -116,7 +117,7 @@ func (sr *subroundSignature) doSignatureJob(_ context.Context) bool {
}
}

return sr.doSignatureJobForManagedKeys()
return sr.doSignatureJobForManagedKeys(ctx)
}

func (sr *subroundSignature) createAndSendSignatureMessage(signatureShare []byte, pkBytes []byte) bool {
Expand Down Expand Up @@ -351,7 +352,7 @@ func (sr *subroundSignature) remainingTime() time.Duration {
return remainigTime
}

func (sr *subroundSignature) doSignatureJobForManagedKeys() bool {
func (sr *subroundSignature) doSignatureJobForManagedKeys(ctx context.Context) bool {
isMultiKeyLeader := sr.IsMultiKeyLeaderInCurrentRound()

numMultiKeysSignaturesSent := 0
Expand All @@ -371,6 +372,7 @@ func (sr *subroundSignature) doSignatureJobForManagedKeys() bool {
}

signatureShare, err := sr.SigningHandler().CreateSignatureShareForPublicKey(
ctx,
sr.GetData(),
uint16(selfIndex),
sr.GetHeader().GetEpoch(),
Expand Down
9 changes: 5 additions & 4 deletions consensus/spos/bls/v1/subroundSignature_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1_test

import (
"context"
"testing"

"github.com/multiversx/mx-chain-core-go/core/check"
Expand Down Expand Up @@ -360,7 +361,7 @@ func TestSubroundSignature_DoSignatureJob(t *testing.T) {

err := errors.New("create signature share error")
signingHandler := &consensusMocks.SigningHandlerStub{
CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) {
CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) {
return nil, err
},
}
Expand All @@ -370,7 +371,7 @@ func TestSubroundSignature_DoSignatureJob(t *testing.T) {
assert.False(t, r)

signingHandler = &consensusMocks.SigningHandlerStub{
CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) {
CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) {
return []byte("SIG"), nil
},
}
Expand Down Expand Up @@ -441,7 +442,7 @@ func TestSubroundSignature_DoSignatureJobWithMultikey(t *testing.T) {

err := errors.New("create signature share error")
signingHandler := &consensusMocks.SigningHandlerStub{
CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) {
CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) {
return nil, err
},
}
Expand All @@ -451,7 +452,7 @@ func TestSubroundSignature_DoSignatureJobWithMultikey(t *testing.T) {
assert.False(t, r)

signingHandler = &consensusMocks.SigningHandlerStub{
CreateSignatureShareForPublicKeyCalled: func(msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) {
CreateSignatureShareForPublicKeyCalled: func(_ context.Context, msg []byte, index uint16, epoch uint32, publicKeyBytes []byte) ([]byte, error) {
return []byte("SIG"), nil
},
}
Expand Down
5 changes: 3 additions & 2 deletions consensus/spos/bls/v2/benchmark_send_proof_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v2_test

import (
"context"
"sort"
"testing"

Expand Down Expand Up @@ -171,7 +172,7 @@ func benchmarkSendProof(b *testing.B, numberOfKeys int) {

// Create signature shares for all validators (simulating that all have signed)
for i := 0; i < len(keys); i++ {
_, err := signingHandler.CreateSignatureShareForPublicKey(dataToBeSigned, uint16(i), 0, []byte(keys[i]))
_, err := signingHandler.CreateSignatureShareForPublicKey(context.TODO(), dataToBeSigned, uint16(i), 0, []byte(keys[i]))
require.Nil(b, err)
err = srEndRound.SetJobDone(keys[i], bls.SrSignature, true)
require.Nil(b, err)
Expand All @@ -183,7 +184,7 @@ func benchmarkSendProof(b *testing.B, numberOfKeys int) {
for i := 0; i < b.N; i++ {
// Reset signature state for each iteration
for j := 0; j < len(keys); j++ {
_, _ = signingHandler.CreateSignatureShareForPublicKey(dataToBeSigned, uint16(j), 0, []byte(keys[j]))
_, _ = signingHandler.CreateSignatureShareForPublicKey(context.TODO(), dataToBeSigned, uint16(j), 0, []byte(keys[j]))
}

b.StartTimer()
Expand Down
2 changes: 1 addition & 1 deletion consensus/spos/bls/v2/benchmark_verify_signatures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func BenchmarkSubroundEndRound_VerifyNodesOnAggSigFailTime(b *testing.B) {

sr := initSubroundEndRoundWithContainerAndConsensusState(container, &statusHandler.AppStatusHandlerStub{}, consensusState)
for i := 0; i < len(sr.ConsensusGroup()); i++ {
_, err := sr.SigningHandler().CreateSignatureShareForPublicKey(dataToBeSigned, uint16(i), sr.EnableEpochsHandler().GetCurrentEpoch(), []byte(keys[i]))
_, err := sr.SigningHandler().CreateSignatureShareForPublicKey(context.TODO(), dataToBeSigned, uint16(i), sr.EnableEpochsHandler().GetCurrentEpoch(), []byte(keys[i]))
require.Nil(b, err)
_ = sr.SetJobDone(keys[i], bls.SrSignature, true)
}
Expand Down
1 change: 1 addition & 0 deletions consensus/spos/bls/v2/blsSubroundsFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func (fct *factory) generateBlockSubround() error {
processingThresholdPercent,
fct.worker,
syncController,
fct.signatureThrottler,
)
if err != nil {
return err
Expand Down
30 changes: 30 additions & 0 deletions consensus/spos/bls/v2/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package v2

import (
"context"
"fmt"
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-go/consensus/spos"
)

func checkGoRoutinesThrottler(
ctx context.Context,
signatureThrottler core.Throttler,
) error {
for {
if signatureThrottler.CanProcess() {
break
}

select {
case <-time.After(timeSpentBetweenChecks):
continue
case <-ctx.Done():
return fmt.Errorf("%w while checking the throttler", spos.ErrTimeIsOut)
}
}

return nil
}
7 changes: 6 additions & 1 deletion consensus/spos/bls/v2/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (sr *subroundBlock) SendBlockBody(body data.BodyHandler, marshalizedBody []

// SendBlockHeader method sends the proposed block header in the subround Block
func (sr *subroundBlock) SendBlockHeader(header data.HeaderHandler, marshalizedHeader []byte) bool {
return sr.sendBlockHeader(header, marshalizedHeader)
return sr.sendBlockHeader(context.TODO(), header, marshalizedHeader)
}

// ComputeSubroundProcessingMetric computes processing metric related to the subround Block
Expand Down Expand Up @@ -395,3 +395,8 @@ func (sr *subroundEndRound) UpdateNonceDeltaMetrics() {
func (sr *subroundBlock) PrepareBlockForExecution(header data.HeaderHandler, body data.BodyHandler) error {
return sr.prepareBlockForExecution(header, body)
}

// TriggerCreateSignaturesForManagedKeys -
func (sr *subroundBlock) TriggerCreateSignaturesForManagedKeys(ctx context.Context) bool {
return sr.triggerCreateSignaturesForManagedKeys(ctx)
}
76 changes: 73 additions & 3 deletions consensus/spos/bls/v2/subroundBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type subroundBlock struct {
worker spos.WorkerHandler
mutBlockProcessing sync.Mutex
syncController spos.NtpSyncControllerHandler
signatureThrottler core.Throttler
}

// NewSubroundBlock creates a subroundBlock object
Expand All @@ -37,6 +38,7 @@ func NewSubroundBlock(
processingThresholdPercentage int,
worker spos.WorkerHandler,
syncController spos.NtpSyncControllerHandler,
signatureThrottler core.Throttler,
) (*subroundBlock, error) {
err := checkNewSubroundBlockParams(baseSubround)
if err != nil {
Expand All @@ -49,12 +51,16 @@ func NewSubroundBlock(
if check.IfNil(syncController) {
return nil, ErrNilRoundSyncController
}
if check.IfNil(signatureThrottler) {
return nil, spos.ErrNilThrottler
}

srBlock := subroundBlock{
Subround: baseSubround,
processingThresholdPercentage: processingThresholdPercentage,
worker: worker,
syncController: syncController,
signatureThrottler: signatureThrottler,
}

srBlock.Job = srBlock.doBlockJob
Expand Down Expand Up @@ -142,7 +148,7 @@ func (sr *subroundBlock) doBlockJob(ctx context.Context) bool {
return false
}

sentWithSuccess := sr.sendBlock(header, body, leader)
sentWithSuccess := sr.sendBlock(ctx, header, body, leader)
if !sentWithSuccess {
return false
}
Expand Down Expand Up @@ -220,7 +226,12 @@ func printLogMessage(ctx context.Context, baseMessage string, err error) {
log.Debug(baseMessage, "error", err.Error())
}

func (sr *subroundBlock) sendBlock(header data.HeaderHandler, body data.BodyHandler, leader string) bool {
func (sr *subroundBlock) sendBlock(
ctx context.Context,
header data.HeaderHandler,
body data.BodyHandler,
leader string,
) bool {
marshalledBody, err := sr.Marshalizer().Marshal(body)
if err != nil {
log.Debug("sendBlock.Marshal: body", "error", err.Error())
Expand All @@ -236,7 +247,7 @@ func (sr *subroundBlock) sendBlock(header data.HeaderHandler, body data.BodyHand
sr.logBlockSize(marshalledBody, marshalledHeader)
headerHash := sr.Hasher().Compute(string(marshalledHeader))

if !sr.sendBlockBody(body, marshalledBody) || !sr.sendBlockHeader(header, headerHash) {
if !sr.sendBlockBody(body, marshalledBody) || !sr.sendBlockHeader(ctx, header, headerHash) {
return false
}

Expand Down Expand Up @@ -313,6 +324,7 @@ func (sr *subroundBlock) sendBlockBody(

// sendBlockHeader method sends the proposed block header in the subround Block
func (sr *subroundBlock) sendBlockHeader(
ctx context.Context,
headerHandler data.HeaderHandler,
headerHash []byte,
) bool {
Expand All @@ -336,6 +348,8 @@ func (sr *subroundBlock) sendBlockHeader(
sr.SetData(headerHash)
sr.SetHeader(headerHandler)

sr.triggerCreateSignaturesForManagedKeys(ctx)

// log the header output for debugging purposes
headerOutput, err := common.PrettifyStruct(headerHandler)
if err == nil {
Expand All @@ -345,6 +359,60 @@ func (sr *subroundBlock) sendBlockHeader(
return true
}

func (sr *subroundBlock) triggerCreateSignaturesForManagedKeys(ctx context.Context) bool {
sigSubroundEndTime := time.Duration(float64(sr.RoundHandler().TimeDuration()) * srSignatureEndTime)
timeLeft := sr.RoundHandler().RemainingTime(sr.RoundHandler().TimeStamp(), sigSubroundEndTime)

sigCtx, cancel := context.WithTimeout(ctx, timeLeft)
sr.SetSignaturesCtxCancelFunc(cancel)

for idx, pk := range sr.ConsensusGroup() {
pkBytes := []byte(pk)
if !sr.IsKeyManagedBySelf(pkBytes) {
continue
}

err := checkGoRoutinesThrottler(ctx, sr.signatureThrottler)
if err != nil {
log.Debug("triggerCreateSignaturesForManagedKeys.checkGoRoutinesThrottler", "err", err)
return false
}
sr.signatureThrottler.StartProcessing()
sr.SignaturesWaitGroup().Add(1)

go func(sigCtx context.Context, idx int, pk string) {
defer sr.signatureThrottler.EndProcessing()
defer sr.SignaturesWaitGroup().Done()

select {
case <-sigCtx.Done():
log.Debug("triggerCreateSignaturesForManagedKeys: context done", "timeLeft", timeLeft)
return
default:
}

pkBytes := []byte(pk)
currentHash := sr.GetData()

_, err := sr.SigningHandler().CreateSignatureShareForPublicKey(
sigCtx,
currentHash,
uint16(idx),
sr.GetHeader().GetEpoch(),
pkBytes,
)
if err != nil {
log.Debug("triggerCreateSignaturesForManagedKeys.CreateSignatureShareForPublicKey", "error", err.Error())
return
}
}(sigCtx, idx, pk)
}

log.Debug("step 1: multi keys signatures creation has been triggered")

return true
}

func (sr *subroundBlock) sendDirectSentTransactions(
header data.HeaderHandler,
body data.BodyHandler,
Expand Down Expand Up @@ -643,6 +711,8 @@ func (sr *subroundBlock) receivedBlockHeader(headerHandler data.HeaderHandler) {

sr.AddReceivedHeader(headerHandler)

sr.triggerCreateSignaturesForManagedKeys(context.Background())

ctx, cancel := context.WithTimeout(context.Background(), sr.RoundHandler().TimeDuration())
defer cancel()

Expand Down
Loading
Loading