diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 2e6ee20497f..f336e940d50 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -260,11 +260,18 @@ public void addConnection(ConnectionContext context, ConnectionInfo info) throws synchronized (clientIdSet) { oldContext = clientIdSet.get(clientId); if (oldContext != null) { - if (context.isAllowLinkStealing()) { + Connection oldConnection = oldContext.getConnection(); + // Allow the new connection if link-stealing is enabled OR if the old + // connection is already in the process of stopping (race condition where + // the client reconnects before the broker has finished cleaning up the + // previous disconnected connection from clientIdSet). + boolean oldConnectionStopping = oldConnection instanceof TransportConnection + && ((TransportConnection) oldConnection).isStopping(); + if (context.isAllowLinkStealing() || oldConnectionStopping) { clientIdSet.put(clientId, context); } else { throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " - + oldContext.getConnection().getRemoteAddress()); + + oldConnection.getRemoteAddress()); } } else { clientIdSet.put(clientId, context); @@ -274,7 +281,11 @@ public void addConnection(ConnectionContext context, ConnectionInfo info) throws if (oldContext != null) { if (oldContext.getConnection() != null) { Connection connection = oldContext.getConnection(); - LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection()); + if (connection instanceof TransportConnection && ((TransportConnection) connection).isStopping()) { + LOG.debug("Reconnect for clientId {} allowed; old connection {} is already stopping", clientId, connection); + } else { + LOG.warn("Stealing link for clientId {} From Connection {}", clientId, connection); + } if (connection instanceof TransportConnection) { TransportConnection transportConnection = (TransportConnection) connection; transportConnection.stopAsync(new IOException("Stealing link for clientId " + clientId + " From Connection " + oldContext.getConnection().getConnectionId()));