@@ -97,18 +97,8 @@ public void run() throws Exception {
9797 }
9898
9999 private Mqtt5AsyncClient connectAndSubscribeToClient () throws URISyntaxException {
100- Mqtt5SimpleAuth auth ;
101-
102100 URI parsedUrl = new URI (url );
103- if (parsedUrl .getUserInfo () != null ) {
104- String [] userinfo = parsedUrl .getUserInfo ().split (":" );
105- auth = Mqtt5SimpleAuth .builder ()
106- .username (userinfo [0 ])
107- .password (userinfo [1 ].getBytes (StandardCharsets .UTF_8 ))
108- .build ();
109- } else {
110- auth = null ;
111- }
101+ Mqtt5SimpleAuth auth = createAuthFromUrl (parsedUrl );
112102
113103 Mqtt5AsyncClient asyncClient = Mqtt5Client .builder ()
114104 .identifier ("OpenTripPlanner-" + UUID .randomUUID ())
@@ -120,7 +110,7 @@ private Mqtt5AsyncClient connectAndSubscribeToClient() throws URISyntaxException
120110 .addDisconnectedListener (this ::onDisconnect )
121111 .buildAsync ();
122112
123- asyncClient .connectWith ().keepAlive (30 ).cleanStart (false ).send ().join ();
113+ asyncClient .connectWith ().keepAlive (30 ).cleanStart (true ).send ().join ();
124114
125115 asyncClient
126116 .subscribeWith ()
@@ -133,6 +123,17 @@ private Mqtt5AsyncClient connectAndSubscribeToClient() throws URISyntaxException
133123 return asyncClient ;
134124 }
135125
126+ private Mqtt5SimpleAuth createAuthFromUrl (URI parsedUrl ) {
127+ if (parsedUrl .getUserInfo () != null ) {
128+ String [] userinfo = parsedUrl .getUserInfo ().split (":" );
129+ return Mqtt5SimpleAuth .builder ()
130+ .username (userinfo [0 ])
131+ .password (userinfo [1 ].getBytes (StandardCharsets .UTF_8 ))
132+ .build ();
133+ }
134+ return null ;
135+ }
136+
136137 private void onDisconnect (MqttClientDisconnectedContext ctx ) {
137138 LOG .info ("Disconnected client from MQTT broker: {}" , url , ctx .getCause ());
138139 }
0 commit comments