diff --git a/p2p/host/peerstore/pstoreds/addr_book.go b/p2p/host/peerstore/pstoreds/addr_book.go index fe56b185af..7e95a203e4 100644 --- a/p2p/host/peerstore/pstoreds/addr_book.go +++ b/p2p/host/peerstore/pstoreds/addr_book.go @@ -282,7 +282,21 @@ func (ab *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio // ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in // a record.Envelope), which will expire after the given TTL. -// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details. +// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook +// for more details. +// +// The signed peer record's Seq is treated as monotonic per peer: a record +// with a Seq lower than the last accepted one is rejected. Equal Seq is +// accepted as a TTL refresh. +// +// When a newer signed record is accepted, addrs that were present in the +// previously stored signed record but absent in the new one are evicted, so +// the peerstore reflects the peer's current self-advertised set instead of +// the union of every record we have ever seen. Unsigned addrs (added via +// AddAddr / SetAddr from sources like DHT gossip, or from an identify +// exchange where the peer did not send a signed record) are not touched, and +// addrs held by a live connection (TTL >= ConnectedAddrTTL) are also kept so +// active sessions are not dropped. func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) { r, err := recordEnvelope.Record() if err != nil { @@ -303,6 +317,15 @@ func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl tim } addrs := cleanAddrs(rec.Addrs, rec.PeerID) + + // Diff against the previously stored signed record so we can drop addrs + // the peer no longer advertises before adding the new ones. + if superseded := ab.supersededSignedAddrs(rec.PeerID, addrs); len(superseded) > 0 { + if err := ab.deleteAddrs(rec.PeerID, superseded); err != nil { + return false, err + } + } + err = ab.setAddrs(rec.PeerID, addrs, ttl, ttlExtend, true) if err != nil { return false, err @@ -315,6 +338,62 @@ func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl tim return true, nil } +// supersededSignedAddrs returns addrs that were present in the previously +// stored signed peer record for p but are absent in newAddrs. Addrs held by +// a live connection (TTL >= ConnectedAddrTTL) are excluded so an active +// session is not torn down when the peer rotates its advertised set. +func (ab *dsAddrBook) supersededSignedAddrs(p peer.ID, newAddrs []ma.Multiaddr) []ma.Multiaddr { + prevEnv := ab.GetPeerRecord(p) + if prevEnv == nil { + return nil + } + prev, err := prevEnv.Record() + if err != nil { + return nil + } + prevRec, ok := prev.(*peer.PeerRecord) + if !ok { + return nil + } + + newSet := make(map[string]struct{}, len(newAddrs)) + for _, a := range newAddrs { + newSet[string(a.Bytes())] = struct{}{} + } + + pr, err := ab.loadRecord(p, true, false) + if err != nil { + return nil + } + pr.RLock() + connected := make(map[string]struct{}) + for _, a := range pr.Addrs { + if ttlIsConnected(time.Duration(a.Ttl)) { + connected[string(a.Addr)] = struct{}{} + } + } + pr.RUnlock() + + superseded := make([]ma.Multiaddr, 0, len(prevRec.Addrs)) + for _, a := range prevRec.Addrs { + key := string(a.Bytes()) + if _, still := newSet[key]; still { + continue + } + if _, isConn := connected[key]; isConn { + continue + } + superseded = append(superseded, a) + } + return superseded +} + +// ttlIsConnected reports whether the given TTL marks the address as held by +// a live connection. +func ttlIsConnected(ttl time.Duration) bool { + return ttl >= pstore.ConnectedAddrTTL +} + func (ab *dsAddrBook) latestPeerRecordSeq(p peer.ID) uint64 { pr, err := ab.loadRecord(p, true, false) if err != nil { diff --git a/p2p/host/peerstore/pstoreds/ds_test.go b/p2p/host/peerstore/pstoreds/ds_test.go index a240de3a1c..c5aeb70beb 100644 --- a/p2p/host/peerstore/pstoreds/ds_test.go +++ b/p2p/host/peerstore/pstoreds/ds_test.go @@ -5,12 +5,17 @@ import ( "testing" "time" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" pstore "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/record" + "github.com/libp2p/go-libp2p/core/test" pt "github.com/libp2p/go-libp2p/p2p/host/peerstore/test" mockclock "github.com/benbjohnson/clock" ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/sync" + ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" ) @@ -72,6 +77,60 @@ func TestDsAddrBook(t *testing.T) { } } +// TestDsConsumePeerRecordReplacesStaleAddrs verifies replace-semantics on a +// newer signed peer record: addrs dropped from the new record are evicted, +// while unsigned addrs and addrs held by a live connection are kept. +func TestDsConsumePeerRecordReplacesStaleAddrs(t *testing.T) { + for name, dsFactory := range dstores { + t.Run(name, func(t *testing.T) { + opts := DefaultOpts() + store, closeDs := dsFactory(t) + defer closeDs() + ab, err := NewAddrBook(context.Background(), store, opts) + require.NoError(t, err) + defer ab.Close() + + priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256) + require.NoError(t, err) + id, err := peer.IDFromPrivateKey(priv) + require.NoError(t, err) + + keep := ma.StringCast("/ip4/1.2.3.4/tcp/1") + drop := ma.StringCast("/ip4/1.2.3.4/tcp/2") + unsigned := ma.StringCast("/ip4/1.2.3.4/tcp/3") + connected := ma.StringCast("/ip4/1.2.3.4/tcp/4") + + rec1 := peer.NewPeerRecord() + rec1.PeerID = id + rec1.Seq = 1 + rec1.Addrs = []ma.Multiaddr{keep, drop, connected} + env1, err := record.Seal(rec1, priv) + require.NoError(t, err) + + accepted, err := ab.ConsumePeerRecord(env1, time.Hour) + require.NoError(t, err) + require.True(t, accepted) + + ab.AddAddr(id, connected, pstore.ConnectedAddrTTL) + ab.AddAddr(id, unsigned, time.Hour) + require.ElementsMatch(t, []ma.Multiaddr{keep, drop, connected, unsigned}, ab.Addrs(id)) + + rec2 := peer.NewPeerRecord() + rec2.PeerID = id + rec2.Seq = 2 + rec2.Addrs = []ma.Multiaddr{keep} + env2, err := record.Seal(rec2, priv) + require.NoError(t, err) + + accepted, err = ab.ConsumePeerRecord(env2, time.Hour) + require.NoError(t, err) + require.True(t, accepted) + + require.ElementsMatch(t, []ma.Multiaddr{keep, connected, unsigned}, ab.Addrs(id)) + }) + } +} + func TestDsKeyBook(t *testing.T) { for name, dsFactory := range dstores { t.Run(name, func(t *testing.T) { diff --git a/p2p/host/peerstore/pstoremem/addr_book.go b/p2p/host/peerstore/pstoremem/addr_book.go index 4c389cfcd1..a09c172312 100644 --- a/p2p/host/peerstore/pstoremem/addr_book.go +++ b/p2p/host/peerstore/pstoremem/addr_book.go @@ -44,7 +44,9 @@ func ttlIsConnected(ttl time.Duration) bool { type peerRecordState struct { Envelope *record.Envelope - Seq uint64 + // Seq is the sequence number from the stored signed peer record. Newer + // records (higher Seq) supersede older ones for the same peer. + Seq uint64 } // Essentially Go stdlib's Priority Queue example @@ -289,8 +291,23 @@ func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du mab.addAddrs(p, addrs, ttl) } -// ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will expire after the given TTL. -// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details. +// ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will +// expire after the given TTL. See +// https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook +// for more details. +// +// The signed peer record's Seq is treated as monotonic per peer: a record with +// a Seq lower than the last accepted one is rejected. Equal Seq is accepted as +// a TTL refresh. +// +// When a newer signed record is accepted, addrs that were present in the +// previously stored signed record but absent in the new one are evicted, so +// the peerstore reflects the peer's current self-advertised set instead of +// the union of every record we have ever seen. Unsigned addrs (added via +// AddAddr / SetAddr from sources like DHT gossip, or from an identify +// exchange where the peer did not send a signed record) are not touched, and +// addrs held by a live connection (TTL >= ConnectedAddrTTL) are also kept so +// active sessions are not dropped. func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) { r, err := recordEnvelope.Record() if err != nil { @@ -316,6 +333,33 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt if !found && len(mab.signedPeerRecords) >= mab.maxSignedPeerRecords { return false, errors.New("too many signed peer records") } + + // Drop addrs from the previous signed record that are absent in the + // new one; addrs held by a live connection are preserved so we don't + // drop an active session if the peer rotates its advertised set. The + // prior addr set is recovered by decoding the stored envelope; that + // call caches on first access (core/record/envelope.go), so repeated + // lookups are cheap. + if found { + if prevRec := prevSignedAddrs(lastState); len(prevRec) > 0 { + newAddrSet := make(map[string]struct{}, len(rec.Addrs)) + for _, a := range rec.Addrs { + newAddrSet[string(a.Bytes())] = struct{}{} + } + for _, a := range prevRec { + key := string(a.Bytes()) + if _, still := newAddrSet[key]; still { + continue + } + ea, ok := mab.addrs.Addrs[rec.PeerID][key] + if !ok || ea.IsConnected() { + continue + } + mab.addrs.Delete(ea) + } + } + } + mab.signedPeerRecords[rec.PeerID] = &peerRecordState{ Envelope: recordEnvelope, Seq: rec.Seq, @@ -324,6 +368,24 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt return true, nil } +// prevSignedAddrs returns the addrs from the stored signed peer record, or +// nil if the envelope is absent or can't be decoded. Envelope.Record() caches +// its result, so repeated calls are cheap. +func prevSignedAddrs(s *peerRecordState) []ma.Multiaddr { + if s == nil || s.Envelope == nil { + return nil + } + r, err := s.Envelope.Record() + if err != nil { + return nil + } + pr, ok := r.(*peer.PeerRecord) + if !ok { + return nil + } + return pr.Addrs +} + func (mab *memoryAddrBook) maybeDeleteSignedPeerRecordUnlocked(p peer.ID) { if len(mab.addrs.Addrs[p]) == 0 { delete(mab.signedPeerRecords, p) diff --git a/p2p/host/peerstore/pstoremem/addr_book_test.go b/p2p/host/peerstore/pstoremem/addr_book_test.go index 21fa8d751b..72cc474498 100644 --- a/p2p/host/peerstore/pstoremem/addr_book_test.go +++ b/p2p/host/peerstore/pstoremem/addr_book_test.go @@ -8,7 +8,11 @@ import ( "testing" "time" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/record" + "github.com/libp2p/go-libp2p/core/test" ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" ) @@ -186,6 +190,56 @@ func TestPeerLimits(t *testing.T) { require.Equal(t, 1024, ab.addrs.NumUnconnectedAddrs()) } +// TestConsumePeerRecordReplacesStaleAddrs verifies replace-semantics on a +// newer signed peer record: addrs dropped from the new record are evicted, +// while unsigned addrs and addrs held by a live connection are kept. +func TestConsumePeerRecordReplacesStaleAddrs(t *testing.T) { + ab := NewAddrBook() + defer ab.Close() + + priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256) + require.NoError(t, err) + id, err := peer.IDFromPrivateKey(priv) + require.NoError(t, err) + + keep := ma.StringCast("/ip4/1.2.3.4/tcp/1") + drop := ma.StringCast("/ip4/1.2.3.4/tcp/2") + unsigned := ma.StringCast("/ip4/1.2.3.4/tcp/3") + connected := ma.StringCast("/ip4/1.2.3.4/tcp/4") + + rec1 := peer.NewPeerRecord() + rec1.PeerID = id + rec1.Seq = 1 + rec1.Addrs = []ma.Multiaddr{keep, drop, connected} + env1, err := record.Seal(rec1, priv) + require.NoError(t, err) + + accepted, err := ab.ConsumePeerRecord(env1, time.Hour) + require.NoError(t, err) + require.True(t, accepted) + + // Pin `connected` via ConnectedAddrTTL and add an unsigned addr. + ab.AddAddr(id, connected, peerstore.ConnectedAddrTTL) + ab.AddAddr(id, unsigned, time.Hour) + require.ElementsMatch(t, []ma.Multiaddr{keep, drop, connected, unsigned}, ab.Addrs(id)) + + // Newer record drops `drop` and only mentions `keep`. `drop` must go; + // `unsigned` (never in a signed record) and `connected` (held by a + // live connection) must stay. + rec2 := peer.NewPeerRecord() + rec2.PeerID = id + rec2.Seq = 2 + rec2.Addrs = []ma.Multiaddr{keep} + env2, err := record.Seal(rec2, priv) + require.NoError(t, err) + + accepted, err = ab.ConsumePeerRecord(env2, time.Hour) + require.NoError(t, err) + require.True(t, accepted) + + require.ElementsMatch(t, []ma.Multiaddr{keep, connected, unsigned}, ab.Addrs(id)) +} + func BenchmarkPeerAddrs(b *testing.B) { sizes := [...]int{1, 10, 100, 1000, 10_000, 100_000, 1000_000} for _, sz := range sizes { diff --git a/p2p/host/peerstore/test/addr_book_suite.go b/p2p/host/peerstore/test/addr_book_suite.go index dbe48f5e84..099d64a2fd 100644 --- a/p2p/host/peerstore/test/addr_book_suite.go +++ b/p2p/host/peerstore/test/addr_book_suite.go @@ -475,8 +475,8 @@ func testCertifiedAddresses(m pstore.AddrBook, clk *mockClock.Mock) func(*testin t.Error("unable to retrieve signed routing record from addrbook") } - // Adding a new envelope should clear existing certified addresses. - // Only the newly-added ones should remain + // A newer signed record drops addrs the peer no longer advertises. + // Unsigned addrs (added via plain AddAddrs) are retained. certifiedAddrs = certifiedAddrs[:3] rec4 := peer.NewPeerRecord() rec4.PeerID = id @@ -488,8 +488,9 @@ func testCertifiedAddresses(m pstore.AddrBook, clk *mockClock.Mock) func(*testin if !accepted { t.Error("expected peer record to be accepted") } - // AssertAddressesEqual(t, certifiedAddrs, m.Addrs(id)) - AssertAddressesEqual(t, allAddrs, m.Addrs(id)) + expectedAfterRec4 := append([]multiaddr.Multiaddr{}, certifiedAddrs...) + expectedAfterRec4 = append(expectedAfterRec4, uncertifiedAddrs...) + AssertAddressesEqual(t, expectedAfterRec4, m.Addrs(id)) // update TTL on signed addrs to -1 to remove them. // the signed routing record should be deleted