Skip to content

Commit 9f7b8fb

Browse files
committed
Add virtual thread NIO transport using Netty ManualIoEventLoop
Run Netty NIO event loops as long-running virtual threads instead of platform threads. Uses ManualIoEventLoop from Netty 4.2 to manually drive IO polling and task execution from virtual threads, allowing the JVM's ForkJoinPool scheduler to multiplex event loops alongside other virtual threads. Adds VirtualThreadNioTransport (SPI), Transport.VIRTUAL_THREAD_NIO (public API), VertxTestBase support, and a VirtualThreadNio Maven profile to run the full test suite with this transport.
1 parent 3144cc1 commit 9f7b8fb

File tree

4 files changed

+161
-1
lines changed

4 files changed

+161
-1
lines changed

vertx-core/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -808,6 +808,15 @@
808808
</properties>
809809
</profile>
810810

811+
<profile>
812+
<id>VirtualThreadNio</id>
813+
<properties>
814+
<vertx.surefire.nettyTransport>virtual_thread_nio</vertx.surefire.nettyTransport>
815+
<vertx.surefire.useDomainSockets>false</vertx.surefire.useDomainSockets>
816+
<vertx.surefire.useModulePath>false</vertx.surefire.useModulePath>
817+
</properties>
818+
</profile>
819+
811820
<profile>
812821
<id>benchmarks</id>
813822
<build>
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright (c) 2011-2024 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
package io.vertx.core.impl.transports;
12+
13+
import io.netty.channel.*;
14+
import io.netty.channel.nio.NioIoHandler;
15+
import io.netty.channel.socket.DatagramChannel;
16+
import io.netty.channel.socket.InternetProtocolFamily;
17+
import io.vertx.core.spi.transport.Transport;
18+
19+
import java.lang.reflect.Method;
20+
import java.util.concurrent.Executor;
21+
import java.util.concurrent.ThreadFactory;
22+
import java.util.concurrent.TimeUnit;
23+
24+
/**
25+
* A transport that uses NIO channels but runs each Netty event loop as a long-running virtual thread
26+
* using {@link ManualIoEventLoop}. This allows the JVM's virtual thread scheduler (ForkJoinPool) to
27+
* multiplex event loops onto platform threads alongside other virtual threads.
28+
*/
29+
public class VirtualThreadNioTransport implements Transport {
30+
31+
private static final long RUNNING_YIELD_NS = TimeUnit.MICROSECONDS
32+
.toNanos(Integer.getInteger("io.vertx.virtualthread.running.yield.us", 1));
33+
34+
private static final ThreadFactory VIRTUAL_THREAD_FACTORY;
35+
private static final Throwable UNAVAILABILITY_CAUSE;
36+
37+
static {
38+
ThreadFactory factory = null;
39+
Throwable cause = null;
40+
try {
41+
Class<?> builderClass = ClassLoader.getSystemClassLoader().loadClass("java.lang.Thread$Builder");
42+
Method ofVirtualMethod = Thread.class.getDeclaredMethod("ofVirtual");
43+
Object builder = ofVirtualMethod.invoke(null);
44+
Method factoryMethod = builderClass.getDeclaredMethod("factory");
45+
factory = (ThreadFactory) factoryMethod.invoke(builder);
46+
} catch (Exception e) {
47+
cause = e;
48+
}
49+
VIRTUAL_THREAD_FACTORY = factory;
50+
UNAVAILABILITY_CAUSE = cause;
51+
}
52+
53+
public static final Transport INSTANCE = new VirtualThreadNioTransport();
54+
55+
private final NioTransport delegate = (NioTransport) NioTransport.INSTANCE;
56+
57+
@Override
58+
public boolean isAvailable() {
59+
return VIRTUAL_THREAD_FACTORY != null;
60+
}
61+
62+
@Override
63+
public Throwable unavailabilityCause() {
64+
return UNAVAILABILITY_CAUSE;
65+
}
66+
67+
@Override
68+
public boolean supportsDomainSockets() {
69+
return delegate.supportsDomainSockets();
70+
}
71+
72+
@Override
73+
public java.net.SocketAddress convert(io.vertx.core.net.SocketAddress address) {
74+
return delegate.convert(address);
75+
}
76+
77+
@Override
78+
public io.vertx.core.net.SocketAddress convert(java.net.SocketAddress address) {
79+
return delegate.convert(address);
80+
}
81+
82+
@Override
83+
public IoHandlerFactory ioHandlerFactory() {
84+
return NioIoHandler.newFactory();
85+
}
86+
87+
@Override
88+
public DatagramChannel datagramChannel(InternetProtocolFamily family) {
89+
return delegate.datagramChannel(family);
90+
}
91+
92+
@Override
93+
public ChannelFactory<? extends DatagramChannel> datagramChannelFactory() {
94+
return delegate.datagramChannelFactory();
95+
}
96+
97+
@Override
98+
public ChannelFactory<? extends Channel> channelFactory(boolean domainSocket) {
99+
return delegate.channelFactory(domainSocket);
100+
}
101+
102+
@Override
103+
public ChannelFactory<? extends ServerChannel> serverChannelFactory(boolean domainSocket) {
104+
return delegate.serverChannelFactory(domainSocket);
105+
}
106+
107+
@Override
108+
public EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threadFactory, int ioRatio) {
109+
if (VIRTUAL_THREAD_FACTORY == null) {
110+
throw new IllegalStateException("Virtual threads are not available", UNAVAILABILITY_CAUSE);
111+
}
112+
return new MultiThreadIoEventLoopGroup(nThreads, (Executor) null, ioHandlerFactory()) {
113+
@Override
114+
protected IoEventLoop newChild(Executor executor, IoHandlerFactory ioHandlerFactory, Object... args) {
115+
ManualIoEventLoop eventLoop = new ManualIoEventLoop(this, null, ioHandlerFactory);
116+
// Create a platform thread via the Vert.x thread factory to obtain the correct name
117+
// and register it with the blocked thread checker. Then name the virtual thread the same.
118+
Thread platformThread = threadFactory.newThread(() -> {});
119+
Thread vt = VIRTUAL_THREAD_FACTORY.newThread(() -> {
120+
while (!eventLoop.isShuttingDown()) {
121+
eventLoop.run(0, RUNNING_YIELD_NS);
122+
Thread.yield();
123+
eventLoop.runNonBlockingTasks(RUNNING_YIELD_NS);
124+
Thread.yield();
125+
}
126+
while (!eventLoop.isTerminated()) {
127+
eventLoop.runNow();
128+
Thread.yield();
129+
}
130+
});
131+
vt.setName(platformThread.getName());
132+
eventLoop.setOwningThread(vt);
133+
vt.start();
134+
return eventLoop;
135+
}
136+
};
137+
}
138+
}

vertx-core/src/main/java/io/vertx/core/transport/Transport.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.vertx.core.impl.transports.NioTransport;
1515
import io.vertx.core.impl.transports.TransportInternal;
1616
import io.vertx.core.impl.transports.TransportLoader;
17+
import io.vertx.core.impl.transports.VirtualThreadNioTransport;
1718

1819
/**
1920
* The transport used by a {@link io.vertx.core.Vertx} instance.
@@ -42,6 +43,15 @@ public interface Transport {
4243
*/
4344
Transport IO_URING = TransportLoader.io_uring();
4445

46+
/**
47+
* NIO transport with event loops running as long-running virtual threads
48+
* using Netty's {@code ManualIoEventLoop}.
49+
*/
50+
Transport VIRTUAL_THREAD_NIO = new TransportInternal("virtual_thread_nio",
51+
VirtualThreadNioTransport.INSTANCE.isAvailable(),
52+
VirtualThreadNioTransport.INSTANCE.unavailabilityCause(),
53+
VirtualThreadNioTransport.INSTANCE);
54+
4555
/**
4656
* @return the name among {@code nio, kqueue, epoll, io_uring}
4757
*/

vertx-core/src/test/java/io/vertx/test/core/VertxTestBase.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ public class VertxTestBase extends AsyncTestBase {
6868
case "io_uring":
6969
transport = Transport.IO_URING;
7070
break;
71+
case "virtual_thread_nio":
72+
transport = Transport.VIRTUAL_THREAD_NIO;
73+
break;
7174
default:
7275
transport = new Transport() {
7376
@Override
@@ -213,7 +216,7 @@ protected VertxBuilder createVertxBuilder(VertxOptions options) {
213216

214217
protected Vertx createVertx(VertxOptions options) {
215218
Vertx vertx = createVertxBuilder(options).build();
216-
if (TRANSPORT != Transport.NIO) {
219+
if (TRANSPORT != Transport.NIO && TRANSPORT != Transport.VIRTUAL_THREAD_NIO) {
217220
if (!vertx.isNativeTransportEnabled()) {
218221
fail(vertx.unavailableNativeTransportCause());
219222
}

0 commit comments

Comments
 (0)