Skip to content

Commit 7bdab61

Browse files
committed
Experimental with coordination protocol
1 parent fa7e84c commit 7bdab61

2 files changed

Lines changed: 29 additions & 1 deletion

File tree

cmd/mpcium/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ func runNode(ctx context.Context, c *cli.Command) error {
186186
singingResultQueue,
187187
reshareResultQueue,
188188
identityStore,
189+
consulClient,
189190
)
190191
eventConsumer.Run()
191192
defer eventConsumer.Close()

pkg/eventconsumer/event_consumer.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/fystack/mpcium/pkg/messaging"
1717
"github.com/fystack/mpcium/pkg/mpc"
1818
"github.com/fystack/mpcium/pkg/types"
19+
"github.com/hashicorp/consul/api"
1920
"github.com/nats-io/nats.go"
2021
"github.com/spf13/viper"
2122
)
@@ -41,6 +42,7 @@ type eventConsumer struct {
4142
node *mpc.Node
4243
pubsub messaging.PubSub
4344
mpcThreshold int
45+
consulClient *api.Client
4446

4547
genKeyResultQueue messaging.MessageQueue
4648
signingResultQueue messaging.MessageQueue
@@ -72,6 +74,7 @@ func NewEventConsumer(
7274
signingResultQueue messaging.MessageQueue,
7375
reshareResultQueue messaging.MessageQueue,
7476
identityStore identity.Store,
77+
consulClient *api.Client,
7578
) EventConsumer {
7679
maxConcurrentKeygen := viper.GetInt("max_concurrent_keygen")
7780
if maxConcurrentKeygen == 0 {
@@ -115,6 +118,7 @@ func NewEventConsumer(
115118
keygenMsgBuffer: make(chan *nats.Msg, 100),
116119
signingMsgBuffer: make(chan *nats.Msg, 200), // Larger buffer for signing
117120
sessionWarmUpDelayMs: sessionWarmUpDelayMs,
121+
consulClient: consulClient,
118122
}
119123

120124
go ec.startKeyGenEventWorker()
@@ -599,6 +603,9 @@ func (ec *eventConsumer) consumeReshareEvent() error {
599603
return
600604
}
601605

606+
// Create coordinator for this reshare operation
607+
coordinator := mpc.NewReshareCoordinator(ec.consulClient, walletID, ec.node.ID())
608+
602609
createSession := func(isNewPeer bool) (mpc.ReshareSession, error) {
603610
return ec.node.CreateReshareSession(
604611
sessionType,
@@ -661,7 +668,22 @@ func (ec *eventConsumer) consumeReshareEvent() error {
661668
newSession.ListenToPeersAsync(extraOldCommiteePeers)
662669
}
663670

664-
ec.warmUpSession()
671+
// Signal ready and wait for all participants
672+
if err := coordinator.SignalReady(); err != nil {
673+
logger.Error("Failed to signal ready", err)
674+
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed to signal ready", natMsg)
675+
return
676+
}
677+
678+
logger.Info("Waiting for all participants to be ready", "walletID", walletID)
679+
if err := coordinator.WaitForAll(msg.NodeIDs); err != nil {
680+
logger.Error("Failed waiting for participants", err)
681+
ec.handleReshareSessionError(walletID, keyType, msg.NewThreshold, err, "Failed waiting for participants", natMsg)
682+
return
683+
}
684+
685+
logger.Info("All participants ready, starting reshare", "walletID", walletID)
686+
665687
if oldSession != nil {
666688
ctxOld, doneOld := context.WithCancel(ctx)
667689
go oldSession.Reshare(doneOld)
@@ -707,6 +729,11 @@ func (ec *eventConsumer) consumeReshareEvent() error {
707729
wg.Wait()
708730
logger.Info("Reshare session finished", "walletID", walletID, "pubKey", fmt.Sprintf("%x", successEvent.PubKey))
709731

732+
// Cleanup coordination data
733+
if err := coordinator.Cleanup(); err != nil {
734+
logger.Error("Failed to cleanup coordination data", err)
735+
}
736+
710737
if newSession != nil {
711738
successBytes, err := json.Marshal(successEvent)
712739
if err != nil {

0 commit comments

Comments
 (0)