Skip to content

Commit 75062c0

Browse files
committed
Automatically disconnect and remove the peer from consul registry if peer suddenly dies
1 parent fa7e84c commit 75062c0

4 files changed

Lines changed: 91 additions & 6 deletions

File tree

cmd/mpcium/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func runNode(ctx context.Context, c *cli.Command) error {
162162
logger.Info("Node is running", "ID", nodeID, "name", nodeName)
163163

164164
peerNodeIDs := GetPeerIDs(peers)
165-
peerRegistry := mpc.NewRegistry(nodeID, peerNodeIDs, consulClient.KV())
165+
peerRegistry := mpc.NewRegistry(nodeID, peerNodeIDs, consulClient.KV(), directMessaging)
166166

167167
mpcNode := mpc.NewNode(
168168
nodeID,
@@ -232,6 +232,11 @@ func runNode(ctx context.Context, c *cli.Command) error {
232232
if err := ecdhSession.Close(); err != nil {
233233
logger.Error("Failed to close ECDH session", err)
234234
}
235+
236+
err := natsConn.Drain()
237+
if err != nil {
238+
logger.Error("Failed to drain NATS connection", err)
239+
}
235240
}()
236241

237242
var wg sync.WaitGroup

pkg/eventconsumer/keygen_consumer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,10 @@ func (sc *keygenConsumer) Run(ctx context.Context) error {
9090
sc.handleKeygenEvent,
9191
)
9292
if err != nil {
93-
return fmt.Errorf("failed to subscribe to signing events: %w", err)
93+
return fmt.Errorf("failed to subscribe to keygen events: %w", err)
9494
}
9595
sc.jsSub = sub
96-
logger.Info("SigningConsumer: Subscribed to signing events")
96+
logger.Info("SigningConsumer: Subscribed to keygen events")
9797

9898
// Block until context cancellation.
9999
<-ctx.Done()

pkg/messaging/point2point.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,17 @@ import (
1313
type DirectMessaging interface {
1414
Listen(topic string, handler func(data []byte)) (Subscription, error)
1515
SendToOther(topic string, data []byte) error
16+
SendToOtherWithRetry(topic string, data []byte, config RetryConfig) error
1617
SendToSelf(topic string, data []byte) error
1718
}
1819

20+
// RetryConfig tunes the retry behaviour of SendToOtherWithRetry. Every field
// is optional: a zero value means "do not override the retry default".
type RetryConfig struct {
	// RetryAttempt is the maximum number of attempts. It is only applied
	// when greater than zero.
	RetryAttempt uint

	// ExponentialBackoff requests an exponentially growing delay between
	// attempts.
	ExponentialBackoff bool

	// Delay is the base delay between attempts. It is only applied when
	// greater than zero.
	Delay time.Duration

	// OnRetry, when non-nil, is invoked on each retry with the attempt
	// index n and the error that triggered the retry.
	OnRetry func(n uint, err error)
}
26+
1927
type natsDirectMessaging struct {
2028
natsConn *nats.Conn
2129
handlers map[string][]func([]byte)
@@ -65,6 +73,36 @@ func (d *natsDirectMessaging) SendToOther(topic string, message []byte) error {
6573
)
6674
}
6775

76+
func (d *natsDirectMessaging) SendToOtherWithRetry(topic string, message []byte, config RetryConfig) error {
77+
opts := []retry.Option{
78+
retry.MaxJitter(80 * time.Millisecond),
79+
}
80+
81+
if config.RetryAttempt > 0 {
82+
opts = append(opts, retry.Attempts(config.RetryAttempt))
83+
}
84+
if config.ExponentialBackoff {
85+
opts = append(opts, retry.DelayType(retry.BackOffDelay))
86+
}
87+
if config.Delay > 0 {
88+
opts = append(opts, retry.Delay(config.Delay))
89+
}
90+
if config.OnRetry != nil {
91+
opts = append(opts, retry.OnRetry(config.OnRetry))
92+
}
93+
94+
return retry.Do(
95+
func() error {
96+
_, err := d.natsConn.Request(topic, message, 3*time.Second)
97+
if err != nil {
98+
return err
99+
}
100+
return nil
101+
},
102+
opts...,
103+
)
104+
}
105+
68106
func (d *natsDirectMessaging) Listen(topic string, handler func(data []byte)) (Subscription, error) {
69107
sub, err := d.natsConn.Subscribe(topic, func(m *nats.Msg) {
70108
handler(m.Data)
@@ -76,6 +114,14 @@ func (d *natsDirectMessaging) Listen(topic string, handler func(data []byte)) (S
76114
return nil, err
77115
}
78116

117+
if err := d.natsConn.Flush(); err != nil {
118+
err := sub.Unsubscribe()
119+
if err != nil {
120+
logger.Error("Failed to unsubscribe", err)
121+
}
122+
return nil, fmt.Errorf("flush after subscribe failed: %w", err)
123+
}
124+
79125
d.mu.Lock()
80126
d.handlers[topic] = append(d.handlers[topic], handler)
81127
d.mu.Unlock()

pkg/mpc/registry.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ package mpc
22

33
import (
44
"fmt"
5+
"strings"
56
"sync"
67
"sync/atomic"
78
"time"
89

910
"github.com/fystack/mpcium/pkg/infra"
1011
"github.com/fystack/mpcium/pkg/logger"
12+
"github.com/fystack/mpcium/pkg/messaging"
1113
"github.com/hashicorp/consul/api"
1214
"github.com/samber/lo"
1315
)
@@ -35,20 +37,23 @@ type registry struct {
3537
mu sync.RWMutex
3638
ready bool // ready is true when all peers are ready
3739

38-
consulKV infra.ConsulKV
40+
consulKV infra.ConsulKV
41+
healthCheck messaging.DirectMessaging
3942
}
4043

4144
func NewRegistry(
4245
nodeID string,
4346
peerNodeIDs []string,
4447
consulKV infra.ConsulKV,
48+
directMessaging messaging.DirectMessaging,
4549
) *registry {
4650
return &registry{
4751
consulKV: consulKV,
4852
nodeID: nodeID,
4953
peerNodeIDs: getPeerIDsExceptSelf(nodeID, peerNodeIDs),
5054
readyMap: make(map[string]bool),
5155
readyCount: 1, // self
56+
healthCheck: directMessaging,
5257
}
5358
}
5459

@@ -104,12 +109,22 @@ func (r *registry) Ready() error {
104109
return fmt.Errorf("Put ready key failed: %w", err)
105110
}
106111

112+
_, err = r.healthCheck.Listen(r.composeHealthCheckTopic(r.nodeID), func(data []byte) {
113+
logger.Debug("Health check", "peerID", string(data))
114+
})
115+
if err != nil {
116+
return fmt.Errorf("Listen health check failed: %w", err)
117+
}
107118
return nil
108119
}
109120

121+
func (r *registry) composeHealthCheckTopic(nodeID string) string {
122+
return fmt.Sprintf("healthcheck:%s", nodeID)
123+
}
124+
110125
func (r *registry) WatchPeersReady(callback func()) {
111126
ticker := time.NewTicker(ReadinessCheckPeriod)
112-
go r.logReadyStatus()
127+
go r.checkPeersHeath()
113128
// first tick is executed immediately
114129
for ; true; <-ticker.C {
115130
pairs, _, err := r.consulKV.List("ready/", nil)
@@ -146,12 +161,31 @@ func (r *registry) WatchPeersReady(callback func()) {
146161

147162
}
148163

149-
func (r *registry) logReadyStatus() {
164+
func (r *registry) checkPeersHeath() {
150165
for {
151166
time.Sleep(5 * time.Second)
152167
if !r.ArePeersReady() {
153168
logger.Info("Peers are not ready yet", "ready", r.GetReadyPeersCount(), "expected", len(r.peerNodeIDs)+1)
154169
}
170+
171+
pairs, _, err := r.consulKV.List("ready/", nil)
172+
if err != nil {
173+
logger.Error("List ready keys failed", err)
174+
continue
175+
}
176+
readyPeerIDs := r.getReadyPeersFromKVStore(pairs)
177+
for _, peerID := range readyPeerIDs {
178+
err := r.healthCheck.SendToOtherWithRetry(r.composeHealthCheckTopic(peerID), []byte(peerID), messaging.RetryConfig{
179+
RetryAttempt: 2,
180+
})
181+
if err != nil && strings.Contains(err.Error(), "no responders") {
182+
logger.Info("No response from peer", "peerID", peerID)
183+
_, err := r.consulKV.Delete(r.readyKey(peerID), nil)
184+
if err != nil {
185+
logger.Error("Delete ready key failed", err)
186+
}
187+
}
188+
}
155189
}
156190
}
157191

0 commit comments

Comments
 (0)