1- import chronos, chronicles, hashes, math, sequtils, strutils, tables, os
1+ import chronos, chronicles, results
22import metrics, metrics/ chronos_httpserver
33import stew/ [byteutils, endians2]
4- import std/ [options, strformat, random, posix]
5- import node
4+ import
5+ std/ [
6+ strformat, random, posix, hashes, math, sequtils, strutils, tables, os,
7+ nativesockets,
8+ ]
69import mix
10+ import ./ node
711import
812 libp2p,
913 libp2p/ [
@@ -13,10 +17,7 @@ import
1317 protocols/ pubsub/ gossipsub,
1418 protocols/ pubsub/ pubsubpeer,
1519 protocols/ pubsub/ rpc/ messages,
16- transports/ tcptransport,
1720 ]
18- from times import getTime, toUnix, fromUnix, `-`, initTime, `$`, inMilliseconds
19- from nativesockets import getHostname
2021
2122const D* = 4 # No. of peers to forward to
2223
@@ -44,8 +45,12 @@ proc createSwitch(id, port: int, isMix: bool, filePath: string): Switch =
4445 libp2pPubKey: SkPublicKey
4546 libp2pPrivKey: SkPrivateKey
4647
48+ var mixNodes: MixNodes = @ []
49+
4750 if isMix:
48- discard initializeMixNodes (1 , port)
51+ mixNodes = initializeMixNodes (1 , port).valueOr:
52+ error " Could not generate mix nodes"
53+ return
4954
5055 let mixNodeInfo = getMixNodeInfo (mixNodes[0 ])
5156 multiAddrStr = mixNodeInfo[0 ]
@@ -86,14 +91,17 @@ proc createSwitch(id, port: int, isMix: bool, filePath: string): Switch =
8691 externalMultiAddr = fmt" /ip4/{ externalAddr} /tcp/{ port} /p2p/{ peerId} "
8792
8893 if isMix:
89- discard initMixMultiAddrByIndex (0 , externalMultiAddr)
94+ let initRes = mixNodes.initMixMultiAddrByIndex (0 , externalMultiAddr)
95+ if initRes.isErr:
96+ error " Failed to initialize mix node" , id = 0 , err = initRes.error
97+ return
9098 let writeNodeRes =
9199 writeMixNodeInfoToFile (mixNodes[0 ], id, filePath / fmt" nodeInfo " )
92100 if writeNodeRes.isErr:
93101 error " Failed to write mix info to file" , nodeId = id, err = writeNodeRes.error
94102 return
95103
96- let nodePubInfo = getMixPubInfoByIndex (0 ).valueOr:
104+ let nodePubInfo = mixNodes. getMixPubInfoByIndex (0 ).valueOr:
97105 error " Get mix pub info by index error" , err = error
98106 return
99107
@@ -132,6 +140,19 @@ proc startMetricsServer(
132140
133141 ok (server)
134142
143+ proc makeMixConnCb (mixProto: MixProtocol ): CustomConnCreationProc =
144+ return proc (
145+ destAddr: Option [MultiAddress ], destPeerId: PeerId , codec: string
146+ ): Connection {.gcsafe , raises : [].} =
147+ try :
148+ let dest = destAddr.valueOr:
149+ debug " No destination address available"
150+ return
151+ return mixProto.toConnection (MixDestination .init (destPeerId, dest), codec).get ()
152+ except CatchableError as e:
153+ error " Error during execution of MixEntryConnection callback: " , err = e.msg
154+ return nil
155+
135156proc main () {.async .} =
136157 randomize ()
137158
@@ -171,14 +192,7 @@ proc main() {.async.} =
171192 " could not instantiate mix"
172193 )
173194
174- let mixConn = proc (
175- destAddr: Option [MultiAddress ], destPeerId: PeerId , codec: string
176- ): Connection {.gcsafe , raises : [].} =
177- try :
178- return mixProto.toConnection (destPeerId, Opt .none (MultiAddress ), codec)
179- except CatchableError as e:
180- error " Error during execution of MixEntryConnection callback" , err = e.msg
181- return nil
195+ let mixConn = makeMixConnCb (mixProto)
182196
183197 let mixPeerSelect = proc (
184198 allPeers: HashSet [PubSubPeer ],
@@ -189,7 +203,7 @@ proc main() {.async.} =
189203 try :
190204 return mixPeerSelection (allPeers, directPeers, meshPeers, fanoutPeers)
191205 except CatchableError as e:
192- error " Error during execution of MixPeerSelection callback" , err = e.msg
206+ error " Error during execution of MixPeerSelection callback: " , err = e.msg
193207 return initHashSet [PubSubPeer ]()
194208
195209 gossipSub = GossipSub .init (
@@ -245,14 +259,12 @@ proc main() {.async.} =
245259 let
246260 timestampNs = uint64 .fromBytesLE (data[0 ..< 8 ])
247261 msgId = uint64 .fromBytesLE (data[8 ..< 16 ])
248- sentMoment = nanoseconds (int64 (timestampNs))
249- sentNanosecs = nanoseconds (sentMoment - seconds (sentMoment.seconds))
250- sentDate = initTime (sentMoment.seconds, sentNanosecs)
251- recvTime = getTime ()
252- delay = recvTime - sentDate
262+ sentTime = Moment .init (int64 (timestampNs), Nanosecond )
263+ recvTime = Moment .now ()
264+ delay = recvTime - sentTime
253265
254266 info " Received message" ,
255- msgId = msgId, sentAt = timestampNs, delayMs = delay.inMilliseconds ()
267+ msgId = msgId, sentAt = timestampNs, delayMs = delay.milliseconds ()
256268
257269 proc messageValidator (
258270 topic: string , msg: Message
@@ -312,12 +324,11 @@ proc main() {.async.} =
312324 for msg in 0 ..< messages: # client.param(int, "message_count"):
313325 await sleepAsync (msg_rate.milliseconds)
314326 if msg mod publisherCount == myId:
315- let now = getTime ()
316- let timestampNs = now.toUnix ().int64 * 1_000_000_000 + times.nanosecond (now).int64
327+ let timestampNs = Moment .now ().epochNanoSeconds ()
317328 let msgId = uint64 (msg)
318329
319330 var payload: seq [byte ]
320- payload.add (toBytesLE (uint64 ( timestampNs) ))
331+ payload.add (toBytesLE (timestampNs. uint64 ))
321332 payload.add (toBytesLE (msgId))
322333 payload.add (newSeq [byte ](msg_size - 16 )) # Fill the rest with padding
323334
0 commit comments