Skip to content

Commit 561b86c

Browse files
committed
Add setReadbufferSize API to CronetChannelBuilder
By default, CronetClientStreams would use a 4KB buffer to read data from Cronet. This can be inefficient especially if the amount of data being read is huge (~MBs) as each read callback operation incur overhead from Cronet itself (e.g. Context switch, JNI calls). The alternative would be to immediately bump the default to a bigger number but that would incur an increase in memory usage. If the API is not called then the default of 4KB is used.
1 parent 9193701 commit 561b86c

6 files changed

Lines changed: 82 additions & 17 deletions

File tree

cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ public static CronetChannelBuilder forAddress(String name, int port) {
108108
private int trafficStatsTag;
109109
private boolean trafficStatsUidSet;
110110
private int trafficStatsUid;
111+
private int readBufferSize = 4 * 1024;
111112
private Network network;
112113

113114
private CronetChannelBuilder(String host, int port, CronetEngine cronetEngine) {
@@ -194,6 +195,17 @@ CronetChannelBuilder setTrafficStatsUid(int uid) {
194195
return this;
195196
}
196197

198+
/**
199+
* Sets the read buffer size which the GRPC layer will use to read data from Cronet. Higher buffer
200+
* size leads to less overhead but more memory consumption. The current default value is 4KB.
201+
* @param size Buffer size in bytes.
202+
* @return the builder to facilitate chaining.
203+
*/
204+
CronetChannelBuilder setReadBufferSize(int size) {
205+
readBufferSize = size;
206+
return this;
207+
}
208+
197209
/** Sets the network ID to use for this channel traffic. */
198210
@CanIgnoreReturnValue
199211
CronetChannelBuilder bindToNetwork(@Nullable Network network) {
@@ -233,7 +245,8 @@ ClientTransportFactory buildTransportFactory() {
233245
alwaysUsePut,
234246
transportTracerFactory.create(),
235247
useGetForSafeMethods,
236-
usePutForIdempotentMethods);
248+
usePutForIdempotentMethods,
249+
readBufferSize);
237250
}
238251

239252
@VisibleForTesting
@@ -247,6 +260,7 @@ static class CronetTransportFactory implements ClientTransportFactory {
247260
private final boolean usingSharedScheduler;
248261
private final boolean useGetForSafeMethods;
249262
private final boolean usePutForIdempotentMethods;
263+
private final int readBufferSize;
250264

251265
private CronetTransportFactory(
252266
StreamBuilderFactory streamFactory,
@@ -256,7 +270,8 @@ private CronetTransportFactory(
256270
boolean alwaysUsePut,
257271
TransportTracer transportTracer,
258272
boolean useGetForSafeMethods,
259-
boolean usePutForIdempotentMethods) {
273+
boolean usePutForIdempotentMethods,
274+
int readBufferSize) {
260275
usingSharedScheduler = timeoutService == null;
261276
this.timeoutService = usingSharedScheduler
262277
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : timeoutService;
@@ -267,6 +282,7 @@ private CronetTransportFactory(
267282
this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
268283
this.useGetForSafeMethods = useGetForSafeMethods;
269284
this.usePutForIdempotentMethods = usePutForIdempotentMethods;
285+
this.readBufferSize = readBufferSize;
270286
}
271287

272288
@Override
@@ -275,7 +291,7 @@ public ConnectionClientTransport newClientTransport(
275291
InetSocketAddress inetSocketAddr = (InetSocketAddress) addr;
276292
return new CronetClientTransport(streamFactory, inetSocketAddr, options.getAuthority(),
277293
options.getUserAgent(), options.getEagAttributes(), executor, maxMessageSize,
278-
alwaysUsePut, transportTracer, useGetForSafeMethods, usePutForIdempotentMethods);
294+
alwaysUsePut, transportTracer, useGetForSafeMethods, usePutForIdempotentMethods, readBufferSize);
279295
}
280296

281297
@Override

cronet/src/main/java/io/grpc/cronet/CronetClientStream.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
* Client stream for the cronet transport.
6060
*/
6161
class CronetClientStream extends AbstractClientStream {
62-
private static final int READ_BUFFER_CAPACITY = 4 * 1024;
6362
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
6463
private static final String LOG_TAG = "grpc-java-cronet";
6564

@@ -85,6 +84,8 @@ class CronetClientStream extends AbstractClientStream {
8584
private final Collection<Object> annotations;
8685
private final TransportState state;
8786
private final Sink sink = new Sink();
87+
@VisibleForTesting
88+
final int readBufferSize;
8889
private StreamBuilderFactory streamFactory;
8990

9091
CronetClientStream(
@@ -102,7 +103,8 @@ class CronetClientStream extends AbstractClientStream {
102103
CallOptions callOptions,
103104
TransportTracer transportTracer,
104105
boolean useGetForSafeMethods,
105-
boolean usePutForIdempotentMethods) {
106+
boolean usePutForIdempotentMethods,
107+
int readBufferSize) {
106108
super(
107109
new CronetWritableBufferAllocator(), statsTraceCtx, transportTracer, headers, callOptions,
108110
useGetForSafeMethods && method.isSafe());
@@ -120,6 +122,7 @@ class CronetClientStream extends AbstractClientStream {
120122
this.annotations = callOptions.getOption(CRONET_ANNOTATIONS_KEY);
121123
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, transportTracer,
122124
callOptions);
125+
this.readBufferSize = readBufferSize;
123126

124127
// Tests expect the "plain" deframer behavior, not MigratingDeframer
125128
// https://github.com/grpc/grpc-java/issues/7140
@@ -309,7 +312,7 @@ public void bytesRead(int processedBytes) {
309312
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
310313
Log.v(LOG_TAG, "BidirectionalStream.read");
311314
}
312-
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
315+
stream.read(ByteBuffer.allocateDirect(readBufferSize));
313316
}
314317
}
315318

@@ -429,7 +432,7 @@ public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInf
429432
Log.v(LOG_TAG, "BidirectionalStream.read");
430433
}
431434
reportHeaders(info.getAllHeadersAsList(), false);
432-
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
435+
stream.read(ByteBuffer.allocateDirect(readBufferSize));
433436
}
434437

435438
@Override

cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class CronetClientTransport implements ConnectionClientTransport {
6464
private Attributes attrs;
6565
private final boolean useGetForSafeMethods;
6666
private final boolean usePutForIdempotentMethods;
67+
private final int readBufferSize;
6768
private final StreamBuilderFactory streamFactory;
6869
// Indicates the transport is in go-away state: no new streams will be processed,
6970
// but existing streams may continue.
@@ -92,7 +93,8 @@ class CronetClientTransport implements ConnectionClientTransport {
9293
boolean alwaysUsePut,
9394
TransportTracer transportTracer,
9495
boolean useGetForSafeMethods,
95-
boolean usePutForIdempotentMethods) {
96+
boolean usePutForIdempotentMethods,
97+
int readBufferSize) {
9698
this.address = Preconditions.checkNotNull(address, "address");
9799
this.logId = InternalLogId.allocate(getClass(), address.toString());
98100
this.authority = authority;
@@ -108,6 +110,7 @@ class CronetClientTransport implements ConnectionClientTransport {
108110
.build();
109111
this.useGetForSafeMethods = useGetForSafeMethods;
110112
this.usePutForIdempotentMethods = usePutForIdempotentMethods;
113+
this.readBufferSize = readBufferSize;
111114
}
112115

113116
@Override
@@ -132,7 +135,7 @@ class StartCallback implements Runnable {
132135
final CronetClientStream clientStream = new CronetClientStream(
133136
url, userAgent, executor, headers, CronetClientTransport.this, this, lock, maxMessageSize,
134137
alwaysUsePut, method, statsTraceCtx, callOptions, transportTracer, useGetForSafeMethods,
135-
usePutForIdempotentMethods);
138+
usePutForIdempotentMethods, readBufferSize);
136139

137140
@Override
138141
public void run() {

cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
2020
import static org.junit.Assert.assertFalse;
21+
import static org.junit.Assert.assertEquals;
2122
import static org.junit.Assert.assertSame;
2223
import static org.junit.Assert.assertTrue;
2324
import static org.mockito.Mockito.mock;
@@ -92,6 +93,40 @@ public void alwaysUsePut_defaultsToFalse() throws Exception {
9293
assertFalse(stream.idempotent);
9394
}
9495

96+
@Test
97+
public void channelBuilderReadBufferSize_defaultsTo4Kb() throws Exception {
98+
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
99+
CronetTransportFactory transportFactory =
100+
(CronetTransportFactory) builder.buildTransportFactory();
101+
CronetClientTransport transport =
102+
(CronetClientTransport)
103+
transportFactory.newClientTransport(
104+
new InetSocketAddress("localhost", 443),
105+
new ClientTransportOptions(),
106+
channelLogger);
107+
CronetClientStream stream = transport.newStream(
108+
method, new Metadata(), CallOptions.DEFAULT, tracers);
109+
110+
assertEquals(4 * 1024, stream.readBufferSize);
111+
}
112+
113+
@Test
114+
public void channelBuilderReadBufferSize_changeReflected() throws Exception {
115+
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
116+
CronetTransportFactory transportFactory =
117+
(CronetTransportFactory) builder.setReadBufferSize(1024 * 1024).buildTransportFactory();
118+
CronetClientTransport transport =
119+
(CronetClientTransport)
120+
transportFactory.newClientTransport(
121+
new InetSocketAddress("localhost", 443),
122+
new ClientTransportOptions(),
123+
channelLogger);
124+
CronetClientStream stream = transport.newStream(
125+
method, new Metadata(), CallOptions.DEFAULT, tracers);
126+
127+
assertEquals(1024 * 1024, stream.readBufferSize);
128+
}
129+
95130
@Test
96131
public void scheduledExecutorService_default() {
97132
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);

cronet/src/test/java/io/grpc/cronet/CronetClientStreamTest.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ public void setUp() {
127127
CallOptions.DEFAULT,
128128
transportTracer,
129129
false,
130-
false);
130+
false,
131+
4 * 1024);
131132
callback.setStream(clientStream);
132133
when(factory.newBidirectionalStreamBuilder(
133134
any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
@@ -591,7 +592,8 @@ public void addCronetRequestAnnotation_deprecated() {
591592
CallOptions.DEFAULT.withOption(CronetClientStream.CRONET_ANNOTATION_KEY, annotation),
592593
transportTracer,
593594
false,
594-
false);
595+
false,
596+
4 * 1024);
595597
callback.setStream(stream);
596598
when(factory.newBidirectionalStreamBuilder(
597599
any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
@@ -626,7 +628,8 @@ public void withAnnotation() {
626628
callOptions,
627629
transportTracer,
628630
false,
629-
false);
631+
false,
632+
4 * 1024);
630633
callback.setStream(stream);
631634
when(factory.newBidirectionalStreamBuilder(
632635
any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
@@ -666,7 +669,8 @@ public void getUnaryRequest() {
666669
CallOptions.DEFAULT,
667670
transportTracer,
668671
true,
669-
false);
672+
false,
673+
4 * 1024);
670674
callback.setStream(stream);
671675
BidirectionalStream.Builder getBuilder =
672676
mock(BidirectionalStream.Builder.class);
@@ -723,7 +727,8 @@ public void idempotentMethod_usesHttpPut() {
723727
CallOptions.DEFAULT,
724728
transportTracer,
725729
true,
726-
true);
730+
true,
731+
4 * 1024);
727732
callback.setStream(stream);
728733
BidirectionalStream.Builder builder =
729734
mock(BidirectionalStream.Builder.class);
@@ -755,7 +760,8 @@ public void alwaysUsePutOption_usesHttpPut() {
755760
CallOptions.DEFAULT,
756761
transportTracer,
757762
true,
758-
true);
763+
true,
764+
4 * 1024);
759765
callback.setStream(stream);
760766
BidirectionalStream.Builder builder =
761767
mock(BidirectionalStream.Builder.class);
@@ -795,7 +801,8 @@ public void reservedHeadersStripped() {
795801
CallOptions.DEFAULT,
796802
transportTracer,
797803
false,
798-
false);
804+
false,
805+
4 * 1024);
799806
callback.setStream(stream);
800807
BidirectionalStream.Builder builder =
801808
mock(BidirectionalStream.Builder.class);

cronet/src/test/java/io/grpc/cronet/CronetClientTransportTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ public void setUp() {
8989
false,
9090
TransportTracer.getDefaultFactory().create(),
9191
false,
92-
false);
92+
false,
93+
4 * 1024);
9394
Runnable callback = transport.start(clientTransportListener);
9495
assertNotNull(callback);
9596
callback.run();

0 commit comments

Comments
 (0)