@@ -41,10 +41,6 @@ public class MemoryAlarms extends BrokerTestCase {
4141 @ BeforeEach
4242 @ Override
4343 public void setUp (TestInfo info ) throws IOException , TimeoutException {
44- // only flowControl verifies that heartbeat timeouts do not kill a blocked connection
45- if (info .getTestMethod ().map (m -> m .getName ().equals ("flowControl" )).orElse (false )) {
46- connectionFactory .setRequestedHeartbeat (1 );
47- }
4844 super .setUp (info );
4945 if (connection2 == null ) {
5046 connection2 = connectionFactory .newConnection ();
@@ -65,7 +61,6 @@ public void tearDown(TestInfo info) throws IOException, TimeoutException {
6561 connection2 = null ;
6662 }
6763 super .tearDown (info );
68- connectionFactory .setRequestedHeartbeat (0 );
6964 }
7065
7166 @ Override
@@ -80,24 +75,33 @@ protected void releaseResources() throws IOException {
8075 }
8176
8277 @ Test public void flowControl () throws IOException , InterruptedException {
83- basicPublishVolatile (Q );
84- setResourceAlarm ("memory" );
85- // non-publish actions only after an alarm should be fine
86- assertNotNull (basicGet (Q ));
87- QueueingConsumer c = new QueueingConsumer (channel );
88- String consumerTag = channel .basicConsume (Q , true , c );
89- // publishes after an alarm should not go through
90- basicPublishVolatile (Q );
91- // the publish is async, so this is racy. This also tests we don't die
92- // by heartbeat (3x heartbeat interval + epsilon)
93- assertNull (c .nextDelivery (3100 ));
94- // once the alarm has cleared the publishes should go through
95- clearResourceAlarm ("memory" );
96- assertNotNull (c .nextDelivery (3100 ));
97- // everything should be back to normal
98- channel .basicCancel (consumerTag );
99- basicPublishVolatile (Q );
100- assertNotNull (basicGet (Q ));
78+ connectionFactory .setRequestedHeartbeat (1 );
79+ closeChannel ();
80+ closeConnection ();
81+ openConnection ();
82+ openChannel ();
83+ try {
84+ basicPublishVolatile (Q );
85+ setResourceAlarm ("memory" );
86+ // non-publish actions only after an alarm should be fine
87+ assertNotNull (basicGet (Q ));
88+ QueueingConsumer c = new QueueingConsumer (channel );
89+ String consumerTag = channel .basicConsume (Q , true , c );
90+ // publishes after an alarm should not go through
91+ basicPublishVolatile (Q );
92+ // the publish is async, so this is racy. This also tests we don't die
93+ // by heartbeat (3x heartbeat interval + epsilon)
94+ assertNull (c .nextDelivery (3100 ));
95+ // once the alarm has cleared the publishes should go through
96+ clearResourceAlarm ("memory" );
97+ assertNotNull (c .nextDelivery (3100 ));
98+ // everything should be back to normal
99+ channel .basicCancel (consumerTag );
100+ basicPublishVolatile (Q );
101+ assertNotNull (basicGet (Q ));
102+ } finally {
103+ connectionFactory .setRequestedHeartbeat (0 );
104+ }
101105 }
102106
103107
0 commit comments