Skip to content

Commit 0ae0b1c

Browse files
feat: enable netty event loop and byte buffer allocator metrics (kroxylicious#2787)
* feat: enable netty event loop and byte buffer allocator metrics --------- Signed-off-by: The-East-Wind <hariatul1998@gmail.com>
1 parent f33c801 commit 0ae0b1c

4 files changed

Lines changed: 136 additions & 36 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ Format `<github issue/pr number>: <short description>`.
99

1010
* [#2580](https://github.com/kroxylicious/kroxylicious/issues/2580): Add an Azure Key Vault KMS implementation for Record Encryption
1111
* [#2759](https://github.com/kroxylicious/kroxylicious/pull/2759): Remove kroxylicious-sample and document how to use io.kroxylicious:kroxylicious-filter-archetype
12-
* [#2671](https://github.com/kroxylicious/kroxylicious/pull/2671): SASL inspection filter supoporting PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512.
12+
* [#2671](https://github.com/kroxylicious/kroxylicious/pull/2671): SASL inspection filter supporting PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512.
1313
* [#2681](https://github.com/kroxylicious/kroxylicious/pull/2681): Create a maven archetype for filter development io.kroxylicious:kroxylicious-filter-archetype
1414
* [#2778](https://github.com/kroxylicious/kroxylicious/pull/2778): The proxy now allocates a sessionId to connect client and server channels for logging purposes. Allowing users to track activity between downstream and upstream channels.
15-
15+
* [#143](https://github.com/kroxylicious/kroxylicious/issues/143): Add support for Netty metrics
1616

1717
## 0.16.0
1818

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/KafkaProxy.java

Lines changed: 30 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.slf4j.LoggerFactory;
2121

2222
import io.netty.bootstrap.ServerBootstrap;
23+
import io.netty.buffer.ByteBufAllocator;
2324
import io.netty.channel.ChannelFutureListener;
2425
import io.netty.channel.ChannelOption;
2526
import io.netty.channel.EventLoopGroup;
@@ -66,11 +67,28 @@ public final class KafkaProxy implements AutoCloseable {
6667
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProxy.class);
6768
private static final Logger STARTUP_SHUTDOWN_LOGGER = LoggerFactory.getLogger("io.kroxylicious.proxy.StartupShutdownLogger");
6869

69-
private record EventGroupConfig(String name, EventLoopGroup bossGroup, EventLoopGroup workerGroup, Class<? extends ServerChannel> clazz) {
70+
@VisibleForTesting
71+
record EventGroupConfig(String name, EventLoopGroup bossGroup, EventLoopGroup workerGroup, Class<? extends ServerChannel> clazz) {
7072

7173
public List<Future<?>> shutdownGracefully() {
7274
return List.of(bossGroup.shutdownGracefully(), workerGroup.shutdownGracefully());
7375
}
76+
77+
public static EventGroupConfig build(String name, int availableCores, boolean useIoUring) {
78+
if (useIoUring) {
79+
if (!IOUring.isAvailable()) {
80+
throw new IllegalStateException("io_uring not available due to: " + IOUring.unavailabilityCause());
81+
}
82+
return new EventGroupConfig(name, new IOUringEventLoopGroup(1), new IOUringEventLoopGroup(availableCores), IOUringServerSocketChannel.class);
83+
}
84+
if (Epoll.isAvailable()) {
85+
return new EventGroupConfig(name, new EpollEventLoopGroup(1), new EpollEventLoopGroup(availableCores), EpollServerSocketChannel.class);
86+
}
87+
if (KQueue.isAvailable()) {
88+
return new EventGroupConfig(name, new KQueueEventLoopGroup(1), new KQueueEventLoopGroup(availableCores), KQueueServerSocketChannel.class);
89+
}
90+
return new EventGroupConfig(name, new NioEventLoopGroup(1), new NioEventLoopGroup(availableCores), NioServerSocketChannel.class);
91+
}
7492
}
7593

7694
private final Configuration config;
@@ -127,8 +145,10 @@ public KafkaProxy startup() {
127145

128146
var availableCores = Runtime.getRuntime().availableProcessors();
129147

130-
this.managementEventGroup = buildNettyEventGroups("management", availableCores, config.isUseIoUring());
131-
this.serverEventGroup = buildNettyEventGroups("server", availableCores, config.isUseIoUring());
148+
this.managementEventGroup = EventGroupConfig.build("management", availableCores, config.isUseIoUring());
149+
this.serverEventGroup = EventGroupConfig.build("server", availableCores, config.isUseIoUring());
150+
151+
enableNettyMetrics(managementEventGroup, serverEventGroup);
132152

133153
var managementFuture = maybeStartManagementListener(managementEventGroup, meterRegistries);
134154

@@ -161,6 +181,13 @@ public KafkaProxy startup() {
161181
}
162182
}
163183

184+
private void enableNettyMetrics(final EventGroupConfig... eventGroups) {
185+
Metrics.bindNettyAllocatorMetrics(ByteBufAllocator.DEFAULT);
186+
for (final var group : eventGroups) {
187+
Metrics.bindNettyEventExecutorMetrics(group.bossGroup(), group.workerGroup());
188+
}
189+
}
190+
164191
private void initVersionInfoMetric() {
165192
Metrics.versionInfoMetric(VersionInfo.VERSION_INFO);
166193
}
@@ -186,37 +213,6 @@ private ServerBootstrap buildServerBootstrap(EventGroupConfig virtualHostEventGr
186213
.childOption(ChannelOption.TCP_NODELAY, true);
187214
}
188215

189-
private EventGroupConfig buildNettyEventGroups(String name, int availableCores, boolean useIoUring) {
190-
final Class<? extends ServerChannel> channelClass;
191-
final EventLoopGroup bossGroup;
192-
final EventLoopGroup workerGroup;
193-
194-
if (useIoUring) {
195-
if (!IOUring.isAvailable()) {
196-
throw new IllegalStateException("io_uring not available due to: " + IOUring.unavailabilityCause());
197-
}
198-
bossGroup = new IOUringEventLoopGroup(1);
199-
workerGroup = new IOUringEventLoopGroup(availableCores);
200-
channelClass = IOUringServerSocketChannel.class;
201-
}
202-
else if (Epoll.isAvailable()) {
203-
bossGroup = new EpollEventLoopGroup(1);
204-
workerGroup = new EpollEventLoopGroup(availableCores);
205-
channelClass = EpollServerSocketChannel.class;
206-
}
207-
else if (KQueue.isAvailable()) {
208-
bossGroup = new KQueueEventLoopGroup(1);
209-
workerGroup = new KQueueEventLoopGroup(availableCores);
210-
channelClass = KQueueServerSocketChannel.class;
211-
}
212-
else {
213-
bossGroup = new NioEventLoopGroup(1);
214-
workerGroup = new NioEventLoopGroup(availableCores);
215-
channelClass = NioServerSocketChannel.class;
216-
}
217-
return new EventGroupConfig(name, bossGroup, workerGroup, channelClass);
218-
}
219-
220216
private CompletableFuture<Void> maybeStartManagementListener(EventGroupConfig eventGroupConfig, MeterRegistries meterRegistries) {
221217
return Optional.ofNullable(managementConfiguration)
222218
.map(mc -> {

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/util/Metrics.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020
import io.micrometer.core.instrument.Tag;
2121
import io.micrometer.core.instrument.Timer;
2222
import io.micrometer.core.instrument.binder.BaseUnits;
23+
import io.micrometer.core.instrument.binder.netty4.NettyAllocatorMetrics;
24+
import io.micrometer.core.instrument.binder.netty4.NettyEventExecutorMetrics;
25+
import io.netty.buffer.ByteBufAllocator;
26+
import io.netty.buffer.ByteBufAllocatorMetricProvider;
27+
import io.netty.channel.EventLoopGroup;
2328

2429
import io.kroxylicious.proxy.VersionInfo;
2530

@@ -288,4 +293,16 @@ public static void clear() {
288293
CLIENT_TO_PROXY_CONNECTION_CACHE.clear();
289294
PROXY_TO_SERVER_CONNECTION_CACHE.clear();
290295
}
296+
297+
public static void bindNettyEventExecutorMetrics(final EventLoopGroup... eventLoopGroups) {
298+
for (final var eventLoopGroup : eventLoopGroups) {
299+
new NettyEventExecutorMetrics(eventLoopGroup).bindTo(globalRegistry);
300+
}
301+
}
302+
303+
public static void bindNettyAllocatorMetrics(final ByteBufAllocator alloc) {
304+
if (alloc instanceof ByteBufAllocatorMetricProvider byteBufAllocatorMetricProvider) {
305+
new NettyAllocatorMetrics(byteBufAllocatorMetricProvider).bindTo(globalRegistry);
306+
}
307+
}
291308
}

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/KafkaProxyTest.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,27 @@
1212
import java.net.http.HttpResponse;
1313
import java.util.stream.Stream;
1414

15+
import org.junit.jupiter.api.DisplayNameGeneration;
16+
import org.junit.jupiter.api.DisplayNameGenerator;
17+
import org.junit.jupiter.api.Nested;
1518
import org.junit.jupiter.api.Test;
1619
import org.junit.jupiter.params.ParameterizedTest;
1720
import org.junit.jupiter.params.provider.Arguments;
1821
import org.junit.jupiter.params.provider.MethodSource;
1922
import org.mockito.Mockito;
2023

24+
import io.netty.channel.epoll.Epoll;
25+
import io.netty.channel.epoll.EpollEventLoopGroup;
26+
import io.netty.channel.epoll.EpollServerSocketChannel;
27+
import io.netty.channel.kqueue.KQueue;
28+
import io.netty.channel.kqueue.KQueueEventLoopGroup;
29+
import io.netty.channel.kqueue.KQueueServerSocketChannel;
30+
import io.netty.channel.nio.NioEventLoopGroup;
31+
import io.netty.channel.socket.nio.NioServerSocketChannel;
32+
import io.netty.incubator.channel.uring.IOUring;
33+
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
34+
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;
35+
2136
import io.kroxylicious.proxy.config.ConfigParser;
2237
import io.kroxylicious.proxy.config.Configuration;
2338
import io.kroxylicious.proxy.config.IllegalConfigurationException;
@@ -188,4 +203,76 @@ void supportsLivezEndpoint() throws Exception {
188203
assertThat(response.statusCode()).isEqualTo(200);
189204
}
190205
}
206+
207+
@Nested
208+
@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
209+
class EventGroupConfigTest {
210+
211+
@Test
212+
void build_whenIoUringIsConfiguredToBeUsedAndAvailable_shouldUseIoUring() {
213+
// the constructor is mocked since native classes used in actual constructors can be unavailable based on the test infra
214+
try (var mockIOUring = Mockito.mockStatic(IOUring.class);
215+
var mockGroupConstructor = Mockito.mockConstruction(IOUringEventLoopGroup.class)) {
216+
mockIOUring.when(IOUring::isAvailable).thenReturn(true);
217+
final var config = KafkaProxy.EventGroupConfig.build("test", 1, true);
218+
assertThat(config.bossGroup()).isInstanceOf(IOUringEventLoopGroup.class);
219+
assertThat(config.workerGroup()).isInstanceOf(IOUringEventLoopGroup.class);
220+
assertThat(config.clazz()).isEqualTo(IOUringServerSocketChannel.class);
221+
assertThat(mockGroupConstructor.constructed()).hasSize(2);
222+
}
223+
}
224+
225+
@Test
226+
void build_whenIoUringIsConfiguredToBeUsedAndNotAvailable_shouldThrowException() {
227+
try (var mockIOUring = Mockito.mockStatic(IOUring.class)) {
228+
mockIOUring.when(IOUring::isAvailable).thenReturn(false);
229+
// noinspection ResultOfMethodCallIgnored
230+
mockIOUring.when(IOUring::unavailabilityCause).thenReturn(new Throwable());
231+
assertThatThrownBy(() -> KafkaProxy.EventGroupConfig.build("test", 1, true)).isInstanceOf(IllegalStateException.class);
232+
}
233+
}
234+
235+
@Test
236+
void build_whenEpollIsAvailable_shouldUseEpoll() {
237+
try (var mockEpoll = Mockito.mockStatic(Epoll.class);
238+
var mockGroupConstructor = Mockito.mockConstruction(EpollEventLoopGroup.class)) {
239+
mockEpoll.when(Epoll::isAvailable).thenReturn(true);
240+
final var config = KafkaProxy.EventGroupConfig.build("test", 1, false);
241+
assertThat(config.bossGroup()).isInstanceOf(EpollEventLoopGroup.class);
242+
assertThat(config.workerGroup()).isInstanceOf(EpollEventLoopGroup.class);
243+
assertThat(config.clazz()).isEqualTo(EpollServerSocketChannel.class);
244+
assertThat(mockGroupConstructor.constructed()).hasSize(2);
245+
}
246+
}
247+
248+
@Test
249+
void build_whenEpollIsUnavailableAndKQueueIsAvailable_shouldUseKQueue() {
250+
try (var mockEpoll = Mockito.mockStatic(Epoll.class);
251+
var mockKQueue = Mockito.mockStatic(KQueue.class);
252+
var mockGroupConstructor = Mockito.mockConstruction(KQueueEventLoopGroup.class)) {
253+
mockEpoll.when(Epoll::isAvailable).thenReturn(false);
254+
mockKQueue.when(KQueue::isAvailable).thenReturn(true);
255+
final var config = KafkaProxy.EventGroupConfig.build("test", 1, false);
256+
assertThat(config.bossGroup()).isInstanceOf(KQueueEventLoopGroup.class);
257+
assertThat(config.workerGroup()).isInstanceOf(KQueueEventLoopGroup.class);
258+
assertThat(config.clazz()).isEqualTo(KQueueServerSocketChannel.class);
259+
assertThat(mockGroupConstructor.constructed()).hasSize(2);
260+
}
261+
}
262+
263+
@Test
264+
void build_whenEpollAndKqueueAreUnavailable_shouldFallbackToNio() {
265+
try (var mockEpoll = Mockito.mockStatic(Epoll.class);
266+
var mockKQueue = Mockito.mockStatic(KQueue.class);
267+
var mockGroupConstructor = Mockito.mockConstruction(NioEventLoopGroup.class)) {
268+
mockEpoll.when(Epoll::isAvailable).thenReturn(false);
269+
mockKQueue.when(KQueue::isAvailable).thenReturn(false);
270+
final var config = KafkaProxy.EventGroupConfig.build("test", 1, false);
271+
assertThat(config.bossGroup()).isInstanceOf(NioEventLoopGroup.class);
272+
assertThat(config.workerGroup()).isInstanceOf(NioEventLoopGroup.class);
273+
assertThat(config.clazz()).isEqualTo(NioServerSocketChannel.class);
274+
assertThat(mockGroupConstructor.constructed()).hasSize(2);
275+
}
276+
}
277+
}
191278
}

0 commit comments

Comments
 (0)