Skip to content

Commit ea1d761

Browse files
committed
fix rejoining ecdh bug and add ecdh cache key cleaning
1 parent 75062c0 commit ea1d761

4 files changed

Lines changed: 47 additions & 24 deletions

File tree

pkg/identity/identity.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,17 @@ func (s *fileStore) GetSymmetricKey(peerID string) ([]byte, error) {
239239
}
240240

241241
func (s *fileStore) CheckSymmetricKeyComplete(desired int) bool {
242-
return len(s.symmetricKeys) == desired
242+
s.mu.RLock()
243+
defer s.mu.RUnlock()
244+
245+
completeCount := 0
246+
for _, value := range s.symmetricKeys {
247+
if len(value) > 0 {
248+
completeCount++
249+
}
250+
}
251+
252+
return completeCount == desired
243253
}
244254

245255
// GetPublicKey retrieves a node's public key by its ID

pkg/mpc/key_exchange_session.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ const (
2828
)
2929

3030
type ECDHSession interface {
31-
StartKeyExchange() error
31+
ListenKeyExchange() error
3232
BroadcastPublicKey() error
3333
WaitForExchangeComplete() error
34+
ResetLocalKeys()
3435
ErrChan() <-chan error
3536
Close() error
3637
}
@@ -65,7 +66,14 @@ func NewECDHSession(
6566
}
6667
}
6768

68-
func (e *ecdhSession) StartKeyExchange() error {
69+
func (e *ecdhSession) ResetLocalKeys() {
70+
// Set a specific key to an empty []byte
71+
for _, peerID := range e.peerIDs {
72+
e.identityStore.SetSymmetricKey(peerID, []byte{})
73+
}
74+
}
75+
76+
func (e *ecdhSession) ListenKeyExchange() error {
6977
// Generate an ephemeral ECDH key pair
7078
privateKey, err := ecdh.X25519().GenerateKey(rand.Reader)
7179
if err != nil {
@@ -85,7 +93,7 @@ func (e *ecdhSession) StartKeyExchange() error {
8593
if ecdhMsg.From == e.nodeID {
8694
return
8795
}
88-
logger.Info("Received ECDH message from", "node", ecdhMsg.From)
96+
8997
//TODO: consider how to avoid replay attack
9098
if err := e.identityStore.VerifySignature(&ecdhMsg); err != nil {
9199
e.errCh <- err
@@ -115,10 +123,11 @@ func (e *ecdhSession) StartKeyExchange() error {
115123
logger.Info("ALL PEERS ARE READY! Starting to accept MPC requests")
116124

117125
e.mu.Lock()
118-
e.exchangeDone = true
126+
if !e.exchangeDone {
127+
e.exchangeDone = true
128+
e.exchangeComplete <- struct{}{}
129+
}
119130
e.mu.Unlock()
120-
121-
e.exchangeComplete <- struct{}{}
122131
}
123132
})
124133

pkg/mpc/node.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ func NewNode(
5757
logger.Info("Starting new node, preparams is generated successfully!", "elapsed", elapsed.Milliseconds())
5858
// Each node initiates the DH key exchange listener at the beginning and invoke message sending when all peers are ready
5959
dhSession := NewECDHSession(nodeID, peerIDs, pubSub, identityStore)
60-
if err := dhSession.StartKeyExchange(); err != nil {
61-
logger.Fatal("Failed to start DH key exchange", err)
60+
if err := dhSession.ListenKeyExchange(); err != nil {
61+
logger.Fatal("Failed to listen to DH key exchange", err)
6262
}
6363

6464
node := &Node{
@@ -74,13 +74,18 @@ func NewNode(
7474
}
7575
node.ecdsaPreParams = node.generatePreParams()
7676

77-
ecdhTask := func() {
78-
if err := dhSession.BroadcastPublicKey(); err != nil {
79-
logger.Fatal("DH key broadcast failed", err)
77+
// we define two types of tasks, initTask and resetTask
78+
ecdhTasks := func(isInit bool) {
79+
if isInit {
80+
if err := dhSession.BroadcastPublicKey(); err != nil {
81+
logger.Fatal("DH key broadcast failed", err)
82+
}
83+
} else {
84+
dhSession.ResetLocalKeys()
8085
}
8186
}
8287

83-
go peerRegistry.WatchPeersReady(ecdhTask)
88+
go peerRegistry.WatchPeersReady(ecdhTasks)
8489
return node
8590
}
8691

@@ -429,10 +434,6 @@ func (p *Node) GetECDHSession() ECDHSession {
429434
return p.ecdhSession
430435
}
431436

432-
func (p *Node) GetDHSession() ECDHSession {
433-
return p.ecdhSession
434-
}
435-
436437
func (p *Node) generatePreParams() []*keygen.LocalPreParams {
437438
start := time.Now()
438439
// Try to load from kvstore

pkg/mpc/registry.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ const (
2121
type PeerRegistry interface {
2222
Ready() error
2323
ArePeersReady() bool
24-
WatchPeersReady(callback func())
24+
WatchPeersReady(callback func(bool))
2525
// Resign is called by the node when it is going to shutdown
2626
Resign() error
2727
GetReadyPeersCount() int64
@@ -71,7 +71,7 @@ func (r *registry) readyKey(nodeID string) string {
7171
return fmt.Sprintf("ready/%s", nodeID)
7272
}
7373

74-
func (r *registry) registerReadyPairs(peerIDs []string, callback func()) {
74+
func (r *registry) registerReadyPairs(peerIDs []string, callback func(isInit bool)) {
7575
for _, peerID := range peerIDs {
7676
ready, exist := r.readyMap[peerID]
7777
if !exist {
@@ -89,9 +89,10 @@ func (r *registry) registerReadyPairs(peerIDs []string, callback func()) {
8989
r.mu.Lock()
9090
r.ready = true
9191
r.mu.Unlock()
92-
time.AfterFunc(5*time.Second, callback)
92+
time.AfterFunc(5*time.Second, func() {
93+
callback(true)
94+
})
9395
}
94-
9596
}
9697

9798
// Ready is called by the node when it complete generate preparams and starting to accept
@@ -122,9 +123,10 @@ func (r *registry) composeHealthCheckTopic(nodeID string) string {
122123
return fmt.Sprintf("healthcheck:%s", nodeID)
123124
}
124125

125-
func (r *registry) WatchPeersReady(callback func()) {
126+
func (r *registry) WatchPeersReady(callback func(isInit bool)) {
127+
go r.checkPeersHealth()
128+
126129
ticker := time.NewTicker(ReadinessCheckPeriod)
127-
go r.checkPeersHeath()
128130
// first tick is executed immediately
129131
for ; true; <-ticker.C {
130132
pairs, _, err := r.consulKV.List("ready/", nil)
@@ -151,6 +153,7 @@ func (r *registry) WatchPeersReady(callback func()) {
151153
logger.Warn("Peer disconnected!", "peerID", peerID)
152154
r.readyMap[peerID] = false
153155
atomic.AddInt64(&r.readyCount, -1)
156+
callback(false)
154157
}
155158

156159
}
@@ -161,7 +164,7 @@ func (r *registry) WatchPeersReady(callback func()) {
161164

162165
}
163166

164-
func (r *registry) checkPeersHeath() {
167+
func (r *registry) checkPeersHealth() {
165168
for {
166169
time.Sleep(5 * time.Second)
167170
if !r.ArePeersReady() {

0 commit comments

Comments
 (0)