Skip to content

Commit e5b7bf0

Browse files
authored
fix(peerstore): replace stale addrs on newer signed peer record (#3487)
1 parent 544db45 commit e5b7bf0

5 files changed

Lines changed: 263 additions & 8 deletions

File tree

p2p/host/peerstore/pstoreds/addr_book.go

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,21 @@ func (ab *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio
282282

283283
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in
284284
// a record.Envelope), which will expire after the given TTL.
285-
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details.
285+
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook
286+
// for more details.
287+
//
288+
// The signed peer record's Seq is treated as monotonic per peer: a record
289+
// with a Seq lower than the last accepted one is rejected. Equal Seq is
290+
// accepted as a TTL refresh.
291+
//
292+
// When a newer signed record is accepted, addrs that were present in the
293+
// previously stored signed record but absent in the new one are evicted, so
294+
// the peerstore reflects the peer's current self-advertised set instead of
295+
// the union of every record we have ever seen. Unsigned addrs (added via
296+
// AddAddr / SetAddr from sources like DHT gossip, or from an identify
297+
// exchange where the peer did not send a signed record) are not touched, and
298+
// addrs held by a live connection (TTL >= ConnectedAddrTTL) are also kept so
299+
// active sessions are not dropped.
286300
func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) {
287301
r, err := recordEnvelope.Record()
288302
if err != nil {
@@ -303,6 +317,15 @@ func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl tim
303317
}
304318

305319
addrs := cleanAddrs(rec.Addrs, rec.PeerID)
320+
321+
// Diff against the previously stored signed record so we can drop addrs
322+
// the peer no longer advertises before adding the new ones.
323+
if superseded := ab.supersededSignedAddrs(rec.PeerID, addrs); len(superseded) > 0 {
324+
if err := ab.deleteAddrs(rec.PeerID, superseded); err != nil {
325+
return false, err
326+
}
327+
}
328+
306329
err = ab.setAddrs(rec.PeerID, addrs, ttl, ttlExtend, true)
307330
if err != nil {
308331
return false, err
@@ -315,6 +338,62 @@ func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl tim
315338
return true, nil
316339
}
317340

341+
// supersededSignedAddrs returns addrs that were present in the previously
342+
// stored signed peer record for p but are absent in newAddrs. Addrs held by
343+
// a live connection (TTL >= ConnectedAddrTTL) are excluded so an active
344+
// session is not torn down when the peer rotates its advertised set.
345+
func (ab *dsAddrBook) supersededSignedAddrs(p peer.ID, newAddrs []ma.Multiaddr) []ma.Multiaddr {
346+
prevEnv := ab.GetPeerRecord(p)
347+
if prevEnv == nil {
348+
return nil
349+
}
350+
prev, err := prevEnv.Record()
351+
if err != nil {
352+
return nil
353+
}
354+
prevRec, ok := prev.(*peer.PeerRecord)
355+
if !ok {
356+
return nil
357+
}
358+
359+
newSet := make(map[string]struct{}, len(newAddrs))
360+
for _, a := range newAddrs {
361+
newSet[string(a.Bytes())] = struct{}{}
362+
}
363+
364+
pr, err := ab.loadRecord(p, true, false)
365+
if err != nil {
366+
return nil
367+
}
368+
pr.RLock()
369+
connected := make(map[string]struct{})
370+
for _, a := range pr.Addrs {
371+
if ttlIsConnected(time.Duration(a.Ttl)) {
372+
connected[string(a.Addr)] = struct{}{}
373+
}
374+
}
375+
pr.RUnlock()
376+
377+
superseded := make([]ma.Multiaddr, 0, len(prevRec.Addrs))
378+
for _, a := range prevRec.Addrs {
379+
key := string(a.Bytes())
380+
if _, still := newSet[key]; still {
381+
continue
382+
}
383+
if _, isConn := connected[key]; isConn {
384+
continue
385+
}
386+
superseded = append(superseded, a)
387+
}
388+
return superseded
389+
}
390+
391+
// ttlIsConnected reports whether the given TTL marks the address as held by
392+
// a live connection.
393+
func ttlIsConnected(ttl time.Duration) bool {
394+
return ttl >= pstore.ConnectedAddrTTL
395+
}
396+
318397
func (ab *dsAddrBook) latestPeerRecordSeq(p peer.ID) uint64 {
319398
pr, err := ab.loadRecord(p, true, false)
320399
if err != nil {

p2p/host/peerstore/pstoreds/ds_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,17 @@ import (
55
"testing"
66
"time"
77

8+
"github.com/libp2p/go-libp2p/core/crypto"
9+
"github.com/libp2p/go-libp2p/core/peer"
810
pstore "github.com/libp2p/go-libp2p/core/peerstore"
11+
"github.com/libp2p/go-libp2p/core/record"
12+
"github.com/libp2p/go-libp2p/core/test"
913
pt "github.com/libp2p/go-libp2p/p2p/host/peerstore/test"
1014

1115
mockclock "github.com/benbjohnson/clock"
1216
ds "github.com/ipfs/go-datastore"
1317
"github.com/ipfs/go-datastore/sync"
18+
ma "github.com/multiformats/go-multiaddr"
1419
"github.com/stretchr/testify/require"
1520
)
1621

@@ -72,6 +77,60 @@ func TestDsAddrBook(t *testing.T) {
7277
}
7378
}
7479

80+
// TestDsConsumePeerRecordReplacesStaleAddrs verifies replace-semantics on a
81+
// newer signed peer record: addrs dropped from the new record are evicted,
82+
// while unsigned addrs and addrs held by a live connection are kept.
83+
func TestDsConsumePeerRecordReplacesStaleAddrs(t *testing.T) {
84+
for name, dsFactory := range dstores {
85+
t.Run(name, func(t *testing.T) {
86+
opts := DefaultOpts()
87+
store, closeDs := dsFactory(t)
88+
defer closeDs()
89+
ab, err := NewAddrBook(context.Background(), store, opts)
90+
require.NoError(t, err)
91+
defer ab.Close()
92+
93+
priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
94+
require.NoError(t, err)
95+
id, err := peer.IDFromPrivateKey(priv)
96+
require.NoError(t, err)
97+
98+
keep := ma.StringCast("/ip4/1.2.3.4/tcp/1")
99+
drop := ma.StringCast("/ip4/1.2.3.4/tcp/2")
100+
unsigned := ma.StringCast("/ip4/1.2.3.4/tcp/3")
101+
connected := ma.StringCast("/ip4/1.2.3.4/tcp/4")
102+
103+
rec1 := peer.NewPeerRecord()
104+
rec1.PeerID = id
105+
rec1.Seq = 1
106+
rec1.Addrs = []ma.Multiaddr{keep, drop, connected}
107+
env1, err := record.Seal(rec1, priv)
108+
require.NoError(t, err)
109+
110+
accepted, err := ab.ConsumePeerRecord(env1, time.Hour)
111+
require.NoError(t, err)
112+
require.True(t, accepted)
113+
114+
ab.AddAddr(id, connected, pstore.ConnectedAddrTTL)
115+
ab.AddAddr(id, unsigned, time.Hour)
116+
require.ElementsMatch(t, []ma.Multiaddr{keep, drop, connected, unsigned}, ab.Addrs(id))
117+
118+
rec2 := peer.NewPeerRecord()
119+
rec2.PeerID = id
120+
rec2.Seq = 2
121+
rec2.Addrs = []ma.Multiaddr{keep}
122+
env2, err := record.Seal(rec2, priv)
123+
require.NoError(t, err)
124+
125+
accepted, err = ab.ConsumePeerRecord(env2, time.Hour)
126+
require.NoError(t, err)
127+
require.True(t, accepted)
128+
129+
require.ElementsMatch(t, []ma.Multiaddr{keep, connected, unsigned}, ab.Addrs(id))
130+
})
131+
}
132+
}
133+
75134
func TestDsKeyBook(t *testing.T) {
76135
for name, dsFactory := range dstores {
77136
t.Run(name, func(t *testing.T) {

p2p/host/peerstore/pstoremem/addr_book.go

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ func ttlIsConnected(ttl time.Duration) bool {
4444

4545
type peerRecordState struct {
4646
Envelope *record.Envelope
47-
Seq uint64
47+
// Seq is the sequence number from the stored signed peer record. Newer
48+
// records (higher Seq) supersede older ones for the same peer.
49+
Seq uint64
4850
}
4951

5052
// Essentially Go stdlib's Priority Queue example
@@ -289,8 +291,23 @@ func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
289291
mab.addAddrs(p, addrs, ttl)
290292
}
291293

292-
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will expire after the given TTL.
293-
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details.
294+
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will
295+
// expire after the given TTL. See
296+
// https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook
297+
// for more details.
298+
//
299+
// The signed peer record's Seq is treated as monotonic per peer: a record with
300+
// a Seq lower than the last accepted one is rejected. Equal Seq is accepted as
301+
// a TTL refresh.
302+
//
303+
// When a newer signed record is accepted, addrs that were present in the
304+
// previously stored signed record but absent in the new one are evicted, so
305+
// the peerstore reflects the peer's current self-advertised set instead of
306+
// the union of every record we have ever seen. Unsigned addrs (added via
307+
// AddAddr / SetAddr from sources like DHT gossip, or from an identify
308+
// exchange where the peer did not send a signed record) are not touched, and
309+
// addrs held by a live connection (TTL >= ConnectedAddrTTL) are also kept so
310+
// active sessions are not dropped.
294311
func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) {
295312
r, err := recordEnvelope.Record()
296313
if err != nil {
@@ -316,6 +333,33 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt
316333
if !found && len(mab.signedPeerRecords) >= mab.maxSignedPeerRecords {
317334
return false, errors.New("too many signed peer records")
318335
}
336+
337+
// Drop addrs from the previous signed record that are absent in the
338+
// new one; addrs held by a live connection are preserved so we don't
339+
// drop an active session if the peer rotates its advertised set. The
340+
// prior addr set is recovered by decoding the stored envelope; that
341+
// call caches on first access (core/record/envelope.go), so repeated
342+
// lookups are cheap.
343+
if found {
344+
if prevRec := prevSignedAddrs(lastState); len(prevRec) > 0 {
345+
newAddrSet := make(map[string]struct{}, len(rec.Addrs))
346+
for _, a := range rec.Addrs {
347+
newAddrSet[string(a.Bytes())] = struct{}{}
348+
}
349+
for _, a := range prevRec {
350+
key := string(a.Bytes())
351+
if _, still := newAddrSet[key]; still {
352+
continue
353+
}
354+
ea, ok := mab.addrs.Addrs[rec.PeerID][key]
355+
if !ok || ea.IsConnected() {
356+
continue
357+
}
358+
mab.addrs.Delete(ea)
359+
}
360+
}
361+
}
362+
319363
mab.signedPeerRecords[rec.PeerID] = &peerRecordState{
320364
Envelope: recordEnvelope,
321365
Seq: rec.Seq,
@@ -324,6 +368,24 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt
324368
return true, nil
325369
}
326370

371+
// prevSignedAddrs returns the addrs from the stored signed peer record, or
372+
// nil if the envelope is absent or can't be decoded. Envelope.Record() caches
373+
// its result, so repeated calls are cheap.
374+
func prevSignedAddrs(s *peerRecordState) []ma.Multiaddr {
375+
if s == nil || s.Envelope == nil {
376+
return nil
377+
}
378+
r, err := s.Envelope.Record()
379+
if err != nil {
380+
return nil
381+
}
382+
pr, ok := r.(*peer.PeerRecord)
383+
if !ok {
384+
return nil
385+
}
386+
return pr.Addrs
387+
}
388+
327389
func (mab *memoryAddrBook) maybeDeleteSignedPeerRecordUnlocked(p peer.ID) {
328390
if len(mab.addrs.Addrs[p]) == 0 {
329391
delete(mab.signedPeerRecords, p)

p2p/host/peerstore/pstoremem/addr_book_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/libp2p/go-libp2p/core/crypto"
1112
"github.com/libp2p/go-libp2p/core/peer"
13+
"github.com/libp2p/go-libp2p/core/peerstore"
14+
"github.com/libp2p/go-libp2p/core/record"
15+
"github.com/libp2p/go-libp2p/core/test"
1216
ma "github.com/multiformats/go-multiaddr"
1317
"github.com/stretchr/testify/require"
1418
)
@@ -186,6 +190,56 @@ func TestPeerLimits(t *testing.T) {
186190
require.Equal(t, 1024, ab.addrs.NumUnconnectedAddrs())
187191
}
188192

193+
// TestConsumePeerRecordReplacesStaleAddrs verifies replace-semantics on a
194+
// newer signed peer record: addrs dropped from the new record are evicted,
195+
// while unsigned addrs and addrs held by a live connection are kept.
196+
func TestConsumePeerRecordReplacesStaleAddrs(t *testing.T) {
197+
ab := NewAddrBook()
198+
defer ab.Close()
199+
200+
priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
201+
require.NoError(t, err)
202+
id, err := peer.IDFromPrivateKey(priv)
203+
require.NoError(t, err)
204+
205+
keep := ma.StringCast("/ip4/1.2.3.4/tcp/1")
206+
drop := ma.StringCast("/ip4/1.2.3.4/tcp/2")
207+
unsigned := ma.StringCast("/ip4/1.2.3.4/tcp/3")
208+
connected := ma.StringCast("/ip4/1.2.3.4/tcp/4")
209+
210+
rec1 := peer.NewPeerRecord()
211+
rec1.PeerID = id
212+
rec1.Seq = 1
213+
rec1.Addrs = []ma.Multiaddr{keep, drop, connected}
214+
env1, err := record.Seal(rec1, priv)
215+
require.NoError(t, err)
216+
217+
accepted, err := ab.ConsumePeerRecord(env1, time.Hour)
218+
require.NoError(t, err)
219+
require.True(t, accepted)
220+
221+
// Pin `connected` via ConnectedAddrTTL and add an unsigned addr.
222+
ab.AddAddr(id, connected, peerstore.ConnectedAddrTTL)
223+
ab.AddAddr(id, unsigned, time.Hour)
224+
require.ElementsMatch(t, []ma.Multiaddr{keep, drop, connected, unsigned}, ab.Addrs(id))
225+
226+
// Newer record drops `drop` and only mentions `keep`. `drop` must go;
227+
// `unsigned` (never in a signed record) and `connected` (held by a
228+
// live connection) must stay.
229+
rec2 := peer.NewPeerRecord()
230+
rec2.PeerID = id
231+
rec2.Seq = 2
232+
rec2.Addrs = []ma.Multiaddr{keep}
233+
env2, err := record.Seal(rec2, priv)
234+
require.NoError(t, err)
235+
236+
accepted, err = ab.ConsumePeerRecord(env2, time.Hour)
237+
require.NoError(t, err)
238+
require.True(t, accepted)
239+
240+
require.ElementsMatch(t, []ma.Multiaddr{keep, connected, unsigned}, ab.Addrs(id))
241+
}
242+
189243
func BenchmarkPeerAddrs(b *testing.B) {
190244
sizes := [...]int{1, 10, 100, 1000, 10_000, 100_000, 1000_000}
191245
for _, sz := range sizes {

p2p/host/peerstore/test/addr_book_suite.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -475,8 +475,8 @@ func testCertifiedAddresses(m pstore.AddrBook, clk *mockClock.Mock) func(*testin
475475
t.Error("unable to retrieve signed routing record from addrbook")
476476
}
477477

478-
// Adding a new envelope should clear existing certified addresses.
479-
// Only the newly-added ones should remain
478+
// A newer signed record drops addrs the peer no longer advertises.
479+
// Unsigned addrs (added via plain AddAddrs) are retained.
480480
certifiedAddrs = certifiedAddrs[:3]
481481
rec4 := peer.NewPeerRecord()
482482
rec4.PeerID = id
@@ -488,8 +488,9 @@ func testCertifiedAddresses(m pstore.AddrBook, clk *mockClock.Mock) func(*testin
488488
if !accepted {
489489
t.Error("expected peer record to be accepted")
490490
}
491-
// AssertAddressesEqual(t, certifiedAddrs, m.Addrs(id))
492-
AssertAddressesEqual(t, allAddrs, m.Addrs(id))
491+
expectedAfterRec4 := append([]multiaddr.Multiaddr{}, certifiedAddrs...)
492+
expectedAfterRec4 = append(expectedAfterRec4, uncertifiedAddrs...)
493+
AssertAddressesEqual(t, expectedAfterRec4, m.Addrs(id))
493494

494495
// update TTL on signed addrs to -1 to remove them.
495496
// the signed routing record should be deleted

0 commit comments

Comments
 (0)