Skip to content

Commit 76e7eda

Browse files
sukunrtMarcoPolo
authored andcommitted
gossipsub-interop: add a new scenario for partial messages
We create a chain topology 1 <-> 2 <-> ... <-> n. Provide each node with one part of the message. And then publish the message from one of the nodes. Eventually all the nodes should have all the messages.
1 parent 30af194 commit 76e7eda

2 files changed

Lines changed: 109 additions & 5 deletions

File tree

gossipsub-interop/experiment.py

Lines changed: 91 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,7 @@ def partial_message_scenario(
4848
gs_params.GossipFactor = 0
4949
instructions.extend(spread_heartbeat_delay(node_count, gs_params))
5050

51-
number_of_conns_per_node = 20
52-
if number_of_conns_per_node >= node_count:
53-
number_of_conns_per_node = node_count - 1
51+
number_of_conns_per_node = min(20, node_count - 1)
5452
instructions.extend(random_network_mesh(node_count, number_of_conns_per_node))
5553

5654
topic = "a-subnet"
@@ -107,13 +105,103 @@ def partial_message_scenario(
107105
return instructions
108106

109107

108+
def partial_message_extension_scenario(
109+
disable_gossip: bool, node_count: int
110+
) -> List[ScriptInstruction]:
111+
instructions: List[ScriptInstruction] = []
112+
gs_params = GossipSubParams()
113+
if disable_gossip:
114+
gs_params.Dlazy = 0
115+
gs_params.GossipFactor = 0
116+
instructions.extend(spread_heartbeat_delay(node_count, gs_params))
117+
118+
# Create a bidirectional chain topology: 0<->1<->2....<->n-1
119+
# Each node connects to both previous and next (except first and last)
120+
for i in range(node_count):
121+
connections = []
122+
if i > 0:
123+
connections.append(i - 1) # Connect to previous
124+
if i < node_count - 1:
125+
connections.append(i + 1) # Connect to next
126+
127+
if connections:
128+
instructions.append(
129+
script_instruction.IfNodeIDEquals(
130+
nodeID=i,
131+
instruction=script_instruction.Connect(connectTo=connections),
132+
)
133+
)
134+
135+
topic = "partial-ext-topic"
136+
instructions.append(
137+
script_instruction.SubscribeToTopic(topicID=topic, partial=True)
138+
)
139+
140+
# Wait for setup time and mesh stabilization
141+
elapsed_seconds = 30
142+
instructions.append(script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds))
143+
144+
# 16 messages with 8 parts each
145+
num_messages = 16
146+
num_parts = 8
147+
148+
# Assign parts to nodes in round-robin fashion
149+
# Each message-part combination goes to exactly one node
150+
for msg_idx in range(num_messages):
151+
groupID = msg_idx # Unique group ID for each message
152+
153+
# Assign each of the 8 parts to nodes in round-robin
154+
for part_idx in range(num_parts):
155+
node_idx = (msg_idx * num_parts + part_idx) % node_count
156+
part_bitmap = 1 << part_idx # Single bit for this part
157+
158+
instructions.append(
159+
script_instruction.IfNodeIDEquals(
160+
nodeID=node_idx,
161+
instruction=script_instruction.AddPartialMessage(
162+
topicID=topic, groupID=groupID, parts=part_bitmap
163+
),
164+
)
165+
)
166+
167+
# Have multiple nodes with parts for each message try to publish
168+
# This creates redundancy and ensures the exchange process starts
169+
for msg_idx in range(num_messages):
170+
groupID = msg_idx
171+
172+
elapsed_seconds += 2 # Delay between message groups
173+
instructions.append(
174+
script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds)
175+
)
176+
177+
publisher_node = (msg_idx * num_parts) % node_count
178+
print("publisher node: ", publisher_node, msg_idx)
179+
instructions.append(
180+
script_instruction.IfNodeIDEquals(
181+
nodeID=publisher_node,
182+
instruction=script_instruction.PublishPartial(
183+
topicID=topic, groupID=groupID
184+
),
185+
)
186+
)
187+
188+
# Wait for propagation and assembly
189+
elapsed_seconds += 30
190+
instructions.append(script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds))
191+
return instructions
192+
193+
110194
def scenario(
111195
scenario_name: str, node_count: int, disable_gossip: bool
112196
) -> ExperimentParams:
113197
instructions: List[ScriptInstruction] = []
114198
match scenario_name:
115199
case "partial-messages":
116200
instructions = partial_message_scenario(disable_gossip, node_count)
201+
case "partial-message-extension":
202+
instructions = partial_message_extension_scenario(
203+
disable_gossip, node_count
204+
)
117205
case "subnet-blob-msg":
118206
gs_params = GossipSubParams()
119207
if disable_gossip:

gossipsub-interop/go-libp2p/experiment.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"log"
99
"log/slog"
10+
"slices"
1011
"time"
1112

1213
pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -110,6 +111,8 @@ func (m *partialMsgManager) handleRPC(rpc incomingPartialRPC) {
110111
m.partialMessages[rpc.GetTopicID()][string(rpc.GroupID)] = pm
111112
}
112113

114+
prevParts := pm.PartsMetadata()
115+
113116
// Extend first, so we don't request something we just got.
114117
beforeExtend := pm.PartsMetadata()[0]
115118
if len(rpc.PartialMessage) != 0 {
@@ -126,10 +129,12 @@ func (m *partialMsgManager) handleRPC(rpc incomingPartialRPC) {
126129
shouldRepublish = true
127130

128131
}
132+
newParts := pm.PartsMetadata()
129133

130134
has := pm.PartsMetadata()
135+
gid := binary.BigEndian.Uint64(rpc.GroupID)
131136
if has[0] == 0xff {
132-
m.Info("All parts received")
137+
m.Info("All parts received", "group id", gid)
133138
}
134139

135140
pmHas := pm.PartsMetadata()
@@ -192,6 +197,7 @@ func (n *scriptedNode) runInstruction(ctx context.Context, instruction ScriptIns
192197
switch a := instruction.(type) {
193198
case InitGossipSubInstruction:
194199
slog.SetLogLoggerLevel(slog.LevelDebug)
200+
slog.Error("gossip sub init", "node-id", n.nodeID)
195201
pme := &partialmessages.PartialMessageExtension{
196202
Logger: slog.Default(),
197203
ValidateRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
@@ -205,8 +211,17 @@ func (n *scriptedNode) runInstruction(ctx context.Context, instruction ScriptIns
205211
}
206212
return nil
207213
},
214+
MergePartsMetadata: func(_ string, left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
215+
a := slices.Clone(left)
216+
if len(a) == 0 {
217+
a = append(a, 0)
218+
}
219+
if len(right) > 0 {
220+
a[0] |= right[0]
221+
}
222+
return a
223+
},
208224
}
209-
210225
psOpts := pubsubOptions(n.slogger, a.GossipSubParams, pme)
211226
ps, err := pubsub.NewGossipSub(ctx, n.h, psOpts...)
212227
if err != nil {
@@ -291,6 +306,7 @@ func (n *scriptedNode) runInstruction(ctx context.Context, instruction ScriptIns
291306
case PublishPartialInstruction:
292307
var groupID [8]byte
293308
binary.BigEndian.AppendUint64(groupID[:0], uint64(a.GroupID))
309+
n.slogger.Error("HELLO WORLD", "publish partial", a.GroupID)
294310
n.partialMsgMgr.publish <- publishReq{
295311
topic: a.TopicID,
296312
groupID: groupID[:],

0 commit comments

Comments
 (0)