Skip to content

Commit 787b26e

Browse files
committed
[CHECKER] add base code
1 parent 6857bab commit 787b26e

9 files changed

Lines changed: 359 additions & 6 deletions

File tree

app/src/main/java/org/astraea/app/App.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@
2323
import java.util.Map;
2424
import org.astraea.app.automation.Automation;
2525
import org.astraea.app.benchmark.BalancerBenchmarkApp;
26-
import org.astraea.app.homework.BulkChecker;
27-
import org.astraea.app.homework.BulkSender;
26+
import org.astraea.app.checker.Checker;
2827
import org.astraea.app.homework.Prepare;
2928
import org.astraea.app.homework.SendYourData;
3029
import org.astraea.app.performance.Performance;
@@ -35,10 +34,8 @@
3534
public class App {
3635
private static final Map<String, Class<?>> MAIN_CLASSES =
3736
Map.of(
38-
"bulk_sender",
39-
BulkSender.class,
40-
"bulk_checker",
41-
BulkChecker.class,
37+
"40_checker",
38+
Checker.class,
4239
"performance",
4340
Performance.class,
4441
"prepare",
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.astraea.app.checker;
18+
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.function.Function;
22+
import java.util.stream.Collectors;
23+
24+
public class Changelog {
25+
private List<Protocol> protocols;
26+
private List<Config> configs;
27+
28+
public Map<String, Protocol> protocols() {
29+
return protocols.stream().collect(Collectors.toMap(Protocol::name, Function.identity()));
30+
}
31+
32+
public Map<String, Config> configs() {
33+
return configs.stream().collect(Collectors.toMap(Config::name, Function.identity()));
34+
}
35+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.astraea.app.checker;
18+
19+
import com.beust.jcommander.Parameter;
20+
import java.io.IOException;
21+
import java.net.URL;
22+
import java.nio.charset.StandardCharsets;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.function.Function;
26+
import org.apache.kafka.clients.admin.Admin;
27+
import org.apache.kafka.common.Node;
28+
import org.astraea.app.argument.IntegerMapField;
29+
import org.astraea.app.argument.NonEmptyStringField;
30+
import org.astraea.app.argument.NonNegativeIntegerField;
31+
import org.astraea.common.json.JsonConverter;
32+
import org.astraea.common.json.TypeRef;
33+
import org.astraea.common.metrics.JndiClient;
34+
import org.astraea.common.metrics.MBeanClient;
35+
36+
public class Checker {
37+
38+
private static final List<Guard> GUARDS = List.of(new ProduceRpcGuard());
39+
40+
public static void main(String[] args) throws Exception {
41+
execute(Argument.parse(new Argument(), args));
42+
}
43+
44+
public static void execute(final Argument param) throws Exception {
45+
try (var admin = Admin.create(Map.of("bootstrap.servers", param.bootstrapServers()))) {
46+
for (var guard : GUARDS) {
47+
var result = guard.run(admin, param.mBeanClientFunction(), param.readChangelog());
48+
System.out.println(result);
49+
}
50+
}
51+
}
52+
53+
public static class Argument extends org.astraea.app.argument.Argument {
54+
@Parameter(
55+
names = {"--changelog"},
56+
description = "String: url of changelog file",
57+
validateWith = NonEmptyStringField.class)
58+
String changelog =
59+
"https://raw.githubusercontent.com/opensource4you/astraea/refs/heads/main/config/kafka_changelog.json";
60+
61+
Changelog readChangelog() throws IOException {
62+
try (var in = new URL(changelog).openStream()) {
63+
return JsonConverter.defaultConverter()
64+
.fromJson(
65+
new String(in.readAllBytes(), StandardCharsets.UTF_8), TypeRef.of(Changelog.class));
66+
}
67+
}
68+
69+
@Parameter(
70+
names = {"--jmx.port"},
71+
description = "Integer: the port to query JMX for each server",
72+
validateWith = NonNegativeIntegerField.class,
73+
converter = NonNegativeIntegerField.class)
74+
int jmxPort = -1;
75+
76+
@Parameter(
77+
names = {"--jmx.ports"},
78+
description =
79+
"Map: the JMX port for each broker. For example: 1024=19999 means for the broker with id 1024, its JMX port located at 19999 port",
80+
validateWith = IntegerMapField.class,
81+
converter = IntegerMapField.class)
82+
Map<Integer, Integer> jmxPorts = Map.of();
83+
84+
Function<Node, MBeanClient> mBeanClientFunction() {
85+
return node -> {
86+
int port = jmxPorts.getOrDefault(node.id(), jmxPort);
87+
if (port < 0)
88+
throw new IllegalArgumentException("Failed to get jmx port for broker: " + node);
89+
return JndiClient.of(node.host(), port);
90+
};
91+
}
92+
}
93+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.astraea.app.checker;
18+
19+
import java.util.Optional;
20+
21+
public class Config {
22+
private String name;
23+
private Optional<String> value;
24+
private String commit;
25+
private String kip;
26+
27+
public String name() {
28+
return name;
29+
}
30+
31+
public Optional<String> value() {
32+
return value;
33+
}
34+
35+
public String commit() {
36+
return commit;
37+
}
38+
39+
public String kip() {
40+
return kip;
41+
}
42+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.astraea.app.checker;
18+
19+
import java.util.Collection;
20+
import java.util.function.Function;
21+
import org.apache.kafka.clients.admin.Admin;
22+
import org.apache.kafka.common.Node;
23+
import org.astraea.common.metrics.MBeanClient;
24+
25+
public interface Guard {
26+
Collection<Report> run(Admin admin, Function<Node, MBeanClient> clients, Changelog changelog)
27+
throws Exception;
28+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.astraea.app.checker;
18+
19+
import java.util.Collection;
20+
import java.util.function.Function;
21+
import org.apache.kafka.clients.admin.Admin;
22+
import org.apache.kafka.common.Node;
23+
import org.astraea.common.metrics.MBeanClient;
24+
import org.astraea.common.metrics.broker.NetworkMetrics;
25+
26+
public class ProduceRpcGuard implements Guard {
27+
@Override
28+
public Collection<Report> run(
29+
Admin admin, Function<Node, MBeanClient> clients, Changelog changelog) throws Exception {
30+
return admin.describeCluster().nodes().get().stream()
31+
.map(
32+
node -> {
33+
var protocol =
34+
changelog
35+
.protocols()
36+
.get(NetworkMetrics.Request.PRODUCE.metricName().toLowerCase());
37+
if (protocol == null) return Report.empty();
38+
var versions = NetworkMetrics.Request.PRODUCE.versions(clients.apply(node));
39+
return Report.of(node, protocol, versions);
40+
})
41+
.flatMap(Report::stream)
42+
.toList();
43+
}
44+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.astraea.app.checker;
18+
19+
public class Protocol {
20+
private String name;
21+
private int base;
22+
private String commit;
23+
private String kip;
24+
25+
public int base() {
26+
return base;
27+
}
28+
29+
public String name() {
30+
return name;
31+
}
32+
33+
public String commit() {
34+
return commit;
35+
}
36+
37+
public String kip() {
38+
return kip;
39+
}
40+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.astraea.app.checker;
18+
19+
import java.util.Set;
20+
import java.util.stream.Collectors;
21+
import java.util.stream.Stream;
22+
import org.apache.kafka.common.Node;
23+
24+
public record Report(Node node, String why) {
25+
static Report noMetrics(Node node) {
26+
return new Report(node, "failed to get metrics from");
27+
}
28+
29+
static Report of(Node node, String why) {
30+
return new Report(node, why);
31+
}
32+
33+
static Report empty() {
34+
return new Report(null, "");
35+
}
36+
37+
static Report of(Node node, Protocol protocol, Set<Integer> versions) {
38+
var unsupportedVersions =
39+
versions.stream().filter(v -> v < protocol.base()).collect(Collectors.toSet());
40+
if (unsupportedVersions.isEmpty()) return empty();
41+
return new Report(
42+
node,
43+
String.format(
44+
"there are unsupported %s versions: %s due to new baseline: %s",
45+
protocol.name(), unsupportedVersions, protocol.base()));
46+
}
47+
48+
Stream<Report> stream() {
49+
if (why.isEmpty()) return Stream.empty();
50+
return Stream.of(this);
51+
}
52+
}

common/src/main/java/org/astraea/common/metrics/broker/NetworkMetrics.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.util.Collection;
2121
import java.util.List;
2222
import java.util.Map;
23+
import java.util.NoSuchElementException;
24+
import java.util.Set;
2325
import java.util.function.Function;
2426
import java.util.stream.Collectors;
2527
import org.astraea.common.EnumInfo;
@@ -132,6 +134,26 @@ public Histogram fetch(MBeanClient mBeanClient) {
132134
return new Histogram(mBeanClient.bean(ALL.get(this)));
133135
}
134136

137+
public Set<Integer> versions(MBeanClient mBeanClient) {
138+
try {
139+
var beanObjects =
140+
mBeanClient.beans(
141+
BeanQuery.builder()
142+
.domainName("kafka.network")
143+
.property("type", "RequestMetrics")
144+
.property("request", "Produce")
145+
.property("name", "RequestsPerSec")
146+
.property("version", "*")
147+
.build());
148+
return beanObjects.stream()
149+
.map(b -> Integer.parseInt(b.properties().get("version")))
150+
.collect(Collectors.toSet());
151+
} catch (NoSuchElementException ignored) {
152+
// this is expected if the node has no such request
153+
return Set.of();
154+
}
155+
}
156+
135157
public record Histogram(BeanObject beanObject) implements HasHistogram {
136158

137159
public Request type() {

0 commit comments

Comments
 (0)