Skip to content

Commit 951018b

Browse files
authored
Merge branch 'grpc:master' into master
2 parents 8a270c0 + b38df6c commit 951018b

3 files changed

Lines changed: 59 additions & 42 deletions

File tree

core/src/main/java/io/grpc/internal/MessageDeframer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,12 @@ private boolean readRequiredBytes() {
314314
int totalBytesRead = 0;
315315
int deflatedBytesRead = 0;
316316
try {
317+
// Avoid allocating nextFrame when idle
318+
if (requiredLength > 0 && fullStreamDecompressor == null
319+
&& unprocessed.readableBytes() == 0) {
320+
return false;
321+
}
322+
317323
if (nextFrame == null) {
318324
nextFrame = new CompositeReadableBuffer();
319325
}

core/src/main/java/io/grpc/internal/ServerImpl.java

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,9 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
151151
InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle()));
152152
// Fork from the passed in context so that it does not propagate cancellation, it only
153153
// inherits values.
154-
this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork();
154+
this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext")
155+
.fork()
156+
.withValue(io.grpc.InternalServer.SERVER_CONTEXT_KEY, ServerImpl.this);
155157
this.decompressorRegistry = builder.decompressorRegistry;
156158
this.compressorRegistry = builder.compressorRegistry;
157159
this.transportFilters = Collections.unmodifiableList(
@@ -622,19 +624,7 @@ private void runInternal() {
622624
// An extremely short deadline may expire before stream.setListener(jumpListener).
623625
// This causes NPE as in issue: https://github.com/grpc/grpc-java/issues/6300
624626
// Delay of setting cancellationListener to context will fix the issue.
625-
final class ServerStreamCancellationListener implements Context.CancellationListener {
626-
@Override
627-
public void cancelled(Context context) {
628-
Status status = statusFromCancelled(context);
629-
if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
630-
// This should rarely get run, since the client will likely cancel the stream
631-
// before the timeout is reached.
632-
stream.cancel(status);
633-
}
634-
}
635-
}
636-
637-
context.addListener(new ServerStreamCancellationListener(), directExecutor());
627+
context.addListener(new ServerStreamCancellationListener(stream), directExecutor());
638628
}
639629
}
640630

@@ -648,8 +638,7 @@ private Context.CancellableContext createContext(
648638

649639
Context baseContext =
650640
statsTraceCtx
651-
.serverFilterContext(rootContext)
652-
.withValue(io.grpc.InternalServer.SERVER_CONTEXT_KEY, ServerImpl.this);
641+
.serverFilterContext(rootContext);
653642

654643
if (timeoutNanos == null) {
655644
return baseContext.withCancellation();
@@ -707,6 +696,31 @@ private <WReqT, WRespT> ServerStreamListener startWrappedCall(
707696
}
708697
}
709698

699+
/**
700+
* Propagates context cancellation to the ServerStream.
701+
*
702+
* <p>This is outside of HandleServerCall because that class holds Metadata and other state needed
703+
* only when starting the RPC. The cancellation listener will live for the life of the call, so we
704+
* avoid that useless state being retained.
705+
*/
706+
static final class ServerStreamCancellationListener implements Context.CancellationListener {
707+
private final ServerStream stream;
708+
709+
ServerStreamCancellationListener(ServerStream stream) {
710+
this.stream = checkNotNull(stream, "stream");
711+
}
712+
713+
@Override
714+
public void cancelled(Context context) {
715+
Status status = statusFromCancelled(context);
716+
if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
717+
// This should rarely get run, since the client will likely cancel the stream
718+
// before the timeout is reached.
719+
stream.cancel(status);
720+
}
721+
}
722+
}
723+
710724
@Override
711725
public InternalLogId getLogId() {
712726
return logId;

interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,10 @@
4545
import java.util.ArrayDeque;
4646
import java.util.Arrays;
4747
import java.util.HashMap;
48-
import java.util.HashSet;
4948
import java.util.List;
5049
import java.util.Map;
5150
import java.util.Queue;
5251
import java.util.Random;
53-
import java.util.Set;
5452
import java.util.concurrent.Future;
5553
import java.util.concurrent.ScheduledExecutorService;
5654
import java.util.concurrent.Semaphore;
@@ -511,27 +509,30 @@ public static List<ServerInterceptor> interceptors() {
511509
}
512510

513511
/**
514-
* Echo the request headers from a client into response headers and trailers. Useful for
512+
* Echo a request header from a client into response headers and trailers. Useful for
515513
* testing end-to-end metadata propagation.
516514
*/
517-
private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<?>... keys) {
518-
final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
515+
private static <T> ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<T> key) {
519516
return new ServerInterceptor() {
520517
@Override
521518
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
522519
ServerCall<ReqT, RespT> call,
523-
final Metadata requestHeaders,
520+
Metadata requestHeaders,
524521
ServerCallHandler<ReqT, RespT> next) {
522+
if (!requestHeaders.containsKey(key)) {
523+
return next.startCall(call, requestHeaders);
524+
}
525+
T value = requestHeaders.get(key);
525526
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
526527
@Override
527528
public void sendHeaders(Metadata responseHeaders) {
528-
responseHeaders.merge(requestHeaders, keySet);
529+
responseHeaders.put(key, value);
529530
super.sendHeaders(responseHeaders);
530531
}
531532

532533
@Override
533534
public void close(Status status, Metadata trailers) {
534-
trailers.merge(requestHeaders, keySet);
535+
trailers.put(key, value);
535536
super.close(status, trailers);
536537
}
537538
}, requestHeaders);
@@ -540,52 +541,48 @@ public void close(Status status, Metadata trailers) {
540541
}
541542

542543
/**
543-
* Echoes request headers with the specified key(s) from a client into response headers only.
544+
* Echoes request headers with the specified key from a client into response headers only.
544545
*/
545-
private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<?>... keys) {
546-
final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
546+
private static <T> ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<T> key) {
547547
return new ServerInterceptor() {
548548
@Override
549549
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
550550
ServerCall<ReqT, RespT> call,
551551
final Metadata requestHeaders,
552552
ServerCallHandler<ReqT, RespT> next) {
553+
if (!requestHeaders.containsKey(key)) {
554+
return next.startCall(call, requestHeaders);
555+
}
556+
T value = requestHeaders.get(key);
553557
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
554558
@Override
555559
public void sendHeaders(Metadata responseHeaders) {
556-
responseHeaders.merge(requestHeaders, keySet);
560+
responseHeaders.put(key, value);
557561
super.sendHeaders(responseHeaders);
558562
}
559-
560-
@Override
561-
public void close(Status status, Metadata trailers) {
562-
super.close(status, trailers);
563-
}
564563
}, requestHeaders);
565564
}
566565
};
567566
}
568567

569568
/**
570-
* Echoes request headers with the specified key(s) from a client into response trailers only.
569+
* Echoes request headers with the specified key from a client into response trailers only.
571570
*/
572-
private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<?>... keys) {
573-
final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
571+
private static <T> ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<T> key) {
574572
return new ServerInterceptor() {
575573
@Override
576574
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
577575
ServerCall<ReqT, RespT> call,
578576
final Metadata requestHeaders,
579577
ServerCallHandler<ReqT, RespT> next) {
578+
if (!requestHeaders.containsKey(key)) {
579+
return next.startCall(call, requestHeaders);
580+
}
581+
T value = requestHeaders.get(key);
580582
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
581-
@Override
582-
public void sendHeaders(Metadata responseHeaders) {
583-
super.sendHeaders(responseHeaders);
584-
}
585-
586583
@Override
587584
public void close(Status status, Metadata trailers) {
588-
trailers.merge(requestHeaders, keySet);
585+
trailers.put(key, value);
589586
super.close(status, trailers);
590587
}
591588
}, requestHeaders);

0 commit comments

Comments
 (0)