Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.12.1</version>
<version>3.14.0</version>
<configuration>
<source>21</source>
<target>21</target>
Expand All @@ -20,7 +20,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
<version>3.5.3</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand All @@ -44,12 +44,12 @@
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>6.1.4</version>
<version>6.2.7</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.16.1</version>
<version>2.18.4</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
Expand All @@ -59,29 +59,29 @@
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.3.1</version>
<version>5.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.12</version>
<version>2.0.17</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.10.0</version>
<version>5.18.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>5.10.0</version>
<version>5.18.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.10.2</version>
<version>5.11.4</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/com/orange/lo/sdk/LOApiClientParameters.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import com.orange.lo.sdk.externalconnector.DataManagementExtConnectorCommandCallback;
import com.orange.lo.sdk.fifomqtt.DataManagementFifoCallback;
import com.orange.lo.sdk.mqtt.DataManagementReconnectCallback;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -47,6 +48,7 @@ public final class LOApiClientParameters {
private final List<String> topics;
private final DataManagementFifoCallback dataManagementFifoCallback;
private final DataManagementExtConnectorCommandCallback dataManagementExtConnectorCommandCallback;
private final DataManagementReconnectCallback dataManagementReconnectCallback;
private final String extConnectorCommandRequestTopic;
private final String extConnectorCommandResponseTopic;
private final String extConnectorStatusTopicTemplate;
Expand Down Expand Up @@ -74,6 +76,7 @@ private LOApiClientParameters(LOApiClientParametersBuilder builder) {
this.extConnectorCommandResponseTopic = builder.extConnectorCommandResponseTopic;
this.extConnectorStatusTopicTemplate = builder.extConnectorStatusTopicTemplate;
this.extConnectorDataTopicTemplate = builder.extConnectorDataTopicTemplate;
this.dataManagementReconnectCallback = builder.dataManagementReconnectCallback;
}

public String getConnectorType() {
Expand Down Expand Up @@ -144,6 +147,10 @@ public DataManagementExtConnectorCommandCallback getDataManagementExtConnectorCo
return dataManagementExtConnectorCommandCallback;
}

public DataManagementReconnectCallback getDataManagementReconnectCallback() {
return dataManagementReconnectCallback;
}

public String getExtConnectorCommandResponseTopic() {
return extConnectorCommandResponseTopic;
}
Expand Down Expand Up @@ -183,6 +190,7 @@ public static final class LOApiClientParametersBuilder {
private List<String> topics = new ArrayList<>();
private DataManagementFifoCallback dataManagementFifoCallback;
private DataManagementExtConnectorCommandCallback dataManagementExtConnectorCommandCallback;
private DataManagementReconnectCallback dataManagementReconnectCallback;
private String extConnectorCommandRequestTopic = DEFAULT_EXT_CONNECTOR_COMMAND_REQUEST_TOPIC;
private String extConnectorCommandResponseTopic = DEFAULT_EXT_CONNECTOR_COMMAND_RESPONSE_TOPIC;
private String extConnectorStatusTopicTemplate = DEFAULT_EXT_CONNECTOR_STATUS_TOPIC_TEMPLATE;
Expand Down Expand Up @@ -274,6 +282,12 @@ public LOApiClientParametersBuilder dataManagementExtConnectorCommandCallback(
return this;
}

public LOApiClientParametersBuilder dataManagementReconnectCallback(
DataManagementReconnectCallback dataManagementReconnectCallback) {
this.dataManagementReconnectCallback = dataManagementReconnectCallback;
return this;
}

public LOApiClientParametersBuilder extConnectorCommandRequestTopic(String extConnectorCommandRequestTopic) {
this.extConnectorCommandRequestTopic = extConnectorCommandRequestTopic;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
Expand All @@ -36,7 +37,8 @@ public abstract class AbstractDataManagementMqtt {
private final MqttReconnectCallback mqttReconnectCallback;

public AbstractDataManagementMqtt(LOApiClientParameters parameters, MqttClientFactory mqttClientFactory) {
this.mqttReconnectCallback = new MqttReconnectCallback();
DataManagementReconnectCallback reconnectCallback = parameters.getDataManagementReconnectCallback();
this.mqttReconnectCallback = new MqttReconnectCallback(reconnectCallback);
this.mqttClient = mqttClientFactory.getMqttClient();
this.mqttClient.setCallback(mqttReconnectCallback);
this.parameters = parameters;
Expand Down Expand Up @@ -147,6 +149,11 @@ class MqttReconnectCallback implements MqttCallbackExtended {
private final List<String> topicFiltersList = new ArrayList<>();
private final List<Integer> qosList = new ArrayList<>();
private final List<IMqttMessageListener> listenersList = new ArrayList<>();
private final DataManagementReconnectCallback reconnectCallback;

public MqttReconnectCallback(DataManagementReconnectCallback reconnectCallback) {
this.reconnectCallback = reconnectCallback;
}

public void addSubscriptions(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) {
// To remove duplicates causing the error "This Topic Is Already Subscribed In The Same MQTT Connection"
Expand All @@ -163,9 +170,12 @@ public void addSubscriptions(String[] topicFilters, int[] qos, IMqttMessageListe
LOG.info("MqttReconnectCallback listenersList: {}", listenersList);
}

public void connectionLost(Throwable cause) {
public void connectionLost(Throwable cause) {
LOG.error("Connection lost: {}", cause.getMessage());
}
if (thereIsA(reconnectCallback)) {
reconnectCallback.connectionLost(cause);
}
}

public void messageArrived(String topic, MqttMessage message) throws Exception {
}
Expand All @@ -181,6 +191,13 @@ public void connectComplete(boolean reconnect, String serverURI) {
qosList.stream().mapToInt(Integer::intValue).toArray(),
listenersList.toArray(new IMqttMessageListener[0]));
}
if (thereIsA(reconnectCallback)) {
reconnectCallback.connectComplete(reconnect, serverURI);
}
}

private boolean thereIsA(Object object) {
return Objects.nonNull(object);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/**
* Copyright (c) Orange. All Rights Reserved.
* <p>
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

package com.orange.lo.sdk.mqtt;

public interface DataManagementReconnectCallback {
void connectComplete(boolean reconnect, String serverURI);
void connectionLost(Throwable cause);
}