Skip to content

Commit 4974f46

Browse files
committed
Create a "transport" Executor for grpc-binder Channels
1 parent efe9ccc commit 4974f46

3 files changed

Lines changed: 39 additions & 3 deletions

File tree

binder/src/main/java/io/grpc/binder/BinderChannelBuilder.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,25 @@ public BinderChannelBuilder scheduledExecutorService(
210210
return this;
211211
}
212212

213+
/**
214+
* Provides a custom Executor for potentially expensive, but not blocking, transport work.
215+
*
216+
* <p>This is an optional parameter. If the user has not provided a scheduled executor service
217+
* when the channel is built, the builder will use a static cached thread pool.
218+
*
219+
* <p>The channel does not take ownership of 'transportExecutor'. Callers must shut it down at
220+
* some point *after* every Channel built with in has terminated.
221+
*
222+
* <p>'transportExecutor' must not be directExecutor() or similar.
223+
*
224+
* @return this
225+
*/
226+
public BinderChannelBuilder transportExecutor(Executor transportExecutor) {
227+
transportFactoryBuilder.setTransportExecutorPool(
228+
new FixedObjectPool<>(checkNotNull(transportExecutor, "transportExecutor")));
229+
return this;
230+
}
231+
213232
/**
214233
* Provides a custom {@link Executor} for accessing this application's main thread.
215234
*

binder/src/main/java/io/grpc/binder/internal/BinderClientTransportFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public final class BinderClientTransportFactory implements ClientTransportFactor
4848
final BinderChannelCredentials channelCredentials;
4949
final Executor mainThreadExecutor;
5050
final ObjectPool<ScheduledExecutorService> scheduledExecutorPool;
51+
final ObjectPool<? extends Executor> transportExecutorPool;
5152
final ObjectPool<? extends Executor> offloadExecutorPool;
5253
final SecurityPolicy securityPolicy;
5354
@Nullable final UserHandle defaultTargetUserHandle;
@@ -58,6 +59,7 @@ public final class BinderClientTransportFactory implements ClientTransportFactor
5859

5960
ScheduledExecutorService executorService;
6061
Executor offloadExecutor;
62+
Executor transportExecutor;
6163
private boolean closed;
6264

6365
private BinderClientTransportFactory(Builder builder) {
@@ -69,6 +71,8 @@ private BinderClientTransportFactory(Builder builder) {
6971
: ContextCompat.getMainExecutor(sourceContext);
7072
scheduledExecutorPool = checkNotNull(builder.scheduledExecutorPool);
7173
offloadExecutorPool = checkNotNull(builder.offloadExecutorPool);
74+
transportExecutorPool = builder.transportExecutorPool != null ?
75+
builder.transportExecutorPool : builder.offloadExecutorPool;
7276
securityPolicy = checkNotNull(builder.securityPolicy);
7377
defaultTargetUserHandle = builder.defaultTargetUserHandle;
7478
bindServiceFlags = checkNotNull(builder.bindServiceFlags);
@@ -78,6 +82,7 @@ private BinderClientTransportFactory(Builder builder) {
7882

7983
executorService = scheduledExecutorPool.getObject();
8084
offloadExecutor = offloadExecutorPool.getObject();
85+
transportExecutor = transportExecutorPool.getObject();
8186
}
8287

8388
@Override
@@ -103,6 +108,7 @@ public SwapChannelCredentialsResult swapChannelCredentials(ChannelCredentials ch
103108
public void close() {
104109
closed = true;
105110
executorService = scheduledExecutorPool.returnObject(executorService);
111+
transportExecutor = transportExecutorPool.returnObject(transportExecutor);
106112
offloadExecutor = offloadExecutorPool.returnObject(offloadExecutor);
107113
}
108114

@@ -120,6 +126,7 @@ public static final class Builder implements ClientTransportFactoryBuilder {
120126
// Optional.
121127
BinderChannelCredentials channelCredentials = BinderChannelCredentials.forDefault();
122128
Executor mainThreadExecutor; // Default filled-in at build time once sourceContext is decided.
129+
ObjectPool<? extends Executor> transportExecutorPool; // Default filled-in at build time.
123130
ObjectPool<ScheduledExecutorService> scheduledExecutorPool =
124131
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
125132
SecurityPolicy securityPolicy = SecurityPolicies.internalOnly();
@@ -139,6 +146,11 @@ public Builder setSourceContext(Context sourceContext) {
139146
return this;
140147
}
141148

149+
public Builder setTransportExecutorPool(ObjectPool<? extends Executor> transportExecutorPool) {
150+
this.transportExecutorPool = checkNotNull(transportExecutorPool, "transportExecutorPool");
151+
return this;
152+
}
153+
142154
public Builder setOffloadExecutorPool(ObjectPool<? extends Executor> offloadExecutorPool) {
143155
this.offloadExecutorPool = checkNotNull(offloadExecutorPool, "offloadExecutorPool");
144156
return this;

binder/src/main/java/io/grpc/binder/internal/BinderTransport.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -559,8 +559,10 @@ final void handleAcknowledgedBytes(long numBytes) {
559559
public static final class BinderClientTransport extends BinderTransport
560560
implements ConnectionClientTransport, Bindable.Observer {
561561

562+
private final ObjectPool<? extends Executor> transportExecutorPool;
562563
private final ObjectPool<? extends Executor> offloadExecutorPool;
563564
private final Executor offloadExecutor;
565+
private final Executor transportExecutor;
564566
private final SecurityPolicy securityPolicy;
565567
private final Bindable serviceBinding;
566568

@@ -598,8 +600,10 @@ public BinderClientTransport(
598600
factory.inboundParcelablePolicy),
599601
factory.binderDecorator,
600602
buildLogId(factory.sourceContext, targetAddress));
603+
this.transportExecutorPool = factory.transportExecutorPool;
601604
this.offloadExecutorPool = factory.offloadExecutorPool;
602605
this.securityPolicy = factory.securityPolicy;
606+
this.transportExecutor = transportExecutorPool.getObject();
603607
this.offloadExecutor = offloadExecutorPool.getObject();
604608
this.readyTimeoutMillis = factory.readyTimeoutMillis;
605609
numInUseStreams = new AtomicInteger();
@@ -621,13 +625,14 @@ public BinderClientTransport(
621625
@Override
622626
void releaseExecutors() {
623627
super.releaseExecutors();
628+
transportExecutorPool.returnObject(transportExecutor);
624629
offloadExecutorPool.returnObject(offloadExecutor);
625630
}
626631

627632
@Override
628633
public synchronized void onBound(IBinder binder) {
629634
sendSetupTransaction(
630-
binderDecorator.decorate(OneWayBinderProxy.wrap(binder, offloadExecutor)));
635+
binderDecorator.decorate(OneWayBinderProxy.wrap(binder, transportExecutor)));
631636
}
632637

633638
@Override
@@ -788,7 +793,7 @@ public void onFailure(Throwable t) {
788793
handleAuthResult(t);
789794
}
790795
},
791-
offloadExecutor);
796+
transportExecutor);
792797
}
793798
}
794799
}
@@ -797,7 +802,7 @@ private synchronized void handleAuthResult(IBinder binder, Status authorization)
797802
if (inState(TransportState.SETUP)) {
798803
if (!authorization.isOk()) {
799804
shutdownInternal(authorization, true);
800-
} else if (!setOutgoingBinder(OneWayBinderProxy.wrap(binder, offloadExecutor))) {
805+
} else if (!setOutgoingBinder(OneWayBinderProxy.wrap(binder, transportExecutor))) {
801806
shutdownInternal(
802807
Status.UNAVAILABLE.withDescription("Failed to observe outgoing binder"), true);
803808
} else {

0 commit comments

Comments
 (0)