diff --git a/activemq-filters/pom.xml b/activemq-filters/pom.xml
index 060ec626..a514e548 100644
--- a/activemq-filters/pom.xml
+++ b/activemq-filters/pom.xml
@@ -69,7 +69,7 @@
org.apache.maven.plugins
maven-shade-plugin
-
+
diff --git a/pom.xml b/pom.xml
index 3457981b..246e05cf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,7 @@
4.2.0
3.1.0
2.0
- 1.20.4
+ 1.21.4
1.18.42
--add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED
@@ -325,6 +325,18 @@
pom
import
+
+ org.testcontainers
+ junit-jupiter
+ ${testcontainers.version}
+ test
+
+
+ org.testcontainers
+ pulsar
+ ${testcontainers.version}
+ test
+
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index f662541c..0ca33949 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -112,7 +112,7 @@
-
+
@@ -480,8 +480,8 @@
-
-
+
+
true
diff --git a/pulsar-jms-integration-tests/pom.xml b/pulsar-jms-integration-tests/pom.xml
index 60f29202..cc3206e8 100644
--- a/pulsar-jms-integration-tests/pom.xml
+++ b/pulsar-jms-integration-tests/pom.xml
@@ -96,8 +96,8 @@
copy filters
-
-
+
+
diff --git a/pulsar-jms/pom.xml b/pulsar-jms/pom.xml
index 138ec436..9b74138f 100644
--- a/pulsar-jms/pom.xml
+++ b/pulsar-jms/pom.xml
@@ -150,6 +150,13 @@
org.testcontainers
junit-jupiter
+ ${testcontainers.version}
+ test
+
+
+ org.testcontainers
+ testcontainers
+ ${testcontainers.version}
test
@@ -173,15 +180,28 @@
copy filters
-
-
-
-
+
+
+
+
+
+ maven-assembly-plugin
+
+
+ jar-with-dependencies
+
+
+
+ org.example.App
+
+
+
+
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java
index cd83bc3c..aefc1dad 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java
@@ -493,7 +493,7 @@ public void start() throws JMSException {
paused = false;
pausedCondition.signalAll();
} catch (Throwable err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, null);
} finally {
connectionPausedLock.writeLock().unlock();
}
@@ -555,7 +555,7 @@ public void stop() throws JMSException {
paused = true;
pausedCondition.signalAll();
} catch (Throwable err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, null);
} finally {
connectionPausedLock.writeLock().unlock();
}
@@ -893,7 +893,7 @@ public T executeInConnectionPausedLock(Utils.SupplierWithException run, i
}
return run.run();
} catch (Throwable err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, null);
} finally {
connectionPausedLock.readLock().unlock(); // let writers in
}
@@ -993,14 +993,14 @@ private void createPulsarTemporaryTopic(String name) throws JMSException {
factory.getPulsarAdmin().topics().createNonPartitionedTopic(name);
} catch (IllegalStateException err) {
if (!factory.isAllowTemporaryTopicWithoutAdmin()) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, null);
}
log.warn(
"Skipping creation of nonPartitionedTopic {} as jms.allowTemporaryTopicWithoutAdmin=true",
name,
err);
} catch (Exception err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, null);
}
}
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionConsumer.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionConsumer.java
index 1db0a3b7..e66cd3c7 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionConsumer.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionConsumer.java
@@ -108,7 +108,7 @@ public void close() throws JMSException {
this.spool.join();
}
} catch (InterruptedException err) {
- Utils.handleException(err);
+ Utils.handleException(err, null);
}
this.consumer.close();
this.dispatcherSession.close();
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java
index 7d73b2d6..727180eb 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java
@@ -592,7 +592,7 @@ private synchronized void ensureInitialized(String connectUsername, String conne
}
this.initialized = true;
} catch (Throwable t) {
- throw Utils.handleException(t);
+ throw Utils.handleException(t, null);
}
}
@@ -615,7 +615,8 @@ private static Cache> buildProducerCache(
} catch (PulsarClientException e) {
// ignore
log.debug(
- "Exception while closing pulsar producer", Utils.handleException(e));
+ "Exception while closing pulsar producer",
+ Utils.handleException(e, null));
}
}
log.debug(
@@ -1052,7 +1053,7 @@ public void close() {
con.close();
} catch (Exception ignore) {
// ignore
- Utils.handleException(ignore);
+ Utils.handleException(ignore, null);
}
}
@@ -1098,8 +1099,8 @@ public String getPulsarTopicName(Destination defaultDestination) throws JMSExcep
Producer getProducerForDestination(Destination defaultDestination, boolean transactions)
throws JMSException {
+ String fullQualifiedTopicName = getPulsarTopicName(defaultDestination);
try {
- String fullQualifiedTopicName = getPulsarTopicName(defaultDestination);
String key = transactions ? fullQualifiedTopicName + "-tx" : fullQualifiedTopicName;
boolean transactionsStickyPartitions = transactions && isTransactionsStickyPartitions();
boolean enableJMSPriority = isEnableJMSPriority();
@@ -1161,7 +1162,7 @@ public int choosePartition(Message> msg, TopicMetadata metadata) {
return producerBuilder.create();
}));
} catch (ExecutionException err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, fullQualifiedTopicName);
}
}
@@ -1226,7 +1227,7 @@ public void ensureQueueSubscription(PulsarDestination destination) throws JMSExc
// applications start when the server is not available
long now = System.currentTimeMillis();
if (now - start > getWaitForServerStartupTimeout()) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, fullQualifiedTopicName);
} else {
log.info(
"Got {} error while setting up subscription for queue {}, maybe the namespace/broker is still starting",
@@ -1237,7 +1238,7 @@ public void ensureQueueSubscription(PulsarDestination destination) throws JMSExc
Thread.sleep(1000);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
- throw Utils.handleException(err);
+ throw Utils.handleException(err, fullQualifiedTopicName);
}
}
}
@@ -1373,7 +1374,8 @@ public ConsumerBase> createConsumer(
}
return (ConsumerBase) newConsumer;
} catch (PulsarClientException err) {
- throw Utils.handleException(err);
+ String topic = getPulsarTopicName(destination);
+ throw Utils.handleException(err, topic);
}
}
@@ -1497,7 +1499,7 @@ public String downloadServerSideFilter(
// persistent://xxx/xx/xxxx"
long now = System.currentTimeMillis();
if (now - start > getWaitForServerStartupTimeout()) {
- throw Utils.handleException(notReady);
+ throw Utils.handleException(notReady, fullQualifiedTopicName);
} else {
log.info(
"Temporary error, cannot download server-side filters {}: {}",
@@ -1507,11 +1509,11 @@ public String downloadServerSideFilter(
Thread.sleep(1000);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
- throw Utils.handleException(notReady);
+ throw Utils.handleException(notReady, fullQualifiedTopicName);
}
}
} catch (PulsarAdminException err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, fullQualifiedTopicName);
}
}
}
@@ -1519,10 +1521,11 @@ public String downloadServerSideFilter(
public List> createReadersForBrowser(
PulsarQueue destination, ConsumerConfiguration overrideConsumerConfiguration)
throws JMSException {
-
if (destination.isRegExp()) {
+
+ String topicName = null;
try {
- String topicName = getPulsarTopicName(destination);
+ topicName = getPulsarTopicName(destination);
List topicNames =
TopicDiscoveryUtils.discoverTopicsByPattern(topicName, getPulsarClient(), 1000);
log.info("createReadersForBrowser {} - {} - {}", destination, topicName, topicNames);
@@ -1534,7 +1537,7 @@ public List> createReadersForBrowser(
}
return res;
} catch (Exception err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, topicName);
}
} else if (destination.isMultiTopic()) {
List> res = new ArrayList<>();
@@ -1575,7 +1578,7 @@ public List> createReadersForBrowser(
} catch (PulsarAdminException.NotFoundException err) {
return Collections.emptyList();
} catch (PulsarAdminException err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, fullQualifiedTopicName);
}
}
}
@@ -1627,7 +1630,7 @@ private Reader> createReaderForBrowserForNonPartitionedTopic(
readers.add(newReader);
return newReader;
} catch (PulsarClientException | PulsarAdminException err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, fullQualifiedTopicName);
}
}
@@ -1644,6 +1647,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name)
throws JMSException {
String systemNamespace = getSystemNamespace();
boolean somethingDone = false;
+ String fullQualifiedTopicName = null;
try {
if (destination != null) {
@@ -1651,7 +1655,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name)
throw new InvalidDestinationException(
"Virtual destinations are not supported for unsubscribe");
}
- String fullQualifiedTopicName = getPulsarTopicName(destination);
+ fullQualifiedTopicName = getPulsarTopicName(destination);
log.info("deleteSubscription topic {} name {}", fullQualifiedTopicName, name);
try {
pulsarAdmin.topics().deleteSubscription(fullQualifiedTopicName, name, true);
@@ -1689,7 +1693,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name)
}
}
} catch (Exception err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, fullQualifiedTopicName);
}
return somethingDone;
}
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java
index 68d411f6..0a30f5ff 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java
@@ -127,7 +127,7 @@ private JMSDestinationMetadata describeDestination(PulsarDestination destination
} catch (PulsarAdminException.NotFoundException notFound) {
partitionedTopicMetadata = new PartitionedTopicMetadata(0);
} catch (PulsarAdminException err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, pulsarTopic);
}
int partitions = partitionedTopicMetadata.partitions;
final Map partitionsStats;
@@ -153,7 +153,7 @@ private JMSDestinationMetadata describeDestination(PulsarDestination destination
publishers = Collections.emptyList();
}
} catch (PulsarAdminException err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, pulsarTopic);
}
} else {
partitionsStats = Collections.emptyMap();
@@ -326,15 +326,16 @@ public void createSubscription(
String selector,
boolean fromBeginning)
throws JMSException {
+
+ PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination);
+ validateSelector(enableFilters, selector);
+ Map properties = new HashMap<>();
+ if (enableFilters) {
+ properties.put("jms.filtering", "true");
+ properties.put("jms.selector", selector);
+ }
+ String topicName = factory.getPulsarTopicName(dest);
try {
- PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination);
- validateSelector(enableFilters, selector);
- Map properties = new HashMap<>();
- if (enableFilters) {
- properties.put("jms.filtering", "true");
- properties.put("jms.selector", selector);
- }
- String topicName = factory.getPulsarTopicName(dest);
Topics topics = factory.ensurePulsarAdmin().topics();
topics.createSubscription(
topicName,
@@ -343,21 +344,20 @@ public void createSubscription(
false,
properties);
} catch (PulsarAdminException error) {
- throw Utils.handleException(error);
+ throw Utils.handleException(error, topicName);
}
}
@Override
public void createQueue(Queue destination, int partitions, boolean enableFilters, String selector)
throws JMSException {
- checkArgument(() -> partitions >= 0, "Invalid number of partitions " + partitions);
- validateSelector(enableFilters, selector);
+ PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination);
+ String topicName = factory.getPulsarTopicName(dest);
try {
- PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination);
+ checkArgument(() -> partitions >= 0, "Invalid number of partitions " + partitions);
+ validateSelector(enableFilters, selector);
checkDestination(
destination, d -> !dest.isVirtualDestination(), "Cannot create a VirtualDestination");
-
- String topicName = factory.getPulsarTopicName(dest);
Topics topics = factory.ensurePulsarAdmin().topics();
boolean exists = false;
try {
@@ -396,19 +396,22 @@ public void createQueue(Queue destination, int partitions, boolean enableFilters
"Subscription " + subscriptionName + " already exists on Pulsar Topic " + topicName);
}
} catch (PulsarAdminException error) {
- throw Utils.handleException(error);
+ throw Utils.handleException(error, topicName);
+
+ } catch (Throwable t) {
+ throw Utils.handleException(t, topicName);
}
}
@Override
public void createTopic(Topic destination, int partitions) throws JMSException {
- checkArgument(() -> partitions >= 0, "Invalid number of partitions " + partitions);
+
+ PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination);
+ String topicName = factory.getPulsarTopicName(dest);
try {
- PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination);
+ checkArgument(() -> partitions >= 0, "Invalid number of partitions " + partitions);
checkDestination(
destination, d -> !dest.isVirtualDestination(), "Cannot create a VirtualDestination");
-
- String topicName = factory.getPulsarTopicName(dest);
Topics topics = factory.ensurePulsarAdmin().topics();
try {
PartitionedTopicMetadata partitionedTopicMetadata =
@@ -421,7 +424,6 @@ public void createTopic(Topic destination, int partitions) throws JMSException {
+ " is different from "
+ partitions);
} catch (PulsarAdminException.NotFoundException notFound) {
- // ok
}
try {
if (partitions > 0) {
@@ -433,20 +435,22 @@ public void createTopic(Topic destination, int partitions) throws JMSException {
throw new InvalidDestinationException("Topic " + topicName + " already exists");
}
} catch (PulsarAdminException error) {
- throw Utils.handleException(error);
+ throw Utils.handleException(error, topicName);
+ } catch (Throwable t) {
+ throw Utils.handleException(t, topicName);
}
}
@Override
public void setQueueSubscriptionSelector(
Queue destination, boolean enableFilters, String selector) throws JMSException {
+ PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination);
+ String topicName = factory.getPulsarTopicName(dest);
+ String subscriptionName = factory.getQueueSubscriptionName(dest);
try {
- PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination);
- String topicName = factory.getPulsarTopicName(dest);
- String subscriptionName = factory.getQueueSubscriptionName(dest);
doUpdateSubscriptionSelector(enableFilters, selector, topicName, subscriptionName);
- } catch (PulsarAdminException error) {
- throw Utils.handleException(error);
+ } catch (Throwable error) {
+ throw Utils.handleException(error, topicName);
}
}
@@ -473,12 +477,12 @@ private void doUpdateSubscriptionSelector(
public void setSubscriptionSelector(
Topic destination, String subscriptionName, boolean enableFilters, String selector)
throws JMSException {
+ PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination);
+ String topicName = factory.getPulsarTopicName(dest);
try {
- PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination);
- String topicName = factory.getPulsarTopicName(dest);
doUpdateSubscriptionSelector(enableFilters, selector, topicName, subscriptionName);
- } catch (PulsarAdminException error) {
- throw Utils.handleException(error);
+ } catch (Throwable error) {
+ throw Utils.handleException(error, topicName);
}
}
}
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java
index 7c191a71..ea5a47e6 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java
@@ -1148,7 +1148,7 @@ void acknowledgeInternal() throws JMSException {
try {
consumer.acknowledge(receivedPulsarMessage, this, pulsarConsumer);
} catch (Exception err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, null);
}
}
@@ -1220,7 +1220,7 @@ final void sendAsync(
() -> {
this.writable = false;
if (error != null) {
- completionListener.onException(this, Utils.handleException(error));
+ completionListener.onException(this, Utils.handleException(error, null));
} else {
assignSystemMessageId(messageIdFromServer);
@@ -1565,7 +1565,7 @@ protected static JMSException handleExceptionAccordingToMessageSpecs(Throwable t
if (t instanceof EOFException) {
throw new MessageEOFException(t + "");
}
- throw Utils.handleException(t);
+ throw Utils.handleException(t, null);
}
public void setWritable(boolean b) {
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java
index 74a0f7d6..a43c4eec 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java
@@ -328,43 +328,46 @@ public boolean hasSomePrefetchedMessages() {
synchronized Message receiveWithTimeoutAndValidateType(long timeout, Class expectedType)
throws JMSException {
- checkNotClosed();
- if (listener != null) {
- throw new IllegalStateException("cannot receive if you have a messageListener");
- }
- // time to wait for the Connection to "start"
- final int acquireConnectionStartTime =
- timeout == Long.MAX_VALUE ? Integer.MAX_VALUE : (int) timeout;
- // time to wait for each cycle
- final int stepTimeout = timeout < 100 ? ((int) timeout) : 100;
- final long start = System.currentTimeMillis();
- return session.executeOperationIfConnectionStarted(
- () -> {
- do {
- Message result =
- session.executeCriticalOperation(
- () -> {
- try {
- Consumer> consumer = getConsumer();
- org.apache.pulsar.client.api.Message> message =
- consumer.receive(stepTimeout, TimeUnit.MILLISECONDS);
- if (message == null) {
- return null;
+ String topic = destination != null ? destination.getName() : null;
+ try {
+ checkNotClosed();
+ if (listener != null) {
+ throw Utils.handleException(
+ new IllegalStateException("cannot receive if you have a messageListener"), topic);
+ }
+ final int acquireConnectionStartTime =
+ timeout == Long.MAX_VALUE ? Integer.MAX_VALUE : (int) timeout;
+ final int stepTimeout = timeout < 100 ? ((int) timeout) : 100;
+ final long start = System.currentTimeMillis();
+ return session.executeOperationIfConnectionStarted(
+ () -> {
+ do {
+ Message result =
+ session.executeCriticalOperation(
+ () -> {
+ try {
+ Consumer> consumer = getConsumer();
+ org.apache.pulsar.client.api.Message> message =
+ consumer.receive(stepTimeout, TimeUnit.MILLISECONDS);
+ if (message == null) {
+ return null;
+ }
+ return handleReceivedMessage(
+ message, consumer, expectedType, null, noLocal);
+ } catch (Exception err) {
+ throw Utils.handleException(err, topic);
}
- return handleReceivedMessage(
- message, consumer, expectedType, null, noLocal);
- } catch (Exception err) {
- throw Utils.handleException(err);
- }
- });
- if (result != null) {
- return result;
- }
- } while (System.currentTimeMillis() - start < timeout && !session.isClosed());
-
- return null;
- },
- acquireConnectionStartTime);
+ });
+ if (result != null) {
+ return result;
+ }
+ } while (System.currentTimeMillis() - start < timeout && !session.isClosed());
+ return null;
+ },
+ acquireConnectionStartTime);
+ } catch (Throwable t) {
+ throw Utils.handleException(t, topic);
+ }
}
/**
@@ -406,8 +409,8 @@ private PulsarMessage handleReceivedMessage(
java.util.function.Consumer listenerCode,
boolean noLocalFilter)
throws JMSException, org.apache.pulsar.client.api.PulsarClientException {
+ String topic = destination != null ? destination.getName() : null;
receivedMessages.incrementAndGet();
-
PulsarMessage result = PulsarMessage.decode(this, consumer, message);
if (expectedType != null && !result.isBodyAssignableTo(expectedType)) {
if (log.isDebugEnabled()) {
@@ -485,7 +488,7 @@ && requiresClientSideFiltering(message)
} catch (Throwable t) {
log.error("Listener thrown error, calling negativeAcknowledge", t);
consumer.negativeAcknowledge(message);
- throw Utils.handleException(t);
+ throw Utils.handleException(t, topic);
}
if (result.isNegativeAcked()) {
// this may happen if the listener calls "Session.recover"
@@ -551,6 +554,7 @@ private boolean requiresClientSideFiltering(org.apache.pulsar.client.api.Message
*/
@Override
public void close() throws JMSException {
+ String topic = destination != null ? destination.getName() : null;
if (Utils.isOnMessageListener(session, this)) {
requestClose.set(true);
return;
@@ -571,7 +575,7 @@ public void close() throws JMSException {
session.removeConsumer(this);
return null;
} catch (Exception err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, topic);
}
});
} else if (session.getTransacted()) {
@@ -636,11 +640,12 @@ void acknowledge(
PulsarMessage message,
Consumer> consumer)
throws JMSException {
+ String topic = destination != null ? destination.getName() : null;
try {
consumer.acknowledge(receivedPulsarMessage);
session.unregisterUnacknowledgedMessage(message);
} catch (PulsarClientException err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, topic);
}
}
@@ -711,15 +716,17 @@ void runListener(int timeout) {
}
void closeDuringRollback() throws JMSException {
+ String topic = destination != null ? destination.getName() : null;
try {
consumer.close();
session.removeConsumer(this);
} catch (Exception err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, topic);
}
}
public void closeInternal() throws JMSException {
+ String topic = destination != null ? destination.getName() : null;
if (!closed.compareAndSet(false, true)) {
return;
}
@@ -730,7 +737,7 @@ public void closeInternal() throws JMSException {
consumer = null;
}
} catch (Exception err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, topic);
}
}
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java
index 8f6cff79..2531f39f 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java
@@ -1208,46 +1208,56 @@ private void sendMessage(Destination defaultDestination, Message message) throws
if (message == null) {
throw new MessageFormatException("null message");
}
- session.executeCriticalOperation(
- () -> {
- Producer producer = session.getProducerForDestination(defaultDestination);
- message.setJMSDestination(defaultDestination);
- PulsarMessage pulsarMessage = prepareMessageForSend(message);
- final TypedMessageBuilder typedMessageBuilder;
- session.blockTransactionOperations();
- try {
- if (session.getTransacted()) {
- Transaction transaction = session.getTransaction();
- if (transaction != null) {
- typedMessageBuilder = producer.newMessage(transaction);
- } else {
- // emulated transactions
- typedMessageBuilder = producer.newMessage();
- if (uncommittedMessages == null) {
- uncommittedMessages = new ArrayList<>();
+ String topic =
+ defaultDestination != null ? ((PulsarDestination) defaultDestination).getName() : null;
+ try {
+ session.executeCriticalOperation(
+ () -> {
+ try {
+ Producer producer = session.getProducerForDestination(defaultDestination);
+ message.setJMSDestination(defaultDestination);
+ PulsarMessage pulsarMessage = prepareMessageForSend(message);
+ final TypedMessageBuilder typedMessageBuilder;
+ session.blockTransactionOperations();
+ try {
+ if (session.getTransacted()) {
+ Transaction transaction = session.getTransaction();
+ if (transaction != null) {
+ typedMessageBuilder = producer.newMessage(transaction);
+ } else {
+ typedMessageBuilder = producer.newMessage();
+ if (uncommittedMessages == null) {
+ uncommittedMessages = new ArrayList<>();
+ }
+ uncommittedMessages.add(
+ new PreparedMessage(typedMessageBuilder, pulsarMessage));
+ session.registerProducerWithTransaction(this);
+ if (defaultDeliveryDelay > 0) {
+ typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS);
+ }
+ return null;
+ }
+ } else {
+ typedMessageBuilder = producer.newMessage();
}
- uncommittedMessages.add(new PreparedMessage(typedMessageBuilder, pulsarMessage));
- session.registerProducerWithTransaction(this);
if (defaultDeliveryDelay > 0) {
typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS);
}
- return null;
+ pulsarMessage.send(typedMessageBuilder, disableMessageTimestamp, session);
+ } finally {
+ session.unblockTransactionOperations();
}
- } else {
- typedMessageBuilder = producer.newMessage();
- }
- if (defaultDeliveryDelay > 0) {
- typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS);
+ if (message != pulsarMessage) {
+ applyBackMessageProperties(message, pulsarMessage);
+ }
+ return null;
+ } catch (Exception err) {
+ throw Utils.handleException(err, topic);
}
- pulsarMessage.send(typedMessageBuilder, disableMessageTimestamp, session);
- } finally {
- session.unblockTransactionOperations();
- }
- if (message != pulsarMessage) {
- applyBackMessageProperties(message, pulsarMessage);
- }
- return null;
- });
+ });
+ } catch (Throwable t) {
+ throw Utils.handleException(t, topic);
+ }
}
private static class PreparedMessage {
@@ -1266,81 +1276,91 @@ private void sendMessage(
if (message == null) {
throw new MessageFormatException("null message");
}
- session.executeCriticalOperation(
- () -> {
- Producer producer = session.getProducerForDestination(defaultDestination);
- message.setJMSDestination(defaultDestination);
- PulsarMessage pulsarMessage = prepareMessageForSend(message);
- CompletionListener endActivityCompletionListener =
- new CompletionListener() {
- @Override
- public void onCompletion(Message message) {
- try {
- completionListener.onCompletion(message);
- } finally {
- session.unblockTransactionOperations();
- }
- }
+ String topic =
+ defaultDestination != null ? ((PulsarDestination) defaultDestination).getName() : null;
+ try {
+ session.executeCriticalOperation(
+ () -> {
+ try {
+ Producer producer = session.getProducerForDestination(defaultDestination);
+ message.setJMSDestination(defaultDestination);
+ PulsarMessage pulsarMessage = prepareMessageForSend(message);
+ CompletionListener endActivityCompletionListener =
+ new CompletionListener() {
+ @Override
+ public void onCompletion(Message message) {
+ try {
+ completionListener.onCompletion(message);
+ } finally {
+ session.unblockTransactionOperations();
+ }
+ }
- @Override
- public void onException(Message message, Exception exception) {
- try {
- completionListener.onException(message, exception);
- } finally {
- session.unblockTransactionOperations();
- }
- }
- };
- CompletionListener finalCompletionListener = endActivityCompletionListener;
- if (pulsarMessage != message) {
- finalCompletionListener =
- new CompletionListener() {
- @Override
- public void onCompletion(Message completedMessage) {
- // we have to pass the original message to the called
- applyBackMessageProperties(message, pulsarMessage);
- endActivityCompletionListener.onCompletion(message);
- }
+ @Override
+ public void onException(Message message, Exception exception) {
+ try {
+ completionListener.onException(message, exception);
+ } finally {
+ session.unblockTransactionOperations();
+ }
+ }
+ };
+ CompletionListener finalCompletionListener = endActivityCompletionListener;
+ if (pulsarMessage != message) {
+ finalCompletionListener =
+ new CompletionListener() {
+ @Override
+ public void onCompletion(Message completedMessage) {
+ applyBackMessageProperties(message, pulsarMessage);
+ endActivityCompletionListener.onCompletion(message);
+ }
- @Override
- public void onException(Message completedMessage, Exception e) {
- // we have to pass the original message to the called
- applyBackMessageProperties(message, pulsarMessage);
- endActivityCompletionListener.onException(message, e);
+ @Override
+ public void onException(Message completedMessage, Exception e) {
+ applyBackMessageProperties(message, pulsarMessage);
+ endActivityCompletionListener.onException(message, e);
+ }
+ };
+ }
+ session.blockTransactionOperations();
+ TypedMessageBuilder typedMessageBuilder;
+ if (session.getTransacted()) {
+ Transaction transaction = session.getTransaction();
+ if (transaction != null) {
+ typedMessageBuilder = producer.newMessage(transaction);
+ } else {
+ typedMessageBuilder = producer.newMessage();
+ if (defaultDeliveryDelay > 0) {
+ typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS);
}
- };
- }
-
- session.blockTransactionOperations();
- TypedMessageBuilder typedMessageBuilder;
- if (session.getTransacted()) {
- Transaction transaction = session.getTransaction();
- if (transaction != null) {
- typedMessageBuilder = producer.newMessage(transaction);
- } else {
- // emulated transactions
- typedMessageBuilder = producer.newMessage();
+ if (uncommittedMessages == null) {
+ uncommittedMessages = new ArrayList<>();
+ }
+ uncommittedMessages.add(new PreparedMessage(typedMessageBuilder, pulsarMessage));
+ session.registerProducerWithTransaction(this);
+ finalCompletionListener.onCompletion(pulsarMessage);
+ return null;
+ }
+ } else {
+ typedMessageBuilder = producer.newMessage();
+ }
if (defaultDeliveryDelay > 0) {
typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS);
}
- if (uncommittedMessages == null) {
- uncommittedMessages = new ArrayList<>();
- }
- uncommittedMessages.add(new PreparedMessage(typedMessageBuilder, pulsarMessage));
- session.registerProducerWithTransaction(this);
- finalCompletionListener.onCompletion(pulsarMessage);
+ pulsarMessage.sendAsync(
+ typedMessageBuilder,
+ finalCompletionListener,
+ session,
+ this,
+ disableMessageTimestamp);
return null;
+ } catch (Exception err) {
+ throw Utils.handleException(err, topic);
}
- } else {
- typedMessageBuilder = producer.newMessage();
- }
- if (defaultDeliveryDelay > 0) {
- typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS);
- }
- pulsarMessage.sendAsync(
- typedMessageBuilder, finalCompletionListener, session, this, disableMessageTimestamp);
- return null;
- });
+ });
+ } catch (Throwable t) {
+ throw Utils.handleException(t, topic);
+ }
}
protected void rollbackEmulatedTransaction() {
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java
index e8dd0904..edd81c2e 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java
@@ -226,11 +226,11 @@ private Transaction startTransaction(PulsarConnection connection) throws JMSExce
log.info("Transaction service not available {}", err.getCause().getMessage());
Thread.sleep(1000);
} else {
- throw Utils.handleException(err.getCause());
+ throw Utils.handleException(err.getCause(), null);
}
}
} catch (Exception err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, null);
}
}
if (transaction == null) {
@@ -884,7 +884,7 @@ public void run() {
// this message is not tracked by this session
foreignMessage.acknowledge();
} catch (Throwable err) {
- Utils.handleException(err);
+ Utils.handleException(err, null);
log.info("Error in ConsumerConnection task on message {}", foreignMessage, err);
((PulsarMessage) foreignMessage).negativeAck();
}
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java
index 4c4321b6..55917470 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java
@@ -51,7 +51,7 @@ public final void delete() throws JMSException {
pulsarAdmin = session.getFactory().getPulsarAdmin();
} catch (IllegalStateException err) {
if (!session.getFactory().isAllowTemporaryTopicWithoutAdmin()) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, topicName);
}
log.warn(
"Cannot delete a temporary destination {}. Skipping because jms.allowTemporaryTopicWithoutAdmin=true",
@@ -91,7 +91,7 @@ public final void delete() throws JMSException {
}
} catch (final PulsarAdminException paEx) {
- Utils.handleException(paEx);
+ Utils.handleException(paEx, topicName);
} finally {
session.getConnection().removeTemporaryDestination(this);
}
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/TopicDiscoveryUtils.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/TopicDiscoveryUtils.java
index 3a1c2998..dd87ba9b 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/TopicDiscoveryUtils.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/TopicDiscoveryUtils.java
@@ -48,7 +48,7 @@ public static List discoverTopicsByPattern(String regex, PulsarClient cl
.getTopics();
return topicsPatternFilter(list, Pattern.compile(regex));
} catch (InterruptedException | TimeoutException | ExecutionException err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, null);
}
}
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java
index 30834db6..f3d4c7be 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java
@@ -53,25 +53,43 @@
public final class Utils {
private Utils() {}
- public static JMSException handleException(Throwable cause) {
+ public static JMSException handleException(Throwable cause, String topic) {
while (cause instanceof CompletionException) {
cause = cause.getCause();
}
if (cause instanceof JMSException) {
- return (JMSException) cause;
+ JMSException jms = (JMSException) cause;
+ if (jms.getMessage() != null && jms.getMessage().contains("topic=")) {
+ return jms;
+ }
+ String newMsg = buildMessage(jms.getMessage(), topic);
+ try {
+ JMSException enriched = jms.getClass().getConstructor(String.class).newInstance(newMsg);
+ enriched.initCause(cause);
+ return enriched;
+ } catch (ReflectiveOperationException e) {
+ JMSException fallback = new JMSException(newMsg);
+ fallback.initCause(cause);
+ return fallback;
+ }
}
if (cause instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
if (cause instanceof ClassCastException) {
- return (JMSException)
- new MessageFormatException("Invalid cast " + cause.getMessage()).initCause(cause);
+ JMSException ex =
+ new MessageFormatException(buildMessage("Invalid cast " + safeMsg(cause), topic));
+ ex.initCause(cause);
+ return ex;
}
if (cause instanceof NumberFormatException) {
- return (JMSException)
- new MessageFormatException("Invalid conversion " + cause.getMessage()).initCause(cause);
+ JMSException ex =
+ new MessageFormatException(buildMessage("Invalid conversion " + safeMsg(cause), topic));
+ ex.initCause(cause);
+ return ex;
}
- JMSException err = new JMSException(cause + "");
+ String msg = buildMessage(cause, topic);
+ JMSException err = new JMSException(msg);
err.initCause(cause);
if (cause instanceof Exception) {
err.setLinkedException((Exception) cause);
@@ -80,14 +98,47 @@ public static JMSException handleException(Throwable cause) {
}
return err;
}
+ // helper
+ private static String safeMsg(Throwable t) {
+ return (t.getMessage() != null) ? t.getMessage() : t.toString();
+ }
+
+ private static String buildMessage(String base, String topic) {
+ if (topic == null || topic.isEmpty()) {
+ return base;
+ }
+ return base + " [topic=" + topic + "]";
+ }
+
+ private static String buildMessage(Throwable t, String topic) {
+ return buildMessage(safeMsg(t), topic);
+ }
+
+ public static T executeWithTopic(String topic, SupplierWithException code)
+ throws JMSException {
+ try {
+ return code.run();
+ } catch (Throwable err) {
+ throw handleException(err, topic);
+ }
+ }
+
+ public static void executeWithTopic(String topic, RunnableWithException code)
+ throws JMSException {
+ try {
+ code.run();
+ } catch (Throwable err) {
+ throw handleException(err, topic);
+ }
+ }
public static T get(CompletableFuture future) throws JMSException {
try {
return future.get();
} catch (ExecutionException err) {
- throw handleException(err.getCause());
+ throw handleException(err.getCause(), null);
} catch (InterruptedException err) {
- throw handleException(err);
+ throw handleException(err, null);
}
}
@@ -103,7 +154,7 @@ public static T invoke(SupplierWithException code) throws JMSException {
try {
return code.run();
} catch (Throwable err) {
- throw handleException(err);
+ throw handleException(err, null);
}
}
@@ -111,17 +162,23 @@ public static void invoke(RunnableWithException code) throws JMSException {
try {
code.run();
} catch (Throwable err) {
- throw handleException(err);
+ throw handleException(err, null);
}
}
public static boolean executeMessageListenerInSessionContext(
PulsarSession session, PulsarMessageConsumer consumer, BooleanSupplier code) {
- currentSession.set(new CallbackContext(session, consumer, null));
+ PulsarDestination dest = consumer.getDestination();
+ String topic = dest != null ? dest.getName() : null;
try {
+ currentSession.set(new CallbackContext(session, consumer, null));
return session.executeCriticalOperation(
() -> {
- return code.getAsBoolean();
+ try {
+ return code.getAsBoolean();
+ } catch (Throwable err) {
+ throw handleException(err, topic);
+ }
});
} catch (IllegalStateException err) {
log.debug("Ignore error in listener", err);
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarBytesMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarBytesMessage.java
index cfbb0792..ea8a15c0 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarBytesMessage.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarBytesMessage.java
@@ -61,7 +61,7 @@ public PulsarBytesMessage(byte[] payload) throws JMSException {
this.dataOutputStream = null;
writable = false;
} catch (Exception err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, null);
}
}
@@ -123,7 +123,7 @@ protected void prepareForSend(TypedMessageBuilder producer) throws JMSEx
producer.value(originalMessage);
}
} catch (Exception err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, null);
}
}
@@ -149,7 +149,7 @@ protected static JMSException handleException(Throwable t) throws JMSException {
if (t instanceof EOFException) {
throw new MessageEOFException(t + "");
}
- throw Utils.handleException(t);
+ throw Utils.handleException(t, null);
}
/**
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarObjectMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarObjectMessage.java
index e63c25d6..d595e9bb 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarObjectMessage.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarObjectMessage.java
@@ -48,7 +48,7 @@ private static Serializable decode(byte[] originalMessage) throws JMSException {
ObjectInputStream input = new ObjectInputStream(new ByteArrayInputStream(originalMessage));
return (Serializable) input.readUnshared();
} catch (Exception err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, null);
}
}
@@ -96,7 +96,7 @@ private static byte[] encode(Object object) throws JMSException {
oo.close();
return out.toByteArray();
} catch (Exception err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, null);
}
}
diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarStreamMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarStreamMessage.java
index e09bf2f1..5ce2a128 100644
--- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarStreamMessage.java
+++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarStreamMessage.java
@@ -238,7 +238,7 @@ public PulsarStreamMessage(byte[] payload) throws JMSException {
this.dataOutputStream = null;
writable = false;
} catch (Exception err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, null);
}
}
@@ -300,7 +300,7 @@ protected void prepareForSend(TypedMessageBuilder producer) throws JMSEx
producer.value(originalMessage);
}
} catch (Exception err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, null);
}
}
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java
index 7c93a818..fc91d854 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java
@@ -519,7 +519,7 @@ public ServerSession getServerSession() throws JMSException {
// log.debug("picked session {}", session);
return session;
} catch (Exception err) {
- throw Utils.handleException(err);
+ throw Utils.handleException(err, null);
}
}
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java
index 7a180b65..b5681b72 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java
@@ -21,14 +21,19 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension;
+import jakarta.jms.*;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaType;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
final class ConsumerConfigurationTest {
+ @RegisterExtension
+ static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension();
private void test(
Map consumerConfiguration, Consumer test) {
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ProducerCacheTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ProducerCacheTest.java
index c9bbf3bc..0d1dd112 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ProducerCacheTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ProducerCacheTest.java
@@ -67,7 +67,7 @@ public void testMaxNumOfProducers() throws JMSException {
int totalBrokerProducers = fetchProducerCount(queue1) + fetchProducerCount(queue2);
assertEquals(1, totalBrokerProducers, "Total broker producers count should be 1");
} catch (PulsarAdminException e) {
- throw Utils.handleException(e);
+ throw Utils.handleException(e, null);
}
}
}
@@ -112,7 +112,7 @@ public void testProducerAutoCloseTimeout() throws JMSException, InterruptedExcep
fetchProducerCount(tempQueue),
"Broker should have 0 producers after auto-close timeout");
} catch (PulsarAdminException e) {
- throw Utils.handleException(e);
+ throw Utils.handleException(e, null);
}
}
}