From 47e7d595403d2b96801b4e549cdba55c324b51d5 Mon Sep 17 00:00:00 2001 From: Rutuja-IBM Date: Thu, 9 Apr 2026 12:14:37 +0530 Subject: [PATCH 1/3] Context wrapping for exception --- activemq-filters/pom.xml | 2 +- pom.xml | 14 +- pulsar-client-shaded/pom.xml | 6 +- pulsar-jms-integration-tests/pom.xml | 4 +- pulsar-jms/pom.xml | 28 ++- .../oss/pulsar/jms/PulsarJMSAdminImpl.java | 81 +++++-- .../oss/pulsar/jms/PulsarMessageConsumer.java | 80 ++++--- .../oss/pulsar/jms/PulsarMessageProducer.java | 212 ++++++++++-------- .../com/datastax/oss/pulsar/jms/Utils.java | 147 +++++++++++- .../pulsar/jms/ConsumerConfigurationTest.java | 185 +++++++++++++++ .../pulsar/jms/PulsarMessageProducerTest.java | 58 +++++ .../datastax/oss/pulsar/jms/UtilsTest.java | 12 + 12 files changed, 649 insertions(+), 180 deletions(-) create mode 100644 pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarMessageProducerTest.java diff --git a/activemq-filters/pom.xml b/activemq-filters/pom.xml index 060ec626..a514e548 100644 --- a/activemq-filters/pom.xml +++ b/activemq-filters/pom.xml @@ -69,7 +69,7 @@ org.apache.maven.plugins maven-shade-plugin - + diff --git a/pom.xml b/pom.xml index 3457981b..246e05cf 100644 --- a/pom.xml +++ b/pom.xml @@ -79,7 +79,7 @@ 4.2.0 3.1.0 2.0 - 1.20.4 + 1.21.4 1.18.42 --add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED @@ -325,6 +325,18 @@ pom import + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + org.testcontainers + pulsar + ${testcontainers.version} + test + diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml index f662541c..0ca33949 100644 --- a/pulsar-client-shaded/pom.xml +++ b/pulsar-client-shaded/pom.xml @@ -112,7 +112,7 @@ - + @@ -480,8 +480,8 @@ - - + + true diff --git a/pulsar-jms-integration-tests/pom.xml b/pulsar-jms-integration-tests/pom.xml index 60f29202..cc3206e8 100644 --- a/pulsar-jms-integration-tests/pom.xml +++ b/pulsar-jms-integration-tests/pom.xml @@ -96,8 +96,8 @@ copy filters - - + + diff --git a/pulsar-jms/pom.xml b/pulsar-jms/pom.xml index 138ec436..9b74138f 100644 --- a/pulsar-jms/pom.xml +++ b/pulsar-jms/pom.xml @@ -150,6 +150,13 @@ org.testcontainers junit-jupiter + ${testcontainers.version} + test + + + org.testcontainers + testcontainers + ${testcontainers.version} test @@ -173,15 +180,28 @@ copy filters - - - - + + + + + + maven-assembly-plugin + + + jar-with-dependencies + + + + org.example.App + + + + diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java index 68d411f6..4ad9e993 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java @@ -326,15 +326,17 @@ public void createSubscription( String selector, boolean fromBeginning) throws JMSException { + + PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); + validateSelector(enableFilters, selector); + Map properties = new HashMap<>(); + if (enableFilters) { + properties.put("jms.filtering", "true"); + properties.put("jms.selector", selector); + } + String topicName = factory.getPulsarTopicName(dest); + Utils.setContext(topicName); // ✅ moved out try { - PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); - validateSelector(enableFilters, selector); - Map properties = new HashMap<>(); - if (enableFilters) { - properties.put("jms.filtering", "true"); - properties.put("jms.selector", selector); - } - String topicName = factory.getPulsarTopicName(dest); Topics topics = factory.ensurePulsarAdmin().topics(); topics.createSubscription( topicName, @@ -344,20 +346,22 @@ public void createSubscription( properties); } catch (PulsarAdminException error) { throw Utils.handleException(error); + } finally { + Utils.clearContext(); } } @Override public void createQueue(Queue destination, int partitions, boolean enableFilters, String selector) throws JMSException { - checkArgument(() -> partitions >= 0, "Invalid number of partitions " + partitions); - validateSelector(enableFilters, selector); + PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); + String topicName = factory.getPulsarTopicName(dest); + Utils.setContext(topicName); try { - PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); + checkArgument(() -> partitions >= 0, "Invalid number of partitions " + partitions); + validateSelector(enableFilters, selector); checkDestination( destination, d -> !dest.isVirtualDestination(), "Cannot create a VirtualDestination"); - - String topicName = factory.getPulsarTopicName(dest); Topics topics = factory.ensurePulsarAdmin().topics(); boolean exists = false; try { @@ -397,22 +401,36 @@ public void createQueue(Queue destination, int partitions, boolean enableFilters } } catch (PulsarAdminException error) { throw Utils.handleException(error); + } catch (Throwable t) { + // if (t instanceof JMSException) { + // throw (JMSException) t; + // } + throw Utils.handleException(t); + } finally { + Utils.clearContext(); } } @Override public void createTopic(Topic destination, int partitions) throws JMSException { - checkArgument(() -> partitions >= 0, "Invalid number of partitions " + partitions); + + PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); + String topicName = factory.getPulsarTopicName(dest); + + Utils.setContext(topicName); + try { - PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); + checkArgument(() -> partitions >= 0, "Invalid number of partitions " + partitions); + checkDestination( destination, d -> !dest.isVirtualDestination(), "Cannot create a VirtualDestination"); - String topicName = factory.getPulsarTopicName(dest); Topics topics = factory.ensurePulsarAdmin().topics(); + try { PartitionedTopicMetadata partitionedTopicMetadata = topics.getPartitionedTopicMetadata(topicName); + checkDestination( destination, d -> partitionedTopicMetadata.partitions != partitions, @@ -420,9 +438,11 @@ public void createTopic(Topic destination, int partitions) throws JMSException { + partitionedTopicMetadata.partitions + " is different from " + partitions); + } catch (PulsarAdminException.NotFoundException notFound) { // ok } + try { if (partitions > 0) { topics.createPartitionedTopic(topicName, partitions); @@ -432,21 +452,33 @@ public void createTopic(Topic destination, int partitions) throws JMSException { } catch (PulsarAdminException.ConflictException exists) { throw new InvalidDestinationException("Topic " + topicName + " already exists"); } + } catch (PulsarAdminException error) { throw Utils.handleException(error); + + } catch (Throwable t) { + // if (t instanceof JMSException) { + // throw (JMSException) t; + // } + throw Utils.handleException(t); + } finally { + Utils.clearContext(); } } @Override public void setQueueSubscriptionSelector( Queue destination, boolean enableFilters, String selector) throws JMSException { + PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); + String topicName = factory.getPulsarTopicName(dest); + String subscriptionName = factory.getQueueSubscriptionName(dest); + Utils.setContext(topicName); try { - PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); - String topicName = factory.getPulsarTopicName(dest); - String subscriptionName = factory.getQueueSubscriptionName(dest); doUpdateSubscriptionSelector(enableFilters, selector, topicName, subscriptionName); - } catch (PulsarAdminException error) { + } catch (Throwable error) { // catch everything throw Utils.handleException(error); + } finally { + Utils.clearContext(); } } @@ -473,12 +505,15 @@ private void doUpdateSubscriptionSelector( public void setSubscriptionSelector( Topic destination, String subscriptionName, boolean enableFilters, String selector) throws JMSException { + PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); + String topicName = factory.getPulsarTopicName(dest); + Utils.setContext(topicName); try { - PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); - String topicName = factory.getPulsarTopicName(dest); doUpdateSubscriptionSelector(enableFilters, selector, topicName, subscriptionName); - } catch (PulsarAdminException error) { + } catch (Throwable error) { throw Utils.handleException(error); + } finally { + Utils.clearContext(); } } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java index 74a0f7d6..972fd2a8 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java @@ -328,43 +328,51 @@ public boolean hasSomePrefetchedMessages() { synchronized Message receiveWithTimeoutAndValidateType(long timeout, Class expectedType) throws JMSException { - checkNotClosed(); - if (listener != null) { - throw new IllegalStateException("cannot receive if you have a messageListener"); - } - // time to wait for the Connection to "start" - final int acquireConnectionStartTime = - timeout == Long.MAX_VALUE ? Integer.MAX_VALUE : (int) timeout; - // time to wait for each cycle - final int stepTimeout = timeout < 100 ? ((int) timeout) : 100; - final long start = System.currentTimeMillis(); - return session.executeOperationIfConnectionStarted( - () -> { - do { - Message result = - session.executeCriticalOperation( - () -> { - try { - Consumer consumer = getConsumer(); - org.apache.pulsar.client.api.Message message = - consumer.receive(stepTimeout, TimeUnit.MILLISECONDS); - if (message == null) { - return null; + Utils.setContext(destination.getName()); + try { + checkNotClosed(); + if (listener != null) { + throw Utils.handleException( + new IllegalStateException("cannot receive if you have a messageListener")); + } + final int acquireConnectionStartTime = + timeout == Long.MAX_VALUE ? Integer.MAX_VALUE : (int) timeout; + final int stepTimeout = timeout < 100 ? ((int) timeout) : 100; + final long start = System.currentTimeMillis(); + return session.executeOperationIfConnectionStarted( + () -> { + do { + Message result = + session.executeCriticalOperation( + () -> { + try { + Consumer consumer = getConsumer(); + org.apache.pulsar.client.api.Message message = + consumer.receive(stepTimeout, TimeUnit.MILLISECONDS); + if (message == null) { + return null; + } + return handleReceivedMessage( + message, consumer, expectedType, null, noLocal); + } catch (Exception err) { + throw Utils.handleException(err); } - return handleReceivedMessage( - message, consumer, expectedType, null, noLocal); - } catch (Exception err) { - throw Utils.handleException(err); - } - }); - if (result != null) { - return result; - } - } while (System.currentTimeMillis() - start < timeout && !session.isClosed()); - - return null; - }, - acquireConnectionStartTime); + }); + if (result != null) { + return result; + } + } while (System.currentTimeMillis() - start < timeout && !session.isClosed()); + return null; + }, + acquireConnectionStartTime); + } catch (Throwable t) { + // if (t instanceof JMSException) { + // throw (JMSException) t; + // } + throw Utils.handleException(t); + } finally { + Utils.clearContext(); + } } /** diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java index 8f6cff79..3fc97da6 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java @@ -1208,46 +1208,51 @@ private void sendMessage(Destination defaultDestination, Message message) throws if (message == null) { throw new MessageFormatException("null message"); } - session.executeCriticalOperation( - () -> { - Producer producer = session.getProducerForDestination(defaultDestination); - message.setJMSDestination(defaultDestination); - PulsarMessage pulsarMessage = prepareMessageForSend(message); - final TypedMessageBuilder typedMessageBuilder; - session.blockTransactionOperations(); - try { - if (session.getTransacted()) { - Transaction transaction = session.getTransaction(); - if (transaction != null) { - typedMessageBuilder = producer.newMessage(transaction); + Utils.setContext(((PulsarDestination) defaultDestination).getName()); + try { + session.executeCriticalOperation( + () -> { + Producer producer = session.getProducerForDestination(defaultDestination); + message.setJMSDestination(defaultDestination); + PulsarMessage pulsarMessage = prepareMessageForSend(message); + final TypedMessageBuilder typedMessageBuilder; + session.blockTransactionOperations(); + try { + if (session.getTransacted()) { + Transaction transaction = session.getTransaction(); + if (transaction != null) { + typedMessageBuilder = producer.newMessage(transaction); + } else { + // emulated transactions + typedMessageBuilder = producer.newMessage(); + if (uncommittedMessages == null) { + uncommittedMessages = new ArrayList<>(); + } + uncommittedMessages.add(new PreparedMessage(typedMessageBuilder, pulsarMessage)); + session.registerProducerWithTransaction(this); + if (defaultDeliveryDelay > 0) { + typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); + } + return null; + } } else { - // emulated transactions typedMessageBuilder = producer.newMessage(); - if (uncommittedMessages == null) { - uncommittedMessages = new ArrayList<>(); - } - uncommittedMessages.add(new PreparedMessage(typedMessageBuilder, pulsarMessage)); - session.registerProducerWithTransaction(this); - if (defaultDeliveryDelay > 0) { - typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); - } - return null; } - } else { - typedMessageBuilder = producer.newMessage(); + if (defaultDeliveryDelay > 0) { + typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); + } + pulsarMessage.send(typedMessageBuilder, disableMessageTimestamp, session); + } finally { + session.unblockTransactionOperations(); } - if (defaultDeliveryDelay > 0) { - typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); + if (message != pulsarMessage) { + applyBackMessageProperties(message, pulsarMessage); } - pulsarMessage.send(typedMessageBuilder, disableMessageTimestamp, session); - } finally { - session.unblockTransactionOperations(); - } - if (message != pulsarMessage) { - applyBackMessageProperties(message, pulsarMessage); - } - return null; - }); + return null; + }); + } finally { + Utils.clearContext(); + } } private static class PreparedMessage { @@ -1266,81 +1271,90 @@ private void sendMessage( if (message == null) { throw new MessageFormatException("null message"); } - session.executeCriticalOperation( - () -> { - Producer producer = session.getProducerForDestination(defaultDestination); - message.setJMSDestination(defaultDestination); - PulsarMessage pulsarMessage = prepareMessageForSend(message); - CompletionListener endActivityCompletionListener = - new CompletionListener() { - @Override - public void onCompletion(Message message) { - try { - completionListener.onCompletion(message); - } finally { - session.unblockTransactionOperations(); - } - } - - @Override - public void onException(Message message, Exception exception) { - try { - completionListener.onException(message, exception); - } finally { - session.unblockTransactionOperations(); - } - } - }; - CompletionListener finalCompletionListener = endActivityCompletionListener; - if (pulsarMessage != message) { - finalCompletionListener = + Utils.setContext(((PulsarDestination) defaultDestination).getName()); + try { + session.executeCriticalOperation( + () -> { + Producer producer = session.getProducerForDestination(defaultDestination); + message.setJMSDestination(defaultDestination); + PulsarMessage pulsarMessage = prepareMessageForSend(message); + CompletionListener endActivityCompletionListener = new CompletionListener() { @Override - public void onCompletion(Message completedMessage) { - // we have to pass the original message to the called - applyBackMessageProperties(message, pulsarMessage); - endActivityCompletionListener.onCompletion(message); + public void onCompletion(Message message) { + try { + completionListener.onCompletion(message); + } finally { + session.unblockTransactionOperations(); + } } @Override - public void onException(Message completedMessage, Exception e) { - // we have to pass the original message to the called - applyBackMessageProperties(message, pulsarMessage); - endActivityCompletionListener.onException(message, e); + public void onException(Message message, Exception exception) { + try { + completionListener.onException(message, exception); + } finally { + session.unblockTransactionOperations(); + } } }; - } + CompletionListener finalCompletionListener = endActivityCompletionListener; + if (pulsarMessage != message) { + finalCompletionListener = + new CompletionListener() { + @Override + public void onCompletion(Message completedMessage) { + // we have to pass the original message to the called + applyBackMessageProperties(message, pulsarMessage); + endActivityCompletionListener.onCompletion(message); + } - session.blockTransactionOperations(); - TypedMessageBuilder typedMessageBuilder; - if (session.getTransacted()) { - Transaction transaction = session.getTransaction(); - if (transaction != null) { - typedMessageBuilder = producer.newMessage(transaction); + @Override + public void onException(Message completedMessage, Exception e) { + // we have to pass the original message to the called + applyBackMessageProperties(message, pulsarMessage); + endActivityCompletionListener.onException(message, e); + } + }; + } + + session.blockTransactionOperations(); + TypedMessageBuilder typedMessageBuilder; + if (session.getTransacted()) { + Transaction transaction = session.getTransaction(); + if (transaction != null) { + typedMessageBuilder = producer.newMessage(transaction); + } else { + // emulated transactions + typedMessageBuilder = producer.newMessage(); + if (defaultDeliveryDelay > 0) { + typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); + } + if (uncommittedMessages == null) { + uncommittedMessages = new ArrayList<>(); + } + uncommittedMessages.add(new PreparedMessage(typedMessageBuilder, pulsarMessage)); + session.registerProducerWithTransaction(this); + finalCompletionListener.onCompletion(pulsarMessage); + return null; + } } else { - // emulated transactions typedMessageBuilder = producer.newMessage(); - if (defaultDeliveryDelay > 0) { - typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); - } - if (uncommittedMessages == null) { - uncommittedMessages = new ArrayList<>(); - } - uncommittedMessages.add(new PreparedMessage(typedMessageBuilder, pulsarMessage)); - session.registerProducerWithTransaction(this); - finalCompletionListener.onCompletion(pulsarMessage); - return null; } - } else { - typedMessageBuilder = producer.newMessage(); - } - if (defaultDeliveryDelay > 0) { - typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); - } - pulsarMessage.sendAsync( - typedMessageBuilder, finalCompletionListener, session, this, disableMessageTimestamp); - return null; - }); + if (defaultDeliveryDelay > 0) { + typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); + } + pulsarMessage.sendAsync( + typedMessageBuilder, + finalCompletionListener, + session, + this, + disableMessageTimestamp); + return null; + }); + } finally { + Utils.clearContext(); + } } protected void rollbackEmulatedTransaction() { diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java index 30834db6..4a87349e 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java @@ -53,25 +53,90 @@ public final class Utils { private Utils() {} + // public static JMSException handleException(Throwable cause) { + // while (cause instanceof CompletionException) { + // cause = cause.getCause(); + // } + // if (cause instanceof JMSException) { + // JMSException jms = (JMSException) cause; + // if (jms.getMessage() != null && jms.getMessage().contains("topic=")) { + // return jms; + // } + // String newMsg = + // (jms.getMessage() == null ? "" : jms.getMessage()) + buildContextSuffix(); + // try { + // JMSException enriched = jms.getClass().getConstructor(String.class).newInstance(newMsg); + // enriched.initCause(cause); + // return enriched; + // } catch (ReflectiveOperationException e) { + // JMSException fallback = new JMSException(newMsg); + // fallback.initCause(cause); + // return fallback; + // } + // } + // if (cause instanceof InterruptedException) { + // Thread.currentThread().interrupt(); + // } + // if (cause instanceof ClassCastException) { + // return (JMSException) + // new MessageFormatException("Invalid cast " + cause.getMessage() + + // buildContextSuffix()); + // } + // if (cause instanceof NumberFormatException) { + // return (JMSException) + // new MessageFormatException("Invalid conversion " + + // cause.getMessage()).initCause(cause); + // } + // JMSException err = + // new JMSException( + // (cause.getMessage() == null ? cause.toString() : cause.getMessage()) + // + buildContextSuffix()); + // err.initCause(cause); + // if (cause instanceof Exception) { + // err.setLinkedException((Exception) cause); + // } else { + // err.setLinkedException(new Exception(cause)); + // } + // return err; + // } public static JMSException handleException(Throwable cause) { while (cause instanceof CompletionException) { cause = cause.getCause(); } + String suffix = buildContextSuffix(); if (cause instanceof JMSException) { - return (JMSException) cause; + JMSException jms = (JMSException) cause; + if (jms.getMessage() != null && jms.getMessage().contains("topic=")) { + return jms; + } + String newMsg = (jms.getMessage() == null ? "" : jms.getMessage()) + suffix; + try { + JMSException enriched = jms.getClass().getConstructor(String.class).newInstance(newMsg); + enriched.initCause(cause); + return enriched; + } catch (ReflectiveOperationException e) { + JMSException fallback = new JMSException(newMsg); + fallback.initCause(cause); + return fallback; + } } + // Preserve interrupt if (cause instanceof InterruptedException) { Thread.currentThread().interrupt(); } + // Special cases if (cause instanceof ClassCastException) { - return (JMSException) - new MessageFormatException("Invalid cast " + cause.getMessage()).initCause(cause); + JMSException ex = new MessageFormatException("Invalid cast " + safeMsg(cause) + suffix); + ex.initCause(cause); + return ex; } if (cause instanceof NumberFormatException) { - return (JMSException) - new MessageFormatException("Invalid conversion " + cause.getMessage()).initCause(cause); + JMSException ex = new MessageFormatException("Invalid conversion " + safeMsg(cause) + suffix); + ex.initCause(cause); + return ex; } - JMSException err = new JMSException(cause + ""); + String msg = safeMsg(cause) + suffix; + JMSException err = new JMSException(msg); err.initCause(cause); if (cause instanceof Exception) { err.setLinkedException((Exception) cause); @@ -81,6 +146,65 @@ public static JMSException handleException(Throwable cause) { return err; } + // helper + private static String safeMsg(Throwable t) { + return (t.getMessage() != null) ? t.getMessage() : t.toString(); + } + + private static final ThreadLocal CTX = new ThreadLocal<>(); + + public static void setContext(String topic) { + CTX.set(new ExecutionContext(topic)); + } + + public static void clearContext() { + CTX.remove(); + } + + private static ExecutionContext getContext() { + return CTX.get(); + } + + private static String buildContextSuffix() { + ExecutionContext ctx = CTX.get(); + if (ctx == null || ctx.topic == null) { + return ""; + } + return " [topic=" + ctx.topic + "]"; + } + + private static class ExecutionContext { + final String topic; + + ExecutionContext(String topic) { + this.topic = topic; + } + } + + public static T executeWithTopic(String topic, SupplierWithException code) + throws JMSException { + try { + setContext(topic); + return code.run(); + } catch (Throwable err) { + throw handleException(err); + } finally { + clearContext(); + } + } + + public static void executeWithTopic(String topic, RunnableWithException code) + throws JMSException { + try { + setContext(topic); + code.run(); + } catch (Throwable err) { + throw handleException(err); + } finally { + clearContext(); + } + } + public static T get(CompletableFuture future) throws JMSException { try { return future.get(); @@ -117,12 +241,12 @@ public static void invoke(RunnableWithException code) throws JMSException { public static boolean executeMessageListenerInSessionContext( PulsarSession session, PulsarMessageConsumer consumer, BooleanSupplier code) { - currentSession.set(new CallbackContext(session, consumer, null)); + PulsarDestination dest = consumer.getDestination(); + String topic = dest != null ? dest.getName() : null; + setContext(topic); try { - return session.executeCriticalOperation( - () -> { - return code.getAsBoolean(); - }); + currentSession.set(new CallbackContext(session, consumer, null)); + return session.executeCriticalOperation(() -> code.getAsBoolean()); } catch (IllegalStateException err) { log.debug("Ignore error in listener", err); return false; @@ -130,6 +254,7 @@ public static boolean executeMessageListenerInSessionContext( log.error("Unexpected error in listener", err); return false; } finally { + clearContext(); currentSession.remove(); } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java index 7a180b65..fab7f295 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java @@ -15,20 +15,28 @@ */ package com.datastax.oss.pulsar.jms; +import static org.junit.Assert.fail; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.datastax.oss.pulsar.jms.api.JMSAdmin; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; +import jakarta.jms.*; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.SchemaType; +import org.junit.Assert; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; final class ConsumerConfigurationTest { + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); private void test( Map consumerConfiguration, Consumer test) { @@ -49,6 +57,183 @@ private void test( test.accept(result.applyDefaults(defaultConfiguration)); } + @Test + public void producerErrorShouldContainTopic() throws Exception { + + String pulsarToken = System.getenv("PULSAR_TOKEN"); + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("webServiceUrl", "https://pulsar-gcp-useast4.api.dev.streaming.datastax.com"); + properties.put( + "brokerServiceUrl", "pulsar+ssl://pulsar-gcp-useast4.dev.streaming.datastax.com:6651"); + properties.put("authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken"); + properties.put("authParams", pulsarToken); + + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); + PulsarConnection connection = factory.createConnection()) { + + PulsarSession session = connection.createSession(); + + PulsarDestination destination = + new PulsarQueue("persistent://pulsar-jms-test/pulsar-jms-test-ns/topicname-test1"); // + + // UUID.randomUUID()); + + MessageProducer producer = session.createProducer(destination); + + try { + producer.send(session.createTextMessage("testing")); + fail("Expected exception"); + + } catch (JMSException e) { + System.out.println("EXCEPTION = " + e.getMessage()); + + Assert.assertTrue(e.getMessage().contains("topic=")); + } + } + } + + @Test + public void consumerErrorShouldContainTopic_real() throws Exception { + + String pulsarToken = System.getenv("PULSAR_TOKEN"); + if (pulsarToken == null || pulsarToken.isEmpty()) { + throw new RuntimeException("PULSAR_TOKEN environment variable is not set"); + } + + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("webServiceUrl", "https://pulsar-gcp-useast4.api.dev.streaming.datastax.com"); + properties.put( + "brokerServiceUrl", "pulsar+ssl://pulsar-gcp-useast4.dev.streaming.datastax.com:6651"); + properties.put("authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken"); + properties.put("authParams", pulsarToken); + + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); + PulsarConnection connection = factory.createConnection()) { + + PulsarSession session = connection.createSession(); + + PulsarDestination destination = + new PulsarQueue("persistent://pulsar-jms-test/pulsar-jms-test-ns/topicname-test"); + + MessageConsumer consumer = session.createConsumer(destination); + + consumer.close(); + + try { + consumer.receive(); + fail("Expected exception after closing consumer"); + + } catch (JMSException e) { + System.out.println("CONSUMER EXCEPTION = " + e.getMessage()); + + assertTrue(e.getMessage().contains("topic=")); + assertTrue(e.getMessage().contains(destination.getName())); // strong check + } + } + } + + @Test + public void adminErrorShouldContainTopic_real() throws Exception { + + String pulsarToken = System.getenv("PULSAR_TOKEN"); + if (pulsarToken == null || pulsarToken.isEmpty()) { + throw new RuntimeException("PULSAR_TOKEN environment variable is not set"); + } + + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("webServiceUrl", "https://pulsar-gcp-useast4.api.dev.streaming.datastax.com"); + properties.put( + "brokerServiceUrl", "pulsar+ssl://pulsar-gcp-useast4.dev.streaming.datastax.com:6651"); + properties.put("authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken"); + properties.put("authParams", "token:" + pulsarToken); + + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); + PulsarConnection connection = factory.createConnection()) { + + PulsarSession session = connection.createSession(); + JMSAdmin admin = factory.getAdmin(); + Topic topic = + session.createTopic("persistent://pulsar-jms-test/pulsar-jms-test-ns/topicname-test"); + try { + // FORCE FAILURE (invalid partitions) + admin.createTopic(topic, -1); + + fail("Expected exception"); + + } catch (JMSException e) { + System.out.println("ADMIN EXCEPTION = " + e.getMessage()); + assertTrue(e.getMessage().contains("topic=")); + assertTrue(e.getMessage().contains(topic.getTopicName())); + } + } + } + + @Test + public void createQueueErrorShouldContainTopic() throws Exception { + + String pulsarToken = System.getenv("PULSAR_TOKEN"); + + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("webServiceUrl", "https://pulsar-gcp-useast4.api.dev.streaming.datastax.com"); + properties.put( + "brokerServiceUrl", "pulsar+ssl://pulsar-gcp-useast4.dev.streaming.datastax.com:6651"); + properties.put("authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken"); + properties.put("authParams", "token:" + pulsarToken); + + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); + PulsarConnection connection = factory.createConnection()) { + + PulsarSession session = connection.createSession(); + JMSAdmin admin = factory.getAdmin(); + + Queue queue = + session.createQueue("persistent://pulsar-jms-test/pulsar-jms-test-ns/topicname-test"); + + try { + admin.createQueue(queue, -1, false, null); // 🔥 invalid partitions + fail("Expected exception"); + + } catch (JMSException e) { + System.out.println("QUEUE EXCEPTION = " + e.getMessage()); + + assertTrue(e.getMessage().contains("topic=")); + } + } + } + + @Test + public void setQueueSubscriptionSelectorErrorShouldContainTopic() throws Exception { + + String pulsarToken = System.getenv("PULSAR_TOKEN"); + + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("webServiceUrl", "https://pulsar-gcp-useast4.api.dev.streaming.datastax.com"); + properties.put( + "brokerServiceUrl", "pulsar+ssl://pulsar-gcp-useast4.dev.streaming.datastax.com:6651"); + properties.put("authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken"); + properties.put("authParams", "token:" + pulsarToken); + + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); + PulsarConnection connection = factory.createConnection()) { + + PulsarSession session = connection.createSession(); + JMSAdmin admin = factory.getAdmin(); + + Queue queue = + session.createQueue("persistent://pulsar-jms-test/pulsar-jms-test-ns/topicname-test"); + + try { + // invalid selector + admin.setQueueSubscriptionSelector(queue, true, "INVALID ###"); + fail("Expected exception"); + + } catch (JMSException e) { + System.out.println("QUEUE SELECTOR EXCEPTION = " + e.getMessage()); + + assertTrue(e.getMessage().contains("topic=")); + } + } + } + @Test void testBuildEmptyConfiguration() { Map consumerConfiguration = new HashMap<>(); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarMessageProducerTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarMessageProducerTest.java new file mode 100644 index 00000000..afc656e5 --- /dev/null +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarMessageProducerTest.java @@ -0,0 +1,58 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; +import jakarta.jms.*; +import java.util.Map; +import java.util.UUID; +import org.junit.ClassRule; +import org.junit.Test; + +public class PulsarMessageProducerTest { + + @ClassRule + public static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); + + @Test + public void producerErrorShouldContainTopic() throws Exception { + + Map properties = pulsarContainer.buildJMSConnectionProperties(); + + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); + PulsarConnection connection = factory.createConnection()) { + + PulsarSession session = connection.createSession(); + + PulsarDestination destination = new PulsarQueue("invalid@@@" + UUID.randomUUID()); + + MessageProducer producer = session.createProducer(destination); + + try { + producer.send(session.createTextMessage("test")); + fail("Expected exception"); + + } catch (JMSException e) { + System.out.println("EXCEPTION = " + e.getMessage()); + + assertTrue(e.getMessage().contains("topic=")); + } + } + } +} diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UtilsTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UtilsTest.java index 40de7466..ca4bb40e 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UtilsTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UtilsTest.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import jakarta.jms.JMSException; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -410,4 +411,15 @@ private void testConfiguration(String queryString, Consumer> Map res = Utils.buildConfigurationOverride(dest); verifier.accept(res); } + + @Test + public void shouldAppendTopicToException() { + Utils.setContext("persistent://topictest/default/topicnametest1"); + + JMSException ex = Utils.handleException(new RuntimeException("failure")); + + assertTrue(ex.getMessage().contains("topic=persistent://topictest/default/topicnametest1")); + System.out.println(ex.getMessage()); + Utils.clearContext(); + } } From 886fee50b8f02916fd0cf54cf92e55c826f18942 Mon Sep 17 00:00:00 2001 From: Rutuja-IBM Date: Thu, 9 Apr 2026 12:24:00 +0530 Subject: [PATCH 2/3] removed code block --- .../oss/pulsar/jms/PulsarJMSAdminImpl.java | 2 +- .../com/datastax/oss/pulsar/jms/Utils.java | 46 --------------- .../pulsar/jms/PulsarMessageProducerTest.java | 58 ------------------- 3 files changed, 1 insertion(+), 105 deletions(-) delete mode 100644 pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarMessageProducerTest.java diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java index 4ad9e993..0a273454 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java @@ -335,7 +335,7 @@ public void createSubscription( properties.put("jms.selector", selector); } String topicName = factory.getPulsarTopicName(dest); - Utils.setContext(topicName); // ✅ moved out + Utils.setContext(topicName); try { Topics topics = factory.ensurePulsarAdmin().topics(); topics.createSubscription( diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java index 4a87349e..54e45cd5 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java @@ -53,52 +53,6 @@ public final class Utils { private Utils() {} - // public static JMSException handleException(Throwable cause) { - // while (cause instanceof CompletionException) { - // cause = cause.getCause(); - // } - // if (cause instanceof JMSException) { - // JMSException jms = (JMSException) cause; - // if (jms.getMessage() != null && jms.getMessage().contains("topic=")) { - // return jms; - // } - // String newMsg = - // (jms.getMessage() == null ? "" : jms.getMessage()) + buildContextSuffix(); - // try { - // JMSException enriched = jms.getClass().getConstructor(String.class).newInstance(newMsg); - // enriched.initCause(cause); - // return enriched; - // } catch (ReflectiveOperationException e) { - // JMSException fallback = new JMSException(newMsg); - // fallback.initCause(cause); - // return fallback; - // } - // } - // if (cause instanceof InterruptedException) { - // Thread.currentThread().interrupt(); - // } - // if (cause instanceof ClassCastException) { - // return (JMSException) - // new MessageFormatException("Invalid cast " + cause.getMessage() + - // buildContextSuffix()); - // } - // if (cause instanceof NumberFormatException) { - // return (JMSException) - // new MessageFormatException("Invalid conversion " + - // cause.getMessage()).initCause(cause); - // } - // JMSException err = - // new JMSException( - // (cause.getMessage() == null ? cause.toString() : cause.getMessage()) - // + buildContextSuffix()); - // err.initCause(cause); - // if (cause instanceof Exception) { - // err.setLinkedException((Exception) cause); - // } else { - // err.setLinkedException(new Exception(cause)); - // } - // return err; - // } public static JMSException handleException(Throwable cause) { while (cause instanceof CompletionException) { cause = cause.getCause(); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarMessageProducerTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarMessageProducerTest.java deleted file mode 100644 index afc656e5..00000000 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PulsarMessageProducerTest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright DataStax, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.datastax.oss.pulsar.jms; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; -import jakarta.jms.*; -import java.util.Map; -import java.util.UUID; -import org.junit.ClassRule; -import org.junit.Test; - -public class PulsarMessageProducerTest { - - @ClassRule - public static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); - - @Test - public void producerErrorShouldContainTopic() throws Exception { - - Map properties = pulsarContainer.buildJMSConnectionProperties(); - - try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); - PulsarConnection connection = factory.createConnection()) { - - PulsarSession session = connection.createSession(); - - PulsarDestination destination = new PulsarQueue("invalid@@@" + UUID.randomUUID()); - - MessageProducer producer = session.createProducer(destination); - - try { - producer.send(session.createTextMessage("test")); - fail("Expected exception"); - - } catch (JMSException e) { - System.out.println("EXCEPTION = " + e.getMessage()); - - assertTrue(e.getMessage().contains("topic=")); - } - } - } -} From 14c23eecbb81f28c70e4e54fe585ff17343b1db4 Mon Sep 17 00:00:00 2001 From: Rutuja-IBM Date: Tue, 14 Apr 2026 16:44:05 +0530 Subject: [PATCH 3/3] [fix] passing topicName in the target code block --- .../oss/pulsar/jms/PulsarConnection.java | 10 +- .../pulsar/jms/PulsarConnectionConsumer.java | 2 +- .../pulsar/jms/PulsarConnectionFactory.java | 40 ++-- .../oss/pulsar/jms/PulsarJMSAdminImpl.java | 53 +---- .../oss/pulsar/jms/PulsarMessage.java | 6 +- .../oss/pulsar/jms/PulsarMessageConsumer.java | 29 ++- .../oss/pulsar/jms/PulsarMessageProducer.java | 206 +++++++++--------- .../oss/pulsar/jms/PulsarSession.java | 6 +- .../jms/PulsarTemporaryDestination.java | 4 +- .../oss/pulsar/jms/TopicDiscoveryUtils.java | 2 +- .../com/datastax/oss/pulsar/jms/Utils.java | 76 +++---- .../jms/messages/PulsarBytesMessage.java | 6 +- .../jms/messages/PulsarObjectMessage.java | 4 +- .../jms/messages/PulsarStreamMessage.java | 4 +- .../pulsar/jms/ConnectionConsumerTest.java | 2 +- .../pulsar/jms/ConsumerConfigurationTest.java | 180 --------------- .../oss/pulsar/jms/ProducerCacheTest.java | 4 +- .../datastax/oss/pulsar/jms/UtilsTest.java | 12 - 18 files changed, 205 insertions(+), 441 deletions(-) diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java index cd83bc3c..aefc1dad 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java @@ -493,7 +493,7 @@ public void start() throws JMSException { paused = false; pausedCondition.signalAll(); } catch (Throwable err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } finally { connectionPausedLock.writeLock().unlock(); } @@ -555,7 +555,7 @@ public void stop() throws JMSException { paused = true; pausedCondition.signalAll(); } catch (Throwable err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } finally { connectionPausedLock.writeLock().unlock(); } @@ -893,7 +893,7 @@ public T executeInConnectionPausedLock(Utils.SupplierWithException run, i } return run.run(); } catch (Throwable err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } finally { connectionPausedLock.readLock().unlock(); // let writers in } @@ -993,14 +993,14 @@ private void createPulsarTemporaryTopic(String name) throws JMSException { factory.getPulsarAdmin().topics().createNonPartitionedTopic(name); } catch (IllegalStateException err) { if (!factory.isAllowTemporaryTopicWithoutAdmin()) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } log.warn( "Skipping creation of nonPartitionedTopic {} as jms.allowTemporaryTopicWithoutAdmin=true", name, err); } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionConsumer.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionConsumer.java index 1db0a3b7..e66cd3c7 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionConsumer.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionConsumer.java @@ -108,7 +108,7 @@ public void close() throws JMSException { this.spool.join(); } } catch (InterruptedException err) { - Utils.handleException(err); + Utils.handleException(err, null); } this.consumer.close(); this.dispatcherSession.close(); diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java index 7d73b2d6..727180eb 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java @@ -592,7 +592,7 @@ private synchronized void ensureInitialized(String connectUsername, String conne } this.initialized = true; } catch (Throwable t) { - throw Utils.handleException(t); + throw Utils.handleException(t, null); } } @@ -615,7 +615,8 @@ private static Cache> buildProducerCache( } catch (PulsarClientException e) { // ignore log.debug( - "Exception while closing pulsar producer", Utils.handleException(e)); + "Exception while closing pulsar producer", + Utils.handleException(e, null)); } } log.debug( @@ -1052,7 +1053,7 @@ public void close() { con.close(); } catch (Exception ignore) { // ignore - Utils.handleException(ignore); + Utils.handleException(ignore, null); } } @@ -1098,8 +1099,8 @@ public String getPulsarTopicName(Destination defaultDestination) throws JMSExcep Producer getProducerForDestination(Destination defaultDestination, boolean transactions) throws JMSException { + String fullQualifiedTopicName = getPulsarTopicName(defaultDestination); try { - String fullQualifiedTopicName = getPulsarTopicName(defaultDestination); String key = transactions ? fullQualifiedTopicName + "-tx" : fullQualifiedTopicName; boolean transactionsStickyPartitions = transactions && isTransactionsStickyPartitions(); boolean enableJMSPriority = isEnableJMSPriority(); @@ -1161,7 +1162,7 @@ public int choosePartition(Message msg, TopicMetadata metadata) { return producerBuilder.create(); })); } catch (ExecutionException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, fullQualifiedTopicName); } } @@ -1226,7 +1227,7 @@ public void ensureQueueSubscription(PulsarDestination destination) throws JMSExc // applications start when the server is not available long now = System.currentTimeMillis(); if (now - start > getWaitForServerStartupTimeout()) { - throw Utils.handleException(err); + throw Utils.handleException(err, fullQualifiedTopicName); } else { log.info( "Got {} error while setting up subscription for queue {}, maybe the namespace/broker is still starting", @@ -1237,7 +1238,7 @@ public void ensureQueueSubscription(PulsarDestination destination) throws JMSExc Thread.sleep(1000); } catch (InterruptedException interruptedException) { Thread.currentThread().interrupt(); - throw Utils.handleException(err); + throw Utils.handleException(err, fullQualifiedTopicName); } } } @@ -1373,7 +1374,8 @@ public ConsumerBase createConsumer( } return (ConsumerBase) newConsumer; } catch (PulsarClientException err) { - throw Utils.handleException(err); + String topic = getPulsarTopicName(destination); + throw Utils.handleException(err, topic); } } @@ -1497,7 +1499,7 @@ public String downloadServerSideFilter( // persistent://xxx/xx/xxxx" long now = System.currentTimeMillis(); if (now - start > getWaitForServerStartupTimeout()) { - throw Utils.handleException(notReady); + throw Utils.handleException(notReady, fullQualifiedTopicName); } else { log.info( "Temporary error, cannot download server-side filters {}: {}", @@ -1507,11 +1509,11 @@ public String downloadServerSideFilter( Thread.sleep(1000); } catch (InterruptedException interruptedException) { Thread.currentThread().interrupt(); - throw Utils.handleException(notReady); + throw Utils.handleException(notReady, fullQualifiedTopicName); } } } catch (PulsarAdminException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, fullQualifiedTopicName); } } } @@ -1519,10 +1521,11 @@ public String downloadServerSideFilter( public List> createReadersForBrowser( PulsarQueue destination, ConsumerConfiguration overrideConsumerConfiguration) throws JMSException { - if (destination.isRegExp()) { + + String topicName = null; try { - String topicName = getPulsarTopicName(destination); + topicName = getPulsarTopicName(destination); List topicNames = TopicDiscoveryUtils.discoverTopicsByPattern(topicName, getPulsarClient(), 1000); log.info("createReadersForBrowser {} - {} - {}", destination, topicName, topicNames); @@ -1534,7 +1537,7 @@ public List> createReadersForBrowser( } return res; } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, topicName); } } else if (destination.isMultiTopic()) { List> res = new ArrayList<>(); @@ -1575,7 +1578,7 @@ public List> createReadersForBrowser( } catch (PulsarAdminException.NotFoundException err) { return Collections.emptyList(); } catch (PulsarAdminException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, fullQualifiedTopicName); } } } @@ -1627,7 +1630,7 @@ private Reader createReaderForBrowserForNonPartitionedTopic( readers.add(newReader); return newReader; } catch (PulsarClientException | PulsarAdminException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, fullQualifiedTopicName); } } @@ -1644,6 +1647,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name) throws JMSException { String systemNamespace = getSystemNamespace(); boolean somethingDone = false; + String fullQualifiedTopicName = null; try { if (destination != null) { @@ -1651,7 +1655,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name) throw new InvalidDestinationException( "Virtual destinations are not supported for unsubscribe"); } - String fullQualifiedTopicName = getPulsarTopicName(destination); + fullQualifiedTopicName = getPulsarTopicName(destination); log.info("deleteSubscription topic {} name {}", fullQualifiedTopicName, name); try { pulsarAdmin.topics().deleteSubscription(fullQualifiedTopicName, name, true); @@ -1689,7 +1693,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name) } } } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, fullQualifiedTopicName); } return somethingDone; } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java index 0a273454..0a30f5ff 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java @@ -127,7 +127,7 @@ private JMSDestinationMetadata describeDestination(PulsarDestination destination } catch (PulsarAdminException.NotFoundException notFound) { partitionedTopicMetadata = new PartitionedTopicMetadata(0); } catch (PulsarAdminException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, pulsarTopic); } int partitions = partitionedTopicMetadata.partitions; final Map partitionsStats; @@ -153,7 +153,7 @@ private JMSDestinationMetadata describeDestination(PulsarDestination destination publishers = Collections.emptyList(); } } catch (PulsarAdminException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, pulsarTopic); } } else { partitionsStats = Collections.emptyMap(); @@ -335,7 +335,6 @@ public void createSubscription( properties.put("jms.selector", selector); } String topicName = factory.getPulsarTopicName(dest); - Utils.setContext(topicName); try { Topics topics = factory.ensurePulsarAdmin().topics(); topics.createSubscription( @@ -345,9 +344,7 @@ public void createSubscription( false, properties); } catch (PulsarAdminException error) { - throw Utils.handleException(error); - } finally { - Utils.clearContext(); + throw Utils.handleException(error, topicName); } } @@ -356,7 +353,6 @@ public void createQueue(Queue destination, int partitions, boolean enableFilters throws JMSException { PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); String topicName = factory.getPulsarTopicName(dest); - Utils.setContext(topicName); try { checkArgument(() -> partitions >= 0, "Invalid number of partitions " + partitions); validateSelector(enableFilters, selector); @@ -400,14 +396,10 @@ public void createQueue(Queue destination, int partitions, boolean enableFilters "Subscription " + subscriptionName + " already exists on Pulsar Topic " + topicName); } } catch (PulsarAdminException error) { - throw Utils.handleException(error); + throw Utils.handleException(error, topicName); + } catch (Throwable t) { - // if (t instanceof JMSException) { - // throw (JMSException) t; - // } - throw Utils.handleException(t); - } finally { - Utils.clearContext(); + throw Utils.handleException(t, topicName); } } @@ -416,21 +408,14 @@ public void createTopic(Topic destination, int partitions) throws JMSException { PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); String topicName = factory.getPulsarTopicName(dest); - - Utils.setContext(topicName); - try { checkArgument(() -> partitions >= 0, "Invalid number of partitions " + partitions); - checkDestination( destination, d -> !dest.isVirtualDestination(), "Cannot create a VirtualDestination"); - Topics topics = factory.ensurePulsarAdmin().topics(); - try { PartitionedTopicMetadata partitionedTopicMetadata = topics.getPartitionedTopicMetadata(topicName); - checkDestination( destination, d -> partitionedTopicMetadata.partitions != partitions, @@ -438,11 +423,8 @@ public void createTopic(Topic destination, int partitions) throws JMSException { + partitionedTopicMetadata.partitions + " is different from " + partitions); - } catch (PulsarAdminException.NotFoundException notFound) { - // ok } - try { if (partitions > 0) { topics.createPartitionedTopic(topicName, partitions); @@ -452,17 +434,10 @@ public void createTopic(Topic destination, int partitions) throws JMSException { } catch (PulsarAdminException.ConflictException exists) { throw new InvalidDestinationException("Topic " + topicName + " already exists"); } - } catch (PulsarAdminException error) { - throw Utils.handleException(error); - + throw Utils.handleException(error, topicName); } catch (Throwable t) { - // if (t instanceof JMSException) { - // throw (JMSException) t; - // } - throw Utils.handleException(t); - } finally { - Utils.clearContext(); + throw Utils.handleException(t, topicName); } } @@ -472,13 +447,10 @@ public void setQueueSubscriptionSelector( PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); String topicName = factory.getPulsarTopicName(dest); String subscriptionName = factory.getQueueSubscriptionName(dest); - Utils.setContext(topicName); try { doUpdateSubscriptionSelector(enableFilters, selector, topicName, subscriptionName); - } catch (Throwable error) { // catch everything - throw Utils.handleException(error); - } finally { - Utils.clearContext(); + } catch (Throwable error) { + throw Utils.handleException(error, topicName); } } @@ -507,13 +479,10 @@ public void setSubscriptionSelector( throws JMSException { PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); String topicName = factory.getPulsarTopicName(dest); - Utils.setContext(topicName); try { doUpdateSubscriptionSelector(enableFilters, selector, topicName, subscriptionName); } catch (Throwable error) { - throw Utils.handleException(error); - } finally { - Utils.clearContext(); + throw Utils.handleException(error, topicName); } } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java index 7c191a71..ea5a47e6 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java @@ -1148,7 +1148,7 @@ void acknowledgeInternal() throws JMSException { try { consumer.acknowledge(receivedPulsarMessage, this, pulsarConsumer); } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } @@ -1220,7 +1220,7 @@ final void sendAsync( () -> { this.writable = false; if (error != null) { - completionListener.onException(this, Utils.handleException(error)); + completionListener.onException(this, Utils.handleException(error, null)); } else { assignSystemMessageId(messageIdFromServer); @@ -1565,7 +1565,7 @@ protected static JMSException handleExceptionAccordingToMessageSpecs(Throwable t if (t instanceof EOFException) { throw new MessageEOFException(t + ""); } - throw Utils.handleException(t); + throw Utils.handleException(t, null); } public void setWritable(boolean b) { diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java index 972fd2a8..a43c4eec 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java @@ -328,12 +328,12 @@ public boolean hasSomePrefetchedMessages() { synchronized Message receiveWithTimeoutAndValidateType(long timeout, Class expectedType) throws JMSException { - Utils.setContext(destination.getName()); + String topic = destination != null ? destination.getName() : null; try { checkNotClosed(); if (listener != null) { throw Utils.handleException( - new IllegalStateException("cannot receive if you have a messageListener")); + new IllegalStateException("cannot receive if you have a messageListener"), topic); } final int acquireConnectionStartTime = timeout == Long.MAX_VALUE ? Integer.MAX_VALUE : (int) timeout; @@ -355,7 +355,7 @@ synchronized Message receiveWithTimeoutAndValidateType(long timeout, Class expec return handleReceivedMessage( message, consumer, expectedType, null, noLocal); } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, topic); } }); if (result != null) { @@ -366,12 +366,7 @@ synchronized Message receiveWithTimeoutAndValidateType(long timeout, Class expec }, acquireConnectionStartTime); } catch (Throwable t) { - // if (t instanceof JMSException) { - // throw (JMSException) t; - // } - throw Utils.handleException(t); - } finally { - Utils.clearContext(); + throw Utils.handleException(t, topic); } } @@ -414,8 +409,8 @@ private PulsarMessage handleReceivedMessage( java.util.function.Consumer listenerCode, boolean noLocalFilter) throws JMSException, org.apache.pulsar.client.api.PulsarClientException { + String topic = destination != null ? destination.getName() : null; receivedMessages.incrementAndGet(); - PulsarMessage result = PulsarMessage.decode(this, consumer, message); if (expectedType != null && !result.isBodyAssignableTo(expectedType)) { if (log.isDebugEnabled()) { @@ -493,7 +488,7 @@ && requiresClientSideFiltering(message) } catch (Throwable t) { log.error("Listener thrown error, calling negativeAcknowledge", t); consumer.negativeAcknowledge(message); - throw Utils.handleException(t); + throw Utils.handleException(t, topic); } if (result.isNegativeAcked()) { // this may happen if the listener calls "Session.recover" @@ -559,6 +554,7 @@ private boolean requiresClientSideFiltering(org.apache.pulsar.client.api.Message */ @Override public void close() throws JMSException { + String topic = destination != null ? destination.getName() : null; if (Utils.isOnMessageListener(session, this)) { requestClose.set(true); return; @@ -579,7 +575,7 @@ public void close() throws JMSException { session.removeConsumer(this); return null; } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, topic); } }); } else if (session.getTransacted()) { @@ -644,11 +640,12 @@ void acknowledge( PulsarMessage message, Consumer consumer) throws JMSException { + String topic = destination != null ? destination.getName() : null; try { consumer.acknowledge(receivedPulsarMessage); session.unregisterUnacknowledgedMessage(message); } catch (PulsarClientException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, topic); } } @@ -719,15 +716,17 @@ void runListener(int timeout) { } void closeDuringRollback() throws JMSException { + String topic = destination != null ? destination.getName() : null; try { consumer.close(); session.removeConsumer(this); } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, topic); } } public void closeInternal() throws JMSException { + String topic = destination != null ? destination.getName() : null; if (!closed.compareAndSet(false, true)) { return; } @@ -738,7 +737,7 @@ public void closeInternal() throws JMSException { consumer = null; } } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, topic); } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java index 3fc97da6..2531f39f 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java @@ -1208,50 +1208,55 @@ private void sendMessage(Destination defaultDestination, Message message) throws if (message == null) { throw new MessageFormatException("null message"); } - Utils.setContext(((PulsarDestination) defaultDestination).getName()); + String topic = + defaultDestination != null ? ((PulsarDestination) defaultDestination).getName() : null; try { session.executeCriticalOperation( () -> { - Producer producer = session.getProducerForDestination(defaultDestination); - message.setJMSDestination(defaultDestination); - PulsarMessage pulsarMessage = prepareMessageForSend(message); - final TypedMessageBuilder typedMessageBuilder; - session.blockTransactionOperations(); try { - if (session.getTransacted()) { - Transaction transaction = session.getTransaction(); - if (transaction != null) { - typedMessageBuilder = producer.newMessage(transaction); + Producer producer = session.getProducerForDestination(defaultDestination); + message.setJMSDestination(defaultDestination); + PulsarMessage pulsarMessage = prepareMessageForSend(message); + final TypedMessageBuilder typedMessageBuilder; + session.blockTransactionOperations(); + try { + if (session.getTransacted()) { + Transaction transaction = session.getTransaction(); + if (transaction != null) { + typedMessageBuilder = producer.newMessage(transaction); + } else { + typedMessageBuilder = producer.newMessage(); + if (uncommittedMessages == null) { + uncommittedMessages = new ArrayList<>(); + } + uncommittedMessages.add( + new PreparedMessage(typedMessageBuilder, pulsarMessage)); + session.registerProducerWithTransaction(this); + if (defaultDeliveryDelay > 0) { + typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); + } + return null; + } } else { - // emulated transactions typedMessageBuilder = producer.newMessage(); - if (uncommittedMessages == null) { - uncommittedMessages = new ArrayList<>(); - } - uncommittedMessages.add(new PreparedMessage(typedMessageBuilder, pulsarMessage)); - session.registerProducerWithTransaction(this); - if (defaultDeliveryDelay > 0) { - typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); - } - return null; } - } else { - typedMessageBuilder = producer.newMessage(); + if (defaultDeliveryDelay > 0) { + typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); + } + pulsarMessage.send(typedMessageBuilder, disableMessageTimestamp, session); + } finally { + session.unblockTransactionOperations(); } - if (defaultDeliveryDelay > 0) { - typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); + if (message != pulsarMessage) { + applyBackMessageProperties(message, pulsarMessage); } - pulsarMessage.send(typedMessageBuilder, disableMessageTimestamp, session); - } finally { - session.unblockTransactionOperations(); + return null; + } catch (Exception err) { + throw Utils.handleException(err, topic); } - if (message != pulsarMessage) { - applyBackMessageProperties(message, pulsarMessage); - } - return null; }); - } finally { - Utils.clearContext(); + } catch (Throwable t) { + throw Utils.handleException(t, topic); } } @@ -1271,89 +1276,90 @@ private void sendMessage( if (message == null) { throw new MessageFormatException("null message"); } - Utils.setContext(((PulsarDestination) defaultDestination).getName()); + String topic = + defaultDestination != null ? ((PulsarDestination) defaultDestination).getName() : null; try { session.executeCriticalOperation( () -> { - Producer producer = session.getProducerForDestination(defaultDestination); - message.setJMSDestination(defaultDestination); - PulsarMessage pulsarMessage = prepareMessageForSend(message); - CompletionListener endActivityCompletionListener = - new CompletionListener() { - @Override - public void onCompletion(Message message) { - try { - completionListener.onCompletion(message); - } finally { - session.unblockTransactionOperations(); - } - } - - @Override - public void onException(Message message, Exception exception) { - try { - completionListener.onException(message, exception); - } finally { - session.unblockTransactionOperations(); - } - } - }; - CompletionListener finalCompletionListener = endActivityCompletionListener; - if (pulsarMessage != message) { - finalCompletionListener = + try { + Producer producer = session.getProducerForDestination(defaultDestination); + message.setJMSDestination(defaultDestination); + PulsarMessage pulsarMessage = prepareMessageForSend(message); + CompletionListener endActivityCompletionListener = new CompletionListener() { @Override - public void onCompletion(Message completedMessage) { - // we have to pass the original message to the called - applyBackMessageProperties(message, pulsarMessage); - endActivityCompletionListener.onCompletion(message); + public void onCompletion(Message message) { + try { + completionListener.onCompletion(message); + } finally { + session.unblockTransactionOperations(); + } } @Override - public void onException(Message completedMessage, Exception e) { - // we have to pass the original message to the called - applyBackMessageProperties(message, pulsarMessage); - endActivityCompletionListener.onException(message, e); + public void onException(Message message, Exception exception) { + try { + completionListener.onException(message, exception); + } finally { + session.unblockTransactionOperations(); + } } }; - } + CompletionListener finalCompletionListener = endActivityCompletionListener; + if (pulsarMessage != message) { + finalCompletionListener = + new CompletionListener() { + @Override + public void onCompletion(Message completedMessage) { + applyBackMessageProperties(message, pulsarMessage); + endActivityCompletionListener.onCompletion(message); + } - session.blockTransactionOperations(); - TypedMessageBuilder typedMessageBuilder; - if (session.getTransacted()) { - Transaction transaction = session.getTransaction(); - if (transaction != null) { - typedMessageBuilder = producer.newMessage(transaction); + @Override + public void onException(Message completedMessage, Exception e) { + applyBackMessageProperties(message, pulsarMessage); + endActivityCompletionListener.onException(message, e); + } + }; + } + session.blockTransactionOperations(); + TypedMessageBuilder typedMessageBuilder; + if (session.getTransacted()) { + Transaction transaction = session.getTransaction(); + if (transaction != null) { + typedMessageBuilder = producer.newMessage(transaction); + } else { + typedMessageBuilder = producer.newMessage(); + if (defaultDeliveryDelay > 0) { + typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); + } + if (uncommittedMessages == null) { + uncommittedMessages = new ArrayList<>(); + } + uncommittedMessages.add(new PreparedMessage(typedMessageBuilder, pulsarMessage)); + session.registerProducerWithTransaction(this); + finalCompletionListener.onCompletion(pulsarMessage); + return null; + } } else { - // emulated transactions typedMessageBuilder = producer.newMessage(); - if (defaultDeliveryDelay > 0) { - typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); - } - if (uncommittedMessages == null) { - uncommittedMessages = new ArrayList<>(); - } - uncommittedMessages.add(new PreparedMessage(typedMessageBuilder, pulsarMessage)); - session.registerProducerWithTransaction(this); - finalCompletionListener.onCompletion(pulsarMessage); - return null; } - } else { - typedMessageBuilder = producer.newMessage(); - } - if (defaultDeliveryDelay > 0) { - typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); + if (defaultDeliveryDelay > 0) { + typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); + } + pulsarMessage.sendAsync( + typedMessageBuilder, + finalCompletionListener, + session, + this, + disableMessageTimestamp); + return null; + } catch (Exception err) { + throw Utils.handleException(err, topic); } - pulsarMessage.sendAsync( - typedMessageBuilder, - finalCompletionListener, - session, - this, - disableMessageTimestamp); - return null; }); - } finally { - Utils.clearContext(); + } catch (Throwable t) { + throw Utils.handleException(t, topic); } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java index e8dd0904..edd81c2e 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java @@ -226,11 +226,11 @@ private Transaction startTransaction(PulsarConnection connection) throws JMSExce log.info("Transaction service not available {}", err.getCause().getMessage()); Thread.sleep(1000); } else { - throw Utils.handleException(err.getCause()); + throw Utils.handleException(err.getCause(), null); } } } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } if (transaction == null) { @@ -884,7 +884,7 @@ public void run() { // this message is not tracked by this session foreignMessage.acknowledge(); } catch (Throwable err) { - Utils.handleException(err); + Utils.handleException(err, null); log.info("Error in ConsumerConnection task on message {}", foreignMessage, err); ((PulsarMessage) foreignMessage).negativeAck(); } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java index 4c4321b6..55917470 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java @@ -51,7 +51,7 @@ public final void delete() throws JMSException { pulsarAdmin = session.getFactory().getPulsarAdmin(); } catch (IllegalStateException err) { if (!session.getFactory().isAllowTemporaryTopicWithoutAdmin()) { - throw Utils.handleException(err); + throw Utils.handleException(err, topicName); } log.warn( "Cannot delete a temporary destination {}. Skipping because jms.allowTemporaryTopicWithoutAdmin=true", @@ -91,7 +91,7 @@ public final void delete() throws JMSException { } } catch (final PulsarAdminException paEx) { - Utils.handleException(paEx); + Utils.handleException(paEx, topicName); } finally { session.getConnection().removeTemporaryDestination(this); } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/TopicDiscoveryUtils.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/TopicDiscoveryUtils.java index 3a1c2998..dd87ba9b 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/TopicDiscoveryUtils.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/TopicDiscoveryUtils.java @@ -48,7 +48,7 @@ public static List discoverTopicsByPattern(String regex, PulsarClient cl .getTopics(); return topicsPatternFilter(list, Pattern.compile(regex)); } catch (InterruptedException | TimeoutException | ExecutionException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java index 54e45cd5..f3d4c7be 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java @@ -53,17 +53,16 @@ public final class Utils { private Utils() {} - public static JMSException handleException(Throwable cause) { + public static JMSException handleException(Throwable cause, String topic) { while (cause instanceof CompletionException) { cause = cause.getCause(); } - String suffix = buildContextSuffix(); if (cause instanceof JMSException) { JMSException jms = (JMSException) cause; if (jms.getMessage() != null && jms.getMessage().contains("topic=")) { return jms; } - String newMsg = (jms.getMessage() == null ? "" : jms.getMessage()) + suffix; + String newMsg = buildMessage(jms.getMessage(), topic); try { JMSException enriched = jms.getClass().getConstructor(String.class).newInstance(newMsg); enriched.initCause(cause); @@ -74,22 +73,22 @@ public static JMSException handleException(Throwable cause) { return fallback; } } - // Preserve interrupt if (cause instanceof InterruptedException) { Thread.currentThread().interrupt(); } - // Special cases if (cause instanceof ClassCastException) { - JMSException ex = new MessageFormatException("Invalid cast " + safeMsg(cause) + suffix); + JMSException ex = + new MessageFormatException(buildMessage("Invalid cast " + safeMsg(cause), topic)); ex.initCause(cause); return ex; } if (cause instanceof NumberFormatException) { - JMSException ex = new MessageFormatException("Invalid conversion " + safeMsg(cause) + suffix); + JMSException ex = + new MessageFormatException(buildMessage("Invalid conversion " + safeMsg(cause), topic)); ex.initCause(cause); return ex; } - String msg = safeMsg(cause) + suffix; + String msg = buildMessage(cause, topic); JMSException err = new JMSException(msg); err.initCause(cause); if (cause instanceof Exception) { @@ -99,63 +98,37 @@ public static JMSException handleException(Throwable cause) { } return err; } - // helper private static String safeMsg(Throwable t) { return (t.getMessage() != null) ? t.getMessage() : t.toString(); } - private static final ThreadLocal CTX = new ThreadLocal<>(); - - public static void setContext(String topic) { - CTX.set(new ExecutionContext(topic)); - } - - public static void clearContext() { - CTX.remove(); - } - - private static ExecutionContext getContext() { - return CTX.get(); - } - - private static String buildContextSuffix() { - ExecutionContext ctx = CTX.get(); - if (ctx == null || ctx.topic == null) { - return ""; + private static String buildMessage(String base, String topic) { + if (topic == null || topic.isEmpty()) { + return base; } - return " [topic=" + ctx.topic + "]"; + return base + " [topic=" + topic + "]"; } - private static class ExecutionContext { - final String topic; - - ExecutionContext(String topic) { - this.topic = topic; - } + private static String buildMessage(Throwable t, String topic) { + return buildMessage(safeMsg(t), topic); } public static T executeWithTopic(String topic, SupplierWithException code) throws JMSException { try { - setContext(topic); return code.run(); } catch (Throwable err) { - throw handleException(err); - } finally { - clearContext(); + throw handleException(err, topic); } } public static void executeWithTopic(String topic, RunnableWithException code) throws JMSException { try { - setContext(topic); code.run(); } catch (Throwable err) { - throw handleException(err); - } finally { - clearContext(); + throw handleException(err, topic); } } @@ -163,9 +136,9 @@ public static T get(CompletableFuture future) throws JMSException { try { return future.get(); } catch (ExecutionException err) { - throw handleException(err.getCause()); + throw handleException(err.getCause(), null); } catch (InterruptedException err) { - throw handleException(err); + throw handleException(err, null); } } @@ -181,7 +154,7 @@ public static T invoke(SupplierWithException code) throws JMSException { try { return code.run(); } catch (Throwable err) { - throw handleException(err); + throw handleException(err, null); } } @@ -189,7 +162,7 @@ public static void invoke(RunnableWithException code) throws JMSException { try { code.run(); } catch (Throwable err) { - throw handleException(err); + throw handleException(err, null); } } @@ -197,10 +170,16 @@ public static boolean executeMessageListenerInSessionContext( PulsarSession session, PulsarMessageConsumer consumer, BooleanSupplier code) { PulsarDestination dest = consumer.getDestination(); String topic = dest != null ? dest.getName() : null; - setContext(topic); try { currentSession.set(new CallbackContext(session, consumer, null)); - return session.executeCriticalOperation(() -> code.getAsBoolean()); + return session.executeCriticalOperation( + () -> { + try { + return code.getAsBoolean(); + } catch (Throwable err) { + throw handleException(err, topic); + } + }); } catch (IllegalStateException err) { log.debug("Ignore error in listener", err); return false; @@ -208,7 +187,6 @@ public static boolean executeMessageListenerInSessionContext( log.error("Unexpected error in listener", err); return false; } finally { - clearContext(); currentSession.remove(); } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarBytesMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarBytesMessage.java index cfbb0792..ea8a15c0 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarBytesMessage.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarBytesMessage.java @@ -61,7 +61,7 @@ public PulsarBytesMessage(byte[] payload) throws JMSException { this.dataOutputStream = null; writable = false; } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } @@ -123,7 +123,7 @@ protected void prepareForSend(TypedMessageBuilder producer) throws JMSEx producer.value(originalMessage); } } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } @@ -149,7 +149,7 @@ protected static JMSException handleException(Throwable t) throws JMSException { if (t instanceof EOFException) { throw new MessageEOFException(t + ""); } - throw Utils.handleException(t); + throw Utils.handleException(t, null); } /** diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarObjectMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarObjectMessage.java index e63c25d6..d595e9bb 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarObjectMessage.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarObjectMessage.java @@ -48,7 +48,7 @@ private static Serializable decode(byte[] originalMessage) throws JMSException { ObjectInputStream input = new ObjectInputStream(new ByteArrayInputStream(originalMessage)); return (Serializable) input.readUnshared(); } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } @@ -96,7 +96,7 @@ private static byte[] encode(Object object) throws JMSException { oo.close(); return out.toByteArray(); } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarStreamMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarStreamMessage.java index e09bf2f1..5ce2a128 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarStreamMessage.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarStreamMessage.java @@ -238,7 +238,7 @@ public PulsarStreamMessage(byte[] payload) throws JMSException { this.dataOutputStream = null; writable = false; } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } @@ -300,7 +300,7 @@ protected void prepareForSend(TypedMessageBuilder producer) throws JMSEx producer.value(originalMessage); } } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java index 7c93a818..fc91d854 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java @@ -519,7 +519,7 @@ public ServerSession getServerSession() throws JMSException { // log.debug("picked session {}", session); return session; } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java index fab7f295..b5681b72 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java @@ -15,14 +15,12 @@ */ package com.datastax.oss.pulsar.jms; -import static org.junit.Assert.fail; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import com.datastax.oss.pulsar.jms.api.JMSAdmin; import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; import jakarta.jms.*; import java.util.HashMap; @@ -30,7 +28,6 @@ import java.util.function.Consumer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.SchemaType; -import org.junit.Assert; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -57,183 +54,6 @@ private void test( test.accept(result.applyDefaults(defaultConfiguration)); } - @Test - public void producerErrorShouldContainTopic() throws Exception { - - String pulsarToken = System.getenv("PULSAR_TOKEN"); - Map properties = pulsarContainer.buildJMSConnectionProperties(); - properties.put("webServiceUrl", "https://pulsar-gcp-useast4.api.dev.streaming.datastax.com"); - properties.put( - "brokerServiceUrl", "pulsar+ssl://pulsar-gcp-useast4.dev.streaming.datastax.com:6651"); - properties.put("authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken"); - properties.put("authParams", pulsarToken); - - try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); - PulsarConnection connection = factory.createConnection()) { - - PulsarSession session = connection.createSession(); - - PulsarDestination destination = - new PulsarQueue("persistent://pulsar-jms-test/pulsar-jms-test-ns/topicname-test1"); // + - // UUID.randomUUID()); - - MessageProducer producer = session.createProducer(destination); - - try { - producer.send(session.createTextMessage("testing")); - fail("Expected exception"); - - } catch (JMSException e) { - System.out.println("EXCEPTION = " + e.getMessage()); - - Assert.assertTrue(e.getMessage().contains("topic=")); - } - } - } - - @Test - public void consumerErrorShouldContainTopic_real() throws Exception { - - String pulsarToken = System.getenv("PULSAR_TOKEN"); - if (pulsarToken == null || pulsarToken.isEmpty()) { - throw new RuntimeException("PULSAR_TOKEN environment variable is not set"); - } - - Map properties = pulsarContainer.buildJMSConnectionProperties(); - properties.put("webServiceUrl", "https://pulsar-gcp-useast4.api.dev.streaming.datastax.com"); - properties.put( - "brokerServiceUrl", "pulsar+ssl://pulsar-gcp-useast4.dev.streaming.datastax.com:6651"); - properties.put("authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken"); - properties.put("authParams", pulsarToken); - - try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); - PulsarConnection connection = factory.createConnection()) { - - PulsarSession session = connection.createSession(); - - PulsarDestination destination = - new PulsarQueue("persistent://pulsar-jms-test/pulsar-jms-test-ns/topicname-test"); - - MessageConsumer consumer = session.createConsumer(destination); - - consumer.close(); - - try { - consumer.receive(); - fail("Expected exception after closing consumer"); - - } catch (JMSException e) { - System.out.println("CONSUMER EXCEPTION = " + e.getMessage()); - - assertTrue(e.getMessage().contains("topic=")); - assertTrue(e.getMessage().contains(destination.getName())); // strong check - } - } - } - - @Test - public void adminErrorShouldContainTopic_real() throws Exception { - - String pulsarToken = System.getenv("PULSAR_TOKEN"); - if (pulsarToken == null || pulsarToken.isEmpty()) { - throw new RuntimeException("PULSAR_TOKEN environment variable is not set"); - } - - Map properties = pulsarContainer.buildJMSConnectionProperties(); - properties.put("webServiceUrl", "https://pulsar-gcp-useast4.api.dev.streaming.datastax.com"); - properties.put( - "brokerServiceUrl", "pulsar+ssl://pulsar-gcp-useast4.dev.streaming.datastax.com:6651"); - properties.put("authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken"); - properties.put("authParams", "token:" + pulsarToken); - - try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); - PulsarConnection connection = factory.createConnection()) { - - PulsarSession session = connection.createSession(); - JMSAdmin admin = factory.getAdmin(); - Topic topic = - session.createTopic("persistent://pulsar-jms-test/pulsar-jms-test-ns/topicname-test"); - try { - // FORCE FAILURE (invalid partitions) - admin.createTopic(topic, -1); - - fail("Expected exception"); - - } catch (JMSException e) { - System.out.println("ADMIN EXCEPTION = " + e.getMessage()); - assertTrue(e.getMessage().contains("topic=")); - assertTrue(e.getMessage().contains(topic.getTopicName())); - } - } - } - - @Test - public void createQueueErrorShouldContainTopic() throws Exception { - - String pulsarToken = System.getenv("PULSAR_TOKEN"); - - Map properties = pulsarContainer.buildJMSConnectionProperties(); - properties.put("webServiceUrl", "https://pulsar-gcp-useast4.api.dev.streaming.datastax.com"); - properties.put( - "brokerServiceUrl", "pulsar+ssl://pulsar-gcp-useast4.dev.streaming.datastax.com:6651"); - properties.put("authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken"); - properties.put("authParams", "token:" + pulsarToken); - - try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); - PulsarConnection connection = factory.createConnection()) { - - PulsarSession session = connection.createSession(); - JMSAdmin admin = factory.getAdmin(); - - Queue queue = - session.createQueue("persistent://pulsar-jms-test/pulsar-jms-test-ns/topicname-test"); - - try { - admin.createQueue(queue, -1, false, null); // 🔥 invalid partitions - fail("Expected exception"); - - } catch (JMSException e) { - System.out.println("QUEUE EXCEPTION = " + e.getMessage()); - - assertTrue(e.getMessage().contains("topic=")); - } - } - } - - @Test - public void setQueueSubscriptionSelectorErrorShouldContainTopic() throws Exception { - - String pulsarToken = System.getenv("PULSAR_TOKEN"); - - Map properties = pulsarContainer.buildJMSConnectionProperties(); - properties.put("webServiceUrl", "https://pulsar-gcp-useast4.api.dev.streaming.datastax.com"); - properties.put( - "brokerServiceUrl", "pulsar+ssl://pulsar-gcp-useast4.dev.streaming.datastax.com:6651"); - properties.put("authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken"); - properties.put("authParams", "token:" + pulsarToken); - - try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); - PulsarConnection connection = factory.createConnection()) { - - PulsarSession session = connection.createSession(); - JMSAdmin admin = factory.getAdmin(); - - Queue queue = - session.createQueue("persistent://pulsar-jms-test/pulsar-jms-test-ns/topicname-test"); - - try { - // invalid selector - admin.setQueueSubscriptionSelector(queue, true, "INVALID ###"); - fail("Expected exception"); - - } catch (JMSException e) { - System.out.println("QUEUE SELECTOR EXCEPTION = " + e.getMessage()); - - assertTrue(e.getMessage().contains("topic=")); - } - } - } - @Test void testBuildEmptyConfiguration() { Map consumerConfiguration = new HashMap<>(); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ProducerCacheTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ProducerCacheTest.java index c9bbf3bc..0d1dd112 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ProducerCacheTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ProducerCacheTest.java @@ -67,7 +67,7 @@ public void testMaxNumOfProducers() throws JMSException { int totalBrokerProducers = fetchProducerCount(queue1) + fetchProducerCount(queue2); assertEquals(1, totalBrokerProducers, "Total broker producers count should be 1"); } catch (PulsarAdminException e) { - throw Utils.handleException(e); + throw Utils.handleException(e, null); } } } @@ -112,7 +112,7 @@ public void testProducerAutoCloseTimeout() throws JMSException, InterruptedExcep fetchProducerCount(tempQueue), "Broker should have 0 producers after auto-close timeout"); } catch (PulsarAdminException e) { - throw Utils.handleException(e); + throw Utils.handleException(e, null); } } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UtilsTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UtilsTest.java index ca4bb40e..40de7466 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UtilsTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/UtilsTest.java @@ -19,7 +19,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import jakarta.jms.JMSException; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -411,15 +410,4 @@ private void testConfiguration(String queryString, Consumer> Map res = Utils.buildConfigurationOverride(dest); verifier.accept(res); } - - @Test - public void shouldAppendTopicToException() { - Utils.setContext("persistent://topictest/default/topicnametest1"); - - JMSException ex = Utils.handleException(new RuntimeException("failure")); - - assertTrue(ex.getMessage().contains("topic=persistent://topictest/default/topicnametest1")); - System.out.println(ex.getMessage()); - Utils.clearContext(); - } }