Skip to content

Commit c42c6ea

Browse files
committed
Fix ECDH key exchange race condition in distributed deployments
1 parent c7e0875 commit c42c6ea

2 files changed

Lines changed: 47 additions & 11 deletions

File tree

pkg/mpc/key_exchange_session.go

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,19 @@ type ECDHSession interface {
3232
GetReadyPeersCount() int
3333
ErrChan() <-chan error
3434
Close() error
35+
OnKeyExchangeComplete(callback func())
3536
}
3637

3738
type ecdhSession struct {
38-
nodeID string
39-
peerIDs []string
40-
pubSub messaging.PubSub
41-
ecdhSub messaging.Subscription
42-
identityStore identity.Store
43-
privateKey *ecdh.PrivateKey
44-
publicKey *ecdh.PublicKey
45-
errCh chan error
39+
nodeID string
40+
peerIDs []string
41+
pubSub messaging.PubSub
42+
ecdhSub messaging.Subscription
43+
identityStore identity.Store
44+
privateKey *ecdh.PrivateKey
45+
publicKey *ecdh.PublicKey
46+
errCh chan error
47+
onKeyExchangeComplete func()
4648
}
4749

4850
func NewECDHSession(
@@ -72,6 +74,10 @@ func (e *ecdhSession) ErrChan() <-chan error {
7274
return e.errCh
7375
}
7476

77+
func (e *ecdhSession) OnKeyExchangeComplete(callback func()) {
78+
e.onKeyExchangeComplete = callback
79+
}
80+
7581
func (e *ecdhSession) ListenKeyExchange() error {
7682
// Generate an ephemeral ECDH key pair
7783
privateKey, err := ecdh.X25519().GenerateKey(rand.Reader)
@@ -113,7 +119,15 @@ func (e *ecdhSession) ListenKeyExchange() error {
113119
// Derive symmetric key using HKDF
114120
symmetricKey := e.deriveSymmetricKey(sharedSecret, ecdhMsg.From)
115121
e.identityStore.SetSymmetricKey(ecdhMsg.From, symmetricKey)
116-
logger.Debug("ECDH progress", "peer", ecdhMsg.From, "current", e.identityStore.GetSymetricKeyCount())
122+
123+
currentKeyCount := e.identityStore.GetSymetricKeyCount()
124+
logger.Debug("ECDH progress", "peer", ecdhMsg.From, "current", currentKeyCount, "expected", len(e.peerIDs))
125+
126+
// Check if ECDH exchange is complete and notify callback
127+
if currentKeyCount == len(e.peerIDs) && e.onKeyExchangeComplete != nil {
128+
logger.Info("ECDH key exchange completed successfully", "totalKeys", currentKeyCount)
129+
e.onKeyExchangeComplete()
130+
}
117131
})
118132

119133
e.ecdhSub = sub

pkg/mpc/registry.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ func NewRegistry(
8585
mpcThreshold: mpcThreshold,
8686
}
8787

88+
// Set up callback to check ready state when ECDH completes
89+
ecdhSession.OnKeyExchangeComplete(func() {
90+
reg.checkAndUpdateReadyState()
91+
})
92+
8893
go reg.consumeECDHErrors()
8994

9095
return reg
@@ -126,11 +131,28 @@ func (r *registry) registerReadyPairs(peerIDs []string) {
126131
r.readyMap[peerID] = true
127132
}
128133

129-
if len(peerIDs) == len(r.peerNodeIDs) && !r.ready {
134+
// Check if we should update ready state
135+
r.checkAndUpdateReadyState()
136+
}
137+
138+
// checkAndUpdateReadyState checks if all conditions are met to mark the registry as ready
139+
func (r *registry) checkAndUpdateReadyState() {
140+
// Count ready peers in readyMap
141+
readyPeersCount := 0
142+
for _, isReady := range r.readyMap {
143+
if isReady {
144+
readyPeersCount++
145+
}
146+
}
147+
148+
// Only mark as ready when both conditions are met:
149+
// 1. All peers are registered in Consul
150+
// 2. ECDH key exchange is complete
151+
if readyPeersCount == len(r.peerNodeIDs) && r.isECDHReady() && !r.ready {
130152
r.mu.Lock()
131153
r.ready = true
132154
r.mu.Unlock()
133-
logger.Info("All peers are ready including ECDH exchange completion")
155+
logger.Info("[READY] All peers are ready including ECDH exchange completion")
134156
}
135157
}
136158

0 commit comments

Comments
 (0)