Skip to content

Commit 04772e4

Browse files
authored
Merge pull request #32 from elecbug/v0.8
Update v0.8.4
2 parents ad7d259 + d5a3eed commit 04772e4

2 files changed

Lines changed: 43 additions & 39 deletions

File tree

p2p/network.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64, cf
5050
j := maps[neighbor]
5151

5252
edge := p2pEdge{
53-
TargetID: PeerID(j),
54-
Latency: edgeLatency(),
53+
targetID: PeerID(j),
54+
edgeLatency: edgeLatency(),
5555
}
5656

57-
n.edges[edge.TargetID] = edge
57+
n.edges[edge.targetID] = edge
5858
}
5959
}
6060

p2p/node.go

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ type p2pNode struct {
2525

2626
// p2pEdge represents a connection from one node to another in the P2P network.
2727
type p2pEdge struct {
28-
TargetID PeerID
29-
Latency float64 // in milliseconds
28+
targetID PeerID
29+
edgeLatency float64 // in milliseconds
3030
}
3131

3232
// newNode creates a new Node with the given ID and node latency.
@@ -58,7 +58,6 @@ func (n *p2pNode) eachRun(network *Network, wg *sync.WaitGroup, ctx context.Cont
5858
return
5959
default:
6060
first := false
61-
var excludeSnapshot map[PeerID]struct{}
6261

6362
n.mu.Lock()
6463
if _, ok := n.recvFrom[msg.Content]; !ok {
@@ -69,32 +68,31 @@ func (n *p2pNode) eachRun(network *Network, wg *sync.WaitGroup, ctx context.Cont
6968
if _, ok := n.seenAt[msg.Content]; !ok {
7069
n.seenAt[msg.Content] = time.Now()
7170
first = true
72-
excludeSnapshot = copyIDSet(n.recvFrom[msg.Content])
7371
}
7472
n.mu.Unlock()
7573

7674
if first {
77-
go func(msg Message, exclude map[PeerID]struct{}) {
75+
go func(msg Message) {
7876
time.Sleep(time.Duration(n.nodeLatency) * time.Millisecond)
79-
n.publish(network, msg, exclude)
80-
}(msg, excludeSnapshot)
77+
n.publish(network, msg)
78+
}(msg)
8179
}
8280
}
8381
}
8482
}(ctx, wg)
8583
}
8684

87-
// copyIDSet creates a shallow copy of a set of IDs.
88-
func copyIDSet(src map[PeerID]struct{}) map[PeerID]struct{} {
89-
dst := make(map[PeerID]struct{}, len(src))
90-
for k := range src {
91-
dst[k] = struct{}{}
92-
}
93-
return dst
94-
}
85+
// // copyIDSet creates a shallow copy of a set of IDs.
86+
// func copyIDSet(src map[PeerID]struct{}) map[PeerID]struct{} {
87+
// dst := make(map[PeerID]struct{}, len(src))
88+
// for k := range src {
89+
// dst[k] = struct{}{}
90+
// }
91+
// return dst
92+
// }
9593

9694
// publish sends the message to neighbors, excluding 'exclude' and already-sent targets.
97-
func (n *p2pNode) publish(network *Network, msg Message, exclude map[PeerID]struct{}) {
95+
func (n *p2pNode) publish(network *Network, msg Message) {
9896
content := msg.Content
9997
protocol := msg.Protocol
10098
hopCount := msg.HopCount
@@ -105,40 +103,46 @@ func (n *p2pNode) publish(network *Network, msg Message, exclude map[PeerID]stru
105103
if _, ok := n.sentTo[content]; !ok {
106104
n.sentTo[content] = make(map[PeerID]struct{})
107105
}
106+
if _, ok := n.recvFrom[content]; !ok {
107+
n.recvFrom[content] = make(map[PeerID]struct{})
108+
}
108109

109110
willSendEdges := make([]p2pEdge, 0)
110111

111-
for _, edge := range n.edges {
112-
if _, wasSender := exclude[edge.TargetID]; wasSender {
113-
continue
114-
}
115-
if _, already := n.sentTo[content][edge.TargetID]; already {
116-
continue
117-
}
118-
if _, received := n.recvFrom[content][edge.TargetID]; received {
119-
continue
112+
if protocol == Flooding || protocol == Gossiping {
113+
for _, edge := range n.edges {
114+
if _, already := n.sentTo[content][edge.targetID]; already {
115+
continue
116+
}
117+
if _, received := n.recvFrom[content][edge.targetID]; received {
118+
continue
119+
}
120+
n.sentTo[content][edge.targetID] = struct{}{}
121+
122+
willSendEdges = append(willSendEdges, edge)
120123
}
121-
n.sentTo[content][edge.TargetID] = struct{}{}
122124

123-
willSendEdges = append(willSendEdges, edge)
124-
}
125+
if protocol == Gossiping && len(willSendEdges) > 0 {
126+
rand.Shuffle(len(willSendEdges), func(i, j int) {
127+
willSendEdges[i], willSendEdges[j] = willSendEdges[j], willSendEdges[i]
128+
})
125129

126-
if protocol == Gossiping && len(willSendEdges) > 0 {
127-
rand.Shuffle(len(willSendEdges), func(i, j int) {
128-
willSendEdges[i], willSendEdges[j] = willSendEdges[j], willSendEdges[i]
129-
})
130+
k := int(float64(len(willSendEdges)) * network.cfg.GossipFactor)
131+
willSendEdges = willSendEdges[:k]
132+
}
133+
} else if protocol == Custom {
130134

131-
k := int(float64(len(willSendEdges)) * network.cfg.GossipFactor)
132-
willSendEdges = willSendEdges[:k]
135+
} else {
136+
return
133137
}
134138

135139
for _, edge := range willSendEdges {
136140
edgeCopy := edge
137141

138142
go func(e p2pEdge) {
139-
time.Sleep(time.Duration(e.Latency) * time.Millisecond)
143+
time.Sleep(time.Duration(e.edgeLatency) * time.Millisecond)
140144

141-
network.nodes[e.TargetID].msgQueue <- Message{
145+
network.nodes[e.targetID].msgQueue <- Message{
142146
From: n.id,
143147
Content: content,
144148
Protocol: protocol,

0 commit comments

Comments
 (0)