|
49 | 49 | import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; |
50 | 50 | import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; |
51 | 51 | import org.apache.activemq.util.Wait; |
52 | | -import org.apache.activemq.util.Wait.Condition; |
53 | 52 | import org.junit.After; |
54 | 53 | import org.junit.Assume; |
55 | 54 | import org.junit.Before; |
@@ -83,7 +82,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { |
83 | 82 | private final FLOW flow; |
84 | 83 |
|
85 | 84 | @Rule |
86 | | - public Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS); |
| 85 | + public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS); |
87 | 86 |
|
88 | 87 | @Parameters |
89 | 88 | public static Collection<Object[]> data() { |
@@ -531,7 +530,6 @@ public void testAddOnlineSubscriptionsWithBridgeOffline() throws Exception { |
531 | 530 | session1.createDurableSubscriber(topic, "sub3"); |
532 | 531 | session1.createDurableSubscriber(excludeTopic, "sub-exclude"); |
533 | 532 |
|
534 | | - Thread.sleep(1000); |
535 | 533 | assertNCDurableSubsCount(broker2, topic, 1); |
536 | 534 | assertNCDurableSubsCount(broker2, excludeTopic, 0); |
537 | 535 |
|
@@ -570,13 +568,10 @@ public void testAddOnlineSubscriptionsTwoBridges() throws Exception { |
570 | 568 | secondConnector.start(); |
571 | 569 |
|
572 | 570 | //Make sure both bridges are connected |
573 | | - assertTrue(Wait.waitFor(new Condition() { |
574 | | - @Override |
575 | | - public boolean isSatisified() throws Exception { |
576 | | - return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 && |
577 | | - localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1; |
578 | | - } |
579 | | - }, 10000, 500)); |
| 571 | + assertTrue(Wait.waitFor(() -> |
| 572 | + localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 && |
| 573 | + localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1, |
| 574 | + TimeUnit.SECONDS.toMillis(10), 500)); |
580 | 575 |
|
581 | 576 | //Make sure NC durables exist for both bridges |
582 | 577 | assertNCDurableSubsCount(broker2, topic2, 1); |
@@ -637,13 +632,7 @@ public void testVirtualDestSubForceDurableSync() throws Exception { |
637 | 632 | final DestinationStatistics remoteDestStatistics2 = remoteBroker.getDestination( |
638 | 633 | new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); |
639 | 634 |
|
640 | | - assertTrue(Wait.waitFor(new Condition() { |
641 | | - |
642 | | - @Override |
643 | | - public boolean isSatisified() throws Exception { |
644 | | - return remoteDestStatistics2.getMessages().getCount() == 501; |
645 | | - } |
646 | | - })); |
| 635 | + assertTrue(Wait.waitFor(() -> remoteDestStatistics2.getMessages().getCount() == 501)); |
647 | 636 |
|
648 | 637 | } |
649 | 638 |
|
@@ -723,8 +712,36 @@ protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, |
723 | 712 | included = new ActiveMQTopic(testTopicName); |
724 | 713 | doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0); |
725 | 714 | doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir); |
726 | | - //Give time for advisories to propagate |
727 | | - Thread.sleep(1000); |
| 715 | + //Wait for the bridge to be fully started (advisory consumers registered). |
| 716 | + //Note: activeBridges().size() == 1 is NOT sufficient because bridges are added |
| 717 | + //to the map before start() completes asynchronously. We must wait for the |
| 718 | + //startedLatch which counts down after advisory consumers are registered. |
| 719 | + if (startNetworkConnector) { |
| 720 | + waitForBridgeFullyStarted(); |
| 721 | + } |
| 722 | + } |
| 723 | + |
| 724 | + private void waitForBridgeFullyStarted() throws Exception { |
| 725 | + // Wait for the local bridge to be fully started (advisory consumers registered) |
| 726 | + assertTrue("Local bridge should be fully started", Wait.waitFor(() -> { |
| 727 | + if (localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) { |
| 728 | + return false; |
| 729 | + } |
| 730 | + final NetworkBridge bridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); |
| 731 | + if (bridge instanceof DemandForwardingBridgeSupport) { |
| 732 | + return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0; |
| 733 | + } |
| 734 | + return true; |
| 735 | + }, TimeUnit.SECONDS.toMillis(10), 100)); |
| 736 | + |
| 737 | + // Also wait for the duplex bridge on the remote broker to be fully started. |
| 738 | + // The duplex connector creates a separate DemandForwardingBridge on the remote side |
| 739 | + // that also needs its advisory consumers registered before it can process events. |
| 740 | + assertTrue("Duplex bridge should be fully started", Wait.waitFor(() -> { |
| 741 | + final DemandForwardingBridge duplexBridge = findDuplexBridge( |
| 742 | + remoteBroker.getTransportConnectors().get(0)); |
| 743 | + return duplexBridge != null && duplexBridge.startedLatch.getCount() == 0; |
| 744 | + }, TimeUnit.SECONDS.toMillis(10), 100)); |
728 | 745 | } |
729 | 746 |
|
730 | 747 | protected void restartLocalBroker(boolean startNetworkConnector) throws Exception { |
@@ -757,12 +774,12 @@ protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetwor |
757 | 774 | localConnection.start(); |
758 | 775 |
|
759 | 776 | if (startNetworkConnector) { |
760 | | - Wait.waitFor(new Condition() { |
761 | | - @Override |
762 | | - public boolean isSatisified() throws Exception { |
763 | | - return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1; |
764 | | - } |
765 | | - }, 5000, 500); |
| 777 | + // Best-effort wait for the bridge to appear. Do NOT use assertTrue here |
| 778 | + // because some tests restart localBroker before remoteBroker is running, |
| 779 | + // relying on the bridge connecting later when remoteBroker restarts. |
| 780 | + // Tests that need the bridge to be fully started call assertBridgeStarted() explicitly. |
| 781 | + Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, |
| 782 | + TimeUnit.SECONDS.toMillis(10), 500); |
766 | 783 | } |
767 | 784 | localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
768 | 785 |
|
@@ -881,19 +898,16 @@ protected BrokerService createRemoteBroker(File dataDir, int port) throws Except |
881 | 898 | protected void waitForSubscriptionInactive(final BrokerService brokerService, |
882 | 899 | final ActiveMQTopic topic, |
883 | 900 | final String subName) throws Exception { |
884 | | - assertTrue("Subscription should become inactive", Wait.waitFor(new Condition() { |
885 | | - @Override |
886 | | - public boolean isSatisified() throws Exception { |
887 | | - List<org.apache.activemq.broker.region.DurableTopicSubscription> subs = getSubscriptions(brokerService, topic); |
888 | | - for (org.apache.activemq.broker.region.DurableTopicSubscription sub : subs) { |
889 | | - if (sub.getSubscriptionKey().getSubscriptionName().equals(subName)) { |
890 | | - return !sub.isActive(); |
891 | | - } |
| 901 | + assertTrue("Subscription should become inactive", Wait.waitFor(() -> { |
| 902 | + final List<org.apache.activemq.broker.region.DurableTopicSubscription> subs = getSubscriptions(brokerService, topic); |
| 903 | + for (final org.apache.activemq.broker.region.DurableTopicSubscription sub : subs) { |
| 904 | + if (sub.getSubscriptionKey().getSubscriptionName().equals(subName)) { |
| 905 | + return !sub.isActive(); |
892 | 906 | } |
893 | | - // If subscription doesn't exist, it's considered inactive |
894 | | - return true; |
895 | 907 | } |
896 | | - }, 5000, 100)); |
| 908 | + // If subscription doesn't exist, it's considered inactive |
| 909 | + return true; |
| 910 | + }, TimeUnit.SECONDS.toMillis(10), 100)); |
897 | 911 | } |
898 | 912 |
|
899 | 913 | } |
0 commit comments