diff --git a/activemq-filters/pom.xml b/activemq-filters/pom.xml index 060ec626..a514e548 100644 --- a/activemq-filters/pom.xml +++ b/activemq-filters/pom.xml @@ -69,7 +69,7 @@ org.apache.maven.plugins maven-shade-plugin - + diff --git a/pom.xml b/pom.xml index 3457981b..246e05cf 100644 --- a/pom.xml +++ b/pom.xml @@ -79,7 +79,7 @@ 4.2.0 3.1.0 2.0 - 1.20.4 + 1.21.4 1.18.42 --add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED @@ -325,6 +325,18 @@ pom import + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + org.testcontainers + pulsar + ${testcontainers.version} + test + diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml index f662541c..0ca33949 100644 --- a/pulsar-client-shaded/pom.xml +++ b/pulsar-client-shaded/pom.xml @@ -112,7 +112,7 @@ - + @@ -480,8 +480,8 @@ - - + + true diff --git a/pulsar-jms-integration-tests/pom.xml b/pulsar-jms-integration-tests/pom.xml index 60f29202..cc3206e8 100644 --- a/pulsar-jms-integration-tests/pom.xml +++ b/pulsar-jms-integration-tests/pom.xml @@ -96,8 +96,8 @@ copy filters - - + + diff --git a/pulsar-jms/pom.xml b/pulsar-jms/pom.xml index 138ec436..9b74138f 100644 --- a/pulsar-jms/pom.xml +++ b/pulsar-jms/pom.xml @@ -150,6 +150,13 @@ org.testcontainers junit-jupiter + ${testcontainers.version} + test + + + org.testcontainers + testcontainers + ${testcontainers.version} test @@ -173,15 +180,28 @@ copy filters - - - - + + + + + + maven-assembly-plugin + + + jar-with-dependencies + + + + org.example.App + + + + diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java index cd83bc3c..aefc1dad 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnection.java @@ -493,7 +493,7 @@ public void start() throws JMSException { paused = false; pausedCondition.signalAll(); } catch (Throwable err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } finally { connectionPausedLock.writeLock().unlock(); } @@ -555,7 +555,7 @@ public void stop() throws JMSException { paused = true; pausedCondition.signalAll(); } catch (Throwable err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } finally { connectionPausedLock.writeLock().unlock(); } @@ -893,7 +893,7 @@ public T executeInConnectionPausedLock(Utils.SupplierWithException run, i } return run.run(); } catch (Throwable err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } finally { connectionPausedLock.readLock().unlock(); // let writers in } @@ -993,14 +993,14 @@ private void createPulsarTemporaryTopic(String name) throws JMSException { factory.getPulsarAdmin().topics().createNonPartitionedTopic(name); } catch (IllegalStateException err) { if (!factory.isAllowTemporaryTopicWithoutAdmin()) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } log.warn( "Skipping creation of nonPartitionedTopic {} as jms.allowTemporaryTopicWithoutAdmin=true", name, err); } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionConsumer.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionConsumer.java index 1db0a3b7..e66cd3c7 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionConsumer.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionConsumer.java @@ -108,7 +108,7 @@ public void close() throws JMSException { this.spool.join(); } } catch (InterruptedException err) { - Utils.handleException(err); + Utils.handleException(err, null); } this.consumer.close(); this.dispatcherSession.close(); diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java index 7d73b2d6..727180eb 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarConnectionFactory.java @@ -592,7 +592,7 @@ private synchronized void ensureInitialized(String connectUsername, String conne } this.initialized = true; } catch (Throwable t) { - throw Utils.handleException(t); + throw Utils.handleException(t, null); } } @@ -615,7 +615,8 @@ private static Cache> buildProducerCache( } catch (PulsarClientException e) { // ignore log.debug( - "Exception while closing pulsar producer", Utils.handleException(e)); + "Exception while closing pulsar producer", + Utils.handleException(e, null)); } } log.debug( @@ -1052,7 +1053,7 @@ public void close() { con.close(); } catch (Exception ignore) { // ignore - Utils.handleException(ignore); + Utils.handleException(ignore, null); } } @@ -1098,8 +1099,8 @@ public String getPulsarTopicName(Destination defaultDestination) throws JMSExcep Producer getProducerForDestination(Destination defaultDestination, boolean transactions) throws JMSException { + String fullQualifiedTopicName = getPulsarTopicName(defaultDestination); try { - String fullQualifiedTopicName = getPulsarTopicName(defaultDestination); String key = transactions ? fullQualifiedTopicName + "-tx" : fullQualifiedTopicName; boolean transactionsStickyPartitions = transactions && isTransactionsStickyPartitions(); boolean enableJMSPriority = isEnableJMSPriority(); @@ -1161,7 +1162,7 @@ public int choosePartition(Message msg, TopicMetadata metadata) { return producerBuilder.create(); })); } catch (ExecutionException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, fullQualifiedTopicName); } } @@ -1226,7 +1227,7 @@ public void ensureQueueSubscription(PulsarDestination destination) throws JMSExc // applications start when the server is not available long now = System.currentTimeMillis(); if (now - start > getWaitForServerStartupTimeout()) { - throw Utils.handleException(err); + throw Utils.handleException(err, fullQualifiedTopicName); } else { log.info( "Got {} error while setting up subscription for queue {}, maybe the namespace/broker is still starting", @@ -1237,7 +1238,7 @@ public void ensureQueueSubscription(PulsarDestination destination) throws JMSExc Thread.sleep(1000); } catch (InterruptedException interruptedException) { Thread.currentThread().interrupt(); - throw Utils.handleException(err); + throw Utils.handleException(err, fullQualifiedTopicName); } } } @@ -1373,7 +1374,8 @@ public ConsumerBase createConsumer( } return (ConsumerBase) newConsumer; } catch (PulsarClientException err) { - throw Utils.handleException(err); + String topic = getPulsarTopicName(destination); + throw Utils.handleException(err, topic); } } @@ -1497,7 +1499,7 @@ public String downloadServerSideFilter( // persistent://xxx/xx/xxxx" long now = System.currentTimeMillis(); if (now - start > getWaitForServerStartupTimeout()) { - throw Utils.handleException(notReady); + throw Utils.handleException(notReady, fullQualifiedTopicName); } else { log.info( "Temporary error, cannot download server-side filters {}: {}", @@ -1507,11 +1509,11 @@ public String downloadServerSideFilter( Thread.sleep(1000); } catch (InterruptedException interruptedException) { Thread.currentThread().interrupt(); - throw Utils.handleException(notReady); + throw Utils.handleException(notReady, fullQualifiedTopicName); } } } catch (PulsarAdminException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, fullQualifiedTopicName); } } } @@ -1519,10 +1521,11 @@ public String downloadServerSideFilter( public List> createReadersForBrowser( PulsarQueue destination, ConsumerConfiguration overrideConsumerConfiguration) throws JMSException { - if (destination.isRegExp()) { + + String topicName = null; try { - String topicName = getPulsarTopicName(destination); + topicName = getPulsarTopicName(destination); List topicNames = TopicDiscoveryUtils.discoverTopicsByPattern(topicName, getPulsarClient(), 1000); log.info("createReadersForBrowser {} - {} - {}", destination, topicName, topicNames); @@ -1534,7 +1537,7 @@ public List> createReadersForBrowser( } return res; } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, topicName); } } else if (destination.isMultiTopic()) { List> res = new ArrayList<>(); @@ -1575,7 +1578,7 @@ public List> createReadersForBrowser( } catch (PulsarAdminException.NotFoundException err) { return Collections.emptyList(); } catch (PulsarAdminException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, fullQualifiedTopicName); } } } @@ -1627,7 +1630,7 @@ private Reader createReaderForBrowserForNonPartitionedTopic( readers.add(newReader); return newReader; } catch (PulsarClientException | PulsarAdminException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, fullQualifiedTopicName); } } @@ -1644,6 +1647,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name) throws JMSException { String systemNamespace = getSystemNamespace(); boolean somethingDone = false; + String fullQualifiedTopicName = null; try { if (destination != null) { @@ -1651,7 +1655,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name) throw new InvalidDestinationException( "Virtual destinations are not supported for unsubscribe"); } - String fullQualifiedTopicName = getPulsarTopicName(destination); + fullQualifiedTopicName = getPulsarTopicName(destination); log.info("deleteSubscription topic {} name {}", fullQualifiedTopicName, name); try { pulsarAdmin.topics().deleteSubscription(fullQualifiedTopicName, name, true); @@ -1689,7 +1693,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name) } } } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, fullQualifiedTopicName); } return somethingDone; } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java index 68d411f6..0a30f5ff 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarJMSAdminImpl.java @@ -127,7 +127,7 @@ private JMSDestinationMetadata describeDestination(PulsarDestination destination } catch (PulsarAdminException.NotFoundException notFound) { partitionedTopicMetadata = new PartitionedTopicMetadata(0); } catch (PulsarAdminException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, pulsarTopic); } int partitions = partitionedTopicMetadata.partitions; final Map partitionsStats; @@ -153,7 +153,7 @@ private JMSDestinationMetadata describeDestination(PulsarDestination destination publishers = Collections.emptyList(); } } catch (PulsarAdminException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, pulsarTopic); } } else { partitionsStats = Collections.emptyMap(); @@ -326,15 +326,16 @@ public void createSubscription( String selector, boolean fromBeginning) throws JMSException { + + PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); + validateSelector(enableFilters, selector); + Map properties = new HashMap<>(); + if (enableFilters) { + properties.put("jms.filtering", "true"); + properties.put("jms.selector", selector); + } + String topicName = factory.getPulsarTopicName(dest); try { - PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); - validateSelector(enableFilters, selector); - Map properties = new HashMap<>(); - if (enableFilters) { - properties.put("jms.filtering", "true"); - properties.put("jms.selector", selector); - } - String topicName = factory.getPulsarTopicName(dest); Topics topics = factory.ensurePulsarAdmin().topics(); topics.createSubscription( topicName, @@ -343,21 +344,20 @@ public void createSubscription( false, properties); } catch (PulsarAdminException error) { - throw Utils.handleException(error); + throw Utils.handleException(error, topicName); } } @Override public void createQueue(Queue destination, int partitions, boolean enableFilters, String selector) throws JMSException { - checkArgument(() -> partitions >= 0, "Invalid number of partitions " + partitions); - validateSelector(enableFilters, selector); + PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); + String topicName = factory.getPulsarTopicName(dest); try { - PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); + checkArgument(() -> partitions >= 0, "Invalid number of partitions " + partitions); + validateSelector(enableFilters, selector); checkDestination( destination, d -> !dest.isVirtualDestination(), "Cannot create a VirtualDestination"); - - String topicName = factory.getPulsarTopicName(dest); Topics topics = factory.ensurePulsarAdmin().topics(); boolean exists = false; try { @@ -396,19 +396,22 @@ public void createQueue(Queue destination, int partitions, boolean enableFilters "Subscription " + subscriptionName + " already exists on Pulsar Topic " + topicName); } } catch (PulsarAdminException error) { - throw Utils.handleException(error); + throw Utils.handleException(error, topicName); + + } catch (Throwable t) { + throw Utils.handleException(t, topicName); } } @Override public void createTopic(Topic destination, int partitions) throws JMSException { - checkArgument(() -> partitions >= 0, "Invalid number of partitions " + partitions); + + PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); + String topicName = factory.getPulsarTopicName(dest); try { - PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); + checkArgument(() -> partitions >= 0, "Invalid number of partitions " + partitions); checkDestination( destination, d -> !dest.isVirtualDestination(), "Cannot create a VirtualDestination"); - - String topicName = factory.getPulsarTopicName(dest); Topics topics = factory.ensurePulsarAdmin().topics(); try { PartitionedTopicMetadata partitionedTopicMetadata = @@ -421,7 +424,6 @@ public void createTopic(Topic destination, int partitions) throws JMSException { + " is different from " + partitions); } catch (PulsarAdminException.NotFoundException notFound) { - // ok } try { if (partitions > 0) { @@ -433,20 +435,22 @@ public void createTopic(Topic destination, int partitions) throws JMSException { throw new InvalidDestinationException("Topic " + topicName + " already exists"); } } catch (PulsarAdminException error) { - throw Utils.handleException(error); + throw Utils.handleException(error, topicName); + } catch (Throwable t) { + throw Utils.handleException(t, topicName); } } @Override public void setQueueSubscriptionSelector( Queue destination, boolean enableFilters, String selector) throws JMSException { + PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); + String topicName = factory.getPulsarTopicName(dest); + String subscriptionName = factory.getQueueSubscriptionName(dest); try { - PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); - String topicName = factory.getPulsarTopicName(dest); - String subscriptionName = factory.getQueueSubscriptionName(dest); doUpdateSubscriptionSelector(enableFilters, selector, topicName, subscriptionName); - } catch (PulsarAdminException error) { - throw Utils.handleException(error); + } catch (Throwable error) { + throw Utils.handleException(error, topicName); } } @@ -473,12 +477,12 @@ private void doUpdateSubscriptionSelector( public void setSubscriptionSelector( Topic destination, String subscriptionName, boolean enableFilters, String selector) throws JMSException { + PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); + String topicName = factory.getPulsarTopicName(dest); try { - PulsarDestination dest = PulsarConnectionFactory.toPulsarDestination(destination); - String topicName = factory.getPulsarTopicName(dest); doUpdateSubscriptionSelector(enableFilters, selector, topicName, subscriptionName); - } catch (PulsarAdminException error) { - throw Utils.handleException(error); + } catch (Throwable error) { + throw Utils.handleException(error, topicName); } } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java index 7c191a71..ea5a47e6 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessage.java @@ -1148,7 +1148,7 @@ void acknowledgeInternal() throws JMSException { try { consumer.acknowledge(receivedPulsarMessage, this, pulsarConsumer); } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } @@ -1220,7 +1220,7 @@ final void sendAsync( () -> { this.writable = false; if (error != null) { - completionListener.onException(this, Utils.handleException(error)); + completionListener.onException(this, Utils.handleException(error, null)); } else { assignSystemMessageId(messageIdFromServer); @@ -1565,7 +1565,7 @@ protected static JMSException handleExceptionAccordingToMessageSpecs(Throwable t if (t instanceof EOFException) { throw new MessageEOFException(t + ""); } - throw Utils.handleException(t); + throw Utils.handleException(t, null); } public void setWritable(boolean b) { diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java index 74a0f7d6..a43c4eec 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java @@ -328,43 +328,46 @@ public boolean hasSomePrefetchedMessages() { synchronized Message receiveWithTimeoutAndValidateType(long timeout, Class expectedType) throws JMSException { - checkNotClosed(); - if (listener != null) { - throw new IllegalStateException("cannot receive if you have a messageListener"); - } - // time to wait for the Connection to "start" - final int acquireConnectionStartTime = - timeout == Long.MAX_VALUE ? Integer.MAX_VALUE : (int) timeout; - // time to wait for each cycle - final int stepTimeout = timeout < 100 ? ((int) timeout) : 100; - final long start = System.currentTimeMillis(); - return session.executeOperationIfConnectionStarted( - () -> { - do { - Message result = - session.executeCriticalOperation( - () -> { - try { - Consumer consumer = getConsumer(); - org.apache.pulsar.client.api.Message message = - consumer.receive(stepTimeout, TimeUnit.MILLISECONDS); - if (message == null) { - return null; + String topic = destination != null ? destination.getName() : null; + try { + checkNotClosed(); + if (listener != null) { + throw Utils.handleException( + new IllegalStateException("cannot receive if you have a messageListener"), topic); + } + final int acquireConnectionStartTime = + timeout == Long.MAX_VALUE ? Integer.MAX_VALUE : (int) timeout; + final int stepTimeout = timeout < 100 ? ((int) timeout) : 100; + final long start = System.currentTimeMillis(); + return session.executeOperationIfConnectionStarted( + () -> { + do { + Message result = + session.executeCriticalOperation( + () -> { + try { + Consumer consumer = getConsumer(); + org.apache.pulsar.client.api.Message message = + consumer.receive(stepTimeout, TimeUnit.MILLISECONDS); + if (message == null) { + return null; + } + return handleReceivedMessage( + message, consumer, expectedType, null, noLocal); + } catch (Exception err) { + throw Utils.handleException(err, topic); } - return handleReceivedMessage( - message, consumer, expectedType, null, noLocal); - } catch (Exception err) { - throw Utils.handleException(err); - } - }); - if (result != null) { - return result; - } - } while (System.currentTimeMillis() - start < timeout && !session.isClosed()); - - return null; - }, - acquireConnectionStartTime); + }); + if (result != null) { + return result; + } + } while (System.currentTimeMillis() - start < timeout && !session.isClosed()); + return null; + }, + acquireConnectionStartTime); + } catch (Throwable t) { + throw Utils.handleException(t, topic); + } } /** @@ -406,8 +409,8 @@ private PulsarMessage handleReceivedMessage( java.util.function.Consumer listenerCode, boolean noLocalFilter) throws JMSException, org.apache.pulsar.client.api.PulsarClientException { + String topic = destination != null ? destination.getName() : null; receivedMessages.incrementAndGet(); - PulsarMessage result = PulsarMessage.decode(this, consumer, message); if (expectedType != null && !result.isBodyAssignableTo(expectedType)) { if (log.isDebugEnabled()) { @@ -485,7 +488,7 @@ && requiresClientSideFiltering(message) } catch (Throwable t) { log.error("Listener thrown error, calling negativeAcknowledge", t); consumer.negativeAcknowledge(message); - throw Utils.handleException(t); + throw Utils.handleException(t, topic); } if (result.isNegativeAcked()) { // this may happen if the listener calls "Session.recover" @@ -551,6 +554,7 @@ private boolean requiresClientSideFiltering(org.apache.pulsar.client.api.Message */ @Override public void close() throws JMSException { + String topic = destination != null ? destination.getName() : null; if (Utils.isOnMessageListener(session, this)) { requestClose.set(true); return; @@ -571,7 +575,7 @@ public void close() throws JMSException { session.removeConsumer(this); return null; } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, topic); } }); } else if (session.getTransacted()) { @@ -636,11 +640,12 @@ void acknowledge( PulsarMessage message, Consumer consumer) throws JMSException { + String topic = destination != null ? destination.getName() : null; try { consumer.acknowledge(receivedPulsarMessage); session.unregisterUnacknowledgedMessage(message); } catch (PulsarClientException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, topic); } } @@ -711,15 +716,17 @@ void runListener(int timeout) { } void closeDuringRollback() throws JMSException { + String topic = destination != null ? destination.getName() : null; try { consumer.close(); session.removeConsumer(this); } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, topic); } } public void closeInternal() throws JMSException { + String topic = destination != null ? destination.getName() : null; if (!closed.compareAndSet(false, true)) { return; } @@ -730,7 +737,7 @@ public void closeInternal() throws JMSException { consumer = null; } } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, topic); } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java index 8f6cff79..2531f39f 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java @@ -1208,46 +1208,56 @@ private void sendMessage(Destination defaultDestination, Message message) throws if (message == null) { throw new MessageFormatException("null message"); } - session.executeCriticalOperation( - () -> { - Producer producer = session.getProducerForDestination(defaultDestination); - message.setJMSDestination(defaultDestination); - PulsarMessage pulsarMessage = prepareMessageForSend(message); - final TypedMessageBuilder typedMessageBuilder; - session.blockTransactionOperations(); - try { - if (session.getTransacted()) { - Transaction transaction = session.getTransaction(); - if (transaction != null) { - typedMessageBuilder = producer.newMessage(transaction); - } else { - // emulated transactions - typedMessageBuilder = producer.newMessage(); - if (uncommittedMessages == null) { - uncommittedMessages = new ArrayList<>(); + String topic = + defaultDestination != null ? ((PulsarDestination) defaultDestination).getName() : null; + try { + session.executeCriticalOperation( + () -> { + try { + Producer producer = session.getProducerForDestination(defaultDestination); + message.setJMSDestination(defaultDestination); + PulsarMessage pulsarMessage = prepareMessageForSend(message); + final TypedMessageBuilder typedMessageBuilder; + session.blockTransactionOperations(); + try { + if (session.getTransacted()) { + Transaction transaction = session.getTransaction(); + if (transaction != null) { + typedMessageBuilder = producer.newMessage(transaction); + } else { + typedMessageBuilder = producer.newMessage(); + if (uncommittedMessages == null) { + uncommittedMessages = new ArrayList<>(); + } + uncommittedMessages.add( + new PreparedMessage(typedMessageBuilder, pulsarMessage)); + session.registerProducerWithTransaction(this); + if (defaultDeliveryDelay > 0) { + typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); + } + return null; + } + } else { + typedMessageBuilder = producer.newMessage(); } - uncommittedMessages.add(new PreparedMessage(typedMessageBuilder, pulsarMessage)); - session.registerProducerWithTransaction(this); if (defaultDeliveryDelay > 0) { typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); } - return null; + pulsarMessage.send(typedMessageBuilder, disableMessageTimestamp, session); + } finally { + session.unblockTransactionOperations(); } - } else { - typedMessageBuilder = producer.newMessage(); - } - if (defaultDeliveryDelay > 0) { - typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); + if (message != pulsarMessage) { + applyBackMessageProperties(message, pulsarMessage); + } + return null; + } catch (Exception err) { + throw Utils.handleException(err, topic); } - pulsarMessage.send(typedMessageBuilder, disableMessageTimestamp, session); - } finally { - session.unblockTransactionOperations(); - } - if (message != pulsarMessage) { - applyBackMessageProperties(message, pulsarMessage); - } - return null; - }); + }); + } catch (Throwable t) { + throw Utils.handleException(t, topic); + } } private static class PreparedMessage { @@ -1266,81 +1276,91 @@ private void sendMessage( if (message == null) { throw new MessageFormatException("null message"); } - session.executeCriticalOperation( - () -> { - Producer producer = session.getProducerForDestination(defaultDestination); - message.setJMSDestination(defaultDestination); - PulsarMessage pulsarMessage = prepareMessageForSend(message); - CompletionListener endActivityCompletionListener = - new CompletionListener() { - @Override - public void onCompletion(Message message) { - try { - completionListener.onCompletion(message); - } finally { - session.unblockTransactionOperations(); - } - } + String topic = + defaultDestination != null ? ((PulsarDestination) defaultDestination).getName() : null; + try { + session.executeCriticalOperation( + () -> { + try { + Producer producer = session.getProducerForDestination(defaultDestination); + message.setJMSDestination(defaultDestination); + PulsarMessage pulsarMessage = prepareMessageForSend(message); + CompletionListener endActivityCompletionListener = + new CompletionListener() { + @Override + public void onCompletion(Message message) { + try { + completionListener.onCompletion(message); + } finally { + session.unblockTransactionOperations(); + } + } - @Override - public void onException(Message message, Exception exception) { - try { - completionListener.onException(message, exception); - } finally { - session.unblockTransactionOperations(); - } - } - }; - CompletionListener finalCompletionListener = endActivityCompletionListener; - if (pulsarMessage != message) { - finalCompletionListener = - new CompletionListener() { - @Override - public void onCompletion(Message completedMessage) { - // we have to pass the original message to the called - applyBackMessageProperties(message, pulsarMessage); - endActivityCompletionListener.onCompletion(message); - } + @Override + public void onException(Message message, Exception exception) { + try { + completionListener.onException(message, exception); + } finally { + session.unblockTransactionOperations(); + } + } + }; + CompletionListener finalCompletionListener = endActivityCompletionListener; + if (pulsarMessage != message) { + finalCompletionListener = + new CompletionListener() { + @Override + public void onCompletion(Message completedMessage) { + applyBackMessageProperties(message, pulsarMessage); + endActivityCompletionListener.onCompletion(message); + } - @Override - public void onException(Message completedMessage, Exception e) { - // we have to pass the original message to the called - applyBackMessageProperties(message, pulsarMessage); - endActivityCompletionListener.onException(message, e); + @Override + public void onException(Message completedMessage, Exception e) { + applyBackMessageProperties(message, pulsarMessage); + endActivityCompletionListener.onException(message, e); + } + }; + } + session.blockTransactionOperations(); + TypedMessageBuilder typedMessageBuilder; + if (session.getTransacted()) { + Transaction transaction = session.getTransaction(); + if (transaction != null) { + typedMessageBuilder = producer.newMessage(transaction); + } else { + typedMessageBuilder = producer.newMessage(); + if (defaultDeliveryDelay > 0) { + typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); } - }; - } - - session.blockTransactionOperations(); - TypedMessageBuilder typedMessageBuilder; - if (session.getTransacted()) { - Transaction transaction = session.getTransaction(); - if (transaction != null) { - typedMessageBuilder = producer.newMessage(transaction); - } else { - // emulated transactions - typedMessageBuilder = producer.newMessage(); + if (uncommittedMessages == null) { + uncommittedMessages = new ArrayList<>(); + } + uncommittedMessages.add(new PreparedMessage(typedMessageBuilder, pulsarMessage)); + session.registerProducerWithTransaction(this); + finalCompletionListener.onCompletion(pulsarMessage); + return null; + } + } else { + typedMessageBuilder = producer.newMessage(); + } if (defaultDeliveryDelay > 0) { typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); } - if (uncommittedMessages == null) { - uncommittedMessages = new ArrayList<>(); - } - uncommittedMessages.add(new PreparedMessage(typedMessageBuilder, pulsarMessage)); - session.registerProducerWithTransaction(this); - finalCompletionListener.onCompletion(pulsarMessage); + pulsarMessage.sendAsync( + typedMessageBuilder, + finalCompletionListener, + session, + this, + disableMessageTimestamp); return null; + } catch (Exception err) { + throw Utils.handleException(err, topic); } - } else { - typedMessageBuilder = producer.newMessage(); - } - if (defaultDeliveryDelay > 0) { - typedMessageBuilder.deliverAfter(defaultDeliveryDelay, TimeUnit.MILLISECONDS); - } - pulsarMessage.sendAsync( - typedMessageBuilder, finalCompletionListener, session, this, disableMessageTimestamp); - return null; - }); + }); + } catch (Throwable t) { + throw Utils.handleException(t, topic); + } } protected void rollbackEmulatedTransaction() { diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java index e8dd0904..edd81c2e 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarSession.java @@ -226,11 +226,11 @@ private Transaction startTransaction(PulsarConnection connection) throws JMSExce log.info("Transaction service not available {}", err.getCause().getMessage()); Thread.sleep(1000); } else { - throw Utils.handleException(err.getCause()); + throw Utils.handleException(err.getCause(), null); } } } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } if (transaction == null) { @@ -884,7 +884,7 @@ public void run() { // this message is not tracked by this session foreignMessage.acknowledge(); } catch (Throwable err) { - Utils.handleException(err); + Utils.handleException(err, null); log.info("Error in ConsumerConnection task on message {}", foreignMessage, err); ((PulsarMessage) foreignMessage).negativeAck(); } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java index 4c4321b6..55917470 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarTemporaryDestination.java @@ -51,7 +51,7 @@ public final void delete() throws JMSException { pulsarAdmin = session.getFactory().getPulsarAdmin(); } catch (IllegalStateException err) { if (!session.getFactory().isAllowTemporaryTopicWithoutAdmin()) { - throw Utils.handleException(err); + throw Utils.handleException(err, topicName); } log.warn( "Cannot delete a temporary destination {}. Skipping because jms.allowTemporaryTopicWithoutAdmin=true", @@ -91,7 +91,7 @@ public final void delete() throws JMSException { } } catch (final PulsarAdminException paEx) { - Utils.handleException(paEx); + Utils.handleException(paEx, topicName); } finally { session.getConnection().removeTemporaryDestination(this); } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/TopicDiscoveryUtils.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/TopicDiscoveryUtils.java index 3a1c2998..dd87ba9b 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/TopicDiscoveryUtils.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/TopicDiscoveryUtils.java @@ -48,7 +48,7 @@ public static List discoverTopicsByPattern(String regex, PulsarClient cl .getTopics(); return topicsPatternFilter(list, Pattern.compile(regex)); } catch (InterruptedException | TimeoutException | ExecutionException err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java index 30834db6..f3d4c7be 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/Utils.java @@ -53,25 +53,43 @@ public final class Utils { private Utils() {} - public static JMSException handleException(Throwable cause) { + public static JMSException handleException(Throwable cause, String topic) { while (cause instanceof CompletionException) { cause = cause.getCause(); } if (cause instanceof JMSException) { - return (JMSException) cause; + JMSException jms = (JMSException) cause; + if (jms.getMessage() != null && jms.getMessage().contains("topic=")) { + return jms; + } + String newMsg = buildMessage(jms.getMessage(), topic); + try { + JMSException enriched = jms.getClass().getConstructor(String.class).newInstance(newMsg); + enriched.initCause(cause); + return enriched; + } catch (ReflectiveOperationException e) { + JMSException fallback = new JMSException(newMsg); + fallback.initCause(cause); + return fallback; + } } if (cause instanceof InterruptedException) { Thread.currentThread().interrupt(); } if (cause instanceof ClassCastException) { - return (JMSException) - new MessageFormatException("Invalid cast " + cause.getMessage()).initCause(cause); + JMSException ex = + new MessageFormatException(buildMessage("Invalid cast " + safeMsg(cause), topic)); + ex.initCause(cause); + return ex; } if (cause instanceof NumberFormatException) { - return (JMSException) - new MessageFormatException("Invalid conversion " + cause.getMessage()).initCause(cause); + JMSException ex = + new MessageFormatException(buildMessage("Invalid conversion " + safeMsg(cause), topic)); + ex.initCause(cause); + return ex; } - JMSException err = new JMSException(cause + ""); + String msg = buildMessage(cause, topic); + JMSException err = new JMSException(msg); err.initCause(cause); if (cause instanceof Exception) { err.setLinkedException((Exception) cause); @@ -80,14 +98,47 @@ public static JMSException handleException(Throwable cause) { } return err; } + // helper + private static String safeMsg(Throwable t) { + return (t.getMessage() != null) ? t.getMessage() : t.toString(); + } + + private static String buildMessage(String base, String topic) { + if (topic == null || topic.isEmpty()) { + return base; + } + return base + " [topic=" + topic + "]"; + } + + private static String buildMessage(Throwable t, String topic) { + return buildMessage(safeMsg(t), topic); + } + + public static T executeWithTopic(String topic, SupplierWithException code) + throws JMSException { + try { + return code.run(); + } catch (Throwable err) { + throw handleException(err, topic); + } + } + + public static void executeWithTopic(String topic, RunnableWithException code) + throws JMSException { + try { + code.run(); + } catch (Throwable err) { + throw handleException(err, topic); + } + } public static T get(CompletableFuture future) throws JMSException { try { return future.get(); } catch (ExecutionException err) { - throw handleException(err.getCause()); + throw handleException(err.getCause(), null); } catch (InterruptedException err) { - throw handleException(err); + throw handleException(err, null); } } @@ -103,7 +154,7 @@ public static T invoke(SupplierWithException code) throws JMSException { try { return code.run(); } catch (Throwable err) { - throw handleException(err); + throw handleException(err, null); } } @@ -111,17 +162,23 @@ public static void invoke(RunnableWithException code) throws JMSException { try { code.run(); } catch (Throwable err) { - throw handleException(err); + throw handleException(err, null); } } public static boolean executeMessageListenerInSessionContext( PulsarSession session, PulsarMessageConsumer consumer, BooleanSupplier code) { - currentSession.set(new CallbackContext(session, consumer, null)); + PulsarDestination dest = consumer.getDestination(); + String topic = dest != null ? dest.getName() : null; try { + currentSession.set(new CallbackContext(session, consumer, null)); return session.executeCriticalOperation( () -> { - return code.getAsBoolean(); + try { + return code.getAsBoolean(); + } catch (Throwable err) { + throw handleException(err, topic); + } }); } catch (IllegalStateException err) { log.debug("Ignore error in listener", err); diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarBytesMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarBytesMessage.java index cfbb0792..ea8a15c0 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarBytesMessage.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarBytesMessage.java @@ -61,7 +61,7 @@ public PulsarBytesMessage(byte[] payload) throws JMSException { this.dataOutputStream = null; writable = false; } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } @@ -123,7 +123,7 @@ protected void prepareForSend(TypedMessageBuilder producer) throws JMSEx producer.value(originalMessage); } } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } @@ -149,7 +149,7 @@ protected static JMSException handleException(Throwable t) throws JMSException { if (t instanceof EOFException) { throw new MessageEOFException(t + ""); } - throw Utils.handleException(t); + throw Utils.handleException(t, null); } /** diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarObjectMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarObjectMessage.java index e63c25d6..d595e9bb 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarObjectMessage.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarObjectMessage.java @@ -48,7 +48,7 @@ private static Serializable decode(byte[] originalMessage) throws JMSException { ObjectInputStream input = new ObjectInputStream(new ByteArrayInputStream(originalMessage)); return (Serializable) input.readUnshared(); } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } @@ -96,7 +96,7 @@ private static byte[] encode(Object object) throws JMSException { oo.close(); return out.toByteArray(); } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarStreamMessage.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarStreamMessage.java index e09bf2f1..5ce2a128 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarStreamMessage.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/messages/PulsarStreamMessage.java @@ -238,7 +238,7 @@ public PulsarStreamMessage(byte[] payload) throws JMSException { this.dataOutputStream = null; writable = false; } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } @@ -300,7 +300,7 @@ protected void prepareForSend(TypedMessageBuilder producer) throws JMSEx producer.value(originalMessage); } } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java index 7c93a818..fc91d854 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConnectionConsumerTest.java @@ -519,7 +519,7 @@ public ServerSession getServerSession() throws JMSException { // log.debug("picked session {}", session); return session; } catch (Exception err) { - throw Utils.handleException(err); + throw Utils.handleException(err, null); } } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java index 7a180b65..b5681b72 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ConsumerConfigurationTest.java @@ -21,14 +21,19 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; +import jakarta.jms.*; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.SchemaType; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; final class ConsumerConfigurationTest { + @RegisterExtension + static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension(); private void test( Map consumerConfiguration, Consumer test) { diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ProducerCacheTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ProducerCacheTest.java index c9bbf3bc..0d1dd112 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ProducerCacheTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/ProducerCacheTest.java @@ -67,7 +67,7 @@ public void testMaxNumOfProducers() throws JMSException { int totalBrokerProducers = fetchProducerCount(queue1) + fetchProducerCount(queue2); assertEquals(1, totalBrokerProducers, "Total broker producers count should be 1"); } catch (PulsarAdminException e) { - throw Utils.handleException(e); + throw Utils.handleException(e, null); } } } @@ -112,7 +112,7 @@ public void testProducerAutoCloseTimeout() throws JMSException, InterruptedExcep fetchProducerCount(tempQueue), "Broker should have 0 producers after auto-close timeout"); } catch (PulsarAdminException e) { - throw Utils.handleException(e); + throw Utils.handleException(e, null); } } }