From 9a9c5653810e45328befd8e054652ade0deb8225 Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Tue, 26 Aug 2025 09:39:06 -0500 Subject: [PATCH] [AMQ-9692] Support destination gc sweep of destinations with only wildcard consumers --- .../broker/region/AbstractRegion.java | 13 +- .../broker/region/BaseDestination.java | 78 +++++-- .../activemq/broker/region/Destination.java | 1 + .../broker/region/DestinationFilter.java | 5 + .../activemq/broker/region/TempQueue.java | 2 +- .../broker/region/TempQueueRegion.java | 2 +- .../broker/region/policy/PolicyEntry.java | 12 + .../activemq/java/JavaPolicyEntryTest.java | 48 ++-- .../broker/region/DestinationGCTest.java | 59 +++++ .../region/DestinationIsActiveTest.java | 207 ++++++++++++++++++ 10 files changed, 376 insertions(+), 51 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationIsActiveTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index af77b1d4498..90acbe8d416 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -260,16 +260,21 @@ protected List addSubscriptionsForDestination(ConnectionContext co } @Override - public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) - throws Exception { - + public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { // No timeout.. then try to shut down right way, fails if there are // current subscribers. if (timeout == 0) { for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) { Subscription sub = iter.next(); if (sub.matches(destination) ) { - throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub); + if(sub.isWildcard()) { + var dest = destinations.get(destination); + if(dest != null && dest.isGcWithOnlyWildcardConsumers()) { + continue; + } + } else { + throw new JMSException("Destination: " + destination + " still has an active subscription: " + sub); + } } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 82ed8784a26..c835215bd7b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; import jakarta.jms.ResourceAllocationException; @@ -105,7 +106,8 @@ public abstract class BaseDestination implements Destination { private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; private boolean gcIfInactive; private boolean gcWithNetworkConsumers; - private long lastActiveTime=0l; + private boolean gcWithOnlyWildcardConsumers; + private long lastActiveTime = 0L; private boolean reduceMemoryFootprint = false; protected final Scheduler scheduler; private boolean disposed = false; @@ -311,12 +313,37 @@ public final MessageStore getMessageStore() { @Override public boolean isActive() { - boolean isActive = destinationStatistics.getConsumers().getCount() > 0 || - destinationStatistics.getProducers().getCount() > 0; - if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() > 0) { - isActive = hasRegularConsumers(getConsumers()); + // if we have producers then we are active + if (destinationStatistics.getProducers().getCount() > 0) { + return true; } - return isActive; + + // Check if we have active consumers that should prevent GC + if (destinationStatistics.getConsumers().getCount() > 0) { + // if we have consumers and both gcWithNetwork and gcOnlyWildcard consumers + // are false we can just return true, otherwise we need to check each consumer + return (!isGcWithNetworkConsumers() && !isGcWithOnlyWildcardConsumers()) || + hasActiveConsumers(); + } + + return false; + } + + protected Predicate canGcConsumer = subscription -> { + // if isGcWithNetworkConsumers() is true and this is a network subscription then we can GC + boolean canGcNetwork = isGcWithNetworkConsumers() && subscription.getConsumerInfo().isNetworkSubscription(); + // if isGcWithOnlyWildcardConsumers() is true and this is a wildcard then we can GC + return canGcNetwork || (isGcWithOnlyWildcardConsumers() && subscription.isWildcard()); + }; + + protected boolean hasActiveConsumers() { + final List consumers = getConsumers(); + for (Subscription subscription: consumers) { + if (!canGcConsumer.test(subscription)) { + return true; + } + } + return false; } @Override @@ -824,19 +851,37 @@ public boolean isGcWithNetworkConsumers() { return gcWithNetworkConsumers; } + /** + * Indicate if it is ok to gc destinations that have only wildcard consumers + * @param gcWithOnlyWildcardConsumers + */ + public void setGcWithOnlyWildcardConsumers(boolean gcWithOnlyWildcardConsumers) { + this.gcWithOnlyWildcardConsumers = gcWithOnlyWildcardConsumers; + } + + public boolean isGcWithOnlyWildcardConsumers() { + return gcWithOnlyWildcardConsumers; + } + @Override public void markForGC(long timeStamp) { - if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false - && destinationStatistics.getMessages().getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) { + if (isGcIfInactive() + && this.lastActiveTime == 0 + && destinationStatistics.getMessages().getCount() == 0 + && getInactiveTimeoutBeforeGC() > 0L + && !isActive()) { this.lastActiveTime = timeStamp; } } @Override public boolean canGC() { - boolean result = false; - final long currentLastActiveTime = this.lastActiveTime; - if (isGcIfInactive() && currentLastActiveTime != 0l && destinationStatistics.getMessages().getCount() == 0L ) { + var result = false; + final var currentLastActiveTime = this.lastActiveTime; + if (isGcIfInactive() + && currentLastActiveTime != 0L + && destinationStatistics.getMessages().getCount() == 0L + && !isActive()) { if ((System.currentTimeMillis() - currentLastActiveTime) >= getInactiveTimeoutBeforeGC()) { result = true; } @@ -893,17 +938,6 @@ public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatistic @Override public abstract List getConsumers(); - protected boolean hasRegularConsumers(List consumers) { - boolean hasRegularConsumers = false; - for (Subscription subscription: consumers) { - if (!subscription.getConsumerInfo().isNetworkSubscription()) { - hasRegularConsumers = true; - break; - } - } - return hasRegularConsumers; - } - public ConnectionContext createConnectionContext() { ConnectionContext answer = new ConnectionContext(); answer.setBroker(this.broker); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java index 22ba14894b7..2901fd0e18a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -267,4 +267,5 @@ public interface Destination extends Service, Task, Message.MessageDestination { void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatisticsEnabled); + boolean isGcWithOnlyWildcardConsumers(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 1ab96560ac2..9cf29d14b40 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -429,6 +429,11 @@ public void setAdvancedMessageStatisticsEnabled(boolean advancedMessageStatistic next.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled); } + @Override + public boolean isGcWithOnlyWildcardConsumers() { + return next.isGcWithOnlyWildcardConsumers(); + } + public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { if (next instanceof DestinationFilter) { DestinationFilter filter = (DestinationFilter) next; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java index 17eeb056472..40dbcb4b4bc 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java @@ -33,7 +33,7 @@ * * */ -public class TempQueue extends Queue{ +public class TempQueue extends Queue { private static final Logger LOG = LoggerFactory.getLogger(TempQueue.class); private final ActiveMQTempDestination tempDest; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java index c6bf7320419..97c54b35ddd 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java @@ -58,7 +58,7 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des super.removeDestination(context, destination, timeout); } - + /* * For a Queue, dispatch order is imperative to match acks, so the dispatch is deferred till * the notification to ensure that the subscription chosen by the master is used. diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 8ae052574bc..96c236afdc6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -99,6 +99,7 @@ public class PolicyEntry extends DestinationMapEntry { private boolean prioritizedMessages; private boolean allConsumersExclusiveByDefault; private boolean gcInactiveDestinations; + private boolean gcWithOnlyWildcardConsumers; private boolean gcWithNetworkConsumers; private long inactiveTimeoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC; private boolean reduceMemoryFootprint; @@ -263,6 +264,9 @@ public void baseUpdate(BaseDestination destination, Set includedProperti if (isUpdate("gcInactiveDestinations", includedProperties)) { destination.setGcIfInactive(isGcInactiveDestinations()); } + if (isUpdate("gcWithOnlyWildcardConsumers", includedProperties)) { + destination.setGcWithOnlyWildcardConsumers(isGcWithOnlyWildcardConsumers()); + } if (isUpdate("gcWithNetworkConsumers", includedProperties)) { destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers()); } @@ -1082,6 +1086,14 @@ public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) { this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC; } + public void setGcWithOnlyWildcardConsumers(boolean gcWithOnlyWildcardConsumers) { + this.gcWithOnlyWildcardConsumers = gcWithOnlyWildcardConsumers; + } + + public boolean isGcWithOnlyWildcardConsumers() { + return gcWithOnlyWildcardConsumers; + } + public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) { this.gcWithNetworkConsumers = gcWithNetworkConsumers; } diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java index bc92505d7e9..9c22ec9d98b 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java @@ -662,7 +662,7 @@ private void testAllQueuePropertiesAppliedFilter(Set properties) throws //initial config setAllDestPolicyProperties(entry, true, true, 10, - 100, 200, 1000, 400, 40, 30, true, true, 1000, true, true, + 100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true, 30, true, true, true, true, true, true, true, true, true); setAllQueuePolicyProperties(entry, 10000, true, true, true, true, 100, 100, true, true); @@ -675,7 +675,7 @@ private void testAllQueuePropertiesAppliedFilter(Set properties) throws //validate config assertAllDestPolicyProperties(getQueue("Before"), true, true, 10, - 100, 200, 1000, 400, 40, 30, true, true, 1000, true, true, + 100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true, 30, true, true, true,true, true, true, true, true, true); assertAllQueuePolicyProperties(getQueue("Before"), 10000, true, true, true, true, 100, 100, true, true); @@ -683,7 +683,7 @@ private void testAllQueuePropertiesAppliedFilter(Set properties) throws //change config setAllDestPolicyProperties(entry, false, false, 100, - 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false, 300, false, false, false,false, false, false, false, false, false); setAllQueuePolicyProperties(entry, 100000, false, false, false, false, 1000, 1000, false, false); @@ -692,14 +692,14 @@ private void testAllQueuePropertiesAppliedFilter(Set properties) throws TimeUnit.SECONDS.sleep(SLEEP); assertAllDestPolicyProperties(getQueue("Before"), false, false, 100, - 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false, 300, false, false, false,false, false, false, false, false, false); assertAllQueuePolicyProperties(getQueue("Before"), 100000, false, false, false, false, 1000, 1000, false, false); //check new dest assertAllDestPolicyProperties(getQueue("After"), false, false, 100, - 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false, 300, false, false, false, false, false, false, false, false, false); assertAllQueuePolicyProperties(getQueue("After"), 100000, false, false, false, false, 1000, 1000, false, false); @@ -713,7 +713,7 @@ private void testAllTopicPropertiesAppliedFilter(Set properties) throws //initial config setAllDestPolicyProperties(entry, true, true, 10, - 100, 200, 1000, 400, 40, 30, true, true, 1000, true, true, + 100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true, 30, true, true, true, true, true, true, true, true, true); setAllTopicPolicyProperties(entry, 10000, true); @@ -725,14 +725,14 @@ private void testAllTopicPropertiesAppliedFilter(Set properties) throws //validate config assertAllDestPolicyProperties(getTopic("Before"), true, true, 10, - 100, 200, 1000, 400, 40, 30, true, true, 1000, true, true, + 100, 200, 1000, 400, 40, 30, true, true, true, 1000, true, true, 30, true, true, true, true, true, true, true, true, true); assertAllTopicPolicyProperties(getTopic("Before"), 10000, true); //change config setAllDestPolicyProperties(entry, false, false, 100, - 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false, 300, false, false, false, false, false, false, false, false, false); setAllTopicPolicyProperties(entry, 100000, false); @@ -740,13 +740,13 @@ private void testAllTopicPropertiesAppliedFilter(Set properties) throws TimeUnit.SECONDS.sleep(SLEEP); assertAllDestPolicyProperties(getTopic("Before"), false, false, 100, - 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false, 300, false, false, false, false, false, false, false, false, false); assertAllTopicPolicyProperties(getTopic("Before"), 100000, false); //check new dest assertAllDestPolicyProperties(getTopic("After"), false, false, 100, - 1000, 2000, 10000, 4000, 400, 300, false, false, 1000, false, false, + 1000, 2000, 10000, 4000, 400, 300, false, false, false, 1000, false, false, 300, false, false, false, false, false, false, false, false, false); assertAllTopicPolicyProperties(getTopic("After"), 100000, false); } @@ -820,6 +820,7 @@ private Set getDestPropertySet() { properties.add("cursorMemoryHighWaterMark"); properties.add("storeUsageHighWaterMark"); properties.add("gcInactiveDestinations"); + properties.add("gcWithOnlyWildcardConsumers"); properties.add("gcWithNetworkConsumers"); properties.add("inactiveTimeoutBeforeGC"); properties.add("reduceMemoryFootprint"); @@ -862,12 +863,12 @@ private void setAllTopicPolicyProperties(PolicyEntry entry, long memoryLimit, bo private void setAllDestPolicyProperties(PolicyEntry entry, boolean producerFlowControl, boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize, int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark, - int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers, - long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore, - int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery, - boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers, - boolean advisoryForFastProducers, boolean advisoryWhenFull, boolean includeBodyForAdvisory, - boolean sendAdvisoryIfNoConsumers) { + int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithOnlyWildcardConsumers, + boolean gcWithNetworkConsumers, long inactiveTimeoutBeforeGC, boolean reduceMemoryFootprint, + boolean doOptimizeMessageStore, int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, + boolean advisoryForDelivery, boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, + boolean advisoryForSlowConsumers, boolean advisoryForFastProducers, boolean advisoryWhenFull, + boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) { entry.setProducerFlowControl(producerFlowControl); entry.setAlwaysRetroactive(alwaysRetroactive); @@ -879,6 +880,7 @@ private void setAllDestPolicyProperties(PolicyEntry entry, boolean producerFlowC entry.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark); entry.setStoreUsageHighWaterMark(storeUsageHighWaterMark); entry.setGcInactiveDestinations(gcInactiveDestinations); + entry.setGcWithOnlyWildcardConsumers(gcWithOnlyWildcardConsumers); entry.setGcWithNetworkConsumers(gcWithNetworkConsumers); entry.setInactiveTimeoutBeforeGC(inactiveTimeoutBeforeGC); entry.setReduceMemoryFootprint(reduceMemoryFootprint); @@ -920,13 +922,12 @@ private void assertAllTopicPolicyProperties(Topic topic, long memoryLimit, boole private void assertAllDestPolicyProperties(BaseDestination dest, boolean producerFlowControl, boolean alwaysRetroactive, long blockedProducerWarningInterval, int maxPageSize, int maxBrowsePageSize, long minimumMessageSize, int maxExpirePageSize, int cursorMemoryHighWaterMark, - int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithNetworkConsumers, - long inactiveTimeoutBeforeGC,boolean reduceMemoryFootprint, boolean doOptimizeMessageStore, - int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, boolean advisoryForDelivery, - boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, boolean advisoryForSlowConsumers, - boolean advisoryForFastProducers, boolean advisoryWhenFull, boolean includeBodyForAdvisory, - boolean sendAdvisoryIfNoConsumers) { - + int storeUsageHighWaterMark, boolean gcInactiveDestinations, boolean gcWithOnlyWildcardConsumers, + boolean gcWithNetworkConsumers, long inactiveTimeoutBeforeGC, boolean reduceMemoryFootprint, + boolean doOptimizeMessageStore, int optimizeMessageStoreInFlightLimit, boolean advisoryForConsumed, + boolean advisoryForDelivery, boolean advisoryForDispatched, boolean advisoryForDiscardingMessages, + boolean advisoryForSlowConsumers, boolean advisoryForFastProducers, boolean advisoryWhenFull, + boolean includeBodyForAdvisory, boolean sendAdvisoryIfNoConsumers) { assertEquals(producerFlowControl, dest.isProducerFlowControl()); assertEquals(alwaysRetroactive, dest.isAlwaysRetroactive()); @@ -938,6 +939,7 @@ private void assertAllDestPolicyProperties(BaseDestination dest, boolean produce assertEquals(cursorMemoryHighWaterMark, dest.getCursorMemoryHighWaterMark()); assertEquals(storeUsageHighWaterMark, dest.getStoreUsageHighWaterMark()); assertEquals(gcInactiveDestinations, dest.isGcIfInactive()); + assertEquals(gcWithOnlyWildcardConsumers, dest.isGcWithOnlyWildcardConsumers()); assertEquals(gcWithNetworkConsumers, dest.isGcWithNetworkConsumers()); assertEquals(inactiveTimeoutBeforeGC, dest.getInactiveTimeoutBeforeGC()); assertEquals(reduceMemoryFootprint, dest.isReduceMemoryFootprint()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java index f57c9c8feee..33674e278a4 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java @@ -43,6 +43,8 @@ import org.apache.activemq.test.annotations.ParallelTest; import org.junit.experimental.categories.Category; +import java.util.concurrent.atomic.AtomicInteger; + @Category(ParallelTest.class) public class DestinationGCTest { @@ -50,6 +52,8 @@ public class DestinationGCTest { private final ActiveMQQueue queue = new ActiveMQQueue("TEST"); private final ActiveMQQueue otherQueue = new ActiveMQQueue("TEST-OTHER"); + private final ActiveMQQueue wildcardQueueA = new ActiveMQQueue("TEST.FOO.A"); + private final ActiveMQQueue wildcardQueueB = new ActiveMQQueue("TEST.FOO.B"); private BrokerService brokerService; @@ -71,6 +75,7 @@ public void tearDown() throws Exception { protected BrokerService createBroker() throws Exception { PolicyEntry entry = new PolicyEntry(); entry.setGcInactiveDestinations(true); + entry.setGcWithOnlyWildcardConsumers(true); entry.setInactiveTimeoutBeforeGC(3000); PolicyMap map = new PolicyMap(); map.setDefaultEntry(entry); @@ -114,6 +119,60 @@ public boolean isSatisified() throws Exception { connection.close(); } + @Test + public void testDestinationGCWithOnlyWildcardConsumers() throws Exception { + assertEquals(1, brokerService.getAdminView().getQueues().length); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false"); + + final AtomicInteger receivedCount = new AtomicInteger(0); + + try(Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try(var producerA = session.createProducer(wildcardQueueA)) { + producerA.send(session.createTextMessage("Test first step queueA")); + } + + try(var producerB = session.createProducer(wildcardQueueB)) { + producerB.send(session.createTextMessage("Test first step queueB")); + } + + MessageConsumer consumer = session.createConsumer(session.createQueue("TEST.FOO.*")); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + receivedCount.incrementAndGet(); + } + }); + + connection.start(); + + // Confirm queues are gc'd + assertTrue("After GC runs there should be one Queue (count=" + brokerService.getAdminView().getQueues().length + ")", Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getQueues().length == 1; + } + }, 30000, 1000)); + + assertEquals(Integer.valueOf(2), Integer.valueOf(receivedCount.get())); + + // Confirm wild-card consumer is able to stay active after zero matching destinations + try(var producer = session.createProducer(wildcardQueueA)) { + producer.send(session.createTextMessage("Test second step queueA")); + } + + // Confirm queues are gc'd + assertTrue("After GC runs there should be one Queue (count=" + brokerService.getAdminView().getQueues().length + ")", Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getQueues().length == 1; + } + }, 30000, 1000)); + assertEquals(Integer.valueOf(3), Integer.valueOf(receivedCount.get())); + } + } + @Test(timeout = 60000) public void testDestinationGc() throws Exception { assertEquals(1, brokerService.getAdminView().getQueues().length); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationIsActiveTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationIsActiveTest.java new file mode 100644 index 00000000000..7027288dd59 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationIsActiveTest.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + +import static org.junit.Assert.*; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.test.annotations.ParallelTest; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * [AMQ-9692] Support garbage collecting destinations + * that have a wildcard-only subscription. + * + * This test suite confirms the logic in the + * BaseDestination.isActive() method to ensure + * destinations are not accidentally deleted due + * to incorrect logic combination of PolicyEntr + * config flag and status of current subscriptions. + * + * appC = normal application consumer + * netC = network consumer + * wildC = wildcard consumer + */ +@Category(ParallelTest.class) +@RunWith(Parameterized.class) +public class DestinationIsActiveTest { + + private static BrokerService brokerService; + private static final AtomicInteger counter = new AtomicInteger(0); + + @BeforeClass + public static void beforeClass() throws Exception { + brokerService = createBroker(); + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @AfterClass + public static void afterClass() throws Exception { + if (brokerService != null) { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + } + + @Parameterized.Parameters(name = "gcNC={0} gcWC={1} appC={2} netC={3} wildC={4} exp={5}") // Optional name attribute for better test reporting + public static Collection data() { + return Arrays.asList(new Object[][] { + // Simple app consumer + { false, false, false, false, false, false }, + { false, true, false, false, false, false }, + { true, false, false, false, false, false }, + { true, true, false, false, false, false }, + { false, false, true, false, false, true }, + { false, true, true, false, false, true }, + { true, false, true, false, false, true }, + { true, true, true, false, false, true }, + + // Network consumer + { false, false, false, true, false, true }, + { false, true, false, true, false, true }, + { true, false, false, true, false, false }, + { true, true, false, true, false, false }, + { false, false, true, true, false, true }, + { false, true, true, true, false, true }, + { true, false, true, true, false, true }, + { true, true, true, true, false, true }, + + // Wildcard consumer + { false, false, false, false, true, true }, + { false, true, false, false, true, false }, + { true, false, false, false, true, true }, + { true, true, false, false, true, false }, + { false, false, true, false, true, true }, + { false, true, true, false, true, true }, + { true, false, true, false, true, true }, + { true, true, true, false, true, true } + }); + } + + private final boolean gcWithNetworkConsumersEnabled; + private final boolean gcWithOnlyWildcardConsumersEnabled; + private final boolean appConsumerActive; + private final boolean networkConsumerActive; + private final boolean wildcardConsumerActive; + private final boolean activeExpected; + + public DestinationIsActiveTest(boolean gcWithNetworkConsumersEnabled, boolean gcWithOnlyWildcardConsumersEnabled, boolean appConsumerActive, boolean networkConsumerActive, boolean wildcardConsumerActive, boolean activeExpected) { + this.gcWithNetworkConsumersEnabled = gcWithNetworkConsumersEnabled; + this.gcWithOnlyWildcardConsumersEnabled = gcWithOnlyWildcardConsumersEnabled; + this.appConsumerActive = appConsumerActive; + this.networkConsumerActive = networkConsumerActive; + this.wildcardConsumerActive = wildcardConsumerActive; + this.activeExpected = activeExpected; + } + + @Test + public void testDestinationIsActive() throws Exception { + var queueName = "amq.gc." + counter.incrementAndGet(); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setGcInactiveDestinations(true); + policyEntry.setGcWithOnlyWildcardConsumers(gcWithOnlyWildcardConsumersEnabled); + policyEntry.setGcWithNetworkConsumers(gcWithNetworkConsumersEnabled); + policyEntry.setInactiveTimeoutBeforeGC(3000L); + policyEntry.setQueue(queueName); + brokerService.getDestinationPolicy().setPolicyEntries(List.of(policyEntry)); + + brokerService.getAdminView().addQueue(queueName); + var activemqDestination = new ActiveMQQueue(queueName); + var queue = (Queue)brokerService.getDestination(activemqDestination); + + assertFalse(queue.isActive()); + + if(appConsumerActive) { + queue.addSubscription(null, new MockQueueSubscription(activemqDestination, false, false)); + } + if(networkConsumerActive) { + queue.addSubscription(null, new MockQueueSubscription(activemqDestination, true, false)); + } + if(wildcardConsumerActive) { + queue.addSubscription(null, new MockQueueSubscription(activemqDestination,false, true)); + } + + assertEquals(activeExpected, queue.isActive()); + + // Test parameter config safety check + // if an appConsumer is active, queue must *always* be active + if(appConsumerActive) { + assertTrue(queue.isActive()); + } + brokerService.getAdminView().removeQueue(queueName); + } + + protected static BrokerService createBroker() throws Exception { + PolicyMap map = new PolicyMap(); + map.setDefaultEntry(new PolicyEntry()); + + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(true); + broker.setSchedulePeriodForDestinationPurge(100_000_000); + broker.setSchedulerSupport(true); + broker.setMaxPurgedDestinationsPerSweep(1); + broker.setDestinationPolicy(map); + return broker; + } + + static class MockConsumerInfo extends ConsumerInfo { + + private final boolean networkSubscription; + + public MockConsumerInfo(ActiveMQDestination activeMQDestination, boolean networkSubscription) { + setDestination(activeMQDestination); + this.networkSubscription = networkSubscription; + } + + @Override + public boolean isNetworkSubscription() { + return this.networkSubscription; + } + } + + static class MockQueueSubscription extends QueueSubscription { + + private final boolean wildCardSubscription; + + public MockQueueSubscription(ActiveMQDestination activemqDestination, boolean networkSubscription, boolean wildCardSubscription) throws Exception { + super(brokerService.getBroker(), null, null, new MockConsumerInfo(activemqDestination, networkSubscription)); + this.wildCardSubscription = wildCardSubscription; + } + + @Override + public boolean isWildcard() { + return this.wildCardSubscription; + } + } +}