Skip to content

Commit 6b66ae2

Browse files
committed
Remove jetstream pubsub change to message broker
1 parent 01d4852 commit 6b66ae2

10 files changed

Lines changed: 628 additions & 254 deletions

File tree

cmd/mpcium/main.go

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"os"
77
"os/signal"
88
"path/filepath"
9+
"sync"
910
"syscall"
1011
"time"
1112

@@ -122,11 +123,17 @@ func runNode(ctx context.Context, c *cli.Command) error {
122123
defer natsConn.Close()
123124

124125
pubsub := messaging.NewNATSPubSub(natsConn)
125-
signingStream, err := messaging.NewJetStreamPubSub(natsConn, event.SigningPublisherStream, []string{
126+
keygenBroker, err := messaging.NewJetStreamBroker(ctx, natsConn, event.KeygenBrokerStream, []string{
127+
event.KeygenRequestTopic,
128+
})
129+
if err != nil {
130+
logger.Fatal("Failed to create keygen jetstream broker", err)
131+
}
132+
signingBroker, err := messaging.NewJetStreamBroker(ctx, natsConn, event.SigningPublisherStream, []string{
126133
event.SigningRequestTopic,
127134
})
128135
if err != nil {
129-
logger.Fatal("Failed to create JetStream PubSub", err)
136+
logger.Fatal("Failed to create signing jetstream broker", err)
130137
}
131138

132139
directMessaging := messaging.NewNatsDirectMessaging(natsConn)
@@ -178,7 +185,8 @@ func runNode(ctx context.Context, c *cli.Command) error {
178185

179186
timeoutConsumer.Run()
180187
defer timeoutConsumer.Close()
181-
signingConsumer := eventconsumer.NewSigningConsumer(natsConn, signingStream, pubsub, peerRegistry)
188+
keygenConsumer := eventconsumer.NewKeygenConsumer(natsConn, keygenBroker, pubsub, peerRegistry)
189+
signingConsumer := eventconsumer.NewSigningConsumer(natsConn, signingBroker, pubsub, peerRegistry)
182190

183191
// Make the node ready before starting the signing consumer
184192
if err := peerRegistry.Ready(); err != nil {
@@ -195,10 +203,44 @@ func runNode(ctx context.Context, c *cli.Command) error {
195203
cancel()
196204
}()
197205

198-
if err := signingConsumer.Run(appContext); err != nil {
199-
logger.Error("error running consumer:", err)
200-
}
206+
var wg sync.WaitGroup
207+
errChan := make(chan error, 2)
208+
209+
wg.Add(1)
210+
go func() {
211+
defer wg.Done()
212+
if err := keygenConsumer.Run(appContext); err != nil {
213+
logger.Error("error running keygen consumer", err)
214+
errChan <- fmt.Errorf("keygen consumer error: %w", err)
215+
return
216+
}
217+
logger.Info("Keygen consumer finished successfully")
218+
}()
219+
220+
wg.Add(1)
221+
go func() {
222+
defer wg.Done()
223+
if err := signingConsumer.Run(appContext); err != nil {
224+
logger.Error("error running signing consumer", err)
225+
errChan <- fmt.Errorf("signing consumer error: %w", err)
226+
return
227+
}
228+
logger.Info("Signing consumer finished successfully")
229+
}()
201230

231+
go func() {
232+
wg.Wait()
233+
logger.Info("All consumers have finished")
234+
close(errChan)
235+
}()
236+
237+
for err := range errChan {
238+
if err != nil {
239+
logger.Error("Consumer error received", err)
240+
cancel()
241+
return err
242+
}
243+
}
202244
return nil
203245
}
204246

pkg/client/client.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package client
22

33
import (
4+
"context"
45
"crypto/ed25519"
56
"encoding/hex"
67
"encoding/json"
@@ -36,7 +37,8 @@ type MPCClient interface {
3637
}
3738

3839
type mpcClient struct {
39-
signingStream messaging.StreamPubsub
40+
signingBroker messaging.MessageBroker
41+
keygenBroker messaging.MessageBroker
4042
pubsub messaging.PubSub
4143
genKeySuccessQueue messaging.MessageQueue
4244
signResultQueue messaging.MessageQueue
@@ -115,11 +117,17 @@ func NewMPCClient(opts Options) MPCClient {
115117
priv := ed25519.NewKeyFromSeed(privSeed)
116118

117119
// 2) Create the PubSub for both publish & subscribe
118-
signingStream, err := messaging.NewJetStreamPubSub(opts.NatsConn, "mpc-signing", []string{
120+
signingBroker, err := messaging.NewJetStreamBroker(context.Background(), opts.NatsConn, "mpc-signing", []string{
119121
"mpc.signing_request.*",
120122
})
121123
if err != nil {
122-
logger.Fatal("Failed to create JetStream PubSub", err)
124+
logger.Fatal("Failed to create signing jetstream broker", err)
125+
}
126+
keygenBroker, err := messaging.NewJetStreamBroker(context.Background(), opts.NatsConn, "mpc-keygen", []string{
127+
"mpc.keygen_request.*",
128+
})
129+
if err != nil {
130+
logger.Fatal("Failed to create keygen jetstream broker", err)
123131
}
124132

125133
pubsub := messaging.NewNATSPubSub(opts.NatsConn)
@@ -135,7 +143,8 @@ func NewMPCClient(opts Options) MPCClient {
135143
reshareSuccessQueue := manager.NewMessageQueue("mpc_reshare_result")
136144

137145
return &mpcClient{
138-
signingStream: signingStream,
146+
signingBroker: signingBroker,
147+
keygenBroker: keygenBroker,
139148
pubsub: pubsub,
140149
genKeySuccessQueue: genKeySuccessQueue,
141150
signResultQueue: signResultQueue,
@@ -186,7 +195,7 @@ func (c *mpcClient) CreateWallet(walletID string) error {
186195
return fmt.Errorf("CreateWallet: marshal error: %w", err)
187196
}
188197

189-
if err := c.pubsub.Publish(eventconsumer.MPCGenerateEvent, bytes); err != nil {
198+
if err := c.keygenBroker.PublishMessage(context.Background(), event.KeygenRequestTopic, bytes); err != nil {
190199
return fmt.Errorf("CreateWallet: publish error: %w", err)
191200
}
192201
return nil
@@ -226,7 +235,7 @@ func (c *mpcClient) SignTransaction(msg *types.SignTxMessage) error {
226235
return fmt.Errorf("SignTransaction: marshal error: %w", err)
227236
}
228237

229-
if err := c.signingStream.Publish(event.SigningRequestEventTopic, bytes); err != nil {
238+
if err := c.signingBroker.PublishMessage(context.Background(), event.SigningRequestEventTopic, bytes); err != nil {
230239
return fmt.Errorf("SignTransaction: publish error: %w", err)
231240
}
232241
return nil

pkg/event/keygen.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
package event
22

3+
const (
4+
KeygenBrokerStream = "mpc-keygen"
5+
KeygenConsumerStream = "mpc-keygen-consumer"
6+
KeygenRequestTopic = "mpc.keygen_request.*"
7+
)
8+
39
type KeygenResultEvent struct {
410
WalletID string `json:"wallet_id"`
511
ECDSAPubKey []byte `json:"ecdsa_pub_key"`

pkg/event/sign.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ const (
66
SigningRequestTopic = "mpc.signing_request.*"
77
SigningResultTopic = "mpc.mpc_signing_result.*"
88
SigningResultCompleteTopic = "mpc.mpc_signing_result.complete"
9-
MPCSigningEventTopic = "mpc:sign"
109
SigningRequestEventTopic = "mpc.signing_request.event"
1110
)
1211

pkg/eventconsumer/event_consumer.go

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -124,27 +124,27 @@ func (ec *eventConsumer) handleKeyGenEvent(natMsg *nats.Msg) {
124124
var msg types.GenerateKeyMessage
125125
if err := json.Unmarshal(raw, &msg); err != nil {
126126
logger.Error("Failed to unmarshal keygen message", err)
127-
ec.handleKeygenSessionError(msg.WalletID, err, "Failed to unmarshal keygen message")
127+
ec.handleKeygenSessionError(msg.WalletID, err, "Failed to unmarshal keygen message", natMsg)
128128
return
129129
}
130130

131131
if err := ec.identityStore.VerifyInitiatorMessage(&msg); err != nil {
132132
logger.Error("Failed to verify initiator message", err)
133-
ec.handleKeygenSessionError(msg.WalletID, err, "Failed to verify initiator message")
133+
ec.handleKeygenSessionError(msg.WalletID, err, "Failed to verify initiator message", natMsg)
134134
return
135135
}
136136

137137
walletID := msg.WalletID
138138
ecdsaSession, err := ec.node.CreateKeyGenSession(mpc.SessionTypeECDSA, walletID, ec.mpcThreshold, ec.genKeyResultQueue)
139139
if err != nil {
140140
logger.Error("Failed to create ECDSA key generation session", err, "walletID", walletID)
141-
ec.handleKeygenSessionError(walletID, err, "Failed to create ECDSA key generation session")
141+
ec.handleKeygenSessionError(walletID, err, "Failed to create ECDSA key generation session", natMsg)
142142
return
143143
}
144144
eddsaSession, err := ec.node.CreateKeyGenSession(mpc.SessionTypeEDDSA, walletID, ec.mpcThreshold, ec.genKeyResultQueue)
145145
if err != nil {
146146
logger.Error("Failed to create EdDSA key generation session", err, "walletID", walletID)
147-
ec.handleKeygenSessionError(walletID, err, "Failed to create EdDSA key generation session")
147+
ec.handleKeygenSessionError(walletID, err, "Failed to create EdDSA key generation session", natMsg)
148148
return
149149
}
150150
ecdsaSession.Init()
@@ -167,8 +167,7 @@ func (ec *eventConsumer) handleKeyGenEvent(natMsg *nats.Msg) {
167167
successEvent.ECDSAPubKey = ecdsaSession.GetPubKeyResult()
168168
case err := <-ecdsaSession.ErrChan():
169169
logger.Error("ECDSA keygen session error", err)
170-
ec.handleKeygenSessionError(walletID, err, "ECDSA keygen session error")
171-
errorChan <- err
170+
ec.handleKeygenSessionError(walletID, err, "ECDSA keygen session error", natMsg)
172171
doneEcdsa()
173172
}
174173
}()
@@ -179,8 +178,7 @@ func (ec *eventConsumer) handleKeyGenEvent(natMsg *nats.Msg) {
179178
successEvent.EDDSAPubKey = eddsaSession.GetPubKeyResult()
180179
case err := <-eddsaSession.ErrChan():
181180
logger.Error("EdDSA keygen session error", err)
182-
ec.handleKeygenSessionError(walletID, err, "EdDSA keygen session error")
183-
errorChan <- err
181+
ec.handleKeygenSessionError(walletID, err, "EdDSA keygen session error", natMsg)
184182
doneEddsa()
185183
}
186184
}()
@@ -213,28 +211,29 @@ func (ec *eventConsumer) handleKeyGenEvent(natMsg *nats.Msg) {
213211
case <-baseCtx.Done():
214212
// timeout occurred
215213
logger.Warn("Key generation timed out", "walletID", walletID, "timeout", KeyGenTimeOut)
216-
ec.handleKeygenSessionError(walletID, fmt.Errorf("keygen session timed out after %v", KeyGenTimeOut), "Key generation timed out")
214+
ec.handleKeygenSessionError(walletID, fmt.Errorf("keygen session timed out after %v", KeyGenTimeOut), "Key generation timed out", natMsg)
217215
return
218216
}
219217

220218
payload, err := json.Marshal(successEvent)
221219
if err != nil {
222220
logger.Error("Failed to marshal keygen success event", err)
223-
ec.handleKeygenSessionError(walletID, err, "Failed to marshal keygen success event")
221+
ec.handleKeygenSessionError(walletID, err, "Failed to marshal keygen success event", natMsg)
224222
return
225223
}
226224

227225
key := fmt.Sprintf(mpc.TypeGenerateWalletResultFmt, walletID)
228226
if err := ec.genKeyResultQueue.Enqueue(key, payload, &messaging.EnqueueOptions{IdempotententKey: key}); err != nil {
229227
logger.Error("Failed to publish key generation success message", err)
230-
ec.handleKeygenSessionError(walletID, err, "Failed to publish key generation success message")
228+
ec.handleKeygenSessionError(walletID, err, "Failed to publish key generation success message", natMsg)
231229
return
232230
}
231+
ec.sendReplyToRemoveMsg(natMsg)
233232
logger.Info("[COMPLETED KEY GEN] Key generation completed successfully", "walletID", walletID)
234233
}
235234

236235
// handleKeygenSessionError handles errors that occur during key generation
237-
func (ec *eventConsumer) handleKeygenSessionError(walletID string, err error, contextMsg string) {
236+
func (ec *eventConsumer) handleKeygenSessionError(walletID string, err error, contextMsg string, natMsg *nats.Msg) {
238237
fullErrMsg := fmt.Sprintf("%s: %v", contextMsg, err)
239238
errorCode := event.GetErrorCodeFromError(err)
240239

@@ -270,6 +269,7 @@ func (ec *eventConsumer) handleKeygenSessionError(walletID string, err error, co
270269
"payload", string(keygenResultBytes),
271270
)
272271
}
272+
ec.sendReplyToRemoveMsg(natMsg)
273273
}
274274

275275
func (ec *eventConsumer) startKeyGenEventWorker() {
@@ -368,6 +368,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
368368
msg.NetworkInternalCode,
369369
err,
370370
"Failed to create signing session",
371+
natMsg,
371372
)
372373
return
373374
}
@@ -386,6 +387,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
386387
msg.NetworkInternalCode,
387388
err,
388389
"Failed to init signing session",
390+
natMsg,
389391
)
390392
return
391393
}
@@ -407,6 +409,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
407409
msg.NetworkInternalCode,
408410
err,
409411
"Failed to sign tx",
412+
natMsg,
410413
)
411414
return
412415
}
@@ -426,14 +429,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
426429

427430
onSuccess := func(data []byte) {
428431
done()
429-
if natMsg.Reply != "" {
430-
err = ec.pubsub.Publish(natMsg.Reply, data)
431-
if err != nil {
432-
logger.Error("Failed to publish reply", err)
433-
} else {
434-
logger.Info("Reply to the original message", "reply", natMsg.Reply)
435-
}
436-
}
432+
ec.sendReplyToRemoveMsg(natMsg)
437433
}
438434
go session.Sign(onSuccess)
439435
})
@@ -445,7 +441,7 @@ func (ec *eventConsumer) consumeTxSigningEvent() error {
445441

446442
return nil
447443
}
448-
func (ec *eventConsumer) handleSigningSessionError(walletID, txID, networkInternalCode string, err error, contextMsg string) {
444+
func (ec *eventConsumer) handleSigningSessionError(walletID, txID, networkInternalCode string, err error, contextMsg string, natMsg *nats.Msg) {
449445
fullErrMsg := fmt.Sprintf("%s: %v", contextMsg, err)
450446
errorCode := event.GetErrorCodeFromError(err)
451447

@@ -475,7 +471,6 @@ func (ec *eventConsumer) handleSigningSessionError(walletID, txID, networkIntern
475471
)
476472
return
477473
}
478-
479474
err = ec.signingResultQueue.Enqueue(event.SigningResultCompleteTopic, signingResultBytes, &messaging.EnqueueOptions{
480475
IdempotententKey: txID,
481476
})
@@ -486,7 +481,24 @@ func (ec *eventConsumer) handleSigningSessionError(walletID, txID, networkIntern
486481
"payload", string(signingResultBytes),
487482
)
488483
}
484+
ec.sendReplyToRemoveMsg(natMsg)
489485
}
486+
487+
func (ec *eventConsumer) sendReplyToRemoveMsg(natMsg *nats.Msg) {
488+
msg := natMsg.Data
489+
490+
if natMsg.Reply == "" {
491+
logger.Warn("No reply inbox specified for sign success message", "msg", string(msg))
492+
return
493+
}
494+
495+
err := ec.pubsub.Publish(natMsg.Reply, msg)
496+
if err != nil {
497+
logger.Error("Failed to reply message", err, "reply", natMsg.Reply)
498+
return
499+
}
500+
}
501+
490502
func (ec *eventConsumer) consumeReshareEvent() error {
491503
sub, err := ec.pubsub.Subscribe(MPCReshareEvent, func(natMsg *nats.Msg) {
492504
var msg types.ResharingMessage

0 commit comments

Comments
 (0)