Skip to content

Commit ac56558

Browse files
committed
fix conflict
2 parents 19a2679 + 28a5ab9 commit ac56558

33 files changed

Lines changed: 888 additions & 405 deletions

src/main/java/org/tron/common/overlay/discover/NodeManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,9 @@ public List<NodeHandler> getNodes(Predicate<NodeHandler> predicate, int limit)
281281

282282
logger.debug("nodeHandlerMap size {} filter peer size {}", nodeHandlerMap.size(), filtered.size());
283283

284+
//TODO: here can use head num sort.
285+
filtered.sort((o1, o2) -> o2.getNodeStatistics().getReputation() - o1.getNodeStatistics().getReputation());
286+
284287
return CollectionUtils.truncate(filtered, limit);
285288
}
286289

src/main/java/org/tron/common/overlay/discover/UDPListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public UDPListener(final NodeManager nodeManager) {
6161
} else {
6262
new Thread(() -> {
6363
try {
64-
UDPListener.this.start();
64+
start();
6565
} catch (Exception e) {
6666
logger.debug(e.getMessage(), e);
6767
throw new RuntimeException(e);

src/main/java/org/tron/common/overlay/server/MessageQueue.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,15 @@ public Thread newThread(Runnable r) {
5858
}
5959
});
6060

61+
private static Thread sendMsgThread;
62+
6163
private Queue<MessageRoundtrip> requestQueue = new ConcurrentLinkedQueue<>();
62-
private Queue<MessageRoundtrip> respondQueue = new ConcurrentLinkedQueue<>();
64+
//private Queue<MessageRoundtrip> respondQueue = new ConcurrentLinkedQueue<>();
65+
66+
private BlockingQueue<Message> msgQueue = new LinkedBlockingQueue<>();
67+
6368
private ChannelHandlerContext ctx = null;
6469

65-
// @Autowired
66-
// EthereumListener ethereumListener;
6770
boolean hasPing = false;
6871
private ScheduledFuture<?> timerTask;
6972
private Channel channel;
@@ -73,13 +76,29 @@ public MessageQueue() {
7376

7477
public void activate(ChannelHandlerContext ctx) {
7578
this.ctx = ctx;
79+
7680
timerTask = timer.scheduleAtFixedRate(() -> {
7781
try {
7882
nudgeQueue();
7983
} catch (Throwable t) {
8084
logger.error("Unhandled exception", t);
8185
}
8286
}, 10, 10, TimeUnit.MILLISECONDS);
87+
88+
sendMsgThread = new Thread(()->{
89+
while (true) {
90+
try {
91+
Message msg = msgQueue.take();
92+
ctx.writeAndFlush(msg.getSendData())
93+
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
94+
}catch (InterruptedException e){
95+
break;
96+
}catch (Exception e) {
97+
logger.error("send message failed, {}, error info: {}", ctx.channel().remoteAddress(), e.getMessage());
98+
}
99+
}
100+
});
101+
sendMsgThread.start();
83102
}
84103

85104
public void setChannel(Channel channel) {
@@ -95,7 +114,7 @@ public void sendMessage(Message msg) {
95114
if (msg.getAnswerMessage() != null)
96115
requestQueue.add(new MessageRoundtrip(msg));
97116
else
98-
respondQueue.add(new MessageRoundtrip(msg));
117+
msgQueue.offer(msg);
99118
}
100119

101120
public void disconnect() {
@@ -138,7 +157,7 @@ private void nudgeQueue() {
138157
// remove last answered message on the queue
139158
removeAnsweredMessage(requestQueue.peek());
140159
// Now send the next message
141-
sendToWire(respondQueue.poll());
160+
//sendToWire(respondQueue.poll());
142161
sendToWire(requestQueue.peek());
143162
}
144163

@@ -173,8 +192,7 @@ private void sendToWire(MessageRoundtrip messageRoundtrip) {
173192
}
174193

175194
public void close() {
176-
if (!timerTask.isCancelled()) {
177-
timerTask.cancel(false);
178-
}
195+
sendMsgThread.interrupt();
196+
timerTask.cancel(false);
179197
}
180198
}

src/main/java/org/tron/common/overlay/server/SyncPool.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ private synchronized void prepareActive() {
116116

117117
if (active.isEmpty()) return;
118118

119+
//sort by latency
119120
active.sort(Comparator.comparingDouble(c -> c.getPeerStats().getAvgLatency()));
120121

121122
for (PeerConnection channel : active) {
@@ -129,10 +130,6 @@ private synchronized void prepareActive() {
129130
}
130131

131132
synchronized void logActivePeers() {
132-
logger.info("-------- active node.");
133-
for (NodeHandler handler : nodeManager.dumpActiveNodes()) {
134-
logger.info(handler.getNode().toString());
135-
}
136133
logger.info("-------- active channel {}, node in user size {}", channelManager.getActivePeers().size(), channelManager.nodesInUse().size());
137134
for (Channel channel: channelManager.getActivePeers()){
138135
logger.info(channel.toString());

src/main/java/org/tron/common/utils/ExecutorLoop.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import java.util.concurrent.TimeUnit;
88
import java.util.concurrent.atomic.AtomicInteger;
99
import java.util.function.Consumer;
10-
import java.util.function.Function;
1110

1211
public class ExecutorLoop<In> {
1312

src/main/java/org/tron/core/Wallet.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@
4141
import org.tron.core.db.BlockStore;
4242
import org.tron.core.db.Manager;
4343
import org.tron.core.db.UtxoStore;
44+
import org.tron.core.exception.BadItemException;
4445
import org.tron.core.exception.ContractExeException;
4546
import org.tron.core.exception.ContractValidateException;
47+
import org.tron.core.exception.ItemNotFoundException;
4648
import org.tron.core.exception.ValidateSignatureException;
4749
import org.tron.core.net.message.TransactionMessage;
4850
import org.tron.core.net.node.Node;
@@ -232,12 +234,35 @@ public Transaction createTransaction(WitnessUpdateContract witnessUpdateContract
232234

233235
public Block getNowBlock() {
234236
Sha256Hash headBlockId = dbManager.getHeadBlockId();
235-
return dbManager.getBlockById(headBlockId).getInstance();
237+
try {
238+
return dbManager.getBlockById(headBlockId).getInstance();
239+
} catch (BadItemException e) {
240+
logger.info(e.getMessage());
241+
return null;
242+
} catch (ItemNotFoundException e) {
243+
logger.info(e.getMessage());
244+
return null;
245+
}
236246
}
237247

238248
public Block getBlockByNum(long blockNum) {
239-
Sha256Hash headBlockId = dbManager.getBlockIdByNum(blockNum);
240-
return dbManager.getBlockById(headBlockId).getInstance();
249+
Sha256Hash headBlockId = null;
250+
try {
251+
headBlockId = dbManager.getBlockIdByNum(blockNum);
252+
} catch (BadItemException e) {
253+
logger.info(e.getMessage());
254+
} catch (ItemNotFoundException e) {
255+
logger.info(e.getMessage());
256+
}
257+
try {
258+
return dbManager.getBlockById(headBlockId).getInstance();
259+
} catch (BadItemException e) {
260+
logger.info(e.getMessage());
261+
return null;
262+
} catch (ItemNotFoundException e) {
263+
logger.info(e.getMessage());
264+
return null;
265+
}
241266
}
242267

243268
public AccountList getAllAccounts() {

src/main/java/org/tron/core/capsule/BlockCapsule.java

Lines changed: 24 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.tron.common.crypto.ECKey.ECDSASignature;
2929
import org.tron.common.utils.Sha256Hash;
3030
import org.tron.core.capsule.utils.MerkleTree;
31+
import org.tron.core.exception.BadItemException;
3132
import org.tron.core.exception.ValidateSignatureException;
3233
import org.tron.protos.Protocol.Block;
3334
import org.tron.protos.Protocol.BlockHeader;
@@ -108,26 +109,25 @@ public long getNum() {
108109

109110
private BlockId blockId = new BlockId(Sha256Hash.ZERO_HASH, 0);
110111

111-
private byte[] data;
112-
113112
private Block block;
114-
115-
private boolean unpacked;
116-
117113
public boolean generatedByMyself = false;
118114

119-
private synchronized void unPack() {
120-
if (unpacked) {
121-
return;
122-
}
115+
public BlockCapsule(long number, Sha256Hash hash, long when, ByteString witnessAddress) {
116+
// blockheader raw
117+
BlockHeader.raw.Builder blockHeaderRawBuild = BlockHeader.raw.newBuilder();
118+
BlockHeader.raw blockHeaderRaw = blockHeaderRawBuild
119+
.setNumber(number)
120+
.setParentHash(hash.getByteString())
121+
.setTimestamp(when)
122+
.setWitnessAddress(witnessAddress).build();
123123

124-
try {
125-
this.block = Block.parseFrom(data);
126-
} catch (InvalidProtocolBufferException e) {
127-
logger.debug(e.getMessage());
128-
}
124+
// block header
125+
BlockHeader.Builder blockHeaderBuild = BlockHeader.newBuilder();
126+
BlockHeader blockHeader = blockHeaderBuild.setRawData(blockHeaderRaw).build();
129127

130-
unpacked = true;
128+
// block
129+
Block.Builder blockBuild = Block.newBuilder();
130+
this.block = blockBuild.setBlockHeader(blockHeader).build();
131131
}
132132

133133
public BlockCapsule(long number, ByteString hash, long when, ByteString witnessAddress) {
@@ -146,7 +146,6 @@ public BlockCapsule(long number, ByteString hash, long when, ByteString witnessA
146146
// block
147147
Block.Builder blockBuild = Block.newBuilder();
148148
this.block = blockBuild.setBlockHeader(blockHeader).build();
149-
unpacked = true;
150149
}
151150

152151
public BlockCapsule(long timestamp, ByteString parentHash, long number,
@@ -167,7 +166,7 @@ public BlockCapsule(long timestamp, ByteString parentHash, long number,
167166
Block.Builder blockBuild = Block.newBuilder();
168167
transactionList.forEach(trx -> blockBuild.addTransactions(trx));
169168
this.block = blockBuild.setBlockHeader(blockHeader).build();
170-
unpacked = true;
169+
171170
}
172171

173172
public void addTransaction(TransactionCapsule pendingTrx) {
@@ -193,7 +192,6 @@ public void sign(byte[] privateKey) {
193192
}
194193

195194
private Sha256Hash getRawHash() {
196-
unPack();
197195
return Sha256Hash.of(this.block.getBlockHeader().getRawData().toByteArray());
198196
}
199197

@@ -210,15 +208,10 @@ public boolean validateSignature() throws ValidateSignatureException {
210208
}
211209

212210
public BlockId getBlockId() {
213-
unPack();
214211
if (blockId.equals(Sha256Hash.ZERO_HASH)) {
215212
blockId = new BlockId(Sha256Hash.of(this.block.getBlockHeader().toByteArray()), getNum());
216213
}
217-
218214
return blockId;
219-
// return blockId.equals(Sha256Hash.ZERO_HASH)
220-
// ? blockId = new BlockId(Sha256Hash.of(this.block.getBlockHeader().toByteArray()), getNum())
221-
// : blockId;
222215
}
223216

224217
public Sha256Hash calcMerkleRoot() {
@@ -246,41 +239,29 @@ public void setMerkleRoot() {
246239
}
247240

248241
public Sha256Hash getMerkleRoot() {
249-
unPack();
250242
return Sha256Hash.wrap(this.block.getBlockHeader().getRawData().getTxTrieRoot());
251243
}
252244

253-
public ByteString getWitnessAddress(){
254-
unPack();
245+
public ByteString getWitnessAddress() {
255246
return this.block.getBlockHeader().getRawData().getWitnessAddress();
256247
}
257248

258249

259-
private void pack() {
260-
if (data == null) {
261-
this.data = this.block.toByteArray();
262-
}
263-
}
264-
265-
public boolean validate() {
266-
unPack();
267-
return true;
268-
}
269-
270250
public BlockCapsule(Block block) {
271251
this.block = block;
272-
unpacked = true;
273252
}
274253

275-
public BlockCapsule(byte[] data) {
276-
this.data = data;
277-
unPack();
254+
public BlockCapsule(byte[] data) throws BadItemException {
255+
try {
256+
this.block = Block.parseFrom(data);
257+
} catch (InvalidProtocolBufferException e) {
258+
throw new BadItemException();
259+
}
278260
}
279261

280262
@Override
281263
public byte[] getData() {
282-
pack();
283-
return data;
264+
return this.block.toByteArray();
284265
}
285266

286267
@Override
@@ -289,28 +270,23 @@ public Block getInstance() {
289270
}
290271

291272
public Sha256Hash getParentHash() {
292-
unPack();
293273
return Sha256Hash.wrap(this.block.getBlockHeader().getRawData().getParentHash());
294274
}
295275

296276
public ByteString getParentHashStr() {
297-
unPack();
298277
return this.block.getBlockHeader().getRawData().getParentHash();
299278
}
300279

301280
public long getNum() {
302-
unPack();
303281
return this.block.getBlockHeader().getRawData().getNumber();
304282
}
305283

306284
public long getTimeStamp() {
307-
unPack();
308285
return this.block.getBlockHeader().getRawData().getTimestamp();
309286
}
310287

311288
@Override
312289
public String toString() {
313-
unPack();
314290
return "BlockCapsule{" +
315291
"blockId=" + blockId +
316292
", num=" + getNum() +

0 commit comments

Comments
 (0)