Skip to content

Commit 71dd4c3

Browse files
AntonRoskvistjbertram
authored andcommitted
ARTEMIS-3163 Support for Netty IO_URING transport
1 parent ae4dfdf commit 71dd4c3

13 files changed

Lines changed: 132 additions & 74 deletions

File tree

artemis-commons/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,18 @@
8383
<groupId>io.netty</groupId>
8484
<artifactId>netty-transport</artifactId>
8585
</dependency>
86+
<dependency>
87+
<groupId>io.netty</groupId>
88+
<artifactId>netty-transport-classes-epoll</artifactId>
89+
</dependency>
90+
<dependency>
91+
<groupId>io.netty</groupId>
92+
<artifactId>netty-transport-classes-kqueue</artifactId>
93+
</dependency>
94+
<dependency>
95+
<groupId>io.netty</groupId>
96+
<artifactId>netty-transport-classes-io_uring</artifactId>
97+
</dependency>
8698
<dependency>
8799
<groupId>commons-beanutils</groupId>
88100
<artifactId>commons-beanutils</artifactId>

artemis-commons/src/main/java/org/apache/activemq/artemis/logs/ActiveMQUtilLogger.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,22 @@ public interface ActiveMQUtilLogger {
7777

7878
@LogMessage(id = 202017, value = "Algorithm two-way is deprecated and will be removed from the default codec in a future version. Use a custom codec instead. Consult the manual for details.", level = LogMessage.Level.WARN)
7979
void deprecatedDefaultCodecTwoWayAlgorithm();
80+
81+
@LogMessage(id = 202018, value = "Unable to check KQueue availability ", level = LogMessage.Level.WARN)
82+
void unableToCheckKQueueAvailability(Throwable e);
83+
84+
@LogMessage(id = 202019, value = "KQueue is not available, please add to the classpath or configure useKQueue=false to remove this warning", level = LogMessage.Level.WARN)
85+
void unableToCheckKQueueAvailabilityNoClass();
86+
87+
@LogMessage(id = 202020, value = "Unable to check Epoll availability ", level = LogMessage.Level.WARN)
88+
void unableToCheckEpollAvailability(Throwable e);
89+
90+
@LogMessage(id = 202021, value = "Epoll is not available, please add to the classpath or configure useEpoll=false to remove this warning", level = LogMessage.Level.WARN)
91+
void unableToCheckEpollAvailabilityNoClass();
92+
93+
@LogMessage(id = 202022, value = "Unable to check IoUring availability ", level = LogMessage.Level.WARN)
94+
void unableToCheckIoUringAvailability(Throwable e);
95+
96+
@LogMessage(id = 202023, value = "IoUring is not available, please add to the classpath or configure useIoUring=false to remove this warning", level = LogMessage.Level.WARN)
97+
void unableToCheckIoUringAvailabilityNoClass();
8098
}

artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SpawnedVMSupport.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,7 @@ public static Process spawnVM(String classPath,
204204
commandList.add(jacocoAgent);
205205
}
206206

207-
String javaVersion = System.getProperty("java.version");
208-
if (javaVersion.startsWith("24") || javaVersion.startsWith("25")) {
207+
if (Runtime.version().feature() >= 24) {
209208
commandList.add("--enable-native-access=ALL-UNNAMED");
210209
commandList.add("--sun-misc-unsafe-memory-access=allow");
211210
}

artemis-core-client/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,15 @@
8989
<groupId>io.netty</groupId>
9090
<artifactId>netty-transport-classes-kqueue</artifactId>
9191
</dependency>
92+
<dependency>
93+
<groupId>io.netty</groupId>
94+
<artifactId>netty-transport-native-io_uring</artifactId>
95+
<classifier>${netty-transport-native-io_uring-classifier}</classifier>
96+
</dependency>
97+
<dependency>
98+
<groupId>io.netty</groupId>
99+
<artifactId>netty-transport-classes-io_uring</artifactId>
100+
</dependency>
92101
<dependency>
93102
<groupId>io.netty</groupId>
94103
<artifactId>netty-codec-http</artifactId>

artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
/**
2929
* Logger Codes 210000 - 218999
3030
*/
31-
@LogBundle(projectCode = "AMQ", regexID = "21[0-8][0-9]{3}", retiredIDs = {211001, 211002, 211003, 212000, 212006, 212029, 212074, 212078, 214012, 214023, 214024, 214026, 214027, 214028, 214029})
31+
@LogBundle(projectCode = "AMQ", regexID = "21[0-8][0-9]{3}", retiredIDs = {211001, 211002, 211003, 212000, 212006, 212029, 212071, 212073, 212074, 212075, 212076, 212078, 214012, 214023, 214024, 214026, 214027, 214028, 214029})
3232
public interface ActiveMQClientLogger {
3333

3434
ActiveMQClientLogger LOGGER = BundleFactory.newBundle(ActiveMQClientLogger.class, ActiveMQClientLogger.class.getPackage().getName());
@@ -236,21 +236,9 @@ public interface ActiveMQClientLogger {
236236
@LogMessage(id = 212070, value = "Unable to initialize VersionLoader ", level = LogMessage.Level.WARN)
237237
void unableToInitVersionLoader(Throwable e);
238238

239-
@LogMessage(id = 212071, value = "Unable to check Epoll availability ", level = LogMessage.Level.WARN)
240-
void unableToCheckEpollAvailability(Throwable e);
241-
242239
@LogMessage(id = 212072, value = "Failed to change channel state to ReadyForWriting ", level = LogMessage.Level.WARN)
243240
void failedToSetChannelReadyForWriting(Throwable e);
244241

245-
@LogMessage(id = 212073, value = "Unable to check KQueue availability ", level = LogMessage.Level.WARN)
246-
void unableToCheckKQueueAvailability(Throwable e);
247-
248-
@LogMessage(id = 212075, value = "KQueue is not available, please add to the classpath or configure useKQueue=false to remove this warning", level = LogMessage.Level.WARN)
249-
void unableToCheckKQueueAvailabilityNoClass();
250-
251-
@LogMessage(id = 212076, value = "Epoll is not available, please add to the classpath or configure useEpoll=false to remove this warning", level = LogMessage.Level.WARN)
252-
void unableToCheckEpollAvailabilitynoClass();
253-
254242
@LogMessage(id = 212077, value = "Timed out waiting to receive initial broadcast from cluster. Retry {} of {}", level = LogMessage.Level.WARN)
255243
void broadcastTimeout(int retry, int maxretry);
256244

artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java

Lines changed: 0 additions & 54 deletions
This file was deleted.

artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@
7373
import io.netty.channel.kqueue.KQueueSocketChannel;
7474
import io.netty.channel.nio.NioIoHandler;
7575
import io.netty.channel.socket.nio.NioSocketChannel;
76+
import io.netty.channel.uring.IoUringIoHandler;
77+
import io.netty.channel.uring.IoUringSocketChannel;
7678
import io.netty.handler.codec.base64.Base64;
7779
import io.netty.handler.codec.http.DefaultFullHttpRequest;
7880
import io.netty.handler.codec.http.DefaultHttpRequest;
@@ -122,6 +124,7 @@
122124
import org.apache.activemq.artemis.spi.core.remoting.ssl.OpenSSLContextFactoryProvider;
123125
import org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextConfig;
124126
import org.apache.activemq.artemis.spi.core.remoting.ssl.SSLContextFactoryProvider;
127+
import org.apache.activemq.artemis.utils.CheckDependencies;
125128
import org.apache.activemq.artemis.utils.ConfigurationHelper;
126129
import org.apache.activemq.artemis.utils.FutureLatch;
127130
import org.apache.activemq.artemis.utils.IPV6Util;
@@ -137,6 +140,7 @@ public class NettyConnector extends AbstractConnector {
137140
public static String NIO_CONNECTOR_TYPE = "NIO";
138141
public static String EPOLL_CONNECTOR_TYPE = "EPOLL";
139142
public static String KQUEUE_CONNECTOR_TYPE = "KQUEUE";
143+
public static String IOURING_CONNECTOR_TYPE = "IO_URING";
140144

141145
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
142146

@@ -297,6 +301,8 @@ public class NettyConnector extends AbstractConnector {
297301

298302
private boolean useKQueue;
299303

304+
private boolean useIoUring;
305+
300306
private int remotingThreads;
301307

302308
private boolean useGlobalWorkerPool;
@@ -402,6 +408,7 @@ public NettyConnector(final Map<String, Object> configuration,
402408

403409
useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
404410
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
411+
useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration);
405412

406413
useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration);
407414
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
@@ -543,14 +550,30 @@ public synchronized void start() {
543550
return;
544551
}
545552

546-
if (remotingThreads == -1) {
553+
boolean defaultRemotingThreads = remotingThreads == -1;
554+
555+
if (defaultRemotingThreads) {
547556
// Default to number of cores * 3
548557
remotingThreads = Runtime.getRuntime().availableProcessors() * 3;
549558
}
550559

551560
String connectorType;
552561

553-
if (useEpoll && CheckDependencies.isEpollAvailable()) {
562+
if (useIoUring && CheckDependencies.isIoUringAvailable()) {
563+
//IO_URING should default to 1 remotingThread unless specified in config
564+
remotingThreads = defaultRemotingThreads ? 1 : remotingThreads;
565+
566+
if (useGlobalWorkerPool) {
567+
group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, IoUringIoHandler.newFactory())));
568+
} else {
569+
group = new MultiThreadIoEventLoopGroup(remotingThreads, IoUringIoHandler.newFactory());
570+
}
571+
572+
connectorType = IOURING_CONNECTOR_TYPE;
573+
channelClazz = IoUringSocketChannel.class;
574+
575+
logger.debug("Connector {} using native io_uring", this);
576+
} else if (useEpoll && CheckDependencies.isEpollAvailable()) {
554577
if (useGlobalWorkerPool) {
555578
group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, EpollIoHandler.newFactory())));
556579
} else {

artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public class TransportConstants {
7474

7575
public static final String USE_KQUEUE_PROP_NAME = "useKQueue";
7676

77+
public static final String USE_IOURING_PROP_NAME = "useIoUring";
78+
7779
/**
7880
* @deprecated Use USE_GLOBAL_WORKER_POOL_PROP_NAME
7981
*/
@@ -228,6 +230,8 @@ public class TransportConstants {
228230

229231
public static final boolean DEFAULT_USE_KQUEUE = true;
230232

233+
public static final boolean DEFAULT_USE_IOURING = false;
234+
231235
public static final boolean DEFAULT_USE_INVM = false;
232236

233237
public static final boolean DEFAULT_USE_SERVLET = false;
@@ -442,6 +446,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
442446
allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
443447
allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
444448
allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
449+
allowableAcceptorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
445450
allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
446451
//noinspection deprecation
447452
allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
@@ -522,6 +527,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
522527
allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME);
523528
allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
524529
allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
530+
allowableConnectorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
525531
allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME);
526532
allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME);
527533
allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME);

artemis-features/src/main/resources/features.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
<bundle>mvn:io.netty/netty-transport-native-epoll/${netty.version}</bundle>
4646
<bundle>mvn:io.netty/netty-transport-classes-kqueue/${netty.version}</bundle>
4747
<bundle>mvn:io.netty/netty-transport-native-kqueue/${netty.version}</bundle>
48+
<bundle>mvn:io.netty/netty-transport-classes-io_uring/${netty.version}</bundle>
49+
<bundle>mvn:io.netty/netty-transport-native-io_uring/${netty.version}</bundle>
4850
<bundle>mvn:io.netty/netty-transport-native-unix-common/${netty.version}</bundle>
4951
</feature>
5052

artemis-pom/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,19 @@
515515
<classifier>${netty-transport-native-kqueue-classifier}</classifier>
516516
<!-- License: Apache 2.0 -->
517517
</dependency>
518+
<dependency>
519+
<groupId>io.netty</groupId>
520+
<artifactId>netty-transport-classes-io_uring</artifactId>
521+
<version>${netty.version}</version>
522+
<!-- License: Apache 2.0 -->
523+
</dependency>
524+
<dependency>
525+
<groupId>io.netty</groupId>
526+
<artifactId>netty-transport-native-io_uring</artifactId>
527+
<version>${netty.version}</version>
528+
<classifier>${netty-transport-native-io_uring-classifier}</classifier>
529+
<!-- License: Apache 2.0 -->
530+
</dependency>
518531
<dependency>
519532
<groupId>io.netty</groupId>
520533
<artifactId>netty-tcnative-boringssl-static</artifactId>

0 commit comments

Comments
 (0)