Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions vertx-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<jmh.version>1.37</jmh.version>
<vertx.surefire.nettyTransport>jdk</vertx.surefire.nettyTransport>
<vertx.surefire.useDomainSockets>false</vertx.surefire.useDomainSockets>
<vertx.surefire.virtualThreadEventLoops>false</vertx.surefire.virtualThreadEventLoops>
</properties>

<dependencies>
Expand Down Expand Up @@ -283,6 +284,7 @@
<vertx.handle100Continue>true</vertx.handle100Continue>
<vertx.transport>${vertx.surefire.nettyTransport}</vertx.transport>
<vertx.useDomainSockets>${vertx.surefire.useDomainSockets}</vertx.useDomainSockets>
<vertx.virtualThreadEventLoops>${vertx.surefire.virtualThreadEventLoops}</vertx.virtualThreadEventLoops>
<vertx.threadChecks>true</vertx.threadChecks>
</systemPropertyVariables>
<!-- Needs to be small enough to run in a EC2 1.7GB small instance -->
Expand Down Expand Up @@ -808,6 +810,15 @@
</properties>
</profile>

<profile>
<id>VirtualThreadNio</id>
<properties>
<vertx.surefire.virtualThreadEventLoops>true</vertx.surefire.virtualThreadEventLoops>
<vertx.surefire.useDomainSockets>false</vertx.surefire.useDomainSockets>
<vertx.surefire.useModulePath>false</vertx.surefire.useModulePath>
</properties>
</profile>

<profile>
<id>benchmarks</id>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, VertxOp
obj.setPreferNativeTransport((Boolean)member.getValue());
}
break;
case "virtualThreadEventLoops":
if (member.getValue() instanceof Boolean) {
obj.setVirtualThreadEventLoops((Boolean)member.getValue());
}
break;
case "maxEventLoopExecuteTimeUnit":
if (member.getValue() instanceof String) {
obj.setMaxEventLoopExecuteTimeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
Expand Down Expand Up @@ -156,6 +161,7 @@ static void toJson(VertxOptions obj, java.util.Map<String, Object> json) {
json.put("addressResolverOptions", obj.getAddressResolverOptions().toJson());
}
json.put("preferNativeTransport", obj.getPreferNativeTransport());
json.put("virtualThreadEventLoops", obj.getVirtualThreadEventLoops());
if (obj.getMaxEventLoopExecuteTimeUnit() != null) {
json.put("maxEventLoopExecuteTimeUnit", obj.getMaxEventLoopExecuteTimeUnit().name());
}
Expand Down
32 changes: 32 additions & 0 deletions vertx-core/src/main/java/io/vertx/core/VertxOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public class VertxOptions {
*/
public static final boolean DEFAULT_PREFER_NATIVE_TRANSPORT = false;

/**
* The default value for using virtual thread event loops = false
*/
public static final boolean DEFAULT_VIRTUAL_THREAD_EVENT_LOOPS = false;

/**
* The default value of warning exception time 5000000000 ns (5 seconds)
* If a thread is blocked longer than this threshold, the warning log
Expand Down Expand Up @@ -138,6 +143,7 @@ public class VertxOptions {
private EventBusOptions eventBusOptions = new EventBusOptions();
private AddressResolverOptions addressResolverOptions = new AddressResolverOptions();
private boolean preferNativeTransport = DEFAULT_PREFER_NATIVE_TRANSPORT;
private boolean virtualThreadEventLoops = DEFAULT_VIRTUAL_THREAD_EVENT_LOOPS;
private TimeUnit maxEventLoopExecuteTimeUnit = DEFAULT_MAX_EVENT_LOOP_EXECUTE_TIME_UNIT;
private TimeUnit maxWorkerExecuteTimeUnit = DEFAULT_MAX_WORKER_EXECUTE_TIME_UNIT;
private TimeUnit warningExceptionTimeUnit = DEFAULT_WARNING_EXCEPTION_TIME_UNIT;
Expand Down Expand Up @@ -172,6 +178,7 @@ public VertxOptions(VertxOptions other) {
this.eventBusOptions = new EventBusOptions(other.eventBusOptions);
this.addressResolverOptions = other.addressResolverOptions != null ? new AddressResolverOptions(other.getAddressResolverOptions()) : null;
this.preferNativeTransport = other.preferNativeTransport;
this.virtualThreadEventLoops = other.virtualThreadEventLoops;
this.maxEventLoopExecuteTimeUnit = other.maxEventLoopExecuteTimeUnit;
this.maxWorkerExecuteTimeUnit = other.maxWorkerExecuteTimeUnit;
this.warningExceptionTimeUnit = other.warningExceptionTimeUnit;
Expand Down Expand Up @@ -538,6 +545,30 @@ public VertxOptions setPreferNativeTransport(boolean preferNativeTransport) {
return this;
}

/**
* @return whether event loop threads run as virtual threads on the NIO transport
*/
public boolean getVirtualThreadEventLoops() {
return virtualThreadEventLoops;
}

/**
* Set whether event loop threads should run as virtual threads using Netty's {@code ManualIoEventLoop}.
* <p>
* When enabled, each Netty event loop runs as a long-running virtual thread, allowing the JVM's
* virtual thread scheduler to multiplex event loops onto platform threads alongside other virtual threads.
* <p>
* This option is only supported with the NIO transport. Native transports (epoll, kqueue, io_uring) use
* JNI calls that pin virtual threads to carrier threads, defeating the purpose.
*
* @param virtualThreadEventLoops {@code true} to run event loops as virtual threads
* @return a reference to this, so the API can be used fluently
*/
public VertxOptions setVirtualThreadEventLoops(boolean virtualThreadEventLoops) {
this.virtualThreadEventLoops = virtualThreadEventLoops;
return this;
}

/**
* @return the time unit of {@code maxEventLoopExecuteTime}
*/
Expand Down Expand Up @@ -699,6 +730,7 @@ public String toString() {
", maxWorkerExecuteTime=" + maxWorkerExecuteTime +
", haEnabled=" + haEnabled +
", preferNativeTransport=" + preferNativeTransport +
", virtualThreadEventLoops=" + virtualThreadEventLoops +
", quorumSize=" + quorumSize +
", haGroup='" + haGroup + '\'' +
", metrics=" + metricsOptions +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ private VertxImpl instantiateVertx(ClusterManager clusterManager, NodeSelector n
} else {
tr = NioTransport.INSTANCE;
}
if (options.getVirtualThreadEventLoops() && !(tr instanceof NioTransport)) {
throw new IllegalStateException("Virtual thread event loops are only supported with the NIO transport");
}
return new VertxImpl(
options,
clusterManager,
Expand Down
5 changes: 3 additions & 2 deletions vertx-core/src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,11 @@ private static ThreadFactory virtualThreadFactory() {
maxEventLoopExecTime = maxEventLoopExecuteTime;
maxEventLoopExecTimeUnit = maxEventLoopExecuteTimeUnit;
eventLoopThreadFactory = createThreadFactory(threadFactory, checker, useDaemonThread, maxEventLoopExecTime, maxEventLoopExecTimeUnit, "vert.x-eventloop-thread-", false);
eventLoopGroup = transport.eventLoopGroup(Transport.IO_EVENT_LOOP_GROUP, options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);
boolean vtEventLoops = options.getVirtualThreadEventLoops();
eventLoopGroup = transport.eventLoopGroup(Transport.IO_EVENT_LOOP_GROUP, options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO, vtEventLoops);
// The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections
// under a lot of load
acceptorEventLoopGroup = transport.eventLoopGroup(Transport.ACCEPTOR_EVENT_LOOP_GROUP, 1, acceptorEventLoopThreadFactory, 100);
acceptorEventLoopGroup = transport.eventLoopGroup(Transport.ACCEPTOR_EVENT_LOOP_GROUP, 1, acceptorEventLoopThreadFactory, 100, vtEventLoops);
virtualThreadExecutor = virtualThreadFactory != null ? new ThreadPerTaskExecutorService(virtualThreadFactory) : null;
virtualThreadWorkerPool = virtualThreadFactory != null ? new WorkerPool(virtualThreadExecutor, virtualThreadWorkerPoolMetrics) : null;
internalWorkerPool = new WorkerPool(internalWorkerExec, internalBlockingPoolMetrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,44 @@
*/
package io.vertx.core.impl.transports;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.IoHandlerFactory;
import io.netty.channel.ServerChannel;
import io.netty.channel.*;
import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.*;
import io.vertx.core.spi.transport.Transport;

import java.lang.reflect.Method;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class NioTransport implements Transport {
/**
* The NIO transport, always there.
*/
public static final Transport INSTANCE = new NioTransport();

private static final long RUNNING_YIELD_NS = TimeUnit.MICROSECONDS
.toNanos(Integer.getInteger("io.vertx.virtualthread.running.yield.us", 1));

// Not cached for graalvm
private static ThreadFactory virtualThreadFactory(String prefix) {
try {
Class<?> builderClass = ClassLoader.getSystemClassLoader().loadClass("java.lang.Thread$Builder");
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vietj better ideas than doing this?

Copy link
Copy Markdown
Member

@vietj vietj Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in vertx 5.1 we will use multi release jar with Java 21, I think it should help.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will

So right now is not implemented AFAIK correct? 🙏

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, there is a PR for Jackson V3 in progress that provides that

Class<?> ofVirtualClass = ClassLoader.getSystemClassLoader().loadClass("java.lang.Thread$Builder$OfVirtual");
Method ofVirtualMethod = Thread.class.getDeclaredMethod("ofVirtual");
Object builder = ofVirtualMethod.invoke(null);
Method nameMethod = ofVirtualClass.getDeclaredMethod("name", String.class, long.class);
builder = nameMethod.invoke(builder, prefix, 0L);
Method factoryMethod = builderClass.getDeclaredMethod("factory");
return (ThreadFactory) factoryMethod.invoke(builder);
} catch (Exception e) {
return null;
}
}

private final UnixDomainSocketNioTransport unixDomainSocketNioTransport = UnixDomainSocketNioTransport.load();

@Override
Expand Down Expand Up @@ -96,4 +116,37 @@ public ChannelFactory<? extends ServerChannel> serverChannelFactory(boolean doma
}
return NioServerSocketChannel::new;
}

@Override
public EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threadFactory, int ioRatio, boolean virtualThreadEventLoops) {
if (!virtualThreadEventLoops) {
return Transport.super.eventLoopGroup(type, nThreads, threadFactory, ioRatio);
}
String prefix = type == ACCEPTOR_EVENT_LOOP_GROUP ? "vert.x-acceptor-thread-" : "vert.x-eventloop-thread-";
ThreadFactory vtFactory = virtualThreadFactory(prefix);
if (vtFactory == null) {
throw new IllegalStateException("Virtual threads are not available");
}
return new MultiThreadIoEventLoopGroup(nThreads, (Executor) null, ioHandlerFactory()) {
@Override
protected IoEventLoop newChild(Executor executor, IoHandlerFactory ioHandlerFactory, Object... args) {
ManualIoEventLoop eventLoop = new ManualIoEventLoop(this, null, ioHandlerFactory);
Thread vt = vtFactory.newThread(() -> {
while (!eventLoop.isShuttingDown()) {
eventLoop.run(0, RUNNING_YIELD_NS);
Thread.yield();
eventLoop.runNonBlockingTasks(RUNNING_YIELD_NS);
Thread.yield();
}
while (!eventLoop.isTerminated()) {
eventLoop.runNow();
Thread.yield();
}
});
eventLoop.setOwningThread(vt);
vt.start();
return eventLoop;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,19 @@ default EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory thre
return new MultiThreadIoEventLoopGroup(nThreads, threadFactory, ioHandlerFactory());
}

/**
* @param type one of {@link #ACCEPTOR_EVENT_LOOP_GROUP} or {@link #IO_EVENT_LOOP_GROUP}.
* @param nThreads the number of threads that will be used by this instance.
* @param threadFactory the ThreadFactory to use.
* @param ioRatio the IO ratio
* @param virtualThreadEventLoops whether event loops should run as virtual threads
*
* @return a new event loop group
*/
default EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threadFactory, int ioRatio, boolean virtualThreadEventLoops) {
return eventLoopGroup(type, nThreads, threadFactory, ioRatio);
}

/**
* @return a new datagram channel
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,41 @@
"parameterTypes": []
}
]
},
{
"name": "java.lang.Thread$Builder",
"condition": {
"typeReachable": "io.vertx.core.impl.transports.NioTransport"
},
"methods": [
{
"name": "factory",
"parameterTypes": []
}
]
},
{
"name": "java.lang.Thread$Builder$OfVirtual",
"condition": {
"typeReachable": "io.vertx.core.impl.transports.NioTransport"
},
"methods": [
{
"name": "name",
"parameterTypes": ["java.lang.String", "long"]
}
]
},
{
"name": "java.lang.Thread",
"condition": {
"typeReachable": "io.vertx.core.impl.transports.NioTransport"
},
"methods": [
{
"name": "ofVirtual",
"parameterTypes": []
}
]
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class VertxTestBase extends AsyncTestBase {

public static final Transport TRANSPORT;
public static final boolean USE_DOMAIN_SOCKETS = Boolean.getBoolean("vertx.useDomainSockets");
public static final boolean USE_VIRTUAL_THREAD_EVENT_LOOPS = Boolean.getBoolean("vertx.virtualThreadEventLoops");
public static final boolean USE_JAVA_MODULES = VertxTestBase.class.getModule().isNamed();
private static final Logger log = LoggerFactory.getLogger(VertxTestBase.class);

Expand Down Expand Up @@ -160,7 +161,11 @@ protected VertxMetricsFactory getMetrics() {
}

protected VertxOptions getOptions() {
return new VertxOptions();
VertxOptions options = new VertxOptions();
if (USE_VIRTUAL_THREAD_EVENT_LOOPS) {
options.setVirtualThreadEventLoops(true);
}
return options;
}

protected void tearDown() throws Exception {
Expand Down
Loading