@@ -58,12 +58,15 @@ public Thread newThread(Runnable r) {
5858 }
5959 });
6060
61+ private static Thread sendMsgThread ;
62+
6163 private Queue <MessageRoundtrip > requestQueue = new ConcurrentLinkedQueue <>();
62- private Queue <MessageRoundtrip > respondQueue = new ConcurrentLinkedQueue <>();
64+ //private Queue<MessageRoundtrip> respondQueue = new ConcurrentLinkedQueue<>();
65+
66+ private BlockingQueue <Message > msgQueue = new LinkedBlockingQueue <>();
67+
6368 private ChannelHandlerContext ctx = null ;
6469
65- // @Autowired
66- // EthereumListener ethereumListener;
6770 boolean hasPing = false ;
6871 private ScheduledFuture <?> timerTask ;
6972 private Channel channel ;
@@ -73,13 +76,29 @@ public MessageQueue() {
7376
7477 public void activate (ChannelHandlerContext ctx ) {
7578 this .ctx = ctx ;
79+
7680 timerTask = timer .scheduleAtFixedRate (() -> {
7781 try {
7882 nudgeQueue ();
7983 } catch (Throwable t ) {
8084 logger .error ("Unhandled exception" , t );
8185 }
8286 }, 10 , 10 , TimeUnit .MILLISECONDS );
87+
88+ sendMsgThread = new Thread (()->{
89+ while (true ) {
90+ try {
91+ Message msg = msgQueue .take ();
92+ ctx .writeAndFlush (msg .getSendData ())
93+ .addListener (ChannelFutureListener .FIRE_EXCEPTION_ON_FAILURE );
94+ }catch (InterruptedException e ){
95+ break ;
96+ }catch (Exception e ) {
97+ logger .error ("send message failed, {}, error info: {}" , ctx .channel ().remoteAddress (), e .getMessage ());
98+ }
99+ }
100+ });
101+ sendMsgThread .start ();
83102 }
84103
85104 public void setChannel (Channel channel ) {
@@ -95,7 +114,7 @@ public void sendMessage(Message msg) {
95114 if (msg .getAnswerMessage () != null )
96115 requestQueue .add (new MessageRoundtrip (msg ));
97116 else
98- respondQueue . add ( new MessageRoundtrip ( msg ) );
117+ msgQueue . offer ( msg );
99118 }
100119
101120 public void disconnect () {
@@ -138,7 +157,7 @@ private void nudgeQueue() {
138157 // remove last answered message on the queue
139158 removeAnsweredMessage (requestQueue .peek ());
140159 // Now send the next message
141- sendToWire (respondQueue .poll ());
160+ // sendToWire(respondQueue.poll());
142161 sendToWire (requestQueue .peek ());
143162 }
144163
@@ -173,8 +192,7 @@ private void sendToWire(MessageRoundtrip messageRoundtrip) {
173192 }
174193
175194 public void close () {
176- if (!timerTask .isCancelled ()) {
177- timerTask .cancel (false );
178- }
195+ sendMsgThread .interrupt ();
196+ timerTask .cancel (false );
179197 }
180198}
0 commit comments