Skip to content

Commit 5d6c8f9

Browse files
artembilanspring-builds
authored andcommitted
Fix race condition in the ExternalTxManagerSMLCTests
(cherry picked from commit 5c2b3ae)
1 parent 8209bbf commit 5d6c8f9

1 file changed

Lines changed: 14 additions & 15 deletions

File tree

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ExternalTxManagerSMLCTests.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,15 @@
4444
import static org.mockito.ArgumentMatchers.anyString;
4545
import static org.mockito.BDDMockito.given;
4646
import static org.mockito.BDDMockito.willAnswer;
47+
import static org.mockito.Mockito.atLeastOnce;
4748
import static org.mockito.Mockito.mock;
48-
import static org.mockito.Mockito.times;
4949
import static org.mockito.Mockito.verify;
5050

5151
/**
5252
* @author Gary Russell
5353
* @author Thomas Badie
54+
* @author Artem Bilan
55+
*
5456
* @since 2.0
5557
*
5658
*/
@@ -63,19 +65,18 @@ protected AbstractMessageListenerContainer createContainer(AbstractConnectionFac
6365
return container;
6466
}
6567

66-
6768
@Test
6869
public void testMessageListenerTxFail() throws Exception {
6970
ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true);
70-
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
71-
Connection mockConnection = mock(Connection.class);
72-
final Channel mockChannel = mock(Channel.class);
71+
ConnectionFactory mockConnectionFactory = mock();
72+
Connection mockConnection = mock();
73+
final Channel mockChannel = mock();
7374
given(mockChannel.isOpen()).willReturn(true);
74-
given(mockChannel.txSelect()).willReturn(mock(AMQP.Tx.SelectOk.class));
75-
final AtomicReference<CountDownLatch> commitLatch = new AtomicReference<>(new CountDownLatch(1));
75+
given(mockChannel.txSelect()).willReturn(mock());
76+
CountDownLatch commitLatch = new CountDownLatch(1);
7677
String exceptionMessage = "Failed to commit.";
7778
willAnswer(invocation -> {
78-
commitLatch.get().countDown();
79+
commitLatch.countDown();
7980
throw new IllegalStateException(exceptionMessage);
8081
}).given(mockChannel).txCommit();
8182

@@ -86,7 +87,7 @@ public void testMessageListenerTxFail() throws Exception {
8687

8788
willAnswer(invocation -> mockChannel).given(mockConnection).createChannel();
8889

89-
final AtomicReference<Consumer> consumer = new AtomicReference<Consumer>();
90+
final AtomicReference<Consumer> consumer = new AtomicReference<>();
9091
final CountDownLatch consumerLatch = new CountDownLatch(1);
9192

9293
willAnswer(invocation -> {
@@ -97,7 +98,6 @@ public void testMessageListenerTxFail() throws Exception {
9798
.basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(),
9899
any(Consumer.class));
99100

100-
101101
final CountDownLatch latch = new CountDownLatch(1);
102102
AbstractMessageListenerContainer container = createContainer(cachingConnectionFactory);
103103
container.setMessageListener(message -> {
@@ -112,7 +112,7 @@ public void testMessageListenerTxFail() throws Exception {
112112
container.setShutdownTimeout(100);
113113
DummyTxManager transactionManager = new DummyTxManager();
114114
container.setTransactionManager(transactionManager);
115-
ApplicationEventPublisher applicationEventPublisher = mock(ApplicationEventPublisher.class);
115+
ApplicationEventPublisher applicationEventPublisher = mock();
116116
final CountDownLatch applicationEventPublisherLatch = new CountDownLatch(1);
117117
willAnswer(invocation -> {
118118
if (invocation.getArgument(0) instanceof ListenerContainerConsumerFailedEvent) {
@@ -128,12 +128,12 @@ public void testMessageListenerTxFail() throws Exception {
128128

129129
consumer.get().handleDelivery("qux",
130130
new Envelope(1, false, "foo", "bar"), new AMQP.BasicProperties(),
131-
new byte[] { 0 });
131+
new byte[] {0});
132132

133133
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
134134

135-
verify(mockConnection, times(1)).createChannel();
136-
assertThat(commitLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
135+
verify(mockConnection, atLeastOnce()).createChannel();
136+
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
137137
verify(mockChannel).basicAck(anyLong(), anyBoolean());
138138
verify(mockChannel).txCommit();
139139

@@ -150,5 +150,4 @@ public void testMessageListenerTxFail() throws Exception {
150150
container.stop();
151151
}
152152

153-
154153
}

0 commit comments

Comments
 (0)