Skip to content

Commit 66f2dfd

Browse files
author
Szymon Lisiecki
committed
add MqttReconnectCallback
1 parent b393cb5 commit 66f2dfd

1 file changed

Lines changed: 54 additions & 8 deletions

File tree

src/main/java/com/orange/lo/sdk/mqtt/AbstractDataManagementMqtt.java

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,41 @@
1+
/**
2+
* Copyright (c) Orange. All Rights Reserved.
3+
* <p>
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*/
7+
18
package com.orange.lo.sdk.mqtt;
29

3-
import com.fasterxml.jackson.databind.ObjectMapper;
4-
import com.orange.lo.sdk.LOApiClientParameters;
5-
import com.orange.lo.sdk.mqtt.exceptions.LoMqttException;
10+
import java.util.ArrayList;
11+
import java.util.Arrays;
12+
import java.util.List;
13+
import java.util.stream.Collectors;
14+
615
import org.eclipse.paho.client.mqttv3.IMqttClient;
16+
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
717
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
18+
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
819
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
920
import org.eclipse.paho.client.mqttv3.MqttException;
1021
import org.eclipse.paho.client.mqttv3.MqttMessage;
1122

23+
import com.fasterxml.jackson.databind.ObjectMapper;
24+
import com.orange.lo.sdk.LOApiClientParameters;
25+
import com.orange.lo.sdk.mqtt.exceptions.LoMqttException;
26+
1227

1328
public abstract class AbstractDataManagementMqtt {
1429

1530
private final IMqttClient mqttClient;
1631
private final ObjectMapper objectMapper;
1732
private final LOApiClientParameters parameters;
33+
private final MqttReconnectCallback mqttReconnectCallback;
1834

1935
public AbstractDataManagementMqtt(LOApiClientParameters parameters, MqttClientFactory mqttClientFactory) {
36+
this.mqttReconnectCallback = new MqttReconnectCallback();
2037
this.mqttClient = mqttClientFactory.getMqttClient();
38+
this.mqttClient.setCallback(mqttReconnectCallback);
2139
this.parameters = parameters;
2240
this.objectMapper = new ObjectMapper();
2341
}
@@ -56,16 +74,13 @@ protected MqttConnectOptions getMqttConnectionOptions() {
5674
}
5775

5876
protected void subscribe(String topicFilter, int qos, IMqttMessageListener messageListener) {
59-
try {
60-
mqttClient.subscribe(topicFilter, qos, messageListener);
61-
} catch (MqttException e) {
62-
throw new LoMqttException(e);
63-
}
77+
subscribe(new String[] {topicFilter}, new int[] {qos}, new IMqttMessageListener[] {messageListener});
6478
}
6579

6680
protected void subscribe(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) {
6781
try {
6882
mqttClient.subscribe(topicFilters, qos, messageListeners);
83+
mqttReconnectCallback.addSubscriptions(topicFilters, qos, messageListeners);
6984
} catch (MqttException e) {
7085
throw new LoMqttException(e);
7186
}
@@ -86,4 +101,35 @@ protected ObjectMapper getObjectMapper() {
86101
protected LOApiClientParameters getParameters() {
87102
return parameters;
88103
}
104+
105+
class MqttReconnectCallback implements MqttCallbackExtended {
106+
107+
private final List<String> topicFiltersList = new ArrayList<>();
108+
private final List<Integer> qosList = new ArrayList<>();
109+
private final List<IMqttMessageListener> listenersList = new ArrayList<>();
110+
111+
public void addSubscriptions(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) {
112+
topicFiltersList.addAll(Arrays.asList(topicFilters));
113+
qosList.addAll(Arrays.stream(qos).boxed().collect(Collectors.toList()));
114+
listenersList.addAll(Arrays.asList(messageListeners));
115+
}
116+
117+
public void connectionLost(Throwable cause) {
118+
}
119+
120+
public void messageArrived(String topic, MqttMessage message) throws Exception {
121+
}
122+
123+
public void deliveryComplete(IMqttDeliveryToken token) {
124+
}
125+
126+
public void connectComplete(boolean reconnect, String serverURI) {
127+
if(reconnect) {
128+
subscribe(
129+
topicFiltersList.toArray(new String[0]),
130+
qosList.stream().mapToInt(Integer::intValue).toArray(),
131+
listenersList.toArray(new IMqttMessageListener[0]));
132+
}
133+
}
134+
}
89135
}

0 commit comments

Comments
 (0)