Skip to content

Commit 01af87c

Browse files
committed
feat (WIP): implement presign pool worker and enhance presign handling with new data structures
1 parent 07e8c7f commit 01af87c

10 files changed

Lines changed: 655 additions & 151 deletions

File tree

cmd/mpcium/main.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"syscall"
1212
"time"
1313

14+
"github.com/fystack/mpcium/pkg/client"
1415
"github.com/fystack/mpcium/pkg/config"
1516
"github.com/fystack/mpcium/pkg/constant"
1617
"github.com/fystack/mpcium/pkg/event"
@@ -22,7 +23,10 @@ import (
2223
"github.com/fystack/mpcium/pkg/logger"
2324
"github.com/fystack/mpcium/pkg/messaging"
2425
"github.com/fystack/mpcium/pkg/mpc"
26+
"github.com/fystack/mpcium/pkg/presign"
27+
"github.com/fystack/mpcium/pkg/presigninfo"
2528
"github.com/fystack/mpcium/pkg/security"
29+
"github.com/fystack/mpcium/pkg/types"
2630
"github.com/hashicorp/consul/api"
2731
"github.com/nats-io/nats.go"
2832
"github.com/spf13/viper"
@@ -77,6 +81,11 @@ func main() {
7781
Aliases: []string{"k"},
7882
Usage: "Path to file containing password for decrypting .age encrypted node private key",
7983
},
84+
&cli.BoolFlag{
85+
Name: "presign-pool-worker",
86+
Usage: "Enable presign pool worker",
87+
Value: false,
88+
},
8089
&cli.BoolFlag{
8190
Name: "debug",
8291
Usage: "Enable debug logging",
@@ -109,6 +118,7 @@ func runNode(ctx context.Context, c *cli.Command) error {
109118
usePrompts := c.Bool("prompt-credentials")
110119
passwordFile := c.String("password-file")
111120
agePasswordFile := c.String("identity-password-file")
121+
presignPoolWorker := c.Bool("presign-pool-worker")
112122
debug := c.Bool("debug")
113123

114124
viper.SetDefault("backup_enabled", true)
@@ -193,6 +203,7 @@ func runNode(ctx context.Context, c *cli.Command) error {
193203

194204
peerNodeIDs := GetPeerIDs(peers)
195205
peerRegistry := mpc.NewRegistry(nodeID, peerNodeIDs, consulClient.KV(), directMessaging, pubsub, identityStore)
206+
presignInfoStore := presigninfo.NewStore(consulClient.KV())
196207

197208
mpcNode := mpc.NewNode(
198209
nodeID,
@@ -201,6 +212,7 @@ func runNode(ctx context.Context, c *cli.Command) error {
201212
directMessaging,
202213
badgerKV,
203214
keyinfoStore,
215+
presignInfoStore,
204216
peerRegistry,
205217
identityStore,
206218
)
@@ -293,14 +305,44 @@ func runNode(ctx context.Context, c *cli.Command) error {
293305
logger.Info("All consumers have finished")
294306
close(errChan)
295307
}()
308+
309+
// Start presign pool worker before entering the blocking error loop
310+
if presignPoolWorker {
311+
presignPoolCtx, presignPoolCancel := context.WithCancel(appContext)
312+
defer presignPoolCancel()
313+
localSigner, err := client.NewLocalSigner(types.EventInitiatorKeyTypeEd25519, client.LocalSignerOptions{
314+
KeyPath: "./event_initiator.key",
315+
})
316+
if err != nil {
317+
logger.Fatal("Failed to create local signer", err)
318+
}
319+
mpcClient := client.NewMPCClient(client.Options{
320+
NatsConn: natsConn,
321+
Signer: localSigner,
322+
})
323+
presignPool := presign.NewPresignPool(nil, mpcClient, presignInfoStore)
324+
325+
_, err = pubsub.Subscribe(eventconsumer.MPCHotWalletEvent, func(nm *nats.Msg) {
326+
walletID := string(nm.Data)
327+
if walletID != "" {
328+
presignPool.TouchHot(walletID)
329+
}
330+
})
331+
if err != nil {
332+
logger.Warn("Failed to subscribe to hot wallet events", "err", err)
333+
}
334+
335+
presignPool.Start(presignPoolCtx)
336+
defer presignPool.Stop()
337+
}
338+
296339
for err := range errChan {
297340
if err != nil {
298341
logger.Error("Consumer error received", err)
299342
cancel()
300343
return err
301344
}
302345
}
303-
304346
return nil
305347
}
306348

examples/generate/main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,11 @@ func main() {
134134
for _, walletID := range walletIDs {
135135
wg.Add(1) // Add to WaitGroup BEFORE attempting to create wallet
136136

137-
if err := mpcClient.CreateWallet(walletID); err != nil {
137+
if err := mpcClient.CreateWallet(&types.GenerateKeyMessage{
138+
WalletID: walletID,
139+
ECDSAProtocol: types.ProtocolCGGMP21,
140+
EdDSAProtocol: types.ProtocolGG18,
141+
}); err != nil {
138142
logger.Error("CreateWallet failed", err)
139143
walletStartTimes.Delete(walletID)
140144
// Mark this wallet as processed to prevent callback from processing it

examples/sign/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ func main() {
7171

7272
txMsg := &types.SignTxMessage{
7373
KeyType: types.KeyTypeSecp256k1,
74-
Protocol: types.ProtocolFROST,
75-
WalletID: "6d553e80-a1dc-4894-9eaf-b81e3fe0c94a", // Use the generated wallet ID
74+
Protocol: types.ProtocolCGGMP21,
75+
WalletID: "7ae6ae1c-7663-4dc4-b982-33fb0a3602c3", // Use the generated wallet ID
7676
NetworkInternalCode: "solana-devnet",
7777
TxID: txID,
7878
Tx: dummyTx,

pkg/eventconsumer/event_consumer.go

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ const (
2626
MPCReshareEvent = "mpc:reshare"
2727
MPCPresignEvent = "mpc:presign"
2828

29+
// Internal event to notify presign pool of a hot wallet
30+
MPCHotWalletEvent = "mpc:wallet_hot"
31+
2932
DefaultConcurrentKeygen = 2
3033
DefaultConcurrentSigning = 20
3134
DefaultSessionWarmUpDelay = 200
@@ -66,6 +69,12 @@ type eventConsumer struct {
6669
cleanupInterval time.Duration // How often to run cleanup
6770
sessionTimeout time.Duration // How long before a session is considered stale
6871
cleanupStopChan chan struct{} // Signal to stop cleanup goroutine
72+
73+
// Track recent signing activity to detect hot wallets
74+
hotMu sync.Mutex
75+
recentSigns map[string][]time.Time // key: walletID|keyType|protocol → timestamps within window
76+
hotWindow time.Duration // window for counting signs (e.g., 5 minutes)
77+
hotThreshold int // signs needed to mark as hot
6978
}
7079

7180
func NewEventConsumer(
@@ -120,6 +129,9 @@ func NewEventConsumer(
120129
keygenMsgBuffer: make(chan *nats.Msg, 100),
121130
signingMsgBuffer: make(chan *nats.Msg, 200), // Larger buffer for signing
122131
sessionWarmUpDelayMs: sessionWarmUpDelayMs,
132+
recentSigns: make(map[string][]time.Time),
133+
hotWindow: 5 * time.Minute,
134+
hotThreshold: 2,
123135
}
124136

125137
go ec.startKeyGenEventWorker()
@@ -393,6 +405,9 @@ func (ec *eventConsumer) handleSigningEvent(natMsg *nats.Msg) {
393405
ec.node.ID(),
394406
)
395407

408+
// Track activity to detect hot wallets
409+
ec.trackAndMaybeNotifyHot(msg)
410+
396411
// Check for duplicate session and track if new
397412
if ec.checkDuplicateSession(msg.WalletID, msg.TxID) {
398413
duplicateErr := fmt.Errorf(
@@ -658,7 +673,7 @@ func (ec *eventConsumer) consumePresignEvent() error {
658673
}
659674

660675
if success {
661-
ec.handlePresignSessionSuccess(msg.WalletID, natMsg)
676+
ec.handlePresignSessionSuccess(msg.WalletID, msg.TxID, natMsg)
662677
} else {
663678
ec.handlePresignSessionError(msg.WalletID,
664679
fmt.Errorf("presign operation returned false"),
@@ -676,10 +691,11 @@ func (ec *eventConsumer) consumePresignEvent() error {
676691
}
677692

678693
// handlePresignSessionSuccess handles successful presign operations
679-
func (ec *eventConsumer) handlePresignSessionSuccess(walletID string, natMsg *nats.Msg) {
694+
func (ec *eventConsumer) handlePresignSessionSuccess(walletID string, txID string, natMsg *nats.Msg) {
680695
presignResult := event.PresignResultEvent{
681696
ResultType: event.ResultTypeSuccess,
682697
WalletID: walletID,
698+
TxID: txID,
683699
Status: "success",
684700
}
685701

@@ -863,3 +879,38 @@ func composeSigningIdempotentKey(txID string, natMsg *nats.Msg) string {
863879
func composeReshareIdempotentKey(sessionID string, natMsg *nats.Msg) string {
864880
return composeIdempotentKey(sessionID, natMsg, mpc.TypeReshareWalletResultFmt)
865881
}
882+
883+
// trackAndMaybeNotifyHot records a signing event and publishes a hot wallet event
884+
// if at least hotThreshold signs occur within hotWindow for the same
885+
// (walletID, keyType, protocol) tuple.
886+
func (ec *eventConsumer) trackAndMaybeNotifyHot(msg types.SignTxMessage) {
887+
if msg.Protocol != types.ProtocolCGGMP21 {
888+
return
889+
}
890+
key := fmt.Sprintf("%s:%s:%s", msg.WalletID, string(msg.KeyType), string(msg.Protocol))
891+
now := time.Now()
892+
893+
ec.hotMu.Lock()
894+
// prune old entries
895+
list := ec.recentSigns[key]
896+
pruned := list[:0]
897+
cutoff := now.Add(-ec.hotWindow)
898+
for _, t := range list {
899+
if t.After(cutoff) {
900+
pruned = append(pruned, t)
901+
}
902+
}
903+
904+
ec.recentSigns[key] = append([]time.Time(nil), pruned...)
905+
currentCount := len(ec.recentSigns[key])
906+
907+
// If this push reaches the threshold, publish hot wallet once
908+
shouldPublish := currentCount+1 == ec.hotThreshold
909+
ec.recentSigns[key] = append(ec.recentSigns[key], now)
910+
ec.hotMu.Unlock()
911+
912+
if shouldPublish {
913+
_ = ec.pubsub.Publish(MPCHotWalletEvent, []byte(msg.WalletID))
914+
logger.Info("Published hot wallet event", "walletID", msg.WalletID)
915+
}
916+
}

pkg/mpc/node.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/fystack/mpcium/pkg/logger"
1616
"github.com/fystack/mpcium/pkg/messaging"
1717
"github.com/fystack/mpcium/pkg/mpc/taurus"
18+
"github.com/fystack/mpcium/pkg/presigninfo"
1819
"github.com/fystack/mpcium/pkg/types"
1920
"github.com/taurusgroup/multi-party-sig/pkg/party"
2021
)
@@ -34,13 +35,13 @@ type Node struct {
3435
nodeID string
3536
peerIDs []string
3637

37-
pubSub messaging.PubSub
38-
direct messaging.DirectMessaging
39-
kvstore kvstore.KVStore
40-
keyinfoStore keyinfo.Store
41-
ecdsaPreParams []*keygen.LocalPreParams
42-
identityStore identity.Store
43-
presignCache *taurus.PresignCache
38+
pubSub messaging.PubSub
39+
direct messaging.DirectMessaging
40+
kvstore kvstore.KVStore
41+
keyinfoStore keyinfo.Store
42+
presignInfoStore presigninfo.Store
43+
ecdsaPreParams []*keygen.LocalPreParams
44+
identityStore identity.Store
4445

4546
peerRegistry PeerRegistry
4647
}
@@ -52,6 +53,7 @@ func NewNode(
5253
direct messaging.DirectMessaging,
5354
kvstore kvstore.KVStore,
5455
keyinfoStore keyinfo.Store,
56+
presignInfoStore presigninfo.Store,
5557
peerRegistry PeerRegistry,
5658
identityStore identity.Store,
5759
) *Node {
@@ -60,15 +62,15 @@ func NewNode(
6062
logger.Info("Starting new node, preparams is generated successfully!", "elapsed", elapsed.Milliseconds())
6163

6264
node := &Node{
63-
nodeID: nodeID,
64-
peerIDs: peerIDs,
65-
pubSub: pubSub,
66-
direct: direct,
67-
kvstore: kvstore,
68-
keyinfoStore: keyinfoStore,
69-
peerRegistry: peerRegistry,
70-
identityStore: identityStore,
71-
presignCache: taurus.NewPresignCache(10 * time.Minute),
65+
nodeID: nodeID,
66+
peerIDs: peerIDs,
67+
pubSub: pubSub,
68+
direct: direct,
69+
kvstore: kvstore,
70+
keyinfoStore: keyinfoStore,
71+
presignInfoStore: presignInfoStore,
72+
peerRegistry: peerRegistry,
73+
identityStore: identityStore,
7274
}
7375
node.ecdsaPreParams = node.generatePreParams()
7476

@@ -157,7 +159,7 @@ func (p *Node) CreateTaurusSession(
157159
switch protocol {
158160
case types.ProtocolCGGMP21:
159161
tr := taurus.NewNATSTransport(walletID, selfPartyID, act, taurus.CGGMP21, p.pubSub, p.direct, p.identityStore)
160-
session = taurus.NewCGGMP21Session(walletID, selfPartyID, allPartyIDs, threshold, p.presignCache, tr, p.kvstore, p.keyinfoStore)
162+
session = taurus.NewCGGMP21Session(walletID, selfPartyID, allPartyIDs, threshold, nil, tr, p.kvstore, p.keyinfoStore)
161163
case types.ProtocolTaproot:
162164
tr := taurus.NewNATSTransport(walletID, selfPartyID, act, taurus.FROSTTaproot, p.pubSub, p.direct, p.identityStore)
163165
session = taurus.NewTaprootSession(walletID, selfPartyID, allPartyIDs, threshold, tr, p.kvstore, p.keyinfoStore)

0 commit comments

Comments
 (0)