Skip to content

Commit 03f41b9

Browse files
authored
support dynamic guarded route (#1478)
1 parent 645f55f commit 03f41b9

38 files changed

Lines changed: 316 additions & 111 deletions

File tree

incubator/binding-amqp/src/main/java/io/aklivity/zilla/runtime/binding/amqp/internal/config/AmqpRouteConfig.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,23 @@
1515
*/
1616
package io.aklivity.zilla.runtime.binding.amqp.internal.config;
1717

18+
import static java.util.function.UnaryOperator.identity;
1819
import static java.util.stream.Collectors.toList;
1920

2021
import java.util.List;
21-
import java.util.function.LongPredicate;
22+
import java.util.function.UnaryOperator;
2223

2324
import io.aklivity.zilla.runtime.binding.amqp.config.AmqpConditionConfig;
2425
import io.aklivity.zilla.runtime.binding.amqp.internal.types.AmqpCapabilities;
2526
import io.aklivity.zilla.runtime.engine.config.RouteConfig;
27+
import io.aklivity.zilla.runtime.engine.util.function.LongObjectPredicate;
2628

2729
public final class AmqpRouteConfig
2830
{
2931
public final long id;
3032

3133
private final List<AmqpConditionMatcher> when;
32-
private final LongPredicate authorized;
34+
private final LongObjectPredicate<UnaryOperator<String>> authorized;
3335

3436
public AmqpRouteConfig(
3537
RouteConfig route)
@@ -45,7 +47,7 @@ public AmqpRouteConfig(
4547
boolean authorized(
4648
long authorization)
4749
{
48-
return authorized.test(authorization);
50+
return authorized.test(authorization, identity());
4951
}
5052

5153
boolean matches(

incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/config/PgsqlKafkaRouteConfig.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,19 @@
1414
*/
1515
package io.aklivity.zilla.runtime.binding.pgsql.kafka.internal.config;
1616

17-
import java.util.function.LongPredicate;
17+
import static java.util.function.UnaryOperator.identity;
18+
19+
import java.util.function.UnaryOperator;
1820

1921
import io.aklivity.zilla.runtime.engine.config.RouteConfig;
22+
import io.aklivity.zilla.runtime.engine.util.function.LongObjectPredicate;
2023

2124
public final class PgsqlKafkaRouteConfig
2225
{
2326
public final long id;
2427
public final int order;
2528

26-
private final LongPredicate authorized;
29+
private final LongObjectPredicate<UnaryOperator<String>> authorized;
2730

2831
public PgsqlKafkaRouteConfig(
2932
RouteConfig route)
@@ -36,6 +39,6 @@ public PgsqlKafkaRouteConfig(
3639
boolean authorized(
3740
long authorization)
3841
{
39-
return authorized.test(authorization);
42+
return authorized.test(authorization, identity());
4043
}
4144
}

incubator/binding-pgsql/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/internal/config/PgsqlRouteConfig.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,19 @@
1414
*/
1515
package io.aklivity.zilla.runtime.binding.pgsql.internal.config;
1616

17-
import java.util.function.LongPredicate;
17+
import static java.util.function.UnaryOperator.identity;
18+
19+
import java.util.function.UnaryOperator;
1820

1921
import io.aklivity.zilla.runtime.engine.config.RouteConfig;
22+
import io.aklivity.zilla.runtime.engine.util.function.LongObjectPredicate;
2023

2124
public final class PgsqlRouteConfig
2225
{
2326
public final long id;
2427
public final int order;
2528

26-
private final LongPredicate authorized;
29+
private final LongObjectPredicate<UnaryOperator<String>> authorized;
2730

2831
public PgsqlRouteConfig(
2932
RouteConfig route)
@@ -36,6 +39,6 @@ public PgsqlRouteConfig(
3639
boolean authorized(
3740
long authorization)
3841
{
39-
return authorized.test(authorization);
42+
return authorized.test(authorization, identity());
4043
}
4144
}

incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveRouteConfig.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,24 @@
1414
*/
1515
package io.aklivity.zilla.runtime.binding.risingwave.internal.config;
1616

17+
import static java.util.function.UnaryOperator.identity;
1718
import static java.util.stream.Collectors.toList;
1819

1920
import java.util.List;
20-
import java.util.function.LongPredicate;
21+
import java.util.function.UnaryOperator;
2122

2223
import org.agrona.DirectBuffer;
2324

2425
import io.aklivity.zilla.runtime.binding.risingwave.config.RisingwaveConditionConfig;
2526
import io.aklivity.zilla.runtime.engine.config.RouteConfig;
27+
import io.aklivity.zilla.runtime.engine.util.function.LongObjectPredicate;
2628

2729
public final class RisingwaveRouteConfig
2830
{
2931
public final long id;
3032

3133
private final List<RisingwaveConditionMatcher> when;
32-
private final LongPredicate authorized;
34+
private final LongObjectPredicate<UnaryOperator<String>> authorized;
3335

3436
public RisingwaveRouteConfig(
3537
RouteConfig route)
@@ -45,7 +47,7 @@ public RisingwaveRouteConfig(
4547
boolean authorized(
4648
long authorization)
4749
{
48-
return authorized.test(authorization);
50+
return authorized.test(authorization, identity());
4951
}
5052

5153
boolean matches(

runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiRouteConfig.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,22 @@
1414
*/
1515
package io.aklivity.zilla.runtime.binding.asyncapi.internal.config;
1616

17+
import static java.util.function.UnaryOperator.identity;
1718
import static java.util.stream.Collectors.toList;
1819

1920
import java.util.List;
20-
import java.util.function.LongPredicate;
21+
import java.util.function.UnaryOperator;
2122

2223
import io.aklivity.zilla.runtime.engine.config.RouteConfig;
24+
import io.aklivity.zilla.runtime.engine.util.function.LongObjectPredicate;
2325

2426
public final class AsyncapiRouteConfig
2527
{
2628
public final long id;
2729
public final List<AsyncapiConditionConfig> when;
2830
public final AsyncapiWithConfig with;
2931

30-
private final LongPredicate authorized;
32+
private final LongObjectPredicate<UnaryOperator<String>> authorized;
3133

3234
public AsyncapiRouteConfig(
3335
RouteConfig route)
@@ -43,7 +45,7 @@ public AsyncapiRouteConfig(
4345
boolean authorized(
4446
long authorization)
4547
{
46-
return authorized.test(authorization);
48+
return authorized.test(authorization, identity());
4749
}
4850

4951
boolean matches(

runtime/binding-fan/src/main/java/io/aklivity/zilla/runtime/binding/fan/internal/stream/FanServerFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static io.aklivity.zilla.runtime.engine.budget.BudgetCreditor.NO_BUDGET_ID;
1919
import static io.aklivity.zilla.runtime.engine.budget.BudgetCreditor.NO_CREDITOR_INDEX;
2020
import static io.aklivity.zilla.runtime.engine.budget.BudgetDebitor.NO_DEBITOR_INDEX;
21+
import static java.util.function.UnaryOperator.identity;
2122

2223
import java.util.List;
2324
import java.util.concurrent.CopyOnWriteArrayList;
@@ -132,7 +133,7 @@ public MessageConsumer newStream(
132133
final long replyId = supplyReplyId.applyAsLong(initialId);
133134

134135
final RouteConfig route = binding.routes.stream()
135-
.filter(r -> r.authorized.test(authorization))
136+
.filter(r -> r.authorized.test(authorization, identity()))
136137
.findFirst()
137138
.orElse(null);
138139

runtime/binding-grpc-kafka/src/main/java/io/aklivity/zilla/runtime/binding/grpc/kafka/internal/config/GrpcKafkaRouteConfig.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@
1414
*/
1515
package io.aklivity.zilla.runtime.binding.grpc.kafka.internal.config;
1616

17+
import static java.util.function.UnaryOperator.identity;
1718
import static java.util.stream.Collectors.toList;
1819

1920
import java.util.List;
2021
import java.util.Map;
2122
import java.util.Optional;
2223
import java.util.function.LongFunction;
23-
import java.util.function.LongPredicate;
24+
import java.util.function.UnaryOperator;
2425
import java.util.regex.MatchResult;
2526
import java.util.stream.Collectors;
2627

@@ -31,6 +32,7 @@
3132
import io.aklivity.zilla.runtime.binding.grpc.kafka.internal.types.stream.GrpcMetadataFW;
3233
import io.aklivity.zilla.runtime.engine.config.RouteConfig;
3334
import io.aklivity.zilla.runtime.engine.util.function.LongObjectBiFunction;
35+
import io.aklivity.zilla.runtime.engine.util.function.LongObjectPredicate;
3436

3537

3638
public final class GrpcKafkaRouteConfig
@@ -39,7 +41,7 @@ public final class GrpcKafkaRouteConfig
3941

4042
private final List<GrpcKafkaConditionMatcher> when;
4143
public final GrpcKafkaWithResolver with;
42-
private final LongPredicate authorized;
44+
private final LongObjectPredicate<UnaryOperator<String>> authorized;
4345

4446
public GrpcKafkaRouteConfig(
4547
GrpcKafkaOptionsConfig options,
@@ -72,7 +74,7 @@ public GrpcKafkaRouteConfig(
7274
boolean authorized(
7375
long authorization)
7476
{
75-
return authorized.test(authorization);
77+
return authorized.test(authorization, identity());
7678
}
7779

7880
boolean matches(

runtime/binding-grpc/src/main/java/io/aklivity/zilla/runtime/binding/grpc/internal/config/GrpcRouteConfig.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,24 @@
1414
*/
1515
package io.aklivity.zilla.runtime.binding.grpc.internal.config;
1616

17+
import static java.util.function.UnaryOperator.identity;
1718
import static java.util.stream.Collectors.toList;
1819

1920
import java.util.List;
20-
import java.util.function.LongPredicate;
21+
import java.util.function.UnaryOperator;
2122

2223
import io.aklivity.zilla.runtime.binding.grpc.config.GrpcConditionConfig;
2324
import io.aklivity.zilla.runtime.binding.grpc.internal.types.Array32FW;
2425
import io.aklivity.zilla.runtime.binding.grpc.internal.types.stream.GrpcMetadataFW;
2526
import io.aklivity.zilla.runtime.engine.config.RouteConfig;
27+
import io.aklivity.zilla.runtime.engine.util.function.LongObjectPredicate;
2628

2729
public final class GrpcRouteConfig
2830
{
2931
public final long id;
3032

3133
private final List<GrpcConditionMatcher> when;
32-
private final LongPredicate authorized;
34+
private final LongObjectPredicate<UnaryOperator<String>> authorized;
3335

3436
public GrpcRouteConfig(
3537
RouteConfig route)
@@ -45,7 +47,7 @@ public GrpcRouteConfig(
4547
boolean authorized(
4648
long authorization)
4749
{
48-
return authorized.test(authorization);
50+
return authorized.test(authorization, identity());
4951
}
5052

5153
boolean matches(

runtime/binding-http-filesystem/src/main/java/io/aklivity/zilla/runtime/binding/http/filesystem/internal/config/HttpFileSystemRouteConfig.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,25 @@
1414
*/
1515
package io.aklivity.zilla.runtime.binding.http.filesystem.internal.config;
1616

17+
import static java.util.function.UnaryOperator.identity;
1718
import static java.util.stream.Collectors.toList;
1819

1920
import java.util.List;
2021
import java.util.Optional;
2122
import java.util.function.Consumer;
22-
import java.util.function.LongPredicate;
23+
import java.util.function.UnaryOperator;
2324

2425
import io.aklivity.zilla.runtime.binding.http.filesystem.config.HttpFileSystemConditionConfig;
2526
import io.aklivity.zilla.runtime.engine.config.RouteConfig;
27+
import io.aklivity.zilla.runtime.engine.util.function.LongObjectPredicate;
2628

2729
public final class HttpFileSystemRouteConfig
2830
{
2931
public final long id;
3032
public final Optional<HttpFileSystemWithResolver> with;
3133

3234
private final List<HttpFileSystemConditionMatcher> when;
33-
private final LongPredicate authorized;
35+
private final LongObjectPredicate<UnaryOperator<String>> authorized;
3436

3537
public HttpFileSystemRouteConfig(
3638
RouteConfig route)
@@ -51,7 +53,7 @@ public HttpFileSystemRouteConfig(
5153
boolean authorized(
5254
long authorization)
5355
{
54-
return authorized.test(authorization);
56+
return authorized.test(authorization, identity());
5557
}
5658

5759
boolean matches(

runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaBindingConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public HttpKafkaRouteConfig resolve(
6666
CharSequence path = helper.path;
6767

6868
return routes.stream()
69-
.filter(r -> r.authorized(authorization) && r.matches(method, path))
69+
.filter(r -> r.authorized(authorization, method, path) && r.matches(method, path))
7070
.findFirst()
7171
.orElse(null);
7272
}

0 commit comments

Comments
 (0)