Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 58edd89

Browse files
author
Enrico Olivelli
committed
[improvement] Used more Pooled ByteBufs
1 parent 8ea7981 commit 58edd89

6 files changed

Lines changed: 28 additions & 17 deletions

File tree

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ protected abstract void channelPrepare(ChannelHandlerContext ctx,
584584
protected abstract void
585585
handleCreatePartitions(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);
586586

587-
public static class KafkaHeaderAndRequest implements Closeable {
587+
public static class KafkaHeaderAndRequest {
588588

589589
private static final String DEFAULT_CLIENT_HOST = "";
590590

@@ -593,6 +593,8 @@ public static class KafkaHeaderAndRequest implements Closeable {
593593
private final ByteBuf buffer;
594594
private final SocketAddress remoteAddress;
595595

596+
private final AtomicBoolean released = new AtomicBoolean();
597+
596598
public KafkaHeaderAndRequest(RequestHeader header,
597599
AbstractRequest request,
598600
ByteBuf buffer,
@@ -604,6 +606,9 @@ public KafkaHeaderAndRequest(RequestHeader header,
604606
}
605607

606608
public ByteBuf getBuffer() {
609+
if (released.get()) {
610+
throw new IllegalStateException("Already released");
611+
}
607612
return buffer;
608613
}
609614

@@ -637,9 +642,17 @@ public String toString() {
637642
this.header, this.request, this.remoteAddress);
638643
}
639644

640-
@Override
645+
public void bufferReleased() {
646+
if (!released.compareAndSet(false, true)) {
647+
throw new IllegalStateException("Already released");
648+
}
649+
}
650+
641651
public void close() {
642-
this.buffer.release();
652+
if (!released.compareAndSet(false, true)) {
653+
return;
654+
}
655+
ReferenceCountUtil.safeRelease(this.buffer);
643656
}
644657
}
645658

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;
1515

16+
import io.netty.buffer.ByteBuf;
1617
import java.nio.ByteBuffer;
1718
import java.util.concurrent.CompletableFuture;
1819
import java.util.function.Consumer;
@@ -41,7 +42,7 @@ public PendingRequest(final ApiKeys apiKeys,
4142
this.responseConsumerHandler = responseConsumerHandler;
4243
}
4344

44-
public ByteBuffer serialize() {
45+
public ByteBuf serialize() {
4546
return KopResponseUtils.serializeRequest(requestHeader, request);
4647
}
4748

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public TransactionMarkerChannelHandler(
7373
private void enqueueRequest(ChannelHandlerContext channel, PendingRequest pendingRequest) {
7474
final long correlationId = pendingRequest.getCorrelationId();
7575
pendingRequestMap.put(correlationId, pendingRequest);
76-
channel.writeAndFlush(Unpooled.wrappedBuffer(pendingRequest.serialize())).addListener(writeFuture -> {
76+
channel.writeAndFlush(pendingRequest.serialize()).addListener(writeFuture -> {
7777
if (!writeFuture.isSuccess()) {
7878
pendingRequest.completeExceptionally(writeFuture.cause());
7979
pendingRequestMap.remove(correlationId);

kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ private static ByteBuf serializeWithHeader(AbstractResponse response, ResponseHe
4646
return serialize(header.data(), header.headerVersion(), response.data(), version);
4747
}
4848

49-
public static ByteBuffer serializeRequest(RequestHeader requestHeader, AbstractRequest request) {
50-
return RequestUtils.serialize(requestHeader.data(), requestHeader.headerVersion(),
49+
public static ByteBuf serializeRequest(RequestHeader requestHeader, AbstractRequest request) {
50+
return serialize(requestHeader.data(), requestHeader.headerVersion(),
5151
request.data(), request.version());
5252
}
5353

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,15 +115,13 @@ public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractReq
115115
RequestHeader mockHeader = new RequestHeader(builder.apiKey(), request.version(), "dummy", 1233);
116116

117117

118-
ByteBuffer serializedRequest = KopResponseUtils.serializeRequest(mockHeader, request);
118+
ByteBuf byteBuf = KopResponseUtils.serializeRequest(mockHeader, request);
119119

120-
ByteBuf byteBuf = Unpooled.copiedBuffer(serializedRequest);
121-
122-
RequestHeader header = RequestHeader.parse(serializedRequest);
120+
RequestHeader header = RequestHeader.parse(byteBuf.nioBuffer());
123121

124122
ApiKeys apiKey = header.apiKey();
125123
short apiVersion = header.apiVersion();
126-
AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, serializedRequest).request;
124+
AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, byteBuf.nioBuffer()).request;
127125
return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress);
128126
}
129127
}

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -188,17 +188,16 @@ public void testByteBufToRequest() {
188188
correlationId);
189189

190190
// 1. serialize request into ByteBuf
191-
ByteBuffer serializedRequest = KopResponseUtils.serializeRequest(header, apiVersionsRequest);
192-
int size = serializedRequest.remaining();
193-
ByteBuf inputBuf = Unpooled.buffer(size);
194-
inputBuf.writeBytes(serializedRequest);
191+
ByteBuf serializedRequest = KopResponseUtils.serializeRequest(header, apiVersionsRequest);
195192

196193
// 2. turn Bytebuf into KafkaHeaderAndRequest.
197-
KafkaHeaderAndRequest request = handler.byteBufToRequest(inputBuf);
194+
KafkaHeaderAndRequest request = handler.byteBufToRequest(serializedRequest, null);
198195

199196
// 3. verify byteBufToRequest works well.
200197
assertEquals(request.getHeader().data(), header.data());
201198
assertTrue(request.getRequest() instanceof ApiVersionsRequest);
199+
200+
request.close();
202201
}
203202

204203

0 commit comments

Comments
 (0)