Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 91 additions & 3 deletions gossipsub-interop/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ def partial_message_scenario(
gs_params.GossipFactor = 0
instructions.extend(spread_heartbeat_delay(node_count, gs_params))

number_of_conns_per_node = 20
if number_of_conns_per_node >= node_count:
number_of_conns_per_node = node_count - 1
number_of_conns_per_node = min(20, node_count - 1)
instructions.extend(random_network_mesh(node_count, number_of_conns_per_node))

topic = "a-subnet"
Expand Down Expand Up @@ -107,13 +105,103 @@ def partial_message_scenario(
return instructions


def partial_message_extension_scenario(
disable_gossip: bool, node_count: int
) -> List[ScriptInstruction]:
instructions: List[ScriptInstruction] = []
gs_params = GossipSubParams()
if disable_gossip:
gs_params.Dlazy = 0
gs_params.GossipFactor = 0
instructions.extend(spread_heartbeat_delay(node_count, gs_params))

# Create a bidirectional chain topology: 0<->1<->2....<->n-1
# Each node connects to both previous and next (except first and last)
for i in range(node_count):
connections = []
if i > 0:
connections.append(i - 1) # Connect to previous
if i < node_count - 1:
connections.append(i + 1) # Connect to next

if connections:
instructions.append(
script_instruction.IfNodeIDEquals(
nodeID=i,
instruction=script_instruction.Connect(connectTo=connections),
)
)

topic = "partial-ext-topic"
instructions.append(
script_instruction.SubscribeToTopic(topicID=topic, partial=True)
)

# Wait for setup time and mesh stabilization
elapsed_seconds = 30
instructions.append(script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds))

# 16 messages with 8 parts each
num_messages = 16
num_parts = 8

# Assign parts to nodes in round-robin fashion
# Each message-part combination goes to exactly one node
for msg_idx in range(num_messages):
groupID = msg_idx # Unique group ID for each message

# Assign each of the 8 parts to nodes in round-robin
for part_idx in range(num_parts):
node_idx = (msg_idx * num_parts + part_idx) % node_count
part_bitmap = 1 << part_idx # Single bit for this part

instructions.append(
script_instruction.IfNodeIDEquals(
nodeID=node_idx,
instruction=script_instruction.AddPartialMessage(
topicID=topic, groupID=groupID, parts=part_bitmap
),
)
)

# Have multiple nodes with parts for each message try to publish
# This creates redundancy and ensures the exchange process starts
for msg_idx in range(num_messages):
groupID = msg_idx

elapsed_seconds += 2 # Delay between message groups
instructions.append(
script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds)
)

publisher_node = (msg_idx * num_parts) % node_count
print("publisher node: ", publisher_node, msg_idx)
instructions.append(
script_instruction.IfNodeIDEquals(
nodeID=publisher_node,
instruction=script_instruction.PublishPartial(
topicID=topic, groupID=groupID
),
)
)

# Wait for propagation and assembly
elapsed_seconds += 30
instructions.append(script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds))
return instructions


def scenario(
scenario_name: str, node_count: int, disable_gossip: bool
) -> ExperimentParams:
instructions: List[ScriptInstruction] = []
match scenario_name:
case "partial-messages":
instructions = partial_message_scenario(disable_gossip, node_count)
case "partial-message-extension":
instructions = partial_message_extension_scenario(
disable_gossip, node_count
)
case "subnet-blob-msg":
gs_params = GossipSubParams()
if disable_gossip:
Expand Down
20 changes: 18 additions & 2 deletions gossipsub-interop/go-libp2p/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log"
"log/slog"
"slices"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand Down Expand Up @@ -110,6 +111,8 @@ func (m *partialMsgManager) handleRPC(rpc incomingPartialRPC) {
m.partialMessages[rpc.GetTopicID()][string(rpc.GroupID)] = pm
}

prevParts := pm.PartsMetadata()

// Extend first, so we don't request something we just got.
beforeExtend := pm.PartsMetadata()[0]
if len(rpc.PartialMessage) != 0 {
Expand All @@ -126,10 +129,12 @@ func (m *partialMsgManager) handleRPC(rpc incomingPartialRPC) {
shouldRepublish = true

}
newParts := pm.PartsMetadata()

has := pm.PartsMetadata()
gid := binary.BigEndian.Uint64(rpc.GroupID)
if has[0] == 0xff {
m.Info("All parts received")
m.Info("All parts received", "group id", gid)
}

pmHas := pm.PartsMetadata()
Expand Down Expand Up @@ -192,6 +197,7 @@ func (n *scriptedNode) runInstruction(ctx context.Context, instruction ScriptIns
switch a := instruction.(type) {
case InitGossipSubInstruction:
slog.SetLogLoggerLevel(slog.LevelDebug)
slog.Error("gossip sub init", "node-id", n.nodeID)
pme := &partialmessages.PartialMessageExtension{
Logger: slog.Default(),
ValidateRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
Expand All @@ -205,8 +211,17 @@ func (n *scriptedNode) runInstruction(ctx context.Context, instruction ScriptIns
}
return nil
},
MergePartsMetadata: func(_ string, left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
a := slices.Clone(left)
if len(a) == 0 {
a = append(a, 0)
}
if len(right) > 0 {
a[0] |= right[0]
}
return a
},
}

psOpts := pubsubOptions(n.slogger, a.GossipSubParams, pme)
ps, err := pubsub.NewGossipSub(ctx, n.h, psOpts...)
if err != nil {
Expand Down Expand Up @@ -291,6 +306,7 @@ func (n *scriptedNode) runInstruction(ctx context.Context, instruction ScriptIns
case PublishPartialInstruction:
var groupID [8]byte
binary.BigEndian.AppendUint64(groupID[:0], uint64(a.GroupID))
n.slogger.Error("HELLO WORLD", "publish partial", a.GroupID)
n.partialMsgMgr.publish <- publishReq{
topic: a.TopicID,
groupID: groupID[:],
Expand Down
2 changes: 2 additions & 0 deletions gossipsub-interop/go-libp2p/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,5 @@ require (
google.golang.org/protobuf v1.36.5 // indirect
lukechampine.com/blake3 v1.4.1 // indirect
)

replace github.com/libp2p/go-libp2p-pubsub => /home/sukun/dev/go-libp2p-pubsub