Skip to content

Commit 9ea2c0a

Browse files
committed
fix messagebus-mqtt
1 parent 5aa2eed commit 9ea2c0a

4 files changed

Lines changed: 31 additions & 25 deletions

File tree

messagebus-mqtt/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
data

messagebus-mqtt/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919
<artifactId>fa3st-service-core</artifactId>
2020
<version>${project.version}</version>
2121
</dependency>
22+
<dependency>
23+
<groupId>com.h2database</groupId>
24+
<artifactId>h2</artifactId>
25+
</dependency>
2226
<dependency>
2327
<groupId>de.fraunhofer.iosb.io.moquette</groupId>
2428
<artifactId>moquette-broker</artifactId>

messagebus-mqtt/src/main/java/org/eclipse/digitaltwin/fa3st/service/messagebus/mqtt/MoquetteServer.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
*/
1414
package org.eclipse.digitaltwin.fa3st.service.messagebus.mqtt;
1515

16-
import io.moquette.BrokerConstants;
1716
import io.moquette.broker.Server;
1817
import io.moquette.broker.config.IConfig;
1918
import io.moquette.broker.config.MemoryConfig;
@@ -80,29 +79,30 @@ public void publish(String topic, String message, int qos) {
8079
public void start() throws IOException {
8180
server = new Server();
8281
IConfig serverConfig = new MemoryConfig(new Properties());
83-
// Ensure the immediate_flush property has a default of true.
84-
serverConfig.setProperty(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, String.valueOf(true));
85-
serverConfig.setProperty(BrokerConstants.PORT_PROPERTY_NAME, Integer.toString(config.getPort()));
86-
serverConfig.setProperty(BrokerConstants.HOST_PROPERTY_NAME, config.getHost());
87-
serverConfig.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.toString(config.getUsers().isEmpty()));
82+
serverConfig.setProperty(IConfig.DATA_PATH_PROPERTY_NAME, "data/" + UUID.randomUUID());
83+
// Ensure immediate flush
84+
serverConfig.setProperty(IConfig.BUFFER_FLUSH_MS_PROPERTY_NAME, "0");
85+
serverConfig.setProperty(IConfig.PORT_PROPERTY_NAME, Integer.toString(config.getPort()));
86+
serverConfig.setProperty(IConfig.HOST_PROPERTY_NAME, config.getHost());
87+
serverConfig.setProperty(IConfig.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.toString(config.getUsers().isEmpty()));
8888
if (config.getUseWebsocket()) {
89-
serverConfig.setProperty(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, Integer.toString(config.getWebsocketPort()));
89+
serverConfig.setProperty(IConfig.WEB_SOCKET_PORT_PROPERTY_NAME, Integer.toString(config.getWebsocketPort()));
9090
}
9191
if (Objects.nonNull(config.getServerCertificate())
9292
&& Objects.nonNull(config.getServerCertificate().getKeyStorePath())
9393
&& !config.getServerCertificate().getKeyStorePath().isEmpty()) {
9494
LOGGER.debug("Configuring keystore for ssl");
95-
serverConfig.setProperty(BrokerConstants.JKS_PATH_PROPERTY_NAME, config.getServerCertificate().getKeyStorePath());
96-
serverConfig.setProperty(BrokerConstants.KEY_STORE_TYPE, config.getServerCertificate().getKeyStoreType());
95+
serverConfig.setProperty(IConfig.JKS_PATH_PROPERTY_NAME, config.getServerCertificate().getKeyStorePath());
96+
serverConfig.setProperty(IConfig.KEY_STORE_TYPE, config.getServerCertificate().getKeyStoreType());
9797
if (Objects.nonNull(config.getServerCertificate().getKeyStorePassword())) {
98-
serverConfig.setProperty(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, config.getServerCertificate().getKeyStorePassword());
98+
serverConfig.setProperty(IConfig.KEY_STORE_PASSWORD_PROPERTY_NAME, config.getServerCertificate().getKeyStorePassword());
9999
}
100100
if (Objects.nonNull(config.getServerCertificate().getKeyPassword())) {
101-
serverConfig.setProperty(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, config.getServerCertificate().getKeyPassword());
101+
serverConfig.setProperty(IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME, config.getServerCertificate().getKeyPassword());
102102
}
103-
serverConfig.setProperty(BrokerConstants.SSL_PORT_PROPERTY_NAME, Integer.toString(config.getSslPort()));
103+
serverConfig.setProperty(IConfig.SSL_PORT_PROPERTY_NAME, Integer.toString(config.getSslPort()));
104104
if (config.getUseWebsocket()) {
105-
serverConfig.setProperty(BrokerConstants.WSS_PORT_PROPERTY_NAME, Integer.toString(config.getSslWebsocketPort()));
105+
serverConfig.setProperty(IConfig.WSS_PORT_PROPERTY_NAME, Integer.toString(config.getSslWebsocketPort()));
106106
}
107107
}
108108
MoquetteAuthenticator authenticator = new MoquetteAuthenticator(config);

messagebus-mqtt/src/test/java/org/eclipse/digitaltwin/fa3st/service/messagebus/mqtt/MessageBusMqttExternalTest.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
*/
1414
package org.eclipse.digitaltwin.fa3st.service.messagebus.mqtt;
1515

16-
import io.moquette.BrokerConstants;
1716
import io.moquette.broker.Server;
1817
import io.moquette.broker.config.IConfig;
1918
import io.moquette.broker.config.MemoryConfig;
2019
import java.io.IOException;
2120
import java.util.Objects;
2221
import java.util.Properties;
22+
import java.util.UUID;
2323

2424

2525
public class MessageBusMqttExternalTest extends AbstractMessageBusMqttTest<Server> {
@@ -52,26 +52,27 @@ protected void stopServer(Server server) {
5252

5353
private static IConfig getMqttServerConfig(MessageBusMqttConfig config) {
5454
MemoryConfig result = new MemoryConfig(new Properties());
55-
result.setProperty(BrokerConstants.PORT_PROPERTY_NAME, Integer.toString(config.getPort()));
56-
result.setProperty(BrokerConstants.HOST_PROPERTY_NAME, LOCALHOST);
57-
result.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.toString(config.getUsers().isEmpty()));
55+
result.setProperty(IConfig.DATA_PATH_PROPERTY_NAME, "data/" + UUID.randomUUID());
56+
result.setProperty(IConfig.PORT_PROPERTY_NAME, Integer.toString(config.getPort()));
57+
result.setProperty(IConfig.HOST_PROPERTY_NAME, LOCALHOST);
58+
result.setProperty(IConfig.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.toString(config.getUsers().isEmpty()));
5859
if (config.getUseWebsocket()) {
59-
result.setProperty(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, Integer.toString(config.getWebsocketPort()));
60+
result.setProperty(IConfig.WEB_SOCKET_PORT_PROPERTY_NAME, Integer.toString(config.getWebsocketPort()));
6061
}
6162
if (Objects.nonNull(config.getServerCertificate())
6263
&& Objects.nonNull(config.getServerCertificate().getKeyStorePath())) {
63-
result.setProperty(BrokerConstants.SSL_PORT_PROPERTY_NAME, Integer.toString(config.getSslPort()));
64-
result.setProperty(BrokerConstants.SSL_PROVIDER, "JDK");
65-
result.setProperty(BrokerConstants.JKS_PATH_PROPERTY_NAME, config.getServerCertificate().getKeyStorePath());
66-
result.setProperty(BrokerConstants.KEY_STORE_TYPE, config.getServerCertificate().getKeyStoreType());
64+
result.setProperty(IConfig.SSL_PORT_PROPERTY_NAME, Integer.toString(config.getSslPort()));
65+
result.setProperty(IConfig.SSL_PROVIDER, "JDK");
66+
result.setProperty(IConfig.JKS_PATH_PROPERTY_NAME, config.getServerCertificate().getKeyStorePath());
67+
result.setProperty(IConfig.KEY_STORE_TYPE, config.getServerCertificate().getKeyStoreType());
6768
if (Objects.nonNull(config.getServerCertificate().getKeyStorePassword())) {
68-
result.setProperty(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, config.getServerCertificate().getKeyStorePassword());
69+
result.setProperty(IConfig.KEY_STORE_PASSWORD_PROPERTY_NAME, config.getServerCertificate().getKeyStorePassword());
6970
}
7071
if (Objects.nonNull(config.getServerCertificate().getKeyPassword())) {
71-
result.setProperty(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, config.getServerCertificate().getKeyPassword());
72+
result.setProperty(IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME, config.getServerCertificate().getKeyPassword());
7273
}
7374
if (config.getUseWebsocket()) {
74-
result.setProperty(BrokerConstants.WSS_PORT_PROPERTY_NAME, Integer.toString(config.getSslWebsocketPort()));
75+
result.setProperty(IConfig.WSS_PORT_PROPERTY_NAME, Integer.toString(config.getSslWebsocketPort()));
7576
}
7677
}
7778

0 commit comments

Comments
 (0)