1919import static org .junit .jupiter .api .Assertions .assertNull ;
2020
2121import java .io .IOException ;
22+ import java .time .Duration ;
2223import java .util .concurrent .TimeoutException ;
2324
2425import org .junit .jupiter .api .AfterEach ;
2526import org .junit .jupiter .api .BeforeEach ;
2627import org .junit .jupiter .api .Test ;
2728
28- import com .rabbitmq .client .Channel ;
29- import com .rabbitmq .client .Connection ;
3029import com .rabbitmq .client .QueueingConsumer ;
3130import com .rabbitmq .client .test .BrokerTestCase ;
3231import org .junit .jupiter .api .TestInfo ;
@@ -35,34 +34,22 @@ public class MemoryAlarms extends BrokerTestCase {
3534
3635 private static final String Q = "Restart" ;
3736
38- private Connection connection2 ;
39- private Channel channel2 ;
37+ private static final Duration HEARTBEAT = Duration .ofSeconds (1 );
38+ // Must exceed 2x heartbeat to also verify the connection survives the heartbeat timeout
39+ private static final long DELIVERY_TIMEOUT_MS = HEARTBEAT .multipliedBy (3 ).plusMillis (100 ).toMillis ();
4040
4141 @ BeforeEach
4242 @ Override
4343 public void setUp (TestInfo info ) throws IOException , TimeoutException {
44- connectionFactory .setRequestedHeartbeat (1 );
44+ connectionFactory .setRequestedHeartbeat (( int ) HEARTBEAT . getSeconds () );
4545 super .setUp (info );
46- if (connection2 == null ) {
47- connection2 = connectionFactory .newConnection ();
48- }
49- channel2 = connection2 .createChannel ();
5046 }
5147
5248 @ AfterEach
5349 @ Override
5450 public void tearDown (TestInfo info ) throws IOException , TimeoutException {
5551 clearAllResourceAlarms ();
56- if (channel2 != null ) {
57- channel2 .abort ();
58- channel2 = null ;
59- }
60- if (connection2 != null ) {
61- connection2 .abort (10_000 );
62- connection2 = null ;
63- }
6452 super .tearDown (info );
65- connectionFactory .setRequestedHeartbeat (0 );
6653 }
6754
6855 @ Override
@@ -85,19 +72,18 @@ protected void releaseResources() throws IOException {
8572 String consumerTag = channel .basicConsume (Q , true , c );
8673 // publishes after an alarm should not go through
8774 basicPublishVolatile (Q );
88- // the publish is async, so this is racy. This also tests we don't die
89- // by heartbeat (3x heartbeat interval + epsilon)
90- assertNull (c .nextDelivery (3100 ));
75+ // the publish is async, so this is racy.
76+ // This also tests we don't die by heartbeat
77+ assertNull (c .nextDelivery (DELIVERY_TIMEOUT_MS ));
9178 // once the alarm has cleared the publishes should go through
9279 clearResourceAlarm ("memory" );
93- assertNotNull (c .nextDelivery (3100 ));
80+ assertNotNull (c .nextDelivery (DELIVERY_TIMEOUT_MS ));
9481 // everything should be back to normal
9582 channel .basicCancel (consumerTag );
9683 basicPublishVolatile (Q );
9784 assertNotNull (basicGet (Q ));
9885 }
9986
100-
10187 @ Test public void overlappingAlarmsFlowControl () throws IOException , InterruptedException {
10288 QueueingConsumer c = new QueueingConsumer (channel );
10389 String consumerTag = channel .basicConsume (Q , true , c );
@@ -111,12 +97,11 @@ protected void releaseResources() throws IOException {
11197 clearResourceAlarm ("memory" );
11298 assertNull (c .nextDelivery (100 ));
11399 clearResourceAlarm ("disk" );
114- assertNotNull (c .nextDelivery (3100 ));
100+ assertNotNull (c .nextDelivery (DELIVERY_TIMEOUT_MS ));
115101
116102 channel .basicCancel (consumerTag );
117103 basicPublishVolatile (Q );
118104 assertNotNull (basicGet (Q ));
119105 }
120106
121-
122107}
0 commit comments