11/*
2- * Copyright 2021 The gRPC Authors
2+ * Copyright 2026 The gRPC Authors
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1616
1717package io .grpc .xds ;
1818
19+ import com .github .udpa .udpa .type .v1 .TypedStruct ;
1920import com .github .xds .type .matcher .v3 .Matcher ;
2021import com .google .common .base .Preconditions ;
2122import com .google .protobuf .Any ;
@@ -112,112 +113,129 @@ public ConfigOrError<CompositeFilterConfig> parseFilterConfig(Message rawProtoMe
112113 return ConfigOrError .fromError ("Composite Filter is experimental "
113114 + "and disabled by default." );
114115 }
115- return parseConfig (rawProtoMessage );
116+ if (!(rawProtoMessage instanceof Any )) {
117+ return ConfigOrError .fromError ("Invalid message type: "
118+ + rawProtoMessage .getClass ().getName ());
119+ }
120+ try {
121+ Any any = (Any ) rawProtoMessage ;
122+ if (any .is (ExtensionWithMatcher .class )) {
123+ ExtensionWithMatcher proto = any .unpack (ExtensionWithMatcher .class );
124+ return parseMatcherConfig (proto .getXdsMatcher ());
125+ } else if (any .is (Composite .class )) {
126+ return ConfigOrError .fromConfig (new CompositeFilterConfig (null ));
127+ }
128+ } catch (InvalidProtocolBufferException e ) {
129+ return ConfigOrError .fromError ("Invalid proto: " + e );
130+ }
131+ return ConfigOrError .fromError ("Unsupported message type in parseFilterConfig" );
116132 }
117133
118134 @ Override
119135 public ConfigOrError <CompositeFilterConfig > parseFilterConfigOverride (Message rawProtoMessage ) {
120136 if (!isSupported ()) {
121- return ConfigOrError .fromError ("Composite Filter is experimental and disabled by default." );
137+ return ConfigOrError .fromError ("Composite Filter is experimental and disabled"
138+ + " by default." );
122139 }
123- return parseConfig (rawProtoMessage );
124- }
125-
126- private boolean isSupported () {
127- return GrpcUtil .getFlag ("GRPC_EXPERIMENTAL_XDS_COMPOSITE_FILTER" ,
128- false );
129- }
130-
131- private ConfigOrError <CompositeFilterConfig > parseConfig (Message rawProtoMessage ) {
132- Matcher matcherProto = null ;
133- if (rawProtoMessage instanceof Any ) {
134- try {
135- Any any = (Any ) rawProtoMessage ;
136- if (any .is (ExtensionWithMatcher .class )) {
137- ExtensionWithMatcher proto = any .unpack (ExtensionWithMatcher .class );
138- matcherProto = proto .getXdsMatcher ();
139- } else if (any .is (ExtensionWithMatcherPerRoute .class )) {
140- ExtensionWithMatcherPerRoute proto = any .unpack (ExtensionWithMatcherPerRoute .class );
141- matcherProto = proto .getXdsMatcher ();
142- } else if (any .is (Composite .class )) {
143- return ConfigOrError .fromConfig (new CompositeFilterConfig (null ));
144- }
145- } catch (InvalidProtocolBufferException e ) {
146- return ConfigOrError .fromError ("Invalid proto: " + e );
140+ if (!(rawProtoMessage instanceof Any )) {
141+ return ConfigOrError .fromError ("Invalid message type: "
142+ + rawProtoMessage .getClass ().getName ());
143+ }
144+ try {
145+ Any any = (Any ) rawProtoMessage ;
146+ if (any .is (ExtensionWithMatcherPerRoute .class )) {
147+ ExtensionWithMatcherPerRoute proto = any .unpack (ExtensionWithMatcherPerRoute .class );
148+ return parseMatcherConfig (proto .getXdsMatcher ());
147149 }
150+ } catch (InvalidProtocolBufferException e ) {
151+ return ConfigOrError .fromError ("Invalid proto: " + e );
148152 }
153+ return ConfigOrError .fromError ("Unsupported message type in "
154+ + "parseFilterConfigOverride" );
155+ }
149156
157+ private ConfigOrError <CompositeFilterConfig > parseMatcherConfig (
158+ @ Nullable Matcher matcherProto ) {
150159 if (matcherProto == null ) {
151160 return ConfigOrError .fromConfig (new CompositeFilterConfig (null ));
152161 }
153162
154163 try {
155164 UnifiedMatcher <FilterDelegate > matcher = UnifiedMatcher .create (matcherProto ,
156- (com .github .xds .core .v3 .TypedExtensionConfig config ) -> {
157- try {
158- Any actionAny = config .getTypedConfig ();
159- if (actionAny .is (ExecuteFilterAction .class )) {
160- ExecuteFilterAction executeAction = actionAny .unpack (ExecuteFilterAction .class );
161- FractionalPercent samplePercent = executeAction .hasSamplePercent ()
162- ? executeAction .getSamplePercent ().getDefaultValue ()
163- : null ;
164- List <TypedExtensionConfig > childConfigs = new ArrayList <>();
165-
166- if (executeAction .hasFilterChain ()) {
167- childConfigs .addAll (executeAction .getFilterChain ().getTypedConfigList ());
168- } else if (executeAction .hasTypedConfig ()) {
169- childConfigs .add (executeAction .getTypedConfig ());
170- }
171-
172- if (!childConfigs .isEmpty ()) {
173- List <DelegateEntry > delegates = new ArrayList <>();
174- for (TypedExtensionConfig childFilterConfig : childConfigs ) {
175- String typeUrl = childFilterConfig .getTypedConfig ().getTypeUrl ();
176- Message rawConfig = childFilterConfig .getTypedConfig ();
177-
178- try {
179- if (typeUrl .equals ("type.googleapis.com/udpa.type.v1.TypedStruct" )) {
180- com .github .udpa .udpa .type .v1 .TypedStruct typedStruct = childFilterConfig
181- .getTypedConfig ()
182- .unpack (com .github .udpa .udpa .type .v1 .TypedStruct .class );
183- typeUrl = typedStruct .getTypeUrl ();
184- rawConfig = typedStruct .getValue ();
185- } else if (typeUrl .equals ("type.googleapis.com/xds.type.v3.TypedStruct" )) {
186- com .github .xds .type .v3 .TypedStruct typedStruct = childFilterConfig
187- .getTypedConfig ()
188- .unpack (com .github .xds .type .v3 .TypedStruct .class );
189- typeUrl = typedStruct .getTypeUrl ();
190- rawConfig = typedStruct .getValue ();
191- }
192- } catch (InvalidProtocolBufferException e ) {
193- throw new IllegalArgumentException ("Failed to unpack TypedStruct" , e );
194- }
195-
196- Filter .Provider provider = FilterRegistry .getDefaultRegistry ().get (typeUrl );
197- if (provider == null ) {
198- throw new IllegalArgumentException ("Action filter not found: " + typeUrl );
199- }
200- ConfigOrError <? extends FilterConfig > parsed = provider
201- .parseFilterConfig (rawConfig );
202- if (parsed .errorDetail != null ) {
203- throw new IllegalArgumentException (
204- "Failed to parse child filter: " + parsed .errorDetail );
205- }
206- delegates .add (new DelegateEntry (provider , parsed .config ));
207- }
208- return new FilterDelegate (delegates , samplePercent );
209- }
210- }
211- } catch (InvalidProtocolBufferException e ) {
212- throw new RuntimeException (e );
213- }
214- return null ;
215- });
165+ Provider ::createFilterDelegate );
216166 return ConfigOrError .fromConfig (new CompositeFilterConfig (matcher ));
217167 } catch (Exception e ) {
218168 return ConfigOrError .fromError ("Failed to create matcher: " + e .getMessage ());
219169 }
220170 }
171+
172+ private boolean isSupported () {
173+ return GrpcUtil .getFlag ("GRPC_EXPERIMENTAL_XDS_COMPOSITE_FILTER" , false );
174+ }
175+
176+ private static FilterDelegate createFilterDelegate (
177+ com .github .xds .core .v3 .TypedExtensionConfig config ) {
178+ try {
179+ Any actionAny = config .getTypedConfig ();
180+ if (actionAny .is (ExecuteFilterAction .class )) {
181+ ExecuteFilterAction executeAction = actionAny .unpack (ExecuteFilterAction .class );
182+ FractionalPercent samplePercent = executeAction .hasSamplePercent ()
183+ ? executeAction .getSamplePercent ().getDefaultValue ()
184+ : null ;
185+ List <TypedExtensionConfig > childConfigs = new ArrayList <>();
186+
187+ if (executeAction .hasFilterChain ()) {
188+ childConfigs .addAll (executeAction .getFilterChain ().getTypedConfigList ());
189+ } else if (executeAction .hasTypedConfig ()) {
190+ childConfigs .add (executeAction .getTypedConfig ());
191+ }
192+
193+ if (childConfigs .isEmpty ()) {
194+ return null ;
195+ }
196+
197+ List <DelegateEntry > delegates = new ArrayList <>();
198+ for (TypedExtensionConfig childFilterConfig : childConfigs ) {
199+ String typeUrl = childFilterConfig .getTypedConfig ().getTypeUrl ();
200+ Message rawConfig = childFilterConfig .getTypedConfig ();
201+
202+ try {
203+ if (typeUrl .equals ("type.googleapis.com/udpa.type.v1.TypedStruct" )) {
204+ TypedStruct typedStruct = childFilterConfig
205+ .getTypedConfig ()
206+ .unpack (TypedStruct .class );
207+ typeUrl = typedStruct .getTypeUrl ();
208+ rawConfig = typedStruct .getValue ();
209+ } else if (typeUrl .equals ("type.googleapis.com/xds.type.v3.TypedStruct" )) {
210+ TypedStruct typedStruct = childFilterConfig
211+ .getTypedConfig ()
212+ .unpack (TypedStruct .class );
213+ typeUrl = typedStruct .getTypeUrl ();
214+ rawConfig = typedStruct .getValue ();
215+ }
216+ } catch (InvalidProtocolBufferException e ) {
217+ throw new IllegalArgumentException ("Failed to unpack TypedStruct" , e );
218+ }
219+
220+ Filter .Provider provider = FilterRegistry .getDefaultRegistry ().get (typeUrl );
221+ if (provider == null ) {
222+ throw new IllegalArgumentException ("Action filter not found: " + typeUrl );
223+ }
224+ ConfigOrError <? extends FilterConfig > parsed = provider
225+ .parseFilterConfig (rawConfig );
226+ if (parsed .errorDetail != null ) {
227+ throw new IllegalArgumentException (
228+ "Failed to parse child filter: " + parsed .errorDetail );
229+ }
230+ delegates .add (new DelegateEntry (provider , parsed .config , childFilterConfig .getName ()));
231+ }
232+ return new FilterDelegate (delegates , samplePercent );
233+ }
234+ } catch (InvalidProtocolBufferException e ) {
235+ throw new RuntimeException (e );
236+ }
237+ return null ;
238+ }
221239 }
222240
223241 static final class CompositeFilterConfig implements FilterConfig {
@@ -236,44 +254,55 @@ public String typeUrl() {
236254
237255 static final class FilterDelegate {
238256 final List <DelegateEntry > delegates ;
239- @ Nullable
240- final FractionalPercent samplePercent ;
257+ private final double threshold ;
241258
242259 FilterDelegate (List <DelegateEntry > delegates , @ Nullable FractionalPercent samplePercent ) {
243260 this .delegates = Collections .unmodifiableList (delegates );
244- this .samplePercent = samplePercent ;
261+ this .threshold = calculateThreshold ( samplePercent ) ;
245262 }
246263
247- boolean shouldExecute ( ) {
264+ private static double calculateThreshold ( @ Nullable FractionalPercent samplePercent ) {
248265 if (samplePercent == null ) {
249- return true ;
266+ return 1.0 ;
250267 }
251- int numerator = samplePercent .getNumerator ();
252- int denominator ;
268+ double numerator = samplePercent .getNumerator ();
269+ double denominator ;
253270 switch (samplePercent .getDenominator ()) {
254271 case HUNDRED :
255- denominator = 100 ;
272+ denominator = 100.0 ;
256273 break ;
257274 case TEN_THOUSAND :
258- denominator = 10000 ;
275+ denominator = 10000.0 ;
259276 break ;
260277 case MILLION :
261- denominator = 1000000 ;
278+ denominator = 1000000.0 ;
262279 break ;
263280 default :
264- denominator = 100 ;
281+ denominator = 100.0 ;
265282 }
266- return ThreadLocalRandom .current ().nextInt (denominator ) < numerator ;
283+ return numerator / denominator ;
284+ }
285+
286+ boolean shouldExecute () {
287+ if (threshold >= 1.0 ) {
288+ return true ;
289+ }
290+ if (threshold <= 0.0 ) {
291+ return false ;
292+ }
293+ return ThreadLocalRandom .current ().nextDouble () < threshold ;
267294 }
268295 }
269296
270297 static final class DelegateEntry {
271298 final Filter .Provider provider ;
272299 final FilterConfig config ;
300+ final String name ;
273301
274- DelegateEntry (Filter .Provider provider , FilterConfig config ) {
302+ DelegateEntry (Filter .Provider provider , FilterConfig config , String name ) {
275303 this .provider = provider ;
276304 this .config = config ;
305+ this .name = name ;
277306 }
278307 }
279308
@@ -323,7 +352,7 @@ public <ReqT, RespT> io.grpc.ServerCall.Listener<ReqT> interceptCall(
323352 continue ;
324353 }
325354 for (DelegateEntry entry : delegate .delegates ) {
326- Filter filter = entry .provider .newInstance ("composite_child" );
355+ Filter filter = entry .provider .newInstance (entry . name );
327356 filters .add (filter );
328357 ServerInterceptor interceptor = filter .buildServerInterceptor (entry .config , null );
329358 if (interceptor != null ) {
@@ -477,9 +506,7 @@ private static class CompositeClientCall<ReqT, RespT> extends io.grpc.ClientCall
477506 private final io .grpc .Channel next ;
478507 private final UnifiedMatcher <FilterDelegate > matcher ;
479508 private final ScheduledExecutorService scheduler ;
480-
481509 private io .grpc .ClientCall <ReqT , RespT > delegate ;
482- private final java .util .List <Runnable > pendingEvents = new java .util .ArrayList <>();
483510 private boolean started ;
484511 private boolean cancelled ;
485512 private String cancelMessage ;
@@ -515,7 +542,7 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
515542 continue ;
516543 }
517544 for (DelegateEntry entry : filterDelegate .delegates ) {
518- Filter filter = entry .provider .newInstance ("composite_child" );
545+ Filter filter = entry .provider .newInstance (entry . name );
519546 filters .add (filter );
520547 ClientInterceptor interceptor = filter .buildClientInterceptor (entry .config , null ,
521548 scheduler );
@@ -559,23 +586,15 @@ public void onClose(Status status, Metadata trailers) {
559586
560587 delegate .start (responseListener , headers );
561588
562- for (Runnable r : pendingEvents ) {
563- r .run ();
564- }
565- pendingEvents .clear ();
566-
567589 if (cancelled ) {
568590 delegate .cancel (cancelMessage , cancelCause );
569591 }
570592 }
571593
572594 @ Override
573595 public void request (int numMessages ) {
574- if (delegate != null ) {
575- delegate .request (numMessages );
576- } else {
577- pendingEvents .add (() -> delegate .request (numMessages ));
578- }
596+ checkDelegate ();
597+ delegate .request (numMessages );
579598 }
580599
581600 @ Override
@@ -591,29 +610,25 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) {
591610
592611 @ Override
593612 public void halfClose () {
594- if (delegate != null ) {
595- delegate .halfClose ();
596- } else {
597- pendingEvents .add (() -> delegate .halfClose ());
598- }
613+ checkDelegate ();
614+ delegate .halfClose ();
599615 }
600616
601617 @ Override
602618 public void sendMessage (ReqT message ) {
603- if (delegate != null ) {
604- delegate .sendMessage (message );
605- } else {
606- pendingEvents .add (() -> delegate .sendMessage (message ));
607- }
619+ checkDelegate ();
620+ delegate .sendMessage (message );
608621 }
609622
610623 @ Override
611624 public void setMessageCompression (boolean enabled ) {
612- if (delegate != null ) {
613- delegate .setMessageCompression (enabled );
614- } else {
615- pendingEvents .add (() -> delegate .setMessageCompression (enabled ));
616- }
625+ checkDelegate ();
626+ delegate .setMessageCompression (enabled );
627+ }
628+
629+ private void checkDelegate () {
630+ Preconditions .checkState (delegate != null ,
631+ "Not started" );
617632 }
618633 }
619634}
0 commit comments