Skip to content

Commit 55eea5e

Browse files
committed
Add GZIP compression defaulting to on
1 parent 27cfa7d commit 55eea5e

6 files changed

Lines changed: 216 additions & 5 deletions

File tree

temporal-serviceclient/src/main/java/io/temporal/serviceclient/ChannelManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ private Channel applyHeadStandardInterceptors(Channel channel) {
161161

162162
return ClientInterceptors.intercept(
163163
channel,
164+
new GrpcCompressionInterceptor(options.getGrpcCompression()),
164165
MetadataUtils.newAttachHeadersInterceptor(headers),
165166
new SystemInfoInterceptor(serverCapabilitiesFuture));
166167
}
@@ -206,6 +207,8 @@ private ManagedChannel prepareChannel() {
206207
builder.useTransportSecurity();
207208
}
208209

210+
builder.decompressorRegistry(options.getGrpcCompression().getDecompressorRegistry());
211+
209212
// Disable built-in idleTimer until https://github.com/grpc/grpc-java/issues/8714 is resolved.
210213
// jsdk force-idles channels often anyway, so this is not needed until we stop doing
211214
// force-idling as a part of
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.temporal.serviceclient;
2+
3+
import io.grpc.Codec;
4+
import io.grpc.DecompressorRegistry;
5+
import javax.annotation.Nullable;
6+
7+
/** Selects transport-level gRPC compression for service calls. */
8+
public enum GrpcCompression {
9+
/** Do not compress requests or advertise support for compressed responses. */
10+
NONE(null, DecompressorRegistry.emptyInstance().with(Codec.Identity.NONE, false)),
11+
12+
/** Gzip-compress outbound requests and accept gzip-compressed responses. */
13+
GZIP("gzip", DecompressorRegistry.getDefaultInstance());
14+
15+
private final @Nullable String compressorName;
16+
private final DecompressorRegistry decompressorRegistry;
17+
18+
GrpcCompression(@Nullable String compressorName, DecompressorRegistry decompressorRegistry) {
19+
this.compressorName = compressorName;
20+
this.decompressorRegistry = decompressorRegistry;
21+
}
22+
23+
@Nullable
24+
String getCompressorName() {
25+
return compressorName;
26+
}
27+
28+
DecompressorRegistry getDecompressorRegistry() {
29+
return decompressorRegistry;
30+
}
31+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.temporal.serviceclient;
2+
3+
import io.grpc.CallOptions;
4+
import io.grpc.Channel;
5+
import io.grpc.ClientCall;
6+
import io.grpc.ClientInterceptor;
7+
import io.grpc.MethodDescriptor;
8+
9+
final class GrpcCompressionInterceptor implements ClientInterceptor {
10+
private final GrpcCompression compression;
11+
12+
GrpcCompressionInterceptor(GrpcCompression compression) {
13+
this.compression = compression;
14+
}
15+
16+
@Override
17+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
18+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
19+
return next.newCall(method, callOptions.withCompression(compression.getCompressorName()));
20+
}
21+
}

temporal-serviceclient/src/main/java/io/temporal/serviceclient/ServiceStubsOptions.java

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ public class ServiceStubsOptions {
114114

115115
protected final Scope metricsScope;
116116

117+
/** Transport-level gRPC compression. */
118+
protected final GrpcCompression grpcCompression;
119+
117120
ServiceStubsOptions(ServiceStubsOptions that) {
118121
this.channel = that.channel;
119122
this.target = that.target;
@@ -135,6 +138,7 @@ public class ServiceStubsOptions {
135138
this.grpcMetadataProviders = that.grpcMetadataProviders;
136139
this.grpcClientInterceptors = that.grpcClientInterceptors;
137140
this.metricsScope = that.metricsScope;
141+
this.grpcCompression = that.grpcCompression;
138142
}
139143

140144
ServiceStubsOptions(
@@ -157,7 +161,8 @@ public class ServiceStubsOptions {
157161
Metadata headers,
158162
Collection<GrpcMetadataProvider> grpcMetadataProviders,
159163
Collection<ClientInterceptor> grpcClientInterceptors,
160-
Scope metricsScope) {
164+
Scope metricsScope,
165+
GrpcCompression grpcCompression) {
161166
this.channel = channel;
162167
this.target = target;
163168
this.channelInitializer = channelInitializer;
@@ -178,6 +183,7 @@ public class ServiceStubsOptions {
178183
this.grpcMetadataProviders = grpcMetadataProviders;
179184
this.grpcClientInterceptors = grpcClientInterceptors;
180185
this.metricsScope = metricsScope;
186+
this.grpcCompression = grpcCompression;
181187
}
182188

183189
/**
@@ -342,6 +348,15 @@ public Scope getMetricsScope() {
342348
return metricsScope;
343349
}
344350

351+
/**
352+
* @return transport-level gRPC compression used for requests and response negotiation.
353+
* @see Builder#setGrpcCompression(GrpcCompression)
354+
*/
355+
@Nonnull
356+
public GrpcCompression getGrpcCompression() {
357+
return grpcCompression;
358+
}
359+
345360
@Override
346361
public boolean equals(Object o) {
347362
if (this == o) return true;
@@ -366,7 +381,8 @@ public boolean equals(Object o) {
366381
&& Objects.equals(headers, that.headers)
367382
&& Objects.equals(grpcMetadataProviders, that.grpcMetadataProviders)
368383
&& Objects.equals(grpcClientInterceptors, that.grpcClientInterceptors)
369-
&& Objects.equals(metricsScope, that.metricsScope);
384+
&& Objects.equals(metricsScope, that.metricsScope)
385+
&& grpcCompression == that.grpcCompression;
370386
}
371387

372388
@Override
@@ -391,7 +407,8 @@ public int hashCode() {
391407
headers,
392408
grpcMetadataProviders,
393409
grpcClientInterceptors,
394-
metricsScope);
410+
metricsScope,
411+
grpcCompression);
395412
}
396413

397414
@Override
@@ -436,6 +453,8 @@ public String toString() {
436453
+ grpcClientInterceptors
437454
+ ", metricsScope="
438455
+ metricsScope
456+
+ ", grpcCompression="
457+
+ grpcCompression
439458
+ '}';
440459
}
441460

@@ -460,6 +479,7 @@ public static class Builder<T extends Builder<T>> {
460479
private Collection<ClientInterceptor> grpcClientInterceptors;
461480
private Scope metricsScope;
462481
private boolean apiKeyProvided;
482+
private GrpcCompression grpcCompression = GrpcCompression.GZIP;
463483

464484
protected Builder() {}
465485

@@ -491,6 +511,7 @@ protected Builder(ServiceStubsOptions options) {
491511
? new ArrayList<>(options.grpcClientInterceptors)
492512
: null;
493513
this.metricsScope = options.metricsScope;
514+
this.grpcCompression = options.grpcCompression;
494515
}
495516

496517
/**
@@ -720,6 +741,22 @@ public T setMetricsScope(Scope metricsScope) {
720741
return self();
721742
}
722743

744+
/**
745+
* Sets transport-level gRPC compression. Defaults to {@link GrpcCompression#GZIP}. Set to
746+
* {@link GrpcCompression#NONE} to opt out.
747+
*
748+
* <p>For SDK-created channels, this controls both request compression and the {@code
749+
* grpc-accept-encoding} response negotiation header. For user-supplied channels, the SDK still
750+
* controls request compression, but response decompression negotiation is configured by the
751+
* supplied channel.
752+
*
753+
* @return {@code this}
754+
*/
755+
public T setGrpcCompression(GrpcCompression grpcCompression) {
756+
this.grpcCompression = Objects.requireNonNull(grpcCompression);
757+
return self();
758+
}
759+
723760
/**
724761
* Set the time to wait between service responses on each health check.
725762
*
@@ -853,7 +890,8 @@ public ServiceStubsOptions build() {
853890
this.headers,
854891
this.grpcMetadataProviders,
855892
this.grpcClientInterceptors,
856-
this.metricsScope);
893+
this.metricsScope,
894+
this.grpcCompression);
857895
}
858896

859897
public ServiceStubsOptions validateAndBuildWithDefaults() {
@@ -916,7 +954,8 @@ public ServiceStubsOptions validateAndBuildWithDefaults() {
916954
headers,
917955
grpcMetadataProviders,
918956
grpcClientInterceptors,
919-
metricsScope);
957+
metricsScope,
958+
this.grpcCompression);
920959
}
921960
}
922961
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package io.temporal.serviceclient;
2+
3+
import static org.junit.Assert.*;
4+
5+
import io.grpc.Metadata;
6+
import io.grpc.Server;
7+
import io.grpc.ServerCall;
8+
import io.grpc.ServerCallHandler;
9+
import io.grpc.ServerInterceptor;
10+
import io.grpc.ServerInterceptors;
11+
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
12+
import io.grpc.stub.StreamObserver;
13+
import io.grpc.testing.GrpcCleanupRule;
14+
import io.temporal.api.workflowservice.v1.GetSystemInfoRequest;
15+
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
16+
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
17+
import java.util.concurrent.atomic.AtomicReference;
18+
import org.junit.Rule;
19+
import org.junit.Test;
20+
21+
public class GrpcCompressionTest {
22+
private static final Metadata.Key<String> GRPC_ENCODING =
23+
Metadata.Key.of("grpc-encoding", Metadata.ASCII_STRING_MARSHALLER);
24+
private static final Metadata.Key<String> GRPC_ACCEPT_ENCODING =
25+
Metadata.Key.of("grpc-accept-encoding", Metadata.ASCII_STRING_MARSHALLER);
26+
27+
@Rule public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();
28+
29+
@Test
30+
public void gzipCompressionSendsAndAcceptsGzip() throws Exception {
31+
Metadata headers = callGetSystemInfo(GrpcCompression.GZIP);
32+
33+
assertEquals("gzip", headers.get(GRPC_ENCODING));
34+
assertTrue(headers.get(GRPC_ACCEPT_ENCODING).contains("gzip"));
35+
}
36+
37+
@Test
38+
public void noneCompressionDoesNotSendOrAcceptGzip() throws Exception {
39+
Metadata headers = callGetSystemInfo(GrpcCompression.NONE);
40+
41+
assertNull(headers.get(GRPC_ENCODING));
42+
assertNull(headers.get(GRPC_ACCEPT_ENCODING));
43+
}
44+
45+
private Metadata callGetSystemInfo(GrpcCompression compression) throws Exception {
46+
AtomicReference<Metadata> capturedHeaders = new AtomicReference<>();
47+
ServerInterceptor captureHeadersInterceptor =
48+
new ServerInterceptor() {
49+
@Override
50+
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
51+
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
52+
capturedHeaders.set(headers);
53+
return next.startCall(call, headers);
54+
}
55+
};
56+
Server server =
57+
grpcCleanupRule.register(
58+
NettyServerBuilder.forPort(0)
59+
.addService(
60+
ServerInterceptors.intercept(
61+
new TestWorkflowService(), captureHeadersInterceptor))
62+
.build()
63+
.start());
64+
65+
WorkflowServiceStubs serviceStubs =
66+
WorkflowServiceStubs.newServiceStubs(
67+
WorkflowServiceStubsOptions.newBuilder()
68+
.setTarget("127.0.0.1:" + server.getPort())
69+
.setEnableHttps(false)
70+
.setGrpcCompression(compression)
71+
.build());
72+
try {
73+
serviceStubs.blockingStub().getSystemInfo(GetSystemInfoRequest.getDefaultInstance());
74+
} finally {
75+
serviceStubs.shutdownNow();
76+
}
77+
78+
assertNotNull(capturedHeaders.get());
79+
return capturedHeaders.get();
80+
}
81+
82+
private static final class TestWorkflowService
83+
extends WorkflowServiceGrpc.WorkflowServiceImplBase {
84+
@Override
85+
public void getSystemInfo(
86+
GetSystemInfoRequest request, StreamObserver<GetSystemInfoResponse> responseObserver) {
87+
responseObserver.onNext(GetSystemInfoResponse.getDefaultInstance());
88+
responseObserver.onCompleted();
89+
}
90+
}
91+
}

temporal-serviceclient/src/test/java/io/temporal/serviceclient/ServiceStubsOptionsTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,30 @@ public void testSpringBootStyleAutoTLSWithApiKey() {
151151
"TLS should be disabled when no API key and no explicit TLS setting",
152152
options3.getEnableHttps());
153153
}
154+
155+
@Test
156+
public void testGrpcCompressionDefaultsToGzip() {
157+
ServiceStubsOptions options =
158+
WorkflowServiceStubsOptions.newBuilder()
159+
.setTarget("localhost:7233")
160+
.validateAndBuildWithDefaults();
161+
162+
assertEquals(GrpcCompression.GZIP, options.getGrpcCompression());
163+
}
164+
165+
@Test
166+
public void testGrpcCompressionNonePassesThroughBuilderCopy() {
167+
ServiceStubsOptions options =
168+
WorkflowServiceStubsOptions.newBuilder()
169+
.setTarget("localhost:7233")
170+
.setGrpcCompression(GrpcCompression.NONE)
171+
.validateAndBuildWithDefaults();
172+
173+
assertEquals(GrpcCompression.NONE, options.getGrpcCompression());
174+
175+
ServiceStubsOptions copied =
176+
WorkflowServiceStubsOptions.newBuilder(options).validateAndBuildWithDefaults();
177+
178+
assertEquals(GrpcCompression.NONE, copied.getGrpcCompression());
179+
}
154180
}

0 commit comments

Comments
 (0)