@@ -65,6 +65,7 @@ abstract class Server(val configuration: Configuration) : RPCManager(configurati
6565 Logger .trace(" Clearing message history..." )
6666 messageHistory.entries.removeIf { (_, value) -> currentTime - value > maximumAge }
6767 }
68+
6869 }
6970
7071 abstract fun processMessage (message : Message )
@@ -91,7 +92,10 @@ abstract class Server(val configuration: Configuration) : RPCManager(configurati
9192
9293 /* * Returns [Configuration.broadcastSpreadPercentage] number of nodes. */
9394 fun pickRandomNodes (amount : Int = 0): List <Node > {
94- val toTake = if (amount > 0 ) amount else 5 + (configuration.broadcastSpreadPercentage * Integer .max(totalKnownNodes, 1 ) / 100 )
95+ val toTake = if (amount > 0 ) amount else 5 + (configuration.broadcastSpreadPercentage * Integer .max(
96+ totalKnownNodes,
97+ 1
98+ ) / 100 )
9599 return getRandomNodes(toTake).filter { it.identifier != localNode.identifier }
96100 }
97101
@@ -160,7 +164,11 @@ abstract class Server(val configuration: Configuration) : RPCManager(configurati
160164 val message = ProtoBuf .decodeFromByteArray<Message >(data)
161165 if (alreadySeen(message.uid.asHex)) return @use
162166 processingQueue.add(message)
163- if (message.endpoint.transmissionType == TransmissionType .Broadcast ) broadcast(TransmissionLayer .TCP , message.uid.asHex, data)
167+ if (message.endpoint.transmissionType == TransmissionType .Broadcast ) broadcast(
168+ TransmissionLayer .TCP ,
169+ message.uid.asHex,
170+ data
171+ )
164172 }
165173 }
166174 }
@@ -199,22 +207,30 @@ abstract class Server(val configuration: Configuration) : RPCManager(configurati
199207 broadcastNodes.add(neighbour)
200208 broadcastNodes.addAll(childrenKeys)
201209 broadcastNodes.addAll(neighbourChildrenKeys)
202- Logger .error(" [$index ] [$children ] Neighbour: $neighbourIndex ... Children: ${childrenKeys.joinToString(" ," ) { " ${shuffled.indexOf(it)} " }} " )
210+ Logger .error(
211+ " [$index ] [$children ] Neighbour: $neighbourIndex ... Children: ${
212+ childrenKeys.joinToString(
213+ " ,"
214+ ) { " ${shuffled.indexOf(it)} " }
215+ } "
216+ )
203217
204218 }
205219
206220 else -> broadcastNodes.addAll(pickRandomNodes().map { it.publicKey })
207221 }
208222
209- val knownAndNotInSet = knownNodes.values.map(Node ::publicKey).filter { ! validatorSet.activeValidators.contains(it) }
223+ val knownAndNotInSet =
224+ knownNodes.values.map(Node ::publicKey).filter { ! validatorSet.activeValidators.contains(it) }
210225 broadcastNodes.addAll(knownAndNotInSet)
211226
212227 Logger .trace(" We have to retransmit to [total: ${shuffled.size} ] --> ${broadcastNodes.size} nodes." )
213228
214229 broadcastNodes.forEach { publicKey ->
215230 query(publicKey) {
216231 val outgoingData = OutgoingData (it, * data)
217- val outgoingQueue = if (transmissionLayer == TransmissionLayer .UDP ) udpOutgoingQueue else tcpOutgoingQueue
232+ val outgoingQueue =
233+ if (transmissionLayer == TransmissionLayer .UDP ) udpOutgoingQueue else tcpOutgoingQueue
218234 outgoingQueue.add(outgoingData)
219235 }
220236 }
@@ -242,7 +258,11 @@ abstract class Server(val configuration: Configuration) : RPCManager(configurati
242258 if (alreadySeen(packetId) || alreadySeen(messageId)) return @tryAndReport
243259
244260 val endpoint = Endpoint .byId(inputStream.read().toByte()) ? : return @tryAndReport
245- if (endpoint.transmissionType == TransmissionType .Broadcast ) broadcast(TransmissionLayer .UDP , messageId, packet.data.copyOf())
261+ if (endpoint.transmissionType == TransmissionType .Broadcast ) broadcast(
262+ TransmissionLayer .UDP ,
263+ messageId,
264+ packet.data.copyOf()
265+ )
246266
247267 val totalSlices = inputStream.readInt()
248268 val currentSlice = inputStream.readInt()
0 commit comments