@@ -73,6 +73,8 @@ public class MQTTProxyService implements Closeable {
7373 private Channel listenChannel ;
7474 private Channel listenChannelTls ;
7575 private Channel listenChannelTlsPsk ;
76+ private Channel listenChannelWs ;
77+ private Channel listenChannelWss ;
7678 private final EventLoopGroup acceptorGroup ;
7779 private final EventLoopGroup workerGroup ;
7880 private final WebService webService ;
@@ -130,7 +132,7 @@ public void start() throws MQTTProxyException {
130132 serverBootstrap .channel (EventLoopUtil .getServerSocketChannelClass (workerGroup ));
131133 EventLoopUtil .enableTriggeredMode (serverBootstrap );
132134 serverBootstrap .childHandler (new MQTTProxyChannelInitializer (
133- this , proxyConfig , false , sslContextRefresher ));
135+ this , proxyConfig , false , false , sslContextRefresher ));
134136
135137 try {
136138 listenChannel = serverBootstrap .bind (proxyConfig .getMqttProxyPort ()).sync ().channel ();
@@ -142,7 +144,7 @@ public void start() throws MQTTProxyException {
142144 if (proxyConfig .isMqttProxyTlsEnabled () || proxyConfig .isMqttProxyMTlsAuthenticationEnabled ()) {
143145 ServerBootstrap tlsBootstrap = serverBootstrap .clone ();
144146 tlsBootstrap .childHandler (new MQTTProxyChannelInitializer (
145- this , proxyConfig , true , sslContextRefresher ));
147+ this , proxyConfig , true , false , sslContextRefresher ));
146148 try {
147149 listenChannelTls = tlsBootstrap .bind (proxyConfig .getMqttProxyTlsPort ()).sync ().channel ();
148150 log .info ("Started MQTT Proxy with TLS on {}" , listenChannelTls .localAddress ());
@@ -162,14 +164,38 @@ public void start() throws MQTTProxyException {
162164 // Add channel initializer
163165 ServerBootstrap tlsPskBootstrap = serverBootstrap .clone ();
164166 tlsPskBootstrap .childHandler (new MQTTProxyChannelInitializer (
165- this , proxyConfig , false , true , sslContextRefresher ));
167+ this , proxyConfig , false , true , false , sslContextRefresher ));
166168 try {
167169 listenChannelTlsPsk = tlsPskBootstrap .bind (proxyConfig .getMqttProxyTlsPskPort ()).sync ().channel ();
168170 log .info ("Started MQTT Proxy with TLS-PSK on {}" , listenChannelTlsPsk .localAddress ());
169171 } catch (InterruptedException e ) {
170172 throw new MQTTProxyException (e );
171173 }
172174 }
175+
176+ if (proxyConfig .isMqttProxyWsEnabled ()) {
177+ ServerBootstrap wsBootstrap = serverBootstrap .clone ();
178+ wsBootstrap .childHandler (new MQTTProxyChannelInitializer (
179+ this , proxyConfig , false , true , sslContextRefresher ));
180+ try {
181+ listenChannelWs = wsBootstrap .bind (proxyConfig .getMqttProxyWsPort ()).sync ().channel ();
182+ log .info ("Started MQTT Proxy with WS on {}" , listenChannelWs .localAddress ());
183+ } catch (InterruptedException e ) {
184+ throw new MQTTProxyException (e );
185+ }
186+ }
187+
188+ if (proxyConfig .isMqttProxyWssEnabled ()) {
189+ ServerBootstrap wssBootstrap = serverBootstrap .clone ();
190+ wssBootstrap .childHandler (new MQTTProxyChannelInitializer (
191+ this , proxyConfig , true , true , sslContextRefresher ));
192+ try {
193+ listenChannelWss = wssBootstrap .bind (proxyConfig .getMqttProxyWssPort ()).sync ().channel ();
194+ log .info ("Started MQTT Proxy with WSS on {}" , listenChannelWss .localAddress ());
195+ } catch (InterruptedException e ) {
196+ throw new MQTTProxyException (e );
197+ }
198+ }
173199 this .lookupHandler = new PulsarServiceLookupHandler (pulsarService , proxyConfig );
174200 this .eventService .start ();
175201 this .webService .start ();
@@ -184,7 +210,7 @@ public void start0() throws MQTTProxyException {
184210 if (proxyConfig .isMqttProxyTlsEnabled () || proxyConfig .isMqttProxyMTlsAuthenticationEnabled ()) {
185211 ServerBootstrap tlsBootstrap = serverBootstrap .clone ();
186212 tlsBootstrap .childHandler (new MQTTProxyChannelInitializer (
187- this , proxyConfig , true , sslContextRefresher ));
213+ this , proxyConfig , true , false , sslContextRefresher ));
188214 try {
189215 listenChannelTls = tlsBootstrap .bind (proxyConfig .getMqttProxyTlsPort ()).sync ().channel ();
190216 log .info ("Started MQTT Proxy with TLS on {}" , listenChannelTls .localAddress ());
@@ -204,14 +230,38 @@ public void start0() throws MQTTProxyException {
204230 // Add channel initializer
205231 ServerBootstrap tlsPskBootstrap = serverBootstrap .clone ();
206232 tlsPskBootstrap .childHandler (new MQTTProxyChannelInitializer (
207- this , proxyConfig , false , true , sslContextRefresher ));
233+ this , proxyConfig , false , true , false , sslContextRefresher ));
208234 try {
209235 listenChannelTlsPsk = tlsPskBootstrap .bind (proxyConfig .getMqttProxyTlsPskPort ()).sync ().channel ();
210236 log .info ("Started MQTT Proxy with TLS-PSK on {}" , listenChannelTlsPsk .localAddress ());
211237 } catch (InterruptedException e ) {
212238 throw new MQTTProxyException (e );
213239 }
214240 }
241+
242+ if (proxyConfig .isMqttProxyWsEnabled ()) {
243+ ServerBootstrap wsBootstrap = serverBootstrap .clone ();
244+ wsBootstrap .childHandler (new MQTTProxyChannelInitializer (
245+ this , proxyConfig , false , true , sslContextRefresher ));
246+ try {
247+ listenChannelWs = wsBootstrap .bind (proxyConfig .getMqttProxyWsPort ()).sync ().channel ();
248+ log .info ("Started MQTT Proxy with WS on {}" , listenChannelWs .localAddress ());
249+ } catch (InterruptedException e ) {
250+ throw new MQTTProxyException (e );
251+ }
252+ }
253+
254+ if (proxyConfig .isMqttProxyWssEnabled ()) {
255+ ServerBootstrap wssBootstrap = serverBootstrap .clone ();
256+ wssBootstrap .childHandler (new MQTTProxyChannelInitializer (
257+ this , proxyConfig , true , true , sslContextRefresher ));
258+ try {
259+ listenChannelWss = wssBootstrap .bind (proxyConfig .getMqttProxyWssPort ()).sync ().channel ();
260+ log .info ("Started MQTT Proxy with WSS on {}" , listenChannelWss .localAddress ());
261+ } catch (InterruptedException e ) {
262+ throw new MQTTProxyException (e );
263+ }
264+ }
215265 this .lookupHandler = new PulsarServiceLookupHandler (pulsarService , proxyConfig );
216266 this .eventService .start ();
217267 }
@@ -227,6 +277,12 @@ public void close() {
227277 if (listenChannelTlsPsk != null ) {
228278 listenChannelTlsPsk .close ();
229279 }
280+ if (listenChannelWs != null ) {
281+ listenChannelWs .close ();
282+ }
283+ if (listenChannelWss != null ) {
284+ listenChannelWss .close ();
285+ }
230286 this .acceptorGroup .shutdownGracefully ();
231287 this .workerGroup .shutdownGracefully ();
232288 this .eventService .close ();
0 commit comments