Skip to content

Commit 277b1e5

Browse files
committed
AMQ-8464: Implement receiveBody methods in the Consumer
1 parent 5f394c1 commit 277b1e5

2 files changed

Lines changed: 240 additions & 3 deletions

File tree

activemq-client/src/main/java/org/apache/activemq/ActiveMQConsumer.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,17 +100,44 @@ public void close() {
100100

101101
@Override
102102
public <T> T receiveBody(Class<T> c) {
103-
throw new UnsupportedOperationException("receiveBody(Class<T>) is not supported");
103+
Message message = receive();
104+
if (message == null) {
105+
return null;
106+
}
107+
try {
108+
Object body = message.getBody(c);
109+
return c.cast(body);
110+
} catch (JMSException e) {
111+
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
112+
}
104113
}
105114

106115
@Override
107116
public <T> T receiveBody(Class<T> c, long timeout) {
108-
throw new UnsupportedOperationException("receiveBody(Class<T>, long) is not supported");
117+
Message message = receive(timeout);
118+
if (message == null) {
119+
return null;
120+
}
121+
try {
122+
Object body = message.getBody(c);
123+
return c.cast(body);
124+
} catch (JMSException e) {
125+
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
126+
}
109127
}
110128

111129
@Override
112130
public <T> T receiveBodyNoWait(Class<T> c) {
113-
throw new UnsupportedOperationException("receiveBodyNoWait(Class<T>) is not supported");
131+
Message message = receiveNoWait();
132+
if (message == null) {
133+
return null;
134+
}
135+
try {
136+
Object body = message.getBody(c);
137+
return c.cast(body);
138+
} catch (JMSException e) {
139+
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
140+
}
114141
}
115142

116143
}
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq;
18+
19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertNotNull;
21+
import static org.junit.Assert.assertNull;
22+
import static org.junit.Assert.assertTrue;
23+
import static org.junit.Assert.fail;
24+
25+
import jakarta.jms.JMSContext;
26+
import jakarta.jms.JMSRuntimeException;
27+
import jakarta.jms.MessageFormatException;
28+
import jakarta.jms.Queue;
29+
30+
import org.apache.activemq.ActiveMQConnectionFactory;
31+
import org.junit.After;
32+
import org.junit.Before;
33+
import org.junit.Test;
34+
35+
public class ActiveMQConsumerTest {
36+
37+
private ActiveMQConnectionFactory connectionFactory;
38+
private JMSContext jmsContext;
39+
private Queue testQueue;
40+
41+
@Before
42+
public void setUp() throws Exception {
43+
connectionFactory = new ActiveMQConnectionFactory("vm://localhost?marshal=false&broker.persistent=false");
44+
jmsContext = connectionFactory.createContext();
45+
jmsContext.start();
46+
testQueue = jmsContext.createQueue("test.queue.receiveBody");
47+
}
48+
49+
@After
50+
public void tearDown() {
51+
if (jmsContext != null) {
52+
jmsContext.close();
53+
}
54+
}
55+
56+
@Test
57+
public void testReceiveBody() {
58+
// Send a message
59+
String testMessage = "Test message body";
60+
jmsContext.createProducer().send(testQueue, testMessage);
61+
62+
// Receive body using receiveBody(Class)
63+
try (jakarta.jms.JMSConsumer consumer = jmsContext.createConsumer(testQueue)) {
64+
String body = consumer.receiveBody(String.class);
65+
assertNotNull("Received body should not be null", body);
66+
assertEquals("Received body should match sent message", testMessage, body);
67+
}
68+
}
69+
70+
@Test
71+
public void testReceiveBodyWithTimeout() {
72+
// Send a message
73+
String testMessage = "Test message body with timeout";
74+
jmsContext.createProducer().send(testQueue, testMessage);
75+
76+
// Receive body using receiveBody(Class, long)
77+
try (jakarta.jms.JMSConsumer consumer = jmsContext.createConsumer(testQueue)) {
78+
String body = consumer.receiveBody(String.class, 5000L);
79+
assertNotNull("Received body should not be null", body);
80+
assertEquals("Received body should match sent message", testMessage, body);
81+
}
82+
}
83+
84+
@Test
85+
public void testReceiveBodyNoWait() {
86+
// Send a message
87+
String testMessage = "Test message body no wait";
88+
jmsContext.createProducer().send(testQueue, testMessage);
89+
90+
// Receive body using receiveBodyNoWait(Class)
91+
try (jakarta.jms.JMSConsumer consumer = jmsContext.createConsumer(testQueue)) {
92+
String body = consumer.receiveBodyNoWait(String.class);
93+
assertNotNull("Received body should not be null", body);
94+
assertEquals("Received body should match sent message", testMessage, body);
95+
}
96+
}
97+
98+
@Test
99+
public void testReceiveBodyReturnsNullWhenNoMessage() {
100+
// Don't send any message
101+
try (jakarta.jms.JMSConsumer consumer = jmsContext.createConsumer(testQueue)) {
102+
// Use a short timeout to avoid blocking too long
103+
String body = consumer.receiveBody(String.class, 100L);
104+
assertNull("Received body should be null when no message available", body);
105+
}
106+
}
107+
108+
@Test
109+
public void testReceiveBodyNoWaitReturnsNullWhenNoMessage() {
110+
// Don't send any message
111+
try (jakarta.jms.JMSConsumer consumer = jmsContext.createConsumer(testQueue)) {
112+
String body = consumer.receiveBodyNoWait(String.class);
113+
assertNull("Received body should be null when no message available", body);
114+
}
115+
}
116+
117+
@Test
118+
public void testReceiveBodyWithWrongType() {
119+
// Send a text message
120+
String testMessage = "Test message";
121+
jmsContext.createProducer().send(testQueue, testMessage);
122+
123+
// Try to receive as wrong type
124+
try (jakarta.jms.JMSConsumer consumer = jmsContext.createConsumer(testQueue)) {
125+
try {
126+
consumer.receiveBody(Integer.class);
127+
fail("Should throw JMSRuntimeException for wrong type");
128+
} catch (JMSRuntimeException e) {
129+
// Expected - MessageFormatException wrapped in JMSRuntimeException
130+
assertNotNull("Exception should have a cause", e.getCause());
131+
assertTrue("Cause should be MessageFormatException",
132+
e.getCause() instanceof MessageFormatException);
133+
}
134+
}
135+
}
136+
137+
@Test
138+
public void testReceiveBodyWithTimeoutWithWrongType() {
139+
// Send a text message
140+
String testMessage = "Test message";
141+
jmsContext.createProducer().send(testQueue, testMessage);
142+
143+
// Try to receive as wrong type
144+
try (jakarta.jms.JMSConsumer consumer = jmsContext.createConsumer(testQueue)) {
145+
try {
146+
consumer.receiveBody(Integer.class, 5000L);
147+
fail("Should throw JMSRuntimeException for wrong type");
148+
} catch (JMSRuntimeException e) {
149+
// Expected - MessageFormatException wrapped in JMSRuntimeException
150+
assertNotNull("Exception should have a cause", e.getCause());
151+
assertTrue("Cause should be MessageFormatException",
152+
e.getCause() instanceof MessageFormatException);
153+
}
154+
}
155+
}
156+
157+
@Test
158+
public void testReceiveBodyNoWaitWithWrongType() {
159+
// Send a text message
160+
String testMessage = "Test message";
161+
jmsContext.createProducer().send(testQueue, testMessage);
162+
163+
// Try to receive as wrong type
164+
try (jakarta.jms.JMSConsumer consumer = jmsContext.createConsumer(testQueue)) {
165+
try {
166+
consumer.receiveBodyNoWait(Integer.class);
167+
fail("Should throw JMSRuntimeException for wrong type");
168+
} catch (JMSRuntimeException e) {
169+
// Expected - MessageFormatException wrapped in JMSRuntimeException
170+
assertNotNull("Exception should have a cause", e.getCause());
171+
assertTrue("Cause should be MessageFormatException",
172+
e.getCause() instanceof MessageFormatException);
173+
}
174+
}
175+
}
176+
177+
@Test
178+
public void testReceiveBodyMultipleMessages() {
179+
// Send multiple messages
180+
String[] messages = {"Message 1", "Message 2", "Message 3"};
181+
for (String msg : messages) {
182+
jmsContext.createProducer().send(testQueue, msg);
183+
}
184+
185+
// Receive all messages using receiveBody
186+
try (jakarta.jms.JMSConsumer consumer = jmsContext.createConsumer(testQueue)) {
187+
for (String expected : messages) {
188+
String body = consumer.receiveBody(String.class);
189+
assertNotNull("Received body should not be null", body);
190+
assertEquals("Received body should match sent message", expected, body);
191+
}
192+
}
193+
}
194+
195+
@Test
196+
public void testReceiveBodyWithTimeoutExpires() {
197+
// Don't send any message
198+
try (jakarta.jms.JMSConsumer consumer = jmsContext.createConsumer(testQueue)) {
199+
// Use a short timeout
200+
long startTime = System.currentTimeMillis();
201+
String body = consumer.receiveBody(String.class, 200L);
202+
long elapsed = System.currentTimeMillis() - startTime;
203+
204+
assertNull("Received body should be null when timeout expires", body);
205+
// Verify it actually waited (at least 100ms, but not too long)
206+
assertTrue("Should have waited at least some time", elapsed >= 100);
207+
}
208+
}
209+
}
210+

0 commit comments

Comments
 (0)