Skip to content

Commit 0d8531c

Browse files
committed
Return error when cluster is not ready when sending generate key event
1 parent 3502b0d commit 0d8531c

5 files changed

Lines changed: 66 additions & 15 deletions

File tree

cmd/mpcium/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func runNode(ctx context.Context, c *cli.Command) error {
193193

194194
timeoutConsumer.Run()
195195
defer timeoutConsumer.Close()
196-
keygenConsumer := eventconsumer.NewKeygenConsumer(natsConn, keygenBroker, pubsub, peerRegistry)
196+
keygenConsumer := eventconsumer.NewKeygenConsumer(natsConn, keygenBroker, pubsub, peerRegistry, genKeyResultQueue)
197197
signingConsumer := eventconsumer.NewSigningConsumer(natsConn, signingBroker, pubsub, peerRegistry, singingResultQueue)
198198

199199
// Make the node ready before starting the signing consumer

pkg/event/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ const (
9292
ErrorCodeContextCancelled ErrorCode = "ERROR_CONTEXT_CANCELLED"
9393
ErrorCodeOperationAborted ErrorCode = "ERROR_OPERATION_ABORTED"
9494
ErrorCodeNotMajority ErrorCode = "ERROR_NOT_MAJORITY"
95+
ErrorCodeClusterNotReady ErrorCode = "ERROR_CLUSTER_NOT_READY"
9596
)
9697

9798
// GetErrorCodeFromError attempts to categorize a generic error into a specific error code

pkg/eventconsumer/keygen_consumer.go

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ package eventconsumer
22

33
import (
44
"context"
5+
"encoding/json"
6+
"errors"
57
"fmt"
68
"time"
79

810
"github.com/fystack/mpcium/pkg/event"
911
"github.com/fystack/mpcium/pkg/logger"
1012
"github.com/fystack/mpcium/pkg/messaging"
1113
"github.com/fystack/mpcium/pkg/mpc"
14+
"github.com/fystack/mpcium/pkg/types"
1215
"github.com/google/uuid"
1316
"github.com/nats-io/nats.go"
1417
"github.com/nats-io/nats.go/jetstream"
@@ -31,22 +34,30 @@ type KeygenConsumer interface {
3134

3235
// keygenConsumer implements KeygenConsumer.
3336
type keygenConsumer struct {
34-
natsConn *nats.Conn
35-
pubsub messaging.PubSub
36-
jsBroker messaging.MessageBroker
37-
peerRegistry mpc.PeerRegistry
37+
natsConn *nats.Conn
38+
pubsub messaging.PubSub
39+
jsBroker messaging.MessageBroker
40+
peerRegistry mpc.PeerRegistry
41+
keygenResultQueue messaging.MessageQueue
3842

3943
// jsSub holds the JetStream subscription, so it can be cleaned up during Close().
4044
jsSub messaging.MessageSubscription
4145
}
4246

4347
// NewKeygenConsumer returns a new instance of KeygenConsumer.
44-
func NewKeygenConsumer(natsConn *nats.Conn, jsBroker messaging.MessageBroker, pubsub messaging.PubSub, peerRegistry mpc.PeerRegistry) KeygenConsumer {
48+
func NewKeygenConsumer(
49+
natsConn *nats.Conn,
50+
jsBroker messaging.MessageBroker,
51+
pubsub messaging.PubSub,
52+
peerRegistry mpc.PeerRegistry,
53+
keygenResultQueue messaging.MessageQueue,
54+
) KeygenConsumer {
4555
return &keygenConsumer{
46-
natsConn: natsConn,
47-
pubsub: pubsub,
48-
jsBroker: jsBroker,
49-
peerRegistry: peerRegistry,
56+
natsConn: natsConn,
57+
pubsub: pubsub,
58+
jsBroker: jsBroker,
59+
peerRegistry: peerRegistry,
60+
keygenResultQueue: keygenResultQueue,
5061
}
5162
}
5263

@@ -110,9 +121,21 @@ func (sc *keygenConsumer) Run(ctx context.Context) error {
110121
}
111122

112123
func (sc *keygenConsumer) handleKeygenEvent(msg jetstream.Msg) {
124+
raw := msg.Data()
125+
var keygenMsg types.GenerateKeyMessage
126+
sessionID := msg.Headers().Get("SessionID")
127+
128+
err := json.Unmarshal(raw, &keygenMsg)
129+
if err != nil {
130+
logger.Error("SigningConsumer: Failed to unmarshal keygen message", err)
131+
sc.handleKeygenError(keygenMsg, event.ErrorCodeUnmarshalFailure, err, sessionID)
132+
_ = msg.Nak()
133+
return
134+
}
113135

114136
if !sc.peerRegistry.ArePeersReady() {
115-
logger.Warn("KeygenConsumer: Not all peers are ready to sign, skipping message processing")
137+
logger.Warn("KeygenConsumer: Not all peers are ready to gen key, skipping message processing")
138+
sc.handleKeygenError(keygenMsg, event.ErrorCodeClusterNotReady, errors.New("not all peers are ready"), sessionID)
116139
return
117140
}
118141

@@ -167,6 +190,33 @@ func (sc *keygenConsumer) handleKeygenEvent(msg jetstream.Msg) {
167190
_ = msg.Nak()
168191
}
169192

193+
func (sc *keygenConsumer) handleKeygenError(keygenMsg types.GenerateKeyMessage, errorCode event.ErrorCode, err error, sessionID string) {
194+
keygenResult := event.KeygenResultEvent{
195+
ResultType: event.ResultTypeError,
196+
ErrorCode: string(errorCode),
197+
WalletID: keygenMsg.WalletID,
198+
ErrorReason: err.Error(),
199+
}
200+
201+
keygenResultBytes, err := json.Marshal(keygenResult)
202+
if err != nil {
203+
logger.Error("Failed to marshal keygen result event", err,
204+
"walletID", keygenResult.WalletID,
205+
)
206+
return
207+
}
208+
209+
topic := fmt.Sprintf(mpc.TypeGenerateWalletResultFmt, keygenResult.WalletID)
210+
err = sc.keygenResultQueue.Enqueue(topic, keygenResultBytes, &messaging.EnqueueOptions{
211+
IdempotententKey: buildIdempotentKey(keygenMsg.WalletID, sessionID, mpc.TypeGenerateWalletResultFmt),
212+
})
213+
if err != nil {
214+
logger.Error("Failed to enqueue keygen result event", err,
215+
"walletID", keygenMsg.WalletID,
216+
)
217+
}
218+
}
219+
170220
// Close unsubscribes from the JetStream subject and cleans up resources.
171221
func (sc *keygenConsumer) Close() error {
172222
if sc.jsSub != nil {

pkg/eventconsumer/sign_consumer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ func (sc *signingConsumer) handleSigningError(signMsg types.SignTxMessage, error
230230
}
231231

232232
err = sc.signingResultQueue.Enqueue(event.SigningResultCompleteTopic, signingResultBytes, &messaging.EnqueueOptions{
233-
IdempotententKey: buildSigningIdempotentKey(signMsg.TxID, sessionID, mpc.TypeSigningResultFmt),
233+
IdempotententKey: buildIdempotentKey(signMsg.TxID, sessionID, mpc.TypeSigningResultFmt),
234234
})
235235
if err != nil {
236236
logger.Error("Failed to enqueue signing result event", err,
@@ -252,7 +252,7 @@ func (sc *signingConsumer) Close() error {
252252
return nil
253253
}
254254

255-
func buildSigningIdempotentKey(baseID string, sessionID string, formatTemplate string) string {
255+
func buildIdempotentKey(baseID string, sessionID string, formatTemplate string) string {
256256
var uniqueKey string
257257
if sessionID != "" {
258258
uniqueKey = fmt.Sprintf("%s:%s", baseID, sessionID)

pkg/mpc/registry.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ func NewRegistry(
6868
) *registry {
6969
ecdhSession := NewECDHSession(nodeID, peerNodeIDs, pubSub, identityStore)
7070
mpcThreshold := viper.GetInt("mpc_threshold")
71-
if mpcThreshold <= 2 {
72-
logger.Fatal("mpc_threshold must be greater than 2", nil)
71+
if mpcThreshold < 1 {
72+
logger.Fatal("mpc_threshold must be greater than 0", nil)
7373
}
7474

7575
return &registry{

0 commit comments

Comments
 (0)