Skip to content

Commit 26f1f76

Browse files
authored
[COST] merge MetricSensor with Fetcher (#1477)
1 parent d71e8d4 commit 26f1f76

41 files changed

Lines changed: 434 additions & 311 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

app/src/main/java/org/astraea/app/web/BalancerHandler.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
import org.astraea.common.cost.ReplicaLeaderSizeCost;
6262
import org.astraea.common.cost.ReplicaNumberCost;
6363
import org.astraea.common.json.TypeRef;
64-
import org.astraea.common.metrics.collector.Fetcher;
6564
import org.astraea.common.metrics.collector.MetricCollector;
6665
import org.astraea.common.metrics.collector.MetricSensor;
6766

@@ -177,15 +176,14 @@ public CompletionStage<Response> post(Channel channel) {
177176
CompletableFuture.supplyAsync(
178177
() -> {
179178
var currentClusterInfo = request.clusterInfo;
180-
var fetchers =
179+
var sensors =
181180
Stream.concat(
182-
request.algorithmConfig.clusterCostFunction().fetcher().stream(),
183-
request.algorithmConfig.moveCostFunction().fetcher().stream())
181+
request.algorithmConfig.clusterCostFunction().metricSensor().stream(),
182+
request.algorithmConfig.moveCostFunction().metricSensor().stream())
184183
.collect(Collectors.toUnmodifiableList());
185184
var bestPlan =
186185
metricContext(
187-
fetchers,
188-
request.algorithmConfig.clusterCostFunction().sensors(),
186+
sensors,
189187
(metricSource) ->
190188
Balancer.create(
191189
request.balancerClasspath,
@@ -254,15 +252,13 @@ private static List<MigrationCost> migrationCosts(MoveCost cost) {
254252
}
255253

256254
private Balancer.Plan metricContext(
257-
Collection<Fetcher> fetchers,
258255
Collection<MetricSensor> metricSensors,
259256
Function<Supplier<ClusterBean>, Balancer.Plan> execution) {
260257
// TODO: use a global metric collector when we are ready to enable long-run metric sampling
261258
// https://github.com/skiptests/astraea/pull/955#discussion_r1026491162
262259
try (var collector = MetricCollector.builder().interval(sampleInterval).build()) {
263260
freshJmxAddresses().forEach(collector::registerJmx);
264-
fetchers.forEach(collector::addFetcher);
265-
metricSensors.forEach(collector::addMetricSensors);
261+
metricSensors.forEach(collector::addMetricSensor);
266262
return execution.apply(collector::clusterBean);
267263
}
268264
}

app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
import org.astraea.common.cost.NoSufficientMetricsException;
6767
import org.astraea.common.json.JsonConverter;
6868
import org.astraea.common.json.TypeRef;
69-
import org.astraea.common.metrics.collector.Fetcher;
69+
import org.astraea.common.metrics.collector.MetricSensor;
7070
import org.astraea.common.metrics.platform.HostMetrics;
7171
import org.astraea.common.metrics.platform.JvmMemory;
7272
import org.astraea.common.producer.Producer;
@@ -1032,13 +1032,13 @@ void testTimeout() {
10321032
}
10331033

10341034
@Test
1035-
void testCostWithFetcher() {
1035+
void testCostWithSensor() {
10361036
var topics = createAndProduceTopic(3);
10371037
try (var admin = Admin.of(SERVICE.bootstrapServers())) {
10381038
var invoked = new AtomicBoolean();
10391039
var handler =
10401040
new BalancerHandler(admin, (ignore) -> Optional.of(SERVICE.jmxServiceURL().getPort()));
1041-
FetcherAndCost.callback.set(
1041+
SensorAndCost.callback.set(
10421042
(clusterBean) -> {
10431043
var metrics =
10441044
clusterBean.all().values().stream()
@@ -1047,15 +1047,15 @@ void testCostWithFetcher() {
10471047
.collect(Collectors.toUnmodifiableSet());
10481048
if (metrics.size() < 3)
10491049
throw new NoSufficientMetricsException(
1050-
new FetcherAndCost(null), Duration.ofSeconds(3));
1050+
new SensorAndCost(null), Duration.ofSeconds(3));
10511051
metrics.forEach(i -> Assertions.assertInstanceOf(JvmMemory.class, i));
10521052
invoked.set(true);
10531053
});
1054-
var fetcherAndCost = List.of(costWeight(FetcherAndCost.class.getName(), 1));
1054+
var function = List.of(costWeight(SensorAndCost.class.getName(), 1));
10551055

10561056
var request = new BalancerHandler.BalancerPostRequest();
10571057
request.timeout = Duration.ofSeconds(8);
1058-
request.costWeights = fetcherAndCost;
1058+
request.costWeights = function;
10591059
request.topics = topics;
10601060
var progress = submitPlanGeneration(handler, request);
10611061

@@ -1277,17 +1277,17 @@ public synchronized ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean
12771277
}
12781278
}
12791279

1280-
public static class FetcherAndCost extends DecreasingCost {
1280+
public static class SensorAndCost extends DecreasingCost {
12811281

12821282
static AtomicReference<Consumer<ClusterBean>> callback = new AtomicReference<>();
12831283

1284-
public FetcherAndCost(Configuration configuration) {
1284+
public SensorAndCost(Configuration configuration) {
12851285
super(configuration);
12861286
}
12871287

12881288
@Override
1289-
public Optional<Fetcher> fetcher() {
1290-
return Optional.of((c) -> List.of(HostMetrics.jvmMemory(c)));
1289+
public Optional<MetricSensor> metricSensor() {
1290+
return Optional.of((c, ignored) -> List.of(HostMetrics.jvmMemory(c)));
12911291
}
12921292

12931293
@Override

common/src/main/java/org/astraea/common/assignor/Assignor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public final void configure(Map<String, ?> configs) {
153153
? HasPartitionCost.of(Map.of(new ReplicaLeaderSizeCost(), 1D))
154154
: HasPartitionCost.of(costFunctions);
155155
this.jmxPortGetter = id -> Optional.ofNullable(customJMXPort.get(id)).or(() -> defaultJMXPort);
156-
this.costFunction.fetcher().ifPresent(metricCollector::addFetcher);
156+
this.costFunction.metricSensor().ifPresent(metricCollector::addMetricSensor);
157157
configure(config);
158158
}
159159
}

common/src/main/java/org/astraea/common/cost/BrokerInputCost.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.astraea.common.admin.ClusterBean;
2525
import org.astraea.common.admin.ClusterInfo;
2626
import org.astraea.common.metrics.broker.ServerMetrics;
27-
import org.astraea.common.metrics.collector.Fetcher;
27+
import org.astraea.common.metrics.collector.MetricSensor;
2828

2929
public class BrokerInputCost implements HasBrokerCost, HasClusterCost {
3030
private final Dispersion dispersion = Dispersion.cov();
@@ -44,8 +44,9 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
4444
}
4545

4646
@Override
47-
public Optional<Fetcher> fetcher() {
48-
return Optional.of(client -> List.of(ServerMetrics.BrokerTopic.BYTES_IN_PER_SEC.fetch(client)));
47+
public Optional<MetricSensor> metricSensor() {
48+
return Optional.of(
49+
(client, ignored) -> List.of(ServerMetrics.BrokerTopic.BYTES_IN_PER_SEC.fetch(client)));
4950
}
5051

5152
@Override

common/src/main/java/org/astraea/common/cost/BrokerOutputCost.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.astraea.common.admin.ClusterBean;
2424
import org.astraea.common.admin.ClusterInfo;
2525
import org.astraea.common.metrics.broker.ServerMetrics;
26-
import org.astraea.common.metrics.collector.Fetcher;
26+
import org.astraea.common.metrics.collector.MetricSensor;
2727

2828
public class BrokerOutputCost implements HasBrokerCost, HasClusterCost {
2929
private final Dispersion dispersion = Dispersion.cov();
@@ -43,9 +43,9 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
4343
}
4444

4545
@Override
46-
public Optional<Fetcher> fetcher() {
46+
public Optional<MetricSensor> metricSensor() {
4747
return Optional.of(
48-
client -> List.of(ServerMetrics.BrokerTopic.BYTES_OUT_PER_SEC.fetch(client)));
48+
(client, ignored) -> List.of(ServerMetrics.BrokerTopic.BYTES_OUT_PER_SEC.fetch(client)));
4949
}
5050

5151
@Override

common/src/main/java/org/astraea/common/cost/CostFunction.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,12 @@
1616
*/
1717
package org.astraea.common.cost;
1818

19-
import java.util.Collection;
2019
import java.util.Comparator;
21-
import java.util.List;
2220
import java.util.Map;
2321
import java.util.Optional;
2422
import java.util.function.BiFunction;
2523
import java.util.stream.Collectors;
2624
import org.astraea.common.Configuration;
27-
import org.astraea.common.metrics.Sensor;
28-
import org.astraea.common.metrics.collector.Fetcher;
2925
import org.astraea.common.metrics.collector.MetricSensor;
3026

3127
/**
@@ -36,7 +32,7 @@
3632
* Constructor({@link Configuration} configuration) or Constructor()
3733
*
3834
* <p>A cost function might take advantage of some Mbean metrics to function. One can indicate such
39-
* requirement in the {@link CostFunction#fetcher()} function. If the operation logic of
35+
* requirement in the {@link CostFunction#metricSensor()} function. If the operation logic of
4036
* CostFunction thinks the metrics on hand are insufficient or not ready to work. One can throw a
4137
* {@link NoSufficientMetricsException} from the calculation logic of {@link HasBrokerCost} or
4238
* {@link HasClusterCost}. This serves as a hint to the caller that it needs newer metrics and
@@ -50,18 +46,10 @@ public interface CostFunction {
5046
/**
5147
* @return the metrics getters. Those getters are used to fetch mbeans.
5248
*/
53-
default Optional<Fetcher> fetcher() {
49+
default Optional<MetricSensor> metricSensor() {
5450
return Optional.empty();
5551
}
5652

57-
/**
58-
* @return the {@link Sensor} and the type of {@link org.astraea.common.metrics.stats.Stat} name
59-
* to use.
60-
*/
61-
default Collection<MetricSensor> sensors() {
62-
return List.of();
63-
}
64-
6553
static String toStringComposite(Map<? extends CostFunction, Double> costWeights) {
6654
BiFunction<CostFunction, Double, String> descriptiveName =
6755
(cost, value) -> "{\"" + cost.toString() + "\" weight " + value + "}";

common/src/main/java/org/astraea/common/cost/CpuCost.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.astraea.common.admin.ClusterBean;
2525
import org.astraea.common.admin.ClusterInfo;
2626
import org.astraea.common.metrics.HasBeanObject;
27-
import org.astraea.common.metrics.collector.Fetcher;
27+
import org.astraea.common.metrics.collector.MetricSensor;
2828
import org.astraea.common.metrics.platform.HostMetrics;
2929
import org.astraea.common.metrics.platform.OperatingSystemInfo;
3030

@@ -56,8 +56,8 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
5656
}
5757

5858
@Override
59-
public Optional<Fetcher> fetcher() {
60-
return Optional.of(client -> List.of(HostMetrics.operatingSystem(client)));
59+
public Optional<MetricSensor> metricSensor() {
60+
return Optional.of((client, ignored) -> List.of(HostMetrics.operatingSystem(client)));
6161
}
6262

6363
@Override

common/src/main/java/org/astraea/common/cost/HasBrokerCost.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import java.util.stream.Collectors;
2424
import org.astraea.common.admin.ClusterBean;
2525
import org.astraea.common.admin.ClusterInfo;
26-
import org.astraea.common.metrics.collector.Fetcher;
26+
import org.astraea.common.metrics.collector.MetricSensor;
2727

2828
@FunctionalInterface
2929
public interface HasBrokerCost extends CostFunction {
@@ -34,10 +34,10 @@ static HasBrokerCost of(Map<HasBrokerCost, Double> costAndWeight) {
3434
// the temporary exception won't affect the smooth-weighted too much.
3535
// TODO: should we propagate the exception by better way? For example: Slf4j ?
3636
// see https://github.com/skiptests/astraea/issues/486
37-
var fetcher =
38-
Fetcher.of(
37+
var sensor =
38+
MetricSensor.of(
3939
costAndWeight.keySet().stream()
40-
.map(CostFunction::fetcher)
40+
.map(CostFunction::metricSensor)
4141
.filter(Optional::isPresent)
4242
.map(Optional::get)
4343
.collect(Collectors.toUnmodifiableList()));
@@ -59,8 +59,8 @@ public BrokerCost brokerCost(ClusterInfo clusterInfo, ClusterBean clusterBean) {
5959
}
6060

6161
@Override
62-
public Optional<Fetcher> fetcher() {
63-
return fetcher;
62+
public Optional<MetricSensor> metricSensor() {
63+
return sensor;
6464
}
6565

6666
@Override

common/src/main/java/org/astraea/common/cost/HasClusterCost.java

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,31 +16,25 @@
1616
*/
1717
package org.astraea.common.cost;
1818

19-
import java.util.Collection;
2019
import java.util.Map;
2120
import java.util.Optional;
2221
import java.util.stream.Collectors;
2322
import org.astraea.common.admin.ClusterBean;
2423
import org.astraea.common.admin.ClusterInfo;
2524
import org.astraea.common.function.Bi3Function;
26-
import org.astraea.common.metrics.collector.Fetcher;
2725
import org.astraea.common.metrics.collector.MetricSensor;
2826

2927
@FunctionalInterface
3028
public interface HasClusterCost extends CostFunction {
3129

3230
static HasClusterCost of(Map<HasClusterCost, Double> costAndWeight) {
33-
var fetcher =
34-
Fetcher.of(
31+
var sensor =
32+
MetricSensor.of(
3533
costAndWeight.keySet().stream()
36-
.map(CostFunction::fetcher)
34+
.map(CostFunction::metricSensor)
3735
.filter(Optional::isPresent)
3836
.map(Optional::get)
3937
.collect(Collectors.toUnmodifiableList()));
40-
var sensors =
41-
costAndWeight.keySet().stream()
42-
.flatMap(x -> x.sensors().stream())
43-
.collect(Collectors.toList());
4438

4539
return new HasClusterCost() {
4640
@Override
@@ -85,13 +79,8 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean)
8579
}
8680

8781
@Override
88-
public Optional<Fetcher> fetcher() {
89-
return fetcher;
90-
}
91-
92-
@Override
93-
public Collection<MetricSensor> sensors() {
94-
return sensors;
82+
public Optional<MetricSensor> metricSensor() {
83+
return sensor;
9584
}
9685

9786
@Override

common/src/main/java/org/astraea/common/cost/HasMoveCost.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,18 @@
2323
import org.astraea.common.DataSize;
2424
import org.astraea.common.admin.ClusterBean;
2525
import org.astraea.common.admin.ClusterInfo;
26-
import org.astraea.common.metrics.collector.Fetcher;
26+
import org.astraea.common.metrics.collector.MetricSensor;
2727

2828
@FunctionalInterface
2929
public interface HasMoveCost extends CostFunction {
3030

3131
HasMoveCost EMPTY = (originClusterInfo, newClusterInfo, clusterBean) -> MoveCost.EMPTY;
3232

3333
static HasMoveCost of(Collection<HasMoveCost> hasMoveCosts) {
34-
var fetcher =
35-
Fetcher.of(
34+
var sensor =
35+
MetricSensor.of(
3636
hasMoveCosts.stream()
37-
.map(CostFunction::fetcher)
37+
.map(CostFunction::metricSensor)
3838
.filter(Optional::isPresent)
3939
.map(Optional::get)
4040
.collect(Collectors.toUnmodifiableList()));
@@ -94,8 +94,8 @@ public Map<Integer, Integer> changedReplicaLeaderCount() {
9494
}
9595

9696
@Override
97-
public Optional<Fetcher> fetcher() {
98-
return fetcher;
97+
public Optional<MetricSensor> metricSensor() {
98+
return sensor;
9999
}
100100

101101
@Override

0 commit comments

Comments
 (0)