Skip to content

Commit f8d9ddc

Browse files
msnijder30OpenCode (DeepSeek V4 Pro)
andauthored
CAMEL-23840: Fix pollEnrich cacheSize(-1) not disabling consumer cache
pollEnrich().cacheSize(-1) should disable the consumer cache entirely, but DefaultConsumerCache normalizes cacheSize <= 0 to the context maximum (default 1000), retaining up to 1000 polling consumers. For resource-backed components like SFTP, each retained consumer holds an open connection that is never cleaned up. Fix by introducing EmptyConsumerCache (mirroring EmptyProducerCache) that creates a fresh consumer on every acquire and stops it on release. PollEnricher.doBuild() now selects EmptyConsumerCache when cacheSize < 0, matching the pattern used by SendDynamicProcessor, RecipientList, and RoutingSlip on the producer side. Closes #24283 Co-Authored-By: OpenCode (DeepSeek V4 Pro) <contact@anoma.ly> Co-Authored-By: OpenCode (GLM 5.2) <contact@anoma.ly>
1 parent c061eae commit f8d9ddc

5 files changed

Lines changed: 241 additions & 9 deletions

File tree

core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.camel.support.EventDrivenPollingConsumer;
5050
import org.apache.camel.support.ExchangeHelper;
5151
import org.apache.camel.support.cache.DefaultConsumerCache;
52+
import org.apache.camel.support.cache.EmptyConsumerCache;
5253
import org.apache.camel.support.service.ServiceHelper;
5354
import org.apache.camel.util.URISupport;
5455
import org.slf4j.Logger;
@@ -544,9 +545,14 @@ protected static String resolveScheme(Exchange exchange, String uri) {
544545
@Override
545546
protected void doBuild() throws Exception {
546547
if (consumerCache == null) {
547-
// create consumer cache if we use dynamic expressions for computing the endpoints to poll
548-
consumerCache = new DefaultConsumerCache(this, camelContext, cacheSize);
549-
LOG.debug("PollEnrich {} using ConsumerCache with cacheSize={}", this, cacheSize);
548+
if (cacheSize < 0) {
549+
consumerCache = new EmptyConsumerCache(this, camelContext);
550+
LOG.debug("PollEnrich {} is not using ConsumerCache", this);
551+
} else {
552+
// create consumer cache if we use dynamic expressions for computing the endpoints to poll
553+
consumerCache = new DefaultConsumerCache(this, camelContext, cacheSize);
554+
LOG.debug("PollEnrich {} using ConsumerCache with cacheSize={}", this, cacheSize);
555+
}
550556
}
551557
if (aggregationStrategy == null) {
552558
aggregationStrategy = new CopyAggregationStrategy();
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.impl.engine;
18+
19+
import org.apache.camel.ContextTestSupport;
20+
import org.apache.camel.Endpoint;
21+
import org.apache.camel.PollingConsumer;
22+
import org.apache.camel.spi.ConsumerCache;
23+
import org.apache.camel.support.cache.EmptyConsumerCache;
24+
import org.junit.jupiter.api.Test;
25+
26+
import static org.junit.jupiter.api.Assertions.assertEquals;
27+
28+
public class EmptyConsumerCacheTest extends ContextTestSupport {
29+
30+
@Test
31+
public void testEmptyCache() {
32+
ConsumerCache cache = new EmptyConsumerCache(this, context);
33+
cache.start();
34+
35+
assertEquals(0, cache.size(), "Size should be 0");
36+
37+
// we never cache any consumers
38+
Endpoint e = context.getEndpoint("direct:queue:1");
39+
PollingConsumer c = cache.acquirePollingConsumer(e);
40+
41+
assertEquals(0, cache.size(), "Size should be 0");
42+
43+
cache.releasePollingConsumer(e, c);
44+
45+
assertEquals(0, cache.size(), "Size should be 0");
46+
47+
cache.stop();
48+
}
49+
50+
@Test
51+
public void testCacheConsumerAcquireAndRelease() {
52+
ConsumerCache cache = new EmptyConsumerCache(this, context);
53+
cache.start();
54+
55+
assertEquals(0, cache.size(), "Size should be 0");
56+
57+
// we never cache any consumers
58+
for (int i = 0; i < 1003; i++) {
59+
Endpoint e = context.getEndpoint("direct:queue:" + i);
60+
PollingConsumer c = cache.acquirePollingConsumer(e);
61+
cache.releasePollingConsumer(e, c);
62+
}
63+
64+
assertEquals(0, cache.size(), "Size should be 0");
65+
cache.stop();
66+
}
67+
}

core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichNoCacheTest.java

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,20 @@
1717
package org.apache.camel.processor;
1818

1919
import java.util.List;
20+
import java.util.Map;
21+
import java.util.concurrent.atomic.AtomicInteger;
2022

23+
import org.apache.camel.Consumer;
2124
import org.apache.camel.ContextTestSupport;
25+
import org.apache.camel.Endpoint;
26+
import org.apache.camel.Exchange;
27+
import org.apache.camel.PollingConsumer;
2228
import org.apache.camel.Processor;
29+
import org.apache.camel.Producer;
2330
import org.apache.camel.builder.RouteBuilder;
31+
import org.apache.camel.support.DefaultComponent;
32+
import org.apache.camel.support.DefaultEndpoint;
33+
import org.apache.camel.support.PollingConsumerSupport;
2434
import org.junit.jupiter.api.Test;
2535

2636
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -30,6 +40,9 @@ public class PollEnrichNoCacheTest extends ContextTestSupport {
3040

3141
@Test
3242
public void testNoCache() throws Exception {
43+
final AtomicInteger stopped = new AtomicInteger();
44+
context.addComponent("pollAssert", stopCountingComponent(stopped));
45+
3346
assertEquals(1, context.getEndpointRegistry().size());
3447

3548
sendBody("foo", "seda:x");
@@ -39,7 +52,7 @@ public void testNoCache() throws Exception {
3952
sendBody("bar", "seda:y");
4053
sendBody("bar", "seda:z");
4154

42-
// make sure its using an empty producer cache as the cache is disabled
55+
// make sure its using an empty consumer cache as the cache is disabled
4356
List<Processor> list = getProcessors("foo");
4457
PollEnricher ep = (PollEnricher) list.get(0);
4558
assertNotNull(ep);
@@ -68,12 +81,69 @@ public void testNoCache() throws Exception {
6881
assertMockEndpointsSatisfied();
6982

7083
assertEquals(4, context.getEndpointRegistry().size());
84+
85+
// also verify that cacheSize(-1) means consumers are not retained
86+
sendBody("poll-one", "pollAssert:one");
87+
sendBody("poll-two", "pollAssert:two");
88+
89+
assertEquals(2, stopped.get());
7190
}
7291

7392
protected void sendBody(String body, String uri) {
7493
template.sendBodyAndHeader("direct:a", body, "myHeader", uri);
7594
}
7695

96+
/**
97+
* A test-only component whose polling consumers increment the given counter in
98+
* {@link PollingConsumerSupport#doStop()}, allowing the test to verify whether the consumer cache retains or
99+
* discards consumers after use.
100+
*/
101+
private static DefaultComponent stopCountingComponent(AtomicInteger stopped) {
102+
return new DefaultComponent() {
103+
@Override
104+
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) {
105+
return new DefaultEndpoint(uri, this) {
106+
@Override
107+
public Producer createProducer() {
108+
return null;
109+
}
110+
111+
@Override
112+
public Consumer createConsumer(Processor processor) {
113+
return null;
114+
}
115+
116+
@Override
117+
public PollingConsumer createPollingConsumer() {
118+
return new PollingConsumerSupport(this) {
119+
@Override
120+
public Exchange receive() {
121+
Exchange ex = getEndpoint().createExchange();
122+
ex.getIn().setBody(remaining);
123+
return ex;
124+
}
125+
126+
@Override
127+
public Exchange receive(long timeout) {
128+
return receive();
129+
}
130+
131+
@Override
132+
public Exchange receiveNoWait() {
133+
return receive();
134+
}
135+
136+
@Override
137+
protected void doStop() {
138+
stopped.incrementAndGet();
139+
}
140+
};
141+
}
142+
};
143+
}
144+
};
145+
}
146+
77147
@Override
78148
protected RouteBuilder createRouteBuilder() {
79149
return new RouteBuilder() {

core/camel-support/src/main/java/org/apache/camel/support/cache/DefaultConsumerCache.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,12 @@ public DefaultConsumerCache(Object source, CamelContext camelContext, int cacheS
5353
this.source = source;
5454
this.camelContext = camelContext;
5555
this.maxCacheSize = cacheSize <= 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : cacheSize;
56-
this.consumers = createServicePool(camelContext, maxCacheSize);
56+
if (cacheSize >= 0) {
57+
this.consumers = createServicePool(camelContext, maxCacheSize);
58+
} else {
59+
// no cache then empty
60+
this.consumers = null;
61+
}
5762
// only if JMX is enabled
5863
if (camelContext.getManagementStrategy().getManagementAgent() != null) {
5964
this.extendedStatistics
@@ -194,7 +199,7 @@ public int getCapacity() {
194199
*/
195200
@Override
196201
public int size() {
197-
int size = consumers.size();
202+
int size = consumers != null ? consumers.size() : 0;
198203
LOG.trace("size = {}", size);
199204
return size;
200205
}
@@ -207,8 +212,10 @@ public void purge() {
207212
lock.lock();
208213
try {
209214
try {
210-
consumers.stop();
211-
consumers.start();
215+
if (consumers != null) {
216+
consumers.stop();
217+
consumers.start();
218+
}
212219
} catch (Exception e) {
213220
LOG.debug("Error restarting consumer pool", e);
214221
}
@@ -222,7 +229,9 @@ public void purge() {
222229

223230
@Override
224231
public void cleanUp() {
225-
consumers.cleanUp();
232+
if (consumers != null) {
233+
consumers.cleanUp();
234+
}
226235
}
227236

228237
@Override
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.support.cache;
18+
19+
import org.apache.camel.CamelContext;
20+
import org.apache.camel.Endpoint;
21+
import org.apache.camel.FailedToCreateConsumerException;
22+
import org.apache.camel.PollingConsumer;
23+
import org.apache.camel.support.service.ServiceHelper;
24+
25+
/**
26+
* A {@link org.apache.camel.spi.ConsumerCache} that does not cache {@link PollingConsumer}s but instead creates a new
27+
* consumer on every {@link #acquirePollingConsumer(Endpoint)} and stops and shuts it down on
28+
* {@link #releasePollingConsumer(Endpoint, PollingConsumer)}.
29+
*
30+
* @since 4.21
31+
*/
32+
public class EmptyConsumerCache extends DefaultConsumerCache {
33+
34+
private final Object source;
35+
private final CamelContext ecc;
36+
37+
public EmptyConsumerCache(Object source, CamelContext camelContext) {
38+
super(source, camelContext, -1);
39+
this.source = source;
40+
this.ecc = camelContext;
41+
setExtendedStatistics(false);
42+
}
43+
44+
@Override
45+
public PollingConsumer acquirePollingConsumer(Endpoint endpoint) {
46+
// always create a new consumer
47+
PollingConsumer answer;
48+
try {
49+
answer = endpoint.createPollingConsumer();
50+
boolean startingRoutes
51+
= ecc.getCamelContextExtension().isSetupRoutes() || ecc.getRouteController().isStartingRoutes();
52+
if (startingRoutes) {
53+
// if we are currently starting a route, then add as service and enlist in JMX
54+
getCamelContext().addService(answer);
55+
} else {
56+
// must then start service so consumer is ready to be used
57+
ServiceHelper.startService(answer);
58+
}
59+
} catch (Exception e) {
60+
throw new FailedToCreateConsumerException(endpoint, e);
61+
}
62+
return answer;
63+
}
64+
65+
@Override
66+
public void releasePollingConsumer(Endpoint endpoint, PollingConsumer pollingConsumer) {
67+
// stop and shutdown the consumer as its not cache or reused
68+
ServiceHelper.stopAndShutdownService(pollingConsumer);
69+
}
70+
71+
@Override
72+
public int getCapacity() {
73+
return 0;
74+
}
75+
76+
@Override
77+
public String toString() {
78+
return "EmptyConsumerCache for source: " + source;
79+
}
80+
}

0 commit comments

Comments
 (0)