Skip to content

Commit 568735d

Browse files
iliya-grtabish121
authored andcommitted
ARTEMIS-5800 Fix AMQP session leak
When a connection is disconnected, it should be destroyed, otherwise the AMQP session will not be closed on the local close event, which can cause a session leak.
1 parent f988349 commit 568735d

2 files changed

Lines changed: 32 additions & 7 deletions

File tree

artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,12 @@ public void destroy() {
144144

145145
@Override
146146
public void disconnect(boolean criticalError) {
147+
if (destroyed) {
148+
return;
149+
}
150+
151+
destroyed = true;
152+
147153
ErrorCondition errorCondition = new ErrorCondition();
148154
errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED);
149155
amqpConnection.close(errorCondition);

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,10 @@
1616
*/
1717
package org.apache.activemq.artemis.tests.integration.amqp;
1818

19-
import static org.junit.jupiter.api.Assertions.assertEquals;
20-
import static org.junit.jupiter.api.Assertions.assertNull;
21-
import static org.junit.jupiter.api.Assertions.assertNotNull;
22-
23-
import java.lang.invoke.MethodHandles;
24-
import java.util.concurrent.TimeUnit;
25-
2619
import org.apache.activemq.artemis.core.server.ServerSession;
2720
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
21+
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
22+
import org.apache.activemq.artemis.tests.util.Wait;
2823
import org.apache.activemq.transport.amqp.client.AmqpClient;
2924
import org.apache.activemq.transport.amqp.client.AmqpConnection;
3025
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
@@ -39,6 +34,13 @@
3934
import org.slf4j.Logger;
4035
import org.slf4j.LoggerFactory;
4136

37+
import java.lang.invoke.MethodHandles;
38+
import java.util.concurrent.TimeUnit;
39+
40+
import static org.junit.jupiter.api.Assertions.assertEquals;
41+
import static org.junit.jupiter.api.Assertions.assertNotNull;
42+
import static org.junit.jupiter.api.Assertions.assertNull;
43+
4244
public class AmqpSessionTest extends AmqpClientTestSupport {
4345

4446
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -113,6 +115,23 @@ public void testCreateSessionProducerConsumerDoesNotLeakClosable() throws Except
113115
connection.close();
114116
}
115117

118+
@Test
119+
public void testServerSessionCloseOnRemotingConnectionDisconnect() throws Exception {
120+
AmqpClient client = createAmqpClient();
121+
AmqpConnection connection = addConnection(client.connect());
122+
AmqpSession session = connection.createSession();
123+
124+
assertNotNull(session);
125+
126+
for (RemotingConnection remoteConnection : server.getRemotingService().getConnections()) {
127+
remoteConnection.disconnect(true);
128+
}
129+
130+
Wait.assertTrue(connection::isClosed);
131+
132+
assertEquals(0, server.getSessions().size());
133+
}
134+
116135
@Test
117136
public void testSessionClosedOnServerEndsClientSession() throws Exception {
118137
doTestSessionClosedOnServerEndsClientSession(false, false);

0 commit comments

Comments
 (0)