diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java index 58d94ece3f800..a9e41d0113934 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -49,6 +49,7 @@ import org.apache.camel.support.EventDrivenPollingConsumer; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.cache.DefaultConsumerCache; +import org.apache.camel.support.cache.EmptyConsumerCache; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.URISupport; import org.slf4j.Logger; @@ -544,9 +545,14 @@ protected static String resolveScheme(Exchange exchange, String uri) { @Override protected void doBuild() throws Exception { if (consumerCache == null) { - // create consumer cache if we use dynamic expressions for computing the endpoints to poll - consumerCache = new DefaultConsumerCache(this, camelContext, cacheSize); - LOG.debug("PollEnrich {} using ConsumerCache with cacheSize={}", this, cacheSize); + if (cacheSize < 0) { + consumerCache = new EmptyConsumerCache(this, camelContext); + LOG.debug("PollEnrich {} is not using ConsumerCache", this); + } else { + // create consumer cache if we use dynamic expressions for computing the endpoints to poll + consumerCache = new DefaultConsumerCache(this, camelContext, cacheSize); + LOG.debug("PollEnrich {} using ConsumerCache with cacheSize={}", this, cacheSize); + } } if (aggregationStrategy == null) { aggregationStrategy = new CopyAggregationStrategy(); diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/engine/EmptyConsumerCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/engine/EmptyConsumerCacheTest.java new file mode 100644 index 0000000000000..be89c3d7886b8 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/impl/engine/EmptyConsumerCacheTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.engine; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.PollingConsumer; +import org.apache.camel.spi.ConsumerCache; +import org.apache.camel.support.cache.EmptyConsumerCache; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class EmptyConsumerCacheTest extends ContextTestSupport { + + @Test + public void testEmptyCache() { + ConsumerCache cache = new EmptyConsumerCache(this, context); + cache.start(); + + assertEquals(0, cache.size(), "Size should be 0"); + + // we never cache any consumers + Endpoint e = context.getEndpoint("direct:queue:1"); + PollingConsumer c = cache.acquirePollingConsumer(e); + + assertEquals(0, cache.size(), "Size should be 0"); + + cache.releasePollingConsumer(e, c); + + assertEquals(0, cache.size(), "Size should be 0"); + + cache.stop(); + } + + @Test + public void testCacheConsumerAcquireAndRelease() { + ConsumerCache cache = new EmptyConsumerCache(this, context); + cache.start(); + + assertEquals(0, cache.size(), "Size should be 0"); + + // we never cache any consumers + for (int i = 0; i < 1003; i++) { + Endpoint e = context.getEndpoint("direct:queue:" + i); + PollingConsumer c = cache.acquirePollingConsumer(e); + cache.releasePollingConsumer(e, c); + } + + assertEquals(0, cache.size(), "Size should be 0"); + cache.stop(); + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java index b6c3cd1cc44d1..7b7dfb19aa594 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java @@ -17,10 +17,20 @@ package org.apache.camel.processor; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.camel.Consumer; import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.PollingConsumer; import org.apache.camel.Processor; +import org.apache.camel.Producer; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.support.DefaultComponent; +import org.apache.camel.support.DefaultEndpoint; +import org.apache.camel.support.PollingConsumerSupport; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -30,6 +40,9 @@ public class PollEnrichNoCacheTest extends ContextTestSupport { @Test public void testNoCache() throws Exception { + final AtomicInteger stopped = new AtomicInteger(); + context.addComponent("pollAssert", stopCountingComponent(stopped)); + assertEquals(1, context.getEndpointRegistry().size()); sendBody("foo", "seda:x"); @@ -39,7 +52,7 @@ public void testNoCache() throws Exception { sendBody("bar", "seda:y"); sendBody("bar", "seda:z"); - // make sure its using an empty producer cache as the cache is disabled + // make sure its using an empty consumer cache as the cache is disabled List list = getProcessors("foo"); PollEnricher ep = (PollEnricher) list.get(0); assertNotNull(ep); @@ -68,12 +81,69 @@ public void testNoCache() throws Exception { assertMockEndpointsSatisfied(); assertEquals(4, context.getEndpointRegistry().size()); + + // also verify that cacheSize(-1) means consumers are not retained + sendBody("poll-one", "pollAssert:one"); + sendBody("poll-two", "pollAssert:two"); + + assertEquals(2, stopped.get()); } protected void sendBody(String body, String uri) { template.sendBodyAndHeader("direct:a", body, "myHeader", uri); } + /** + * A test-only component whose polling consumers increment the given counter in + * {@link PollingConsumerSupport#doStop()}, allowing the test to verify whether the consumer cache retains or + * discards consumers after use. + */ + private static DefaultComponent stopCountingComponent(AtomicInteger stopped) { + return new DefaultComponent() { + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map parameters) { + return new DefaultEndpoint(uri, this) { + @Override + public Producer createProducer() { + return null; + } + + @Override + public Consumer createConsumer(Processor processor) { + return null; + } + + @Override + public PollingConsumer createPollingConsumer() { + return new PollingConsumerSupport(this) { + @Override + public Exchange receive() { + Exchange ex = getEndpoint().createExchange(); + ex.getIn().setBody(remaining); + return ex; + } + + @Override + public Exchange receive(long timeout) { + return receive(); + } + + @Override + public Exchange receiveNoWait() { + return receive(); + } + + @Override + protected void doStop() { + stopped.incrementAndGet(); + } + }; + } + }; + } + }; + } + @Override protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultConsumerCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultConsumerCache.java index f85ab47ee68ff..4e95e99915097 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultConsumerCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultConsumerCache.java @@ -53,7 +53,12 @@ public DefaultConsumerCache(Object source, CamelContext camelContext, int cacheS this.source = source; this.camelContext = camelContext; this.maxCacheSize = cacheSize <= 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : cacheSize; - this.consumers = createServicePool(camelContext, maxCacheSize); + if (cacheSize >= 0) { + this.consumers = createServicePool(camelContext, maxCacheSize); + } else { + // no cache then empty + this.consumers = null; + } // only if JMX is enabled if (camelContext.getManagementStrategy().getManagementAgent() != null) { this.extendedStatistics @@ -194,7 +199,7 @@ public int getCapacity() { */ @Override public int size() { - int size = consumers.size(); + int size = consumers != null ? consumers.size() : 0; LOG.trace("size = {}", size); return size; } @@ -207,8 +212,10 @@ public void purge() { lock.lock(); try { try { - consumers.stop(); - consumers.start(); + if (consumers != null) { + consumers.stop(); + consumers.start(); + } } catch (Exception e) { LOG.debug("Error restarting consumer pool", e); } @@ -222,7 +229,9 @@ public void purge() { @Override public void cleanUp() { - consumers.cleanUp(); + if (consumers != null) { + consumers.cleanUp(); + } } @Override diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/EmptyConsumerCache.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/EmptyConsumerCache.java new file mode 100644 index 0000000000000..bca84dee8252a --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/EmptyConsumerCache.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.support.cache; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.FailedToCreateConsumerException; +import org.apache.camel.PollingConsumer; +import org.apache.camel.support.service.ServiceHelper; + +/** + * A {@link org.apache.camel.spi.ConsumerCache} that does not cache {@link PollingConsumer}s but instead creates a new + * consumer on every {@link #acquirePollingConsumer(Endpoint)} and stops and shuts it down on + * {@link #releasePollingConsumer(Endpoint, PollingConsumer)}. + * + * @since 4.21 + */ +public class EmptyConsumerCache extends DefaultConsumerCache { + + private final Object source; + private final CamelContext ecc; + + public EmptyConsumerCache(Object source, CamelContext camelContext) { + super(source, camelContext, -1); + this.source = source; + this.ecc = camelContext; + setExtendedStatistics(false); + } + + @Override + public PollingConsumer acquirePollingConsumer(Endpoint endpoint) { + // always create a new consumer + PollingConsumer answer; + try { + answer = endpoint.createPollingConsumer(); + boolean startingRoutes + = ecc.getCamelContextExtension().isSetupRoutes() || ecc.getRouteController().isStartingRoutes(); + if (startingRoutes) { + // if we are currently starting a route, then add as service and enlist in JMX + getCamelContext().addService(answer); + } else { + // must then start service so consumer is ready to be used + ServiceHelper.startService(answer); + } + } catch (Exception e) { + throw new FailedToCreateConsumerException(endpoint, e); + } + return answer; + } + + @Override + public void releasePollingConsumer(Endpoint endpoint, PollingConsumer pollingConsumer) { + // stop and shutdown the consumer as its not cache or reused + ServiceHelper.stopAndShutdownService(pollingConsumer); + } + + @Override + public int getCapacity() { + return 0; + } + + @Override + public String toString() { + return "EmptyConsumerCache for source: " + source; + } +}