99
1010import java .lang .invoke .MethodHandles ;
1111import java .util .ArrayList ;
12- import java .util .Arrays ;
1312import java .util .List ;
14- import java .util .stream .Collectors ;
1513
1614import org .eclipse .paho .client .mqttv3 .IMqttClient ;
1715import org .eclipse .paho .client .mqttv3 .IMqttDeliveryToken ;
@@ -80,7 +78,7 @@ protected MqttConnectOptions getMqttConnectionOptions() {
8078 opts .setConnectionTimeout (parameters .getConnectionTimeout ());
8179 opts .setCleanSession (parameters .isCleanSession ());
8280 opts .setMaxInflight (parameters .getMaxInflight ());
83-
81+
8482 return opts ;
8583 }
8684
@@ -90,9 +88,30 @@ protected void subscribe(String topicFilter, int qos, IMqttMessageListener messa
9088
9189 protected void subscribe (String [] topicFilters , int [] qos , IMqttMessageListener [] messageListeners ) {
9290 try {
93- mqttClient .subscribe (topicFilters , qos , messageListeners );
94- mqttReconnectCallback .addSubscriptions (topicFilters , qos , messageListeners );
95- LOG .info ("Subscribed mqtt topics: {}" , (Object ) topicFilters );
91+ // To remove duplicates causing the error "This Topic Is Already Subscribed In The Same MQTT Connection"
92+ //Helpers
93+ List <String > topicFiltersList = new ArrayList <>();
94+ List <Integer > qosList = new ArrayList <>();
95+ List <IMqttMessageListener > listenersList = new ArrayList <>();
96+
97+ //Finding unique values from Topics Filters and their qos and listeners
98+ for (int i = 0 ; i < topicFilters .length ; i ++) {
99+ String topicFilter = topicFilters [i ];
100+ if (!topicFiltersList .contains (topicFilter )) {
101+ topicFiltersList .add (topicFilter );
102+ qosList .add (qos [i ] > 2 ? parameters .getMessageQos () : qos [i ]);
103+ listenersList .add (messageListeners [i ]);
104+ }
105+ }
106+
107+ //Transformation to subscription input structures
108+ String [] uniqueTopicFilters = topicFiltersList .toArray (new String [0 ]);
109+ int [] qosForUniqueFilters = qosList .stream ().mapToInt (i -> i ).toArray ();
110+ IMqttMessageListener [] listenersForUniqueFilters = listenersList .toArray (new IMqttMessageListener [0 ]);
111+
112+ mqttClient .subscribe (uniqueTopicFilters , qosForUniqueFilters , listenersForUniqueFilters );
113+ mqttReconnectCallback .addSubscriptions (uniqueTopicFilters , qosForUniqueFilters , listenersForUniqueFilters );
114+ LOG .info ("Subscribed mqtt topics: {}" , (Object ) uniqueTopicFilters );
96115 } catch (MqttException e ) {
97116 throw new LoMqttException (e );
98117 }
@@ -113,18 +132,27 @@ protected ObjectMapper getObjectMapper() {
113132 protected LOApiClientParameters getParameters () {
114133 return parameters ;
115134 }
116-
135+
117136 class MqttReconnectCallback implements MqttCallbackExtended {
118137
119138 private final List <String > topicFiltersList = new ArrayList <>();
120139 private final List <Integer > qosList = new ArrayList <>();
121140 private final List <IMqttMessageListener > listenersList = new ArrayList <>();
122141
123- public void addSubscriptions (String [] topicFilters , int [] qos , IMqttMessageListener [] messageListeners ) {
124- topicFiltersList .addAll (Arrays .asList (topicFilters ));
125- qosList .addAll (Arrays .stream (qos ).boxed ().collect (Collectors .toList ()));
126- listenersList .addAll (Arrays .asList (messageListeners ));
127- }
142+ public void addSubscriptions (String [] topicFilters , int [] qos , IMqttMessageListener [] messageListeners ) {
143+ // To remove duplicates causing the error "This Topic Is Already Subscribed In The Same MQTT Connection"
144+ for (int i = 0 ; i < topicFilters .length ; i ++) {
145+ String topicFilter = topicFilters [i ];
146+ if (!topicFiltersList .contains (topicFilter )) {
147+ topicFiltersList .add (topicFilter );
148+ qosList .add (qos [i ] > 2 ? parameters .getMessageQos () : qos [i ]);
149+ listenersList .add (messageListeners [i ]);
150+ }
151+ }
152+ LOG .info ("MqttReconnectCallback topicFiltersList: {}" , topicFiltersList );
153+ LOG .info ("MqttReconnectCallback qosList: {}" , qosList );
154+ LOG .info ("MqttReconnectCallback listenersList: {}" , listenersList );
155+ }
128156
129157 public void connectionLost (Throwable cause ) {
130158 LOG .error ("Connection lost: {}" , cause .getMessage ());
@@ -137,7 +165,7 @@ public void deliveryComplete(IMqttDeliveryToken token) {
137165 }
138166
139167 public void connectComplete (boolean reconnect , String serverURI ) {
140- LOG .error ("Connect complete. Reconnect: {}" , reconnect );
168+ LOG .info ("Connect complete. Reconnect: {}" , reconnect );
141169 if (reconnect ) {
142170 subscribe (
143171 topicFiltersList .toArray (new String [0 ]),
0 commit comments