Skip to content

Commit de9a6d3

Browse files
committed
Fix connection management
1 parent 7dd74fc commit de9a6d3

3 files changed

Lines changed: 20 additions & 10 deletions

File tree

mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTConnectionManager.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.MqttEvent;
2424
import java.util.ArrayList;
2525
import java.util.Collection;
26+
import java.util.HashSet;
27+
import java.util.LinkedHashSet;
28+
import java.util.Set;
2629
import java.util.concurrent.ConcurrentHashMap;
2730
import java.util.concurrent.ConcurrentMap;
2831
import java.util.concurrent.TimeUnit;
@@ -36,6 +39,8 @@
3639
@Slf4j
3740
public class MQTTConnectionManager {
3841

42+
private static final Connection REMOTE_CONNECTION_SOLT = Connection.builder().build();
43+
3944
private final ConcurrentMap<String, Connection> localConnections;
4045

4146
private final ConcurrentMap<String, Connection> eventConnections;
@@ -102,11 +107,11 @@ public Collection<Connection> getLocalConnections() {
102107
return this.localConnections.values();
103108
}
104109

105-
public Collection<Connection> getAllConnections() {
106-
Collection<Connection> connections = new ArrayList<>(this.localConnections.values().size()
107-
+ this.eventConnections.values().size());
108-
connections.addAll(this.localConnections.values());
109-
connections.addAll(eventConnections.values());
110+
public Collection<String> getAllConnectionsId() {
111+
Set<String> connections = new LinkedHashSet<>(this.localConnections.keySet().size()
112+
+ this.eventConnections.keySet().size());
113+
connections.addAll(this.localConnections.keySet());
114+
connections.addAll(eventConnections.keySet());
110115
return connections;
111116
}
112117

@@ -126,7 +131,7 @@ public void onChange(MqttEvent event) {
126131
log.warn("[ConnectEvent] close existing connection : {}", connection);
127132
connection.disconnect();
128133
} else {
129-
eventConnections.put(connectEvent.getClientId(), connection);
134+
eventConnections.put(connectEvent.getClientId(), REMOTE_CONNECTION_SOLT);
130135
}
131136
}
132137
}

mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,13 @@ private void refreshCache(Message<MqttEvent> msg) {
237237
default:
238238
break;
239239
}
240-
listeners.forEach(listener -> listener.onChange(value));
240+
listeners.forEach(listener -> {
241+
try {
242+
listener.onChange(value);
243+
} catch (Throwable e) {
244+
log.error("Failed to process event : {}", value.getKey(), e);
245+
}
246+
});
241247
} catch (Throwable ex) {
242248
log.error("refresh cache error", ex);
243249
}

mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,8 @@ public class Devices extends WebResource {
4545
@ApiResponse(code = 500, message = "Internal server error")})
4646
public void getList(@Suspended final AsyncResponse asyncResponse) {
4747
try {
48-
final Collection<Connection> allConnections = service().getConnectionManager().getAllConnections();
49-
asyncResponse.resume(allConnections.stream().map(e ->
50-
e.getClientId()).collect(Collectors.toList()));
48+
final Collection<String> allConnections = service().getConnectionManager().getAllConnectionsId();
49+
asyncResponse.resume(allConnections);
5150
} catch (Exception e) {
5251
log.error("[{}] Failed to list devices {}", clientAppId(), e);
5352
asyncResponse.resume(new RestException(e));

0 commit comments

Comments
 (0)