Skip to content

Commit 71b42e6

Browse files
committed
Adopt IoHandlerFactories for EventLoopGroups
1 parent 3a6c314 commit 71b42e6

19 files changed

Lines changed: 380 additions & 205 deletions

benchmark/src/main/java/com/eatthepath/pushy/apns/ApnsClientBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public void setUp() throws Exception {
120120
this.server = new BenchmarkApnsServerBuilder()
121121
.setServerCredentials(SERVER_CERTIFICATE_BUNDLE.getCertificatePathWithRoot(), SERVER_CERTIFICATE_BUNDLE.getKeyPair().getPrivate())
122122
.setTrustedClientCertificateChain(CA_BUNDLE.getCertificate())
123-
.setEventLoopGroup(this.serverEventLoopGroup)
123+
.setIoEventLoopGroup(this.serverEventLoopGroup)
124124
.build();
125125

126126
this.pushNotifications = new ArrayList<>(this.notificationCount);

pushy/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@
7676
<classifier>linux-x86_64</classifier>
7777
<scope>test</scope>
7878
</dependency>
79+
<dependency>
80+
<groupId>io.netty</groupId>
81+
<artifactId>netty-transport-native-io_uring</artifactId>
82+
<scope>test</scope>
83+
</dependency>
7984
<dependency>
8085
<groupId>io.netty</groupId>
8186
<artifactId>netty-transport-native-kqueue</artifactId>

pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelFactory.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class ApnsChannelFactory implements PooledObjectFactory<Channel>, Closeable {
7878
: clientResources.getRoundRobinDnsAddressResolverGroup();
7979

8080
this.bootstrapTemplate = new Bootstrap();
81-
this.bootstrapTemplate.group(clientResources.getEventLoopGroup());
81+
this.bootstrapTemplate.group(clientResources.getIoEventLoopGroup());
8282
this.bootstrapTemplate.option(ChannelOption.TCP_NODELAY, true);
8383
this.bootstrapTemplate.remoteAddress(clientConfiguration.getApnsServerAddress());
8484
this.bootstrapTemplate.resolver(this.addressResolverGroup);
@@ -148,11 +148,10 @@ public Future<Channel> create(final Promise<Channel> channelReadyPromise) {
148148
ApnsChannelFactory.this.currentDelaySeconds.compareAndSet(delay, updatedDelay);
149149
});
150150

151-
152151
this.bootstrapTemplate.config().group().schedule(() -> {
153152
final Bootstrap bootstrap = ApnsChannelFactory.this.bootstrapTemplate.clone()
154153
.channelFactory(new AugmentingReflectiveChannelFactory<>(
155-
ClientChannelClassUtil.getSocketChannelClass(ApnsChannelFactory.this.bootstrapTemplate.config().group()),
154+
ClientChannelClassUtil.getSocketChannelClass((IoEventLoopGroup) ApnsChannelFactory.this.bootstrapTemplate.config().group()),
156155
CHANNEL_READY_PROMISE_ATTRIBUTE_KEY, channelReadyPromise));
157156

158157
final ChannelFuture connectFuture = bootstrap.connect();

pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClient.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
import com.eatthepath.pushy.apns.util.concurrent.PushNotificationFuture;
2626
import io.netty.channel.Channel;
2727
import io.netty.channel.ChannelFuture;
28-
import io.netty.channel.nio.NioEventLoopGroup;
28+
import io.netty.channel.MultiThreadIoEventLoopGroup;
29+
import io.netty.channel.nio.NioIoHandler;
2930
import io.netty.util.concurrent.Future;
3031
import io.netty.util.concurrent.GenericFutureListener;
3132
import org.slf4j.Logger;
@@ -117,12 +118,11 @@ public void handleConnectionCreationFailed() {
117118
}
118119

119120
ApnsClient(final ApnsClientConfiguration clientConfiguration, final ApnsClientResources clientResources) {
120-
121121
if (clientResources != null) {
122122
this.clientResources = clientResources;
123123
this.shouldShutDownClientResources = false;
124124
} else {
125-
this.clientResources = new ApnsClientResources(new NioEventLoopGroup(1));
125+
this.clientResources = new ApnsClientResources(new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()));
126126
this.shouldShutDownClientResources = true;
127127
}
128128

@@ -151,7 +151,7 @@ public void handleConnectionCreationFailed() {
151151

152152
this.channelPool = new ApnsChannelPool(channelFactory,
153153
clientConfiguration.getConcurrentConnections(),
154-
this.clientResources.getEventLoopGroup().next(),
154+
this.clientResources.getIoEventLoopGroup().next(),
155155
channelPoolMetricsListener);
156156
}
157157

pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientResources.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.eatthepath.pushy.apns;
22

3-
import io.netty.channel.EventLoopGroup;
3+
import io.netty.channel.IoEventLoopGroup;
44
import io.netty.resolver.dns.DefaultDnsServerAddressStreamProvider;
55
import io.netty.resolver.dns.RoundRobinDnsAddressResolverGroup;
66
import io.netty.util.concurrent.Future;
@@ -19,20 +19,20 @@
1919
*/
2020
public class ApnsClientResources {
2121

22-
private final EventLoopGroup eventLoopGroup;
22+
private final IoEventLoopGroup ioEventLoopGroup;
2323
private final RoundRobinDnsAddressResolverGroup roundRobinDnsAddressResolverGroup;
2424

2525
/**
2626
* Constructs a new set of client resources that uses the given default event loop group. Clients that use this
2727
* resource set will use the given event loop group for IO operations.
2828
*
29-
* @param eventLoopGroup the event loop group for this set of resources
29+
* @param ioEventLoopGroup the event loop group for this set of resources
3030
*/
31-
public ApnsClientResources(final EventLoopGroup eventLoopGroup) {
32-
this.eventLoopGroup = Objects.requireNonNull(eventLoopGroup);
31+
public ApnsClientResources(final IoEventLoopGroup ioEventLoopGroup) {
32+
this.ioEventLoopGroup = Objects.requireNonNull(ioEventLoopGroup);
3333

3434
this.roundRobinDnsAddressResolverGroup = new RoundRobinDnsAddressResolverGroup(
35-
ClientChannelClassUtil.getDatagramChannelClass(eventLoopGroup),
35+
ClientChannelClassUtil.getDatagramChannelClass(ioEventLoopGroup),
3636
DefaultDnsServerAddressStreamProvider.INSTANCE);
3737
}
3838

@@ -41,8 +41,8 @@ public ApnsClientResources(final EventLoopGroup eventLoopGroup) {
4141
*
4242
* @return the event loop group for this resource set
4343
*/
44-
public EventLoopGroup getEventLoopGroup() {
45-
return eventLoopGroup;
44+
public IoEventLoopGroup getIoEventLoopGroup() {
45+
return ioEventLoopGroup;
4646
}
4747

4848
/**
@@ -65,6 +65,6 @@ public RoundRobinDnsAddressResolverGroup getRoundRobinDnsAddressResolverGroup()
6565
*/
6666
public Future<?> shutdownGracefully() {
6767
roundRobinDnsAddressResolverGroup.close();
68-
return eventLoopGroup.shutdownGracefully();
68+
return ioEventLoopGroup.shutdownGracefully();
6969
}
7070
}

pushy/src/main/java/com/eatthepath/pushy/apns/ClientChannelClassUtil.java

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222

2323
package com.eatthepath.pushy.apns;
2424

25-
import io.netty.channel.EventLoopGroup;
25+
import io.netty.channel.Channel;
26+
import io.netty.channel.IoEventLoopGroup;
27+
import io.netty.channel.IoHandler;
2628
import io.netty.channel.socket.DatagramChannel;
2729
import io.netty.channel.socket.SocketChannel;
2830

@@ -36,64 +38,66 @@ class ClientChannelClassUtil {
3638
private static final Map<String, String> DATAGRAM_CHANNEL_CLASSES = new HashMap<>();
3739

3840
static {
39-
SOCKET_CHANNEL_CLASSES.put("io.netty.channel.nio.NioEventLoopGroup", "io.netty.channel.socket.nio.NioSocketChannel");
40-
SOCKET_CHANNEL_CLASSES.put("io.netty.channel.epoll.EpollEventLoopGroup", "io.netty.channel.epoll.EpollSocketChannel");
41-
SOCKET_CHANNEL_CLASSES.put("io.netty.channel.kqueue.KQueueEventLoopGroup", "io.netty.channel.kqueue.KQueueSocketChannel");
41+
SOCKET_CHANNEL_CLASSES.put("io.netty.channel.nio.NioIoHandler", "io.netty.channel.socket.nio.NioSocketChannel");
42+
SOCKET_CHANNEL_CLASSES.put("io.netty.channel.uring.IoUringIoHandler", "io.netty.channel.uring.IoUringSocketChannel");
43+
SOCKET_CHANNEL_CLASSES.put("io.netty.channel.epoll.EpollIoHandler", "io.netty.channel.epoll.EpollSocketChannel");
44+
SOCKET_CHANNEL_CLASSES.put("io.netty.channel.kqueue.KQueueIoHandler", "io.netty.channel.kqueue.KQueueSocketChannel");
4245

43-
DATAGRAM_CHANNEL_CLASSES.put("io.netty.channel.nio.NioEventLoopGroup", "io.netty.channel.socket.nio.NioDatagramChannel");
44-
DATAGRAM_CHANNEL_CLASSES.put("io.netty.channel.epoll.EpollEventLoopGroup", "io.netty.channel.epoll.EpollDatagramChannel");
45-
DATAGRAM_CHANNEL_CLASSES.put("io.netty.channel.kqueue.KQueueEventLoopGroup", "io.netty.channel.kqueue.KQueueDatagramChannel");
46+
DATAGRAM_CHANNEL_CLASSES.put("io.netty.channel.nio.NioIoHandler", "io.netty.channel.socket.nio.NioDatagramChannel");
47+
DATAGRAM_CHANNEL_CLASSES.put("io.netty.channel.uring.IoUringIoHandler", "io.netty.channel.uring.IoUringDatagramChannel");
48+
DATAGRAM_CHANNEL_CLASSES.put("io.netty.channel.epoll.EpollIoHandler", "io.netty.channel.epoll.EpollDatagramChannel");
49+
DATAGRAM_CHANNEL_CLASSES.put("io.netty.channel.kqueue.KQueueIoHandler", "io.netty.channel.kqueue.KQueueDatagramChannel");
4650
}
4751

4852
/**
4953
* Returns a socket channel class suitable for specified event loop group.
5054
*
51-
* @param eventLoopGroup the event loop group for which to identify an appropriate socket channel class; must not
55+
* @param ioEventLoopGroup the event loop group for which to identify an appropriate socket channel class; must not
5256
* be {@code null}
5357
*
5458
* @return a socket channel class suitable for use with the given event loop group
5559
*
56-
* @throws IllegalArgumentException in case of null or unrecognized event loop group
60+
* @throws IllegalArgumentException if no suitable socket channel class could be found for the given event loop
61+
* group
62+
* @throws NullPointerException if the given {@code ioEventLoopGroup} was {@code null}
5763
*/
58-
static Class<? extends SocketChannel> getSocketChannelClass(final EventLoopGroup eventLoopGroup) {
59-
Objects.requireNonNull(eventLoopGroup);
60-
61-
final String socketChannelClassName = SOCKET_CHANNEL_CLASSES.get(eventLoopGroup.getClass().getName());
62-
63-
if (socketChannelClassName == null) {
64-
throw new IllegalArgumentException("No socket channel class found for event loop group type: " + eventLoopGroup.getClass().getName());
65-
}
66-
67-
try {
68-
return Class.forName(socketChannelClassName).asSubclass(SocketChannel.class);
69-
} catch (final ClassNotFoundException e) {
70-
throw new IllegalArgumentException(e);
71-
}
64+
static Class<? extends SocketChannel> getSocketChannelClass(final IoEventLoopGroup ioEventLoopGroup) {
65+
return getChannelClass(Objects.requireNonNull(ioEventLoopGroup), SOCKET_CHANNEL_CLASSES, SocketChannel.class);
7266
}
7367

7468
/**
7569
* Returns a datagram channel class suitable for specified event loop group.
7670
*
77-
* @param eventLoopGroup the event loop group for which to identify an appropriate datagram channel class; must not
78-
* be {@code null}
71+
* @param ioEventLoopGroup the event loop group for which to identify an appropriate datagram channel class; must
72+
* not be {@code null}
7973
*
8074
* @return a datagram channel class suitable for use with the given event loop group
8175
*
82-
* @throws IllegalArgumentException in case of null or unrecognized event loop group
76+
* @throws IllegalArgumentException if no suitable datagram channel class could be found for the given event loop
77+
* group
78+
* @throws NullPointerException if the given {@code ioEventLoopGroup} was {@code null}
8379
*/
84-
static Class<? extends DatagramChannel> getDatagramChannelClass(final EventLoopGroup eventLoopGroup) {
85-
Objects.requireNonNull(eventLoopGroup);
80+
static Class<? extends DatagramChannel> getDatagramChannelClass(final IoEventLoopGroup ioEventLoopGroup) {
81+
return getChannelClass(Objects.requireNonNull(ioEventLoopGroup), DATAGRAM_CHANNEL_CLASSES, DatagramChannel.class);
82+
}
8683

87-
final String datagramChannelClassName = DATAGRAM_CHANNEL_CLASSES.get(eventLoopGroup.getClass().getName());
84+
private static <C extends Channel> Class<? extends C> getChannelClass(final IoEventLoopGroup ioEventLoopGroup,
85+
final Map<String, String> channelClassesByIoHandlerClass,
86+
final Class<C> channelType) {
8887

89-
if (datagramChannelClassName == null) {
90-
throw new IllegalArgumentException("No datagram channel class found for event loop group type: " + eventLoopGroup.getClass().getName());
91-
}
88+
for (final Map.Entry<String, String> entry : channelClassesByIoHandlerClass.entrySet()) {
89+
try {
90+
final Class<? extends IoHandler> ioHandlerClass =
91+
Class.forName(entry.getKey()).asSubclass(IoHandler.class);
9292

93-
try {
94-
return Class.forName(datagramChannelClassName).asSubclass(DatagramChannel.class);
95-
} catch (final ClassNotFoundException e) {
96-
throw new IllegalArgumentException(e);
93+
if (ioEventLoopGroup.isIoType(ioHandlerClass)) {
94+
return Class.forName(entry.getValue()).asSubclass(channelType);
95+
}
96+
} catch (final ClassNotFoundException e) {
97+
continue;
98+
}
9799
}
100+
101+
throw new IllegalArgumentException("No suitable channel class found for event loop group");
98102
}
99103
}

pushy/src/main/java/com/eatthepath/pushy/apns/server/BaseHttp2Server.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import io.netty.channel.*;
2727
import io.netty.channel.group.ChannelGroup;
2828
import io.netty.channel.group.DefaultChannelGroup;
29-
import io.netty.channel.nio.NioEventLoopGroup;
29+
import io.netty.channel.nio.NioIoHandler;
3030
import io.netty.channel.socket.SocketChannel;
3131
import io.netty.handler.ssl.SslContext;
3232
import io.netty.handler.ssl.SslHandler;
@@ -76,13 +76,13 @@ public void exceptionCaught(final ChannelHandlerContext context, final Throwable
7676
this.bootstrap.group(eventLoopGroup);
7777
this.shouldShutDownEventLoopGroup = false;
7878
} else {
79-
this.bootstrap.group(new NioEventLoopGroup(1));
79+
this.bootstrap.group(new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()));
8080
this.shouldShutDownEventLoopGroup = true;
8181
}
8282

8383
this.allChannels = new DefaultChannelGroup(this.bootstrap.config().group().next());
8484

85-
this.bootstrap.channel(ServerChannelClassUtil.getServerSocketChannelClass(this.bootstrap.config().group()));
85+
this.bootstrap.channel(ServerChannelClassUtil.getServerSocketChannelClass((IoEventLoopGroup) this.bootstrap.config().group()));
8686
this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
8787

8888
@Override

pushy/src/main/java/com/eatthepath/pushy/apns/server/BaseHttp2ServerBuilder.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
package com.eatthepath.pushy.apns.server;
2424

25-
import io.netty.channel.EventLoopGroup;
25+
import io.netty.channel.IoEventLoopGroup;
2626
import io.netty.handler.codec.http2.Http2SecurityUtil;
2727
import io.netty.handler.ssl.*;
2828
import io.netty.util.ReferenceCounted;
@@ -53,7 +53,7 @@ abstract class BaseHttp2ServerBuilder <T extends BaseHttp2Server> {
5353
protected InputStream trustedClientCertificateInputStream;
5454
protected X509Certificate[] trustedClientCertificates;
5555

56-
protected EventLoopGroup eventLoopGroup;
56+
protected IoEventLoopGroup ioEventLoopGroup;
5757

5858
protected int maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
5959

@@ -231,15 +231,15 @@ public BaseHttp2ServerBuilder<T> setTrustedClientCertificateChain(final X509Cert
231231
* <p>Sets the event loop group to be used by the server under construction. If not set (or if {@code null}), the
232232
* server will create and manage its own event loop group.</p>
233233
*
234-
* @param eventLoopGroup the event loop group to use for this server, or {@code null} to let the server manage its
234+
* @param ioEventLoopGroup the event loop group to use for this server, or {@code null} to let the server manage its
235235
* own event loop group
236236
*
237237
* @return a reference to this builder
238238
*
239239
* @since 0.8
240240
*/
241-
public BaseHttp2ServerBuilder<T> setEventLoopGroup(final EventLoopGroup eventLoopGroup) {
242-
this.eventLoopGroup = eventLoopGroup;
241+
public BaseHttp2ServerBuilder<T> setIoEventLoopGroup(final IoEventLoopGroup ioEventLoopGroup) {
242+
this.ioEventLoopGroup = ioEventLoopGroup;
243243
return this;
244244
}
245245

pushy/src/main/java/com/eatthepath/pushy/apns/server/BenchmarkApnsServer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
package com.eatthepath.pushy.apns.server;
2424

2525
import io.netty.channel.ChannelPipeline;
26-
import io.netty.channel.EventLoopGroup;
26+
import io.netty.channel.IoEventLoopGroup;
2727
import io.netty.handler.codec.http2.Http2Settings;
2828
import io.netty.handler.ssl.SslContext;
2929

@@ -42,8 +42,8 @@ public class BenchmarkApnsServer extends BaseHttp2Server {
4242

4343
private final int maxConcurrentStreams;
4444

45-
BenchmarkApnsServer(final SslContext sslContext, final EventLoopGroup eventLoopGroup, final int maxConcurrentStreams) {
46-
super(sslContext, eventLoopGroup);
45+
BenchmarkApnsServer(final SslContext sslContext, final IoEventLoopGroup ioEventLoopGroup, final int maxConcurrentStreams) {
46+
super(sslContext, ioEventLoopGroup);
4747

4848
this.maxConcurrentStreams = maxConcurrentStreams;
4949
}

pushy/src/main/java/com/eatthepath/pushy/apns/server/BenchmarkApnsServerBuilder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
package com.eatthepath.pushy.apns.server;
2424

25-
import io.netty.channel.EventLoopGroup;
25+
import io.netty.channel.IoEventLoopGroup;
2626
import io.netty.handler.ssl.SslContext;
2727

2828
import javax.net.ssl.SSLException;
@@ -88,8 +88,8 @@ public BenchmarkApnsServerBuilder setTrustedClientCertificateChain(final X509Cer
8888
}
8989

9090
@Override
91-
public BenchmarkApnsServerBuilder setEventLoopGroup(final EventLoopGroup eventLoopGroup) {
92-
super.setEventLoopGroup(eventLoopGroup);
91+
public BenchmarkApnsServerBuilder setIoEventLoopGroup(final IoEventLoopGroup ioEventLoopGroup) {
92+
super.setIoEventLoopGroup(ioEventLoopGroup);
9393
return this;
9494
}
9595

@@ -112,6 +112,6 @@ public BenchmarkApnsServer build() throws SSLException {
112112

113113
@Override
114114
protected BenchmarkApnsServer constructServer(final SslContext sslContext) {
115-
return new BenchmarkApnsServer(sslContext, this.eventLoopGroup, this.maxConcurrentStreams);
115+
return new BenchmarkApnsServer(sslContext, this.ioEventLoopGroup, this.maxConcurrentStreams);
116116
}
117117
}

0 commit comments

Comments
 (0)