Skip to content

Commit b9c5849

Browse files
committed
feat(#1716): Implement receiveBody()
1 parent 84477a1 commit b9c5849

File tree

3 files changed

+814
-16
lines changed

3 files changed

+814
-16
lines changed

activemq-client/src/main/java/org/apache/activemq/ActiveMQConsumer.java

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,26 @@
1919
import jakarta.jms.JMSConsumer;
2020
import jakarta.jms.JMSException;
2121
import jakarta.jms.JMSRuntimeException;
22-
import jakarta.jms.Message;
2322
import jakarta.jms.MessageConsumer;
2423
import jakarta.jms.MessageListener;
24+
import jakarta.jms.Message;
2525

2626
import org.apache.activemq.util.JMSExceptionSupport;
2727

2828
public class ActiveMQConsumer implements JMSConsumer {
29-
29+
3030
private final ActiveMQContext activemqContext;
31-
private final MessageConsumer activemqMessageConsumer;
31+
private final ActiveMQMessageConsumer consumer;
3232

33-
ActiveMQConsumer(ActiveMQContext activemqContext, MessageConsumer activemqMessageConsumer) {
33+
ActiveMQConsumer(ActiveMQContext activemqContext, MessageConsumer consumer) {
3434
this.activemqContext = activemqContext;
35-
this.activemqMessageConsumer = activemqMessageConsumer;
35+
this.consumer = (ActiveMQMessageConsumer) consumer;
3636
}
3737

3838
@Override
3939
public String getMessageSelector() {
4040
try {
41-
return activemqMessageConsumer.getMessageSelector();
41+
return consumer.getMessageSelector();
4242
} catch (JMSException e) {
4343
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
4444
}
@@ -47,7 +47,7 @@ public String getMessageSelector() {
4747
@Override
4848
public MessageListener getMessageListener() throws JMSRuntimeException {
4949
try {
50-
return activemqMessageConsumer.getMessageListener();
50+
return consumer.getMessageListener();
5151
} catch (JMSException e) {
5252
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
5353
}
@@ -56,7 +56,7 @@ public MessageListener getMessageListener() throws JMSRuntimeException {
5656
@Override
5757
public void setMessageListener(MessageListener listener) throws JMSRuntimeException {
5858
try {
59-
activemqMessageConsumer.setMessageListener(listener);
59+
consumer.setMessageListener(listener);
6060
} catch (JMSException e) {
6161
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
6262
}
@@ -65,7 +65,7 @@ public void setMessageListener(MessageListener listener) throws JMSRuntimeExcept
6565
@Override
6666
public Message receive() {
6767
try {
68-
return activemqMessageConsumer.receive();
68+
return consumer.receive();
6969
} catch (JMSException e) {
7070
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
7171
}
@@ -74,7 +74,7 @@ public Message receive() {
7474
@Override
7575
public Message receive(long timeout) {
7676
try {
77-
return activemqMessageConsumer.receive(timeout);
77+
return consumer.receive(timeout);
7878
} catch (JMSException e) {
7979
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
8080
}
@@ -83,7 +83,7 @@ public Message receive(long timeout) {
8383
@Override
8484
public Message receiveNoWait() {
8585
try {
86-
return activemqMessageConsumer.receiveNoWait();
86+
return consumer.receiveNoWait();
8787
} catch (JMSException e) {
8888
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
8989
}
@@ -92,25 +92,37 @@ public Message receiveNoWait() {
9292
@Override
9393
public void close() {
9494
try {
95-
activemqMessageConsumer.close();
95+
consumer.close();
9696
} catch (JMSException e) {
9797
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
98-
}
98+
}
9999
}
100100

101101
@Override
102102
public <T> T receiveBody(Class<T> c) {
103-
throw new UnsupportedOperationException("receiveBody(Class<T>) is not supported");
103+
try {
104+
return consumer.receiveBody(c);
105+
} catch (JMSException e) {
106+
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
107+
}
104108
}
105109

106110
@Override
107111
public <T> T receiveBody(Class<T> c, long timeout) {
108-
throw new UnsupportedOperationException("receiveBody(Class<T>, long) is not supported");
112+
try {
113+
return consumer.receiveBody(c, timeout);
114+
} catch (JMSException e) {
115+
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
116+
}
109117
}
110118

111119
@Override
112120
public <T> T receiveBodyNoWait(Class<T> c) {
113-
throw new UnsupportedOperationException("receiveBodyNoWait(Class<T>) is not supported");
121+
try {
122+
return consumer.receiveBodyNoWait(c);
123+
} catch (JMSException e) {
124+
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
125+
}
114126
}
115127

116128
}

activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import jakarta.jms.JMSException;
3636
import jakarta.jms.Message;
3737
import jakarta.jms.MessageConsumer;
38+
import jakarta.jms.MessageFormatException;
3839
import jakarta.jms.MessageListener;
3940
import jakarta.jms.TransactionRolledBackException;
4041

@@ -713,6 +714,192 @@ public Message receiveNoWait() throws JMSException {
713714
return createActiveMQMessage(md);
714715
}
715716

717+
/**
718+
* Receives the next message produced for this message consumer and returns
719+
* its body as an object of the specified type. This call blocks
720+
* indefinitely until a message is produced or until this message consumer
721+
* is closed.
722+
* <p>
723+
* If the message body cannot be assigned to the specified type, a
724+
* {@code MessageFormatException} is thrown. The outcome depends on the
725+
* session's acknowledgement mode:
726+
* <ul>
727+
* <li>{@code AUTO_ACKNOWLEDGE} / {@code DUPS_OK_ACKNOWLEDGE} &ndash; the
728+
* message is returned to the head of the prefetch queue and will be
729+
* delivered again before any subsequent messages.</li>
730+
* <li>{@code CLIENT_ACKNOWLEDGE} &ndash; the message is treated as
731+
* delivered; the application may call {@code session.recover()} to
732+
* redeliver or {@code message.acknowledge()} to retire it.</li>
733+
* <li>Transacted session &ndash; the message is treated as delivered
734+
* within the current transaction; the application may call
735+
* {@code session.rollback()} to redeliver or
736+
* {@code session.commit()} to retire it.</li>
737+
* </ul>
738+
*
739+
* @param c the type to which the body of the next message should be
740+
* assigned
741+
* @return the body of the next message, or null if this message consumer
742+
* is concurrently closed
743+
* @throws MessageFormatException if the message body cannot be assigned to
744+
* the specified type
745+
* @throws JMSException if the JMS provider fails to receive the next
746+
* message due to some internal error
747+
*/
748+
public <T> T receiveBody(Class<T> c) throws JMSException {
749+
checkClosed();
750+
checkMessageListener();
751+
752+
sendPullCommand(0);
753+
MessageDispatch md = dequeue(-1);
754+
if (md == null) {
755+
return null;
756+
}
757+
758+
return doReceiveBody(md, c);
759+
}
760+
761+
/**
762+
* Receives the next message produced for this message consumer and returns
763+
* its body as an object of the specified type, blocking up to the
764+
* specified timeout. A {@code timeout} of zero never expires and the call
765+
* blocks indefinitely.
766+
* <p>
767+
* If the message body cannot be assigned to the specified type, a
768+
* {@code MessageFormatException} is thrown. The outcome depends on the
769+
* session's acknowledgement mode:
770+
* <ul>
771+
* <li>{@code AUTO_ACKNOWLEDGE} / {@code DUPS_OK_ACKNOWLEDGE} &ndash; the
772+
* message is returned to the head of the prefetch queue and will be
773+
* delivered again before any subsequent messages.</li>
774+
* <li>{@code CLIENT_ACKNOWLEDGE} &ndash; the message is treated as
775+
* delivered; the application may call {@code session.recover()} to
776+
* redeliver or {@code message.acknowledge()} to retire it.</li>
777+
* <li>Transacted session &ndash; the message is treated as delivered
778+
* within the current transaction; the application may call
779+
* {@code session.rollback()} to redeliver or
780+
* {@code session.commit()} to retire it.</li>
781+
* </ul>
782+
*
783+
* @param c the type to which the body of the next message should be
784+
* assigned
785+
* @param timeout the timeout value (in milliseconds), a timeout of zero
786+
* never expires
787+
* @return the body of the next message, or null if the timeout expires or
788+
* this message consumer is concurrently closed
789+
* @throws MessageFormatException if the message body cannot be assigned to
790+
* the specified type
791+
* @throws JMSException if the JMS provider fails to receive the next
792+
* message due to some internal error
793+
*/
794+
public <T> T receiveBody(Class<T> c, long timeout) throws JMSException {
795+
checkClosed();
796+
checkMessageListener();
797+
if (timeout == 0) {
798+
return this.receiveBody(c);
799+
}
800+
801+
sendPullCommand(timeout);
802+
while (timeout > 0) {
803+
MessageDispatch md;
804+
if (info.getPrefetchSize() == 0) {
805+
md = dequeue(-1);
806+
} else {
807+
md = dequeue(timeout);
808+
}
809+
810+
if (md == null) {
811+
return null;
812+
}
813+
814+
return doReceiveBody(md, c);
815+
}
816+
return null;
817+
}
818+
819+
/**
820+
* Receives the next message produced for this message consumer and returns
821+
* its body as an object of the specified type if one is immediately
822+
* available.
823+
* <p>
824+
* If the message body cannot be assigned to the specified type, a
825+
* {@code MessageFormatException} is thrown. The outcome depends on the
826+
* session's acknowledgement mode:
827+
* <ul>
828+
* <li>{@code AUTO_ACKNOWLEDGE} / {@code DUPS_OK_ACKNOWLEDGE} &ndash; the
829+
* message is returned to the head of the prefetch queue and will be
830+
* delivered again before any subsequent messages.</li>
831+
* <li>{@code CLIENT_ACKNOWLEDGE} &ndash; the message is treated as
832+
* delivered; the application may call {@code session.recover()} to
833+
* redeliver or {@code message.acknowledge()} to retire it.</li>
834+
* <li>Transacted session &ndash; the message is treated as delivered
835+
* within the current transaction; the application may call
836+
* {@code session.rollback()} to redeliver or
837+
* {@code session.commit()} to retire it.</li>
838+
* </ul>
839+
*
840+
* @param c the type to which the body of the next message should be
841+
* assigned
842+
* @return the body of the next message, or null if one is not immediately
843+
* available
844+
* @throws MessageFormatException if the message body cannot be assigned to
845+
* the specified type
846+
* @throws JMSException if the JMS provider fails to receive the next
847+
* message due to some internal error
848+
*/
849+
public <T> T receiveBodyNoWait(Class<T> c) throws JMSException {
850+
checkClosed();
851+
checkMessageListener();
852+
sendPullCommand(-1);
853+
854+
MessageDispatch md;
855+
if (info.getPrefetchSize() == 0) {
856+
md = dequeue(-1);
857+
} else {
858+
md = dequeue(0);
859+
}
860+
861+
if (md == null) {
862+
return null;
863+
}
864+
865+
return doReceiveBody(md, c);
866+
}
867+
868+
/**
869+
* Checks that the message body can be assigned to the requested type,
870+
* acknowledges the message, and returns its body.
871+
* <p>
872+
* On type mismatch the behaviour depends on the session mode:
873+
* AUTO/DUPS_OK re-enqueue the message for immediate re-read;
874+
* CLIENT_ACKNOWLEDGE and transacted sessions treat the message as
875+
* delivered so that the application can recover or commit as appropriate.
876+
*/
877+
private <T> T doReceiveBody(MessageDispatch md, Class<T> c) throws JMSException {
878+
ActiveMQMessage message = createActiveMQMessage(md);
879+
if (message.getDataStructureType() == CommandTypes.ACTIVEMQ_MESSAGE
880+
|| !message.isBodyAssignableTo(c)) {
881+
if (session.getTransacted() || session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
882+
// Jakarta Messaging 3.1 spec: in TRANSACTED and CLIENT_ACKNOWLEDGE modes the
883+
// message is considered delivered. The application can use session.rollback()
884+
// / session.recover() to redeliver, or session.commit() / message.acknowledge()
885+
// to retire it. Calling beforeMessageIsConsumed and afterMessageIsConsumed
886+
// tracks the message as delivered and expands the consumer credit window so
887+
// that subsequent receive calls do not stall.
888+
beforeMessageIsConsumed(md);
889+
afterMessageIsConsumed(md, false);
890+
} else {
891+
// AUTO_ACKNOWLEDGE / DUPS_OK_ACKNOWLEDGE: put the message back on the
892+
// prefetch queue so the caller can retrieve it again with the correct type.
893+
unconsumedMessages.enqueueFirst(md);
894+
}
895+
throw new MessageFormatException("Message body cannot be read as type: " + c);
896+
}
897+
898+
beforeMessageIsConsumed(md);
899+
afterMessageIsConsumed(md, false);
900+
return message.getBody(c);
901+
}
902+
716903
/**
717904
* Closes the message consumer.
718905
* <P>

0 commit comments

Comments
 (0)