Skip to content

Commit 230db1f

Browse files
authored
Refactor network connections to process after ConnectionInfo (#2112)
This refactors the setup for network connections to be processed after receiving ConnectionInfo. Network bridges are established by each broker sending a BrokerInfo command to the other broker to provide remote broker information, but this is done before ConnectionInfo. This commit reworks the connection to capture the BrokerInfo information, but delay processing until after the connection information has been received and processed by broker.addConnection(). The future that was previously added for the connection id is no longer needed and removed. This commit also simplifies durable sync for bridges and prevents a race condition on startup by making sure the initial bridge and the duplex side only send back the BrokerSubscriptionInfo command after fully initialized.
1 parent b20a4c7 commit 230db1f

4 files changed

Lines changed: 217 additions & 86 deletions

File tree

activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java

Lines changed: 66 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
165165
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
166166
private String duplexNetworkConnectorId;
167167
private final long connectedTimestamp;
168-
private final CompletableFuture<ConnectionId> initialConnectionId = new CompletableFuture<>();
169168

170169
/**
171170
* @param taskRunnerFactory - can be null if you want direct dispatch to the transport
@@ -854,16 +853,14 @@ public Response processAddConnection(ConnectionInfo info) throws Exception {
854853

855854
try {
856855
broker.addConnection(context, info);
857-
// Complete the future with the connectionId if we completed
858-
// the broker.addConnection() chain successfully
859-
initialConnectionId.complete(info.getConnectionId());
856+
// If we completed broker.addConnection() successfully we can now
857+
// continue the required extra setup for any network connections
858+
addNetworkConnection();
860859
} catch (Exception e) {
861860
synchronized (brokerConnectionStates) {
862861
brokerConnectionStates.remove(info.getConnectionId());
863862
}
864863
unregisterConnectionState(info.getConnectionId());
865-
// complete with the exception
866-
initialConnectionId.completeExceptionally(e);
867864
LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} due to {}",
868865
info.getConnectionId(), clientId, info.getClientIp(), e.getLocalizedMessage());
869866
//AMQ-6561 - stop for all exceptions on addConnection
@@ -1401,44 +1398,75 @@ public Response processBrokerInfo(BrokerInfo info) throws IOException {
14011398
throw new IOException("Unexpected extra broker info command received from: " + info.getBrokerId());
14021399
}
14031400
if (info.isSlaveBroker()) {
1404-
LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
1405-
} else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
1401+
LOG.error("Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
1402+
throw new IOException("Slave Brokers are no longer supported - slave trying to attach is: " + info.getBrokerName());
1403+
}
1404+
1405+
// The only thing this method now does is capture the BrokerInfo object and mark as a network connection.
1406+
// Actual processing for starting up duplex bridges and for durable sync has been moved until
1407+
// after ConnectionInfo has been received.
1408+
1409+
// If this is duplex we need to get the ID configured so we can use it
1410+
// to close existing connections later that match the same ID
1411+
// This will be done inside the RegionBroker
1412+
if (info.isNetworkConnection() && info.isDuplexConnection()) {
1413+
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
1414+
config.setBrokerName(broker.getBrokerName());
1415+
String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
1416+
setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1417+
}
1418+
1419+
this.brokerInfo = info;
1420+
networkConnection = true;
1421+
List<TransportConnectionState> connectionStates = listConnectionStates();
1422+
for (TransportConnectionState cs : connectionStates) {
1423+
cs.getContext().setNetworkConnection(true);
1424+
}
1425+
return null;
1426+
}
1427+
1428+
// Process the network connection set up
1429+
private void addNetworkConnection() throws Exception {
1430+
final BrokerInfo info = this.brokerInfo;
1431+
if (info == null || !info.isNetworkConnection()){
1432+
return;
1433+
}
1434+
1435+
// For a one way bridge we need to respond on bridge creation by sending back the durable
1436+
// subs if durable sync is enabled via BrokerSubscriptionInfo command. The bridge is only
1437+
// initialized on one broker, so if this is the passive side we know it's initialized and
1438+
// we can respond.
1439+
//
1440+
// For a duplex bridge, we do NOT send back the durable subs. To simplify and ensure
1441+
// the bridge is fully initialized, the bridge startup will now handle sending
1442+
// BrokerSubscriptionInfo to the remote broker once fully started.
1443+
if (!info.isDuplexConnection()) {
14061444
try {
1407-
// register durable sync to be sent after ConnectionInfo has been handled
1408-
registerDurableSync(getNetworkConfiguration(info), info);
1445+
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
1446+
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
1447+
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
1448+
// Send back the durable subs as this is a one way bridge
1449+
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
1450+
}
14091451
} catch (Exception e) {
1410-
LOG.error("Failed to register durable sync for network bridge creation from broker {}", info.getBrokerId(), e);
1411-
return null;
1452+
LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e);
1453+
throw e;
14121454
}
1413-
} else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1455+
} else {
1456+
// duplex
14141457
// so this TransportConnection is the rear end of a network bridge
14151458
// We have been requested to create a two way pipe ...
14161459
try {
14171460
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
14181461
config.setBrokerName(broker.getBrokerName());
14191462

1420-
// register durable sync to be sent after ConnectionInfo has been handled
1421-
registerDurableSync(config, info);
1422-
1423-
// check for existing duplex connection hanging about
1424-
1425-
// We first look if existing network connection already exists for the same broker Id and network connector name
1426-
// It's possible in case of brief network fault to have this transport connector side of the connection always active
1427-
// and the duplex network connector side wanting to open a new one
1428-
// In this case, the old connection must be broken
1429-
String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
1430-
CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
1431-
synchronized (connections) {
1432-
for (TransportConnection c : connections) {
1433-
if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
1434-
LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId);
1435-
c.stopAsync();
1436-
// better to wait for a bit rather than get connection id already in use and failure to start new bridge
1437-
c.getStopped().await(1, TimeUnit.SECONDS);
1438-
}
1439-
}
1440-
setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1441-
}
1463+
// Note: Durable sync used to be here and was moved to DemandForwardingBridgeSupport
1464+
// inside doStartLocalAndRemoteBridges()
1465+
1466+
//The logic to clean up existing network connections for the same ID
1467+
// has been moved to the RegionBroker where it will check if the broker
1468+
// needs to close the connection before trying to create a duplicate connection
1469+
14421470
Transport localTransport = NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI());
14431471
Transport remoteBridgeTransport = transport;
14441472
if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
@@ -1462,46 +1490,13 @@ public Response processBrokerInfo(BrokerInfo info) throws IOException {
14621490
duplexBridge.setCreatedByDuplex(true);
14631491
duplexBridge.duplexStart(this, brokerInfo, info);
14641492
LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId);
1465-
return null;
14661493
} catch (TransportDisposedIOException e) {
14671494
LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId);
1468-
return null;
14691495
} catch (Exception e) {
14701496
LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e);
1471-
return null;
1497+
throw e;
14721498
}
14731499
}
1474-
this.brokerInfo = info;
1475-
networkConnection = true;
1476-
List<TransportConnectionState> connectionStates = listConnectionStates();
1477-
for (TransportConnectionState cs : connectionStates) {
1478-
cs.getContext().setNetworkConnection(true);
1479-
}
1480-
return null;
1481-
}
1482-
1483-
private void registerDurableSync(final NetworkBridgeConfiguration config, final BrokerInfo info) {
1484-
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
1485-
// this will complete when the connection id has been set, or immediately if already set
1486-
initialConnectionId.whenComplete((connectionId, t) -> {
1487-
try {
1488-
if (t != null) {
1489-
LOG.warn("SyncDurableSubs will be skipped due to error {}",
1490-
t.getMessage());
1491-
return;
1492-
}
1493-
// check connection still registered
1494-
if (lookupConnectionState(connectionId) != null) {
1495-
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
1496-
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(
1497-
this.broker.getBrokerService(), config));
1498-
}
1499-
} catch (Exception e) {
1500-
LOG.error("Failed to respond to network bridge creation from broker {}",
1501-
info.getBrokerId(), e);
1502-
}
1503-
});
1504-
}
15051500
}
15061501

15071502
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -1707,15 +1702,15 @@ protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConn
17071702
this.duplexNetworkConnectorId = duplexNetworkConnectorId;
17081703
}
17091704

1710-
protected synchronized String getDuplexNetworkConnectorId() {
1705+
public synchronized String getDuplexNetworkConnectorId() {
17111706
return this.duplexNetworkConnectorId;
17121707
}
17131708

17141709
public boolean isStopping() {
17151710
return stopping.get();
17161711
}
17171712

1718-
protected CountDownLatch getStopped() {
1713+
public CountDownLatch getStopped() {
17191714
return stopped;
17201715
}
17211716

activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.ConcurrentHashMap;
3030
import java.util.concurrent.CopyOnWriteArrayList;
3131
import java.util.concurrent.ThreadPoolExecutor;
32+
import java.util.concurrent.TimeUnit;
3233
import java.util.concurrent.atomic.AtomicBoolean;
3334
import java.util.concurrent.locks.ReentrantReadWriteLock;
3435

@@ -255,6 +256,10 @@ public void addConnection(ConnectionContext context, ConnectionInfo info) throws
255256
throw new InvalidClientIDException("No clientID specified for connection request");
256257
}
257258

259+
// Clean up existing duplex network connection if this is a reconnect attempt
260+
// This was moved from TransportConnection
261+
cleanupExistingDuplexNetworkConnection(context);
262+
258263
ConnectionContext oldContext = null;
259264

260265
synchronized (clientIdSet) {
@@ -289,6 +294,38 @@ public void addConnection(ConnectionContext context, ConnectionInfo info) throws
289294
connections.add(context.getConnection());
290295
}
291296

297+
// We first look if existing network connection already exists for the same broker Id and network connector name
298+
// It's possible in case of brief network fault to have this transport connector side of the connection always active
299+
// and the duplex network connector side wanting to open a new one
300+
// In this case, the old connection must be broken
301+
private void cleanupExistingDuplexNetworkConnection(ConnectionContext context) {
302+
try {
303+
if (context.isNetworkConnection()
304+
&& context.getConnection() instanceof TransportConnection) {
305+
final TransportConnection newConn = (TransportConnection) context.getConnection();
306+
if (newConn.getDuplexNetworkConnectorId() != null) {
307+
for (Connection c : connections) {
308+
if (c instanceof TransportConnection) {
309+
final TransportConnection existingConn = (TransportConnection) c;
310+
if (newConn.getDuplexNetworkConnectorId()
311+
.equals(existingConn.getDuplexNetworkConnectorId())) {
312+
LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).",
313+
c, existingConn.getDuplexNetworkConnectorId());
314+
existingConn.stopAsync();
315+
// better to wait for a bit rather than get connection id already in use and failure to start new bridge
316+
existingConn.getStopped().await(2, TimeUnit.SECONDS);
317+
break;
318+
}
319+
}
320+
}
321+
}
322+
}
323+
} catch (Exception e) {
324+
LOG.warn("Error cleaning up Duplex connection: {}" , e.getMessage());
325+
LOG.debug(e.getMessage(), e);
326+
}
327+
}
328+
292329
@Override
293330
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
294331
String clientId = info.getClientId();

activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,15 @@ private void doStartLocalAndRemoteBridges() {
477477
if (safeWaitUntilStarted()) {
478478
setupStaticDestinations();
479479
staticDestinationsLatch.countDown();
480+
481+
// Send to the remote broker the durable subs if sync is enabled after statup.
482+
// This is done by the initiating side of a bridge as well as by duplex bridges to
483+
// ensure everything is fully initialized before sending.
484+
if (configuration.isSyncDurableSubs() &&
485+
remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
486+
remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
487+
configuration));
488+
}
480489
}
481490
} catch (Throwable e) {
482491
serviceLocalException(e);
@@ -599,11 +608,6 @@ protected void startRemoteBridge() throws Exception {
599608
brokerInfo.setNetworkProperties(str);
600609
brokerInfo.setBrokerId(this.localBrokerId);
601610
remoteBroker.oneway(brokerInfo);
602-
if (configuration.isSyncDurableSubs() &&
603-
remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
604-
remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
605-
configuration));
606-
}
607611
}
608612
if (remoteConnectionInfo != null) {
609613
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());

0 commit comments

Comments
 (0)