diff --git a/gossipsub-interop/experiment.py b/gossipsub-interop/experiment.py index 6117f53e9..230a3e454 100644 --- a/gossipsub-interop/experiment.py +++ b/gossipsub-interop/experiment.py @@ -48,9 +48,7 @@ def partial_message_scenario( gs_params.GossipFactor = 0 instructions.extend(spread_heartbeat_delay(node_count, gs_params)) - number_of_conns_per_node = 20 - if number_of_conns_per_node >= node_count: - number_of_conns_per_node = node_count - 1 + number_of_conns_per_node = min(20, node_count - 1) instructions.extend(random_network_mesh(node_count, number_of_conns_per_node)) topic = "a-subnet" @@ -107,6 +105,92 @@ def partial_message_scenario( return instructions +def partial_message_extension_scenario( + disable_gossip: bool, node_count: int +) -> List[ScriptInstruction]: + instructions: List[ScriptInstruction] = [] + gs_params = GossipSubParams() + if disable_gossip: + gs_params.Dlazy = 0 + gs_params.GossipFactor = 0 + instructions.extend(spread_heartbeat_delay(node_count, gs_params)) + + # Create a bidirectional chain topology: 0<->1<->2....<->n-1 + # Each node connects to both previous and next (except first and last) + for i in range(node_count): + connections = [] + if i > 0: + connections.append(i - 1) # Connect to previous + if i < node_count - 1: + connections.append(i + 1) # Connect to next + + if connections: + instructions.append( + script_instruction.IfNodeIDEquals( + nodeID=i, + instruction=script_instruction.Connect(connectTo=connections), + ) + ) + + topic = "partial-ext-topic" + instructions.append( + script_instruction.SubscribeToTopic(topicID=topic, partial=True) + ) + + # Wait for setup time and mesh stabilization + elapsed_seconds = 30 + instructions.append(script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds)) + + # 16 messages with 8 parts each + num_messages = 16 + num_parts = 8 + + # Assign parts to nodes in round-robin fashion + # Each message-part combination goes to exactly one node + for msg_idx in range(num_messages): + groupID = msg_idx # Unique group ID for each message + + # Assign each of the 8 parts to nodes in round-robin + for part_idx in range(num_parts): + node_idx = (msg_idx * num_parts + part_idx) % node_count + part_bitmap = 1 << part_idx # Single bit for this part + + instructions.append( + script_instruction.IfNodeIDEquals( + nodeID=node_idx, + instruction=script_instruction.AddPartialMessage( + topicID=topic, groupID=groupID, parts=part_bitmap + ), + ) + ) + + # Have multiple nodes with parts for each message try to publish + # This creates redundancy and ensures the exchange process starts + for msg_idx in range(num_messages): + groupID = msg_idx + + elapsed_seconds += 2 # Delay between message groups + instructions.append( + script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds) + ) + + publisher_node = (msg_idx * num_parts) % node_count + print("publisher node: ", publisher_node, msg_idx) + instructions.append( + script_instruction.IfNodeIDEquals( + nodeID=publisher_node, + instruction=script_instruction.PublishPartial( + topicID=topic, groupID=groupID + ), + ) + ) + + # Wait for propagation and assembly + elapsed_seconds += 30 + instructions.append(script_instruction.WaitUntil(elapsedSeconds=elapsed_seconds)) + return instructions + + def scenario( scenario_name: str, node_count: int, disable_gossip: bool ) -> ExperimentParams: @@ -114,6 +198,10 @@ def scenario( match scenario_name: case "partial-messages": instructions = partial_message_scenario(disable_gossip, node_count) + case "partial-message-extension": + instructions = partial_message_extension_scenario( + disable_gossip, node_count + ) case "subnet-blob-msg": gs_params = GossipSubParams() if disable_gossip: diff --git a/gossipsub-interop/go-libp2p/experiment.go b/gossipsub-interop/go-libp2p/experiment.go index 9010c2ead..4827d0d87 100644 --- a/gossipsub-interop/go-libp2p/experiment.go +++ b/gossipsub-interop/go-libp2p/experiment.go @@ -7,6 +7,7 @@ import ( "fmt" "log" "log/slog" + "slices" "time" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -110,6 +111,8 @@ func (m *partialMsgManager) handleRPC(rpc incomingPartialRPC) { m.partialMessages[rpc.GetTopicID()][string(rpc.GroupID)] = pm } + prevParts := pm.PartsMetadata() + // Extend first, so we don't request something we just got. beforeExtend := pm.PartsMetadata()[0] if len(rpc.PartialMessage) != 0 { @@ -126,10 +129,12 @@ func (m *partialMsgManager) handleRPC(rpc incomingPartialRPC) { shouldRepublish = true } + newParts := pm.PartsMetadata() has := pm.PartsMetadata() + gid := binary.BigEndian.Uint64(rpc.GroupID) if has[0] == 0xff { - m.Info("All parts received") + m.Info("All parts received", "group id", gid) } pmHas := pm.PartsMetadata() @@ -192,6 +197,7 @@ func (n *scriptedNode) runInstruction(ctx context.Context, instruction ScriptIns switch a := instruction.(type) { case InitGossipSubInstruction: slog.SetLogLoggerLevel(slog.LevelDebug) + slog.Error("gossip sub init", "node-id", n.nodeID) pme := &partialmessages.PartialMessageExtension{ Logger: slog.Default(), ValidateRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error { @@ -205,8 +211,17 @@ func (n *scriptedNode) runInstruction(ctx context.Context, instruction ScriptIns } return nil }, + MergePartsMetadata: func(_ string, left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata { + a := slices.Clone(left) + if len(a) == 0 { + a = append(a, 0) + } + if len(right) > 0 { + a[0] |= right[0] + } + return a + }, } - psOpts := pubsubOptions(n.slogger, a.GossipSubParams, pme) ps, err := pubsub.NewGossipSub(ctx, n.h, psOpts...) if err != nil { @@ -291,6 +306,7 @@ func (n *scriptedNode) runInstruction(ctx context.Context, instruction ScriptIns case PublishPartialInstruction: var groupID [8]byte binary.BigEndian.AppendUint64(groupID[:0], uint64(a.GroupID)) + n.slogger.Error("HELLO WORLD", "publish partial", a.GroupID) n.partialMsgMgr.publish <- publishReq{ topic: a.TopicID, groupID: groupID[:], diff --git a/gossipsub-interop/go-libp2p/go.mod b/gossipsub-interop/go-libp2p/go.mod index ea6eabceb..b5ac5d13e 100644 --- a/gossipsub-interop/go-libp2p/go.mod +++ b/gossipsub-interop/go-libp2p/go.mod @@ -110,3 +110,5 @@ require ( google.golang.org/protobuf v1.36.5 // indirect lukechampine.com/blake3 v1.4.1 // indirect ) + +replace github.com/libp2p/go-libp2p-pubsub => /home/sukun/dev/go-libp2p-pubsub