Periodically persist routing table snapshots #650
Conversation
ping @aschmahmann.

@aschmahmann I've changed the base branch to the feature branch. Let's get this in first and then the restore changes.
aschmahmann left a comment
Looks pretty good; mostly made some code organization comments. I don't feel super strongly about any of them, but I do think they'll make the code simpler. Let me know if you disagree.
I also added a comment about my concerns with the transition to signed peer records for you to respond to.
// Encapsulates a routing table snapshot for persistence. Not to be transmitted over the wire.
message RoutingTableSnapshot {
  message Peer {
    // ID of a given peer.
    bytes id = 1;

    // multiaddrs for a given peer
    repeated bytes addrs = 2;

    // Timestamp for when the peer was added to the routing table.
    // Unix epoch nanoseconds.
    int64 addedAtNs = 3;
  }

  // The peers that were members of the routing table.
  repeated Peer peers = 1;

  // The timestamp when this snapshot was taken.
  // Unix epoch nanoseconds.
  int64 timestampNs = 2;
}
I'd probably move this into the persist package since you've made one anyway
func (p *RoutingTableSnapshot_Peer) Addresses() []ma.Multiaddr {
	if p == nil {
		return nil
	}

	maddrs := make([]ma.Multiaddr, 0, len(p.Addrs))
	for _, addr := range p.Addrs {
		maddr, err := ma.NewMultiaddrBytes(addr)
		if err != nil {
			log.Debugw("error decoding multiaddr for peer", "peer", peer.ID(p.Id), "error", err)
			continue
		}

		maddrs = append(maddrs, maddr)
	}
	return maddrs
}
I'd probably move this into the persist package since you've made one anyway
	ID    peer.ID
	Addrs []ma.Multiaddr
nit: any reason to keep these separate over using a peer.AddrInfo?
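For illustration, a minimal sketch of the suggested alternative, assuming the struct keeps its AddedAt field (this is not the PR's code):

import (
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
)

// Illustrative only: embed peer.AddrInfo instead of keeping separate
// ID and Addrs fields, per the nit above.
type RtSnapshotPeerInfo struct {
	peer.AddrInfo           // bundles ID (peer.ID) and Addrs ([]ma.Multiaddr)
	AddedAt       time.Time // when the peer was added to the routing table
}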
bytes id = 1;

// multiaddrs for a given peer
repeated bytes addrs = 2;
@aarshkshah1992 if you'd prefer we can discuss this in the restore PR, but when we do a restore and try to load the addresses into the peerstore, are we going to run into any issues if we had a signed peer record and then try to restore an unsigned peer record?
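For context on why this matters (a hedged sketch, not this PR's code): go-libp2p's certified address book keeps signed peer records, and a hypothetical helper like the one below could be used during restore to detect the case being asked about.

import (
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
)

// hasSignedRecord is a hypothetical helper: it reports whether the peerstore
// already holds a signed peer record for p. If it does, adding plain unsigned
// addresses for p on restore may be ignored in favour of the signed data.
func hasSignedRecord(h host.Host, p peer.ID) bool {
	cab, ok := peerstore.GetCertifiedAddrBook(h.Peerstore())
	if !ok {
		return false // this peerstore doesn't track signed records at all
	}
	return cab.GetPeerRecord(p) != nil
}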
// A Snapshotter provides the ability to save and restore a routing table from a persistent medium.
type Snapshotter interface {
	// Load recovers a snapshot from storage, and returns candidates to integrate in a fresh routing table.
	Load() ([]*RtSnapshotPeerInfo, error)

	// Store persists the current state of the routing table.
	Store(h host.Host, rt *kbucket.RoutingTable) error
}
Curious, WDYT about defining this in the DHT instead, i.e. defining it where it's used instead of where it's created?
This will make the check var _ Snapshotter = (*dsSnapshotter)(nil) stop working. However, that check won't be necessary, since our code won't compile anyway if *dsSnapshotter doesn't satisfy the interface: the DHT makes use of *dsSnapshotter.
I'd feel more strongly about this if this package were in a separate repo from the DHT, since then the DHT would need to depend on an interface repo. As it is, this is more of a WDYT.
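A short sketch of what the suggested placement would mean in code (layout assumed for illustration, not taken from this PR):

// In the dht package: the consumer defines the interface it needs.
type Snapshotter interface {
	Load() ([]*persist.RtSnapshotPeerInfo, error)
	Store(h host.Host, rt *kbucket.RoutingTable) error
}

// In the persist package, the compile-time assertion below would no longer
// build, but the DHT's use of *dsSnapshotter enforces the same constraint:
//
//	var _ Snapshotter = (*dsSnapshotter)(nil)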
s := &dht_pb.RoutingTableSnapshot{}
if err := s.Unmarshal(val); err != nil {
	return nil, fmt.Errorf("failed to unmarshal snapshot: %w", err)
}

result := make([]*RtSnapshotPeerInfo, 0, len(s.Peers))
for i := range s.Peers {
	p := s.Peers[i]
	var id peer.ID
	if err := id.Unmarshal(p.Id); err != nil {
		logSnapshot.Warnw("failed to unmarshal peerId from snapshot", "err", err)
		continue
	}

	result = append(result, &RtSnapshotPeerInfo{
		ID:      id,
		Addrs:   p.Addresses(),
		AddedAt: time.Unix(0, p.AddedAtNs)})
}
This code is just about unmarshalling a routing table snapshot into the nice, application-friendly form. I'd put the conversion functions next to the protobufs, like we do for the DHT messages in pb/message.go.
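A hedged sketch of that colocation (the method name is illustrative, and it assumes RtSnapshotPeerInfo, or an equivalent type, lives next to the protobufs to avoid an import cycle):

// In pb/, next to the generated types: convert one snapshot entry into the
// application-friendly form. Illustrative only, not this PR's code.
func (p *RoutingTableSnapshot_Peer) PeerInfo() (*RtSnapshotPeerInfo, error) {
	var id peer.ID
	if err := id.Unmarshal(p.Id); err != nil {
		return nil, err
	}
	return &RtSnapshotPeerInfo{
		ID:      id,
		Addrs:   p.Addresses(),
		AddedAt: time.Unix(0, p.AddedAtNs),
	}, nil
}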
snapshotPeers := make([]*dht_pb.RoutingTableSnapshot_Peer, 0, len(pinfos))

for _, p := range pinfos {
	id, err := p.Id.MarshalBinary()
	if err != nil {
		logSnapshot.Warnw("encountered error while adding peer to routing table snapshot; skipping", "peer", p.Id, "err", err)
		continue
	}
	rp := &dht_pb.RoutingTableSnapshot_Peer{}
	rp.Id = id
	addrs := h.Peerstore().Addrs(p.Id)
	rp.Addrs = make([][]byte, len(addrs))
	for i, maddr := range addrs {
		rp.Addrs[i] = maddr.Bytes()
	}

	rp.AddedAtNs = p.AddedAt.UnixNano()
	snapshotPeers = append(snapshotPeers, rp)
}

snap := dht_pb.RoutingTableSnapshot{
	Peers: snapshotPeers,
	// timestampNs is Unix epoch nanoseconds, so use UnixNano here.
	TimestampNs: time.Now().UnixNano(),
}

bytes, err := snap.Marshal()
if err != nil {
	return fmt.Errorf("failed to marshal snapshot: %w", err)
}
This code is just about marshalling a routing table snapshot from the nice, application-friendly form. I'd put the conversion functions next to the protobufs, like we do for the DHT messages in pb/message.go.
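And a matching sketch for the marshalling direction (again illustrative; the peerstore lookup stays with the caller, so addresses are passed in):

// In pb/: build one snapshot entry from the application-friendly form.
// Illustrative only, not this PR's code.
func snapshotPeer(p *RtSnapshotPeerInfo, addrs []ma.Multiaddr) (*RoutingTableSnapshot_Peer, error) {
	id, err := p.ID.MarshalBinary()
	if err != nil {
		return nil, err
	}
	rp := &RoutingTableSnapshot_Peer{Id: id, AddedAtNs: p.AddedAt.UnixNano()}
	rp.Addrs = make([][]byte, len(addrs))
	for i, maddr := range addrs {
		rp.Addrs[i] = maddr.Bytes()
	}
	return rp, nil
}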
}

// assert snapshot & close dht
time.Sleep(500 * time.Millisecond) // wait for one snapshot
Why 500ms when a snapshot should happen every 100ms? Just want to make sure I'm not missing anything.
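For readers skimming the thread: the test exercises a periodic persistence loop. A minimal sketch of that shape, with assumed names (snapshotInterval, snapshotter, and the dht fields are illustrative, not the PR's exact wiring):

// Illustrative periodic snapshot loop.
go func() {
	ticker := time.NewTicker(snapshotInterval) // e.g. 100ms in the test
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if err := snapshotter.Store(dht.host, dht.routingTable); err != nil {
				logSnapshot.Warnw("failed to persist routing table snapshot", "err", err)
			}
		case <-dht.ctx.Done():
			return
		}
	}
}()

At a 100ms interval, a 500ms sleep spans roughly five ticks before the assertion runs.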
	testSetGet("valid", true, "newer", nil)
}

func TestRoutingTableSnapshot(t *testing.T) {
nit: maybe rename to TestRoutingTableSnapshotStore or something unless you're going to just extend this test when you test snapshot restoring.
@Stebalien @aschmahmann
One half of #387.
Once we land this, I'll create a PR for seeding the Routing Table when the DHT starts.