Skip to content

Commit d2fde0e

Browse files
committed
feat(#1716): Implement receiveBody()
Implement JMS 2.0 / Jakarta Messaging 3.1 receiveBody() methods on ActiveMQMessageConsumer and wire them through ActiveMQConsumer (JMSConsumer). The doReceiveBody() helper handles ack-mode-dependent failure behavior per section 8.6 of the Jakarta Messaging 3.1 specification: - AUTO_ACK / DUPS_OK: re-enqueue without acknowledgement - CLIENT_ACK / TRANSACTED: treat as delivered (credit window expansion) Plain Message and StreamMessage types are always rejected with MessageFormatException, per spec (even though Message.isBodyAssignableTo returns true for any type). ActiveMQObjectMessage.isBodyAssignableTo now returns false on deserialization failure instead of propagating the exception.
1 parent 8ffd468 commit d2fde0e

File tree

4 files changed

+754
-18
lines changed

4 files changed

+754
-18
lines changed

activemq-client/src/main/java/org/apache/activemq/ActiveMQConsumer.java

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,26 @@
1919
import jakarta.jms.JMSConsumer;
2020
import jakarta.jms.JMSException;
2121
import jakarta.jms.JMSRuntimeException;
22-
import jakarta.jms.Message;
2322
import jakarta.jms.MessageConsumer;
2423
import jakarta.jms.MessageListener;
24+
import jakarta.jms.Message;
2525

2626
import org.apache.activemq.util.JMSExceptionSupport;
2727

2828
public class ActiveMQConsumer implements JMSConsumer {
29-
29+
3030
private final ActiveMQContext activemqContext;
31-
private final MessageConsumer activemqMessageConsumer;
31+
private final ActiveMQMessageConsumer consumer;
3232

33-
ActiveMQConsumer(ActiveMQContext activemqContext, MessageConsumer activemqMessageConsumer) {
33+
ActiveMQConsumer(ActiveMQContext activemqContext, MessageConsumer consumer) {
3434
this.activemqContext = activemqContext;
35-
this.activemqMessageConsumer = activemqMessageConsumer;
35+
this.consumer = (ActiveMQMessageConsumer) consumer;
3636
}
3737

3838
@Override
3939
public String getMessageSelector() {
4040
try {
41-
return activemqMessageConsumer.getMessageSelector();
41+
return consumer.getMessageSelector();
4242
} catch (JMSException e) {
4343
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
4444
}
@@ -47,7 +47,7 @@ public String getMessageSelector() {
4747
@Override
4848
public MessageListener getMessageListener() throws JMSRuntimeException {
4949
try {
50-
return activemqMessageConsumer.getMessageListener();
50+
return consumer.getMessageListener();
5151
} catch (JMSException e) {
5252
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
5353
}
@@ -56,7 +56,7 @@ public MessageListener getMessageListener() throws JMSRuntimeException {
5656
@Override
5757
public void setMessageListener(MessageListener listener) throws JMSRuntimeException {
5858
try {
59-
activemqMessageConsumer.setMessageListener(listener);
59+
consumer.setMessageListener(listener);
6060
} catch (JMSException e) {
6161
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
6262
}
@@ -65,7 +65,7 @@ public void setMessageListener(MessageListener listener) throws JMSRuntimeExcept
6565
@Override
6666
public Message receive() {
6767
try {
68-
return activemqMessageConsumer.receive();
68+
return consumer.receive();
6969
} catch (JMSException e) {
7070
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
7171
}
@@ -74,7 +74,7 @@ public Message receive() {
7474
@Override
7575
public Message receive(long timeout) {
7676
try {
77-
return activemqMessageConsumer.receive(timeout);
77+
return consumer.receive(timeout);
7878
} catch (JMSException e) {
7979
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
8080
}
@@ -83,7 +83,7 @@ public Message receive(long timeout) {
8383
@Override
8484
public Message receiveNoWait() {
8585
try {
86-
return activemqMessageConsumer.receiveNoWait();
86+
return consumer.receiveNoWait();
8787
} catch (JMSException e) {
8888
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
8989
}
@@ -92,25 +92,37 @@ public Message receiveNoWait() {
9292
@Override
9393
public void close() {
9494
try {
95-
activemqMessageConsumer.close();
95+
consumer.close();
9696
} catch (JMSException e) {
9797
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
98-
}
98+
}
9999
}
100100

101101
@Override
102102
public <T> T receiveBody(Class<T> c) {
103-
throw new UnsupportedOperationException("receiveBody(Class<T>) is not supported");
103+
try {
104+
return consumer.receiveBody(c);
105+
} catch (JMSException e) {
106+
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
107+
}
104108
}
105109

106110
@Override
107111
public <T> T receiveBody(Class<T> c, long timeout) {
108-
throw new UnsupportedOperationException("receiveBody(Class<T>, long) is not supported");
112+
try {
113+
return consumer.receiveBody(c, timeout);
114+
} catch (JMSException e) {
115+
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
116+
}
109117
}
110118

111119
@Override
112120
public <T> T receiveBodyNoWait(Class<T> c) {
113-
throw new UnsupportedOperationException("receiveBodyNoWait(Class<T>) is not supported");
121+
try {
122+
return consumer.receiveBodyNoWait(c);
123+
} catch (JMSException e) {
124+
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
125+
}
114126
}
115127

116128
}

activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import jakarta.jms.JMSException;
3636
import jakarta.jms.Message;
3737
import jakarta.jms.MessageConsumer;
38+
import jakarta.jms.MessageFormatException;
3839
import jakarta.jms.MessageListener;
3940
import jakarta.jms.TransactionRolledBackException;
4041

@@ -43,6 +44,7 @@
4344
import org.apache.activemq.command.ActiveMQDestination;
4445
import org.apache.activemq.command.ActiveMQMessage;
4546
import org.apache.activemq.command.ActiveMQObjectMessage;
47+
import org.apache.activemq.command.ActiveMQStreamMessage;
4648
import org.apache.activemq.command.ActiveMQTempDestination;
4749
import org.apache.activemq.command.CommandTypes;
4850
import org.apache.activemq.command.ConsumerId;
@@ -713,6 +715,221 @@ public Message receiveNoWait() throws JMSException {
713715
return createActiveMQMessage(md);
714716
}
715717

718+
/**
719+
* Receives the next message produced for this message consumer and returns
720+
* its body as an object of the specified type. This call blocks
721+
* indefinitely until a message is produced or until this message consumer
722+
* is closed.
723+
* <p>
724+
* If the message is not of a type for which the body can be assigned to
725+
* the specified type, a {@code MessageFormatException} is thrown. The
726+
* subsequent behaviour depends on the session's acknowledge mode:
727+
* <ul>
728+
* <li>{@code AUTO_ACKNOWLEDGE} / {@code DUPS_OK_ACKNOWLEDGE}: the
729+
* message is not acknowledged and will be delivered again before any
730+
* subsequent messages. This is not considered redelivery and does not
731+
* cause the {@code JMSRedelivered} header or
732+
* {@code JMSXDeliveryCount} property to be updated.</li>
733+
* <li>{@code CLIENT_ACKNOWLEDGE}: the message is treated as delivered.
734+
* The application must call {@code session.recover()} to have it
735+
* redelivered.</li>
736+
* <li>Transacted session: the message is treated as delivered within the
737+
* transaction. The application must call {@code session.rollback()}
738+
* to have it redelivered.</li>
739+
* </ul>
740+
* <p>
741+
* This method cannot be used to receive {@code Message} or
742+
* {@code StreamMessage} objects; a {@code MessageFormatException} will
743+
* always be thrown for these types.
744+
*
745+
* @param c the type to which the body of the next message should be
746+
* assigned
747+
* @return the body of the next message, or null if this message consumer
748+
* is concurrently closed
749+
* @throws MessageFormatException if the message body cannot be assigned to
750+
* the specified type, or if the message is a {@code Message} or
751+
* {@code StreamMessage}
752+
* @throws JMSException if the JMS provider fails to receive the next
753+
* message due to some internal error
754+
*/
755+
public <T> T receiveBody(Class<T> c) throws JMSException {
756+
checkClosed();
757+
checkMessageListener();
758+
759+
sendPullCommand(0);
760+
MessageDispatch md = dequeue(-1);
761+
if (md == null) {
762+
return null;
763+
}
764+
765+
return doReceiveBody(md, c);
766+
}
767+
768+
/**
769+
* Receives the next message produced for this message consumer and returns
770+
* its body as an object of the specified type, blocking up to the
771+
* specified timeout. A {@code timeout} of zero never expires and the call
772+
* blocks indefinitely.
773+
* <p>
774+
* If the message is not of a type for which the body can be assigned to
775+
* the specified type, a {@code MessageFormatException} is thrown. The
776+
* subsequent behaviour depends on the session's acknowledge mode:
777+
* <ul>
778+
* <li>{@code AUTO_ACKNOWLEDGE} / {@code DUPS_OK_ACKNOWLEDGE}: the
779+
* message is not acknowledged and will be delivered again before any
780+
* subsequent messages. This is not considered redelivery and does not
781+
* cause the {@code JMSRedelivered} header or
782+
* {@code JMSXDeliveryCount} property to be updated.</li>
783+
* <li>{@code CLIENT_ACKNOWLEDGE}: the message is treated as delivered.
784+
* The application must call {@code session.recover()} to have it
785+
* redelivered.</li>
786+
* <li>Transacted session: the message is treated as delivered within the
787+
* transaction. The application must call {@code session.rollback()}
788+
* to have it redelivered.</li>
789+
* </ul>
790+
* <p>
791+
* This method cannot be used to receive {@code Message} or
792+
* {@code StreamMessage} objects; a {@code MessageFormatException} will
793+
* always be thrown for these types.
794+
*
795+
* @param c the type to which the body of the next message should be
796+
* assigned
797+
* @param timeout the timeout value (in milliseconds), a timeout of zero
798+
* never expires
799+
* @return the body of the next message, or null if the timeout expires or
800+
* this message consumer is concurrently closed
801+
* @throws MessageFormatException if the message body cannot be assigned to
802+
* the specified type, or if the message is a {@code Message} or
803+
* {@code StreamMessage}
804+
* @throws JMSException if the JMS provider fails to receive the next
805+
* message due to some internal error
806+
*/
807+
public <T> T receiveBody(Class<T> c, long timeout) throws JMSException {
808+
checkClosed();
809+
checkMessageListener();
810+
if (timeout == 0) {
811+
return this.receiveBody(c);
812+
}
813+
814+
sendPullCommand(timeout);
815+
while (timeout > 0) {
816+
MessageDispatch md;
817+
if (info.getPrefetchSize() == 0) {
818+
md = dequeue(-1);
819+
} else {
820+
md = dequeue(timeout);
821+
}
822+
823+
if (md == null) {
824+
return null;
825+
}
826+
827+
return doReceiveBody(md, c);
828+
}
829+
return null;
830+
}
831+
832+
/**
833+
* Receives the next message produced for this message consumer and returns
834+
* its body as an object of the specified type if one is immediately
835+
* available.
836+
* <p>
837+
* If the message is not of a type for which the body can be assigned to
838+
* the specified type, a {@code MessageFormatException} is thrown. The
839+
* subsequent behaviour depends on the session's acknowledge mode:
840+
* <ul>
841+
* <li>{@code AUTO_ACKNOWLEDGE} / {@code DUPS_OK_ACKNOWLEDGE}: the
842+
* message is not acknowledged and will be delivered again before any
843+
* subsequent messages. This is not considered redelivery and does not
844+
* cause the {@code JMSRedelivered} header or
845+
* {@code JMSXDeliveryCount} property to be updated.</li>
846+
* <li>{@code CLIENT_ACKNOWLEDGE}: the message is treated as delivered.
847+
* The application must call {@code session.recover()} to have it
848+
* redelivered.</li>
849+
* <li>Transacted session: the message is treated as delivered within the
850+
* transaction. The application must call {@code session.rollback()}
851+
* to have it redelivered.</li>
852+
* </ul>
853+
* <p>
854+
* This method cannot be used to receive {@code Message} or
855+
* {@code StreamMessage} objects; a {@code MessageFormatException} will
856+
* always be thrown for these types.
857+
*
858+
* @param c the type to which the body of the next message should be
859+
* assigned
860+
* @return the body of the next message, or null if one is not immediately
861+
* available
862+
* @throws MessageFormatException if the message body cannot be assigned to
863+
* the specified type, or if the message is a {@code Message} or
864+
* {@code StreamMessage}
865+
* @throws JMSException if the JMS provider fails to receive the next
866+
* message due to some internal error
867+
*/
868+
public <T> T receiveBodyNoWait(Class<T> c) throws JMSException {
869+
checkClosed();
870+
checkMessageListener();
871+
sendPullCommand(-1);
872+
873+
MessageDispatch md;
874+
if (info.getPrefetchSize() == 0) {
875+
md = dequeue(-1);
876+
} else {
877+
md = dequeue(0);
878+
}
879+
880+
if (md == null) {
881+
return null;
882+
}
883+
884+
return doReceiveBody(md, c);
885+
}
886+
887+
/**
888+
* Checks that the message body can be assigned to the requested type,
889+
* acknowledges the message, and returns its body. If the body cannot be
890+
* assigned, the handling depends on the session's acknowledge mode:
891+
* <ul>
892+
* <li>AUTO_ACKNOWLEDGE / DUPS_OK_ACKNOWLEDGE: the message is re-enqueued
893+
* without acknowledgement so that it remains available for a subsequent
894+
* {@code receive} or {@code receiveBody} call.</li>
895+
* <li>CLIENT_ACKNOWLEDGE / TRANSACTED: the message is treated as delivered
896+
* (not re-enqueued). The application may call {@code session.recover()}
897+
* or {@code session.rollback()} respectively to redeliver.</li>
898+
* </ul>
899+
* <p>
900+
* Per Jakarta Messaging 3.1, {@code receiveBody} must always throw
901+
* {@code MessageFormatException} for plain {@code Message} and
902+
* {@code StreamMessage} types, regardless of what
903+
* {@code isBodyAssignableTo} returns.
904+
*/
905+
private <T> T doReceiveBody(MessageDispatch md, Class<T> c) throws JMSException {
906+
ActiveMQMessage message = createActiveMQMessage(md);
907+
908+
// Jakarta Messaging 3.1: receiveBody must always fail for Message and StreamMessage.
909+
// Note: Message.isBodyAssignableTo() returns true for any type per spec,
910+
// which conflicts with receiveBody's requirement, so we must check explicitly.
911+
boolean bodyNotAssignable = message.getClass() == ActiveMQMessage.class
912+
|| message instanceof ActiveMQStreamMessage
913+
|| !message.isBodyAssignableTo(c);
914+
915+
if (bodyNotAssignable) {
916+
if (session.isAutoAcknowledge() || session.isDupsOkAcknowledge()) {
917+
// re-enqueue for redelivery on next receive/receiveBody call
918+
unconsumedMessages.enqueueFirst(md);
919+
} else {
920+
// CLIENT_ACKNOWLEDGE or TRANSACTED: message is considered delivered,
921+
// application must use session.recover() or session.rollback()
922+
beforeMessageIsConsumed(md);
923+
afterMessageIsConsumed(md, false);
924+
}
925+
throw new MessageFormatException("Message body cannot be read as type: " + c);
926+
}
927+
928+
beforeMessageIsConsumed(md);
929+
afterMessageIsConsumed(md, false);
930+
return message.getBody(c);
931+
}
932+
716933
/**
717934
* Closes the message consumer.
718935
* <P>

activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,8 +288,15 @@ public void initTransients() {
288288
}
289289

290290
@Override
291-
public boolean isBodyAssignableTo(Class c) throws JMSException {
292-
final Serializable object = getObject();
291+
public boolean isBodyAssignableTo(Class c) {
292+
final Serializable object;
293+
try {
294+
object = getObject();
295+
} catch (JMSException e) {
296+
// Per Jakarta Messaging 3.1: if the message is an ObjectMessage
297+
// and object deserialization fails then false is returned.
298+
return false;
299+
}
293300
if (object == null) {
294301
return true;
295302
}

0 commit comments

Comments
 (0)