Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
};

outputs = { self, nixpkgs }:
let
system = "x86_64-linux";
pkgs = import nixpkgs { inherit system; };
in {
devShells.${system}.default = pkgs.mkShell {
nativeBuildInputs = with pkgs; [
openssl.dev
];
};
};
}

179 changes: 113 additions & 66 deletions main.nim
Original file line number Diff line number Diff line change
Expand Up @@ -126,80 +126,122 @@ proc startMetricsServer(
ok(metricsServerRes.value)

const uidLen = 32

proc main() {.async.} =
randomize()

type
MainObj = ref object
hostname: string
id: int
nodeCount: int
msgRate: int
msgSize: int
publisherCount: int
isPublisher: bool
mixCount: int
connectTo: int
filePath: string
rng: ref HmacDrbgContext

proc makeMainObj(): Result[MainObj, string] =
error "this is the nodes env: " & getEnv("NODES")
let
hostname = getHostname()
node_count = parseInt(getEnv("NODES"))
msg_rate = parseInt(getEnv("MSGRATE"))
msg_size = parseInt(getEnv("MSGSIZE"))
nodeCount = parseInt(getEnv("NODES"))
msgRate = parseInt(getEnv("MSGRATE"))
msgSize = parseInt(getEnv("MSGSIZE"))
publisherCount = parseInt(getEnv("PUBLISHERS"))
mixCount = publisherCount # Publishers will be the mix nodes for now
connectTo = parseInt(getEnv("CONNECTTO"))
filePath = getEnv("FILEPATH", "./")
rng = libp2p.newRng()

if publisherCount > node_count:
error "Publisher count is greater than total node count"
return
if publisherCount > nodeCount:
return err("Publisher count is greater than total node count")

let id = hostname.split('-')[^1].parseInt()
info "Hostname", host = hostname
let myId = getHostname().split('-')[^1].parseInt()
info "ID", id = myId
info "ID", id = id
result = ok(MainObj(
hostname: hostname,
id: id,
nodeCount: nodeCount,
msgRate: msgRate,
msgSize: msgSize,
publisherCount: publisherCount,
isPublisher: id < publisherCount,
mixCount: publisherCount, # same for now
connectTo: connectTo,
filePath: filePath,
rng: rng
))


proc switchFromMainObj(m: MainObj, port: int): Switch =
createSwitch(m.id, port, m.isPublisher, m.filePath)

proc mixProtoFromMainObj(m: MainObj, switch: Switch): Result[MixProtocol, string] =
MixProtocol.new(m.id, m.mixCount, switch, m.filePath)

proc doMix(m: MainObj, switch: Switch): GossipSub =
let mixProto = mixProtoFromMainObj(m, switch).expect(
"could not instantiate mix"
)

let mixConn = proc(
destAddr: Option[MultiAddress], destPeerId: PeerId, codec: string
): Connection {.gcsafe, raises: [].} =
try:
return mixProto.createMixEntryConnection(destAddr, destPeerId, codec)
except CatchableError as e:
error "Error during execution of MixEntryConnection callback", err = e.msg
return nil

let mixPeerSelect = proc(
allPeers: HashSet[PubSubPeer],
directPeers: HashSet[PubSubPeer],
meshPeers: HashSet[PubSubPeer],
fanoutPeers: HashSet[PubSubPeer],
): HashSet[PubSubPeer] {.gcsafe, raises: [].} =
try:
return mixPeerSelection(allPeers, directPeers, meshPeers, fanoutPeers)
except CatchableError as e:
error "Error during execution of MixPeerSelection callback", err = e.msg
return initHashSet[PubSubPeer]()

let gossipSub = GossipSub.init(
switch = switch,
triggerSelf = true,
msgIdProvider = msgIdProvider,
verifySignature = false,
anonymize = true,
customConnCallbacks = some(
CustomConnectionCallbacks(
customConnCreationCB: mixConn, peerSelectionCB: mixPeerSelect
)
),
)

switch.mount(mixProto)
return gossipSub


proc main() {.async.} =
randomize()

let main_obj = makeMainObj().valueOr:
error "Failed to make a main object: ", error
return

let
isPublisher = myId < publisherCount
# [0..<publisherCount] contains all the publishers
isMix = isPublisher # Publishers will be the mix nodes for now
myport = parseInt(getEnv("PORT", "5000"))
switch = createSwitch(myId, myport, isMix, filePath)
isMix = main_obj.isPublisher # Publishers will be the mix nodes for now
# myport = parseInt(getEnv("PORT", "5000"))
switch = switchFromMainObj(main_obj, parseInt(getEnv("PORT", "5000")))

# TODO: what's this await for? any way to trigger an await on the setup?
await sleepAsync(10.seconds)

var gossipSub: GossipSub

if isMix:
let mixProto = MixProtocol.new(myId, mixCount, switch, filePath).expect(
"could not instantiate mix"
)

let mixConn = proc(
destAddr: Option[MultiAddress], destPeerId: PeerId, codec: string
): Connection {.gcsafe, raises: [].} =
try:
return mixProto.createMixEntryConnection(destAddr, destPeerId, codec)
except CatchableError as e:
error "Error during execution of MixEntryConnection callback", err = e.msg
return nil

let mixPeerSelect = proc(
allPeers: HashSet[PubSubPeer],
directPeers: HashSet[PubSubPeer],
meshPeers: HashSet[PubSubPeer],
fanoutPeers: HashSet[PubSubPeer],
): HashSet[PubSubPeer] {.gcsafe, raises: [].} =
try:
return mixPeerSelection(allPeers, directPeers, meshPeers, fanoutPeers)
except CatchableError as e:
error "Error during execution of MixPeerSelection callback", err = e.msg
return initHashSet[PubSubPeer]()

gossipSub = GossipSub.init(
switch = switch,
triggerSelf = true,
msgIdProvider = msgIdProvider,
verifySignature = false,
anonymize = true,
customConnCallbacks = some(
CustomConnectionCallbacks(
customConnCreationCB: mixConn, peerSelectionCB: mixPeerSelect
)
),
)

switch.mount(mixProto)
gossipSub = doMix(main_obj, switch)
else:
gossipSub = GossipSub.init(
switch = switch,
Expand All @@ -213,6 +255,7 @@ proc main() {.async.} =
info "Starting metrics HTTP server"
let metricsServer = startMetricsServer(parseIpAddress("0.0.0.0"), Port(8008))

# can this be done with the builder pattern
gossipSub.parameters.floodPublish = true
gossipSub.parameters.opportunisticGraftThreshold = -10000
gossipSub.parameters.heartbeatInterval = 1.seconds
Expand All @@ -231,6 +274,7 @@ proc main() {.async.} =
firstMessageDeliveriesDecay: 0.9,
)

# TODO? make a message interface for handler and validator?
proc messageHandler(topic: string, data: seq[byte]) {.async.} =
if data.len < 16:
warn "Message too short"
Expand Down Expand Up @@ -259,28 +303,30 @@ proc main() {.async.} =

info "Listening", addrs = switch.peerInfo.addrs

# TODO: can node-building be an async so we don't have to hard-code the await?
info "Waiting 20 seconds for node building..."

await sleepAsync(20.seconds)

var connected = 0
var addrs: seq[MultiAddress]

for i in 0 ..< node_count:
if i == myId:
# This loop can be a function.
for i in 0 ..< main_obj.node_count:
if i == main_obj.id:
continue

let pubInfo = readPubInfoFromFile(i, filePath / fmt"libp2pPubInfo").expect(
let pubInfo = readPubInfoFromFile(i, main_obj.filePath / fmt"libp2pPubInfo").expect(
"should be able to read pubinfo"
)
let (multiAddr, _) = getPubInfo(pubInfo)
let ma = MultiAddress.init(multiAddr).expect("should be a multiaddr")
addrs.add ma

rng.shuffle(addrs)
main_obj.rng.shuffle(addrs)
var index = 0
# `while true` is often a tell.
while true:
if connected >= connectTo:
if connected >= main_obj.connectTo:
break
while true:
try:
Expand All @@ -296,14 +342,15 @@ proc main() {.async.} =
info "Waiting 15 seconds..."
await sleepAsync(15.seconds)

# why hard coded pause?
await sleepAsync(2.seconds)

info "Mesh size", meshSize = gossipSub.mesh.getOrDefault("test").len

info "Publishing turn", id = myId
info "Publishing turn", id = main_obj.id
for msg in 0 ..< 10000: #client.param(int, "message_count"):
await sleepAsync(msg_rate)
if msg mod publisherCount == myId:
await sleepAsync(main_obj.msg_rate)
if msg mod main_obj.publisherCount == main_obj.id:
info "Sending message", time = times.getTime()
let now = getTime()
let timestampNs = now.toUnix().int64 * 1_000_000_000 + times.nanosecond(now).int64
Expand All @@ -312,7 +359,7 @@ proc main() {.async.} =
var payload: seq[byte]
payload.add(toBytesLE(uint64(timestampNs)))
payload.add(toBytesLE(msgId))
payload.add(newSeq[byte](msg_size - 16)) # Fill the rest with padding
payload.add(newSeq[byte](main_obj.msg_size - 16)) # Fill the rest with padding

info "Publishing message", msgId = msgId, timestamp = timestampNs

Expand Down