Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion activemq-filters/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<filters />
<filters/>
</configuration>
<executions>
<execution>
Expand Down
14 changes: 13 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
<awaitility.version>4.2.0</awaitility.version>
<maven.antrun.plugin.version>3.1.0</maven.antrun.plugin.version>
<snakeyaml.version>2.0</snakeyaml.version>
<testcontainers.version>1.20.4</testcontainers.version>
<testcontainers.version>1.21.4</testcontainers.version>
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change here? revert it this is not the scope in this PR

<lombok.version>1.18.42</lombok.version>
<!-- required for running tests on JDK11+ -->
<test.additional.args>--add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED
Expand Down Expand Up @@ -325,6 +325,18 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>pulsar</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
Expand Down
6 changes: 3 additions & 3 deletions pulsar-client-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
<configuration>
<target>
<!-- shade the AsyncHttpClient ahc-default.properties files -->
<replace token="org.asynchttpclient." value="org.apache.pulsar.shade.org.asynchttpclient." file="${project.build.directory}/classes/org/asynchttpclient/config/ahc-default.properties" />
<replace token="org.asynchttpclient." value="org.apache.pulsar.shade.org.asynchttpclient." file="${project.build.directory}/classes/org/asynchttpclient/config/ahc-default.properties"/>
</target>
</configuration>
</execution>
Expand Down Expand Up @@ -480,8 +480,8 @@
</relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Multi-Release>true</Multi-Release>
Expand Down
4 changes: 2 additions & 2 deletions pulsar-jms-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@
<configuration>
<target>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
</target>
</configuration>
</execution>
Expand Down
28 changes: 24 additions & 4 deletions pulsar-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove it.

<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand All @@ -173,15 +180,28 @@
<configuration>
<target>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<mkdir dir="${project.build.outputDirectory}/interceptors" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/interceptors/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<mkdir dir="${project.build.outputDirectory}/interceptors"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/interceptors/jms-filter.nar"/>
</target>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove it.

<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>org.example.App</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ public void start() throws JMSException {
paused = false;
pausedCondition.signalAll();
} catch (Throwable err) {
throw Utils.handleException(err);
throw Utils.handleException(err, null);
} finally {
connectionPausedLock.writeLock().unlock();
}
Expand Down Expand Up @@ -555,7 +555,7 @@ public void stop() throws JMSException {
paused = true;
pausedCondition.signalAll();
} catch (Throwable err) {
throw Utils.handleException(err);
throw Utils.handleException(err, null);
} finally {
connectionPausedLock.writeLock().unlock();
}
Expand Down Expand Up @@ -893,7 +893,7 @@ public <T> T executeInConnectionPausedLock(Utils.SupplierWithException<T> run, i
}
return run.run();
} catch (Throwable err) {
throw Utils.handleException(err);
throw Utils.handleException(err, null);
} finally {
connectionPausedLock.readLock().unlock(); // let writers in
}
Expand Down Expand Up @@ -993,14 +993,14 @@ private void createPulsarTemporaryTopic(String name) throws JMSException {
factory.getPulsarAdmin().topics().createNonPartitionedTopic(name);
} catch (IllegalStateException err) {
if (!factory.isAllowTemporaryTopicWithoutAdmin()) {
throw Utils.handleException(err);
throw Utils.handleException(err, null);
}
log.warn(
"Skipping creation of nonPartitionedTopic {} as jms.allowTemporaryTopicWithoutAdmin=true",
name,
err);
} catch (Exception err) {
throw Utils.handleException(err);
throw Utils.handleException(err, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void close() throws JMSException {
this.spool.join();
}
} catch (InterruptedException err) {
Utils.handleException(err);
Utils.handleException(err, null);
}
this.consumer.close();
this.dispatcherSession.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ private synchronized void ensureInitialized(String connectUsername, String conne
}
this.initialized = true;
} catch (Throwable t) {
throw Utils.handleException(t);
throw Utils.handleException(t, null);
}
}

Expand All @@ -615,7 +615,8 @@ private static Cache<String, Producer<byte[]>> buildProducerCache(
} catch (PulsarClientException e) {
// ignore
log.debug(
"Exception while closing pulsar producer", Utils.handleException(e));
"Exception while closing pulsar producer",
Utils.handleException(e, null));
}
}
log.debug(
Expand Down Expand Up @@ -1052,7 +1053,7 @@ public void close() {
con.close();
} catch (Exception ignore) {
// ignore
Utils.handleException(ignore);
Utils.handleException(ignore, null);
}
}

Expand Down Expand Up @@ -1098,8 +1099,8 @@ public String getPulsarTopicName(Destination defaultDestination) throws JMSExcep

Producer<byte[]> getProducerForDestination(Destination defaultDestination, boolean transactions)
throws JMSException {
String fullQualifiedTopicName = getPulsarTopicName(defaultDestination);
try {
String fullQualifiedTopicName = getPulsarTopicName(defaultDestination);
String key = transactions ? fullQualifiedTopicName + "-tx" : fullQualifiedTopicName;
boolean transactionsStickyPartitions = transactions && isTransactionsStickyPartitions();
boolean enableJMSPriority = isEnableJMSPriority();
Expand Down Expand Up @@ -1161,7 +1162,7 @@ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return producerBuilder.create();
}));
} catch (ExecutionException err) {
throw Utils.handleException(err);
throw Utils.handleException(err, fullQualifiedTopicName);
}
}

Expand Down Expand Up @@ -1226,7 +1227,7 @@ public void ensureQueueSubscription(PulsarDestination destination) throws JMSExc
// applications start when the server is not available
long now = System.currentTimeMillis();
if (now - start > getWaitForServerStartupTimeout()) {
throw Utils.handleException(err);
throw Utils.handleException(err, fullQualifiedTopicName);
} else {
log.info(
"Got {} error while setting up subscription for queue {}, maybe the namespace/broker is still starting",
Expand All @@ -1237,7 +1238,7 @@ public void ensureQueueSubscription(PulsarDestination destination) throws JMSExc
Thread.sleep(1000);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
throw Utils.handleException(err);
throw Utils.handleException(err, fullQualifiedTopicName);
}
}
}
Expand Down Expand Up @@ -1373,7 +1374,8 @@ public ConsumerBase<?> createConsumer(
}
return (ConsumerBase) newConsumer;
} catch (PulsarClientException err) {
throw Utils.handleException(err);
String topic = getPulsarTopicName(destination);
throw Utils.handleException(err, topic);
}
}

Expand Down Expand Up @@ -1497,7 +1499,7 @@ public String downloadServerSideFilter(
// persistent://xxx/xx/xxxx"
long now = System.currentTimeMillis();
if (now - start > getWaitForServerStartupTimeout()) {
throw Utils.handleException(notReady);
throw Utils.handleException(notReady, fullQualifiedTopicName);
} else {
log.info(
"Temporary error, cannot download server-side filters {}: {}",
Expand All @@ -1507,22 +1509,23 @@ public String downloadServerSideFilter(
Thread.sleep(1000);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
throw Utils.handleException(notReady);
throw Utils.handleException(notReady, fullQualifiedTopicName);
}
}
} catch (PulsarAdminException err) {
throw Utils.handleException(err);
throw Utils.handleException(err, fullQualifiedTopicName);
}
}
}

public List<Reader<?>> createReadersForBrowser(
PulsarQueue destination, ConsumerConfiguration overrideConsumerConfiguration)
throws JMSException {

if (destination.isRegExp()) {

String topicName = null;
try {
String topicName = getPulsarTopicName(destination);
topicName = getPulsarTopicName(destination);
List<String> topicNames =
TopicDiscoveryUtils.discoverTopicsByPattern(topicName, getPulsarClient(), 1000);
log.info("createReadersForBrowser {} - {} - {}", destination, topicName, topicNames);
Expand All @@ -1534,7 +1537,7 @@ public List<Reader<?>> createReadersForBrowser(
}
return res;
} catch (Exception err) {
throw Utils.handleException(err);
throw Utils.handleException(err, topicName);
}
} else if (destination.isMultiTopic()) {
List<Reader<?>> res = new ArrayList<>();
Expand Down Expand Up @@ -1575,7 +1578,7 @@ public List<Reader<?>> createReadersForBrowser(
} catch (PulsarAdminException.NotFoundException err) {
return Collections.emptyList();
} catch (PulsarAdminException err) {
throw Utils.handleException(err);
throw Utils.handleException(err, fullQualifiedTopicName);
}
}
}
Expand Down Expand Up @@ -1627,7 +1630,7 @@ private Reader<?> createReaderForBrowserForNonPartitionedTopic(
readers.add(newReader);
return newReader;
} catch (PulsarClientException | PulsarAdminException err) {
throw Utils.handleException(err);
throw Utils.handleException(err, fullQualifiedTopicName);
}
}

Expand All @@ -1644,14 +1647,15 @@ public boolean deleteSubscription(PulsarDestination destination, String name)
throws JMSException {
String systemNamespace = getSystemNamespace();
boolean somethingDone = false;
String fullQualifiedTopicName = null;
try {

if (destination != null) {
if (destination.isVirtualDestination()) {
throw new InvalidDestinationException(
"Virtual destinations are not supported for unsubscribe");
}
String fullQualifiedTopicName = getPulsarTopicName(destination);
fullQualifiedTopicName = getPulsarTopicName(destination);
log.info("deleteSubscription topic {} name {}", fullQualifiedTopicName, name);
try {
pulsarAdmin.topics().deleteSubscription(fullQualifiedTopicName, name, true);
Expand Down Expand Up @@ -1689,7 +1693,7 @@ public boolean deleteSubscription(PulsarDestination destination, String name)
}
}
} catch (Exception err) {
throw Utils.handleException(err);
throw Utils.handleException(err, fullQualifiedTopicName);
}
return somethingDone;
}
Expand Down
Loading
Loading