Skip to content

Commit ccba6d0

Browse files
committed
Create ext-proc channel using CachedChannelManager passing in the GrpcService proto and the GrpcServiceXdsContextProvider. Also added GrpcServiceXdsContextProvider in the newInstance method for Filter in Filter.Provider.
1 parent 87779af commit ccba6d0

16 files changed

+44
-69
lines changed

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@
22

33
import static com.google.common.base.Preconditions.checkNotNull;
44

5-
import com.google.common.annotations.VisibleForTesting;
65
import com.google.common.io.ByteStreams;
76
import com.google.protobuf.Any;
87
import com.google.protobuf.InvalidProtocolBufferException;
98
import com.google.protobuf.Message;
109
import io.envoyproxy.envoy.config.core.v3.GrpcService;
11-
import io.envoyproxy.envoy.config.core.v3.HeaderValueOption;
1210
import io.envoyproxy.envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor;
1311
import io.envoyproxy.envoy.extensions.filters.http.ext_proc.v3.ProcessingMode;
1412
import io.envoyproxy.envoy.service.ext_proc.v3.ExternalProcessorGrpc;
@@ -26,8 +24,11 @@
2624
import io.grpc.MethodDescriptor;
2725
import io.grpc.Status;
2826
import io.grpc.stub.ClientCallStreamObserver;
29-
import io.grpc.xds.internal.grpcservice.GrpcServiceChannelCreator;
30-
import io.grpc.xds.internal.grpcservice.GrpcServiceChannelCreatorImpl;
27+
import io.grpc.xds.internal.grpcservice.CachedChannelManager;
28+
import io.grpc.xds.internal.grpcservice.GrpcServiceConfig;
29+
import io.grpc.xds.internal.grpcservice.GrpcServiceConfigParser;
30+
import io.grpc.xds.internal.grpcservice.GrpcServiceParseException;
31+
import io.grpc.xds.internal.grpcservice.GrpcServiceXdsContextProvider;
3132
import java.io.ByteArrayInputStream;
3233
import java.io.IOException;
3334
import java.io.InputStream;
@@ -39,19 +40,16 @@ public class ExternalProcessorFilter implements Filter {
3940
static final String TYPE_URL = "type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor";
4041

4142
final String filterInstanceName;
42-
// TODO: Make final after the need to replace with a mock from unit tests is removed.
43-
GrpcServiceChannelCreator grpcServiceChannelCreator;
4443
ManagedChannel grpcServiceChannel;
4544
ExternalProcessorGrpc.ExternalProcessorStub externalProcessorStub;
4645
private final Object lock = new Object();
47-
private GrpcService lastGrpcServiceConfig;
4846

4947
public ExternalProcessorFilter(String name) {
5048
filterInstanceName = checkNotNull(name, "name");
51-
grpcServiceChannelCreator = new GrpcServiceChannelCreatorImpl();
5249
}
5350

5451
static final class Provider implements Filter.Provider {
52+
private GrpcServiceXdsContextProvider grpcServiceXdsContextProvider;
5553
@Override
5654
public String[] typeUrls() {
5755
return new String[]{TYPE_URL};
@@ -63,7 +61,8 @@ public boolean isClientFilter() {
6361
}
6462

6563
@Override
66-
public ExternalProcessorFilter newInstance(String name) {
64+
public ExternalProcessorFilter newInstance(String name, GrpcServiceXdsContextProvider grpcServiceXdsContextProvider) {
65+
this.grpcServiceXdsContextProvider = grpcServiceXdsContextProvider;
6766
return new ExternalProcessorFilter(name);
6867
}
6968

@@ -87,7 +86,12 @@ public ConfigOrError<ExternalProcessorFilterConfig> parseFilterConfig(Message ra
8786
return ConfigOrError.fromError("Invalid response_body_mode: " + mode.getResponseBodyMode() + ". Only GRPC is supported.");
8887
}
8988

90-
return ConfigOrError.fromConfig(new ExternalProcessorFilterConfig(externalProcessor));
89+
try {
90+
GrpcServiceConfig grpcServiceConfig = GrpcServiceConfigParser.parse(externalProcessor.getGrpcService(), grpcServiceXdsContextProvider);
91+
return ConfigOrError.fromConfig(new ExternalProcessorFilterConfig(externalProcessor, grpcServiceConfig));
92+
} catch (GrpcServiceParseException e) {
93+
return ConfigOrError.fromError("Error parsing GrpcService config: " + e.getMessage());
94+
}
9195
}
9296

9397
@Override
@@ -103,31 +107,14 @@ public ClientInterceptor buildClientInterceptor(FilterConfig filterConfig,
103107
return new ExternalProcessorInterceptor(this, (ExternalProcessorFilterConfig) filterConfig, overrideConfig, scheduler);
104108
}
105109

106-
ExternalProcessorGrpc.ExternalProcessorStub getExternalProcessorStub(ExternalProcessor config) {
107-
GrpcService newServiceConfig = config.getGrpcService();
108-
synchronized (lock) {
109-
// TODO: gRFC only mentions we should recreate channel if target or channel creds changed
110-
// but other fields in grpc service config also do seem relevant to warrant channel
111-
// recreation.
112-
if (grpcServiceChannel == null || !newServiceConfig.equals(lastGrpcServiceConfig)) {
113-
if (grpcServiceChannel != null) {
114-
// Shutdown the old channel if the config has changed
115-
grpcServiceChannel.shutdown();
116-
}
117-
grpcServiceChannel = grpcServiceChannelCreator.create(newServiceConfig);
118-
externalProcessorStub = ExternalProcessorGrpc.newStub(grpcServiceChannel);
119-
lastGrpcServiceConfig = newServiceConfig;
120-
}
121-
return externalProcessorStub;
122-
}
123-
}
124-
125110
static final class ExternalProcessorFilterConfig implements FilterConfig {
126111

127112
private final ExternalProcessor externalProcessor;
113+
private final GrpcServiceConfig grpcServiceConfig;
128114

129-
ExternalProcessorFilterConfig(ExternalProcessor externalProcessor) {
115+
ExternalProcessorFilterConfig(ExternalProcessor externalProcessor, GrpcServiceConfig grpcServiceConfig) {
130116
this.externalProcessor = externalProcessor;
117+
this.grpcServiceConfig = grpcServiceConfig;
131118
}
132119

133120
@Override
@@ -137,6 +124,7 @@ public String typeUrl() {
137124
}
138125

139126
static final class ExternalProcessorInterceptor implements ClientInterceptor {
127+
private final CachedChannelManager cachedChannelManager = new CachedChannelManager();
140128
private final ExternalProcessorFilter filter;
141129
private final ExternalProcessorFilterConfig filterConfig;
142130
private final FilterConfig overrideConfig;
@@ -164,7 +152,8 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
164152
MethodDescriptor<ReqT, RespT> method,
165153
CallOptions callOptions,
166154
Channel next) {
167-
ExternalProcessorGrpc.ExternalProcessorStub stub = filter.getExternalProcessorStub(filterConfig.externalProcessor);
155+
ExternalProcessorGrpc.ExternalProcessorStub stub = ExternalProcessorGrpc.newStub(
156+
cachedChannelManager.getChannel(filterConfig.grpcServiceConfig));
168157
ExternalProcessor config = filterConfig.externalProcessor;
169158

170159
MethodDescriptor<InputStream, InputStream> rawMethod = method.toBuilder(RAW_MARSHALLER, RAW_MARSHALLER).build();

xds/src/main/java/io/grpc/xds/FaultFilter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import io.grpc.xds.FaultConfig.FaultAbort;
4747
import io.grpc.xds.FaultConfig.FaultDelay;
4848
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
49+
import io.grpc.xds.internal.grpcservice.GrpcServiceXdsContextProvider;
4950
import java.util.Locale;
5051
import java.util.concurrent.Executor;
5152
import java.util.concurrent.ScheduledExecutorService;
@@ -99,7 +100,7 @@ public boolean isClientFilter() {
99100
}
100101

101102
@Override
102-
public FaultFilter newInstance(String name) {
103+
public FaultFilter newInstance(String name, GrpcServiceXdsContextProvider grpcServiceXdsContextProvider) {
103104
return INSTANCE;
104105
}
105106

xds/src/main/java/io/grpc/xds/Filter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.protobuf.Message;
2121
import io.grpc.ClientInterceptor;
2222
import io.grpc.ServerInterceptor;
23+
import io.grpc.xds.internal.grpcservice.GrpcServiceXdsContextProvider;
2324
import java.io.Closeable;
2425
import java.util.Objects;
2526
import java.util.concurrent.ScheduledExecutorService;
@@ -87,7 +88,7 @@ default boolean isServerFilter() {
8788
* <li>Filter name+typeUrl in FilterChain's HCM.http_filters.</li>
8889
* </ol>
8990
*/
90-
Filter newInstance(String name);
91+
Filter newInstance(String name, GrpcServiceXdsContextProvider grpcServiceXdsContextProvider);
9192

9293
/**
9394
* Parses the top-level filter config from raw proto message. The message may be either a {@link

xds/src/main/java/io/grpc/xds/GcpAuthenticationFilter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import io.grpc.xds.MetadataRegistry.MetadataValueParser;
4646
import io.grpc.xds.XdsConfig.XdsClusterConfig;
4747
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
48+
import io.grpc.xds.internal.grpcservice.GrpcServiceXdsContextProvider;
4849
import java.util.LinkedHashMap;
4950
import java.util.Map;
5051
import java.util.concurrent.ScheduledExecutorService;
@@ -81,7 +82,7 @@ public boolean isClientFilter() {
8182
}
8283

8384
@Override
84-
public GcpAuthenticationFilter newInstance(String name) {
85+
public GcpAuthenticationFilter newInstance(String name, GrpcServiceXdsContextProvider grpcServiceXdsContextProvider) {
8586
return new GcpAuthenticationFilter(name, cacheSize);
8687
}
8788

xds/src/main/java/io/grpc/xds/InternalRbacFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public static ServerInterceptor createInterceptor(RBAC rbac) {
3333
throw new IllegalArgumentException(
3434
String.format("Failed to parse Rbac policy: %s", filterConfig.errorDetail));
3535
}
36-
return new RbacFilter.Provider().newInstance("internalRbacFilter")
36+
return new RbacFilter.Provider().newInstance("internalRbacFilter", null)
3737
.buildServerInterceptor(filterConfig.config, null);
3838
}
3939
}

xds/src/main/java/io/grpc/xds/RbacFilter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.grpc.Status;
3636
import io.grpc.xds.internal.MatcherParser;
3737
import io.grpc.xds.internal.Matchers;
38+
import io.grpc.xds.internal.grpcservice.GrpcServiceXdsContextProvider;
3839
import io.grpc.xds.internal.rbac.engine.GrpcAuthorizationEngine;
3940
import io.grpc.xds.internal.rbac.engine.GrpcAuthorizationEngine.AlwaysTrueMatcher;
4041
import io.grpc.xds.internal.rbac.engine.GrpcAuthorizationEngine.AndMatcher;
@@ -89,7 +90,7 @@ public boolean isServerFilter() {
8990
}
9091

9192
@Override
92-
public RbacFilter newInstance(String name) {
93+
public RbacFilter newInstance(String name, GrpcServiceXdsContextProvider grpcServiceXdsContextProvider) {
9394
return INSTANCE;
9495
}
9596

xds/src/main/java/io/grpc/xds/RouterFilter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.grpc.xds;
1818

1919
import com.google.protobuf.Message;
20+
import io.grpc.xds.internal.grpcservice.GrpcServiceXdsContextProvider;
2021

2122
/**
2223
* Router filter implementation. Currently this filter does not parse any field in the config.
@@ -56,7 +57,7 @@ public boolean isServerFilter() {
5657
}
5758

5859
@Override
59-
public RouterFilter newInstance(String name) {
60+
public RouterFilter newInstance(String name, GrpcServiceXdsContextProvider grpcServiceXdsContextProvider) {
6061
return INSTANCE;
6162
}
6263

xds/src/main/java/io/grpc/xds/XdsNameResolver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,7 @@ private void updateActiveFilters(@Nullable List<NamedFilterConfig> filterConfigs
675675
Filter.Provider provider = filterRegistry.get(typeUrl);
676676
checkNotNull(provider, "provider %s", typeUrl);
677677
Filter filter = activeFilters.computeIfAbsent(
678-
filterKey, k -> provider.newInstance(namedFilter.name));
678+
filterKey, k -> provider.newInstance(namedFilter.name, null));
679679
checkNotNull(filter, "filter %s", filterKey);
680680
filtersToShutdown.remove(filterKey);
681681
}

xds/src/main/java/io/grpc/xds/XdsServerWrapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,7 @@ private void updateActiveFiltersForChain(
612612
Filter.Provider provider = filterRegistry.get(typeUrl);
613613
checkNotNull(provider, "provider %s", typeUrl);
614614
Filter filter = chainFilters.computeIfAbsent(
615-
filterKey, k -> provider.newInstance(namedFilter.name));
615+
filterKey, k -> provider.newInstance(namedFilter.name, null));
616616
checkNotNull(filter, "filter %s", filterKey);
617617
filtersToShutdown.remove(filterKey);
618618
}

xds/src/main/java/io/grpc/xds/internal/grpcservice/GrpcServiceChannelCreator.java

Lines changed: 0 additions & 9 deletions
This file was deleted.

0 commit comments

Comments
 (0)