diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/bin/artemis b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/bin/artemis
index b792eb88695..5b5c91909b4 100755
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/bin/artemis
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/bin/artemis
@@ -119,6 +119,9 @@ if [ -f "$ARTEMIS_OOME_DUMP" ] ; then
mv $ARTEMIS_OOME_DUMP $ARTEMIS_OOME_DUMP.bkp
fi
+# Whether to allow Unsafe
+$JAVACMD --sun-misc-unsafe-memory-access=allow --version > /dev/null 2>&1 && ALLOW_UNSAFE="--sun-misc-unsafe-memory-access=allow"
+
exec "$JAVACMD" \
$LOGGING_ARGS \
$JAVA_ARGS \
@@ -132,6 +135,7 @@ exec "$JAVACMD" \
-Djava.io.tmpdir="$ARTEMIS_INSTANCE/tmp" \
-Ddata.dir="$ARTEMIS_DATA_DIR" \
-Dartemis.instance.etc="$ARTEMIS_INSTANCE_ETC" \
+ $ALLOW_UNSAFE \
$DEBUG_ARGS \
$JAVA_ARGS_APPEND \
org.apache.activemq.artemis.boot.Artemis "$@"
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis-utility.profile b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis-utility.profile
index a724cd6c496..15d2043b588 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis-utility.profile
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis-utility.profile
@@ -24,7 +24,7 @@ if [ -z "$LOGGING_ARGS" ]; then
fi
if [ -z "$JAVA_ARGS" ]; then
- JAVA_ARGS="-Dlog4j2.disableJmx=true --add-opens java.base/jdk.internal.misc=ALL-UNNAMED ${java-utility-opts}"
+ JAVA_ARGS="-Dlog4j2.disableJmx=true --add-opens java.base/jdk.internal.misc=ALL-UNNAMED --enable-native-access=ALL-UNNAMED ${java-utility-opts}"
fi
# Uncomment to enable remote debugging
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
index 687cdd5acf1..23d4a133fe2 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile
@@ -30,7 +30,7 @@ HAWTIO_ROLES='${role}'
# Java Opts
if [ -z "$JAVA_ARGS" ]; then
- JAVA_ARGS="-XX:AutoBoxCacheMax=20000 -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx${java-memory} -Dhawtio.disableProxy=true -Dhawtio.realm=activemq -Dhawtio.offline=true -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Dhawtio.http.strictTransportSecurity=max-age=31536000;includeSubDomains;preload -Djolokia.policyLocation=classpath:jolokia-access.xml -Dlog4j2.disableJmx=true --add-opens java.base/jdk.internal.misc=ALL-UNNAMED ${java-opts}"
+ JAVA_ARGS="-XX:AutoBoxCacheMax=20000 -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx${java-memory} -Dhawtio.disableProxy=true -Dhawtio.realm=activemq -Dhawtio.offline=true -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Dhawtio.http.strictTransportSecurity=max-age=31536000;includeSubDomains;preload -Djolokia.policyLocation=classpath:jolokia-access.xml -Dlog4j2.disableJmx=true --add-opens java.base/jdk.internal.misc=ALL-UNNAMED --enable-native-access=ALL-UNNAMED ${java-opts}"
fi
# Uncomment to enable logging for Safepoint JVM pauses
diff --git a/artemis-commons/pom.xml b/artemis-commons/pom.xml
index af086bcd6a2..9a8128be4a8 100644
--- a/artemis-commons/pom.xml
+++ b/artemis-commons/pom.xml
@@ -83,6 +83,18 @@
io.netty
netty-transport
+
+ io.netty
+ netty-transport-classes-epoll
+
+
+ io.netty
+ netty-transport-classes-kqueue
+
+
+ io.netty
+ netty-transport-classes-io_uring
+
commons-beanutils
commons-beanutils
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/ActiveMQUtilLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/ActiveMQUtilLogger.java
index 540b3a5d39e..3b0dbfa9d38 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/ActiveMQUtilLogger.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/ActiveMQUtilLogger.java
@@ -77,4 +77,22 @@ public interface ActiveMQUtilLogger {
@LogMessage(id = 202017, value = "Algorithm two-way is deprecated and will be removed from the default codec in a future version. Use a custom codec instead. Consult the manual for details.", level = LogMessage.Level.WARN)
void deprecatedDefaultCodecTwoWayAlgorithm();
+
+ @LogMessage(id = 202018, value = "Unable to check KQueue availability ", level = LogMessage.Level.WARN)
+ void unableToCheckKQueueAvailability(Throwable e);
+
+ @LogMessage(id = 202019, value = "KQueue is not available, please add to the classpath or configure useKQueue=false to remove this warning", level = LogMessage.Level.WARN)
+ void unableToCheckKQueueAvailabilityNoClass();
+
+ @LogMessage(id = 202020, value = "Unable to check Epoll availability ", level = LogMessage.Level.WARN)
+ void unableToCheckEpollAvailability(Throwable e);
+
+ @LogMessage(id = 202021, value = "Epoll is not available, please add to the classpath or configure useEpoll=false to remove this warning", level = LogMessage.Level.WARN)
+ void unableToCheckEpollAvailabilityNoClass();
+
+ @LogMessage(id = 202022, value = "Unable to check IoUring availability ", level = LogMessage.Level.WARN)
+ void unableToCheckIoUringAvailability(Throwable e);
+
+ @LogMessage(id = 202023, value = "IoUring is not available, please add to the classpath or configure useIoUring=false to remove this warning", level = LogMessage.Level.WARN)
+ void unableToCheckIoUringAvailabilityNoClass();
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CheckDependencies.java
similarity index 60%
rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java
rename to artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CheckDependencies.java
index 4a90401dcca..1d48b299879 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CheckDependencies.java
@@ -15,16 +15,15 @@
* limitations under the License.
*/
-package org.apache.activemq.artemis.core.remoting.impl.netty;
+package org.apache.activemq.artemis.utils;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;
-import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
-import org.apache.activemq.artemis.utils.Env;
+import io.netty.channel.uring.IoUring;
+import org.apache.activemq.artemis.logs.ActiveMQUtilLogger;
/**
- * This class will check for Epoll or KQueue is available, and return false in case of NoClassDefFoundError it could be
- * improved to check for other cases eventually.
+ * This class will check if certain dependencies are available, and return false in case of NoClassDefFoundError
*/
public class CheckDependencies {
@@ -32,10 +31,10 @@ public static final boolean isEpollAvailable() {
try {
return Env.isLinuxOs() && Epoll.isAvailable();
} catch (NoClassDefFoundError noClassDefFoundError) {
- ActiveMQClientLogger.LOGGER.unableToCheckEpollAvailabilitynoClass();
+ ActiveMQUtilLogger.LOGGER.unableToCheckEpollAvailabilityNoClass();
return false;
} catch (Throwable e) {
- ActiveMQClientLogger.LOGGER.unableToCheckEpollAvailability(e);
+ ActiveMQUtilLogger.LOGGER.unableToCheckEpollAvailability(e);
return false;
}
}
@@ -44,11 +43,24 @@ public static final boolean isKQueueAvailable() {
try {
return Env.isMacOs() && KQueue.isAvailable();
} catch (NoClassDefFoundError noClassDefFoundError) {
- ActiveMQClientLogger.LOGGER.unableToCheckKQueueAvailabilityNoClass();
+ ActiveMQUtilLogger.LOGGER.unableToCheckKQueueAvailabilityNoClass();
return false;
} catch (Throwable e) {
- ActiveMQClientLogger.LOGGER.unableToCheckKQueueAvailability(e);
+ ActiveMQUtilLogger.LOGGER.unableToCheckKQueueAvailability(e);
return false;
}
}
+
+ public static final boolean isIoUringAvailable() {
+ try {
+ return Env.isLinuxOs() && IoUring.isAvailable();
+ } catch (NoClassDefFoundError noClassDefFoundError) {
+ ActiveMQUtilLogger.LOGGER.unableToCheckIoUringAvailabilityNoClass();
+ return false;
+ } catch (Throwable e) {
+ ActiveMQUtilLogger.LOGGER.unableToCheckIoUringAvailability(e);
+ return false;
+ }
+ }
+
}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SpawnedVMSupport.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SpawnedVMSupport.java
index c0781c24c55..ac33d562942 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SpawnedVMSupport.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SpawnedVMSupport.java
@@ -204,6 +204,11 @@ public static Process spawnVM(String classPath,
commandList.add(jacocoAgent);
}
+ if (Runtime.version().feature() >= 24) {
+ commandList.add("--enable-native-access=ALL-UNNAMED");
+ commandList.add("--sun-misc-unsafe-memory-access=allow");
+ }
+
commandList.add(className);
for (String arg : args) {
commandList.add(arg);
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
index c9cf99d3940..7ea90a58c86 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java
@@ -223,6 +223,7 @@ public void shouldZeroesDirectByteBuffer() {
@Test
public void shouldZeroesLimitedDirectByteBuffer() {
+ assumeTrue(PlatformDependent.hasUnsafe());
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 32;
diff --git a/artemis-core-client-osgi/pom.xml b/artemis-core-client-osgi/pom.xml
index e3b9b1882fe..c5c65982555 100644
--- a/artemis-core-client-osgi/pom.xml
+++ b/artemis-core-client-osgi/pom.xml
@@ -71,7 +71,7 @@
org.glassfish.json*;resolution:=optional,
de.dentrassi.crypto.pem;resolution:=optional,
- io.netty.buffer;io.netty.*;version="[4.1,5)",
+ io.netty.*;version="[4.2,5)",
*
<_exportcontents>org.apache.activemq.artemis.*;-noimport:=true
diff --git a/artemis-core-client/pom.xml b/artemis-core-client/pom.xml
index e65fb950486..3b6609b02df 100644
--- a/artemis-core-client/pom.xml
+++ b/artemis-core-client/pom.xml
@@ -89,6 +89,15 @@
io.netty
netty-transport-classes-kqueue
+
+ io.netty
+ netty-transport-native-io_uring
+ ${netty-transport-native-io_uring-classifier}
+
+
+ io.netty
+ netty-transport-classes-io_uring
+
io.netty
netty-codec-http
@@ -109,10 +118,6 @@
io.netty
netty-handler-proxy
-
- io.netty
- netty-codec
-
io.netty
netty-codec-socks
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
index ce5007d0a08..45cfe82caef 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
@@ -28,7 +28,7 @@
/**
* Logger Codes 210000 - 218999
*/
-@LogBundle(projectCode = "AMQ", regexID = "21[0-8][0-9]{3}", retiredIDs = {211001, 211002, 211003, 212000, 212006, 212029, 212074, 212078, 214012, 214023, 214024, 214026, 214027, 214028, 214029})
+@LogBundle(projectCode = "AMQ", regexID = "21[0-8][0-9]{3}", retiredIDs = {211001, 211002, 211003, 212000, 212006, 212029, 212071, 212073, 212074, 212075, 212076, 212078, 214012, 214023, 214024, 214026, 214027, 214028, 214029})
public interface ActiveMQClientLogger {
ActiveMQClientLogger LOGGER = BundleFactory.newBundle(ActiveMQClientLogger.class, ActiveMQClientLogger.class.getPackage().getName());
@@ -236,21 +236,9 @@ public interface ActiveMQClientLogger {
@LogMessage(id = 212070, value = "Unable to initialize VersionLoader ", level = LogMessage.Level.WARN)
void unableToInitVersionLoader(Throwable e);
- @LogMessage(id = 212071, value = "Unable to check Epoll availability ", level = LogMessage.Level.WARN)
- void unableToCheckEpollAvailability(Throwable e);
-
@LogMessage(id = 212072, value = "Failed to change channel state to ReadyForWriting ", level = LogMessage.Level.WARN)
void failedToSetChannelReadyForWriting(Throwable e);
- @LogMessage(id = 212073, value = "Unable to check KQueue availability ", level = LogMessage.Level.WARN)
- void unableToCheckKQueueAvailability(Throwable e);
-
- @LogMessage(id = 212075, value = "KQueue is not available, please add to the classpath or configure useKQueue=false to remove this warning", level = LogMessage.Level.WARN)
- void unableToCheckKQueueAvailabilityNoClass();
-
- @LogMessage(id = 212076, value = "Epoll is not available, please add to the classpath or configure useEpoll=false to remove this warning", level = LogMessage.Level.WARN)
- void unableToCheckEpollAvailabilitynoClass();
-
@LogMessage(id = 212077, value = "Timed out waiting to receive initial broadcast from cluster. Retry {} of {}", level = LogMessage.Level.WARN)
void broadcastTimeout(int retry, int maxretry);
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index 90d34f9be29..bd0c734480e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -16,13 +16,12 @@
*/
package org.apache.activemq.artemis.core.remoting.impl.netty;
-import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX;
-
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -63,16 +62,19 @@
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollIoHandler;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueIoHandler;
import io.netty.channel.kqueue.KQueueSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.channel.uring.IoUringIoHandler;
+import io.netty.channel.uring.IoUringSocketChannel;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpRequest;
@@ -92,11 +94,11 @@
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.Cookie;
-import io.netty.handler.ssl.SslContext;
import io.netty.handler.codec.socksx.SocksVersion;
import io.netty.handler.proxy.ProxyHandler;
import io.netty.handler.proxy.Socks4ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
+import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.resolver.NoopAddressResolverGroup;
import io.netty.util.AttributeKey;
@@ -122,14 +124,15 @@
import org.apache.activemq.artemis.spi.core.remoting.ssl.OpenSSLContextFactoryProvider;
import org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextConfig;
import org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextFactoryProvider;
+import org.apache.activemq.artemis.utils.CheckDependencies;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.IPV6Util;
import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
+import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX;
import static org.apache.activemq.artemis.utils.Base64.encodeBytes;
public class NettyConnector extends AbstractConnector {
@@ -137,6 +140,7 @@ public class NettyConnector extends AbstractConnector {
public static String NIO_CONNECTOR_TYPE = "NIO";
public static String EPOLL_CONNECTOR_TYPE = "EPOLL";
public static String KQUEUE_CONNECTOR_TYPE = "KQUEUE";
+ public static String IOURING_CONNECTOR_TYPE = "IO_URING";
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -297,6 +301,8 @@ public class NettyConnector extends AbstractConnector {
private boolean useKQueue;
+ private boolean useIoUring;
+
private int remotingThreads;
private boolean useGlobalWorkerPool;
@@ -402,6 +408,7 @@ public NettyConnector(final Map configuration,
useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
+ useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration);
useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration);
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
@@ -543,27 +550,43 @@ public synchronized void start() {
return;
}
- if (remotingThreads == -1) {
+ boolean defaultRemotingThreads = remotingThreads == -1;
+
+ if (defaultRemotingThreads) {
// Default to number of cores * 3
remotingThreads = Runtime.getRuntime().availableProcessors() * 3;
}
String connectorType;
- if (useEpoll && CheckDependencies.isEpollAvailable()) {
+ if (useIoUring && CheckDependencies.isIoUringAvailable()) {
+ //IO_URING should default to 1 remotingThread unless specified in config
+ remotingThreads = defaultRemotingThreads ? 1 : remotingThreads;
+
+ if (useGlobalWorkerPool) {
+ group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, IoUringIoHandler.newFactory())));
+ } else {
+ group = new MultiThreadIoEventLoopGroup(remotingThreads, IoUringIoHandler.newFactory());
+ }
+
+ connectorType = IOURING_CONNECTOR_TYPE;
+ channelClazz = IoUringSocketChannel.class;
+
+ logger.debug("Connector {} using native io_uring", this);
+ } else if (useEpoll && CheckDependencies.isEpollAvailable()) {
if (useGlobalWorkerPool) {
- group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(remotingThreads, threadFactory)));
+ group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, EpollIoHandler.newFactory())));
} else {
- group = new EpollEventLoopGroup(remotingThreads);
+ group = new MultiThreadIoEventLoopGroup(remotingThreads, EpollIoHandler.newFactory());
}
connectorType = EPOLL_CONNECTOR_TYPE;
channelClazz = EpollSocketChannel.class;
logger.debug("Connector {} using native epoll", this);
} else if (useKQueue && CheckDependencies.isKQueueAvailable()) {
if (useGlobalWorkerPool) {
- group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory)));
+ group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, KQueueIoHandler.newFactory())));
} else {
- group = new KQueueEventLoopGroup(remotingThreads);
+ group = new MultiThreadIoEventLoopGroup(remotingThreads, KQueueIoHandler.newFactory());
}
connectorType = KQUEUE_CONNECTOR_TYPE;
channelClazz = KQueueSocketChannel.class;
@@ -571,10 +594,10 @@ public synchronized void start() {
} else {
if (useGlobalWorkerPool) {
channelClazz = NioSocketChannel.class;
- group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(remotingThreads, threadFactory)));
+ group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, NioIoHandler.newFactory())));
} else {
channelClazz = NioSocketChannel.class;
- group = new NioEventLoopGroup(remotingThreads);
+ group = new MultiThreadIoEventLoopGroup(remotingThreads, NioIoHandler.newFactory());
}
connectorType = NIO_CONNECTOR_TYPE;
channelClazz = NioSocketChannel.class;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index b92947e97d8..1b848c02b43 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -74,6 +74,8 @@ public class TransportConstants {
public static final String USE_KQUEUE_PROP_NAME = "useKQueue";
+ public static final String USE_IOURING_PROP_NAME = "useIoUring";
+
/**
* @deprecated Use USE_GLOBAL_WORKER_POOL_PROP_NAME
*/
@@ -228,6 +230,8 @@ public class TransportConstants {
public static final boolean DEFAULT_USE_KQUEUE = true;
+ public static final boolean DEFAULT_USE_IOURING = false;
+
public static final boolean DEFAULT_USE_INVM = false;
public static final boolean DEFAULT_USE_SERVLET = false;
@@ -442,6 +446,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
+ allowableAcceptorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
//noinspection deprecation
allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
@@ -522,6 +527,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
+ allowableConnectorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME);
diff --git a/artemis-distribution/src/main/resources/bin/artemis b/artemis-distribution/src/main/resources/bin/artemis
index 8f44f958931..7b3fda90468 100755
--- a/artemis-distribution/src/main/resources/bin/artemis
+++ b/artemis-distribution/src/main/resources/bin/artemis
@@ -90,10 +90,15 @@ if $cygwin ; then
CLASSPATH=`cygpath --windows "$CLASSPATH"`
fi
+# Whether to allow Unsafe
+$JAVACMD --sun-misc-unsafe-memory-access=allow --version > /dev/null 2>&1 && ALLOW_UNSAFE="--sun-misc-unsafe-memory-access=allow"
+
exec "$JAVACMD" $JAVA_ARGS $ARTEMIS_CLUSTER_PROPS \
-classpath "$CLASSPATH" \
-Dartemis.home="$ARTEMIS_HOME" \
-Djava.library.path="$ARTEMIS_HOME/bin/lib/linux-$(uname -m)" \
+ --enable-native-access=ALL-UNNAMED \
+ $ALLOW_UNSAFE \
$DEBUG_ARGS \
$JAVA_ARGS_APPEND \
org.apache.activemq.artemis.boot.Artemis "$@"
diff --git a/artemis-features/src/main/resources/features.xml b/artemis-features/src/main/resources/features.xml
index f52772a189d..04338f17de4 100644
--- a/artemis-features/src/main/resources/features.xml
+++ b/artemis-features/src/main/resources/features.xml
@@ -33,7 +33,8 @@
mvn:io.netty/netty-resolver/${netty.version}
mvn:io.netty/netty-transport/${netty.version}
mvn:io.netty/netty-buffer/${netty.version}
- mvn:io.netty/netty-codec/${netty.version}
+ mvn:io.netty/netty-codec-base/${netty.version}
+ mvn:io.netty/netty-codec-compression/${netty.version}
mvn:io.netty/netty-codec-socks/${netty.version}
mvn:io.netty/netty-codec-haproxy/${netty.version}
mvn:io.netty/netty-codec-http/${netty.version}
@@ -44,6 +45,8 @@
mvn:io.netty/netty-transport-native-epoll/${netty.version}
mvn:io.netty/netty-transport-classes-kqueue/${netty.version}
mvn:io.netty/netty-transport-native-kqueue/${netty.version}
+ mvn:io.netty/netty-transport-classes-io_uring/${netty.version}
+ mvn:io.netty/netty-transport-native-io_uring/${netty.version}
mvn:io.netty/netty-transport-native-unix-common/${netty.version}
diff --git a/artemis-jms-client-osgi/pom.xml b/artemis-jms-client-osgi/pom.xml
index e3c5837e0e4..ed5b885c779 100644
--- a/artemis-jms-client-osgi/pom.xml
+++ b/artemis-jms-client-osgi/pom.xml
@@ -79,7 +79,7 @@
org.glassfish.json*;resolution:=optional,
de.dentrassi.crypto.pem;resolution:=optional,
- io.netty.buffer;io.netty.*;version="[4.1,5)",
+ io.netty.*;version="[4.2,5)",
*
<_exportcontents>org.apache.activemq.artemis.*;-noimport:=true
diff --git a/artemis-pom/pom.xml b/artemis-pom/pom.xml
index fb774f29c1b..b1233224713 100644
--- a/artemis-pom/pom.xml
+++ b/artemis-pom/pom.xml
@@ -447,12 +447,6 @@
${netty.version}
-
- io.netty
- netty-codec
- ${netty.version}
-
-
io.netty
netty-codec-http
@@ -521,6 +515,19 @@
${netty-transport-native-kqueue-classifier}
+
+ io.netty
+ netty-transport-classes-io_uring
+ ${netty.version}
+
+
+
+ io.netty
+ netty-transport-native-io_uring
+ ${netty.version}
+ ${netty-transport-native-io_uring-classifier}
+
+
io.netty
netty-tcnative-boringssl-static
diff --git a/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml b/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml
index 2f649262e84..69f3bd1bacb 100644
--- a/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml
+++ b/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml
@@ -90,10 +90,6 @@
io.netty
netty-transport
-
- io.netty
- netty-codec
-
org.osgi
osgi.cmpn
diff --git a/artemis-protocols/artemis-mqtt-protocol/pom.xml b/artemis-protocols/artemis-mqtt-protocol/pom.xml
index 4c47d3344c3..2d704c39859 100644
--- a/artemis-protocols/artemis-mqtt-protocol/pom.xml
+++ b/artemis-protocols/artemis-mqtt-protocol/pom.xml
@@ -69,10 +69,6 @@
io.netty
netty-transport
-
- io.netty
- netty-codec
-
io.netty
netty-common
diff --git a/artemis-protocols/artemis-openwire-protocol/pom.xml b/artemis-protocols/artemis-openwire-protocol/pom.xml
index 5c0a7154169..f1f0b1feff5 100644
--- a/artemis-protocols/artemis-openwire-protocol/pom.xml
+++ b/artemis-protocols/artemis-openwire-protocol/pom.xml
@@ -96,10 +96,6 @@
io.netty
netty-transport
-
- io.netty
- netty-codec
-
org.osgi
osgi.cmpn
diff --git a/artemis-server-osgi/pom.xml b/artemis-server-osgi/pom.xml
index e3a6b8a8d4c..56ddcd7ee3f 100644
--- a/artemis-server-osgi/pom.xml
+++ b/artemis-server-osgi/pom.xml
@@ -129,7 +129,7 @@
org.glassfish.json*;resolution:=optional,
org.postgresql*;resolution:=optional,
de.dentrassi.crypto.pem;resolution:=optional,
- io.netty.buffer;io.netty.*;version="[4.1,5)",
+ io.netty.*;version="[4.2,5)",
java.net.http*;resolution:=optional,
com.sun.net.httpserver*;resolution:=optional,
com.nimbusds.jose*;resolution:=optional,
diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml
index 789c4f28da9..997ec85f326 100644
--- a/artemis-server/pom.xml
+++ b/artemis-server/pom.xml
@@ -136,10 +136,6 @@
io.netty
netty-transport-classes-kqueue
-
- io.netty
- netty-codec
-
commons-beanutils
commons-beanutils
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index e07b34d789c..036d0c31764 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -49,19 +49,22 @@
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollIoHandler;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueIoHandler;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalServerChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.uring.IoUringIoHandler;
+import io.netty.channel.uring.IoUringServerSocketChannel;
import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
@@ -98,6 +101,7 @@
import org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextConfig;
import org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextFactoryProvider;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.activemq.artemis.utils.CheckDependencies;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.ProxyProtocolUtil;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
@@ -118,6 +122,7 @@ public class NettyAcceptor extends AbstractAcceptor {
public static final String NIO_ACCEPTOR_TYPE = "NIO";
public static final String EPOLL_ACCEPTOR_TYPE = "EPOLL";
public static final String KQUEUE_ACCEPTOR_TYPE = "KQUEUE";
+ public static final String IOURING_ACCEPTOR_TYPE = "IO_URING";
static {
// Disable default Netty leak detection if the Netty leak detection level system properties are not in use
@@ -158,6 +163,8 @@ public class NettyAcceptor extends AbstractAcceptor {
private final boolean useKQueue;
+ private final boolean useIoUring;
+
private final ProtocolHandler protocolHandler;
private final String host;
@@ -308,6 +315,7 @@ public NettyAcceptor(final String name,
useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
+ useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration);
backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration);
useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration);
@@ -497,19 +505,35 @@ protected void internalStart() throws Exception {
eventLoopGroup = new DefaultEventLoopGroup();
} else {
ThreadFactory threadFactory = SecurityManagerShim.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory(threadFactoryGroupName, true, ClientSessionFactoryImpl.class.getClassLoader()));
- if (useEpoll && CheckDependencies.isEpollAvailable()) {
+
+ boolean defaultRemotingThreads = remotingThreads == -1;
+
+ if (defaultRemotingThreads) {
+ // Default to number of cores * 3
+ remotingThreads = Runtime.getRuntime().availableProcessors() * 3;
+ }
+
+ if (useIoUring && CheckDependencies.isIoUringAvailable()) {
+ //IO_URING should default to 1 remotingThread unless specified in config
+ remotingThreads = defaultRemotingThreads ? 1 : remotingThreads;
+
+ channelClazz = IoUringServerSocketChannel.class;
+ eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, IoUringIoHandler.newFactory());
+ acceptorType = IOURING_ACCEPTOR_TYPE;
+ logger.debug("Acceptor using native io_uring");
+ } else if (useEpoll && CheckDependencies.isEpollAvailable()) {
channelClazz = EpollServerSocketChannel.class;
- eventLoopGroup = new EpollEventLoopGroup(remotingThreads, threadFactory);
+ eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, EpollIoHandler.newFactory());
acceptorType = EPOLL_ACCEPTOR_TYPE;
logger.debug("Acceptor {} using native epoll", name);
} else if (useKQueue && CheckDependencies.isKQueueAvailable()) {
channelClazz = KQueueServerSocketChannel.class;
- eventLoopGroup = new KQueueEventLoopGroup(remotingThreads, threadFactory);
+ eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, KQueueIoHandler.newFactory());
acceptorType = KQUEUE_ACCEPTOR_TYPE;
logger.debug("Acceptor {} using native kqueue", name);
} else {
channelClazz = NioServerSocketChannel.class;
- eventLoopGroup = new NioEventLoopGroup(remotingThreads, threadFactory);
+ eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, NioIoHandler.newFactory());
acceptorType = NIO_ACCEPTOR_TYPE;
logger.debug("Acceptor {} using nio", name);
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
index 8be81812e03..17a999b79d0 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
@@ -32,6 +32,7 @@
import java.util.concurrent.Executors;
import java.util.stream.Stream;
+import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.config.Configuration;
@@ -102,6 +103,7 @@ public void testFixJournalFileSize() throws Exception {
@TestTemplate
public void testAddBytesToLargeMessageNotLeakingByteBuffer() throws Exception {
+ assumeTrue(PlatformDependent.hasUnsafe());
if (journalType == JournalType.ASYNCIO) {
assumeTrue(AIOSequentialFileFactory.isSupported(), "AIO is not supported on this platform");
}
diff --git a/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java b/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java
index 86a2281cca9..f5766c52a31 100644
--- a/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java
+++ b/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java
@@ -58,8 +58,9 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
@@ -1141,7 +1142,7 @@ private void createRandomJettyFiles(File dir, int num, List collector) thr
}
private Channel getChannel(int port, ClientHandler clientHandler) throws InterruptedException {
- EventLoopGroup group = new NioEventLoopGroup();
+ EventLoopGroup group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() {
@Override
@@ -1154,7 +1155,7 @@ protected void initChannel(Channel ch) throws Exception {
}
private Channel getSslChannel(int port, SslHandler sslHandler, ClientHandler clientHandler) throws InterruptedException {
- EventLoopGroup group = new NioEventLoopGroup();
+ EventLoopGroup group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() {
@Override
diff --git a/docs/user-manual/configuring-transports.adoc b/docs/user-manual/configuring-transports.adoc
index 9655f4158f6..8a6466277ef 100644
--- a/docs/user-manual/configuring-transports.adoc
+++ b/docs/user-manual/configuring-transports.adoc
@@ -250,7 +250,7 @@ These Native transports add features specific to a particular platform, generate
Both Clients and Server can benefit from this.
-Current Supported Platforms.
+Currently supported platforms:
* Linux running 64bit JVM
* MacOS running 64bit JVM
@@ -261,7 +261,7 @@ If running on an unsupported platform or if there are any issues loading native
==== Linux Native Transport
-On supported Linux platforms Epoll is used, @see https://en.wikipedia.org/wiki/Epoll.
+On supported Linux platforms Epoll can be used, @see https://en.wikipedia.org/wiki/Epoll.
The following properties are specific to this native transport:
@@ -270,6 +270,24 @@ enables the use of epoll if a supported linux platform is running a 64bit JVM is
Setting this to `false` will force the use of Java NIO instead of epoll.
Default is `true`
+Additionally, IO_URING can be used, @see https://en.wikipedia.org/wiki/Io_uring.
+
+The following properties are specific to this native transport:
+
+useIoUring::
+enables the use of IO_URING if a supported linux platform running a 64bit JVM is detected.
+Setting this to `false` will attempt the use of `epoll`, then finally falling back to using Java NIO.
+Default is `false`
+
+[WARNING]
+====
+[#io_uring-warning]
+IO_URING support is a recent addition to the broker and should be considered `experimental` at this stage.
+Using it _could_ introduce unwanted side effects. As such, thorough testing and verification are advised before use in any production or otherwise critical environment.
+
+Netty has provided a https://github.com/netty/netty/tree/4.2/transport-native-io_uring#faq[FAQ] that may be helpful.
+====
+
==== MacOS Native Transport
On supported MacOS platforms KQueue is used, @see https://en.wikipedia.org/wiki/Kqueue.
diff --git a/pom.xml b/pom.xml
index ad1d4148f53..a0b701691cf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,7 +122,7 @@
12.3.1
5.23.0
4.0.6
- 4.1.132.Final
+ 4.2.12.Final
2.2.2
5.9.0
3.9.5
@@ -242,6 +242,7 @@
false
true
+