Skip to content

Commit 8a9ed76

Browse files
committed
feat(#1716): Implement receiveBody()
1 parent 8ffd468 commit 8a9ed76

File tree

3 files changed

+412
-16
lines changed

3 files changed

+412
-16
lines changed

activemq-client/src/main/java/org/apache/activemq/ActiveMQConsumer.java

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,26 @@
1919
import jakarta.jms.JMSConsumer;
2020
import jakarta.jms.JMSException;
2121
import jakarta.jms.JMSRuntimeException;
22-
import jakarta.jms.Message;
2322
import jakarta.jms.MessageConsumer;
2423
import jakarta.jms.MessageListener;
24+
import jakarta.jms.Message;
2525

2626
import org.apache.activemq.util.JMSExceptionSupport;
2727

2828
public class ActiveMQConsumer implements JMSConsumer {
29-
29+
3030
private final ActiveMQContext activemqContext;
31-
private final MessageConsumer activemqMessageConsumer;
31+
private final ActiveMQMessageConsumer consumer;
3232

33-
ActiveMQConsumer(ActiveMQContext activemqContext, MessageConsumer activemqMessageConsumer) {
33+
ActiveMQConsumer(ActiveMQContext activemqContext, MessageConsumer consumer) {
3434
this.activemqContext = activemqContext;
35-
this.activemqMessageConsumer = activemqMessageConsumer;
35+
this.consumer = (ActiveMQMessageConsumer) consumer;
3636
}
3737

3838
@Override
3939
public String getMessageSelector() {
4040
try {
41-
return activemqMessageConsumer.getMessageSelector();
41+
return consumer.getMessageSelector();
4242
} catch (JMSException e) {
4343
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
4444
}
@@ -47,7 +47,7 @@ public String getMessageSelector() {
4747
@Override
4848
public MessageListener getMessageListener() throws JMSRuntimeException {
4949
try {
50-
return activemqMessageConsumer.getMessageListener();
50+
return consumer.getMessageListener();
5151
} catch (JMSException e) {
5252
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
5353
}
@@ -56,7 +56,7 @@ public MessageListener getMessageListener() throws JMSRuntimeException {
5656
@Override
5757
public void setMessageListener(MessageListener listener) throws JMSRuntimeException {
5858
try {
59-
activemqMessageConsumer.setMessageListener(listener);
59+
consumer.setMessageListener(listener);
6060
} catch (JMSException e) {
6161
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
6262
}
@@ -65,7 +65,7 @@ public void setMessageListener(MessageListener listener) throws JMSRuntimeExcept
6565
@Override
6666
public Message receive() {
6767
try {
68-
return activemqMessageConsumer.receive();
68+
return consumer.receive();
6969
} catch (JMSException e) {
7070
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
7171
}
@@ -74,7 +74,7 @@ public Message receive() {
7474
@Override
7575
public Message receive(long timeout) {
7676
try {
77-
return activemqMessageConsumer.receive(timeout);
77+
return consumer.receive(timeout);
7878
} catch (JMSException e) {
7979
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
8080
}
@@ -83,7 +83,7 @@ public Message receive(long timeout) {
8383
@Override
8484
public Message receiveNoWait() {
8585
try {
86-
return activemqMessageConsumer.receiveNoWait();
86+
return consumer.receiveNoWait();
8787
} catch (JMSException e) {
8888
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
8989
}
@@ -92,25 +92,37 @@ public Message receiveNoWait() {
9292
@Override
9393
public void close() {
9494
try {
95-
activemqMessageConsumer.close();
95+
consumer.close();
9696
} catch (JMSException e) {
9797
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
98-
}
98+
}
9999
}
100100

101101
@Override
102102
public <T> T receiveBody(Class<T> c) {
103-
throw new UnsupportedOperationException("receiveBody(Class<T>) is not supported");
103+
try {
104+
return consumer.receiveBody(c);
105+
} catch (JMSException e) {
106+
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
107+
}
104108
}
105109

106110
@Override
107111
public <T> T receiveBody(Class<T> c, long timeout) {
108-
throw new UnsupportedOperationException("receiveBody(Class<T>, long) is not supported");
112+
try {
113+
return consumer.receiveBody(c, timeout);
114+
} catch (JMSException e) {
115+
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
116+
}
109117
}
110118

111119
@Override
112120
public <T> T receiveBodyNoWait(Class<T> c) {
113-
throw new UnsupportedOperationException("receiveBodyNoWait(Class<T>) is not supported");
121+
try {
122+
return consumer.receiveBodyNoWait(c);
123+
} catch (JMSException e) {
124+
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
125+
}
114126
}
115127

116128
}

activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import jakarta.jms.JMSException;
3636
import jakarta.jms.Message;
3737
import jakarta.jms.MessageConsumer;
38+
import jakarta.jms.MessageFormatException;
3839
import jakarta.jms.MessageListener;
3940
import jakarta.jms.TransactionRolledBackException;
4041

@@ -713,6 +714,144 @@ public Message receiveNoWait() throws JMSException {
713714
return createActiveMQMessage(md);
714715
}
715716

717+
/**
718+
* Receives the next message produced for this message consumer and returns
719+
* its body as an object of the specified type. This call blocks
720+
* indefinitely until a message is produced or until this message consumer
721+
* is closed.
722+
* <p>
723+
* If the message is not of a type for which the body can be assigned to
724+
* the specified type, a {@code MessageFormatException} is thrown and the
725+
* message is not acknowledged. It may be delivered again when a subsequent
726+
* {@code receive} or {@code receiveBody} call is made.
727+
*
728+
* @param c the type to which the body of the next message should be
729+
* assigned
730+
* @return the body of the next message, or null if this message consumer
731+
* is concurrently closed
732+
* @throws MessageFormatException if the message body cannot be assigned to
733+
* the specified type
734+
* @throws JMSException if the JMS provider fails to receive the next
735+
* message due to some internal error
736+
*/
737+
public <T> T receiveBody(Class<T> c) throws JMSException {
738+
checkClosed();
739+
checkMessageListener();
740+
741+
sendPullCommand(0);
742+
MessageDispatch md = dequeue(-1);
743+
if (md == null) {
744+
return null;
745+
}
746+
747+
return doReceiveBody(md, c);
748+
}
749+
750+
/**
751+
* Receives the next message produced for this message consumer and returns
752+
* its body as an object of the specified type, blocking up to the
753+
* specified timeout. A {@code timeout} of zero never expires and the call
754+
* blocks indefinitely.
755+
* <p>
756+
* If the message is not of a type for which the body can be assigned to
757+
* the specified type, a {@code MessageFormatException} is thrown and the
758+
* message is not acknowledged. It may be delivered again when a subsequent
759+
* {@code receive} or {@code receiveBody} call is made.
760+
*
761+
* @param c the type to which the body of the next message should be
762+
* assigned
763+
* @param timeout the timeout value (in milliseconds), a timeout of zero
764+
* never expires
765+
* @return the body of the next message, or null if the timeout expires or
766+
* this message consumer is concurrently closed
767+
* @throws MessageFormatException if the message body cannot be assigned to
768+
* the specified type
769+
* @throws JMSException if the JMS provider fails to receive the next
770+
* message due to some internal error
771+
*/
772+
public <T> T receiveBody(Class<T> c, long timeout) throws JMSException {
773+
checkClosed();
774+
checkMessageListener();
775+
if (timeout == 0) {
776+
return this.receiveBody(c);
777+
}
778+
779+
sendPullCommand(timeout);
780+
while (timeout > 0) {
781+
MessageDispatch md;
782+
if (info.getPrefetchSize() == 0) {
783+
md = dequeue(-1);
784+
} else {
785+
md = dequeue(timeout);
786+
}
787+
788+
if (md == null) {
789+
return null;
790+
}
791+
792+
return doReceiveBody(md, c);
793+
}
794+
return null;
795+
}
796+
797+
/**
798+
* Receives the next message produced for this message consumer and returns
799+
* its body as an object of the specified type if one is immediately
800+
* available.
801+
* <p>
802+
* If the message is not of a type for which the body can be assigned to
803+
* the specified type, a {@code MessageFormatException} is thrown and the
804+
* message is not acknowledged. It may be delivered again when a subsequent
805+
* {@code receive} or {@code receiveBody} call is made.
806+
*
807+
* @param c the type to which the body of the next message should be
808+
* assigned
809+
* @return the body of the next message, or null if one is not immediately
810+
* available
811+
* @throws MessageFormatException if the message body cannot be assigned to
812+
* the specified type
813+
* @throws JMSException if the JMS provider fails to receive the next
814+
* message due to some internal error
815+
*/
816+
public <T> T receiveBodyNoWait(Class<T> c) throws JMSException {
817+
checkClosed();
818+
checkMessageListener();
819+
sendPullCommand(-1);
820+
821+
MessageDispatch md;
822+
if (info.getPrefetchSize() == 0) {
823+
md = dequeue(-1);
824+
} else {
825+
md = dequeue(0);
826+
}
827+
828+
if (md == null) {
829+
return null;
830+
}
831+
832+
return doReceiveBody(md, c);
833+
}
834+
835+
/**
836+
* Checks that the message body can be assigned to the requested type,
837+
* acknowledges the message, and returns its body. If the body cannot be
838+
* assigned, the message is re-enqueued without acknowledgement so that it
839+
* remains available for a subsequent {@code receive} or
840+
* {@code receiveBody} call.
841+
*/
842+
private <T> T doReceiveBody(MessageDispatch md, Class<T> c) throws JMSException {
843+
ActiveMQMessage message = createActiveMQMessage(md);
844+
if (!message.isBodyAssignableTo(c)) {
845+
// spec: message is not acknowledged and left for redelivery
846+
unconsumedMessages.enqueueFirst(md);
847+
throw new MessageFormatException("Message body cannot be read as type: " + c);
848+
}
849+
850+
beforeMessageIsConsumed(md);
851+
afterMessageIsConsumed(md, false);
852+
return message.getBody(c);
853+
}
854+
716855
/**
717856
* Closes the message consumer.
718857
* <P>

0 commit comments

Comments
 (0)