Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
private String duplexNetworkConnectorId;
private final long connectedTimestamp;
private final CompletableFuture<ConnectionId> initialConnectionId = new CompletableFuture<>();

/**
* @param taskRunnerFactory - can be null if you want direct dispatch to the transport
Expand Down Expand Up @@ -854,16 +853,14 @@ public Response processAddConnection(ConnectionInfo info) throws Exception {

try {
broker.addConnection(context, info);
// Complete the future with the connectionId if we completed
// the broker.addConnection() chain successfully
initialConnectionId.complete(info.getConnectionId());
// If we completed broker.addConnection() successfully we can now
// continue the required extra setup for any network connections
addNetworkConnection();
} catch (Exception e) {
synchronized (brokerConnectionStates) {
brokerConnectionStates.remove(info.getConnectionId());
}
unregisterConnectionState(info.getConnectionId());
// complete with the exception
initialConnectionId.completeExceptionally(e);
LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} due to {}",
info.getConnectionId(), clientId, info.getClientIp(), e.getLocalizedMessage());
//AMQ-6561 - stop for all exceptions on addConnection
Expand Down Expand Up @@ -1401,44 +1398,75 @@ public Response processBrokerInfo(BrokerInfo info) throws IOException {
throw new IOException("Unexpected extra broker info command received from: " + info.getBrokerId());
}
if (info.isSlaveBroker()) {
LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
} else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
LOG.error("Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
throw new IOException("Slave Brokers are no longer supported - slave trying to attach is: " + info.getBrokerName());
}

// The only thing this method now does is capture the BrokerInfo object and mark as a network connection.
// Actual processing for starting up duplex bridges and for durable sync has been moved until
// after ConnectionInfo has been received.

// If this is duplex we need to get the ID configured so we can use it
// to close existing connections later that match the same ID
// This will be done inside the RegionBroker
if (info.isNetworkConnection() && info.isDuplexConnection()) {
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
config.setBrokerName(broker.getBrokerName());
String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
setDuplexNetworkConnectorId(duplexNetworkConnectorId);
}

this.brokerInfo = info;
networkConnection = true;
List<TransportConnectionState> connectionStates = listConnectionStates();
for (TransportConnectionState cs : connectionStates) {
cs.getContext().setNetworkConnection(true);
}
return null;
}

// Process the network connection set up
private void addNetworkConnection() throws Exception {
final BrokerInfo info = this.brokerInfo;
if (info == null || !info.isNetworkConnection()){
return;
}

// For a one way bridge we need to respond on bridge creation by sending back the durable
// subs if durable sync is enabled via BrokerSubscriptionInfo command. The bridge is only
// initialized on one broker, so if this is the passive side we know it's initialized and
// we can respond.
//
// For a duplex bridge, we do NOT send back the durable subs. To simplify and ensure
// the bridge is fully initialized, the bridge startup will now handle sending
// BrokerSubscriptionInfo to the remote broker once fully started.
if (!info.isDuplexConnection()) {
try {
// register durable sync to be sent after ConnectionInfo has been handled
registerDurableSync(getNetworkConfiguration(info), info);
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
// Send back the durable subs as this is a one way bridge
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
}
} catch (Exception e) {
LOG.error("Failed to register durable sync for network bridge creation from broker {}", info.getBrokerId(), e);
return null;
LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e);
throw e;
}
} else if (info.isNetworkConnection() && info.isDuplexConnection()) {
} else {
// duplex
// so this TransportConnection is the rear end of a network bridge
// We have been requested to create a two way pipe ...
try {
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
config.setBrokerName(broker.getBrokerName());

// register durable sync to be sent after ConnectionInfo has been handled
registerDurableSync(config, info);

// check for existing duplex connection hanging about

// We first look if existing network connection already exists for the same broker Id and network connector name
// It's possible in case of brief network fault to have this transport connector side of the connection always active
// and the duplex network connector side wanting to open a new one
// In this case, the old connection must be broken
String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
synchronized (connections) {
for (TransportConnection c : connections) {
if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId);
c.stopAsync();
// better to wait for a bit rather than get connection id already in use and failure to start new bridge
c.getStopped().await(1, TimeUnit.SECONDS);
}
}
setDuplexNetworkConnectorId(duplexNetworkConnectorId);
}
// Note: Durable sync used to be here and was moved to DemandForwardingBridgeSupport
// inside doStartLocalAndRemoteBridges()

//The logic to clean up existing network connections for the same ID
// has been moved to the RegionBroker where it will check if the broker
// needs to close the connection before trying to create a duplicate connection

Transport localTransport = NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI());
Transport remoteBridgeTransport = transport;
if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
Expand All @@ -1462,46 +1490,13 @@ public Response processBrokerInfo(BrokerInfo info) throws IOException {
duplexBridge.setCreatedByDuplex(true);
duplexBridge.duplexStart(this, brokerInfo, info);
LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId);
return null;
} catch (TransportDisposedIOException e) {
LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId);
return null;
} catch (Exception e) {
LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e);
return null;
throw e;
}
}
this.brokerInfo = info;
networkConnection = true;
List<TransportConnectionState> connectionStates = listConnectionStates();
for (TransportConnectionState cs : connectionStates) {
cs.getContext().setNetworkConnection(true);
}
return null;
}

private void registerDurableSync(final NetworkBridgeConfiguration config, final BrokerInfo info) {
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
// this will complete when the connection id has been set, or immediately if already set
initialConnectionId.whenComplete((connectionId, t) -> {
try {
if (t != null) {
LOG.warn("SyncDurableSubs will be skipped due to error {}",
t.getMessage());
return;
}
// check connection still registered
if (lookupConnectionState(connectionId) != null) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(
this.broker.getBrokerService(), config));
}
} catch (Exception e) {
LOG.error("Failed to respond to network bridge creation from broker {}",
info.getBrokerId(), e);
}
});
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down Expand Up @@ -1707,15 +1702,15 @@ protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConn
this.duplexNetworkConnectorId = duplexNetworkConnectorId;
}

protected synchronized String getDuplexNetworkConnectorId() {
public synchronized String getDuplexNetworkConnectorId() {
return this.duplexNetworkConnectorId;
}

public boolean isStopping() {
return stopping.get();
}

protected CountDownLatch getStopped() {
public CountDownLatch getStopped() {
return stopped;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -255,6 +256,10 @@ public void addConnection(ConnectionContext context, ConnectionInfo info) throws
throw new InvalidClientIDException("No clientID specified for connection request");
}

// Clean up existing duplex network connection if this is a reconnect attempt
// This was moved from TransportConnection
cleanupExistingDuplexNetworkConnection(context);

ConnectionContext oldContext = null;

synchronized (clientIdSet) {
Expand Down Expand Up @@ -289,6 +294,38 @@ public void addConnection(ConnectionContext context, ConnectionInfo info) throws
connections.add(context.getConnection());
}

// We first look if existing network connection already exists for the same broker Id and network connector name
// It's possible in case of brief network fault to have this transport connector side of the connection always active
// and the duplex network connector side wanting to open a new one
// In this case, the old connection must be broken
private void cleanupExistingDuplexNetworkConnection(ConnectionContext context) {
try {
if (context.isNetworkConnection()
&& context.getConnection() instanceof TransportConnection) {
final TransportConnection newConn = (TransportConnection) context.getConnection();
if (newConn.getDuplexNetworkConnectorId() != null) {
for (Connection c : connections) {
if (c instanceof TransportConnection) {
final TransportConnection existingConn = (TransportConnection) c;
if (newConn.getDuplexNetworkConnectorId()
.equals(existingConn.getDuplexNetworkConnectorId())) {
LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).",
c, existingConn.getDuplexNetworkConnectorId());
existingConn.stopAsync();
// better to wait for a bit rather than get connection id already in use and failure to start new bridge
existingConn.getStopped().await(2, TimeUnit.SECONDS);
break;
}
}
}
}
}
} catch (Exception e) {
LOG.warn("Error cleaning up Duplex connection: {}" , e.getMessage());
LOG.debug(e.getMessage(), e);
}
}

@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
String clientId = info.getClientId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,15 @@ private void doStartLocalAndRemoteBridges() {
if (safeWaitUntilStarted()) {
setupStaticDestinations();
staticDestinationsLatch.countDown();

// Send to the remote broker the durable subs if sync is enabled after statup.
// This is done by the initiating side of a bridge as well as by duplex bridges to
// ensure everything is fully initialized before sending.
if (configuration.isSyncDurableSubs() &&
remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
configuration));
}
}
} catch (Throwable e) {
serviceLocalException(e);
Expand Down Expand Up @@ -599,11 +608,6 @@ protected void startRemoteBridge() throws Exception {
brokerInfo.setNetworkProperties(str);
brokerInfo.setBrokerId(this.localBrokerId);
remoteBroker.oneway(brokerInfo);
if (configuration.isSyncDurableSubs() &&
remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
configuration));
}
}
if (remoteConnectionInfo != null) {
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
Expand Down
Loading