@@ -2,13 +2,15 @@ package eventconsumer
22
33import (
44 "context"
5+ "encoding/json"
56 "fmt"
67 "time"
78
89 "github.com/fystack/mpcium/pkg/event"
910 "github.com/fystack/mpcium/pkg/logger"
1011 "github.com/fystack/mpcium/pkg/messaging"
1112 "github.com/fystack/mpcium/pkg/mpc"
13+ "github.com/fystack/mpcium/pkg/types"
1214 "github.com/google/uuid"
1315 "github.com/nats-io/nats.go"
1416 "github.com/nats-io/nats.go/jetstream"
@@ -34,25 +36,27 @@ type SigningConsumer interface {
3436
3537// signingConsumer implements SigningConsumer.
3638type signingConsumer struct {
37- natsConn * nats.Conn
38- pubsub messaging.PubSub
39- jsBroker messaging.MessageBroker
40- peerRegistry mpc.PeerRegistry
41- mpcThreshold int
39+ natsConn * nats.Conn
40+ pubsub messaging.PubSub
41+ jsBroker messaging.MessageBroker
42+ peerRegistry mpc.PeerRegistry
43+ mpcThreshold int
44+ signingResultQueue messaging.MessageQueue
4245
4346 // jsSub holds the JetStream subscription, so it can be cleaned up during Close().
4447 jsSub messaging.Subscription
4548}
4649
4750// NewSigningConsumer returns a new instance of SigningConsumer.
48- func NewSigningConsumer (natsConn * nats.Conn , jsBroker messaging.MessageBroker , pubsub messaging.PubSub , peerRegistry mpc.PeerRegistry ) SigningConsumer {
51+ func NewSigningConsumer (natsConn * nats.Conn , jsBroker messaging.MessageBroker , pubsub messaging.PubSub , peerRegistry mpc.PeerRegistry , signingResultQueue messaging. MessageQueue ) SigningConsumer {
4952 mpcThreshold := viper .GetInt ("mpc_threshold" )
5053 return & signingConsumer {
51- natsConn : natsConn ,
52- pubsub : pubsub ,
53- jsBroker : jsBroker ,
54- peerRegistry : peerRegistry ,
55- mpcThreshold : mpcThreshold ,
54+ natsConn : natsConn ,
55+ pubsub : pubsub ,
56+ jsBroker : jsBroker ,
57+ peerRegistry : peerRegistry ,
58+ mpcThreshold : mpcThreshold ,
59+ signingResultQueue : signingResultQueue ,
5660 }
5761}
5862
@@ -136,18 +140,25 @@ func (sc *signingConsumer) Run(ctx context.Context) error {
136140// When signing completes, the session publishes the result to a queue and calls the onSuccess callback, which sends a reply to the inbox that the SigningConsumer is monitoring.
137141// The reply signals completion, allowing the SigningConsumer to acknowledge the original message.
138142func (sc * signingConsumer ) handleSigningEvent (msg jetstream.Msg ) {
139- // Check if we still have enough peers before processing the message
140- requiredPeers := int64 (sc .mpcThreshold + 1 )
141- readyPeers := sc .peerRegistry .GetReadyPeersCount ()
142-
143- if readyPeers < requiredPeers {
144- logger .Warn ("SigningConsumer: Not enough peers to process signing request, rejecting message" ,
145- "ready" , readyPeers ,
146- "required" , requiredPeers )
147- // Immediately return and let nats redeliver the message with backoff
143+ // Parse the signing request message to extract transaction details
144+ raw := msg .Data ()
145+ var signingMsg types.SignTxMessage
146+ sessionID := msg .Headers ().Get ("SessionID" )
147+
148+ err := json .Unmarshal (raw , & signingMsg )
149+ if err != nil {
150+ logger .Error ("SigningConsumer: Failed to unmarshal signing message" , err )
151+ sc .handleSigningError (signingMsg , event .ErrorCodeUnmarshalFailure , err , sessionID )
152+ _ = msg .Nak ()
148153 return
149154 }
150155
156+ if ! sc .peerRegistry .AreMajorityReady () {
157+ requiredPeers := int64 (sc .mpcThreshold + 1 )
158+ err := fmt .Errorf ("not enough peers to process signing request: ready=%d, required=%d" , sc .peerRegistry .GetReadyPeersCount (), requiredPeers )
159+ sc .handleSigningError (signingMsg , event .ErrorCodeNotMajority , err , sessionID )
160+ return
161+ }
151162 // Create a reply inbox to receive the signing event response.
152163 replyInbox := nats .NewInbox ()
153164
@@ -199,6 +210,36 @@ func (sc *signingConsumer) handleSigningEvent(msg jetstream.Msg) {
199210 _ = msg .Nak ()
200211}
201212
213+ func (sc * signingConsumer ) handleSigningError (signMsg types.SignTxMessage , errorCode event.ErrorCode , err error , sessionID string ) {
214+ signingResult := event.SigningResultEvent {
215+ ResultType : event .ResultTypeError ,
216+ ErrorCode : errorCode ,
217+ NetworkInternalCode : signMsg .NetworkInternalCode ,
218+ WalletID : signMsg .WalletID ,
219+ TxID : signMsg .TxID ,
220+ ErrorReason : err .Error (),
221+ }
222+
223+ signingResultBytes , err := json .Marshal (signingResult )
224+ if err != nil {
225+ logger .Error ("Failed to marshal signing result event" , err ,
226+ "walletID" , signMsg .WalletID ,
227+ "txID" , signMsg .TxID ,
228+ )
229+ return
230+ }
231+
232+ err = sc .signingResultQueue .Enqueue (event .SigningResultCompleteTopic , signingResultBytes , & messaging.EnqueueOptions {
233+ IdempotententKey : buildSigningIdempotentKey (signMsg .TxID , sessionID , mpc .TypeSigningResultFmt ),
234+ })
235+ if err != nil {
236+ logger .Error ("Failed to enqueue signing result event" , err ,
237+ "walletID" , signMsg .WalletID ,
238+ "txID" , signMsg .TxID ,
239+ )
240+ }
241+ }
242+
202243// Close unsubscribes from the JetStream subject and cleans up resources.
203244func (sc * signingConsumer ) Close () error {
204245 if sc .jsSub != nil {
@@ -210,3 +251,13 @@ func (sc *signingConsumer) Close() error {
210251 }
211252 return nil
212253}
254+
255+ func buildSigningIdempotentKey (baseID string , sessionID string , formatTemplate string ) string {
256+ var uniqueKey string
257+ if sessionID != "" {
258+ uniqueKey = fmt .Sprintf ("%s:%s" , baseID , sessionID )
259+ } else {
260+ uniqueKey = baseID
261+ }
262+ return fmt .Sprintf (formatTemplate , uniqueKey )
263+ }
0 commit comments