Skip to content

Commit b38df6c

Browse files
authored
core: Reduce per-stream idle memory by 20%
Metadata was accidentally being retained after the start of the call. That can be an overwhelming percentage of memory for an idle RPC; don't do that. The other changes are considerably smaller, but I happened to notice them and the changes are straight-forward without magic numbers (e.g., there's many arrays that could be tuned). The regular interop server uses 4600 bytes per full duplex stream while idle, but much of that is Census recorded events hanging around. Keeping the Census integration but removing the Census impl (so a noop is used) drops that to 3000 bytes. This change brings that down to ~2450 bytes (which is still including stuff from TestServiceImpl). But there's very little Metadata in the interop tests, so absolute real-life savings would be much higher (but relative real-life savings may be lower, because the application will often have more state). The measurements were captured using a modified timeout_on_sleeping_server client that had 100,000 concurrent full duplex calls on one connection.
1 parent 13b4b97 commit b38df6c

3 files changed

Lines changed: 59 additions & 42 deletions

File tree

core/src/main/java/io/grpc/internal/MessageDeframer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,12 @@ private boolean readRequiredBytes() {
314314
int totalBytesRead = 0;
315315
int deflatedBytesRead = 0;
316316
try {
317+
// Avoid allocating nextFrame when idle
318+
if (requiredLength > 0 && fullStreamDecompressor == null
319+
&& unprocessed.readableBytes() == 0) {
320+
return false;
321+
}
322+
317323
if (nextFrame == null) {
318324
nextFrame = new CompositeReadableBuffer();
319325
}

core/src/main/java/io/grpc/internal/ServerImpl.java

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,9 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
151151
InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle()));
152152
// Fork from the passed in context so that it does not propagate cancellation, it only
153153
// inherits values.
154-
this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork();
154+
this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext")
155+
.fork()
156+
.withValue(io.grpc.InternalServer.SERVER_CONTEXT_KEY, ServerImpl.this);
155157
this.decompressorRegistry = builder.decompressorRegistry;
156158
this.compressorRegistry = builder.compressorRegistry;
157159
this.transportFilters = Collections.unmodifiableList(
@@ -622,19 +624,7 @@ private void runInternal() {
622624
// An extremely short deadline may expire before stream.setListener(jumpListener).
623625
// This causes NPE as in issue: https://github.com/grpc/grpc-java/issues/6300
624626
// Delay of setting cancellationListener to context will fix the issue.
625-
final class ServerStreamCancellationListener implements Context.CancellationListener {
626-
@Override
627-
public void cancelled(Context context) {
628-
Status status = statusFromCancelled(context);
629-
if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
630-
// This should rarely get run, since the client will likely cancel the stream
631-
// before the timeout is reached.
632-
stream.cancel(status);
633-
}
634-
}
635-
}
636-
637-
context.addListener(new ServerStreamCancellationListener(), directExecutor());
627+
context.addListener(new ServerStreamCancellationListener(stream), directExecutor());
638628
}
639629
}
640630

@@ -648,8 +638,7 @@ private Context.CancellableContext createContext(
648638

649639
Context baseContext =
650640
statsTraceCtx
651-
.serverFilterContext(rootContext)
652-
.withValue(io.grpc.InternalServer.SERVER_CONTEXT_KEY, ServerImpl.this);
641+
.serverFilterContext(rootContext);
653642

654643
if (timeoutNanos == null) {
655644
return baseContext.withCancellation();
@@ -707,6 +696,31 @@ private <WReqT, WRespT> ServerStreamListener startWrappedCall(
707696
}
708697
}
709698

699+
/**
700+
* Propagates context cancellation to the ServerStream.
701+
*
702+
* <p>This is outside of HandleServerCall because that class holds Metadata and other state needed
703+
* only when starting the RPC. The cancellation listener will live for the life of the call, so we
704+
* avoid that useless state being retained.
705+
*/
706+
static final class ServerStreamCancellationListener implements Context.CancellationListener {
707+
private final ServerStream stream;
708+
709+
ServerStreamCancellationListener(ServerStream stream) {
710+
this.stream = checkNotNull(stream, "stream");
711+
}
712+
713+
@Override
714+
public void cancelled(Context context) {
715+
Status status = statusFromCancelled(context);
716+
if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
717+
// This should rarely get run, since the client will likely cancel the stream
718+
// before the timeout is reached.
719+
stream.cancel(status);
720+
}
721+
}
722+
}
723+
710724
@Override
711725
public InternalLogId getLogId() {
712726
return logId;

interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,10 @@
4545
import java.util.ArrayDeque;
4646
import java.util.Arrays;
4747
import java.util.HashMap;
48-
import java.util.HashSet;
4948
import java.util.List;
5049
import java.util.Map;
5150
import java.util.Queue;
5251
import java.util.Random;
53-
import java.util.Set;
5452
import java.util.concurrent.Future;
5553
import java.util.concurrent.ScheduledExecutorService;
5654
import java.util.concurrent.Semaphore;
@@ -511,27 +509,30 @@ public static List<ServerInterceptor> interceptors() {
511509
}
512510

513511
/**
514-
* Echo the request headers from a client into response headers and trailers. Useful for
512+
* Echo a request header from a client into response headers and trailers. Useful for
515513
* testing end-to-end metadata propagation.
516514
*/
517-
private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<?>... keys) {
518-
final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
515+
private static <T> ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<T> key) {
519516
return new ServerInterceptor() {
520517
@Override
521518
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
522519
ServerCall<ReqT, RespT> call,
523-
final Metadata requestHeaders,
520+
Metadata requestHeaders,
524521
ServerCallHandler<ReqT, RespT> next) {
522+
if (!requestHeaders.containsKey(key)) {
523+
return next.startCall(call, requestHeaders);
524+
}
525+
T value = requestHeaders.get(key);
525526
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
526527
@Override
527528
public void sendHeaders(Metadata responseHeaders) {
528-
responseHeaders.merge(requestHeaders, keySet);
529+
responseHeaders.put(key, value);
529530
super.sendHeaders(responseHeaders);
530531
}
531532

532533
@Override
533534
public void close(Status status, Metadata trailers) {
534-
trailers.merge(requestHeaders, keySet);
535+
trailers.put(key, value);
535536
super.close(status, trailers);
536537
}
537538
}, requestHeaders);
@@ -540,52 +541,48 @@ public void close(Status status, Metadata trailers) {
540541
}
541542

542543
/**
543-
* Echoes request headers with the specified key(s) from a client into response headers only.
544+
* Echoes request headers with the specified key from a client into response headers only.
544545
*/
545-
private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<?>... keys) {
546-
final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
546+
private static <T> ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<T> key) {
547547
return new ServerInterceptor() {
548548
@Override
549549
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
550550
ServerCall<ReqT, RespT> call,
551551
final Metadata requestHeaders,
552552
ServerCallHandler<ReqT, RespT> next) {
553+
if (!requestHeaders.containsKey(key)) {
554+
return next.startCall(call, requestHeaders);
555+
}
556+
T value = requestHeaders.get(key);
553557
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
554558
@Override
555559
public void sendHeaders(Metadata responseHeaders) {
556-
responseHeaders.merge(requestHeaders, keySet);
560+
responseHeaders.put(key, value);
557561
super.sendHeaders(responseHeaders);
558562
}
559-
560-
@Override
561-
public void close(Status status, Metadata trailers) {
562-
super.close(status, trailers);
563-
}
564563
}, requestHeaders);
565564
}
566565
};
567566
}
568567

569568
/**
570-
* Echoes request headers with the specified key(s) from a client into response trailers only.
569+
* Echoes request headers with the specified key from a client into response trailers only.
571570
*/
572-
private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<?>... keys) {
573-
final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
571+
private static <T> ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<T> key) {
574572
return new ServerInterceptor() {
575573
@Override
576574
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
577575
ServerCall<ReqT, RespT> call,
578576
final Metadata requestHeaders,
579577
ServerCallHandler<ReqT, RespT> next) {
578+
if (!requestHeaders.containsKey(key)) {
579+
return next.startCall(call, requestHeaders);
580+
}
581+
T value = requestHeaders.get(key);
580582
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
581-
@Override
582-
public void sendHeaders(Metadata responseHeaders) {
583-
super.sendHeaders(responseHeaders);
584-
}
585-
586583
@Override
587584
public void close(Status status, Metadata trailers) {
588-
trailers.merge(requestHeaders, keySet);
585+
trailers.put(key, value);
589586
super.close(status, trailers);
590587
}
591588
}, requestHeaders);

0 commit comments

Comments
 (0)