From 9f7b8fbbe4a7ff68f4f364b67d2e84095c407c53 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Thu, 26 Mar 2026 14:11:13 +0100 Subject: [PATCH 1/3] Add virtual thread NIO transport using Netty ManualIoEventLoop Run Netty NIO event loops as long-running virtual threads instead of platform threads. Uses ManualIoEventLoop from Netty 4.2 to manually drive IO polling and task execution from virtual threads, allowing the JVM's ForkJoinPool scheduler to multiplex event loops alongside other virtual threads. Adds VirtualThreadNioTransport (SPI), Transport.VIRTUAL_THREAD_NIO (public API), VertxTestBase support, and a VirtualThreadNio Maven profile to run the full test suite with this transport. --- vertx-core/pom.xml | 9 ++ .../transports/VirtualThreadNioTransport.java | 138 ++++++++++++++++++ .../io/vertx/core/transport/Transport.java | 10 ++ .../io/vertx/test/core/VertxTestBase.java | 5 +- 4 files changed, 161 insertions(+), 1 deletion(-) create mode 100644 vertx-core/src/main/java/io/vertx/core/impl/transports/VirtualThreadNioTransport.java diff --git a/vertx-core/pom.xml b/vertx-core/pom.xml index a3e6c5d00ba..828dc992a6b 100644 --- a/vertx-core/pom.xml +++ b/vertx-core/pom.xml @@ -808,6 +808,15 @@ + + VirtualThreadNio + + virtual_thread_nio + false + false + + + benchmarks diff --git a/vertx-core/src/main/java/io/vertx/core/impl/transports/VirtualThreadNioTransport.java b/vertx-core/src/main/java/io/vertx/core/impl/transports/VirtualThreadNioTransport.java new file mode 100644 index 00000000000..a9f0692671f --- /dev/null +++ b/vertx-core/src/main/java/io/vertx/core/impl/transports/VirtualThreadNioTransport.java @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.impl.transports; + +import io.netty.channel.*; +import io.netty.channel.nio.NioIoHandler; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.InternetProtocolFamily; +import io.vertx.core.spi.transport.Transport; + +import java.lang.reflect.Method; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * A transport that uses NIO channels but runs each Netty event loop as a long-running virtual thread + * using {@link ManualIoEventLoop}. This allows the JVM's virtual thread scheduler (ForkJoinPool) to + * multiplex event loops onto platform threads alongside other virtual threads. + */ +public class VirtualThreadNioTransport implements Transport { + + private static final long RUNNING_YIELD_NS = TimeUnit.MICROSECONDS + .toNanos(Integer.getInteger("io.vertx.virtualthread.running.yield.us", 1)); + + private static final ThreadFactory VIRTUAL_THREAD_FACTORY; + private static final Throwable UNAVAILABILITY_CAUSE; + + static { + ThreadFactory factory = null; + Throwable cause = null; + try { + Class builderClass = ClassLoader.getSystemClassLoader().loadClass("java.lang.Thread$Builder"); + Method ofVirtualMethod = Thread.class.getDeclaredMethod("ofVirtual"); + Object builder = ofVirtualMethod.invoke(null); + Method factoryMethod = builderClass.getDeclaredMethod("factory"); + factory = (ThreadFactory) factoryMethod.invoke(builder); + } catch (Exception e) { + cause = e; + } + VIRTUAL_THREAD_FACTORY = factory; + UNAVAILABILITY_CAUSE = cause; + } + + public static final Transport INSTANCE = new VirtualThreadNioTransport(); + + private final NioTransport delegate = (NioTransport) NioTransport.INSTANCE; + + @Override + public boolean isAvailable() { + return VIRTUAL_THREAD_FACTORY != null; + } + + @Override + public Throwable unavailabilityCause() { + return UNAVAILABILITY_CAUSE; + } + + @Override + public boolean supportsDomainSockets() { + return delegate.supportsDomainSockets(); + } + + @Override + public java.net.SocketAddress convert(io.vertx.core.net.SocketAddress address) { + return delegate.convert(address); + } + + @Override + public io.vertx.core.net.SocketAddress convert(java.net.SocketAddress address) { + return delegate.convert(address); + } + + @Override + public IoHandlerFactory ioHandlerFactory() { + return NioIoHandler.newFactory(); + } + + @Override + public DatagramChannel datagramChannel(InternetProtocolFamily family) { + return delegate.datagramChannel(family); + } + + @Override + public ChannelFactory datagramChannelFactory() { + return delegate.datagramChannelFactory(); + } + + @Override + public ChannelFactory channelFactory(boolean domainSocket) { + return delegate.channelFactory(domainSocket); + } + + @Override + public ChannelFactory serverChannelFactory(boolean domainSocket) { + return delegate.serverChannelFactory(domainSocket); + } + + @Override + public EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threadFactory, int ioRatio) { + if (VIRTUAL_THREAD_FACTORY == null) { + throw new IllegalStateException("Virtual threads are not available", UNAVAILABILITY_CAUSE); + } + return new MultiThreadIoEventLoopGroup(nThreads, (Executor) null, ioHandlerFactory()) { + @Override + protected IoEventLoop newChild(Executor executor, IoHandlerFactory ioHandlerFactory, Object... args) { + ManualIoEventLoop eventLoop = new ManualIoEventLoop(this, null, ioHandlerFactory); + // Create a platform thread via the Vert.x thread factory to obtain the correct name + // and register it with the blocked thread checker. Then name the virtual thread the same. + Thread platformThread = threadFactory.newThread(() -> {}); + Thread vt = VIRTUAL_THREAD_FACTORY.newThread(() -> { + while (!eventLoop.isShuttingDown()) { + eventLoop.run(0, RUNNING_YIELD_NS); + Thread.yield(); + eventLoop.runNonBlockingTasks(RUNNING_YIELD_NS); + Thread.yield(); + } + while (!eventLoop.isTerminated()) { + eventLoop.runNow(); + Thread.yield(); + } + }); + vt.setName(platformThread.getName()); + eventLoop.setOwningThread(vt); + vt.start(); + return eventLoop; + } + }; + } +} diff --git a/vertx-core/src/main/java/io/vertx/core/transport/Transport.java b/vertx-core/src/main/java/io/vertx/core/transport/Transport.java index 35fb6bc390b..7fe6e70ecf5 100644 --- a/vertx-core/src/main/java/io/vertx/core/transport/Transport.java +++ b/vertx-core/src/main/java/io/vertx/core/transport/Transport.java @@ -14,6 +14,7 @@ import io.vertx.core.impl.transports.NioTransport; import io.vertx.core.impl.transports.TransportInternal; import io.vertx.core.impl.transports.TransportLoader; +import io.vertx.core.impl.transports.VirtualThreadNioTransport; /** * The transport used by a {@link io.vertx.core.Vertx} instance. @@ -42,6 +43,15 @@ public interface Transport { */ Transport IO_URING = TransportLoader.io_uring(); + /** + * NIO transport with event loops running as long-running virtual threads + * using Netty's {@code ManualIoEventLoop}. + */ + Transport VIRTUAL_THREAD_NIO = new TransportInternal("virtual_thread_nio", + VirtualThreadNioTransport.INSTANCE.isAvailable(), + VirtualThreadNioTransport.INSTANCE.unavailabilityCause(), + VirtualThreadNioTransport.INSTANCE); + /** * @return the name among {@code nio, kqueue, epoll, io_uring} */ diff --git a/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java b/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java index 695e4669878..4782682f75d 100644 --- a/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java +++ b/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java @@ -68,6 +68,9 @@ public class VertxTestBase extends AsyncTestBase { case "io_uring": transport = Transport.IO_URING; break; + case "virtual_thread_nio": + transport = Transport.VIRTUAL_THREAD_NIO; + break; default: transport = new Transport() { @Override @@ -213,7 +216,7 @@ protected VertxBuilder createVertxBuilder(VertxOptions options) { protected Vertx createVertx(VertxOptions options) { Vertx vertx = createVertxBuilder(options).build(); - if (TRANSPORT != Transport.NIO) { + if (TRANSPORT != Transport.NIO && TRANSPORT != Transport.VIRTUAL_THREAD_NIO) { if (!vertx.isNativeTransportEnabled()) { fail(vertx.unavailableNativeTransportCause()); } From 7d2ea697ef70ae31ae483eea27bf97db1215a1e9 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Mon, 30 Mar 2026 15:28:51 +0200 Subject: [PATCH 2/3] Refactor VT event loops from separate transport to NIO transport option Move virtual thread event loop support from a standalone VirtualThreadNioTransport into a VertxOptions.virtualThreadEventLoops flag that enables ManualIoEventLoop-based VT event loops on the existing NIO transport. Native transports (epoll, kqueue, io_uring) are rejected when this option is set since JNI pins virtual threads. --- vertx-core/pom.xml | 4 +- .../io/vertx/core/VertxOptionsConverter.java | 6 + .../main/java/io/vertx/core/VertxOptions.java | 32 ++++ .../vertx/core/impl/VertxBootstrapImpl.java | 3 + .../java/io/vertx/core/impl/VertxImpl.java | 5 +- .../core/impl/transports/NioTransport.java | 61 +++++++- .../transports/VirtualThreadNioTransport.java | 138 ------------------ .../vertx/core/spi/transport/Transport.java | 13 ++ .../io/vertx/core/transport/Transport.java | 10 -- .../io.vertx/vertx-core/reflect-config.json | 24 +++ .../io/vertx/test/core/VertxTestBase.java | 12 +- 11 files changed, 148 insertions(+), 160 deletions(-) delete mode 100644 vertx-core/src/main/java/io/vertx/core/impl/transports/VirtualThreadNioTransport.java diff --git a/vertx-core/pom.xml b/vertx-core/pom.xml index 828dc992a6b..6d3ae92994f 100644 --- a/vertx-core/pom.xml +++ b/vertx-core/pom.xml @@ -31,6 +31,7 @@ 1.37 jdk false + false @@ -283,6 +284,7 @@ true ${vertx.surefire.nettyTransport} ${vertx.surefire.useDomainSockets} + ${vertx.surefire.virtualThreadEventLoops} true @@ -811,7 +813,7 @@ VirtualThreadNio - virtual_thread_nio + true false false diff --git a/vertx-core/src/main/generated/io/vertx/core/VertxOptionsConverter.java b/vertx-core/src/main/generated/io/vertx/core/VertxOptionsConverter.java index 3838fbf8832..0ebc9c56c31 100644 --- a/vertx-core/src/main/generated/io/vertx/core/VertxOptionsConverter.java +++ b/vertx-core/src/main/generated/io/vertx/core/VertxOptionsConverter.java @@ -87,6 +87,11 @@ static void fromJson(Iterable> json, VertxOp obj.setPreferNativeTransport((Boolean)member.getValue()); } break; + case "virtualThreadEventLoops": + if (member.getValue() instanceof Boolean) { + obj.setVirtualThreadEventLoops((Boolean)member.getValue()); + } + break; case "maxEventLoopExecuteTimeUnit": if (member.getValue() instanceof String) { obj.setMaxEventLoopExecuteTimeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue())); @@ -156,6 +161,7 @@ static void toJson(VertxOptions obj, java.util.Map json) { json.put("addressResolverOptions", obj.getAddressResolverOptions().toJson()); } json.put("preferNativeTransport", obj.getPreferNativeTransport()); + json.put("virtualThreadEventLoops", obj.getVirtualThreadEventLoops()); if (obj.getMaxEventLoopExecuteTimeUnit() != null) { json.put("maxEventLoopExecuteTimeUnit", obj.getMaxEventLoopExecuteTimeUnit().name()); } diff --git a/vertx-core/src/main/java/io/vertx/core/VertxOptions.java b/vertx-core/src/main/java/io/vertx/core/VertxOptions.java index 0f577ca2e44..3f1755df825 100644 --- a/vertx-core/src/main/java/io/vertx/core/VertxOptions.java +++ b/vertx-core/src/main/java/io/vertx/core/VertxOptions.java @@ -98,6 +98,11 @@ public class VertxOptions { */ public static final boolean DEFAULT_PREFER_NATIVE_TRANSPORT = false; + /** + * The default value for using virtual thread event loops = false + */ + public static final boolean DEFAULT_VIRTUAL_THREAD_EVENT_LOOPS = false; + /** * The default value of warning exception time 5000000000 ns (5 seconds) * If a thread is blocked longer than this threshold, the warning log @@ -138,6 +143,7 @@ public class VertxOptions { private EventBusOptions eventBusOptions = new EventBusOptions(); private AddressResolverOptions addressResolverOptions = new AddressResolverOptions(); private boolean preferNativeTransport = DEFAULT_PREFER_NATIVE_TRANSPORT; + private boolean virtualThreadEventLoops = DEFAULT_VIRTUAL_THREAD_EVENT_LOOPS; private TimeUnit maxEventLoopExecuteTimeUnit = DEFAULT_MAX_EVENT_LOOP_EXECUTE_TIME_UNIT; private TimeUnit maxWorkerExecuteTimeUnit = DEFAULT_MAX_WORKER_EXECUTE_TIME_UNIT; private TimeUnit warningExceptionTimeUnit = DEFAULT_WARNING_EXCEPTION_TIME_UNIT; @@ -172,6 +178,7 @@ public VertxOptions(VertxOptions other) { this.eventBusOptions = new EventBusOptions(other.eventBusOptions); this.addressResolverOptions = other.addressResolverOptions != null ? new AddressResolverOptions(other.getAddressResolverOptions()) : null; this.preferNativeTransport = other.preferNativeTransport; + this.virtualThreadEventLoops = other.virtualThreadEventLoops; this.maxEventLoopExecuteTimeUnit = other.maxEventLoopExecuteTimeUnit; this.maxWorkerExecuteTimeUnit = other.maxWorkerExecuteTimeUnit; this.warningExceptionTimeUnit = other.warningExceptionTimeUnit; @@ -538,6 +545,30 @@ public VertxOptions setPreferNativeTransport(boolean preferNativeTransport) { return this; } + /** + * @return whether event loop threads run as virtual threads on the NIO transport + */ + public boolean getVirtualThreadEventLoops() { + return virtualThreadEventLoops; + } + + /** + * Set whether event loop threads should run as virtual threads using Netty's {@code ManualIoEventLoop}. + *

+ * When enabled, each Netty event loop runs as a long-running virtual thread, allowing the JVM's + * virtual thread scheduler to multiplex event loops onto platform threads alongside other virtual threads. + *

+ * This option is only supported with the NIO transport. Native transports (epoll, kqueue, io_uring) use + * JNI calls that pin virtual threads to carrier threads, defeating the purpose. + * + * @param virtualThreadEventLoops {@code true} to run event loops as virtual threads + * @return a reference to this, so the API can be used fluently + */ + public VertxOptions setVirtualThreadEventLoops(boolean virtualThreadEventLoops) { + this.virtualThreadEventLoops = virtualThreadEventLoops; + return this; + } + /** * @return the time unit of {@code maxEventLoopExecuteTime} */ @@ -699,6 +730,7 @@ public String toString() { ", maxWorkerExecuteTime=" + maxWorkerExecuteTime + ", haEnabled=" + haEnabled + ", preferNativeTransport=" + preferNativeTransport + + ", virtualThreadEventLoops=" + virtualThreadEventLoops + ", quorumSize=" + quorumSize + ", haGroup='" + haGroup + '\'' + ", metrics=" + metricsOptions + diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java index 66cd94b796b..1c3a12959cb 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxBootstrapImpl.java @@ -246,6 +246,9 @@ private VertxImpl instantiateVertx(ClusterManager clusterManager, NodeSelector n } else { tr = NioTransport.INSTANCE; } + if (options.getVirtualThreadEventLoops() && !(tr instanceof NioTransport)) { + throw new IllegalStateException("Virtual thread event loops are only supported with the NIO transport"); + } return new VertxImpl( options, clusterManager, diff --git a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java index b28c5ee1f01..5a4e954d359 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -217,10 +217,11 @@ private static ThreadFactory virtualThreadFactory() { maxEventLoopExecTime = maxEventLoopExecuteTime; maxEventLoopExecTimeUnit = maxEventLoopExecuteTimeUnit; eventLoopThreadFactory = createThreadFactory(threadFactory, checker, useDaemonThread, maxEventLoopExecTime, maxEventLoopExecTimeUnit, "vert.x-eventloop-thread-", false); - eventLoopGroup = transport.eventLoopGroup(Transport.IO_EVENT_LOOP_GROUP, options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO); + boolean vtEventLoops = options.getVirtualThreadEventLoops(); + eventLoopGroup = transport.eventLoopGroup(Transport.IO_EVENT_LOOP_GROUP, options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO, vtEventLoops); // The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections // under a lot of load - acceptorEventLoopGroup = transport.eventLoopGroup(Transport.ACCEPTOR_EVENT_LOOP_GROUP, 1, acceptorEventLoopThreadFactory, 100); + acceptorEventLoopGroup = transport.eventLoopGroup(Transport.ACCEPTOR_EVENT_LOOP_GROUP, 1, acceptorEventLoopThreadFactory, 100, vtEventLoops); virtualThreadExecutor = virtualThreadFactory != null ? new ThreadPerTaskExecutorService(virtualThreadFactory) : null; virtualThreadWorkerPool = virtualThreadFactory != null ? new WorkerPool(virtualThreadExecutor, virtualThreadWorkerPoolMetrics) : null; internalWorkerPool = new WorkerPool(internalWorkerExec, internalBlockingPoolMetrics); diff --git a/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java b/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java index b578c5e13bf..2f930ac8a6b 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java @@ -10,17 +10,18 @@ */ package io.vertx.core.impl.transports; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFactory; -import io.netty.channel.IoHandlerFactory; -import io.netty.channel.ServerChannel; +import io.netty.channel.*; import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.socket.nio.*; import io.vertx.core.spi.transport.Transport; +import java.lang.reflect.Method; import java.net.SocketAddress; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; public class NioTransport implements Transport { /** @@ -28,6 +29,22 @@ public class NioTransport implements Transport { */ public static final Transport INSTANCE = new NioTransport(); + private static final long RUNNING_YIELD_NS = TimeUnit.MICROSECONDS + .toNanos(Integer.getInteger("io.vertx.virtualthread.running.yield.us", 1)); + + // Not cached for graalvm + private static ThreadFactory virtualThreadFactory() { + try { + Class builderClass = ClassLoader.getSystemClassLoader().loadClass("java.lang.Thread$Builder"); + Method ofVirtualMethod = Thread.class.getDeclaredMethod("ofVirtual"); + Object builder = ofVirtualMethod.invoke(null); + Method factoryMethod = builderClass.getDeclaredMethod("factory"); + return (ThreadFactory) factoryMethod.invoke(builder); + } catch (Exception e) { + return null; + } + } + private final UnixDomainSocketNioTransport unixDomainSocketNioTransport = UnixDomainSocketNioTransport.load(); @Override @@ -96,4 +113,40 @@ public ChannelFactory serverChannelFactory(boolean doma } return NioServerSocketChannel::new; } + + @Override + public EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threadFactory, int ioRatio, boolean virtualThreadEventLoops) { + if (!virtualThreadEventLoops) { + return Transport.super.eventLoopGroup(type, nThreads, threadFactory, ioRatio); + } + ThreadFactory vtFactory = virtualThreadFactory(); + if (vtFactory == null) { + throw new IllegalStateException("Virtual threads are not available"); + } + return new MultiThreadIoEventLoopGroup(nThreads, (Executor) null, ioHandlerFactory()) { + @Override + protected IoEventLoop newChild(Executor executor, IoHandlerFactory ioHandlerFactory, Object... args) { + ManualIoEventLoop eventLoop = new ManualIoEventLoop(this, null, ioHandlerFactory); + // Create a platform thread via the Vert.x thread factory to obtain the correct name + // and register it with the blocked thread checker + Thread platformThread = threadFactory.newThread(() -> {}); + Thread vt = vtFactory.newThread(() -> { + while (!eventLoop.isShuttingDown()) { + eventLoop.run(0, RUNNING_YIELD_NS); + Thread.yield(); + eventLoop.runNonBlockingTasks(RUNNING_YIELD_NS); + Thread.yield(); + } + while (!eventLoop.isTerminated()) { + eventLoop.runNow(); + Thread.yield(); + } + }); + vt.setName(platformThread.getName()); + eventLoop.setOwningThread(vt); + vt.start(); + return eventLoop; + } + }; + } } diff --git a/vertx-core/src/main/java/io/vertx/core/impl/transports/VirtualThreadNioTransport.java b/vertx-core/src/main/java/io/vertx/core/impl/transports/VirtualThreadNioTransport.java deleted file mode 100644 index a9f0692671f..00000000000 --- a/vertx-core/src/main/java/io/vertx/core/impl/transports/VirtualThreadNioTransport.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 - * which is available at https://www.apache.org/licenses/LICENSE-2.0. - * - * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 - */ -package io.vertx.core.impl.transports; - -import io.netty.channel.*; -import io.netty.channel.nio.NioIoHandler; -import io.netty.channel.socket.DatagramChannel; -import io.netty.channel.socket.InternetProtocolFamily; -import io.vertx.core.spi.transport.Transport; - -import java.lang.reflect.Method; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -/** - * A transport that uses NIO channels but runs each Netty event loop as a long-running virtual thread - * using {@link ManualIoEventLoop}. This allows the JVM's virtual thread scheduler (ForkJoinPool) to - * multiplex event loops onto platform threads alongside other virtual threads. - */ -public class VirtualThreadNioTransport implements Transport { - - private static final long RUNNING_YIELD_NS = TimeUnit.MICROSECONDS - .toNanos(Integer.getInteger("io.vertx.virtualthread.running.yield.us", 1)); - - private static final ThreadFactory VIRTUAL_THREAD_FACTORY; - private static final Throwable UNAVAILABILITY_CAUSE; - - static { - ThreadFactory factory = null; - Throwable cause = null; - try { - Class builderClass = ClassLoader.getSystemClassLoader().loadClass("java.lang.Thread$Builder"); - Method ofVirtualMethod = Thread.class.getDeclaredMethod("ofVirtual"); - Object builder = ofVirtualMethod.invoke(null); - Method factoryMethod = builderClass.getDeclaredMethod("factory"); - factory = (ThreadFactory) factoryMethod.invoke(builder); - } catch (Exception e) { - cause = e; - } - VIRTUAL_THREAD_FACTORY = factory; - UNAVAILABILITY_CAUSE = cause; - } - - public static final Transport INSTANCE = new VirtualThreadNioTransport(); - - private final NioTransport delegate = (NioTransport) NioTransport.INSTANCE; - - @Override - public boolean isAvailable() { - return VIRTUAL_THREAD_FACTORY != null; - } - - @Override - public Throwable unavailabilityCause() { - return UNAVAILABILITY_CAUSE; - } - - @Override - public boolean supportsDomainSockets() { - return delegate.supportsDomainSockets(); - } - - @Override - public java.net.SocketAddress convert(io.vertx.core.net.SocketAddress address) { - return delegate.convert(address); - } - - @Override - public io.vertx.core.net.SocketAddress convert(java.net.SocketAddress address) { - return delegate.convert(address); - } - - @Override - public IoHandlerFactory ioHandlerFactory() { - return NioIoHandler.newFactory(); - } - - @Override - public DatagramChannel datagramChannel(InternetProtocolFamily family) { - return delegate.datagramChannel(family); - } - - @Override - public ChannelFactory datagramChannelFactory() { - return delegate.datagramChannelFactory(); - } - - @Override - public ChannelFactory channelFactory(boolean domainSocket) { - return delegate.channelFactory(domainSocket); - } - - @Override - public ChannelFactory serverChannelFactory(boolean domainSocket) { - return delegate.serverChannelFactory(domainSocket); - } - - @Override - public EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threadFactory, int ioRatio) { - if (VIRTUAL_THREAD_FACTORY == null) { - throw new IllegalStateException("Virtual threads are not available", UNAVAILABILITY_CAUSE); - } - return new MultiThreadIoEventLoopGroup(nThreads, (Executor) null, ioHandlerFactory()) { - @Override - protected IoEventLoop newChild(Executor executor, IoHandlerFactory ioHandlerFactory, Object... args) { - ManualIoEventLoop eventLoop = new ManualIoEventLoop(this, null, ioHandlerFactory); - // Create a platform thread via the Vert.x thread factory to obtain the correct name - // and register it with the blocked thread checker. Then name the virtual thread the same. - Thread platformThread = threadFactory.newThread(() -> {}); - Thread vt = VIRTUAL_THREAD_FACTORY.newThread(() -> { - while (!eventLoop.isShuttingDown()) { - eventLoop.run(0, RUNNING_YIELD_NS); - Thread.yield(); - eventLoop.runNonBlockingTasks(RUNNING_YIELD_NS); - Thread.yield(); - } - while (!eventLoop.isTerminated()) { - eventLoop.runNow(); - Thread.yield(); - } - }); - vt.setName(platformThread.getName()); - eventLoop.setOwningThread(vt); - vt.start(); - return eventLoop; - } - }; - } -} diff --git a/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java b/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java index 0b5b6bbb831..22823f8d99d 100644 --- a/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java +++ b/vertx-core/src/main/java/io/vertx/core/spi/transport/Transport.java @@ -92,6 +92,19 @@ default EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory thre return new MultiThreadIoEventLoopGroup(nThreads, threadFactory, ioHandlerFactory()); } + /** + * @param type one of {@link #ACCEPTOR_EVENT_LOOP_GROUP} or {@link #IO_EVENT_LOOP_GROUP}. + * @param nThreads the number of threads that will be used by this instance. + * @param threadFactory the ThreadFactory to use. + * @param ioRatio the IO ratio + * @param virtualThreadEventLoops whether event loops should run as virtual threads + * + * @return a new event loop group + */ + default EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threadFactory, int ioRatio, boolean virtualThreadEventLoops) { + return eventLoopGroup(type, nThreads, threadFactory, ioRatio); + } + /** * @return a new datagram channel */ diff --git a/vertx-core/src/main/java/io/vertx/core/transport/Transport.java b/vertx-core/src/main/java/io/vertx/core/transport/Transport.java index 7fe6e70ecf5..35fb6bc390b 100644 --- a/vertx-core/src/main/java/io/vertx/core/transport/Transport.java +++ b/vertx-core/src/main/java/io/vertx/core/transport/Transport.java @@ -14,7 +14,6 @@ import io.vertx.core.impl.transports.NioTransport; import io.vertx.core.impl.transports.TransportInternal; import io.vertx.core.impl.transports.TransportLoader; -import io.vertx.core.impl.transports.VirtualThreadNioTransport; /** * The transport used by a {@link io.vertx.core.Vertx} instance. @@ -43,15 +42,6 @@ public interface Transport { */ Transport IO_URING = TransportLoader.io_uring(); - /** - * NIO transport with event loops running as long-running virtual threads - * using Netty's {@code ManualIoEventLoop}. - */ - Transport VIRTUAL_THREAD_NIO = new TransportInternal("virtual_thread_nio", - VirtualThreadNioTransport.INSTANCE.isAvailable(), - VirtualThreadNioTransport.INSTANCE.unavailabilityCause(), - VirtualThreadNioTransport.INSTANCE); - /** * @return the name among {@code nio, kqueue, epoll, io_uring} */ diff --git a/vertx-core/src/main/resources/META-INF/native-image/io.vertx/vertx-core/reflect-config.json b/vertx-core/src/main/resources/META-INF/native-image/io.vertx/vertx-core/reflect-config.json index a4897137de2..5c16732bddf 100644 --- a/vertx-core/src/main/resources/META-INF/native-image/io.vertx/vertx-core/reflect-config.json +++ b/vertx-core/src/main/resources/META-INF/native-image/io.vertx/vertx-core/reflect-config.json @@ -34,5 +34,29 @@ "parameterTypes": [] } ] + }, + { + "name": "java.lang.Thread$Builder", + "condition": { + "typeReachable": "io.vertx.core.impl.transports.NioTransport" + }, + "methods": [ + { + "name": "factory", + "parameterTypes": [] + } + ] + }, + { + "name": "java.lang.Thread", + "condition": { + "typeReachable": "io.vertx.core.impl.transports.NioTransport" + }, + "methods": [ + { + "name": "ofVirtual", + "parameterTypes": [] + } + ] } ] diff --git a/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java b/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java index 4782682f75d..d1c460a1846 100644 --- a/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java +++ b/vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java @@ -46,6 +46,7 @@ public class VertxTestBase extends AsyncTestBase { public static final Transport TRANSPORT; public static final boolean USE_DOMAIN_SOCKETS = Boolean.getBoolean("vertx.useDomainSockets"); + public static final boolean USE_VIRTUAL_THREAD_EVENT_LOOPS = Boolean.getBoolean("vertx.virtualThreadEventLoops"); public static final boolean USE_JAVA_MODULES = VertxTestBase.class.getModule().isNamed(); private static final Logger log = LoggerFactory.getLogger(VertxTestBase.class); @@ -68,9 +69,6 @@ public class VertxTestBase extends AsyncTestBase { case "io_uring": transport = Transport.IO_URING; break; - case "virtual_thread_nio": - transport = Transport.VIRTUAL_THREAD_NIO; - break; default: transport = new Transport() { @Override @@ -163,7 +161,11 @@ protected VertxMetricsFactory getMetrics() { } protected VertxOptions getOptions() { - return new VertxOptions(); + VertxOptions options = new VertxOptions(); + if (USE_VIRTUAL_THREAD_EVENT_LOOPS) { + options.setVirtualThreadEventLoops(true); + } + return options; } protected void tearDown() throws Exception { @@ -216,7 +218,7 @@ protected VertxBuilder createVertxBuilder(VertxOptions options) { protected Vertx createVertx(VertxOptions options) { Vertx vertx = createVertxBuilder(options).build(); - if (TRANSPORT != Transport.NIO && TRANSPORT != Transport.VIRTUAL_THREAD_NIO) { + if (TRANSPORT != Transport.NIO) { if (!vertx.isNativeTransportEnabled()) { fail(vertx.unavailableNativeTransportCause()); } From 24b4ed9940ccbbd657d6bda031e8de0391b3436d Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Mon, 30 Mar 2026 15:41:06 +0200 Subject: [PATCH 3/3] Use named VT factory instead of phantom platform thread for naming Use Thread.Builder.OfVirtual.name(prefix, start) to create a named virtual thread factory directly, removing the hack that created throwaway platform threads just to steal their name. --- .../io/vertx/core/impl/transports/NioTransport.java | 12 ++++++------ .../io.vertx/vertx-core/reflect-config.json | 12 ++++++++++++ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java b/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java index 2f930ac8a6b..3e7a2790c0b 100644 --- a/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java +++ b/vertx-core/src/main/java/io/vertx/core/impl/transports/NioTransport.java @@ -33,11 +33,14 @@ public class NioTransport implements Transport { .toNanos(Integer.getInteger("io.vertx.virtualthread.running.yield.us", 1)); // Not cached for graalvm - private static ThreadFactory virtualThreadFactory() { + private static ThreadFactory virtualThreadFactory(String prefix) { try { Class builderClass = ClassLoader.getSystemClassLoader().loadClass("java.lang.Thread$Builder"); + Class ofVirtualClass = ClassLoader.getSystemClassLoader().loadClass("java.lang.Thread$Builder$OfVirtual"); Method ofVirtualMethod = Thread.class.getDeclaredMethod("ofVirtual"); Object builder = ofVirtualMethod.invoke(null); + Method nameMethod = ofVirtualClass.getDeclaredMethod("name", String.class, long.class); + builder = nameMethod.invoke(builder, prefix, 0L); Method factoryMethod = builderClass.getDeclaredMethod("factory"); return (ThreadFactory) factoryMethod.invoke(builder); } catch (Exception e) { @@ -119,7 +122,8 @@ public EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threa if (!virtualThreadEventLoops) { return Transport.super.eventLoopGroup(type, nThreads, threadFactory, ioRatio); } - ThreadFactory vtFactory = virtualThreadFactory(); + String prefix = type == ACCEPTOR_EVENT_LOOP_GROUP ? "vert.x-acceptor-thread-" : "vert.x-eventloop-thread-"; + ThreadFactory vtFactory = virtualThreadFactory(prefix); if (vtFactory == null) { throw new IllegalStateException("Virtual threads are not available"); } @@ -127,9 +131,6 @@ public EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threa @Override protected IoEventLoop newChild(Executor executor, IoHandlerFactory ioHandlerFactory, Object... args) { ManualIoEventLoop eventLoop = new ManualIoEventLoop(this, null, ioHandlerFactory); - // Create a platform thread via the Vert.x thread factory to obtain the correct name - // and register it with the blocked thread checker - Thread platformThread = threadFactory.newThread(() -> {}); Thread vt = vtFactory.newThread(() -> { while (!eventLoop.isShuttingDown()) { eventLoop.run(0, RUNNING_YIELD_NS); @@ -142,7 +143,6 @@ protected IoEventLoop newChild(Executor executor, IoHandlerFactory ioHandlerFact Thread.yield(); } }); - vt.setName(platformThread.getName()); eventLoop.setOwningThread(vt); vt.start(); return eventLoop; diff --git a/vertx-core/src/main/resources/META-INF/native-image/io.vertx/vertx-core/reflect-config.json b/vertx-core/src/main/resources/META-INF/native-image/io.vertx/vertx-core/reflect-config.json index 5c16732bddf..ed718a56505 100644 --- a/vertx-core/src/main/resources/META-INF/native-image/io.vertx/vertx-core/reflect-config.json +++ b/vertx-core/src/main/resources/META-INF/native-image/io.vertx/vertx-core/reflect-config.json @@ -47,6 +47,18 @@ } ] }, + { + "name": "java.lang.Thread$Builder$OfVirtual", + "condition": { + "typeReachable": "io.vertx.core.impl.transports.NioTransport" + }, + "methods": [ + { + "name": "name", + "parameterTypes": ["java.lang.String", "long"] + } + ] + }, { "name": "java.lang.Thread", "condition": {