Skip to content

Commit 42c0780

Browse files
authored
Refactor network connections to process after ConnectionInfo (#2112) (#2117)
This refactors the setup for network connections to be processed after receiving ConnectionInfo. Network bridges are established by each broker sending a BrokerInfo command to the other broker to provide remote broker information, but this is done before ConnectionInfo. This commit reworks the connection to capture the BrokerInfo information, but delay processing until after the connection information has been received and processed by broker.addConnection(). The future that was previously added for the connection id is no longer needed and removed. This commit also simplifies durable sync for bridges and prevents a race condition on startup by making sure the initial bridge and the duplex side only send back the BrokerSubscriptionInfo command after fully initialized.
1 parent 4e541f6 commit 42c0780

4 files changed

Lines changed: 217 additions & 87 deletions

File tree

activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java

Lines changed: 66 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
164164
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
165165
private String duplexNetworkConnectorId;
166166
private final long connectedTimestamp;
167-
private final CompletableFuture<ConnectionId> initialConnectionId = new CompletableFuture<>();
168167

169168
/**
170169
* @param taskRunnerFactory - can be null if you want direct dispatch to the transport
@@ -853,16 +852,14 @@ public Response processAddConnection(ConnectionInfo info) throws Exception {
853852

854853
try {
855854
broker.addConnection(context, info);
856-
// Complete the future with the connectionId if we completed
857-
// the broker.addConnection() chain successfully
858-
initialConnectionId.complete(info.getConnectionId());
855+
// If we completed broker.addConnection() successfully we can now
856+
// continue the required extra setup for any network connections
857+
addNetworkConnection();
859858
} catch (Exception e) {
860859
synchronized (brokerConnectionStates) {
861860
brokerConnectionStates.remove(info.getConnectionId());
862861
}
863862
unregisterConnectionState(info.getConnectionId());
864-
// complete with the exception
865-
initialConnectionId.completeExceptionally(e);
866863
LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} due to {}",
867864
info.getConnectionId(), clientId, info.getClientIp(), e.getLocalizedMessage());
868865
//AMQ-6561 - stop for all exceptions on addConnection
@@ -1398,45 +1395,75 @@ public Response processBrokerInfo(BrokerInfo info) throws IOException {
13981395
throw new IOException("Unexpected extra broker info command received from: " + info.getBrokerId());
13991396
}
14001397
if (info.isSlaveBroker()) {
1401-
LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
1402-
} else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
1398+
LOG.error("Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
1399+
throw new IOException("Slave Brokers are no longer supported - slave trying to attach is: " + info.getBrokerName());
1400+
}
1401+
1402+
// The only thing this method now does is capture the BrokerInfo object and mark as a network connection.
1403+
// Actual processing for starting up duplex bridges and for durable sync has been moved until
1404+
// after ConnectionInfo has been received.
1405+
1406+
// If this is duplex we need to get the ID configured so we can use it
1407+
// to close existing connections later that match the same ID
1408+
// This will be done inside the RegionBroker
1409+
if (info.isNetworkConnection() && info.isDuplexConnection()) {
1410+
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
1411+
config.setBrokerName(broker.getBrokerName());
1412+
String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
1413+
setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1414+
}
1415+
1416+
this.brokerInfo = info;
1417+
networkConnection = true;
1418+
List<TransportConnectionState> connectionStates = listConnectionStates();
1419+
for (TransportConnectionState cs : connectionStates) {
1420+
cs.getContext().setNetworkConnection(true);
1421+
}
1422+
return null;
1423+
}
1424+
1425+
// Process the network connection set up
1426+
private void addNetworkConnection() throws Exception {
1427+
final BrokerInfo info = this.brokerInfo;
1428+
if (info == null || !info.isNetworkConnection()){
1429+
return;
1430+
}
1431+
1432+
// For a one way bridge we need to respond on bridge creation by sending back the durable
1433+
// subs if durable sync is enabled via BrokerSubscriptionInfo command. The bridge is only
1434+
// initialized on one broker, so if this is the passive side we know it's initialized and
1435+
// we can respond.
1436+
//
1437+
// For a duplex bridge, we do NOT send back the durable subs. To simplify and ensure
1438+
// the bridge is fully initialized, the bridge startup will now handle sending
1439+
// BrokerSubscriptionInfo to the remote broker once fully started.
1440+
if (!info.isDuplexConnection()) {
14031441
try {
1404-
// register durable sync to be sent after ConnectionInfo has been handled
1405-
registerDurableSync(getNetworkConfiguration(info), info);
1442+
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
1443+
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
1444+
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
1445+
// Send back the durable subs as this is a one way bridge
1446+
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
1447+
}
14061448
} catch (Exception e) {
1407-
LOG.error("Failed to register durable sync for network bridge creation from broker {}", info.getBrokerId(), e);
1408-
return null;
1449+
LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e);
1450+
throw e;
14091451
}
1410-
} else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1452+
} else {
1453+
// duplex
14111454
// so this TransportConnection is the rear end of a network bridge
14121455
// We have been requested to create a two way pipe ...
14131456
try {
14141457
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
14151458
config.setBrokerName(broker.getBrokerName());
14161459

1417-
// register durable sync to be sent after ConnectionInfo has been handled
1418-
registerDurableSync(config, info);
1419-
1420-
// check for existing duplex connection hanging about
1421-
1422-
// We first look if existing network connection already exists for the same broker Id and network connector name
1423-
// It's possible in case of brief network fault to have this transport connector side of the connection always active
1424-
// and the duplex network connector side wanting to open a new one
1425-
// In this case, the old connection must be broken
1426-
String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
1427-
CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
1428-
synchronized (connections) {
1429-
for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
1430-
TransportConnection c = iter.next();
1431-
if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
1432-
LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId);
1433-
c.stopAsync();
1434-
// better to wait for a bit rather than get connection id already in use and failure to start new bridge
1435-
c.getStopped().await(1, TimeUnit.SECONDS);
1436-
}
1437-
}
1438-
setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1439-
}
1460+
// Note: Durable sync used to be here and was moved to DemandForwardingBridgeSupport
1461+
// inside doStartLocalAndRemoteBridges()
1462+
1463+
//The logic to clean up existing network connections for the same ID
1464+
// has been moved to the RegionBroker where it will check if the broker
1465+
// needs to close the connection before trying to create a duplicate connection
1466+
14401467
Transport localTransport = NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI());
14411468
Transport remoteBridgeTransport = transport;
14421469
if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
@@ -1460,46 +1487,13 @@ public Response processBrokerInfo(BrokerInfo info) throws IOException {
14601487
duplexBridge.setCreatedByDuplex(true);
14611488
duplexBridge.duplexStart(this, brokerInfo, info);
14621489
LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId);
1463-
return null;
14641490
} catch (TransportDisposedIOException e) {
14651491
LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId);
1466-
return null;
14671492
} catch (Exception e) {
14681493
LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e);
1469-
return null;
1494+
throw e;
14701495
}
14711496
}
1472-
this.brokerInfo = info;
1473-
networkConnection = true;
1474-
List<TransportConnectionState> connectionStates = listConnectionStates();
1475-
for (TransportConnectionState cs : connectionStates) {
1476-
cs.getContext().setNetworkConnection(true);
1477-
}
1478-
return null;
1479-
}
1480-
1481-
private void registerDurableSync(final NetworkBridgeConfiguration config, final BrokerInfo info) {
1482-
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
1483-
// this will complete when the connection id has been set, or immediately if already set
1484-
initialConnectionId.whenComplete((connectionId, t) -> {
1485-
try {
1486-
if (t != null) {
1487-
LOG.warn("SyncDurableSubs will be skipped due to error {}",
1488-
t.getMessage());
1489-
return;
1490-
}
1491-
// check connection still registered
1492-
if (lookupConnectionState(connectionId) != null) {
1493-
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
1494-
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(
1495-
this.broker.getBrokerService(), config));
1496-
}
1497-
} catch (Exception e) {
1498-
LOG.error("Failed to respond to network bridge creation from broker {}",
1499-
info.getBrokerId(), e);
1500-
}
1501-
});
1502-
}
15031497
}
15041498

15051499
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -1705,15 +1699,15 @@ protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConn
17051699
this.duplexNetworkConnectorId = duplexNetworkConnectorId;
17061700
}
17071701

1708-
protected synchronized String getDuplexNetworkConnectorId() {
1702+
public synchronized String getDuplexNetworkConnectorId() {
17091703
return this.duplexNetworkConnectorId;
17101704
}
17111705

17121706
public boolean isStopping() {
17131707
return stopping.get();
17141708
}
17151709

1716-
protected CountDownLatch getStopped() {
1710+
public CountDownLatch getStopped() {
17171711
return stopped;
17181712
}
17191713

activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.ConcurrentHashMap;
3030
import java.util.concurrent.CopyOnWriteArrayList;
3131
import java.util.concurrent.ThreadPoolExecutor;
32+
import java.util.concurrent.TimeUnit;
3233
import java.util.concurrent.atomic.AtomicBoolean;
3334
import java.util.concurrent.locks.ReentrantReadWriteLock;
3435

@@ -255,6 +256,10 @@ public void addConnection(ConnectionContext context, ConnectionInfo info) throws
255256
throw new InvalidClientIDException("No clientID specified for connection request");
256257
}
257258

259+
// Clean up existing duplex network connection if this is a reconnect attempt
260+
// This was moved from TransportConnection
261+
cleanupExistingDuplexNetworkConnection(context);
262+
258263
ConnectionContext oldContext = null;
259264

260265
synchronized (clientIdSet) {
@@ -289,6 +294,38 @@ public void addConnection(ConnectionContext context, ConnectionInfo info) throws
289294
connections.add(context.getConnection());
290295
}
291296

297+
// We first look if existing network connection already exists for the same broker Id and network connector name
298+
// It's possible in case of brief network fault to have this transport connector side of the connection always active
299+
// and the duplex network connector side wanting to open a new one
300+
// In this case, the old connection must be broken
301+
private void cleanupExistingDuplexNetworkConnection(ConnectionContext context) {
302+
try {
303+
if (context.isNetworkConnection()
304+
&& context.getConnection() instanceof TransportConnection) {
305+
final TransportConnection newConn = (TransportConnection) context.getConnection();
306+
if (newConn.getDuplexNetworkConnectorId() != null) {
307+
for (Connection c : connections) {
308+
if (c instanceof TransportConnection) {
309+
final TransportConnection existingConn = (TransportConnection) c;
310+
if (newConn.getDuplexNetworkConnectorId()
311+
.equals(existingConn.getDuplexNetworkConnectorId())) {
312+
LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).",
313+
c, existingConn.getDuplexNetworkConnectorId());
314+
existingConn.stopAsync();
315+
// better to wait for a bit rather than get connection id already in use and failure to start new bridge
316+
existingConn.getStopped().await(2, TimeUnit.SECONDS);
317+
break;
318+
}
319+
}
320+
}
321+
}
322+
}
323+
} catch (Exception e) {
324+
LOG.warn("Error cleaning up Duplex connection: {}" , e.getMessage());
325+
LOG.debug(e.getMessage(), e);
326+
}
327+
}
328+
292329
@Override
293330
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
294331
String clientId = info.getClientId();

activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,15 @@ private void doStartLocalAndRemoteBridges() {
477477
if (safeWaitUntilStarted()) {
478478
setupStaticDestinations();
479479
staticDestinationsLatch.countDown();
480+
481+
// Send to the remote broker the durable subs if sync is enabled after statup.
482+
// This is done by the initiating side of a bridge as well as by duplex bridges to
483+
// ensure everything is fully initialized before sending.
484+
if (configuration.isSyncDurableSubs() &&
485+
remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
486+
remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
487+
configuration));
488+
}
480489
}
481490
} catch (Throwable e) {
482491
serviceLocalException(e);
@@ -599,11 +608,6 @@ protected void startRemoteBridge() throws Exception {
599608
brokerInfo.setNetworkProperties(str);
600609
brokerInfo.setBrokerId(this.localBrokerId);
601610
remoteBroker.oneway(brokerInfo);
602-
if (configuration.isSyncDurableSubs() &&
603-
remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
604-
remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
605-
configuration));
606-
}
607611
}
608612
if (remoteConnectionInfo != null) {
609613
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());

0 commit comments

Comments
 (0)