Skip to content

Commit ca38846

Browse files
ozangunalpcescoffier
authored andcommitted
Added JMS Failure handler test
1 parent 92ed4e6 commit ca38846

4 files changed

Lines changed: 409 additions & 32 deletions

File tree

documentation/src/main/docs/jms/receiving-jms-messages.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,24 @@ When the Reactive Messaging `Message` gets acknowledged, the associated
7777
JMS Message is acknowledged. As JMS acknowledgement is blocking, this
7878
acknowledgement is delegated to a worker thread.
7979

80+
## Failure Handling
81+
82+
If a message produced from a JMS message is *nacked*, a failure strategy is applied. The JMS connector supports 3 strategies:
83+
84+
- `fail` - (default) fail the application, no more messages will be processed. The failing message is not acknowledged and may be redelivered by the JMS broker. The application is marked as unhealthy (impacting liveness checks).
85+
86+
- `ignore` - the failure is logged, but the processing continues. The failing message is acknowledged and will not be redelivered.
87+
88+
- `dead-letter-queue` - the failing message is acknowledged and sent to a JMS *dead letter queue* destination. The processing continues with the next message.
89+
90+
The dead letter queue destination can be configured using the `dead-letter-queue.destination` attribute. If not specified, it defaults to `dead-letter-queue-$channel`.
91+
Messages sent to the dead letter queue preserve the original message body and properties. In addition, the following properties are added:
92+
93+
- `dead_letter_exception_class_name` - the fully qualified class name of the exception
94+
- `dead_letter_reason` - the exception message
95+
- `dead_letter_cause_class_name` - the fully qualified class name of the root cause (if available)
96+
- `dead_letter_cause` - the root cause exception message (if available)
97+
8098
## Configuration Reference
8199

82100
{{ insert('../../../target/connectors/smallrye-jms-incoming.md') }}

smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/fault/JmsDlqFailure.java

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.util.function.BiConsumer;
1212

1313
import jakarta.enterprise.context.ApplicationScoped;
14-
import jakarta.enterprise.inject.Any;
1514
import jakarta.enterprise.inject.Instance;
1615
import jakarta.inject.Inject;
1716

@@ -30,10 +29,10 @@
3029
public class JmsDlqFailure implements JmsFailureHandler {
3130

3231
public static final String CHANNEL_DLQ_SUFFIX = "dead-letter-queue";
33-
public static final String DEAD_LETTER_EXCEPTION_CLASS_NAME = "dead-letter-exception-class-name";
34-
public static final String DEAD_LETTER_CAUSE_CLASS_NAME = "dead-letter-cause-class-name";
35-
public static final String DEAD_LETTER_REASON = "dead-letter-reason";
36-
public static final String DEAD_LETTER_CAUSE = "dead-letter-cause";
32+
public static final String DEAD_LETTER_EXCEPTION_CLASS_NAME = "dead_letter_exception_class_name";
33+
public static final String DEAD_LETTER_CAUSE_CLASS_NAME = "dead_letter_cause_class_name";
34+
public static final String DEAD_LETTER_REASON = "dead_letter_reason";
35+
public static final String DEAD_LETTER_CAUSE = "dead_letter_cause";
3736

3837
private final JmsConnectorIncomingConfiguration config;
3938
private final String dlqDestination;
@@ -44,11 +43,6 @@ public class JmsDlqFailure implements JmsFailureHandler {
4443
public static class Factory implements JmsFailureHandler.Factory {
4544
@Inject
4645
Instance<SubscriberDecorator> subscriberDecorators;
47-
@Inject
48-
Instance<Config> rootConfig;
49-
@Inject
50-
@Any
51-
Instance<Map<String, Object>> configurations;
5246

5347
@Override
5448
public JmsFailureHandler create(JmsConnector connector, JmsConnectorIncomingConfiguration config,
@@ -57,12 +51,8 @@ public JmsFailureHandler create(JmsConnector connector, JmsConnectorIncomingConf
5751
Optional<String> deadLetterQueueDestination = config.getDeadLetterQueueDestination();
5852
Config connectorConfig = Configs.prefixOverride(config.config(), CHANNEL_DLQ_SUFFIX,
5953
Map.of("destination", c -> deadLetterQueueDestination.orElse("dead-letter-queue-" + config.getChannel())));
60-
// ConnectorConfig connectorConfig = new OverrideConfig(INCOMING_PREFIX, rootConfig.get(),
61-
// JmsConnector.CONNECTOR_NAME, config.getChannel(), null,
62-
// Map.of("destination", c -> deadLetterQueueDestination.orElse("dead-letter-queue-" + config.getChannel())));
6354

6455
JmsConnectorOutgoingConfiguration producerConfig = new JmsConnectorOutgoingConfiguration(connectorConfig);
65-
// String destination = producerConfig.getDestination().get();
6656

6757
String deadQueueDestination = config.getDeadLetterQueueDestination()
6858
.orElse("dead-letter-queue-" + config.getChannel());
@@ -87,17 +77,11 @@ public JmsDlqFailure(JmsConnectorIncomingConfiguration config, String dlqDestina
8777
public <T> Uni<Void> handle(IncomingJmsMessage<T> incomingMessage, Throwable reason, Metadata metadata) {
8878
OutgoingJmsMessageMetadata outgoingJmsMessageMetadata = getOutgoingJmsMessageMetadata(incomingMessage, reason);
8979

90-
Message<T> dead = Message.of(incomingMessage.getPayload());
91-
dead.addMetadata(outgoingJmsMessageMetadata);
92-
80+
Message<T> dead = Message.of(incomingMessage.getPayload(), Metadata.of(outgoingJmsMessageMetadata));
9381
log.messageNackedDeadLetter(config.getChannel(), dlqDestination);
94-
9582
CompletableFuture<Void> future = new CompletableFuture<>();
96-
9783
dlqSource.onNext(dead
98-
.withAck(() -> {
99-
return dead.ack().thenAccept(__ -> future.complete(null));
100-
})
84+
.withAck(() -> dead.ack().thenAccept(__ -> future.complete(null)))
10185
.withNack(throwable -> {
10286
future.completeExceptionally(throwable);
10387
return future;
@@ -107,9 +91,8 @@ public <T> Uni<Void> handle(IncomingJmsMessage<T> incomingMessage, Throwable rea
10791
}
10892

10993
private <T> OutgoingJmsMessageMetadata getOutgoingJmsMessageMetadata(IncomingJmsMessage<T> message, Throwable reason) {
110-
Optional<JmsProperties> optionalJmsProperties = message.getMetadata(IncomingJmsMessageMetadata.class).map(a -> {
111-
return a.getProperties();
112-
});
94+
Optional<JmsProperties> optionalJmsProperties = message.getMetadata(IncomingJmsMessageMetadata.class)
95+
.map(JmsMessageMetadata::getProperties);
11396
JmsProperties jmsProperties = optionalJmsProperties
11497
.orElse(new ImmutableJmsProperties(message.unwrap(jakarta.jms.Message.class)));
11598
Enumeration<String> propertyNames = jmsProperties.getPropertyNames();
@@ -127,10 +110,9 @@ private <T> OutgoingJmsMessageMetadata getOutgoingJmsMessageMetadata(IncomingJms
127110
jmsPropertiesBuilder.with(DEAD_LETTER_CAUSE, getThrowableMessage(reason.getCause()));
128111
}
129112

130-
JmsProperties outgoingJmsProperties = jmsPropertiesBuilder.build();
131-
OutgoingJmsMessageMetadata outgoingJmsMessageMetadata = new OutgoingJmsMessageMetadata.OutgoingJmsMessageMetadataBuilder()
132-
.withProperties(outgoingJmsProperties).build();
133-
return outgoingJmsMessageMetadata;
113+
return new OutgoingJmsMessageMetadata.OutgoingJmsMessageMetadataBuilder()
114+
.withProperties(jmsPropertiesBuilder.build())
115+
.build();
134116
}
135117

136118
@Override

0 commit comments

Comments
 (0)