3434import io .grpc .ServerCallHandler ;
3535import io .grpc .ServerInterceptor ;
3636import io .grpc .internal .GrpcUtil ;
37- import io .grpc .xds . Filter . ServerInterceptorBuilder ;
37+ import io .grpc .internal . SharedResourceHolder ;
3838import io .grpc .xds .client .XdsLogger ;
3939import io .grpc .xds .client .XdsLogger .XdsLogLevel ;
4040import io .grpc .xds .internal .datatype .GrpcService ;
4646import io .grpc .xds .internal .rlqs .RlqsCache ;
4747import io .grpc .xds .internal .rlqs .RlqsFilterState ;
4848import io .grpc .xds .internal .rlqs .RlqsRateLimitResult ;
49+ import java .util .ConcurrentModificationException ;
4950import java .util .concurrent .ScheduledExecutorService ;
51+ import java .util .concurrent .atomic .AtomicBoolean ;
5052import java .util .concurrent .atomic .AtomicReference ;
53+ import java .util .logging .Logger ;
5154import javax .annotation .Nullable ;
5255
5356/** RBAC Http filter implementation. */
5457// TODO(sergiitk): introduce a layer between the filter and interceptor.
5558// lds has filter names and the names are unique - even for server instances.
56- final class RlqsFilter implements Filter , ServerInterceptorBuilder {
59+ final class RlqsFilter implements Filter {
5760 private final XdsLogger logger ;
5861
5962 static final boolean enabled = GrpcUtil .getFlag ("GRPC_EXPERIMENTAL_XDS_ENABLE_RLQS" , false );
@@ -62,70 +65,124 @@ final class RlqsFilter implements Filter, ServerInterceptorBuilder {
6265 // Do do not fail on parsing errors, only log requests.
6366 static final boolean dryRun = GrpcUtil .getFlag ("GRPC_EXPERIMENTAL_RLQS_DRY_RUN" , false );
6467
65- static final RlqsFilter INSTANCE = new RlqsFilter ();
66-
6768 static final String TYPE_URL = "type.googleapis.com/"
6869 + "envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig" ;
6970 static final String TYPE_URL_OVERRIDE_CONFIG = "type.googleapis.com/"
7071 + "envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride" ;
7172
73+ private final AtomicBoolean shutdown = new AtomicBoolean ();
7274 private final AtomicReference <RlqsCache > rlqsCache = new AtomicReference <>();
7375
74- public RlqsFilter () {
75- // TODO(sergiitk): one per new instance when filters are refactored.
76+ // TODO(sergiitk): [IMPL] figure out what to use here.
77+ private final ScheduledExecutorService scheduler =
78+ SharedResourceHolder .get (GrpcUtil .TIMER_SERVICE );
79+
80+ public RlqsFilter (String name ) {
7681 logger = XdsLogger .withLogId (InternalLogId .allocate (this .getClass (), null ));
7782 logger .log (XdsLogLevel .DEBUG ,
78- "Created RLQS Filter with enabled=" + enabled + " , dryRun=" + dryRun );
83+ "Created RLQS Filter name='%s' with enabled=%s , dryRun=%s" , name , enabled , dryRun );
7984 }
8085
81- @ Override
82- public String [] typeUrls () {
83- return new String []{TYPE_URL , TYPE_URL_OVERRIDE_CONFIG };
84- }
86+ static final class Provider implements Filter .Provider {
87+ private static final Logger logger = Logger .getLogger (Provider .class .getName ());
8588
86- @ Override
87- public boolean isEnabled () {
88- return enabled ;
89- }
89+ @ Override
90+ public String [] typeUrls () {
91+ return new String []{ TYPE_URL , TYPE_URL_OVERRIDE_CONFIG } ;
92+ }
9093
91- @ Override
92- public ConfigOrError <RlqsFilterConfig > parseFilterConfig (Message rawProtoMessage ) {
93- try {
94- RlqsFilterConfig rlqsFilterConfig =
95- parseRlqsFilter (unpackAny (rawProtoMessage , RateLimitQuotaFilterConfig .class ));
96- return ConfigOrError .fromConfig (rlqsFilterConfig );
97- } catch (InvalidProtocolBufferException e ) {
98- return ConfigOrError .fromError ("Can't unpack RateLimitQuotaFilterConfig proto: " + e );
99- } catch (ResourceInvalidException e ) {
100- return ConfigOrError .fromError (e .getMessage ());
94+ @ Override
95+ public boolean isServerFilter () {
96+ return true ;
97+ }
98+
99+ @ Override
100+ public RlqsFilter newInstance (String name ) {
101+ return new RlqsFilter (name );
102+ }
103+
104+ @ Override
105+ public ConfigOrError <RlqsFilterConfig > parseFilterConfig (Message rawProtoMessage ) {
106+ try {
107+ RlqsFilterConfig rlqsFilterConfig =
108+ parseRlqsFilter (unpackAny (rawProtoMessage , RateLimitQuotaFilterConfig .class ));
109+ return ConfigOrError .fromConfig (rlqsFilterConfig );
110+ } catch (InvalidProtocolBufferException e ) {
111+ return ConfigOrError .fromError ("Can't unpack RateLimitQuotaFilterConfig proto: " + e );
112+ } catch (ResourceInvalidException e ) {
113+ return ConfigOrError .fromError (e .getMessage ());
114+ }
115+ }
116+
117+ @ Override
118+ public ConfigOrError <RlqsFilterConfig > parseFilterConfigOverride (Message rawProtoMessage ) {
119+ try {
120+ RlqsFilterConfig rlqsFilterConfig =
121+ parseRlqsFilterOverride (unpackAny (rawProtoMessage , RateLimitQuotaOverride .class ));
122+ return ConfigOrError .fromConfig (rlqsFilterConfig );
123+ } catch (InvalidProtocolBufferException e ) {
124+ return ConfigOrError .fromError ("Can't unpack RateLimitQuotaOverride proto: " + e );
125+ } catch (ResourceInvalidException e ) {
126+ return ConfigOrError .fromError (e .getMessage ());
127+ }
128+ }
129+
130+ @ VisibleForTesting
131+ RlqsFilterConfig parseRlqsFilter (RateLimitQuotaFilterConfig rlqsFilterProto )
132+ throws ResourceInvalidException , InvalidProtocolBufferException {
133+ RlqsFilterConfig .Builder builder = RlqsFilterConfig .builder ();
134+ if (rlqsFilterProto .getDomain ().isEmpty ()) {
135+ throw new ResourceInvalidException ("RateLimitQuotaFilterConfig domain is required" );
136+ }
137+ builder .domain (rlqsFilterProto .getDomain ())
138+ .rlqsService (GrpcService .fromEnvoyProto (rlqsFilterProto .getRlqsServer ()));
139+
140+ // TODO(sergiitk): [IMPL] Remove
141+ if (dryRun ) {
142+ logger .finest ("RLQS DRY RUN: not parsing matchers" );
143+ return builder .build ();
144+ }
145+
146+ // TODO(sergiitk): [IMPL] actually parse, move to RlqsBucketSettings.fromProto()
147+ RateLimitQuotaBucketSettings fallbackBucketSettingsProto = unpackAny (
148+ rlqsFilterProto .getBucketMatchers ().getOnNoMatch ().getAction ().getTypedConfig (),
149+ RateLimitQuotaBucketSettings .class );
150+ RlqsBucketSettings fallbackBucket = RlqsBucketSettings .create (
151+ ImmutableMap .of ("bucket_id" , headers -> "hello" ),
152+ fallbackBucketSettingsProto .getReportingInterval ());
153+
154+ // TODO(sergiitk): [IMPL] actually parse, move to Matcher.fromProto()
155+ Matcher <HttpMatchInput , RlqsBucketSettings > bucketMatchers = new RlqsMatcher (fallbackBucket );
156+
157+ return builder .bucketMatchers (bucketMatchers ).build ();
101158 }
102159 }
103160
104161 @ Override
105- public ConfigOrError <RlqsFilterConfig > parseFilterConfigOverride (Message rawProtoMessage ) {
106- try {
107- RlqsFilterConfig rlqsFilterConfig =
108- parseRlqsFilterOverride (unpackAny (rawProtoMessage , RateLimitQuotaOverride .class ));
109- return ConfigOrError .fromConfig (rlqsFilterConfig );
110- } catch (InvalidProtocolBufferException e ) {
111- return ConfigOrError .fromError ("Can't unpack RateLimitQuotaOverride proto: " + e );
112- } catch (ResourceInvalidException e ) {
113- return ConfigOrError .fromError (e .getMessage ());
162+ public void close () {
163+ // TODO(sergiitk): [DESIGN] besides shutting down everything, should there
164+ // be per-route interceptor destructors?
165+ if (!shutdown .compareAndSet (false , true )) {
166+ throw new ConcurrentModificationException (
167+ "Unexpected: RlqsFilter#close called multiple times" );
168+ }
169+ RlqsCache oldCache = rlqsCache .getAndUpdate (unused -> null );
170+ if (oldCache != null ) {
171+ oldCache .shutdown ();
114172 }
115173 }
116174
175+ // @Override
176+ public boolean isEnabled () {
177+ return enabled ;
178+ }
179+
117180 @ Nullable
118181 @ Override
119182 public ServerInterceptor buildServerInterceptor (
120183 FilterConfig config , @ Nullable FilterConfig overrideConfig ) {
121- throw new UnsupportedOperationException ("ScheduledExecutorService scheduler required" );
122- }
184+ // ScheduledExecutorService scheduler
123185
124- @ Override
125- public ServerInterceptor buildServerInterceptor (
126- FilterConfig config ,
127- @ Nullable FilterConfig overrideConfig ,
128- ScheduledExecutorService scheduler ) {
129186 // Called when we get an xds update - when the LRS or RLS changes.
130187 RlqsFilterConfig rlqsFilterConfig = (RlqsFilterConfig ) checkNotNull (config , "config" );
131188
@@ -148,16 +205,6 @@ public ServerInterceptor buildServerInterceptor(
148205 return generateRlqsInterceptor (rlqsFilterConfig );
149206 }
150207
151- @ Override
152- public void shutdown () {
153- // TODO(sergiitk): [DESIGN] besides shutting down everything, should there
154- // be per-route interceptor destructors?
155- RlqsCache oldCache = rlqsCache .getAndUpdate (unused -> null );
156- if (oldCache != null ) {
157- oldCache .shutdown ();
158- }
159- }
160-
161208 @ Nullable
162209 private ServerInterceptor generateRlqsInterceptor (RlqsFilterConfig config ) {
163210 checkNotNull (config , "config" );
@@ -193,36 +240,6 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(
193240 };
194241 }
195242
196- @ VisibleForTesting
197- RlqsFilterConfig parseRlqsFilter (RateLimitQuotaFilterConfig rlqsFilterProto )
198- throws ResourceInvalidException , InvalidProtocolBufferException {
199- RlqsFilterConfig .Builder builder = RlqsFilterConfig .builder ();
200- if (rlqsFilterProto .getDomain ().isEmpty ()) {
201- throw new ResourceInvalidException ("RateLimitQuotaFilterConfig domain is required" );
202- }
203- builder .domain (rlqsFilterProto .getDomain ())
204- .rlqsService (GrpcService .fromEnvoyProto (rlqsFilterProto .getRlqsServer ()));
205-
206- // TODO(sergiitk): [IMPL] Remove
207- if (dryRun ) {
208- logger .log (XdsLogLevel .DEBUG , "Dry run: not parsing matchers in the filter filter" );
209- return builder .build ();
210- }
211-
212- // TODO(sergiitk): [IMPL] actually parse, move to RlqsBucketSettings.fromProto()
213- RateLimitQuotaBucketSettings fallbackBucketSettingsProto = unpackAny (
214- rlqsFilterProto .getBucketMatchers ().getOnNoMatch ().getAction ().getTypedConfig (),
215- RateLimitQuotaBucketSettings .class );
216- RlqsBucketSettings fallbackBucket = RlqsBucketSettings .create (
217- ImmutableMap .of ("bucket_id" , headers -> "hello" ),
218- fallbackBucketSettingsProto .getReportingInterval ());
219-
220- // TODO(sergiitk): [IMPL] actually parse, move to Matcher.fromProto()
221- Matcher <HttpMatchInput , RlqsBucketSettings > bucketMatchers = new RlqsMatcher (fallbackBucket );
222-
223- return builder .bucketMatchers (bucketMatchers ).build ();
224- }
225-
226243 static class RlqsMatcher extends Matcher <HttpMatchInput , RlqsBucketSettings > {
227244 private final RlqsBucketSettings fallbackBucket ;
228245
0 commit comments