Skip to content

Commit d2f353d

Browse files
author
Dhriti Chopra
committed
test(storage): isolate and manage gRPC callback executor in FakeServer to prevent pipeline timeouts
1 parent 81d12c5 commit d2f353d

1 file changed

Lines changed: 34 additions & 4 deletions

File tree

  • java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage

java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package com.google.cloud.storage;
1818

19+
import com.google.api.gax.core.ExecutorProvider;
20+
import com.google.api.gax.core.FixedExecutorProvider;
21+
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
1922
import com.google.api.gax.retrying.RetrySettings;
2023
import com.google.cloud.NoCredentials;
2124
import com.google.cloud.storage.it.GrpcPlainRequestLoggingInterceptor;
@@ -28,36 +31,63 @@
2831
import java.net.InetSocketAddress;
2932
import java.time.Duration;
3033
import java.util.Locale;
34+
import java.util.concurrent.ScheduledThreadPoolExecutor;
3135
import java.util.concurrent.TimeUnit;
3236

3337
final class FakeServer implements AutoCloseable {
3438

3539
private final Server server;
3640
private final GrpcStorageOptions grpcStorageOptions;
41+
private final ScheduledThreadPoolExecutor executor;
3742

38-
FakeServer(Server server, GrpcStorageOptions grpcStorageOptions) {
43+
FakeServer(Server server, GrpcStorageOptions grpcStorageOptions, ScheduledThreadPoolExecutor executor) {
3944
this.server = server;
4045
this.grpcStorageOptions = grpcStorageOptions;
46+
this.executor = executor;
4147
}
4248

4349
GrpcStorageOptions getGrpcStorageOptions() {
4450
return grpcStorageOptions;
4551
}
4652

4753
StorageSettings storageSettings() throws IOException {
48-
return grpcStorageOptions.getStorageSettings();
54+
StorageSettings settings = grpcStorageOptions.getStorageSettings();
55+
if (executor != null) {
56+
ExecutorProvider executorProvider = FixedExecutorProvider.create(executor);
57+
StorageSettings.Builder builder = settings.toBuilder()
58+
.setBackgroundExecutorProvider(executorProvider);
59+
if (builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) {
60+
builder.setTransportChannelProvider(
61+
((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider())
62+
.toBuilder()
63+
.setExecutorProvider(executorProvider)
64+
.build());
65+
}
66+
return builder.build();
67+
}
68+
return settings;
4969
}
5070

5171
@Override
5272
public void close() throws InterruptedException {
53-
server.shutdownNow().awaitTermination(10, TimeUnit.SECONDS);
73+
try {
74+
server.shutdownNow().awaitTermination(10, TimeUnit.SECONDS);
75+
} finally {
76+
if (executor != null) {
77+
executor.shutdownNow();
78+
}
79+
}
5480
}
5581

5682
static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException {
5783
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 0);
5884
Server server = NettyServerBuilder.forAddress(address).addService(service).build();
5985
server.start();
6086
String endpoint = String.format(Locale.US, "%s:%d", address.getHostString(), server.getPort());
87+
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(0);
88+
executor.setKeepAliveTime(10, TimeUnit.SECONDS);
89+
executor.allowCoreThreadTimeOut(true);
90+
6191
GrpcStorageOptions grpcStorageOptions =
6292
StorageOptions.grpc()
6393
.setHost("http://" + endpoint)
@@ -80,6 +110,6 @@ static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException {
80110
.setMaxRpcTimeoutDuration(Duration.ofSeconds(25))
81111
.build())
82112
.build();
83-
return new FakeServer(server, grpcStorageOptions);
113+
return new FakeServer(server, grpcStorageOptions, executor);
84114
}
85115
}

0 commit comments

Comments
 (0)