From 6300e433879e76db0d508bcbceaf53ba61ad06f3 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" Date: Wed, 10 Jun 2026 19:29:51 -0400 Subject: [PATCH] Refactor network connections to process after ConnectionInfo This refactors the setup for network connections to be processed after receiving ConnectionInfo. Network bridges are established by each broker sending a BrokerInfo command to the other broker to provide remote broker information, but this is done before ConnectionInfo. This commit reworks the connection to capture the BrokerInfo information, but delay processing until after the connection information has been received and processed by broker.addConnection(). The future that was previously added for the connection id is no longer needed and removed. This commit also simplifies durable sync for bridges and prevents a race condition on startup by making sure the initial bridge and the duplex side only send back the BrokerSubscriptionInfo command after fully initialized. --- .../activemq/broker/TransportConnection.java | 137 +++++++++--------- .../activemq/broker/region/RegionBroker.java | 37 +++++ .../DemandForwardingBridgeSupport.java | 14 +- .../DurableSyncNetworkBridgeAuthTest.java | 115 +++++++++++++-- 4 files changed, 217 insertions(+), 86 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 9e771017445..b6fe5488574 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -165,7 +165,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); private String duplexNetworkConnectorId; private final long connectedTimestamp; - private final CompletableFuture initialConnectionId = new CompletableFuture<>(); /** * @param taskRunnerFactory - can be null if you want direct dispatch to the transport @@ -854,16 +853,14 @@ public Response processAddConnection(ConnectionInfo info) throws Exception { try { broker.addConnection(context, info); - // Complete the future with the connectionId if we completed - // the broker.addConnection() chain successfully - initialConnectionId.complete(info.getConnectionId()); + // If we completed broker.addConnection() successfully we can now + // continue the required extra setup for any network connections + addNetworkConnection(); } catch (Exception e) { synchronized (brokerConnectionStates) { brokerConnectionStates.remove(info.getConnectionId()); } unregisterConnectionState(info.getConnectionId()); - // complete with the exception - initialConnectionId.completeExceptionally(e); LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} due to {}", info.getConnectionId(), clientId, info.getClientIp(), e.getLocalizedMessage()); //AMQ-6561 - stop for all exceptions on addConnection @@ -1401,44 +1398,75 @@ public Response processBrokerInfo(BrokerInfo info) throws IOException { throw new IOException("Unexpected extra broker info command received from: " + info.getBrokerId()); } if (info.isSlaveBroker()) { - LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName()); - } else if (info.isNetworkConnection() && !info.isDuplexConnection()) { + LOG.error("Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName()); + throw new IOException("Slave Brokers are no longer supported - slave trying to attach is: " + info.getBrokerName()); + } + + // The only thing this method now does is capture the BrokerInfo object and mark as a network connection. + // Actual processing for starting up duplex bridges and for durable sync has been moved until + // after ConnectionInfo has been received. + + // If this is duplex we need to get the ID configured so we can use it + // to close existing connections later that match the same ID + // This will be done inside the RegionBroker + if (info.isNetworkConnection() && info.isDuplexConnection()) { + NetworkBridgeConfiguration config = getNetworkConfiguration(info); + config.setBrokerName(broker.getBrokerName()); + String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); + setDuplexNetworkConnectorId(duplexNetworkConnectorId); + } + + this.brokerInfo = info; + networkConnection = true; + List connectionStates = listConnectionStates(); + for (TransportConnectionState cs : connectionStates) { + cs.getContext().setNetworkConnection(true); + } + return null; + } + + // Process the network connection set up + private void addNetworkConnection() throws Exception { + final BrokerInfo info = this.brokerInfo; + if (info == null || !info.isNetworkConnection()){ + return; + } + + // For a one way bridge we need to respond on bridge creation by sending back the durable + // subs if durable sync is enabled via BrokerSubscriptionInfo command. The bridge is only + // initialized on one broker, so if this is the passive side we know it's initialized and + // we can respond. + // + // For a duplex bridge, we do NOT send back the durable subs. To simplify and ensure + // the bridge is fully initialized, the bridge startup will now handle sending + // BrokerSubscriptionInfo to the remote broker once fully started. + if (!info.isDuplexConnection()) { try { - // register durable sync to be sent after ConnectionInfo has been handled - registerDurableSync(getNetworkConfiguration(info), info); + NetworkBridgeConfiguration config = getNetworkConfiguration(info); + if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { + LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); + // Send back the durable subs as this is a one way bridge + dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config)); + } } catch (Exception e) { - LOG.error("Failed to register durable sync for network bridge creation from broker {}", info.getBrokerId(), e); - return null; + LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e); + throw e; } - } else if (info.isNetworkConnection() && info.isDuplexConnection()) { + } else { + // duplex // so this TransportConnection is the rear end of a network bridge // We have been requested to create a two way pipe ... try { NetworkBridgeConfiguration config = getNetworkConfiguration(info); config.setBrokerName(broker.getBrokerName()); - // register durable sync to be sent after ConnectionInfo has been handled - registerDurableSync(config, info); - - // check for existing duplex connection hanging about - - // We first look if existing network connection already exists for the same broker Id and network connector name - // It's possible in case of brief network fault to have this transport connector side of the connection always active - // and the duplex network connector side wanting to open a new one - // In this case, the old connection must be broken - String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); - CopyOnWriteArrayList connections = this.connector.getConnections(); - synchronized (connections) { - for (TransportConnection c : connections) { - if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) { - LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId); - c.stopAsync(); - // better to wait for a bit rather than get connection id already in use and failure to start new bridge - c.getStopped().await(1, TimeUnit.SECONDS); - } - } - setDuplexNetworkConnectorId(duplexNetworkConnectorId); - } + // Note: Durable sync used to be here and was moved to DemandForwardingBridgeSupport + // inside doStartLocalAndRemoteBridges() + + //The logic to clean up existing network connections for the same ID + // has been moved to the RegionBroker where it will check if the broker + // needs to close the connection before trying to create a duplicate connection + Transport localTransport = NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI()); Transport remoteBridgeTransport = transport; if (! (remoteBridgeTransport instanceof ResponseCorrelator)) { @@ -1462,46 +1490,13 @@ public Response processBrokerInfo(BrokerInfo info) throws IOException { duplexBridge.setCreatedByDuplex(true); duplexBridge.duplexStart(this, brokerInfo, info); LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId); - return null; } catch (TransportDisposedIOException e) { LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId); - return null; } catch (Exception e) { LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e); - return null; + throw e; } } - this.brokerInfo = info; - networkConnection = true; - List connectionStates = listConnectionStates(); - for (TransportConnectionState cs : connectionStates) { - cs.getContext().setNetworkConnection(true); - } - return null; - } - - private void registerDurableSync(final NetworkBridgeConfiguration config, final BrokerInfo info) { - if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { - // this will complete when the connection id has been set, or immediately if already set - initialConnectionId.whenComplete((connectionId, t) -> { - try { - if (t != null) { - LOG.warn("SyncDurableSubs will be skipped due to error {}", - t.getMessage()); - return; - } - // check connection still registered - if (lookupConnectionState(connectionId) != null) { - LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); - dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo( - this.broker.getBrokerService(), config)); - } - } catch (Exception e) { - LOG.error("Failed to respond to network bridge creation from broker {}", - info.getBrokerId(), e); - } - }); - } } @SuppressWarnings({"unchecked", "rawtypes"}) @@ -1707,7 +1702,7 @@ protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConn this.duplexNetworkConnectorId = duplexNetworkConnectorId; } - protected synchronized String getDuplexNetworkConnectorId() { + public synchronized String getDuplexNetworkConnectorId() { return this.duplexNetworkConnectorId; } @@ -1715,7 +1710,7 @@ public boolean isStopping() { return stopping.get(); } - protected CountDownLatch getStopped() { + public CountDownLatch getStopped() { return stopped; } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 2e6ee20497f..7d5de4c184d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -255,6 +256,10 @@ public void addConnection(ConnectionContext context, ConnectionInfo info) throws throw new InvalidClientIDException("No clientID specified for connection request"); } + // Clean up existing duplex network connection if this is a reconnect attempt + // This was moved from TransportConnection + cleanupExistingDuplexNetworkConnection(context); + ConnectionContext oldContext = null; synchronized (clientIdSet) { @@ -289,6 +294,38 @@ public void addConnection(ConnectionContext context, ConnectionInfo info) throws connections.add(context.getConnection()); } + // We first look if existing network connection already exists for the same broker Id and network connector name + // It's possible in case of brief network fault to have this transport connector side of the connection always active + // and the duplex network connector side wanting to open a new one + // In this case, the old connection must be broken + private void cleanupExistingDuplexNetworkConnection(ConnectionContext context) { + try { + if (context.isNetworkConnection() + && context.getConnection() instanceof TransportConnection) { + final TransportConnection newConn = (TransportConnection) context.getConnection(); + if (newConn.getDuplexNetworkConnectorId() != null) { + for (Connection c : connections) { + if (c instanceof TransportConnection) { + final TransportConnection existingConn = (TransportConnection) c; + if (newConn.getDuplexNetworkConnectorId() + .equals(existingConn.getDuplexNetworkConnectorId())) { + LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", + c, existingConn.getDuplexNetworkConnectorId()); + existingConn.stopAsync(); + // better to wait for a bit rather than get connection id already in use and failure to start new bridge + existingConn.getStopped().await(2, TimeUnit.SECONDS); + break; + } + } + } + } + } + } catch (Exception e) { + LOG.warn("Error cleaning up Duplex connection: {}" , e.getMessage()); + LOG.debug(e.getMessage(), e); + } + } + @Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { String clientId = info.getClientId(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 26a5769ed0d..17eb9c55294 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -477,6 +477,15 @@ private void doStartLocalAndRemoteBridges() { if (safeWaitUntilStarted()) { setupStaticDestinations(); staticDestinationsLatch.countDown(); + + // Send to the remote broker the durable subs if sync is enabled after statup. + // This is done by the initiating side of a bridge as well as by duplex bridges to + // ensure everything is fully initialized before sending. + if (configuration.isSyncDurableSubs() && + remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { + remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService, + configuration)); + } } } catch (Throwable e) { serviceLocalException(e); @@ -599,11 +608,6 @@ protected void startRemoteBridge() throws Exception { brokerInfo.setNetworkProperties(str); brokerInfo.setBrokerId(this.localBrokerId); remoteBroker.oneway(brokerInfo); - if (configuration.isSyncDurableSubs() && - remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { - remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService, - configuration)); - } } if (remoteConnectionInfo != null) { remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java index 19c95da29b4..3a99029617e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java @@ -16,10 +16,12 @@ */ package org.apache.activemq.network; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.IOException; import java.net.URI; import java.util.Arrays; import java.util.Collection; @@ -39,8 +41,11 @@ import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.transport.discovery.DiscoveryAgent; +import org.apache.activemq.transport.discovery.DiscoveryListener; import org.apache.activemq.util.Wait; import org.junit.After; +import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -84,6 +89,7 @@ public static Collection data() { private static final String USER_PASSWORD = "password"; private final boolean duplex; private final AtomicReference brokerSubInfo = new AtomicReference<>(); + private final AtomicReference serviceFailed = new AtomicReference<>(); private String ncPassword = USER_PASSWORD; public DurableSyncNetworkBridgeAuthTest(boolean duplex) { @@ -94,6 +100,7 @@ public DurableSyncNetworkBridgeAuthTest(boolean duplex) { public void setUp() throws Exception { this.ncPassword = USER_PASSWORD; this.brokerSubInfo.set(null); + this.serviceFailed.set(null); } @After @@ -110,31 +117,77 @@ public void testAuthSuccess() throws Exception { // automatically on connect so the remote broker will always receive it. However, the // remote broker should only send back its list after the connection is properly authenticated. assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + assertNull(serviceFailed.get()); + DemandForwardingBridge bridge = getActiveBridge(); // Simulate a connection exception and reconnect, we should receive again brokerSubInfo.set(null); localBroker.getNetworkConnectors().get(0).activeBridges().stream() .findFirst().orElseThrow().serviceRemoteException(new Exception()); + // wait for failure + assertTrue(Wait.waitFor(() -> serviceFailed.get() != null,5000,10)); + assertTrue(Wait.waitFor(bridge.localBroker::isDisposed,5000,10)); + + // should reconnect again and get updated info + assertTrue(Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, + TimeUnit.SECONDS.toMillis(5), 10)); assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); } @Test public void testAuthFailure() throws Exception { this.ncPassword = "badpassword"; - try { - // set a shorter wait time, it won't connect with bad password - doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), - TimeUnit.SECONDS.toMillis(5)); - throw new IllegalStateException("Should have received assertion error with bad password"); - } catch (AssertionError e) { - // expected - } + doSetUpRemoteBroker(true, tempFolder.newFolder(), 0); + doSetUpLocalBroker(true, true, tempFolder.newFolder()); + // Wait for the failure due to authentication + assertTrue(Wait.waitFor(() -> serviceFailed.get() != null,5000,10)); + assertTrue(Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty(), + TimeUnit.SECONDS.toMillis(5), 10)); // Because the local broker was not authenticated by the remote broker, the local broker // should not have received back the BrokerSubscriptionInfo assertNull(brokerSubInfo.get()); } + @Test + public void testDuplicateDuplexBridgeFailedAuthIgnored() throws Exception { + Assume.assumeTrue(duplex); + doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), + TimeUnit.SECONDS.toMillis(15)); + + // everything is good, no error and we got the sync command + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + assertNull(serviceFailed.get()); + + // Start a duplicate bridge with the same configuration but bad password + // so authentication fails. This should not cause a failure with the existing + // bridge because this connection won't be authenticated + DemandForwardingBridge bridge = getActiveBridge(); + this.ncPassword = "badpassword"; + NetworkConnector nc = localBroker.addNetworkConnector(configureLocalNetworkConnector()); + nc.start(); + try { + Thread.sleep(2000); + // Verify bridge is not disposed and still connected + assertFalse(bridge.disposed.get()); + } finally { + nc.stop(); + } + + // try again, this will connect successfully and the broker will detect it's a duplex bridge + // matching the same config and close the other + this.ncPassword = USER_PASSWORD; + nc = localBroker.addNetworkConnector(configureLocalNetworkConnector()); + nc.start(); + try { + // authentication is now correct so the RegionBroker should terminate the other duplex + // bridge as it matches + assertTrue(Wait.waitFor(bridge.disposed::get,5000,10)); + } finally { + nc.stop(); + } + } + @Test public void testRestartSync() throws Exception { doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), @@ -144,12 +197,17 @@ public void testRestartSync() throws Exception { // automatically on connect so the remote broker will always receive it. However, the // remote broker should only send back its list after the connection is properly authenticated. assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + assertNull(serviceFailed.get()); // Restart, should receive again with new connection brokerSubInfo.set(null); restartRemoteBroker(); + // should fail from restart + assertTrue(Wait.waitFor(() -> serviceFailed.get() != null,5000,10)); // Wait for the reconnect and receive of BrokerSubInfo + assertTrue(Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, + TimeUnit.SECONDS.toMillis(5), 10)); assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); } @@ -159,10 +217,10 @@ public void testDuplicateBrokerInfo() throws Exception { doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), TimeUnit.SECONDS.toMillis(15)); assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + assertNull(serviceFailed.get()); // find the established bridge - DemandForwardingBridge bridge = (DemandForwardingBridge) localBroker.getNetworkConnectors().get(0).activeBridges().stream() - .findFirst().orElseThrow(); + DemandForwardingBridge bridge = getActiveBridge(); // send to one of the brokers (networked brokers will have already received a BrokerInfo) // the duplicate will trigger the bridge connection to close @@ -232,6 +290,9 @@ protected NetworkConnector configureLocalNetworkConnector() throws Exception { URI remoteURI = transportConnectors.get(0).getConnectUri(); String uri = "static:(" + remoteURI + ")"; NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri)) { + { + this.setDiscoveryAgent(new DiscoveryAgentFilter(getDiscoveryAgent())); + } @Override protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, DiscoveryEvent event) { @@ -248,6 +309,7 @@ public void onCommand(Object command) { } super.onCommand(command); } + }; return super.createBridge(localTransport, remoteFilter, event); } @@ -285,4 +347,37 @@ protected BrokerService createRemoteBroker(File dataDir, int port) throws Except return brokerService; } + private DemandForwardingBridge getActiveBridge() { + return(DemandForwardingBridge) localBroker.getNetworkConnectors().get(0).activeBridges().stream() + .findFirst().orElseThrow(); + } + + private class DiscoveryAgentFilter implements DiscoveryAgent { + private final DiscoveryAgent agent; + + public DiscoveryAgentFilter(DiscoveryAgent agent) { + this.agent = agent; + } + + public void setDiscoveryListener(DiscoveryListener listener) { + agent.setDiscoveryListener(listener); + } + + public void start() throws Exception { + agent.start(); + } + + public void stop() throws Exception { + agent.stop(); + } + + public void registerService(String name) throws IOException { + agent.registerService(name); + } + + public void serviceFailed(DiscoveryEvent event) throws IOException { + serviceFailed.set(event); + agent.serviceFailed(event); + } + } }