Skip to content

Commit 75fae22

Browse files
committed
Simplify the receiveBody methods and implement body receive body failure logic
1 parent 664ad85 commit 75fae22

1 file changed

Lines changed: 69 additions & 6 deletions

File tree

activemq-client/src/main/java/org/apache/activemq/ActiveMQConsumer.java

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import jakarta.jms.Message;
2323
import jakarta.jms.MessageConsumer;
2424
import jakarta.jms.MessageListener;
25+
import jakarta.jms.Session;
2526

27+
import org.apache.activemq.command.ActiveMQMessage;
2628
import org.apache.activemq.util.JMSExceptionSupport;
2729

2830
public class ActiveMQConsumer implements JMSConsumer {
@@ -105,9 +107,9 @@ public <T> T receiveBody(Class<T> c) {
105107
return null;
106108
}
107109
try {
108-
Object body = message.getBody(c);
109-
return c.cast(body);
110+
return message.getBody(c);
110111
} catch (JMSException e) {
112+
handleReceiveBodyFailure(message);
111113
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
112114
}
113115
}
@@ -119,9 +121,9 @@ public <T> T receiveBody(Class<T> c, long timeout) {
119121
return null;
120122
}
121123
try {
122-
Object body = message.getBody(c);
123-
return c.cast(body);
124+
return message.getBody(c);
124125
} catch (JMSException e) {
126+
handleReceiveBodyFailure(message);
125127
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
126128
}
127129
}
@@ -133,11 +135,72 @@ public <T> T receiveBodyNoWait(Class<T> c) {
133135
return null;
134136
}
135137
try {
136-
Object body = message.getBody(c);
137-
return c.cast(body);
138+
return message.getBody(c);
138139
} catch (JMSException e) {
140+
handleReceiveBodyFailure(message);
139141
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
140142
}
141143
}
142144

145+
/**
146+
* Handles failure of getBody() according to Jakarta Messaging 3.1 specification.
147+
* For AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE modes, the message should be
148+
* redelivered without incrementing the redelivery counter.
149+
*
150+
* The JMS provider will behave as if the unsuccessful call to receiveBody had not occurred.
151+
* The message will be delivered again before any subsequent messages. This is not considered
152+
* to be redelivery and does not cause the JMSRedelivered message header field to be set or
153+
* the JMSXDeliveryCount message property to be incremented.
154+
*/
155+
private void handleReceiveBodyFailure(Message message) {
156+
try {
157+
// Check if we're in AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE mode
158+
int sessionMode = getSessionMode();
159+
if (sessionMode == Session.AUTO_ACKNOWLEDGE || sessionMode == Session.DUPS_OK_ACKNOWLEDGE) {
160+
// Save the current redelivery counter before rollback
161+
ActiveMQMessage activeMQMessage = null;
162+
int savedRedeliveryCounter = 0;
163+
if (message instanceof ActiveMQMessage) {
164+
activeMQMessage = (ActiveMQMessage) message;
165+
savedRedeliveryCounter = activeMQMessage.getRedeliveryCounter();
166+
}
167+
168+
// Rollback to trigger redelivery without acknowledgment
169+
// According to spec, the message should be redelivered as if receiveBody had not occurred
170+
if (activemqMessageConsumer instanceof ActiveMQMessageConsumer) {
171+
// Rollback will increment the counter, so we need to restore it immediately after
172+
((ActiveMQMessageConsumer) activemqMessageConsumer).rollback();
173+
174+
// Restore the redelivery counter immediately to prevent it from being marked as redelivered
175+
// This ensures the message is redelivered without incrementing JMSXDeliveryCount
176+
if (activeMQMessage != null) {
177+
// The counter may have been incremented by rollback(), restore it to original value
178+
activeMQMessage.setRedeliveryCounter(savedRedeliveryCounter);
179+
}
180+
}
181+
}
182+
} catch (JMSException e) {
183+
// If rollback fails, we can't do much, but we've already thrown the original exception
184+
// This exception will be swallowed since we're already throwing the getBody() exception
185+
}
186+
}
187+
188+
/**
189+
* Gets the session acknowledgment mode.
190+
*/
191+
private int getSessionMode() {
192+
try {
193+
activemqContext.checkContextState();
194+
if (activemqContext.activemqSession != null) {
195+
return activemqContext.activemqSession.getAcknowledgeMode();
196+
}
197+
// If session not created yet, use the sessionMode from context
198+
// We need to access it via reflection or make it accessible
199+
// For now, try to get it from the session
200+
return Session.AUTO_ACKNOWLEDGE; // Default fallback
201+
} catch (Exception e) {
202+
return Session.AUTO_ACKNOWLEDGE; // Default fallback
203+
}
204+
}
205+
143206
}

0 commit comments

Comments
 (0)