diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 9e771017445..b6fe5488574 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -165,7 +165,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); private String duplexNetworkConnectorId; private final long connectedTimestamp; - private final CompletableFuture initialConnectionId = new CompletableFuture<>(); /** * @param taskRunnerFactory - can be null if you want direct dispatch to the transport @@ -854,16 +853,14 @@ public Response processAddConnection(ConnectionInfo info) throws Exception { try { broker.addConnection(context, info); - // Complete the future with the connectionId if we completed - // the broker.addConnection() chain successfully - initialConnectionId.complete(info.getConnectionId()); + // If we completed broker.addConnection() successfully we can now + // continue the required extra setup for any network connections + addNetworkConnection(); } catch (Exception e) { synchronized (brokerConnectionStates) { brokerConnectionStates.remove(info.getConnectionId()); } unregisterConnectionState(info.getConnectionId()); - // complete with the exception - initialConnectionId.completeExceptionally(e); LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} due to {}", info.getConnectionId(), clientId, info.getClientIp(), e.getLocalizedMessage()); //AMQ-6561 - stop for all exceptions on addConnection @@ -1401,44 +1398,75 @@ public Response processBrokerInfo(BrokerInfo info) throws IOException { throw new IOException("Unexpected extra broker info command received from: " + info.getBrokerId()); } if (info.isSlaveBroker()) { - LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName()); - } else if (info.isNetworkConnection() && !info.isDuplexConnection()) { + LOG.error("Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName()); + throw new IOException("Slave Brokers are no longer supported - slave trying to attach is: " + info.getBrokerName()); + } + + // The only thing this method now does is capture the BrokerInfo object and mark as a network connection. + // Actual processing for starting up duplex bridges and for durable sync has been moved until + // after ConnectionInfo has been received. + + // If this is duplex we need to get the ID configured so we can use it + // to close existing connections later that match the same ID + // This will be done inside the RegionBroker + if (info.isNetworkConnection() && info.isDuplexConnection()) { + NetworkBridgeConfiguration config = getNetworkConfiguration(info); + config.setBrokerName(broker.getBrokerName()); + String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); + setDuplexNetworkConnectorId(duplexNetworkConnectorId); + } + + this.brokerInfo = info; + networkConnection = true; + List connectionStates = listConnectionStates(); + for (TransportConnectionState cs : connectionStates) { + cs.getContext().setNetworkConnection(true); + } + return null; + } + + // Process the network connection set up + private void addNetworkConnection() throws Exception { + final BrokerInfo info = this.brokerInfo; + if (info == null || !info.isNetworkConnection()){ + return; + } + + // For a one way bridge we need to respond on bridge creation by sending back the durable + // subs if durable sync is enabled via BrokerSubscriptionInfo command. The bridge is only + // initialized on one broker, so if this is the passive side we know it's initialized and + // we can respond. + // + // For a duplex bridge, we do NOT send back the durable subs. To simplify and ensure + // the bridge is fully initialized, the bridge startup will now handle sending + // BrokerSubscriptionInfo to the remote broker once fully started. + if (!info.isDuplexConnection()) { try { - // register durable sync to be sent after ConnectionInfo has been handled - registerDurableSync(getNetworkConfiguration(info), info); + NetworkBridgeConfiguration config = getNetworkConfiguration(info); + if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { + LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); + // Send back the durable subs as this is a one way bridge + dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config)); + } } catch (Exception e) { - LOG.error("Failed to register durable sync for network bridge creation from broker {}", info.getBrokerId(), e); - return null; + LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e); + throw e; } - } else if (info.isNetworkConnection() && info.isDuplexConnection()) { + } else { + // duplex // so this TransportConnection is the rear end of a network bridge // We have been requested to create a two way pipe ... try { NetworkBridgeConfiguration config = getNetworkConfiguration(info); config.setBrokerName(broker.getBrokerName()); - // register durable sync to be sent after ConnectionInfo has been handled - registerDurableSync(config, info); - - // check for existing duplex connection hanging about - - // We first look if existing network connection already exists for the same broker Id and network connector name - // It's possible in case of brief network fault to have this transport connector side of the connection always active - // and the duplex network connector side wanting to open a new one - // In this case, the old connection must be broken - String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); - CopyOnWriteArrayList connections = this.connector.getConnections(); - synchronized (connections) { - for (TransportConnection c : connections) { - if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) { - LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId); - c.stopAsync(); - // better to wait for a bit rather than get connection id already in use and failure to start new bridge - c.getStopped().await(1, TimeUnit.SECONDS); - } - } - setDuplexNetworkConnectorId(duplexNetworkConnectorId); - } + // Note: Durable sync used to be here and was moved to DemandForwardingBridgeSupport + // inside doStartLocalAndRemoteBridges() + + //The logic to clean up existing network connections for the same ID + // has been moved to the RegionBroker where it will check if the broker + // needs to close the connection before trying to create a duplicate connection + Transport localTransport = NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI()); Transport remoteBridgeTransport = transport; if (! (remoteBridgeTransport instanceof ResponseCorrelator)) { @@ -1462,46 +1490,13 @@ public Response processBrokerInfo(BrokerInfo info) throws IOException { duplexBridge.setCreatedByDuplex(true); duplexBridge.duplexStart(this, brokerInfo, info); LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId); - return null; } catch (TransportDisposedIOException e) { LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId); - return null; } catch (Exception e) { LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e); - return null; + throw e; } } - this.brokerInfo = info; - networkConnection = true; - List connectionStates = listConnectionStates(); - for (TransportConnectionState cs : connectionStates) { - cs.getContext().setNetworkConnection(true); - } - return null; - } - - private void registerDurableSync(final NetworkBridgeConfiguration config, final BrokerInfo info) { - if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { - // this will complete when the connection id has been set, or immediately if already set - initialConnectionId.whenComplete((connectionId, t) -> { - try { - if (t != null) { - LOG.warn("SyncDurableSubs will be skipped due to error {}", - t.getMessage()); - return; - } - // check connection still registered - if (lookupConnectionState(connectionId) != null) { - LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); - dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo( - this.broker.getBrokerService(), config)); - } - } catch (Exception e) { - LOG.error("Failed to respond to network bridge creation from broker {}", - info.getBrokerId(), e); - } - }); - } } @SuppressWarnings({"unchecked", "rawtypes"}) @@ -1707,7 +1702,7 @@ protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConn this.duplexNetworkConnectorId = duplexNetworkConnectorId; } - protected synchronized String getDuplexNetworkConnectorId() { + public synchronized String getDuplexNetworkConnectorId() { return this.duplexNetworkConnectorId; } @@ -1715,7 +1710,7 @@ public boolean isStopping() { return stopping.get(); } - protected CountDownLatch getStopped() { + public CountDownLatch getStopped() { return stopped; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 2e6ee20497f..7d5de4c184d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -255,6 +256,10 @@ public void addConnection(ConnectionContext context, ConnectionInfo info) throws throw new InvalidClientIDException("No clientID specified for connection request"); } + // Clean up existing duplex network connection if this is a reconnect attempt + // This was moved from TransportConnection + cleanupExistingDuplexNetworkConnection(context); + ConnectionContext oldContext = null; synchronized (clientIdSet) { @@ -289,6 +294,38 @@ public void addConnection(ConnectionContext context, ConnectionInfo info) throws connections.add(context.getConnection()); } + // We first look if existing network connection already exists for the same broker Id and network connector name + // It's possible in case of brief network fault to have this transport connector side of the connection always active + // and the duplex network connector side wanting to open a new one + // In this case, the old connection must be broken + private void cleanupExistingDuplexNetworkConnection(ConnectionContext context) { + try { + if (context.isNetworkConnection() + && context.getConnection() instanceof TransportConnection) { + final TransportConnection newConn = (TransportConnection) context.getConnection(); + if (newConn.getDuplexNetworkConnectorId() != null) { + for (Connection c : connections) { + if (c instanceof TransportConnection) { + final TransportConnection existingConn = (TransportConnection) c; + if (newConn.getDuplexNetworkConnectorId() + .equals(existingConn.getDuplexNetworkConnectorId())) { + LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", + c, existingConn.getDuplexNetworkConnectorId()); + existingConn.stopAsync(); + // better to wait for a bit rather than get connection id already in use and failure to start new bridge + existingConn.getStopped().await(2, TimeUnit.SECONDS); + break; + } + } + } + } + } + } catch (Exception e) { + LOG.warn("Error cleaning up Duplex connection: {}" , e.getMessage()); + LOG.debug(e.getMessage(), e); + } + } + @Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { String clientId = info.getClientId(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 26a5769ed0d..17eb9c55294 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -477,6 +477,15 @@ private void doStartLocalAndRemoteBridges() { if (safeWaitUntilStarted()) { setupStaticDestinations(); staticDestinationsLatch.countDown(); + + // Send to the remote broker the durable subs if sync is enabled after statup. + // This is done by the initiating side of a bridge as well as by duplex bridges to + // ensure everything is fully initialized before sending. + if (configuration.isSyncDurableSubs() && + remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { + remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService, + configuration)); + } } } catch (Throwable e) { serviceLocalException(e); @@ -599,11 +608,6 @@ protected void startRemoteBridge() throws Exception { brokerInfo.setNetworkProperties(str); brokerInfo.setBrokerId(this.localBrokerId); remoteBroker.oneway(brokerInfo); - if (configuration.isSyncDurableSubs() && - remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { - remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService, - configuration)); - } } if (remoteConnectionInfo != null) { remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java index 19c95da29b4..3a99029617e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java @@ -16,10 +16,12 @@ */ package org.apache.activemq.network; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.IOException; import java.net.URI; import java.util.Arrays; import java.util.Collection; @@ -39,8 +41,11 @@ import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.transport.discovery.DiscoveryAgent; +import org.apache.activemq.transport.discovery.DiscoveryListener; import org.apache.activemq.util.Wait; import org.junit.After; +import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -84,6 +89,7 @@ public static Collection data() { private static final String USER_PASSWORD = "password"; private final boolean duplex; private final AtomicReference brokerSubInfo = new AtomicReference<>(); + private final AtomicReference serviceFailed = new AtomicReference<>(); private String ncPassword = USER_PASSWORD; public DurableSyncNetworkBridgeAuthTest(boolean duplex) { @@ -94,6 +100,7 @@ public DurableSyncNetworkBridgeAuthTest(boolean duplex) { public void setUp() throws Exception { this.ncPassword = USER_PASSWORD; this.brokerSubInfo.set(null); + this.serviceFailed.set(null); } @After @@ -110,31 +117,77 @@ public void testAuthSuccess() throws Exception { // automatically on connect so the remote broker will always receive it. However, the // remote broker should only send back its list after the connection is properly authenticated. assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + assertNull(serviceFailed.get()); + DemandForwardingBridge bridge = getActiveBridge(); // Simulate a connection exception and reconnect, we should receive again brokerSubInfo.set(null); localBroker.getNetworkConnectors().get(0).activeBridges().stream() .findFirst().orElseThrow().serviceRemoteException(new Exception()); + // wait for failure + assertTrue(Wait.waitFor(() -> serviceFailed.get() != null,5000,10)); + assertTrue(Wait.waitFor(bridge.localBroker::isDisposed,5000,10)); + + // should reconnect again and get updated info + assertTrue(Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, + TimeUnit.SECONDS.toMillis(5), 10)); assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); } @Test public void testAuthFailure() throws Exception { this.ncPassword = "badpassword"; - try { - // set a shorter wait time, it won't connect with bad password - doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), - TimeUnit.SECONDS.toMillis(5)); - throw new IllegalStateException("Should have received assertion error with bad password"); - } catch (AssertionError e) { - // expected - } + doSetUpRemoteBroker(true, tempFolder.newFolder(), 0); + doSetUpLocalBroker(true, true, tempFolder.newFolder()); + // Wait for the failure due to authentication + assertTrue(Wait.waitFor(() -> serviceFailed.get() != null,5000,10)); + assertTrue(Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty(), + TimeUnit.SECONDS.toMillis(5), 10)); // Because the local broker was not authenticated by the remote broker, the local broker // should not have received back the BrokerSubscriptionInfo assertNull(brokerSubInfo.get()); } + @Test + public void testDuplicateDuplexBridgeFailedAuthIgnored() throws Exception { + Assume.assumeTrue(duplex); + doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), + TimeUnit.SECONDS.toMillis(15)); + + // everything is good, no error and we got the sync command + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + assertNull(serviceFailed.get()); + + // Start a duplicate bridge with the same configuration but bad password + // so authentication fails. This should not cause a failure with the existing + // bridge because this connection won't be authenticated + DemandForwardingBridge bridge = getActiveBridge(); + this.ncPassword = "badpassword"; + NetworkConnector nc = localBroker.addNetworkConnector(configureLocalNetworkConnector()); + nc.start(); + try { + Thread.sleep(2000); + // Verify bridge is not disposed and still connected + assertFalse(bridge.disposed.get()); + } finally { + nc.stop(); + } + + // try again, this will connect successfully and the broker will detect it's a duplex bridge + // matching the same config and close the other + this.ncPassword = USER_PASSWORD; + nc = localBroker.addNetworkConnector(configureLocalNetworkConnector()); + nc.start(); + try { + // authentication is now correct so the RegionBroker should terminate the other duplex + // bridge as it matches + assertTrue(Wait.waitFor(bridge.disposed::get,5000,10)); + } finally { + nc.stop(); + } + } + @Test public void testRestartSync() throws Exception { doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), @@ -144,12 +197,17 @@ public void testRestartSync() throws Exception { // automatically on connect so the remote broker will always receive it. However, the // remote broker should only send back its list after the connection is properly authenticated. assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + assertNull(serviceFailed.get()); // Restart, should receive again with new connection brokerSubInfo.set(null); restartRemoteBroker(); + // should fail from restart + assertTrue(Wait.waitFor(() -> serviceFailed.get() != null,5000,10)); // Wait for the reconnect and receive of BrokerSubInfo + assertTrue(Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, + TimeUnit.SECONDS.toMillis(5), 10)); assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); } @@ -159,10 +217,10 @@ public void testDuplicateBrokerInfo() throws Exception { doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), TimeUnit.SECONDS.toMillis(15)); assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + assertNull(serviceFailed.get()); // find the established bridge - DemandForwardingBridge bridge = (DemandForwardingBridge) localBroker.getNetworkConnectors().get(0).activeBridges().stream() - .findFirst().orElseThrow(); + DemandForwardingBridge bridge = getActiveBridge(); // send to one of the brokers (networked brokers will have already received a BrokerInfo) // the duplicate will trigger the bridge connection to close @@ -232,6 +290,9 @@ protected NetworkConnector configureLocalNetworkConnector() throws Exception { URI remoteURI = transportConnectors.get(0).getConnectUri(); String uri = "static:(" + remoteURI + ")"; NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri)) { + { + this.setDiscoveryAgent(new DiscoveryAgentFilter(getDiscoveryAgent())); + } @Override protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, DiscoveryEvent event) { @@ -248,6 +309,7 @@ public void onCommand(Object command) { } super.onCommand(command); } + }; return super.createBridge(localTransport, remoteFilter, event); } @@ -285,4 +347,37 @@ protected BrokerService createRemoteBroker(File dataDir, int port) throws Except return brokerService; } + private DemandForwardingBridge getActiveBridge() { + return(DemandForwardingBridge) localBroker.getNetworkConnectors().get(0).activeBridges().stream() + .findFirst().orElseThrow(); + } + + private class DiscoveryAgentFilter implements DiscoveryAgent { + private final DiscoveryAgent agent; + + public DiscoveryAgentFilter(DiscoveryAgent agent) { + this.agent = agent; + } + + public void setDiscoveryListener(DiscoveryListener listener) { + agent.setDiscoveryListener(listener); + } + + public void start() throws Exception { + agent.start(); + } + + public void stop() throws Exception { + agent.stop(); + } + + public void registerService(String name) throws IOException { + agent.registerService(name); + } + + public void serviceFailed(DiscoveryEvent event) throws IOException { + serviceFailed.set(event); + agent.serviceFailed(event); + } + } }