2424import java .util .Collection ;
2525import java .util .List ;
2626import java .util .Map ;
27- import java .util .concurrent .CompletionStage ;
27+ import java .util .concurrent .CompletableFuture ;
2828import java .util .concurrent .ConcurrentLinkedQueue ;
2929import java .util .concurrent .TimeUnit ;
30+ import java .util .function .BiConsumer ;
3031import java .util .function .Function ;
3132import java .util .function .Supplier ;
3233import java .util .stream .Collectors ;
3334import org .astraea .app .argument .DurationField ;
3435import org .astraea .app .argument .IntegerMapField ;
3536import org .astraea .app .argument .NonNegativeIntegerField ;
37+ import org .astraea .common .Configuration ;
3638import org .astraea .common .Utils ;
3739import org .astraea .common .admin .Admin ;
38- import org .astraea .common .admin .NodeInfo ;
40+ import org .astraea .common .admin .Broker ;
3941import org .astraea .common .metrics .JndiClient ;
4042import org .astraea .common .metrics .MBeanClient ;
4143import org .astraea .common .metrics .collector .MetricSensor ;
4244import org .astraea .common .metrics .collector .MetricStore ;
4345
4446public class WebService implements AutoCloseable {
47+ public static final String METRIC_STORE_KEY = "metric.store" ;
48+ public static final String METRIC_STORE_LOCAL = "local" ;
49+ public static final String METRIC_STORE_TOPIC = "topic" ;
50+ public static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers" ;
4551
4652 private final HttpServer server ;
4753 private final Admin admin ;
@@ -51,32 +57,48 @@ public WebService(
5157 Admin admin ,
5258 int port ,
5359 Function <Integer , Integer > brokerIdToJmxPort ,
54- Duration beanExpiration ) {
60+ Duration beanExpiration ,
61+ Configuration config ) {
5562 this .admin = admin ;
56- Supplier <CompletionStage < Map <Integer , MBeanClient >>> clientSupplier =
63+ Supplier <Map <MetricSensor , BiConsumer < Integer , Exception >>> sensorsSupplier =
5764 () ->
58- admin
59- .brokers ()
60- .thenApply (
61- brokers ->
62- brokers .stream ()
63- .collect (
64- Collectors .toUnmodifiableMap (
65- NodeInfo ::id ,
66- b ->
67- JndiClient .of (b .host (), brokerIdToJmxPort .apply (b .id ())))));
65+ sensors .metricSensors ().stream ()
66+ .distinct ()
67+ .collect (
68+ Collectors .toUnmodifiableMap (Function .identity (), ignored -> (id , ee ) -> {}));
69+
70+ List <MetricStore .Receiver > receivers =
71+ switch (config .string (METRIC_STORE_KEY ).orElse (METRIC_STORE_LOCAL )) {
72+ case METRIC_STORE_LOCAL -> {
73+ Function <List <Broker >, Map <Integer , MBeanClient >> asBeanClientMap =
74+ brokers ->
75+ brokers .stream ()
76+ .collect (
77+ Collectors .toUnmodifiableMap (
78+ Broker ::id ,
79+ b -> JndiClient .of (b .host (), brokerIdToJmxPort .apply (b .id ()))));
80+ yield List .of (
81+ MetricStore .Receiver .local (() -> admin .brokers ().thenApply (asBeanClientMap )));
82+ }
83+ case METRIC_STORE_TOPIC -> List .of (
84+ MetricStore .Receiver .topic (config .requireString (BOOTSTRAP_SERVERS_KEY )),
85+ MetricStore .Receiver .local (
86+ () -> CompletableFuture .completedStage (Map .of (-1 , JndiClient .local ()))));
87+ default -> throw new IllegalArgumentException (
88+ "unknown metric store type: "
89+ + config .string (METRIC_STORE_KEY )
90+ + ". use "
91+ + METRIC_STORE_LOCAL
92+ + " or "
93+ + METRIC_STORE_TOPIC );
94+ };
6895 var metricStore =
6996 MetricStore .builder ()
7097 .beanExpiration (beanExpiration )
71- .receivers (List .of (MetricStore .Receiver .local (clientSupplier )))
72- .sensorsSupplier (
73- () ->
74- sensors .metricSensors ().stream ()
75- .distinct ()
76- .collect (
77- Collectors .toUnmodifiableMap (
78- Function .identity (), ignored -> (id , ee ) -> {})))
98+ .receivers (receivers )
99+ .sensorsSupplier (sensorsSupplier )
79100 .build ();
101+
80102 server = Utils .packException (() -> HttpServer .create (new InetSocketAddress (port ), 0 ));
81103 server .createContext ("/topics" , to (new TopicHandler (admin )));
82104 server .createContext ("/groups" , to (new GroupHandler (admin )));
@@ -109,7 +131,11 @@ public static void main(String[] args) throws Exception {
109131 throw new IllegalArgumentException ("you must define either --jmx.port or --jmx.ports" );
110132 try (var service =
111133 new WebService (
112- Admin .of (arg .configs ()), arg .port , arg ::jmxPortMapping , arg .beanExpiration )) {
134+ Admin .of (arg .configs ()),
135+ arg .port ,
136+ arg ::jmxPortMapping ,
137+ arg .beanExpiration ,
138+ new Configuration (arg .configs ()))) {
113139 if (arg .ttl == null ) {
114140 System .out .println ("enter ctrl + c to terminate web service" );
115141 TimeUnit .MILLISECONDS .sleep (Long .MAX_VALUE );
0 commit comments