Skip to content

Commit f7993bc

Browse files
jeanouiiroot
andauthored
test(unit-tests): improve reliability of message consumption in tests (#1711)
* test(unit-tests): improve reliability of message consumption in tests * [test] Fix ConnectionFailureEvictsFromPoolTest: eliminate flaky async races Two race conditions caused testEvictionXA to fail intermittently: 1. Exception event propagation: ActiveMQConnection.addTransportListener() callbacks fire via executeAsync(), which silently drops tasks when the pool's ExceptionListener closes the connection and shuts down the executor first. Fixed by intercepting at the MockTransport level where exception propagation is synchronous. 2. Pool eviction timing: The pool evicts broken connections asynchronously via ExceptionListener fired through executeAsync(). The test could request a new connection before eviction completed. Fixed by using Wait.waitFor() retry pattern (consistent with other pool tests). * test(MaxFrameSizeEnabled): increase timeouts to improve test reliability * test(AMQ2149): enhance prefetch policy for transactional connections --------- Co-authored-by: root <root@srv1405615.hstgr.cloud>
1 parent 92c8b58 commit f7993bc

7 files changed

Lines changed: 66 additions & 33 deletions

File tree

activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.activemq.test.TestSupport;
4040
import org.apache.activemq.transport.TransportListener;
4141
import org.apache.activemq.transport.mock.MockTransport;
42+
import org.apache.activemq.util.Wait;
4243
import org.slf4j.Logger;
4344
import org.slf4j.LoggerFactory;
4445

@@ -94,17 +95,27 @@ public void doTestEviction(ConnectionFactory pooledFactory) throws Exception {
9495
final CountDownLatch gotExceptionEvent = new CountDownLatch(1);
9596
try (final PooledConnection connection = (PooledConnection) pooledFactory.createConnection()) {
9697
final ActiveMQConnection amqC = (ActiveMQConnection) connection.getConnection();
97-
amqC.addTransportListener(new TransportListener() {
98+
// Intercept exception propagation at the MockTransport level where it fires
99+
// synchronously. ActiveMQConnection.addTransportListener() callbacks fire via
100+
// executeAsync(), which silently drops the task if the pool's ExceptionListener
101+
// closes the connection and shuts down the executor first (race condition that
102+
// affects the XA path).
103+
final MockTransport mockTransport = (MockTransport) amqC.getTransportChannel().narrow(MockTransport.class);
104+
final TransportListener originalListener = mockTransport.getTransportListener();
105+
mockTransport.setTransportListener(new TransportListener() {
98106
public void onCommand(Object command) {
107+
originalListener.onCommand(command);
99108
}
100109
public void onException(IOException error) {
101-
// we know connection is dead...
102-
// listeners are fired async
110+
// fires synchronously when MockTransport.onException() is called
103111
gotExceptionEvent.countDown();
112+
originalListener.onException(error);
104113
}
105114
public void transportInterupted() {
115+
originalListener.transportInterupted();
106116
}
107117
public void transportResumed() {
118+
originalListener.transportResumed();
108119
}
109120
});
110121

@@ -116,18 +127,21 @@ public void transportResumed() {
116127
TestCase.fail("Expected Error");
117128
} catch (JMSException e) {
118129
}
119-
// Wait for async exception event BEFORE the try-with-resources closes the connection.
120-
// ActiveMQConnection.onException() fires TransportListener callbacks via executeAsync(),
121-
// so the callback runs in a separate thread. If we wait after connection.close(), the
122-
// async executor may already be shut down and the callback never fires.
123-
TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(15, TimeUnit.SECONDS));
130+
TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(5, TimeUnit.SECONDS));
124131
}
125-
// If we get another connection now it should be a new connection that
126-
// works.
132+
// After the failure, a new connection from the pool should work.
133+
// The pool eviction is async (ExceptionListener fires via executeAsync),
134+
// so retry until the pool returns a working connection.
127135
LOG.info("expect new connection after failure");
128-
try (final Connection connection2 = pooledFactory.createConnection()) {
129-
sendMessage(connection2);
130-
}
136+
assertTrue("pool should provide working connection after eviction",
137+
Wait.waitFor(() -> {
138+
try (final Connection connection2 = pooledFactory.createConnection()) {
139+
sendMessage(connection2);
140+
return true;
141+
} catch (Exception e) {
142+
return false;
143+
}
144+
}, 5000, 100));
131145
}
132146

133147
private void createConnectionFailure(Connection connection) throws Exception {

activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.junit.Test;
4242

4343
import org.apache.activemq.ActiveMQConnectionFactory;
44+
import org.apache.activemq.ActiveMQPrefetchPolicy;
4445
import org.apache.activemq.broker.BrokerService;
4546
import org.apache.activemq.broker.region.Destination;
4647
import org.apache.activemq.broker.region.DestinationStatistics;
@@ -171,6 +172,12 @@ public Receiver(jakarta.jms.Destination dest, boolean transactional) throws JMSE
171172
this.transactional = transactional;
172173
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
173174
connectionFactory.setWatchTopicAdvisories(false);
175+
if (transactional) {
176+
final ActiveMQPrefetchPolicy policy = connectionFactory.getPrefetchPolicy();
177+
policy.setQueuePrefetch(1);
178+
policy.setTopicPrefetch(1);
179+
policy.setDurableTopicPrefetch(1);
180+
}
174181
connection = connectionFactory.createConnection();
175182
connection.setClientID(dest.toString());
176183
session = connection.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
@@ -194,7 +201,7 @@ public long getNextExpectedSeqNo() {
194201

195202
final int TRANSACITON_BATCH = 500;
196203
boolean resumeOnNextOrPreviousIsOk = false;
197-
public void onMessage(Message message) {
204+
public synchronized void onMessage(Message message) {
198205
try {
199206
final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
200207
if ((seqNum % TRANSACITON_BATCH) == 0) {

activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ public void stopBroker() throws Exception {
120120
}
121121
if (broker != null) {
122122
broker.stop();
123+
broker.waitUntilStopped();
123124
}
124125
}
125126

@@ -257,6 +258,7 @@ protected BrokerService startBroker(boolean deleteMessagesOnStartup) throws Exce
257258
configureBroker(newBroker);
258259
newBroker.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup);
259260
newBroker.start();
261+
newBroker.waitUntilStarted();
260262
return newBroker;
261263
}
262264

activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -275,12 +275,7 @@ public void testStatsAndBrowseAfterAckPreparedRolledback() throws Exception {
275275

276276
dumpMessages();
277277

278-
Wait.waitFor(new Wait.Condition() {
279-
@Override
280-
public boolean isSatisified() throws Exception {
281-
return proxy.getInFlightCount() == 0l;
282-
}
283-
});
278+
Wait.waitFor(() -> proxy.getInFlightCount() == 0L && proxy.cursorSize() == 0);
284279
assertEquals("prefetch", 0, proxy.getInFlightCount());
285280
assertEquals("size", 10, proxy.getQueueSize());
286281
assertEquals("cursor size", 0, proxy.cursorSize());

activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ public class MaxFrameSizeEnabledTest {
5555
private static final int CONNECTION_COUNT = 3;
5656
private static final int MESSAGE_ATTEMPTS = 3;
5757
private static final int BODY_SIZE = 20000; // large enough to trip 2k limit, compressible enough for 60k
58+
private static final long BROKER_START_TIMEOUT_MS = 30_000;
59+
private static final long BROKER_STOP_TIMEOUT_MS = 30_000;
60+
private static final int TEST_TIMEOUT_MS = 120_000;
5861

5962
private BrokerService broker;
6063
private final String transportType;
@@ -158,30 +161,32 @@ public void after() throws Exception {
158161
}
159162

160163
public BrokerService createBroker(String connectorName, String connectorString) throws Exception {
161-
BrokerService broker = new BrokerService();
164+
final BrokerService broker = new BrokerService();
162165
broker.setPersistent(false);
163166
broker.setUseJmx(false);
164-
TransportConnector connector = broker.addConnector(connectorString);
167+
final TransportConnector connector = broker.addConnector(connectorString);
165168
connector.setName(connectorName);
166169
broker.start();
167-
broker.waitUntilStarted();
170+
assertTrue("Broker should start within timeout",
171+
Wait.waitFor(broker::isStarted, BROKER_START_TIMEOUT_MS, 100));
168172
return broker;
169173
}
170174

171175
public void stopBroker(BrokerService broker) throws Exception {
172176
if (broker != null) {
173177
broker.stop();
174-
broker.waitUntilStopped();
178+
assertTrue("Broker should stop within timeout",
179+
Wait.waitFor(broker::isStopped, BROKER_STOP_TIMEOUT_MS, 100));
175180
}
176181
}
177182

178-
@Test
183+
@Test(timeout = TEST_TIMEOUT_MS)
179184
public void testMaxFrameSize() throws Exception {
180185
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams());
181186
testMaxFrameSize(transportType, getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()), false);
182187
}
183188

184-
@Test
189+
@Test(timeout = TEST_TIMEOUT_MS)
185190
public void testMaxFrameSizeCompression() throws Exception {
186191
// Test message body length is 99841 bytes. Compresses to ~ 48000
187192
broker = createBroker(transportType, transportType + "://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams());

activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
public class RestrictedThreadPoolInactivityTimeoutTest extends JmsTestSupport {
3737
private static final Logger LOG = LoggerFactory.getLogger(RestrictedThreadPoolInactivityTimeoutTest.class);
38+
private static final int TEST_TIMEOUT_MS = 120_000;
3839

3940
public String brokerTransportScheme = "tcp";
4041
public Boolean rejectWork = Boolean.FALSE;
@@ -86,6 +87,7 @@ public void initCombosForTestThreadsInvolvedInXInactivityTimeouts() {
8687
addCombinationValues("rejectWork", new Object[] {Boolean.TRUE, Boolean.FALSE});
8788
}
8889

90+
@org.junit.Test(timeout = TEST_TIMEOUT_MS)
8991
public void testThreadsInvolvedInXInactivityTimeouts() throws Exception {
9092

9193
URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());

activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -199,15 +199,23 @@ public void testPriorityMessagesWithJmsBrowser() throws Exception {
199199
assertNotNull(message);
200200
assertEquals(5, message.getJMSPriority());
201201

202-
// consume messages
203-
final ArrayList<Message> consumeList = consumeMessages("TestQ");
202+
// Wait for remaining messages to be fully available after consumeOneMessage closes its connection.
203+
// With lazyDispatch=true + optimizedDispatch=true, messages may briefly be in-flight
204+
// during connection teardown and not yet re-queued for dispatch to a new consumer.
205+
final int remaining = numToSend - 1;
206+
assertTrue("Remaining messages available for dispatch", Wait.waitFor(() -> {
207+
final Queue q = (Queue) broker.getDestination(destination);
208+
return q != null
209+
&& q.getDestinationStatistics().getMessages().getCount() == remaining
210+
&& q.getDestinationStatistics().getInflight().getCount() == 0;
211+
}, 5000, 100));
212+
213+
// consume messages (use timeout-based overload for reliable dispatch on slow CI)
214+
final ArrayList<Message> consumeList = consumeMessages("TestQ", remaining, TimeUnit.SECONDS.toMillis(30));
204215
LOG.info("Consumed list {}", consumeList.size());
205216

206-
// compare lists
207-
// assertEquals("Iteration: " + i
208-
// +", message 1 should be priority high", 5,
209-
// consumeList.get(0).getJMSPriority());
210-
for (int j = 1; j < (numToSend - 1); j++) {
217+
assertEquals("Iteration: " + i + ", all remaining messages consumed", remaining, consumeList.size());
218+
for (int j = 0; j < consumeList.size(); j++) {
211219
assertEquals("Iteration: " + i + ", message " + j + " should be priority medium", 4, consumeList.get(j).getJMSPriority());
212220
}
213221
}

0 commit comments

Comments
 (0)