Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ public PreviouslyDeliveredMap(TransactionId transactionId) {
class PreviouslyDelivered {
org.apache.activemq.command.Message message;
boolean redelivered;
boolean prefetchedOnly;

PreviouslyDelivered(MessageDispatch messageDispatch) {
message = messageDispatch.getMessage();
Expand All @@ -123,12 +122,6 @@ class PreviouslyDelivered {
message = messageDispatch.getMessage();
this.redelivered = redelivered;
}

PreviouslyDelivered(MessageDispatch messageDispatch, boolean redelivered, boolean prefetchedOnly) {
message = messageDispatch.getMessage();
this.redelivered = redelivered;
this.prefetchedOnly = prefetchedOnly;
}
}

private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
Expand Down Expand Up @@ -778,9 +771,6 @@ void clearMessagesInProgress() {
// ensure unconsumed are rolledback up front as they may get redelivered to another consumer
List<MessageDispatch> list = unconsumedMessages.removeAll();
if (!this.info.isBrowser()) {
if (session.isTransacted()) {
capturePrefetchedMessagesForDuplicateSuppression(list);
}
for (MessageDispatch old : list) {
session.connection.rollbackDuplicate(this, old.getMessage());
}
Expand Down Expand Up @@ -943,16 +933,6 @@ private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
if (!isAutoAcknowledgeBatch()) {
synchronized(deliveredMessages) {
deliveredMessages.addFirst(md);
if (session.isTransacted()) {
PreviouslyDelivered entry = null;
if (previouslyDeliveredMessages != null) {
entry = previouslyDeliveredMessages.get(md.getMessage().getMessageId());
}
if (entry != null && entry.prefetchedOnly) {
entry.prefetchedOnly = false;
entry.redelivered = true;
}
}
}
if (session.getTransacted()) {
if (transactedIndividualAck) {
Expand Down Expand Up @@ -1440,8 +1420,7 @@ public void dispatch(MessageDispatch md) {
synchronized (unconsumedMessages.getMutex()) {
if (!unconsumedMessages.isClosed()) {
// deliverySequenceId non zero means previously queued dispatch
if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || isPrefetchedRedelivery(md)
|| !session.connection.isDuplicate(this, md.getMessage())) {
if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
if (redeliveryExceeded(md)) {
poisonAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
Expand Down Expand Up @@ -1591,37 +1570,6 @@ private void captureDeliveredMessagesForDuplicateSuppressionWithRequireRedeliver
LOG.trace("{} tracking existing transacted {} delivered list({})", getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size());
}

private void capturePrefetchedMessagesForDuplicateSuppression(final List<MessageDispatch> list) {
if (list.isEmpty()) {
return;
}
if (previouslyDeliveredMessages == null) {
previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, PreviouslyDelivered>(session.getTransactionContext().getTransactionId());
}
for (MessageDispatch pending : list) {
if (pending.getMessage() != null) {
previouslyDeliveredMessages.put(pending.getMessage().getMessageId(), new PreviouslyDelivered(pending, false, true));
}
}
LOG.trace("{} tracking existing transacted {} prefetched list({})", getConsumerId(), previouslyDeliveredMessages.transactionId, list.size());
}

private boolean isPrefetchedRedelivery(final MessageDispatch md) {
if (!session.isTransacted()) {
return false;
}
if (md.getMessage() == null) {
return false;
}
synchronized (deliveredMessages) {
if (previouslyDeliveredMessages != null) {
PreviouslyDelivered entry = previouslyDeliveredMessages.get(md.getMessage().getMessageId());
return entry != null && entry.prefetchedOnly;
}
}
return false;
}

public int getMessageSize() {
return unconsumedMessages.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class PooledConnectionSecurityExceptionTest {
@Test
public void testFailedConnectThenSucceeds() throws JMSException {
try (final Connection connection1 = pooledConnFact.createConnection("invalid", "credentials")) {
assertSecurityExceptionOnStart(connection1);
assertThrows(JMSSecurityException.class, connection1::start);

try (final Connection connection2 = pooledConnFact.createConnection("system", "manager")) {
connection2.start();
Expand All @@ -93,7 +93,7 @@ public void onException(JMSException exception) {
onExceptionCalled.countDown();
}
});
assertSecurityExceptionOnStart(connection1);
assertThrows(JMSSecurityException.class, connection1::start);

try (final Connection connection2 = pooledConnFact.createConnection("system", "manager")) {
connection2.start();
Expand All @@ -118,7 +118,7 @@ public void testFailureGetsNewConnectionOnRetry() throws Exception {
pooledConnFact.setMaxConnections(1);

try (final Connection connection1 = pooledConnFact.createConnection("invalid", "credentials")) {
assertSecurityExceptionOnStart(connection1);
assertThrows(JMSSecurityException.class, connection1::start);

// The pool should process the async error
// we should eventually get a different connection instance from the pool regardless of the underlying connection
Expand All @@ -145,9 +145,9 @@ public void testFailureGetsNewConnectionOnRetryBigPool() throws JMSException {
pooledConnFact.setMaxConnections(10);

try (final Connection connection1 = pooledConnFact.createConnection("invalid", "credentials")) {
assertSecurityExceptionOnStart(connection1);
assertThrows(JMSSecurityException.class, connection1::start);
try (final Connection connection2 = pooledConnFact.createConnection("invalid", "credentials")) {
assertSecurityExceptionOnStart(connection2);
assertThrows(JMSSecurityException.class, connection2::start);
assertNotSame(connection1, connection2);
}
}
Expand All @@ -165,7 +165,7 @@ public void testFailoverWithInvalidCredentialsCanConnect() throws JMSException {
pooledConnFact.setMaxConnections(1);

try (final Connection connection = pooledConnFact.createConnection("invalid", "credentials")) {
assertSecurityExceptionOnStart(connection);
assertThrows(JMSSecurityException.class, connection::start);

try (final Connection connection2 = pooledConnFact.createConnection("system", "manager")) {
connection2.start();
Expand All @@ -185,7 +185,7 @@ public void testFailoverWithInvalidCredentials() throws Exception {
pooledConnFact.setMaxConnections(1);

try (final PooledConnection connection1 = (PooledConnection) pooledConnFact.createConnection("invalid", "credentials")) {
assertSecurityExceptionOnStart(connection1);
assertThrows(JMSSecurityException.class, connection1::start);

// The pool should process the async error
assertTrue("Should get new connection", Wait.waitFor(new Wait.Condition() {
Expand All @@ -202,7 +202,7 @@ public boolean isSatisified() throws Exception {

try (final PooledConnection connection2 = (PooledConnection) pooledConnFact.createConnection("invalid", "credentials")) {
assertNotSame(connection1.pool, connection2.pool);
assertSecurityExceptionOnStart(connection2);
assertThrows(JMSSecurityException.class, connection2::start);
}
}
}
Expand Down Expand Up @@ -230,55 +230,6 @@ public String getName() {
return name.getMethodName();
}

/**
* Helper method to assert that a connection start fails with security exception.
* On different test environments, the connection may be disposed asynchronously
* before the security exception is fully propagated, resulting in either JMSSecurityException
* or generic JMSException with "Disposed" message. Both indicate authentication failure.
*
* This method uses an ExceptionListener to detect when async disposal completes, providing
* more reliable detection of security failures across different Java versions and environments.
*
* @param connection the connection to start
* @throws AssertionError if no exception is thrown or the exception doesn't indicate auth failure
*/
private void assertSecurityExceptionOnStart(final Connection connection) {
try {
final ExceptionListener listener = connection.getExceptionListener();
if (listener == null) { // some tests already leverage the exception listener
final CountDownLatch exceptionLatch = new CountDownLatch(1);

// Install listener to capture async exception propagation
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(final JMSException exception) {
LOG.info("Connection received exception: {}", exception.getMessage());
assertTrue(exception instanceof JMSSecurityException);
exceptionLatch.countDown();
}
});
connection.start(); // should trigger the security exception reliably and asynchronously
exceptionLatch.await(1, java.util.concurrent.TimeUnit.SECONDS);

} else {

// Attempt to start and capture the synchronous exception.
final JMSException thrownException = assertThrows(JMSException.class, connection::start);
assertTrue("Should be JMSSecurityException or disposed due to security exception",
thrownException instanceof JMSSecurityException ||
thrownException.getMessage().contains("Disposed"));
}


} catch (final JMSException e) {
// Ignore

} catch (final InterruptedException e) {
throw new RuntimeException(e);
}

}

@Before
public void setUp() throws Exception {
LOG.info("========== start " + getName() + " ==========");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void testGcDoneAtStop() throws Exception {
LOG.info("kahadb store: " + kahaDBPersistenceAdapter);
int numKahadbFiles = kahaDBPersistenceAdapter.getStore().getJournal().getFileMap().size();

LOG.info("Num files, job store: {}, message store: {}", numSchedulerFiles, numKahadbFiles);
LOG.info("Num files, job store: {}, message store: {}", numKahadbFiles, numKahadbFiles);

// pull the dirs before we stop
File jobDir = jobSchedulerStore.getJournal().getDirectory();
Expand Down Expand Up @@ -94,10 +94,8 @@ public void testNoGcAtStop() throws Exception {

brokerService.stop();

final int jobFilesOnDisk = verifyFilesOnDisk(jobDir);
final int kahaFilesOnDisk = verifyFilesOnDisk(kahaDir);
assertTrue("Expected job store data files at least " + numSchedulerFiles, jobFilesOnDisk >= numSchedulerFiles);
assertTrue("Expected kahadb data files at least " + numKahadbFiles, kahaFilesOnDisk >= numKahadbFiles);
assertEquals("Expected job store data files", numSchedulerFiles, verifyFilesOnDisk(jobDir));
assertEquals("Expected kahadb data files", numKahadbFiles, verifyFilesOnDisk(kahaDir));
}

private int verifyFilesOnDisk(File directory) {
Expand Down
13 changes: 1 addition & 12 deletions activemq-mqtt/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -198,23 +198,12 @@
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>properties</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<argLine>-javaagent:${org.mockito:mockito-core:jar}</argLine>
<argLine>${surefire.argLine}</argLine>
<runOrder>alphabetical</runOrder>
<systemPropertyValues>
<org.apache.activemq.default.directory.prefix>target</org.apache.activemq.default.directory.prefix>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void run() {
executorService.awaitTermination(10, TimeUnit.SECONDS);

ArgumentCaptor<RemoveInfo> removeInfo = ArgumentCaptor.forClass(RemoveInfo.class);
Mockito.verify(transport, times(1)).sendToActiveMQ(removeInfo.capture());
Mockito.verify(transport, times(4)).sendToActiveMQ(removeInfo.capture());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void testXAResourceReconnect() throws Exception {
try {
final TransportConnector transportConnector = brokerService.getTransportConnectors().get(0);

String failoverUrl = String.format("failover:(%s)?maxReconnectAttempts=10&initialReconnectDelay=100", transportConnector.getConnectUri());
String failoverUrl = String.format("failover:(%s)?maxReconnectAttempts=1", transportConnector.getConnectUri());

ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
ra.start(null);
Expand Down Expand Up @@ -165,22 +165,6 @@ public boolean isSatisified() throws Exception {

transportConnector.start();

// Wait for failover to reconnect and recover() to succeed
// The ReconnectingXAResource should handle reconnection transparently
final XAResource resource = resources[0];
assertTrue("connection re-established and can recover", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
try {
resource.recover(100);
return true;
} catch (Exception e) {
// Still reconnecting
return false;
}
}
}, 30000, 500));

// should recover ok
assertEquals("no pending transactions", 0, resources[0].recover(100).length);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,26 +565,13 @@ public void testAckMessageWithNoId() throws Exception {
received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(ack);

// Unsubscribe immediately after invalid ACK to prevent message redelivery
// while waiting for ERROR frame. This avoids race condition where message
// could be redelivered before ERROR is received.
StompFrame error = stompConnection.receive();
LOG.info("Received Frame: {}", error);
assertTrue("Expected ERROR but got: " + error.getAction(), error.getAction().equals("ERROR"));

String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(unsub);

// Receive frames until we get the ERROR frame, ignoring any MESSAGE frames
// that arrive due to redelivery (especially relevant for SSL transport)
StompFrame error = null;
for (int i = 0; i < 5; i++) {
error = stompConnection.receive();
LOG.info("Received Frame: {}", error);
if (error.getAction().equals("ERROR")) {
break;
}
// If we get a MESSAGE, it's a redelivery - keep trying for ERROR
}
assertNotNull("Did not receive any frame", error);
assertTrue("Expected ERROR but got: " + error.getAction(), error.getAction().equals("ERROR"));
}

@Test(timeout = 60000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.activemq.transport.stomp;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
Expand Down Expand Up @@ -159,32 +158,9 @@ public void testClientAckWithoutAckId() throws Exception {
String frame = "ACK\n" + "message-id:" + ackId + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);

// Unsubscribe immediately to prevent message redelivery while waiting for ERROR
String unsubscribe = "UNSUBSCRIBE\n" + "id:1\n\n" + Stomp.NULL;
stompConnection.sendFrame(unsubscribe);

// Receive frames until we get the ERROR frame, ignoring any MESSAGE frames
// that arrive due to redelivery (especially relevant for SSL transport)
StompFrame error = null;
for (int i = 0; i < 5; i++) {
error = stompConnection.receive();
LOG.info("Broker sent: " + error);
if (error.getAction().equals("ERROR")) {
break;
}
// If we get a MESSAGE, it's a redelivery - keep trying for ERROR
}
assertNotNull("Did not receive any frame", error);
assertTrue("Expected ERROR but got: " + error.getAction(), error.getAction().equals("ERROR"));

// Re-subscribe to receive the message again and test correct ACK
stompConnection.sendFrame(subscribe);
receipt = stompConnection.receive();
assertTrue(receipt.getAction().startsWith("RECEIPT"));

received = stompConnection.receive();
assertTrue(received.getAction().equals("MESSAGE"));
ackId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
assertTrue(received.getAction().equals("ERROR"));
LOG.info("Broker sent: " + received);

// Now place it in the correct location and check it still works.
frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2\n\n" + Stomp.NULL;
Expand Down
Loading
Loading