diff --git a/vertx-core/pom.xml b/vertx-core/pom.xml index a3e6c5d00ba..6d3ae92994f 100644 --- a/vertx-core/pom.xml +++ b/vertx-core/pom.xml @@ -31,6 +31,7 @@ 1.37 jdk false + false @@ -283,6 +284,7 @@ true ${vertx.surefire.nettyTransport} ${vertx.surefire.useDomainSockets} + ${vertx.surefire.virtualThreadEventLoops} true @@ -808,6 +810,15 @@ + + VirtualThreadNio + + true + false + false + + + benchmarks diff --git a/vertx-core/src/main/generated/io/vertx/core/VertxOptionsConverter.java b/vertx-core/src/main/generated/io/vertx/core/VertxOptionsConverter.java index 3838fbf8832..0ebc9c56c31 100644 --- a/vertx-core/src/main/generated/io/vertx/core/VertxOptionsConverter.java +++ b/vertx-core/src/main/generated/io/vertx/core/VertxOptionsConverter.java @@ -87,6 +87,11 @@ static void fromJson(Iterable> json, VertxOp obj.setPreferNativeTransport((Boolean)member.getValue()); } break; + case "virtualThreadEventLoops": + if (member.getValue() instanceof Boolean) { + obj.setVirtualThreadEventLoops((Boolean)member.getValue()); + } + break; case "maxEventLoopExecuteTimeUnit": if (member.getValue() instanceof String) { obj.setMaxEventLoopExecuteTimeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue())); @@ -156,6 +161,7 @@ static void toJson(VertxOptions obj, java.util.Map json) { json.put("addressResolverOptions", obj.getAddressResolverOptions().toJson()); } json.put("preferNativeTransport", obj.getPreferNativeTransport()); + json.put("virtualThreadEventLoops", obj.getVirtualThreadEventLoops()); if (obj.getMaxEventLoopExecuteTimeUnit() != null) { json.put("maxEventLoopExecuteTimeUnit", obj.getMaxEventLoopExecuteTimeUnit().name()); } diff --git a/vertx-core/src/main/java/io/vertx/core/VertxOptions.java b/vertx-core/src/main/java/io/vertx/core/VertxOptions.java index 0f577ca2e44..3f1755df825 100644 --- a/vertx-core/src/main/java/io/vertx/core/VertxOptions.java +++ b/vertx-core/src/main/java/io/vertx/core/VertxOptions.java @@ -98,6 +98,11 @@ public class VertxOptions { */ public static final boolean DEFAULT_PREFER_NATIVE_TRANSPORT = false; + /** + * The default value for using virtual thread event loops = false + */ + public static final boolean DEFAULT_VIRTUAL_THREAD_EVENT_LOOPS = false; + /** * The default value of warning exception time 5000000000 ns (5 seconds) * If a thread is blocked longer than this threshold, the warning log @@ -138,6 +143,7 @@ public class VertxOptions { private EventBusOptions eventBusOptions = new EventBusOptions(); private AddressResolverOptions addressResolverOptions = new AddressResolverOptions(); private boolean preferNativeTransport = DEFAULT_PREFER_NATIVE_TRANSPORT; + private boolean virtualThreadEventLoops = DEFAULT_VIRTUAL_THREAD_EVENT_LOOPS; private TimeUnit maxEventLoopExecuteTimeUnit = DEFAULT_MAX_EVENT_LOOP_EXECUTE_TIME_UNIT; private TimeUnit maxWorkerExecuteTimeUnit = DEFAULT_MAX_WORKER_EXECUTE_TIME_UNIT; private TimeUnit warningExceptionTimeUnit = DEFAULT_WARNING_EXCEPTION_TIME_UNIT; @@ -172,6 +178,7 @@ public VertxOptions(VertxOptions other) { this.eventBusOptions = new EventBusOptions(other.eventBusOptions); this.addressResolverOptions = other.addressResolverOptions != null ? new AddressResolverOptions(other.getAddressResolverOptions()) : null; this.preferNativeTransport = other.preferNativeTransport; + this.virtualThreadEventLoops = other.virtualThreadEventLoops; this.maxEventLoopExecuteTimeUnit = other.maxEventLoopExecuteTimeUnit; this.maxWorkerExecuteTimeUnit = other.maxWorkerExecuteTimeUnit; this.warningExceptionTimeUnit = other.warningExceptionTimeUnit; @@ -538,6 +545,30 @@ public VertxOptions setPreferNativeTransport(boolean preferNativeTransport) { return this; } + /** + * @return whether event loop threads run as virtual threads on the NIO transport + */ + public boolean getVirtualThreadEventLoops() { + return virtualThreadEventLoops; + } + + /** + * Set whether event loop threads should run as virtual threads using Netty's {@code ManualIoEventLoop}. + *

+ * When enabled, each Netty event loop runs as a long-running virtual thread, allowing the JVM's + * virtual thread scheduler to multiplex event loops onto platform threads alongside other virtual threads. + *

+ * This option is only supported with the NIO transport. Native transports (epoll, kqueue, io_uring) use + * JNI calls that pin virtual threads to carrier threads, defeating the purpose. + * + * @param virtualThreadEventLoops {@code true} to run event loops as virtual threads + * @return a reference to this, so the API can be used fluently + */ + public VertxOptions setVirtualThreadEventLoops(boolean virtualThreadEventLoops) { + this.virtualThreadEventLoops = virtualThreadEventLoops; + return this; + } + /** * @return the time unit of {@code maxEventLoopExecuteTime} */ @@ -699,6 +730,7 @@ public String toString() { ", maxWorkerExecuteTime=" + maxWorkerExecuteTime + ", haEnabled=" + haEnabled + ", preferNativeTransport=" + preferNativeTransport + + ", virtualThreadEventLoops=" + virtualThreadEventLoops + ", quorumSize=" + quorumSize + ", haGroup='" + haGroup + '\'' + ", metrics=" + metricsOptions + diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java index 66cd94b796b..1c3a12959cb 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java @@ -246,6 +246,9 @@ private VertxImpl instantiateVertx(ClusterManager clusterManager, NodeSelector n } else { tr = NioTransport.INSTANCE; } + if (options.getVirtualThreadEventLoops() && !(tr instanceof NioTransport)) { + throw new IllegalStateException("Virtual thread event loops are only supported with the NIO transport"); + } return new VertxImpl( options, clusterManager, diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java index b28c5ee1f01..5a4e954d359 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -217,10 +217,11 @@ private static ThreadFactory virtualThreadFactory() { maxEventLoopExecTime = maxEventLoopExecuteTime; maxEventLoopExecTimeUnit = maxEventLoopExecuteTimeUnit; eventLoopThreadFactory = createThreadFactory(threadFactory, checker, useDaemonThread, maxEventLoopExecTime, maxEventLoopExecTimeUnit, "vert.x-eventloop-thread-", false); - eventLoopGroup = transport.eventLoopGroup(Transport.IO_EVENT_LOOP_GROUP, options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO); + boolean vtEventLoops = options.getVirtualThreadEventLoops(); + eventLoopGroup = transport.eventLoopGroup(Transport.IO_EVENT_LOOP_GROUP, options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO, vtEventLoops); // The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections // under a lot of load - acceptorEventLoopGroup = transport.eventLoopGroup(Transport.ACCEPTOR_EVENT_LOOP_GROUP, 1, acceptorEventLoopThreadFactory, 100); + acceptorEventLoopGroup = transport.eventLoopGroup(Transport.ACCEPTOR_EVENT_LOOP_GROUP, 1, acceptorEventLoopThreadFactory, 100, vtEventLoops); virtualThreadExecutor = virtualThreadFactory != null ? new ThreadPerTaskExecutorService(virtualThreadFactory) : null; virtualThreadWorkerPool = virtualThreadFactory != null ? new WorkerPool(virtualThreadExecutor, virtualThreadWorkerPoolMetrics) : null; internalWorkerPool = new WorkerPool(internalWorkerExec, internalBlockingPoolMetrics); diff --git a/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java b/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java index b578c5e13bf..3e7a2790c0b 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java @@ -10,17 +10,18 @@ */ package io.vertx.core.impl.transports; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFactory; -import io.netty.channel.IoHandlerFactory; -import io.netty.channel.ServerChannel; +import io.netty.channel.*; import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.socket.nio.*; import io.vertx.core.spi.transport.Transport; +import java.lang.reflect.Method; import java.net.SocketAddress; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; public class NioTransport implements Transport { /** @@ -28,6 +29,25 @@ public class NioTransport implements Transport { */ public static final Transport INSTANCE = new NioTransport(); + private static final long RUNNING_YIELD_NS = TimeUnit.MICROSECONDS + .toNanos(Integer.getInteger("io.vertx.virtualthread.running.yield.us", 1)); + + // Not cached for graalvm + private static ThreadFactory virtualThreadFactory(String prefix) { + try { + Class builderClass = ClassLoader.getSystemClassLoader().loadClass("java.lang.Thread$Builder"); + Class ofVirtualClass = ClassLoader.getSystemClassLoader().loadClass("java.lang.Thread$Builder$OfVirtual"); + Method ofVirtualMethod = Thread.class.getDeclaredMethod("ofVirtual"); + Object builder = ofVirtualMethod.invoke(null); + Method nameMethod = ofVirtualClass.getDeclaredMethod("name", String.class, long.class); + builder = nameMethod.invoke(builder, prefix, 0L); + Method factoryMethod = builderClass.getDeclaredMethod("factory"); + return (ThreadFactory) factoryMethod.invoke(builder); + } catch (Exception e) { + return null; + } + } + private final UnixDomainSocketNioTransport unixDomainSocketNioTransport = UnixDomainSocketNioTransport.load(); @Override @@ -96,4 +116,37 @@ public ChannelFactory serverChannelFactory(boolean doma } return NioServerSocketChannel::new; } + + @Override + public EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threadFactory, int ioRatio, boolean virtualThreadEventLoops) { + if (!virtualThreadEventLoops) { + return Transport.super.eventLoopGroup(type, nThreads, threadFactory, ioRatio); + } + String prefix = type == ACCEPTOR_EVENT_LOOP_GROUP ? "vert.x-acceptor-thread-" : "vert.x-eventloop-thread-"; + ThreadFactory vtFactory = virtualThreadFactory(prefix); + if (vtFactory == null) { + throw new IllegalStateException("Virtual threads are not available"); + } + return new MultiThreadIoEventLoopGroup(nThreads, (Executor) null, ioHandlerFactory()) { + @Override + protected IoEventLoop newChild(Executor executor, IoHandlerFactory ioHandlerFactory, Object... args) { + ManualIoEventLoop eventLoop = new ManualIoEventLoop(this, null, ioHandlerFactory); + Thread vt = vtFactory.newThread(() -> { + while (!eventLoop.isShuttingDown()) { + eventLoop.run(0, RUNNING_YIELD_NS); + Thread.yield(); + eventLoop.runNonBlockingTasks(RUNNING_YIELD_NS); + Thread.yield(); + } + while (!eventLoop.isTerminated()) { + eventLoop.runNow(); + Thread.yield(); + } + }); + eventLoop.setOwningThread(vt); + vt.start(); + return eventLoop; + } + }; + } } diff --git a/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java b/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java index 0b5b6bbb831..22823f8d99d 100644 --- a/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java +++ b/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java @@ -92,6 +92,19 @@ default EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory thre return new MultiThreadIoEventLoopGroup(nThreads, threadFactory, ioHandlerFactory()); } + /** + * @param type one of {@link #ACCEPTOR_EVENT_LOOP_GROUP} or {@link #IO_EVENT_LOOP_GROUP}. + * @param nThreads the number of threads that will be used by this instance. + * @param threadFactory the ThreadFactory to use. + * @param ioRatio the IO ratio + * @param virtualThreadEventLoops whether event loops should run as virtual threads + * + * @return a new event loop group + */ + default EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threadFactory, int ioRatio, boolean virtualThreadEventLoops) { + return eventLoopGroup(type, nThreads, threadFactory, ioRatio); + } + /** * @return a new datagram channel */ diff --git a/vertx-core/src/main/resources/META-INF/native-image/io.vertx/vertx-core/reflect-config.json b/vertx-core/src/main/resources/META-INF/native-image/io.vertx/vertx-core/reflect-config.json index a4897137de2..ed718a56505 100644 --- a/vertx-core/src/main/resources/META-INF/native-image/io.vertx/vertx-core/reflect-config.json +++ b/vertx-core/src/main/resources/META-INF/native-image/io.vertx/vertx-core/reflect-config.json @@ -34,5 +34,41 @@ "parameterTypes": [] } ] + }, + { + "name": "java.lang.Thread$Builder", + "condition": { + "typeReachable": "io.vertx.core.impl.transports.NioTransport" + }, + "methods": [ + { + "name": "factory", + "parameterTypes": [] + } + ] + }, + { + "name": "java.lang.Thread$Builder$OfVirtual", + "condition": { + "typeReachable": "io.vertx.core.impl.transports.NioTransport" + }, + "methods": [ + { + "name": "name", + "parameterTypes": ["java.lang.String", "long"] + } + ] + }, + { + "name": "java.lang.Thread", + "condition": { + "typeReachable": "io.vertx.core.impl.transports.NioTransport" + }, + "methods": [ + { + "name": "ofVirtual", + "parameterTypes": [] + } + ] } ] diff --git a/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java b/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java index 695e4669878..d1c460a1846 100644 --- a/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java +++ b/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java @@ -46,6 +46,7 @@ public class VertxTestBase extends AsyncTestBase { public static final Transport TRANSPORT; public static final boolean USE_DOMAIN_SOCKETS = Boolean.getBoolean("vertx.useDomainSockets"); + public static final boolean USE_VIRTUAL_THREAD_EVENT_LOOPS = Boolean.getBoolean("vertx.virtualThreadEventLoops"); public static final boolean USE_JAVA_MODULES = VertxTestBase.class.getModule().isNamed(); private static final Logger log = LoggerFactory.getLogger(VertxTestBase.class); @@ -160,7 +161,11 @@ protected VertxMetricsFactory getMetrics() { } protected VertxOptions getOptions() { - return new VertxOptions(); + VertxOptions options = new VertxOptions(); + if (USE_VIRTUAL_THREAD_EVENT_LOOPS) { + options.setVirtualThreadEventLoops(true); + } + return options; } protected void tearDown() throws Exception {