Skip to content

Commit b94e763

Browse files
committed
wip: done taurus in mem and pubsub
1 parent 2f2ec12 commit b94e763

17 files changed

Lines changed: 587 additions & 2218 deletions

pkg/eventconsumer/event_consumer.go

Lines changed: 10 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package eventconsumer
22

33
import (
44
"context"
5-
"encoding/hex"
65
"encoding/json"
76
"errors"
87
"fmt"
@@ -187,7 +186,6 @@ func (ec *eventConsumer) handleKeyGenEvent(natMsg *nats.Msg) {
187186

188187
// ecdsaSession.Init()
189188
// eddsaSession.Init()
190-
taurusSession.Init()
191189

192190
// ctxEcdsa, doneEcdsa := context.WithCancel(baseCtx)
193191
// ctxEddsa, doneEddsa := context.WithCancel(baseCtx)
@@ -225,66 +223,32 @@ func (ec *eventConsumer) handleKeyGenEvent(natMsg *nats.Msg) {
225223
// }
226224
// }()
227225

228-
go func() {
229-
select {
230-
case <-ctxTaurus.Done():
231-
return
232-
case err := <-taurusSession.ErrChan():
233-
if err != nil {
234-
logger.Error("CGGMP21 keygen session error", err)
235-
errorChan <- err
236-
doneTaurus()
237-
}
238-
}
239-
}()
240-
241226
// ecdsaSession.ListenToIncomingMessageAsync()
242227
// eddsaSession.ListenToIncomingMessageAsync()
243-
taurusSession.ListenToIncomingMessageAsync(taurusSession.ProcessInboundMessage)
244228
// Temporary delay for peer setup
245229
ec.warmUpSession()
246230
// go ecdsaSession.GenerateKey(doneEcdsa)
247231
// go eddsaSession.GenerateKey(doneEddsa)
248-
go taurusSession.ProcessOutboundMessage()
249232

250-
// Wait for the keygen to complete
251-
completionChan := make(chan string, 1)
252233
go func() {
253-
result := taurusSession.WaitForFinish()
254-
completionChan <- result
255-
}()
256-
257-
// Wait for completion, error, or timeout
258-
select {
259-
case pubKeyHex := <-completionChan:
260-
// Success - set the public key
261-
if pubKeyHex != "" {
262-
pubKeyBytes, err := hex.DecodeString(pubKeyHex)
263-
if err == nil {
264-
successEvent.TaurusCMPPubKey = pubKeyBytes
265-
}
234+
data, err := taurusSession.Keygen(ctxTaurus)
235+
if err != nil {
236+
logger.Error("Failed to generate key", err)
237+
ec.handleKeygenSessionError(walletID, err, "Failed to generate key", natMsg)
238+
errorChan <- err
239+
doneTaurus()
266240
}
267-
doneTaurus() // Signal completion
268-
269-
case err := <-errorChan:
270-
// Error occurred
271-
ec.handleKeygenSessionError(walletID, err, "CGGMP21 keygen error", natMsg)
272-
return
273-
274-
case <-baseCtx.Done():
275-
// Timeout occurred
276-
logger.Warn("Key generation timed out", "walletID", walletID, "timeout", KeyGenTimeOut)
277-
ec.handleKeygenSessionError(walletID, fmt.Errorf("keygen session timed out after %v", KeyGenTimeOut), "Key generation timed out", natMsg)
278-
return
279-
}
241+
successEvent.TaurusCMPPubKey = data.Payload
242+
doneTaurus()
243+
}()
280244

281245
payload, err := json.Marshal(successEvent)
282246
if err != nil {
283247
logger.Error("Failed to marshal keygen success event", err)
284248
ec.handleKeygenSessionError(walletID, err, "Failed to marshal keygen success event", natMsg)
285249
return
286250
}
287-
251+
fmt.Println("payload", string(payload))
288252
key := fmt.Sprintf(mpc.TypeGenerateWalletResultFmt, walletID)
289253
if err := ec.genKeyResultQueue.Enqueue(
290254
key,

pkg/mpc/node.go

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/fystack/mpcium/pkg/messaging"
1717
"github.com/fystack/mpcium/pkg/mpc/taurus"
1818
"github.com/taurusgroup/multi-party-sig/pkg/party"
19+
"github.com/taurusgroup/multi-party-sig/pkg/pool"
1920
)
2021

2122
const (
@@ -146,32 +147,14 @@ func (p *Node) CreateCMPKeyGenSession(
146147
walletID string,
147148
threshold int,
148149
resultQueue messaging.MessageQueue,
149-
) (taurus.KeyGenSession, error) {
150-
if !p.peerRegistry.ArePeersReady() {
151-
return nil, fmt.Errorf(
152-
"peers are not ready yet. ready: %d, expected: %d",
153-
p.peerRegistry.GetReadyPeersCount(),
154-
len(p.peerIDs)+1,
155-
)
156-
}
157-
150+
) (*taurus.CmpParty, error) {
158151
readyPeerIDs := p.peerRegistry.GetReadyPeersIncludeSelf()
159152
selfPartyID, allPartyIDs := p.generateTaurusPartyIDs(PurposeKeygen, readyPeerIDs, DefaultVersion)
160-
161-
session := taurus.NewCGGMP21KeygenSession(
162-
walletID,
163-
p.pubSub,
164-
selfPartyID,
165-
allPartyIDs,
166-
threshold,
167-
p.kvstore,
168-
p.keyinfoStore,
169-
resultQueue,
170-
p.identityStore,
171-
)
172-
173-
session.Init()
174-
return session, nil
153+
tr := taurus.NewNATSTransport(walletID, selfPartyID, p.pubSub)
154+
adapter := taurus.NewTaurusNetworkAdapter(walletID, selfPartyID, tr, allPartyIDs)
155+
pl := pool.NewPool(0)
156+
party := taurus.NewCmpParty(walletID, selfPartyID, allPartyIDs, threshold, pl, adapter)
157+
return party, nil
175158
}
176159

177160
func (p *Node) CreateSigningSession(
@@ -506,8 +489,8 @@ func sessionKeyPrefix(sessionType SessionType) (string, error) {
506489
}
507490
}
508491

509-
func (p *Node) generateTaurusPartyIDs(purpose string, peerIDs []string, version int) (party.ID, []party.ID) {
510-
partyIDs := make([]party.ID, len(peerIDs))
492+
func (p *Node) generateTaurusPartyIDs(purpose string, peerIDs []string, version int) (party.ID, party.IDSlice) {
493+
partyIDs := make(party.IDSlice, len(peerIDs))
511494
var selfPartyID party.ID
512495

513496
for i, peerID := range peerIDs {

pkg/mpc/taurus/adapter.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package taurus
2+
3+
import (
4+
"encoding/json"
5+
"log/slog"
6+
7+
"github.com/taurusgroup/multi-party-sig/pkg/party"
8+
"github.com/taurusgroup/multi-party-sig/pkg/protocol"
9+
)
10+
11+
type NetworkInterface interface {
12+
Next() <-chan *protocol.Message
13+
Send(msg *protocol.Message)
14+
Done() <-chan struct{}
15+
}
16+
17+
type TaurusNetworkAdapter struct {
18+
sid string
19+
selfID party.ID
20+
transport Transport
21+
inbox chan *protocol.Message
22+
done chan struct{}
23+
peers party.IDSlice
24+
}
25+
26+
func NewTaurusNetworkAdapter(
27+
sid string,
28+
selfID party.ID,
29+
t Transport,
30+
peers party.IDSlice,
31+
) *TaurusNetworkAdapter {
32+
a := &TaurusNetworkAdapter{
33+
sid: sid,
34+
selfID: selfID,
35+
transport: t,
36+
inbox: make(chan *protocol.Message, 100),
37+
done: make(chan struct{}),
38+
peers: peers,
39+
}
40+
go a.route()
41+
return a
42+
}
43+
44+
func (a *TaurusNetworkAdapter) Next() <-chan *protocol.Message { return a.inbox }
45+
func (a *TaurusNetworkAdapter) Done() <-chan struct{} { return a.done }
46+
47+
func (a *TaurusNetworkAdapter) Send(msg *protocol.Message) {
48+
wire, err := json.Marshal(msg)
49+
if err != nil {
50+
slog.Error("❌ marshal protocol msg", "err", err)
51+
return
52+
}
53+
m := Msg{SID: a.sid, From: string(msg.From), IsBroadcast: msg.Broadcast, Data: wire}
54+
for _, pid := range a.peers {
55+
if pid == a.selfID {
56+
continue
57+
}
58+
if msg.Broadcast || msg.IsFor(pid) {
59+
_ = a.transport.Send(string(pid), m)
60+
}
61+
}
62+
}
63+
64+
func (a *TaurusNetworkAdapter) route() {
65+
for {
66+
select {
67+
case tm, ok := <-a.transport.Inbox():
68+
if !ok {
69+
close(a.done)
70+
return
71+
}
72+
var pm protocol.Message
73+
if err := json.Unmarshal(tm.Data, &pm); err != nil {
74+
slog.Error("❌ unmarshal protocol msg", "err", err)
75+
continue
76+
}
77+
select {
78+
case a.inbox <- &pm:
79+
default:
80+
slog.Warn("⚠️ inbox full, drop msg", "self", a.selfID)
81+
}
82+
case <-a.transport.Done():
83+
close(a.done)
84+
return
85+
}
86+
}
87+
}

pkg/mpc/taurus/cmp.go

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package taurus
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"math/big"
8+
9+
"github.com/fystack/mpcium/pkg/logger"
10+
"github.com/fystack/mpcium/pkg/types"
11+
"github.com/taurusgroup/multi-party-sig/pkg/ecdsa"
12+
"github.com/taurusgroup/multi-party-sig/pkg/math/curve"
13+
"github.com/taurusgroup/multi-party-sig/pkg/party"
14+
"github.com/taurusgroup/multi-party-sig/pkg/pool"
15+
"github.com/taurusgroup/multi-party-sig/pkg/protocol"
16+
"github.com/taurusgroup/multi-party-sig/protocols/cmp"
17+
)
18+
19+
type CmpParty struct {
20+
sid string
21+
id party.ID
22+
ids party.IDSlice
23+
threshold int
24+
pl *pool.Pool
25+
savedData *cmp.Config
26+
network NetworkInterface
27+
}
28+
29+
func NewCmpParty(
30+
sid string,
31+
id party.ID,
32+
ids party.IDSlice,
33+
threshold int,
34+
pl *pool.Pool,
35+
network NetworkInterface,
36+
) *CmpParty {
37+
return &CmpParty{
38+
sid: sid,
39+
id: id,
40+
ids: ids,
41+
threshold: threshold,
42+
pl: pl,
43+
network: network,
44+
}
45+
}
46+
47+
func (p *CmpParty) LoadKey(data *types.KeyData) error {
48+
cfg := cmp.EmptyConfig(curve.Secp256k1{})
49+
if err := cfg.UnmarshalBinary(data.Payload); err != nil {
50+
return fmt.Errorf("decode key data: %w", err)
51+
}
52+
p.savedData = cfg
53+
return nil
54+
}
55+
56+
func (p *CmpParty) Keygen(ctx context.Context) (types.KeyData, error) {
57+
h, err := protocol.NewMultiHandler(
58+
cmp.Keygen(curve.Secp256k1{}, p.id, p.ids, p.threshold, p.pl),
59+
[]byte(p.sid),
60+
)
61+
if err != nil {
62+
return types.KeyData{}, err
63+
}
64+
if err := p.executeProtocol(ctx, h); err != nil {
65+
return types.KeyData{}, err
66+
}
67+
res, err := h.Result()
68+
if err != nil {
69+
return types.KeyData{}, err
70+
}
71+
cfg, ok := res.(*cmp.Config)
72+
if !ok {
73+
return types.KeyData{}, errors.New("unexpected result type")
74+
}
75+
p.savedData = cfg
76+
packed, _ := cfg.MarshalBinary()
77+
return types.KeyData{SID: p.sid, Type: "taurus_cmp", Payload: packed}, nil
78+
}
79+
80+
func (p *CmpParty) Sign(ctx context.Context, msg *big.Int) ([]byte, error) {
81+
if p.savedData == nil {
82+
return nil, errors.New("no key loaded")
83+
}
84+
h, err := protocol.NewMultiHandler(
85+
cmp.Sign(p.savedData, p.ids, msg.Bytes(), p.pl),
86+
[]byte(p.sid),
87+
)
88+
if err != nil {
89+
return nil, err
90+
}
91+
if err := p.executeProtocol(ctx, h); err != nil {
92+
return nil, err
93+
}
94+
res, err := h.Result()
95+
if err != nil {
96+
return nil, err
97+
}
98+
sig, ok := res.(*ecdsa.Signature)
99+
if !ok {
100+
return nil, errors.New("unexpected signature result")
101+
}
102+
if !sig.Verify(p.savedData.PublicPoint(), msg.Bytes()) {
103+
return nil, errors.New("failed to verify cmp signature")
104+
}
105+
return sig.SigEthereum()
106+
}
107+
108+
func (p *CmpParty) Reshare(ctx context.Context) (types.KeyData, error) {
109+
if p.savedData == nil {
110+
return types.KeyData{}, errors.New("no key loaded")
111+
}
112+
h, err := protocol.NewMultiHandler(cmp.Refresh(p.savedData, p.pl), []byte(p.sid))
113+
if err != nil {
114+
return types.KeyData{}, err
115+
}
116+
if err := p.executeProtocol(ctx, h); err != nil {
117+
return types.KeyData{}, err
118+
}
119+
res, err := h.Result()
120+
if err != nil {
121+
return types.KeyData{}, err
122+
}
123+
cfg, ok := res.(*cmp.Config)
124+
if !ok {
125+
return types.KeyData{}, errors.New("unexpected result type")
126+
}
127+
p.savedData = cfg
128+
packed, _ := cfg.MarshalBinary()
129+
return types.KeyData{SID: p.sid, Type: "taurus_cmp", Payload: packed}, nil
130+
}
131+
132+
func (p *CmpParty) executeProtocol(ctx context.Context, h protocol.Handler) error {
133+
for {
134+
select {
135+
case <-ctx.Done():
136+
return ctx.Err()
137+
case msg, ok := <-h.Listen():
138+
if !ok {
139+
return nil
140+
}
141+
p.network.Send(msg)
142+
case msg := <-p.network.Next():
143+
if h.CanAccept(msg) {
144+
h.Accept(msg)
145+
} else {
146+
logger.Warn("⚠️ Ignored invalid msg", "self", p.id)
147+
}
148+
case <-p.network.Done():
149+
return nil
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)