Skip to content

Commit 17eb614

Browse files
authored
Merge pull request #33 from elecbug/v0.8
Update v0.8.5
2 parents 04772e4 + eeefc56 commit 17eb614

4 files changed

Lines changed: 50 additions & 22 deletions

File tree

p2p/network.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,13 @@ func (n *Network) PeerIDs() []PeerID {
8585
}
8686

8787
// Publish sends a message to the specified node's message queue.
88-
func (n *Network) Publish(nodeID PeerID, msg string, protocol BroadcastProtocol) error {
88+
func (n *Network) Publish(nodeID PeerID, msg string, protocol BroadcastProtocol, customProtocol func(msg Message, known []PeerID, sent []PeerID, received []PeerID, params map[string]any) *[]PeerID) error {
8989
if node, ok := n.nodes[nodeID]; ok {
9090
if !node.alive {
9191
return fmt.Errorf("node %d is not alive", nodeID)
9292
}
9393

94-
node.msgQueue <- Message{From: nodeID, Content: msg, Protocol: protocol, HopCount: 0}
94+
node.msgQueue <- Message{From: nodeID, Content: msg, Protocol: protocol, HopCount: 0, CustomProtocol: customProtocol}
9595
return nil
9696
}
9797

p2p/node.go

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,12 @@ func (n *p2pNode) eachRun(network *Network, wg *sync.WaitGroup, ctx context.Cont
5151
n.alive = true
5252
wg.Done()
5353

54-
for msg := range n.msgQueue {
55-
select {
56-
case <-ctx.Done():
57-
n.alive = false
58-
return
59-
default:
54+
select {
55+
case <-ctx.Done():
56+
n.alive = false
57+
return
58+
default:
59+
for msg := range n.msgQueue {
6060
first := false
6161

6262
n.mu.Lock()
@@ -117,7 +117,6 @@ func (n *p2pNode) publish(network *Network, msg Message) {
117117
if _, received := n.recvFrom[content][edge.targetID]; received {
118118
continue
119119
}
120-
n.sentTo[content][edge.targetID] = struct{}{}
121120

122121
willSendEdges = append(willSendEdges, edge)
123122
}
@@ -128,25 +127,52 @@ func (n *p2pNode) publish(network *Network, msg Message) {
128127
})
129128

130129
k := int(float64(len(willSendEdges)) * network.cfg.GossipFactor)
130+
131131
willSendEdges = willSendEdges[:k]
132132
}
133133
} else if protocol == Custom {
134+
allEdges := make([]PeerID, 0)
135+
for _, edge := range n.edges {
136+
allEdges = append(allEdges, edge.targetID)
137+
}
134138

139+
sentEdges := make([]PeerID, 0)
140+
for targetID := range n.sentTo[content] {
141+
sentEdges = append(sentEdges, targetID)
142+
}
143+
144+
receivedEdges := make([]PeerID, 0)
145+
for senderID := range n.recvFrom[content] {
146+
receivedEdges = append(receivedEdges, senderID)
147+
}
148+
149+
targets := msg.CustomProtocol(msg, allEdges, sentEdges, receivedEdges, network.cfg.CustomParams)
150+
151+
for _, targetID := range *targets {
152+
for _, edge := range n.edges {
153+
if edge.targetID == targetID {
154+
willSendEdges = append(willSendEdges, edge)
155+
break
156+
}
157+
}
158+
}
135159
} else {
136160
return
137161
}
138162

139163
for _, edge := range willSendEdges {
140164
edgeCopy := edge
165+
n.sentTo[content][edge.targetID] = struct{}{}
141166

142167
go func(e p2pEdge) {
143168
time.Sleep(time.Duration(e.edgeLatency) * time.Millisecond)
144169

145170
network.nodes[e.targetID].msgQueue <- Message{
146-
From: n.id,
147-
Content: content,
148-
Protocol: protocol,
149-
HopCount: hopCount + 1,
171+
From: n.id,
172+
Content: content,
173+
Protocol: protocol,
174+
HopCount: hopCount + 1,
175+
CustomProtocol: msg.CustomProtocol,
150176
}
151177
}(edgeCopy)
152178
}

p2p/p2p.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ type PeerID uint64
66

77
// Message represents a message sent between nodes in the P2P network.
88
type Message struct {
9-
From PeerID
10-
Content string
11-
Protocol BroadcastProtocol
12-
HopCount int
9+
From PeerID
10+
Content string
11+
Protocol BroadcastProtocol
12+
HopCount int
13+
CustomProtocol func(Message, []PeerID, []PeerID, []PeerID, map[string]any) *[]PeerID
1314
}
1415

1516
// Config holds configuration parameters for the P2P network.
1617
type Config struct {
17-
GossipFactor float64 // fraction of neighbors to gossip to
18+
GossipFactor float64 // fraction of neighbors to gossip to
19+
CustomParams map[string]any // parameters for custom protocols
1820
}

p2p/p2p_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,15 @@ func TestGenerateNetwork(t *testing.T) {
3737
nw.RunNetworkSimulation(ctx)
3838

3939
t.Logf("Publishing message '%s' from node %d\n", msg1, nw.PeerIDs()[0])
40-
err = nw.Publish(nw.PeerIDs()[0], msg1, p2p.Flooding)
40+
err = nw.Publish(nw.PeerIDs()[0], msg1, p2p.Flooding, nil)
4141
if err != nil {
4242
t.Fatalf("Failed to publish message: %v", err)
4343
}
4444
time.Sleep(1000 * time.Millisecond)
4545
t.Logf("Reachability of message '%s': %f\n", msg1, nw.Reachability(msg1))
4646

4747
t.Logf("Publishing message '%s' from node %d\n", msg2, nw.PeerIDs()[1])
48-
err = nw.Publish(nw.PeerIDs()[1], msg2, p2p.Gossiping)
48+
err = nw.Publish(nw.PeerIDs()[1], msg2, p2p.Gossiping, nil)
4949
if err != nil {
5050
t.Fatalf("Failed to publish message: %v", err)
5151
}
@@ -56,7 +56,7 @@ func TestGenerateNetwork(t *testing.T) {
5656

5757
nw.RunNetworkSimulation(context.Background())
5858
t.Logf("Publishing message '%s' from node %d\n", msg3, nw.PeerIDs()[2])
59-
err = nw.Publish(nw.PeerIDs()[2], msg3, p2p.Gossiping)
59+
err = nw.Publish(nw.PeerIDs()[2], msg3, p2p.Gossiping, nil)
6060
if err != nil {
6161
t.Fatalf("Failed to publish message: %v", err)
6262
}
@@ -104,7 +104,7 @@ func TestMetrics(t *testing.T) {
104104
nw.RunNetworkSimulation(ctx)
105105

106106
t.Logf("Publishing message '%s' from node %d\n", msg1, nw.PeerIDs()[0])
107-
err = nw.Publish(nw.PeerIDs()[0], msg1, p2p.Flooding)
107+
err = nw.Publish(nw.PeerIDs()[0], msg1, p2p.Flooding, nil)
108108
if err != nil {
109109
t.Fatalf("Failed to publish message: %v", err)
110110
}

0 commit comments

Comments
 (0)