@@ -129,7 +129,6 @@ func runNode(ctx context.Context, c *cli.Command) error {
129129 if err != nil {
130130 logger .Fatal ("Failed to connect to NATS" , err )
131131 }
132- defer natsConn .Close ()
133132
134133 pubsub := messaging .NewNATSPubSub (natsConn )
135134 keygenBroker , err := messaging .NewJetStreamBroker (ctx , natsConn , event .KeygenBrokerStream , []string {
@@ -159,10 +158,10 @@ func runNode(ctx context.Context, c *cli.Command) error {
159158 reshareResultQueue := mqManager .NewMessageQueue ("mpc_reshare_result" )
160159 defer reshareResultQueue .Close ()
161160
162- logger .Info ("Node is running" , "peerID " , nodeID , "name" , nodeName )
161+ logger .Info ("Node is running" , "ID " , nodeID , "name" , nodeName )
163162
164163 peerNodeIDs := GetPeerIDs (peers )
165- peerRegistry := mpc .NewRegistry (nodeID , peerNodeIDs , consulClient .KV ())
164+ peerRegistry := mpc .NewRegistry (nodeID , peerNodeIDs , consulClient .KV (), directMessaging , pubsub , identityStore )
166165
167166 mpcNode := mpc .NewNode (
168167 nodeID ,
@@ -194,34 +193,46 @@ func runNode(ctx context.Context, c *cli.Command) error {
194193
195194 timeoutConsumer .Run ()
196195 defer timeoutConsumer .Close ()
197- keygenConsumer := eventconsumer .NewKeygenConsumer (natsConn , keygenBroker , pubsub , peerRegistry )
198- signingConsumer := eventconsumer .NewSigningConsumer (natsConn , signingBroker , pubsub , peerRegistry )
196+ keygenConsumer := eventconsumer .NewKeygenConsumer (natsConn , keygenBroker , pubsub , peerRegistry , genKeyResultQueue )
197+ signingConsumer := eventconsumer .NewSigningConsumer (natsConn , signingBroker , pubsub , peerRegistry , singingResultQueue )
199198
200199 // Make the node ready before starting the signing consumer
201200 if err := peerRegistry .Ready (); err != nil {
202201 logger .Error ("Failed to mark peer registry as ready" , err )
203202 }
204203 logger .Info ("[READY] Node is ready" , "nodeID" , nodeID )
204+
205+ logger .Info ("Starting consumers" , "nodeID" , nodeID )
205206 appContext , cancel := context .WithCancel (context .Background ())
206- // Setup signal handling to cancel context on termination signals.
207+ //Setup signal handling to cancel context on termination signals.
207208 go func () {
208209 sigChan := make (chan os.Signal , 1 )
209210 signal .Notify (sigChan , os .Interrupt , syscall .SIGTERM )
210211 <- sigChan
211212 logger .Warn ("Shutdown signal received, canceling context..." )
212213 cancel ()
213214
215+ // Resign from peer registry first (before closing NATS)
216+ if err := peerRegistry .Resign (); err != nil {
217+ logger .Error ("Failed to resign from peer registry" , err )
218+ }
219+
214220 // Gracefully close consumers
215221 if err := keygenConsumer .Close (); err != nil {
216222 logger .Error ("Failed to close keygen consumer" , err )
217223 }
218224 if err := signingConsumer .Close (); err != nil {
219225 logger .Error ("Failed to close signing consumer" , err )
220226 }
227+
228+ err := natsConn .Drain ()
229+ if err != nil {
230+ logger .Error ("Failed to drain NATS connection" , err )
231+ }
221232 }()
222233
223234 var wg sync.WaitGroup
224- errChan := make (chan error , 2 )
235+ errChan := make (chan error , 3 )
225236
226237 wg .Add (1 )
227238 go func () {
@@ -250,14 +261,14 @@ func runNode(ctx context.Context, c *cli.Command) error {
250261 logger .Info ("All consumers have finished" )
251262 close (errChan )
252263 }()
253-
254264 for err := range errChan {
255265 if err != nil {
256266 logger .Error ("Consumer error received" , err )
257267 cancel ()
258268 return err
259269 }
260270 }
271+
261272 return nil
262273}
263274
0 commit comments