Skip to content

Commit ace1346

Browse files
authored
refactor: make benchmark result clearly (#93)
* feat: can connect db with cmd line * chore: refactor bulk write bench * chore: refactor bench * chore: print bench detail result * chore: simply result
1 parent bbd7b82 commit ace1346

9 files changed

Lines changed: 313 additions & 73 deletions

File tree

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright 2023 Greptime Team
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.greptime.bench;
18+
19+
import io.greptime.common.util.Cpus;
20+
import io.greptime.common.util.SystemPropertyUtil;
21+
import io.greptime.models.TableSchema;
22+
import org.slf4j.Logger;
23+
24+
/**
25+
* Utility class for printing benchmark results in a consistent format.
26+
*/
27+
public class BenchmarkResultPrinter {
28+
29+
public static void printBenchmarkHeader(Logger log, String apiType) {
30+
log.info("=== GreptimeDB {} API Log Benchmark ===", apiType);
31+
log.info("Synthetic log data generation and {} API ingestion performance test", apiType.toLowerCase());
32+
log.info("");
33+
}
34+
35+
public static void printConfiguration(
36+
Logger log, String apiType, boolean zstdCompression, int batchSize, int parallelismOrConcurrency) {
37+
printConfiguration(log, apiType, zstdCompression, batchSize, parallelismOrConcurrency, null);
38+
}
39+
40+
public static void printConfiguration(
41+
Logger log,
42+
String apiType,
43+
boolean zstdCompression,
44+
int batchSize,
45+
int parallelismOrConcurrency,
46+
Integer maxPointsPerSecond) {
47+
log.info("=== {} API Benchmark Configuration ===", apiType);
48+
49+
String endpointsStr = SystemPropertyUtil.get("db.endpoints");
50+
if (endpointsStr == null) {
51+
endpointsStr = "localhost:4001";
52+
}
53+
String database = SystemPropertyUtil.get("db.database");
54+
if (database == null) {
55+
database = "public";
56+
}
57+
58+
log.info("Endpoint: {}", endpointsStr);
59+
log.info("Database: {}", database);
60+
log.info("Batch size: {}", batchSize);
61+
62+
if (maxPointsPerSecond != null) {
63+
log.info(
64+
"Max points per second: {}",
65+
maxPointsPerSecond == Integer.MAX_VALUE ? "unlimited" : maxPointsPerSecond);
66+
} else if (apiType.equals("Bulk")) {
67+
log.info("Parallelism: {}", parallelismOrConcurrency);
68+
} else {
69+
log.info("Concurrency: {}", parallelismOrConcurrency);
70+
}
71+
72+
log.info("Compression: {}", (zstdCompression ? "zstd" : "none"));
73+
log.info("CPU cores: {}", Cpus.cpus());
74+
log.info("Build profile: release");
75+
log.info("");
76+
}
77+
78+
public static void printBenchmarkStart(
79+
Logger log,
80+
String apiType,
81+
TableDataProvider provider,
82+
TableSchema schema,
83+
int batchSize,
84+
int parallelismOrConcurrency) {
85+
printBenchmarkStart(log, apiType, provider, schema, batchSize, parallelismOrConcurrency, null);
86+
}
87+
88+
public static void printBenchmarkStart(
89+
Logger log,
90+
String apiType,
91+
TableDataProvider provider,
92+
TableSchema schema,
93+
int batchSize,
94+
int parallelismOrConcurrency,
95+
Integer maxPointsPerSecond) {
96+
log.info("=== Running {} API Log Data Benchmark ===", apiType);
97+
log.info("Setting up {} writer...", apiType.toLowerCase());
98+
log.info(
99+
"Starting {} API benchmark: {}",
100+
apiType.toLowerCase(),
101+
provider.getClass().getSimpleName());
102+
log.info(
103+
"Table: {} ({} columns)",
104+
schema.getTableName(),
105+
schema.getColumnNames().size());
106+
log.info("Target rows: {}", provider.rowCount());
107+
log.info("Batch size: {}", batchSize);
108+
109+
if (maxPointsPerSecond != null) {
110+
log.info(
111+
"Max points per second: {}",
112+
maxPointsPerSecond == Integer.MAX_VALUE ? "unlimited" : maxPointsPerSecond);
113+
} else if (apiType.equals("Bulk")) {
114+
log.info("Parallelism: {}", parallelismOrConcurrency);
115+
} else {
116+
log.info("Concurrency: {}", parallelismOrConcurrency);
117+
}
118+
119+
log.info("");
120+
}
121+
122+
public static void printBatchProgress(Logger log, long batch, long totalRows, long writeRatePerSecond) {
123+
log.info("→ Batch {}: {} rows processed ({} rows/sec)", batch, totalRows, writeRatePerSecond);
124+
125+
if (batch % 10 == 0) {
126+
log.info("Flushed {} responses (total {} affected rows)", batch, totalRows);
127+
}
128+
}
129+
130+
public static void printCompletionMessages(Logger log, String apiType) {
131+
log.info("Finishing {} writer and waiting for all responses...", apiType.toLowerCase());
132+
log.info("All {} writes completed successfully", apiType.toLowerCase());
133+
log.info("Cleaning up data provider...");
134+
log.info("{} API benchmark completed successfully!", apiType);
135+
}
136+
137+
public static void printBenchmarkSummary(
138+
Logger log, TableDataProvider provider, long totalRows, long durationMs, long throughput) {
139+
log.info("=== Benchmark Result ===");
140+
log.info("Table: {}", provider.tableSchema().getTableName());
141+
log.info("");
142+
log.info("Provider Rows Duration(ms) Throughput Status");
143+
log.info("--------------------------------------------------------------------------");
144+
log.info(String.format(
145+
"%-30s %8d %12d %12d r/s SUCCESS",
146+
provider.getClass().getSimpleName(), totalRows, durationMs, throughput));
147+
}
148+
}

ingester-example/src/main/java/io/greptime/bench/DBConnector.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
package io.greptime.bench;
1818

1919
import io.greptime.GreptimeDB;
20+
import io.greptime.common.util.SystemPropertyUtil;
2021
import io.greptime.options.GreptimeOptions;
2122
import io.greptime.quickstart.query.QueryJDBCQuickStart;
23+
import io.greptime.rpc.RpcOptions;
2224
import java.io.IOException;
2325
import java.util.Properties;
2426
import org.slf4j.Logger;
@@ -38,11 +40,23 @@ public static GreptimeDB connect() {
3840
} catch (IOException e) {
3941
throw new RuntimeException(e);
4042
}
41-
String database = (String) prop.get("db.database");
42-
String endpointsStr = prop.getProperty("db.endpoints");
43+
44+
String database = SystemPropertyUtil.get("db.database");
45+
if (database == null) {
46+
database = (String) prop.get("db.database");
47+
}
48+
49+
String endpointsStr = SystemPropertyUtil.get("db.endpoints");
50+
if (endpointsStr == null) {
51+
endpointsStr = prop.getProperty("db.endpoints");
52+
}
53+
4354
String[] endpoints = endpointsStr.split(",");
55+
RpcOptions rpcOptions = RpcOptions.newDefault();
56+
rpcOptions.setDefaultRpcTimeout(60 * 1000);
4457
GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database)
4558
.writeMaxRetries(0)
59+
.rpcOptions(rpcOptions)
4660
.defaultStreamMaxWritePointsPerSecond(Integer.MAX_VALUE)
4761
.maxInFlightWritePoints(Integer.MAX_VALUE)
4862
.useZeroCopyWriteInBulkWrite(true)

ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,12 @@
3737
public class MultiProducerTableDataProvider extends RandomTableDataProvider {
3838

3939
private final int producerCount;
40-
private final long rowCount;
4140
private final ExecutorService executorService;
4241
private final BlockingQueue<Object[]> buffer = new ArrayBlockingQueue<>(100000);
4342

4443
{
4544
this.producerCount = SystemPropertyUtil.getInt("multi_producer_table_data_provider.producer_count", 10);
4645
// Total number of rows to generate, configurable via system property
47-
this.rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 10_000_000L);
4846
this.executorService = ThreadPoolUtil.newBuilder()
4947
.poolName("multi-producer-table-data-provider")
5048
.enableMetric(true)
@@ -62,7 +60,7 @@ public void init() {
6260
AtomicLong rowIndex = new AtomicLong(0);
6361
for (int i = 0; i < producerCount; i++) {
6462
this.executorService.execute(() -> {
65-
while (rowIndex.getAndIncrement() < rowCount) {
63+
while (rowIndex.getAndIncrement() < rowCount()) {
6664
Object[] row = nextRow();
6765
try {
6866
buffer.put(row);
@@ -82,7 +80,7 @@ public Iterator<Object[]> rows() {
8280

8381
@Override
8482
public boolean hasNext() {
85-
return index < rowCount;
83+
return index < rowCount();
8684
}
8785

8886
@Override

ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class RandomTableDataProvider implements TableDataProvider {
5151
.addField("pod_name", DataType.String)
5252
.build();
5353
// Total number of rows to generate, configurable via system property
54-
rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 10_000_000L);
54+
rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 5_000_000L);
5555
}
5656

5757
@Override
@@ -133,4 +133,9 @@ public Object[] next() {
133133

134134
@Override
135135
public void close() throws Exception {}
136+
137+
@Override
138+
public long rowCount() {
139+
return rowCount;
140+
}
136141
}

ingester-example/src/main/java/io/greptime/bench/TableDataProvider.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,9 @@ public interface TableDataProvider extends AutoCloseable {
3838
* Returns the iterator of the rows.
3939
*/
4040
Iterator<Object[]> rows();
41+
42+
/**
43+
* Returns the total number of rows.
44+
*/
45+
long rowCount();
4146
}

0 commit comments

Comments
 (0)