2121import com .google .api .gax .grpc .InstantiatingGrpcChannelProvider ;
2222import com .google .api .gax .retrying .RetrySettings ;
2323import com .google .cloud .NoCredentials ;
24+ import com .google .cloud .ServiceFactory ;
2425import com .google .cloud .storage .it .GrpcPlainRequestLoggingInterceptor ;
2526import com .google .cloud .storage .it .runner .registry .Registry ;
27+ import com .google .storage .v2 .StorageClient ;
2628import com .google .storage .v2 .StorageGrpc ;
2729import com .google .storage .v2 .StorageSettings ;
2830import io .grpc .Server ;
@@ -79,6 +81,47 @@ public void close() throws InterruptedException {
7981 }
8082 }
8183
84+ static void injectIsolatedClient (Storage storage , ScheduledThreadPoolExecutor executor ) {
85+ Storage delegate = storage ;
86+ if (storage instanceof OtelStorageDecorator ) {
87+ try {
88+ java .lang .reflect .Field delegateField = OtelStorageDecorator .class .getDeclaredField ("delegate" );
89+ delegateField .setAccessible (true );
90+ delegate = (Storage ) delegateField .get (storage );
91+ } catch (Exception e ) {
92+ throw new RuntimeException ("Failed to unwrap OtelStorageDecorator" , e );
93+ }
94+ }
95+ if (delegate instanceof GrpcStorageImpl ) {
96+ GrpcStorageImpl impl = (GrpcStorageImpl ) delegate ;
97+ try {
98+ StorageSettings settings = impl .getOptions ().getStorageSettings ();
99+ ExecutorProvider executorProvider = FixedExecutorProvider .create (executor );
100+ StorageSettings .Builder settingsBuilder = settings .toBuilder ()
101+ .setBackgroundExecutorProvider (executorProvider );
102+ if (settingsBuilder .getTransportChannelProvider () instanceof InstantiatingGrpcChannelProvider ) {
103+ settingsBuilder .setTransportChannelProvider (
104+ ((InstantiatingGrpcChannelProvider ) settingsBuilder .getTransportChannelProvider ())
105+ .toBuilder ()
106+ .setExecutorProvider (executorProvider )
107+ .build ());
108+ }
109+ StorageSettings isolatedSettings = settingsBuilder .build ();
110+ StorageClient isolatedClient = StorageClient .create (isolatedSettings );
111+
112+ java .lang .reflect .Field clientField = GrpcStorageImpl .class .getDeclaredField ("storageClient" );
113+ clientField .setAccessible (true );
114+ StorageClient oldClient = (StorageClient ) clientField .get (impl );
115+ if (oldClient != null ) {
116+ oldClient .close ();
117+ }
118+ clientField .set (impl , isolatedClient );
119+ } catch (Exception e ) {
120+ throw new RuntimeException ("Failed to inject isolated StorageClient" , e );
121+ }
122+ }
123+ }
124+
82125 static FakeServer of (StorageGrpc .StorageImplBase service ) throws IOException {
83126 InetSocketAddress address = new InetSocketAddress ("127.0.0.1" , 0 );
84127 Server server = NettyServerBuilder .forAddress (address ).addService (service ).build ();
@@ -97,6 +140,16 @@ static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException {
97140 .setEnableGrpcClientMetrics (false )
98141 .setAttemptDirectPath (false )
99142 .setOpenTelemetry (Registry .getInstance ().otelSdk .get ().get ())
143+ .setServiceFactory (
144+ new ServiceFactory <Storage , StorageOptions >() {
145+ @ Override
146+ @ SuppressWarnings ("deprecation" )
147+ public Storage create (StorageOptions opts ) {
148+ Storage storage = new GrpcStorageOptions .GrpcStorageFactory ().create (opts );
149+ injectIsolatedClient (storage , executor );
150+ return storage ;
151+ }
152+ })
100153 // cut most retry settings by half. we're hitting an in process server.
101154 .setRetrySettings (
102155 RetrySettings .newBuilder ()
0 commit comments