Skip to content

Commit 28ec2c6

Browse files
[METRICS] Migrate topic collector to balancer, partitioiner (#1765)
1 parent 9613d6b commit 28ec2c6

5 files changed

Lines changed: 257 additions & 40 deletions

File tree

app/src/main/java/org/astraea/app/web/WebService.java

Lines changed: 49 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,31 @@
2424
import java.util.Collection;
2525
import java.util.List;
2626
import java.util.Map;
27-
import java.util.concurrent.CompletionStage;
27+
import java.util.concurrent.CompletableFuture;
2828
import java.util.concurrent.ConcurrentLinkedQueue;
2929
import java.util.concurrent.TimeUnit;
30+
import java.util.function.BiConsumer;
3031
import java.util.function.Function;
3132
import java.util.function.Supplier;
3233
import java.util.stream.Collectors;
3334
import org.astraea.app.argument.DurationField;
3435
import org.astraea.app.argument.IntegerMapField;
3536
import org.astraea.app.argument.NonNegativeIntegerField;
37+
import org.astraea.common.Configuration;
3638
import org.astraea.common.Utils;
3739
import org.astraea.common.admin.Admin;
40+
import org.astraea.common.admin.Broker;
3841
import org.astraea.common.admin.NodeInfo;
3942
import org.astraea.common.metrics.JndiClient;
4043
import org.astraea.common.metrics.MBeanClient;
4144
import org.astraea.common.metrics.collector.MetricSensor;
4245
import org.astraea.common.metrics.collector.MetricStore;
4346

4447
public class WebService implements AutoCloseable {
48+
public static final String METRIC_STORE_KEY = "metric.store";
49+
public static final String METRIC_STORE_LOCAL = "local";
50+
public static final String METRIC_STORE_TOPIC = "topic";
51+
public static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
4552

4653
private final HttpServer server;
4754
private final Admin admin;
@@ -51,32 +58,48 @@ public WebService(
5158
Admin admin,
5259
int port,
5360
Function<Integer, Integer> brokerIdToJmxPort,
54-
Duration beanExpiration) {
61+
Duration beanExpiration,
62+
Configuration config) {
5563
this.admin = admin;
56-
Supplier<CompletionStage<Map<Integer, MBeanClient>>> clientSupplier =
64+
Supplier<Map<MetricSensor, BiConsumer<Integer, Exception>>> sensorsSupplier =
5765
() ->
58-
admin
59-
.brokers()
60-
.thenApply(
61-
brokers ->
62-
brokers.stream()
63-
.collect(
64-
Collectors.toUnmodifiableMap(
65-
NodeInfo::id,
66-
b ->
67-
JndiClient.of(b.host(), brokerIdToJmxPort.apply(b.id())))));
66+
sensors.metricSensors().stream()
67+
.distinct()
68+
.collect(
69+
Collectors.toUnmodifiableMap(Function.identity(), ignored -> (id, ee) -> {}));
70+
71+
List<MetricStore.Receiver> receivers =
72+
switch (config.string(METRIC_STORE_KEY).orElse(METRIC_STORE_LOCAL)) {
73+
case METRIC_STORE_LOCAL -> {
74+
Function<List<Broker>, Map<Integer, MBeanClient>> asBeanClientMap =
75+
brokers ->
76+
brokers.stream()
77+
.collect(
78+
Collectors.toUnmodifiableMap(
79+
NodeInfo::id,
80+
b -> JndiClient.of(b.host(), brokerIdToJmxPort.apply(b.id()))));
81+
yield List.of(
82+
MetricStore.Receiver.local(() -> admin.brokers().thenApply(asBeanClientMap)));
83+
}
84+
case METRIC_STORE_TOPIC -> List.of(
85+
MetricStore.Receiver.topic(config.requireString(BOOTSTRAP_SERVERS_KEY)),
86+
MetricStore.Receiver.local(
87+
() -> CompletableFuture.completedStage(Map.of(-1, JndiClient.local()))));
88+
default -> throw new IllegalArgumentException(
89+
"unknown metric store type: "
90+
+ config.string(METRIC_STORE_KEY)
91+
+ ". use "
92+
+ METRIC_STORE_LOCAL
93+
+ " or "
94+
+ METRIC_STORE_TOPIC);
95+
};
6896
var metricStore =
6997
MetricStore.builder()
7098
.beanExpiration(beanExpiration)
71-
.receivers(List.of(MetricStore.Receiver.local(clientSupplier)))
72-
.sensorsSupplier(
73-
() ->
74-
sensors.metricSensors().stream()
75-
.distinct()
76-
.collect(
77-
Collectors.toUnmodifiableMap(
78-
Function.identity(), ignored -> (id, ee) -> {})))
99+
.receivers(receivers)
100+
.sensorsSupplier(sensorsSupplier)
79101
.build();
102+
80103
server = Utils.packException(() -> HttpServer.create(new InetSocketAddress(port), 0));
81104
server.createContext("/topics", to(new TopicHandler(admin)));
82105
server.createContext("/groups", to(new GroupHandler(admin)));
@@ -109,7 +132,11 @@ public static void main(String[] args) throws Exception {
109132
throw new IllegalArgumentException("you must define either --jmx.port or --jmx.ports");
110133
try (var service =
111134
new WebService(
112-
Admin.of(arg.configs()), arg.port, arg::jmxPortMapping, arg.beanExpiration)) {
135+
Admin.of(arg.configs()),
136+
arg.port,
137+
arg::jmxPortMapping,
138+
arg.beanExpiration,
139+
new Configuration(arg.configs()))) {
113140
if (arg.ttl == null) {
114141
System.out.println("enter ctrl + c to terminate web service");
115142
TimeUnit.MILLISECONDS.sleep(Long.MAX_VALUE);

app/src/test/java/org/astraea/app/web/TopicHandlerTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.ThreadLocalRandom;
2828
import java.util.stream.Collectors;
2929
import java.util.stream.IntStream;
30+
import org.astraea.common.Configuration;
3031
import org.astraea.common.Utils;
3132
import org.astraea.common.admin.Admin;
3233
import org.astraea.common.admin.TopicPartition;
@@ -69,7 +70,8 @@ void testWithWebService() {
6970
Admin.of(SERVICE.bootstrapServers()),
7071
0,
7172
id -> SERVICE.jmxServiceURL().getPort(),
72-
Duration.ofMillis(5))) {
73+
Duration.ofMillis(5),
74+
Configuration.EMPTY)) {
7375
Response<TopicHandler.Topics> response =
7476
HttpExecutor.builder()
7577
.build()

app/src/test/java/org/astraea/app/web/WebServiceTest.java

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,18 @@
1717
package org.astraea.app.web;
1818

1919
import java.time.Duration;
20+
import java.util.Map;
2021
import java.util.concurrent.ThreadLocalRandom;
22+
import java.util.concurrent.atomic.AtomicInteger;
2123
import org.astraea.app.argument.Argument;
24+
import org.astraea.common.Configuration;
2225
import org.astraea.common.admin.Admin;
26+
import org.astraea.common.metrics.collector.MetricStore;
2327
import org.junit.jupiter.api.Assertions;
2428
import org.junit.jupiter.api.Test;
2529
import org.junit.jupiter.api.Timeout;
2630
import org.mockito.Mockito;
31+
import org.mockito.stubbing.Answer;
2732

2833
public class WebServiceTest {
2934

@@ -40,7 +45,9 @@ void testArgument() {
4045
@Timeout(10)
4146
@Test
4247
void testClose() {
43-
var web = new WebService(Mockito.mock(Admin.class), 0, id -> -1, Duration.ofMillis(5));
48+
var web =
49+
new WebService(
50+
Mockito.mock(Admin.class), 0, id -> -1, Duration.ofMillis(5), Configuration.EMPTY);
4451
web.close();
4552
}
4653

@@ -84,4 +91,81 @@ void testJmxPort() {
8491
Assertions.assertThrows(
8592
IllegalArgumentException.class, () -> noDefaultArgument.jmxPortMapping(4));
8693
}
94+
95+
@Test
96+
void testMetricStoreConfiguration() {
97+
try (var mockedReceiver = Mockito.mockStatic(MetricStore.Receiver.class)) {
98+
var topicReceiverCount = new AtomicInteger(0);
99+
var localReceiverCount = new AtomicInteger(0);
100+
mockedReceiver
101+
.when(() -> MetricStore.Receiver.topic(Mockito.any()))
102+
.then(
103+
(Answer<MetricStore.Receiver>)
104+
invocation -> {
105+
topicReceiverCount.incrementAndGet();
106+
return Mockito.mock(MetricStore.Receiver.class);
107+
});
108+
mockedReceiver
109+
.when(() -> MetricStore.Receiver.local(Mockito.any()))
110+
.then(
111+
(Answer<MetricStore.Receiver>)
112+
invocation -> {
113+
localReceiverCount.incrementAndGet();
114+
return Mockito.mock(MetricStore.Receiver.class);
115+
});
116+
// Test default metric store configuration
117+
try (var web =
118+
new WebService(
119+
Mockito.mock(Admin.class), 0, id -> -1, Duration.ofMillis(5), Configuration.EMPTY)) {
120+
121+
Assertions.assertEquals(1, localReceiverCount.get());
122+
Assertions.assertEquals(0, topicReceiverCount.get());
123+
}
124+
localReceiverCount.set(0);
125+
topicReceiverCount.set(0);
126+
// Test local metric store configuration
127+
try (var web =
128+
new WebService(
129+
Mockito.mock(Admin.class),
130+
0,
131+
id -> -1,
132+
Duration.ofMillis(5),
133+
new Configuration(
134+
Map.of(WebService.METRIC_STORE_KEY, WebService.METRIC_STORE_LOCAL)))) {
135+
136+
Assertions.assertEquals(1, localReceiverCount.get());
137+
Assertions.assertEquals(0, topicReceiverCount.get());
138+
}
139+
localReceiverCount.set(0);
140+
topicReceiverCount.set(0);
141+
// Test topic metric store configuration
142+
try (var web =
143+
new WebService(
144+
Mockito.mock(Admin.class),
145+
0,
146+
id -> -1,
147+
Duration.ofMillis(5),
148+
new Configuration(
149+
Map.of(
150+
WebService.METRIC_STORE_KEY,
151+
WebService.METRIC_STORE_TOPIC,
152+
WebService.BOOTSTRAP_SERVERS_KEY,
153+
"ignore")))) {
154+
155+
// topic collector may create local receiver to receive local jmx metric
156+
Assertions.assertEquals(1, topicReceiverCount.get());
157+
}
158+
159+
// Test invalid metric store configuration
160+
Assertions.assertThrows(
161+
IllegalArgumentException.class,
162+
() ->
163+
new WebService(
164+
Mockito.mock(Admin.class),
165+
0,
166+
id -> -1,
167+
Duration.ofMillis(5),
168+
new Configuration(Map.of(WebService.METRIC_STORE_KEY, "unknown"))));
169+
}
170+
}
87171
}

common/src/main/java/org/astraea/common/partitioner/StrictCostPartitioner.java

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Map;
2424
import java.util.NoSuchElementException;
2525
import java.util.Optional;
26+
import java.util.concurrent.CompletableFuture;
2627
import java.util.concurrent.CompletionStage;
2728
import java.util.function.Function;
2829
import java.util.function.Supplier;
@@ -38,6 +39,7 @@
3839
import org.astraea.common.metrics.JndiClient;
3940
import org.astraea.common.metrics.MBeanClient;
4041
import org.astraea.common.metrics.collector.MetricStore;
42+
import org.astraea.common.producer.ProducerConfigs;
4143

4244
/**
4345
* this partitioner scores the nodes by multiples cost functions. Each function evaluate the target
@@ -55,6 +57,9 @@
5557
* `org.astraea.cost.ThroughputCost=1,org.astraea.cost.broker.BrokerOutputCost=1`.
5658
*/
5759
public class StrictCostPartitioner extends Partitioner {
60+
public static final String METRIC_STORE_KEY = "metric.store";
61+
public static final String METRIC_STORE_TOPIC = "topic";
62+
public static final String METRIC_STORE_LOCAL = "local";
5863
static final int ROUND_ROBIN_LENGTH = 400;
5964
static final String JMX_PORT = "jmx.port";
6065
static final String ROUND_ROBIN_LEASE_KEY = "round.robin.lease";
@@ -140,27 +145,47 @@ public void configure(Configuration config) {
140145
.string(ROUND_ROBIN_LEASE_KEY)
141146
.map(Utils::toDuration)
142147
.ifPresent(d -> this.roundRobinLease = d);
143-
Supplier<CompletionStage<Map<Integer, MBeanClient>>> clientSupplier =
144-
() ->
145-
admin
146-
.brokers()
147-
.thenApply(
148-
brokers -> {
149-
var map = new HashMap<Integer, JndiClient>();
150-
brokers.forEach(
151-
b ->
152-
map.put(
153-
b.id(), JndiClient.of(b.host(), jmxPortGetter.apply(b.id()))));
154-
// add local client to fetch consumer metrics
155-
map.put(-1, JndiClient.local());
156-
return Collections.unmodifiableMap(map);
157-
});
158148

149+
List<MetricStore.Receiver> receivers =
150+
switch (config.string(METRIC_STORE_KEY).orElse(METRIC_STORE_LOCAL)) {
151+
case METRIC_STORE_TOPIC -> List.of(
152+
MetricStore.Receiver.topic(
153+
config.requireString(ProducerConfigs.BOOTSTRAP_SERVERS_CONFIG)),
154+
MetricStore.Receiver.local(
155+
() -> CompletableFuture.completedStage(Map.of(-1, JndiClient.local()))));
156+
case METRIC_STORE_LOCAL -> {
157+
Supplier<CompletionStage<Map<Integer, MBeanClient>>> clientSupplier =
158+
() ->
159+
admin
160+
.brokers()
161+
.thenApply(
162+
brokers -> {
163+
var map = new HashMap<Integer, JndiClient>();
164+
brokers.forEach(
165+
b ->
166+
map.put(
167+
b.id(),
168+
JndiClient.of(b.host(), jmxPortGetter.apply(b.id()))));
169+
// add local client to fetch consumer metrics
170+
map.put(-1, JndiClient.local());
171+
return Collections.unmodifiableMap(map);
172+
});
173+
yield List.of(MetricStore.Receiver.local(clientSupplier));
174+
}
175+
default -> throw new IllegalArgumentException(
176+
"unknown metric store type: "
177+
+ config.string(METRIC_STORE_KEY)
178+
+ ". Use "
179+
+ METRIC_STORE_TOPIC
180+
+ " or "
181+
+ METRIC_STORE_LOCAL);
182+
};
159183
metricStore =
160184
MetricStore.builder()
161-
.receivers(List.of(MetricStore.Receiver.local(clientSupplier)))
185+
.receivers(receivers)
162186
.sensorsSupplier(() -> Map.of(this.costFunction.metricSensor(), (integer, e) -> {}))
163187
.build();
188+
164189
this.roundRobinKeeper = RoundRobinKeeper.of(ROUND_ROBIN_LENGTH, roundRobinLease);
165190
}
166191

0 commit comments

Comments
 (0)