Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 8e40ffe

Browse files
author
Enrico Olivelli
committed
[improvement] Used more Pooled ByteBufs
1 parent 42b8052 commit 8e40ffe

6 files changed

Lines changed: 27 additions & 19 deletions

File tree

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -593,10 +593,6 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
593593
protected abstract void
594594
handleCreatePartitions(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);
595595

596-
protected abstract void
597-
handleDescribeCluster(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);
598-
599-
600596
public static class KafkaHeaderAndRequest {
601597

602598
private static final String DEFAULT_CLIENT_HOST = "";
@@ -606,6 +602,8 @@ public static class KafkaHeaderAndRequest {
606602
private final ByteBuf buffer;
607603
private final SocketAddress remoteAddress;
608604

605+
private final AtomicBoolean released = new AtomicBoolean();
606+
609607
public KafkaHeaderAndRequest(RequestHeader header,
610608
AbstractRequest request,
611609
ByteBuf buffer,
@@ -617,6 +615,9 @@ public KafkaHeaderAndRequest(RequestHeader header,
617615
}
618616

619617
public ByteBuf getBuffer() {
618+
if (released.get()) {
619+
throw new IllegalStateException("Already released");
620+
}
620621
return buffer;
621622
}
622623

@@ -650,8 +651,17 @@ public String toString() {
650651
this.header, this.request, this.remoteAddress);
651652
}
652653

654+
public void bufferReleased() {
655+
if (!released.compareAndSet(false, true)) {
656+
throw new IllegalStateException("Already released");
657+
}
658+
}
659+
653660
public void close() {
654-
ReferenceCountUtil.safeRelease(this.buffer);
661+
if (!released.compareAndSet(false, true)) {
662+
return;
663+
}
664+
ReferenceCountUtil.safeRelease(this.buffer);
655665
}
656666
}
657667

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/PendingRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;
1515

16+
import io.netty.buffer.ByteBuf;
1617
import java.nio.ByteBuffer;
1718
import java.util.concurrent.CompletableFuture;
1819
import java.util.function.Consumer;
@@ -41,7 +42,7 @@ public PendingRequest(final ApiKeys apiKeys,
4142
this.responseConsumerHandler = responseConsumerHandler;
4243
}
4344

44-
public ByteBuffer serialize() {
45+
public ByteBuf serialize() {
4546
return KopResponseUtils.serializeRequest(requestHeader, request);
4647
}
4748

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public TransactionMarkerChannelHandler(
7373
private void enqueueRequest(ChannelHandlerContext channel, PendingRequest pendingRequest) {
7474
final long correlationId = pendingRequest.getCorrelationId();
7575
pendingRequestMap.put(correlationId, pendingRequest);
76-
channel.writeAndFlush(Unpooled.wrappedBuffer(pendingRequest.serialize())).addListener(writeFuture -> {
76+
channel.writeAndFlush(pendingRequest.serialize()).addListener(writeFuture -> {
7777
if (!writeFuture.isSuccess()) {
7878
pendingRequest.completeExceptionally(writeFuture.cause());
7979
pendingRequestMap.remove(correlationId);

kafka-impl/src/main/java/org/apache/kafka/common/requests/KopResponseUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ private static ByteBuf serializeWithHeader(AbstractResponse response, ResponseHe
4646
return serialize(header.data(), header.headerVersion(), response.data(), version);
4747
}
4848

49-
public static ByteBuffer serializeRequest(RequestHeader requestHeader, AbstractRequest request) {
50-
return RequestUtils.serialize(requestHeader.data(), requestHeader.headerVersion(),
49+
public static ByteBuf serializeRequest(RequestHeader requestHeader, AbstractRequest request) {
50+
return serialize(requestHeader.data(), requestHeader.headerVersion(),
5151
request.data(), request.version());
5252
}
5353

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaCommonTestUtils.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,15 +115,13 @@ public static KafkaCommandDecoder.KafkaHeaderAndRequest buildRequest(AbstractReq
115115
RequestHeader mockHeader = new RequestHeader(builder.apiKey(), request.version(), "dummy", 1233);
116116

117117

118-
ByteBuffer serializedRequest = KopResponseUtils.serializeRequest(mockHeader, request);
118+
ByteBuf byteBuf = KopResponseUtils.serializeRequest(mockHeader, request);
119119

120-
ByteBuf byteBuf = Unpooled.copiedBuffer(serializedRequest);
121-
122-
RequestHeader header = RequestHeader.parse(serializedRequest);
120+
RequestHeader header = RequestHeader.parse(byteBuf.nioBuffer());
123121

124122
ApiKeys apiKey = header.apiKey();
125123
short apiVersion = header.apiVersion();
126-
AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, serializedRequest).request;
124+
AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, byteBuf.nioBuffer()).request;
127125
return new KafkaCommandDecoder.KafkaHeaderAndRequest(header, body, byteBuf, serviceAddress);
128126
}
129127
}

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -188,17 +188,16 @@ public void testByteBufToRequest() {
188188
correlationId);
189189

190190
// 1. serialize request into ByteBuf
191-
ByteBuffer serializedRequest = KopResponseUtils.serializeRequest(header, apiVersionsRequest);
192-
int size = serializedRequest.remaining();
193-
ByteBuf inputBuf = Unpooled.buffer(size);
194-
inputBuf.writeBytes(serializedRequest);
191+
ByteBuf serializedRequest = KopResponseUtils.serializeRequest(header, apiVersionsRequest);
195192

196193
// 2. turn Bytebuf into KafkaHeaderAndRequest.
197-
KafkaHeaderAndRequest request = handler.byteBufToRequest(inputBuf, null);
194+
KafkaHeaderAndRequest request = handler.byteBufToRequest(serializedRequest, null);
198195

199196
// 3. verify byteBufToRequest works well.
200197
assertEquals(request.getHeader().data(), header.data());
201198
assertTrue(request.getRequest() instanceof ApiVersionsRequest);
199+
200+
request.close();
202201
}
203202

204203

0 commit comments

Comments
 (0)