diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/bin/artemis b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/bin/artemis index b792eb88695..5b5c91909b4 100755 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/bin/artemis +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/bin/artemis @@ -119,6 +119,9 @@ if [ -f "$ARTEMIS_OOME_DUMP" ] ; then mv $ARTEMIS_OOME_DUMP $ARTEMIS_OOME_DUMP.bkp fi +# Whether to allow Unsafe +$JAVACMD --sun-misc-unsafe-memory-access=allow --version > /dev/null 2>&1 && ALLOW_UNSAFE="--sun-misc-unsafe-memory-access=allow" + exec "$JAVACMD" \ $LOGGING_ARGS \ $JAVA_ARGS \ @@ -132,6 +135,7 @@ exec "$JAVACMD" \ -Djava.io.tmpdir="$ARTEMIS_INSTANCE/tmp" \ -Ddata.dir="$ARTEMIS_DATA_DIR" \ -Dartemis.instance.etc="$ARTEMIS_INSTANCE_ETC" \ + $ALLOW_UNSAFE \ $DEBUG_ARGS \ $JAVA_ARGS_APPEND \ org.apache.activemq.artemis.boot.Artemis "$@" diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis-utility.profile b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis-utility.profile index a724cd6c496..15d2043b588 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis-utility.profile +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis-utility.profile @@ -24,7 +24,7 @@ if [ -z "$LOGGING_ARGS" ]; then fi if [ -z "$JAVA_ARGS" ]; then - JAVA_ARGS="-Dlog4j2.disableJmx=true --add-opens java.base/jdk.internal.misc=ALL-UNNAMED ${java-utility-opts}" + JAVA_ARGS="-Dlog4j2.disableJmx=true --add-opens java.base/jdk.internal.misc=ALL-UNNAMED --enable-native-access=ALL-UNNAMED ${java-utility-opts}" fi # Uncomment to enable remote debugging diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile index 687cdd5acf1..23d4a133fe2 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/artemis.profile @@ -30,7 +30,7 @@ HAWTIO_ROLES='${role}' # Java Opts if [ -z "$JAVA_ARGS" ]; then - JAVA_ARGS="-XX:AutoBoxCacheMax=20000 -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx${java-memory} -Dhawtio.disableProxy=true -Dhawtio.realm=activemq -Dhawtio.offline=true -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Dhawtio.http.strictTransportSecurity=max-age=31536000;includeSubDomains;preload -Djolokia.policyLocation=classpath:jolokia-access.xml -Dlog4j2.disableJmx=true --add-opens java.base/jdk.internal.misc=ALL-UNNAMED ${java-opts}" + JAVA_ARGS="-XX:AutoBoxCacheMax=20000 -XX:+PrintClassHistogram -XX:+UseG1GC -XX:+UseStringDeduplication -Xms512M -Xmx${java-memory} -Dhawtio.disableProxy=true -Dhawtio.realm=activemq -Dhawtio.offline=true -Dhawtio.rolePrincipalClasses=org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal -Dhawtio.http.strictTransportSecurity=max-age=31536000;includeSubDomains;preload -Djolokia.policyLocation=classpath:jolokia-access.xml -Dlog4j2.disableJmx=true --add-opens java.base/jdk.internal.misc=ALL-UNNAMED --enable-native-access=ALL-UNNAMED ${java-opts}" fi # Uncomment to enable logging for Safepoint JVM pauses diff --git a/artemis-commons/pom.xml b/artemis-commons/pom.xml index af086bcd6a2..9a8128be4a8 100644 --- a/artemis-commons/pom.xml +++ b/artemis-commons/pom.xml @@ -83,6 +83,18 @@ io.netty netty-transport + + io.netty + netty-transport-classes-epoll + + + io.netty + netty-transport-classes-kqueue + + + io.netty + netty-transport-classes-io_uring + commons-beanutils commons-beanutils diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/ActiveMQUtilLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/ActiveMQUtilLogger.java index 540b3a5d39e..3b0dbfa9d38 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/ActiveMQUtilLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/ActiveMQUtilLogger.java @@ -77,4 +77,22 @@ public interface ActiveMQUtilLogger { @LogMessage(id = 202017, value = "Algorithm two-way is deprecated and will be removed from the default codec in a future version. Use a custom codec instead. Consult the manual for details.", level = LogMessage.Level.WARN) void deprecatedDefaultCodecTwoWayAlgorithm(); + + @LogMessage(id = 202018, value = "Unable to check KQueue availability ", level = LogMessage.Level.WARN) + void unableToCheckKQueueAvailability(Throwable e); + + @LogMessage(id = 202019, value = "KQueue is not available, please add to the classpath or configure useKQueue=false to remove this warning", level = LogMessage.Level.WARN) + void unableToCheckKQueueAvailabilityNoClass(); + + @LogMessage(id = 202020, value = "Unable to check Epoll availability ", level = LogMessage.Level.WARN) + void unableToCheckEpollAvailability(Throwable e); + + @LogMessage(id = 202021, value = "Epoll is not available, please add to the classpath or configure useEpoll=false to remove this warning", level = LogMessage.Level.WARN) + void unableToCheckEpollAvailabilityNoClass(); + + @LogMessage(id = 202022, value = "Unable to check IoUring availability ", level = LogMessage.Level.WARN) + void unableToCheckIoUringAvailability(Throwable e); + + @LogMessage(id = 202023, value = "IoUring is not available, please add to the classpath or configure useIoUring=false to remove this warning", level = LogMessage.Level.WARN) + void unableToCheckIoUringAvailabilityNoClass(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CheckDependencies.java similarity index 60% rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java rename to artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CheckDependencies.java index 4a90401dcca..1d48b299879 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/CheckDependencies.java @@ -15,16 +15,15 @@ * limitations under the License. */ -package org.apache.activemq.artemis.core.remoting.impl.netty; +package org.apache.activemq.artemis.utils; import io.netty.channel.epoll.Epoll; import io.netty.channel.kqueue.KQueue; -import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; -import org.apache.activemq.artemis.utils.Env; +import io.netty.channel.uring.IoUring; +import org.apache.activemq.artemis.logs.ActiveMQUtilLogger; /** - * This class will check for Epoll or KQueue is available, and return false in case of NoClassDefFoundError it could be - * improved to check for other cases eventually. + * This class will check if certain dependencies are available, and return false in case of NoClassDefFoundError */ public class CheckDependencies { @@ -32,10 +31,10 @@ public static final boolean isEpollAvailable() { try { return Env.isLinuxOs() && Epoll.isAvailable(); } catch (NoClassDefFoundError noClassDefFoundError) { - ActiveMQClientLogger.LOGGER.unableToCheckEpollAvailabilitynoClass(); + ActiveMQUtilLogger.LOGGER.unableToCheckEpollAvailabilityNoClass(); return false; } catch (Throwable e) { - ActiveMQClientLogger.LOGGER.unableToCheckEpollAvailability(e); + ActiveMQUtilLogger.LOGGER.unableToCheckEpollAvailability(e); return false; } } @@ -44,11 +43,24 @@ public static final boolean isKQueueAvailable() { try { return Env.isMacOs() && KQueue.isAvailable(); } catch (NoClassDefFoundError noClassDefFoundError) { - ActiveMQClientLogger.LOGGER.unableToCheckKQueueAvailabilityNoClass(); + ActiveMQUtilLogger.LOGGER.unableToCheckKQueueAvailabilityNoClass(); return false; } catch (Throwable e) { - ActiveMQClientLogger.LOGGER.unableToCheckKQueueAvailability(e); + ActiveMQUtilLogger.LOGGER.unableToCheckKQueueAvailability(e); return false; } } + + public static final boolean isIoUringAvailable() { + try { + return Env.isLinuxOs() && IoUring.isAvailable(); + } catch (NoClassDefFoundError noClassDefFoundError) { + ActiveMQUtilLogger.LOGGER.unableToCheckIoUringAvailabilityNoClass(); + return false; + } catch (Throwable e) { + ActiveMQUtilLogger.LOGGER.unableToCheckIoUringAvailability(e); + return false; + } + } + } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SpawnedVMSupport.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SpawnedVMSupport.java index c0781c24c55..ac33d562942 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SpawnedVMSupport.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SpawnedVMSupport.java @@ -204,6 +204,11 @@ public static Process spawnVM(String classPath, commandList.add(jacocoAgent); } + if (Runtime.version().feature() >= 24) { + commandList.add("--enable-native-access=ALL-UNNAMED"); + commandList.add("--sun-misc-unsafe-memory-access=allow"); + } + commandList.add(className); for (String arg : args) { commandList.add(arg); diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java index c9cf99d3940..7ea90a58c86 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java @@ -223,6 +223,7 @@ public void shouldZeroesDirectByteBuffer() { @Test public void shouldZeroesLimitedDirectByteBuffer() { + assumeTrue(PlatformDependent.hasUnsafe()); final byte one = (byte) 1; final int capacity = 64; final int bytes = 32; diff --git a/artemis-core-client-osgi/pom.xml b/artemis-core-client-osgi/pom.xml index e3b9b1882fe..c5c65982555 100644 --- a/artemis-core-client-osgi/pom.xml +++ b/artemis-core-client-osgi/pom.xml @@ -71,7 +71,7 @@ org.glassfish.json*;resolution:=optional, de.dentrassi.crypto.pem;resolution:=optional, - io.netty.buffer;io.netty.*;version="[4.1,5)", + io.netty.*;version="[4.2,5)", * <_exportcontents>org.apache.activemq.artemis.*;-noimport:=true diff --git a/artemis-core-client/pom.xml b/artemis-core-client/pom.xml index e65fb950486..3b6609b02df 100644 --- a/artemis-core-client/pom.xml +++ b/artemis-core-client/pom.xml @@ -89,6 +89,15 @@ io.netty netty-transport-classes-kqueue + + io.netty + netty-transport-native-io_uring + ${netty-transport-native-io_uring-classifier} + + + io.netty + netty-transport-classes-io_uring + io.netty netty-codec-http @@ -109,10 +118,6 @@ io.netty netty-handler-proxy - - io.netty - netty-codec - io.netty netty-codec-socks diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index ce5007d0a08..45cfe82caef 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -28,7 +28,7 @@ /** * Logger Codes 210000 - 218999 */ -@LogBundle(projectCode = "AMQ", regexID = "21[0-8][0-9]{3}", retiredIDs = {211001, 211002, 211003, 212000, 212006, 212029, 212074, 212078, 214012, 214023, 214024, 214026, 214027, 214028, 214029}) +@LogBundle(projectCode = "AMQ", regexID = "21[0-8][0-9]{3}", retiredIDs = {211001, 211002, 211003, 212000, 212006, 212029, 212071, 212073, 212074, 212075, 212076, 212078, 214012, 214023, 214024, 214026, 214027, 214028, 214029}) public interface ActiveMQClientLogger { ActiveMQClientLogger LOGGER = BundleFactory.newBundle(ActiveMQClientLogger.class, ActiveMQClientLogger.class.getPackage().getName()); @@ -236,21 +236,9 @@ public interface ActiveMQClientLogger { @LogMessage(id = 212070, value = "Unable to initialize VersionLoader ", level = LogMessage.Level.WARN) void unableToInitVersionLoader(Throwable e); - @LogMessage(id = 212071, value = "Unable to check Epoll availability ", level = LogMessage.Level.WARN) - void unableToCheckEpollAvailability(Throwable e); - @LogMessage(id = 212072, value = "Failed to change channel state to ReadyForWriting ", level = LogMessage.Level.WARN) void failedToSetChannelReadyForWriting(Throwable e); - @LogMessage(id = 212073, value = "Unable to check KQueue availability ", level = LogMessage.Level.WARN) - void unableToCheckKQueueAvailability(Throwable e); - - @LogMessage(id = 212075, value = "KQueue is not available, please add to the classpath or configure useKQueue=false to remove this warning", level = LogMessage.Level.WARN) - void unableToCheckKQueueAvailabilityNoClass(); - - @LogMessage(id = 212076, value = "Epoll is not available, please add to the classpath or configure useEpoll=false to remove this warning", level = LogMessage.Level.WARN) - void unableToCheckEpollAvailabilitynoClass(); - @LogMessage(id = 212077, value = "Timed out waiting to receive initial broadcast from cluster. Retry {} of {}", level = LogMessage.Level.WARN) void broadcastTimeout(int retry, int maxretry); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index 90d34f9be29..bd0c734480e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -16,13 +16,12 @@ */ package org.apache.activemq.artemis.core.remoting.impl.netty; -import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX; - import javax.net.ssl.SNIHostName; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLParameters; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.net.ConnectException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -63,16 +62,19 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.WriteBufferWaterMark; -import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollIoHandler; import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; -import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.kqueue.KQueueIoHandler; import io.netty.channel.kqueue.KQueueSocketChannel; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.uring.IoUringIoHandler; +import io.netty.channel.uring.IoUringSocketChannel; import io.netty.handler.codec.base64.Base64; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpRequest; @@ -92,11 +94,11 @@ import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.cookie.ClientCookieDecoder; import io.netty.handler.codec.http.cookie.Cookie; -import io.netty.handler.ssl.SslContext; import io.netty.handler.codec.socksx.SocksVersion; import io.netty.handler.proxy.ProxyHandler; import io.netty.handler.proxy.Socks4ProxyHandler; import io.netty.handler.proxy.Socks5ProxyHandler; +import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.resolver.NoopAddressResolverGroup; import io.netty.util.AttributeKey; @@ -122,14 +124,15 @@ import org.apache.activemq.artemis.spi.core.remoting.ssl.OpenSSLContextFactoryProvider; import org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextConfig; import org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextFactoryProvider; +import org.apache.activemq.artemis.utils.CheckDependencies; import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.IPV6Util; import org.apache.activemq.artemis.utils.PasswordMaskingUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; +import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX; import static org.apache.activemq.artemis.utils.Base64.encodeBytes; public class NettyConnector extends AbstractConnector { @@ -137,6 +140,7 @@ public class NettyConnector extends AbstractConnector { public static String NIO_CONNECTOR_TYPE = "NIO"; public static String EPOLL_CONNECTOR_TYPE = "EPOLL"; public static String KQUEUE_CONNECTOR_TYPE = "KQUEUE"; + public static String IOURING_CONNECTOR_TYPE = "IO_URING"; private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -297,6 +301,8 @@ public class NettyConnector extends AbstractConnector { private boolean useKQueue; + private boolean useIoUring; + private int remotingThreads; private boolean useGlobalWorkerPool; @@ -402,6 +408,7 @@ public NettyConnector(final Map configuration, useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration); + useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration); useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration); host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration); @@ -543,27 +550,43 @@ public synchronized void start() { return; } - if (remotingThreads == -1) { + boolean defaultRemotingThreads = remotingThreads == -1; + + if (defaultRemotingThreads) { // Default to number of cores * 3 remotingThreads = Runtime.getRuntime().availableProcessors() * 3; } String connectorType; - if (useEpoll && CheckDependencies.isEpollAvailable()) { + if (useIoUring && CheckDependencies.isIoUringAvailable()) { + //IO_URING should default to 1 remotingThread unless specified in config + remotingThreads = defaultRemotingThreads ? 1 : remotingThreads; + + if (useGlobalWorkerPool) { + group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, IoUringIoHandler.newFactory()))); + } else { + group = new MultiThreadIoEventLoopGroup(remotingThreads, IoUringIoHandler.newFactory()); + } + + connectorType = IOURING_CONNECTOR_TYPE; + channelClazz = IoUringSocketChannel.class; + + logger.debug("Connector {} using native io_uring", this); + } else if (useEpoll && CheckDependencies.isEpollAvailable()) { if (useGlobalWorkerPool) { - group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(remotingThreads, threadFactory))); + group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, EpollIoHandler.newFactory()))); } else { - group = new EpollEventLoopGroup(remotingThreads); + group = new MultiThreadIoEventLoopGroup(remotingThreads, EpollIoHandler.newFactory()); } connectorType = EPOLL_CONNECTOR_TYPE; channelClazz = EpollSocketChannel.class; logger.debug("Connector {} using native epoll", this); } else if (useKQueue && CheckDependencies.isKQueueAvailable()) { if (useGlobalWorkerPool) { - group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory))); + group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, KQueueIoHandler.newFactory()))); } else { - group = new KQueueEventLoopGroup(remotingThreads); + group = new MultiThreadIoEventLoopGroup(remotingThreads, KQueueIoHandler.newFactory()); } connectorType = KQUEUE_CONNECTOR_TYPE; channelClazz = KQueueSocketChannel.class; @@ -571,10 +594,10 @@ public synchronized void start() { } else { if (useGlobalWorkerPool) { channelClazz = NioSocketChannel.class; - group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(remotingThreads, threadFactory))); + group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, NioIoHandler.newFactory()))); } else { channelClazz = NioSocketChannel.class; - group = new NioEventLoopGroup(remotingThreads); + group = new MultiThreadIoEventLoopGroup(remotingThreads, NioIoHandler.newFactory()); } connectorType = NIO_CONNECTOR_TYPE; channelClazz = NioSocketChannel.class; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index b92947e97d8..1b848c02b43 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -74,6 +74,8 @@ public class TransportConstants { public static final String USE_KQUEUE_PROP_NAME = "useKQueue"; + public static final String USE_IOURING_PROP_NAME = "useIoUring"; + /** * @deprecated Use USE_GLOBAL_WORKER_POOL_PROP_NAME */ @@ -228,6 +230,8 @@ public class TransportConstants { public static final boolean DEFAULT_USE_KQUEUE = true; + public static final boolean DEFAULT_USE_IOURING = false; + public static final boolean DEFAULT_USE_INVM = false; public static final boolean DEFAULT_USE_SERVLET = false; @@ -442,6 +446,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) { allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.USE_IOURING_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME); //noinspection deprecation allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME); @@ -522,6 +527,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) { allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.USE_IOURING_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME); allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME); allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME); diff --git a/artemis-distribution/src/main/resources/bin/artemis b/artemis-distribution/src/main/resources/bin/artemis index 8f44f958931..7b3fda90468 100755 --- a/artemis-distribution/src/main/resources/bin/artemis +++ b/artemis-distribution/src/main/resources/bin/artemis @@ -90,10 +90,15 @@ if $cygwin ; then CLASSPATH=`cygpath --windows "$CLASSPATH"` fi +# Whether to allow Unsafe +$JAVACMD --sun-misc-unsafe-memory-access=allow --version > /dev/null 2>&1 && ALLOW_UNSAFE="--sun-misc-unsafe-memory-access=allow" + exec "$JAVACMD" $JAVA_ARGS $ARTEMIS_CLUSTER_PROPS \ -classpath "$CLASSPATH" \ -Dartemis.home="$ARTEMIS_HOME" \ -Djava.library.path="$ARTEMIS_HOME/bin/lib/linux-$(uname -m)" \ + --enable-native-access=ALL-UNNAMED \ + $ALLOW_UNSAFE \ $DEBUG_ARGS \ $JAVA_ARGS_APPEND \ org.apache.activemq.artemis.boot.Artemis "$@" diff --git a/artemis-features/src/main/resources/features.xml b/artemis-features/src/main/resources/features.xml index f52772a189d..04338f17de4 100644 --- a/artemis-features/src/main/resources/features.xml +++ b/artemis-features/src/main/resources/features.xml @@ -33,7 +33,8 @@ mvn:io.netty/netty-resolver/${netty.version} mvn:io.netty/netty-transport/${netty.version} mvn:io.netty/netty-buffer/${netty.version} - mvn:io.netty/netty-codec/${netty.version} + mvn:io.netty/netty-codec-base/${netty.version} + mvn:io.netty/netty-codec-compression/${netty.version} mvn:io.netty/netty-codec-socks/${netty.version} mvn:io.netty/netty-codec-haproxy/${netty.version} mvn:io.netty/netty-codec-http/${netty.version} @@ -44,6 +45,8 @@ mvn:io.netty/netty-transport-native-epoll/${netty.version} mvn:io.netty/netty-transport-classes-kqueue/${netty.version} mvn:io.netty/netty-transport-native-kqueue/${netty.version} + mvn:io.netty/netty-transport-classes-io_uring/${netty.version} + mvn:io.netty/netty-transport-native-io_uring/${netty.version} mvn:io.netty/netty-transport-native-unix-common/${netty.version} diff --git a/artemis-jms-client-osgi/pom.xml b/artemis-jms-client-osgi/pom.xml index e3c5837e0e4..ed5b885c779 100644 --- a/artemis-jms-client-osgi/pom.xml +++ b/artemis-jms-client-osgi/pom.xml @@ -79,7 +79,7 @@ org.glassfish.json*;resolution:=optional, de.dentrassi.crypto.pem;resolution:=optional, - io.netty.buffer;io.netty.*;version="[4.1,5)", + io.netty.*;version="[4.2,5)", * <_exportcontents>org.apache.activemq.artemis.*;-noimport:=true diff --git a/artemis-pom/pom.xml b/artemis-pom/pom.xml index fb774f29c1b..b1233224713 100644 --- a/artemis-pom/pom.xml +++ b/artemis-pom/pom.xml @@ -447,12 +447,6 @@ ${netty.version} - - io.netty - netty-codec - ${netty.version} - - io.netty netty-codec-http @@ -521,6 +515,19 @@ ${netty-transport-native-kqueue-classifier} + + io.netty + netty-transport-classes-io_uring + ${netty.version} + + + + io.netty + netty-transport-native-io_uring + ${netty.version} + ${netty-transport-native-io_uring-classifier} + + io.netty netty-tcnative-boringssl-static diff --git a/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml b/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml index 2f649262e84..69f3bd1bacb 100644 --- a/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml +++ b/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml @@ -90,10 +90,6 @@ io.netty netty-transport - - io.netty - netty-codec - org.osgi osgi.cmpn diff --git a/artemis-protocols/artemis-mqtt-protocol/pom.xml b/artemis-protocols/artemis-mqtt-protocol/pom.xml index 4c47d3344c3..2d704c39859 100644 --- a/artemis-protocols/artemis-mqtt-protocol/pom.xml +++ b/artemis-protocols/artemis-mqtt-protocol/pom.xml @@ -69,10 +69,6 @@ io.netty netty-transport - - io.netty - netty-codec - io.netty netty-common diff --git a/artemis-protocols/artemis-openwire-protocol/pom.xml b/artemis-protocols/artemis-openwire-protocol/pom.xml index 5c0a7154169..f1f0b1feff5 100644 --- a/artemis-protocols/artemis-openwire-protocol/pom.xml +++ b/artemis-protocols/artemis-openwire-protocol/pom.xml @@ -96,10 +96,6 @@ io.netty netty-transport - - io.netty - netty-codec - org.osgi osgi.cmpn diff --git a/artemis-server-osgi/pom.xml b/artemis-server-osgi/pom.xml index e3a6b8a8d4c..56ddcd7ee3f 100644 --- a/artemis-server-osgi/pom.xml +++ b/artemis-server-osgi/pom.xml @@ -129,7 +129,7 @@ org.glassfish.json*;resolution:=optional, org.postgresql*;resolution:=optional, de.dentrassi.crypto.pem;resolution:=optional, - io.netty.buffer;io.netty.*;version="[4.1,5)", + io.netty.*;version="[4.2,5)", java.net.http*;resolution:=optional, com.sun.net.httpserver*;resolution:=optional, com.nimbusds.jose*;resolution:=optional, diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml index 789c4f28da9..997ec85f326 100644 --- a/artemis-server/pom.xml +++ b/artemis-server/pom.xml @@ -136,10 +136,6 @@ io.netty netty-transport-classes-kqueue - - io.netty - netty-codec - commons-beanutils commons-beanutils diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index e07b34d789c..036d0c31764 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -49,19 +49,22 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.WriteBufferWaterMark; -import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollIoHandler; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; -import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.kqueue.KQueueIoHandler; import io.netty.channel.kqueue.KQueueServerSocketChannel; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalServerChannel; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.uring.IoUringIoHandler; +import io.netty.channel.uring.IoUringServerSocketChannel; import io.netty.handler.codec.haproxy.HAProxyMessageDecoder; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; @@ -98,6 +101,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextConfig; import org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextFactoryProvider; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.activemq.artemis.utils.CheckDependencies; import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ProxyProtocolUtil; import org.apache.activemq.artemis.utils.collections.TypedProperties; @@ -118,6 +122,7 @@ public class NettyAcceptor extends AbstractAcceptor { public static final String NIO_ACCEPTOR_TYPE = "NIO"; public static final String EPOLL_ACCEPTOR_TYPE = "EPOLL"; public static final String KQUEUE_ACCEPTOR_TYPE = "KQUEUE"; + public static final String IOURING_ACCEPTOR_TYPE = "IO_URING"; static { // Disable default Netty leak detection if the Netty leak detection level system properties are not in use @@ -158,6 +163,8 @@ public class NettyAcceptor extends AbstractAcceptor { private final boolean useKQueue; + private final boolean useIoUring; + private final ProtocolHandler protocolHandler; private final String host; @@ -308,6 +315,7 @@ public NettyAcceptor(final String name, useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration); + useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration); backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration); useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration); @@ -497,19 +505,35 @@ protected void internalStart() throws Exception { eventLoopGroup = new DefaultEventLoopGroup(); } else { ThreadFactory threadFactory = SecurityManagerShim.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory(threadFactoryGroupName, true, ClientSessionFactoryImpl.class.getClassLoader())); - if (useEpoll && CheckDependencies.isEpollAvailable()) { + + boolean defaultRemotingThreads = remotingThreads == -1; + + if (defaultRemotingThreads) { + // Default to number of cores * 3 + remotingThreads = Runtime.getRuntime().availableProcessors() * 3; + } + + if (useIoUring && CheckDependencies.isIoUringAvailable()) { + //IO_URING should default to 1 remotingThread unless specified in config + remotingThreads = defaultRemotingThreads ? 1 : remotingThreads; + + channelClazz = IoUringServerSocketChannel.class; + eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, IoUringIoHandler.newFactory()); + acceptorType = IOURING_ACCEPTOR_TYPE; + logger.debug("Acceptor using native io_uring"); + } else if (useEpoll && CheckDependencies.isEpollAvailable()) { channelClazz = EpollServerSocketChannel.class; - eventLoopGroup = new EpollEventLoopGroup(remotingThreads, threadFactory); + eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, EpollIoHandler.newFactory()); acceptorType = EPOLL_ACCEPTOR_TYPE; logger.debug("Acceptor {} using native epoll", name); } else if (useKQueue && CheckDependencies.isKQueueAvailable()) { channelClazz = KQueueServerSocketChannel.class; - eventLoopGroup = new KQueueEventLoopGroup(remotingThreads, threadFactory); + eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, KQueueIoHandler.newFactory()); acceptorType = KQUEUE_ACCEPTOR_TYPE; logger.debug("Acceptor {} using native kqueue", name); } else { channelClazz = NioServerSocketChannel.class; - eventLoopGroup = new NioEventLoopGroup(remotingThreads, threadFactory); + eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, NioIoHandler.newFactory()); acceptorType = NIO_ACCEPTOR_TYPE; logger.debug("Acceptor {} using nio", name); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java index 8be81812e03..17a999b79d0 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.Executors; import java.util.stream.Stream; +import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.core.config.Configuration; @@ -102,6 +103,7 @@ public void testFixJournalFileSize() throws Exception { @TestTemplate public void testAddBytesToLargeMessageNotLeakingByteBuffer() throws Exception { + assumeTrue(PlatformDependent.hasUnsafe()); if (journalType == JournalType.ASYNCIO) { assumeTrue(AIOSequentialFileFactory.isSupported(), "AIO is not supported on this platform"); } diff --git a/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java b/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java index 86a2281cca9..f5766c52a31 100644 --- a/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java +++ b/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java @@ -58,8 +58,9 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.HttpClientCodec; @@ -1141,7 +1142,7 @@ private void createRandomJettyFiles(File dir, int num, List collector) thr } private Channel getChannel(int port, ClientHandler clientHandler) throws InterruptedException { - EventLoopGroup group = new NioEventLoopGroup(); + EventLoopGroup group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() { @Override @@ -1154,7 +1155,7 @@ protected void initChannel(Channel ch) throws Exception { } private Channel getSslChannel(int port, SslHandler sslHandler, ClientHandler clientHandler) throws InterruptedException { - EventLoopGroup group = new NioEventLoopGroup(); + EventLoopGroup group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() { @Override diff --git a/docs/user-manual/configuring-transports.adoc b/docs/user-manual/configuring-transports.adoc index 9655f4158f6..8a6466277ef 100644 --- a/docs/user-manual/configuring-transports.adoc +++ b/docs/user-manual/configuring-transports.adoc @@ -250,7 +250,7 @@ These Native transports add features specific to a particular platform, generate Both Clients and Server can benefit from this. -Current Supported Platforms. +Currently supported platforms: * Linux running 64bit JVM * MacOS running 64bit JVM @@ -261,7 +261,7 @@ If running on an unsupported platform or if there are any issues loading native ==== Linux Native Transport -On supported Linux platforms Epoll is used, @see https://en.wikipedia.org/wiki/Epoll. +On supported Linux platforms Epoll can be used, @see https://en.wikipedia.org/wiki/Epoll. The following properties are specific to this native transport: @@ -270,6 +270,24 @@ enables the use of epoll if a supported linux platform is running a 64bit JVM is Setting this to `false` will force the use of Java NIO instead of epoll. Default is `true` +Additionally, IO_URING can be used, @see https://en.wikipedia.org/wiki/Io_uring. + +The following properties are specific to this native transport: + +useIoUring:: +enables the use of IO_URING if a supported linux platform running a 64bit JVM is detected. +Setting this to `false` will attempt the use of `epoll`, then finally falling back to using Java NIO. +Default is `false` + +[WARNING] +==== +[#io_uring-warning] +IO_URING support is a recent addition to the broker and should be considered `experimental` at this stage. +Using it _could_ introduce unwanted side effects. As such, thorough testing and verification are advised before use in any production or otherwise critical environment. + +Netty has provided a https://github.com/netty/netty/tree/4.2/transport-native-io_uring#faq[FAQ] that may be helpful. +==== + ==== MacOS Native Transport On supported MacOS platforms KQueue is used, @see https://en.wikipedia.org/wiki/Kqueue. diff --git a/pom.xml b/pom.xml index ad1d4148f53..a0b701691cf 100644 --- a/pom.xml +++ b/pom.xml @@ -122,7 +122,7 @@ 12.3.1 5.23.0 4.0.6 - 4.1.132.Final + 4.2.12.Final 2.2.2 5.9.0 3.9.5 @@ -242,6 +242,7 @@ false true +