Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 10 additions & 25 deletions src/test/java/com/rabbitmq/client/test/server/MemoryAlarms.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
import static org.junit.jupiter.api.Assertions.assertNull;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeoutException;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.test.BrokerTestCase;
import org.junit.jupiter.api.TestInfo;
Expand All @@ -35,34 +34,22 @@ public class MemoryAlarms extends BrokerTestCase {

private static final String Q = "Restart";

private Connection connection2;
private Channel channel2;
private static final Duration HEARTBEAT = Duration.ofSeconds(1);
// Must exceed 2x heartbeat to also verify the connection survives the heartbeat timeout
private static final long DELIVERY_TIMEOUT_MS = HEARTBEAT.multipliedBy(3).plusMillis(100).toMillis();

@BeforeEach
@Override
public void setUp(TestInfo info) throws IOException, TimeoutException {
connectionFactory.setRequestedHeartbeat(1);
connectionFactory.setRequestedHeartbeat((int) HEARTBEAT.getSeconds());
super.setUp(info);
if (connection2 == null) {
connection2 = connectionFactory.newConnection();
}
channel2 = connection2.createChannel();
}

@AfterEach
@Override
public void tearDown(TestInfo info) throws IOException, TimeoutException {
clearAllResourceAlarms();
if (channel2 != null) {
channel2.abort();
channel2 = null;
}
if (connection2 != null) {
connection2.abort(10_000);
connection2 = null;
}
super.tearDown(info);
connectionFactory.setRequestedHeartbeat(0);
}

@Override
Expand All @@ -85,19 +72,18 @@ protected void releaseResources() throws IOException {
String consumerTag = channel.basicConsume(Q, true, c);
// publishes after an alarm should not go through
basicPublishVolatile(Q);
// the publish is async, so this is racy. This also tests we don't die
// by heartbeat (3x heartbeat interval + epsilon)
assertNull(c.nextDelivery(3100));
// the publish is async, so this is racy.
// This also tests we don't die by heartbeat
assertNull(c.nextDelivery(DELIVERY_TIMEOUT_MS));
// once the alarm has cleared the publishes should go through
clearResourceAlarm("memory");
assertNotNull(c.nextDelivery(3100));
assertNotNull(c.nextDelivery(DELIVERY_TIMEOUT_MS));
// everything should be back to normal
channel.basicCancel(consumerTag);
basicPublishVolatile(Q);
assertNotNull(basicGet(Q));
}


@Test public void overlappingAlarmsFlowControl() throws IOException, InterruptedException {
QueueingConsumer c = new QueueingConsumer(channel);
String consumerTag = channel.basicConsume(Q, true, c);
Expand All @@ -111,12 +97,11 @@ protected void releaseResources() throws IOException {
clearResourceAlarm("memory");
assertNull(c.nextDelivery(100));
clearResourceAlarm("disk");
assertNotNull(c.nextDelivery(3100));
assertNotNull(c.nextDelivery(DELIVERY_TIMEOUT_MS));

channel.basicCancel(consumerTag);
basicPublishVolatile(Q);
assertNotNull(basicGet(Q));
}


}
Loading