Skip to content

Commit f3edb10

Browse files
authored
Add GZIP compression defaulting to on (temporalio#2911)
1 parent 78d0fee commit f3edb10

7 files changed

Lines changed: 212 additions & 9 deletions

File tree

temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import org.junit.Test;
2323

2424
public class GrpcMessageTooLargeTest {
25-
private static final String QUERY_ERROR_MESSAGE =
26-
"Failed to send query response: RESOURCE_EXHAUSTED: grpc: received message larger than max";
25+
// This string is kept intentionally short to match multiple possible too-large error messages
26+
private static final String TOO_BIG_ERR_MESSAGE = "larger than max";
2727
private static final String VERY_LARGE_DATA;
2828

2929
static {
@@ -120,7 +120,7 @@ public void queryResultTooLarge() {
120120
assertNotNull(e.getCause());
121121
// The exception will not contain the original failure object, so instead of type check we're
122122
// checking the message to ensure the correct error is being sent.
123-
assertTrue(e.getCause().getMessage().contains(QUERY_ERROR_MESSAGE));
123+
assertTrue(e.getCause().getMessage().contains(TOO_BIG_ERR_MESSAGE));
124124
}
125125

126126
@Test
@@ -132,7 +132,7 @@ public void queryErrorTooLarge() {
132132
WorkflowQueryException e = assertThrows(WorkflowQueryException.class, workflow::query);
133133

134134
assertNotNull(e.getCause());
135-
assertTrue(e.getCause().getMessage().contains(QUERY_ERROR_MESSAGE));
135+
assertTrue(e.getCause().getMessage().contains(TOO_BIG_ERR_MESSAGE));
136136
}
137137

138138
private static <T> T createWorkflowStub(Class<T> clazz, SDKTestWorkflowRule workflowRule) {

temporal-serviceclient/src/main/java/io/temporal/serviceclient/ChannelManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,11 @@ private Channel applyHeadStandardInterceptors(Channel channel) {
159159
}
160160
}
161161

162+
if (options.getGrpcCompression() != GrpcCompression.NONE) {
163+
channel =
164+
ClientInterceptors.intercept(
165+
channel, new GrpcCompressionInterceptor(options.getGrpcCompression()));
166+
}
162167
return ClientInterceptors.intercept(
163168
channel,
164169
MetadataUtils.newAttachHeadersInterceptor(headers),
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.temporal.serviceclient;
2+
3+
import javax.annotation.Nullable;
4+
5+
/** Selects outbound transport-level gRPC compression for service calls. */
6+
public enum GrpcCompression {
7+
/** Do not compress requests. */
8+
NONE(null),
9+
10+
/** Gzip-compress requests. */
11+
GZIP("gzip");
12+
13+
private final @Nullable String compressorName;
14+
15+
GrpcCompression(@Nullable String compressorName) {
16+
this.compressorName = compressorName;
17+
}
18+
19+
@Nullable
20+
String getCompressorName() {
21+
return compressorName;
22+
}
23+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.temporal.serviceclient;
2+
3+
import io.grpc.CallOptions;
4+
import io.grpc.Channel;
5+
import io.grpc.ClientCall;
6+
import io.grpc.ClientInterceptor;
7+
import io.grpc.MethodDescriptor;
8+
9+
final class GrpcCompressionInterceptor implements ClientInterceptor {
10+
private final GrpcCompression compression;
11+
12+
GrpcCompressionInterceptor(GrpcCompression compression) {
13+
this.compression = compression;
14+
}
15+
16+
@Override
17+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
18+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
19+
return next.newCall(method, callOptions.withCompression(compression.getCompressorName()));
20+
}
21+
}

temporal-serviceclient/src/main/java/io/temporal/serviceclient/ServiceStubsOptions.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ public class ServiceStubsOptions {
114114

115115
protected final Scope metricsScope;
116116

117+
/** Outbound transport-level gRPC compression. */
118+
protected final GrpcCompression grpcCompression;
119+
117120
ServiceStubsOptions(ServiceStubsOptions that) {
118121
this.channel = that.channel;
119122
this.target = that.target;
@@ -135,6 +138,7 @@ public class ServiceStubsOptions {
135138
this.grpcMetadataProviders = that.grpcMetadataProviders;
136139
this.grpcClientInterceptors = that.grpcClientInterceptors;
137140
this.metricsScope = that.metricsScope;
141+
this.grpcCompression = that.grpcCompression;
138142
}
139143

140144
ServiceStubsOptions(
@@ -157,7 +161,8 @@ public class ServiceStubsOptions {
157161
Metadata headers,
158162
Collection<GrpcMetadataProvider> grpcMetadataProviders,
159163
Collection<ClientInterceptor> grpcClientInterceptors,
160-
Scope metricsScope) {
164+
Scope metricsScope,
165+
GrpcCompression grpcCompression) {
161166
this.channel = channel;
162167
this.target = target;
163168
this.channelInitializer = channelInitializer;
@@ -178,6 +183,7 @@ public class ServiceStubsOptions {
178183
this.grpcMetadataProviders = grpcMetadataProviders;
179184
this.grpcClientInterceptors = grpcClientInterceptors;
180185
this.metricsScope = metricsScope;
186+
this.grpcCompression = grpcCompression;
181187
}
182188

183189
/**
@@ -342,6 +348,15 @@ public Scope getMetricsScope() {
342348
return metricsScope;
343349
}
344350

351+
/**
352+
* @return outbound transport-level gRPC compression used for requests.
353+
* @see Builder#setGrpcCompression(GrpcCompression)
354+
*/
355+
@Nonnull
356+
public GrpcCompression getGrpcCompression() {
357+
return grpcCompression;
358+
}
359+
345360
@Override
346361
public boolean equals(Object o) {
347362
if (this == o) return true;
@@ -366,7 +381,8 @@ public boolean equals(Object o) {
366381
&& Objects.equals(headers, that.headers)
367382
&& Objects.equals(grpcMetadataProviders, that.grpcMetadataProviders)
368383
&& Objects.equals(grpcClientInterceptors, that.grpcClientInterceptors)
369-
&& Objects.equals(metricsScope, that.metricsScope);
384+
&& Objects.equals(metricsScope, that.metricsScope)
385+
&& grpcCompression == that.grpcCompression;
370386
}
371387

372388
@Override
@@ -391,7 +407,8 @@ public int hashCode() {
391407
headers,
392408
grpcMetadataProviders,
393409
grpcClientInterceptors,
394-
metricsScope);
410+
metricsScope,
411+
grpcCompression);
395412
}
396413

397414
@Override
@@ -436,6 +453,8 @@ public String toString() {
436453
+ grpcClientInterceptors
437454
+ ", metricsScope="
438455
+ metricsScope
456+
+ ", grpcCompression="
457+
+ grpcCompression
439458
+ '}';
440459
}
441460

@@ -460,6 +479,7 @@ public static class Builder<T extends Builder<T>> {
460479
private Collection<ClientInterceptor> grpcClientInterceptors;
461480
private Scope metricsScope;
462481
private boolean apiKeyProvided;
482+
private GrpcCompression grpcCompression = GrpcCompression.GZIP;
463483

464484
protected Builder() {}
465485

@@ -491,6 +511,7 @@ protected Builder(ServiceStubsOptions options) {
491511
? new ArrayList<>(options.grpcClientInterceptors)
492512
: null;
493513
this.metricsScope = options.metricsScope;
514+
this.grpcCompression = options.grpcCompression;
494515
}
495516

496517
/**
@@ -720,6 +741,20 @@ public T setMetricsScope(Scope metricsScope) {
720741
return self();
721742
}
722743

744+
/**
745+
* Sets outbound transport-level gRPC compression. Defaults to {@link GrpcCompression#GZIP}. Set
746+
* to {@link GrpcCompression#NONE} to opt out of compressing requests.
747+
*
748+
* <p>The SDK uses the default gRPC response decompression registry for all compression options,
749+
* so disabling request compression does not disable accepting compressed responses.
750+
*
751+
* @return {@code this}
752+
*/
753+
public T setGrpcCompression(GrpcCompression grpcCompression) {
754+
this.grpcCompression = Objects.requireNonNull(grpcCompression);
755+
return self();
756+
}
757+
723758
/**
724759
* Set the time to wait between service responses on each health check.
725760
*
@@ -853,7 +888,8 @@ public ServiceStubsOptions build() {
853888
this.headers,
854889
this.grpcMetadataProviders,
855890
this.grpcClientInterceptors,
856-
this.metricsScope);
891+
this.metricsScope,
892+
this.grpcCompression);
857893
}
858894

859895
public ServiceStubsOptions validateAndBuildWithDefaults() {
@@ -916,7 +952,8 @@ public ServiceStubsOptions validateAndBuildWithDefaults() {
916952
headers,
917953
grpcMetadataProviders,
918954
grpcClientInterceptors,
919-
metricsScope);
955+
metricsScope,
956+
this.grpcCompression);
920957
}
921958
}
922959
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package io.temporal.serviceclient;
2+
3+
import static org.junit.Assert.*;
4+
5+
import io.grpc.Metadata;
6+
import io.grpc.Server;
7+
import io.grpc.ServerCall;
8+
import io.grpc.ServerCallHandler;
9+
import io.grpc.ServerInterceptor;
10+
import io.grpc.ServerInterceptors;
11+
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
12+
import io.grpc.stub.StreamObserver;
13+
import io.grpc.testing.GrpcCleanupRule;
14+
import io.temporal.api.workflowservice.v1.GetSystemInfoRequest;
15+
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
16+
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
17+
import java.util.concurrent.atomic.AtomicReference;
18+
import org.junit.Rule;
19+
import org.junit.Test;
20+
21+
public class GrpcCompressionTest {
22+
private static final Metadata.Key<String> GRPC_ENCODING =
23+
Metadata.Key.of("grpc-encoding", Metadata.ASCII_STRING_MARSHALLER);
24+
private static final Metadata.Key<String> GRPC_ACCEPT_ENCODING =
25+
Metadata.Key.of("grpc-accept-encoding", Metadata.ASCII_STRING_MARSHALLER);
26+
27+
@Rule public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();
28+
29+
@Test
30+
public void gzipCompressionSendsAndAcceptsGzip() throws Exception {
31+
Metadata headers = callGetSystemInfo(GrpcCompression.GZIP);
32+
33+
assertEquals("gzip", headers.get(GRPC_ENCODING));
34+
assertTrue(headers.get(GRPC_ACCEPT_ENCODING).contains("gzip"));
35+
}
36+
37+
@Test
38+
public void noneCompressionDoesNotSendGzipButStillAcceptsGzip() throws Exception {
39+
Metadata headers = callGetSystemInfo(GrpcCompression.NONE);
40+
41+
assertNull(headers.get(GRPC_ENCODING));
42+
assertTrue(headers.get(GRPC_ACCEPT_ENCODING).contains("gzip"));
43+
}
44+
45+
private Metadata callGetSystemInfo(GrpcCompression compression) throws Exception {
46+
AtomicReference<Metadata> capturedHeaders = new AtomicReference<>();
47+
ServerInterceptor captureHeadersInterceptor =
48+
new ServerInterceptor() {
49+
@Override
50+
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
51+
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
52+
capturedHeaders.set(headers);
53+
return next.startCall(call, headers);
54+
}
55+
};
56+
Server server =
57+
grpcCleanupRule.register(
58+
NettyServerBuilder.forPort(0)
59+
.addService(
60+
ServerInterceptors.intercept(
61+
new TestWorkflowService(), captureHeadersInterceptor))
62+
.build()
63+
.start());
64+
65+
WorkflowServiceStubs serviceStubs =
66+
WorkflowServiceStubs.newServiceStubs(
67+
WorkflowServiceStubsOptions.newBuilder()
68+
.setTarget("127.0.0.1:" + server.getPort())
69+
.setEnableHttps(false)
70+
.setGrpcCompression(compression)
71+
.build());
72+
try {
73+
serviceStubs.blockingStub().getSystemInfo(GetSystemInfoRequest.getDefaultInstance());
74+
} finally {
75+
serviceStubs.shutdownNow();
76+
}
77+
78+
assertNotNull(capturedHeaders.get());
79+
return capturedHeaders.get();
80+
}
81+
82+
private static final class TestWorkflowService
83+
extends WorkflowServiceGrpc.WorkflowServiceImplBase {
84+
@Override
85+
public void getSystemInfo(
86+
GetSystemInfoRequest request, StreamObserver<GetSystemInfoResponse> responseObserver) {
87+
responseObserver.onNext(GetSystemInfoResponse.getDefaultInstance());
88+
responseObserver.onCompleted();
89+
}
90+
}
91+
}

temporal-serviceclient/src/test/java/io/temporal/serviceclient/ServiceStubsOptionsTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,30 @@ public void testSpringBootStyleAutoTLSWithApiKey() {
151151
"TLS should be disabled when no API key and no explicit TLS setting",
152152
options3.getEnableHttps());
153153
}
154+
155+
@Test
156+
public void testGrpcCompressionDefaultsToGzip() {
157+
ServiceStubsOptions options =
158+
WorkflowServiceStubsOptions.newBuilder()
159+
.setTarget("localhost:7233")
160+
.validateAndBuildWithDefaults();
161+
162+
assertEquals(GrpcCompression.GZIP, options.getGrpcCompression());
163+
}
164+
165+
@Test
166+
public void testGrpcCompressionNonePassesThroughBuilderCopy() {
167+
ServiceStubsOptions options =
168+
WorkflowServiceStubsOptions.newBuilder()
169+
.setTarget("localhost:7233")
170+
.setGrpcCompression(GrpcCompression.NONE)
171+
.validateAndBuildWithDefaults();
172+
173+
assertEquals(GrpcCompression.NONE, options.getGrpcCompression());
174+
175+
ServiceStubsOptions copied =
176+
WorkflowServiceStubsOptions.newBuilder(options).validateAndBuildWithDefaults();
177+
178+
assertEquals(GrpcCompression.NONE, copied.getGrpcCompression());
179+
}
154180
}

0 commit comments

Comments
 (0)