Skip to content

Commit d933fa3

Browse files
committed
chore: print bench detail result
1 parent bb01f4c commit d933fa3

6 files changed

Lines changed: 247 additions & 51 deletions

File tree

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Copyright 2023 Greptime Team
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.greptime.bench;
18+
19+
import io.greptime.common.util.Cpus;
20+
import io.greptime.common.util.SystemPropertyUtil;
21+
import io.greptime.models.TableSchema;
22+
import org.slf4j.Logger;
23+
24+
/**
25+
* Utility class for printing benchmark results in a consistent format.
26+
*/
27+
public class BenchmarkResultPrinter {
28+
29+
public static void printBenchmarkHeader(Logger log, String apiType) {
30+
log.info("=== GreptimeDB {} API Log Benchmark ===", apiType);
31+
log.info("Synthetic log data generation and {} API ingestion performance test", apiType.toLowerCase());
32+
log.info("");
33+
}
34+
35+
public static void printConfiguration(
36+
Logger log, String apiType, boolean zstdCompression, int batchSize, int parallelismOrConcurrency) {
37+
printConfiguration(log, apiType, zstdCompression, batchSize, parallelismOrConcurrency, null);
38+
}
39+
40+
public static void printConfiguration(
41+
Logger log,
42+
String apiType,
43+
boolean zstdCompression,
44+
int batchSize,
45+
int parallelismOrConcurrency,
46+
Integer maxPointsPerSecond) {
47+
log.info("=== {} API Benchmark Configuration ===", apiType);
48+
49+
String endpointsStr = SystemPropertyUtil.get("db.endpoints");
50+
if (endpointsStr == null) {
51+
endpointsStr = "localhost:4001";
52+
}
53+
String database = SystemPropertyUtil.get("db.database");
54+
if (database == null) {
55+
database = "public";
56+
}
57+
58+
log.info("Endpoint: {}", endpointsStr);
59+
log.info("Database: {}", database);
60+
log.info("Batch size: {}", batchSize);
61+
62+
if (maxPointsPerSecond != null) {
63+
log.info(
64+
"Max points per second: {}",
65+
maxPointsPerSecond == Integer.MAX_VALUE ? "unlimited" : maxPointsPerSecond);
66+
} else if (apiType.equals("Bulk")) {
67+
log.info("Parallelism: {}", parallelismOrConcurrency);
68+
} else {
69+
log.info("Concurrency: {}", parallelismOrConcurrency);
70+
}
71+
72+
log.info("Compression: {}", (zstdCompression ? "zstd" : "none"));
73+
log.info("CPU cores: {}", Cpus.cpus());
74+
log.info("Build profile: release");
75+
log.info("");
76+
}
77+
78+
public static void printBenchmarkStart(
79+
Logger log,
80+
String apiType,
81+
TableDataProvider provider,
82+
TableSchema schema,
83+
int batchSize,
84+
int parallelismOrConcurrency) {
85+
printBenchmarkStart(log, apiType, provider, schema, batchSize, parallelismOrConcurrency, null);
86+
}
87+
88+
public static void printBenchmarkStart(
89+
Logger log,
90+
String apiType,
91+
TableDataProvider provider,
92+
TableSchema schema,
93+
int batchSize,
94+
int parallelismOrConcurrency,
95+
Integer maxPointsPerSecond) {
96+
log.info("=== Running {} API Log Data Benchmark ===", apiType);
97+
log.info("Setting up {} writer...", apiType.toLowerCase());
98+
log.info(
99+
"Starting {} API benchmark: {}",
100+
apiType.toLowerCase(),
101+
provider.getClass().getSimpleName());
102+
log.info(
103+
"Table: {} ({} columns)",
104+
schema.getTableName(),
105+
schema.getColumnNames().size());
106+
log.info("Target rows: {}", provider.rowCount());
107+
log.info("Batch size: {}", batchSize);
108+
109+
if (maxPointsPerSecond != null) {
110+
log.info(
111+
"Max points per second: {}",
112+
maxPointsPerSecond == Integer.MAX_VALUE ? "unlimited" : maxPointsPerSecond);
113+
} else if (apiType.equals("Bulk")) {
114+
log.info("Parallelism: {}", parallelismOrConcurrency);
115+
} else {
116+
log.info("Concurrency: {}", parallelismOrConcurrency);
117+
}
118+
119+
log.info("");
120+
}
121+
122+
public static void printBatchProgress(Logger log, long batch, long totalRows, long writeRatePerSecond) {
123+
log.info("→ Batch {}: {} rows processed ({} rows/sec)", batch, totalRows, writeRatePerSecond);
124+
125+
if (batch % 10 == 0) {
126+
log.info("Flushed {} responses (total {} affected rows)", batch, totalRows);
127+
}
128+
}
129+
130+
public static void printCompletionMessages(Logger log, String apiType) {
131+
log.info("Finishing {} writer and waiting for all responses...", apiType.toLowerCase());
132+
log.info("All {} writes completed successfully", apiType.toLowerCase());
133+
log.info("Cleaning up data provider...");
134+
log.info("{} API benchmark completed successfully!", apiType);
135+
}
136+
137+
public static void printFinalResults(Logger log, long totalRows, long durationMs, long throughput) {
138+
log.info("Final Result:");
139+
log.info("• Total rows: {}", totalRows);
140+
log.info("• Total batches: {}", (totalRows / 100000));
141+
log.info("• Duration: {}s", durationMs / 1000.0);
142+
log.info("• Throughput: {} rows/sec", throughput);
143+
log.info("");
144+
}
145+
146+
public static void printProviderResults(
147+
Logger log, TableDataProvider provider, long totalRows, long durationMs, long throughput) {
148+
log.info("=== {} Benchmark Results ===", provider.getClass().getSimpleName());
149+
log.info("Table: {}", provider.tableSchema().getTableName());
150+
log.info("SUCCESS");
151+
log.info("Total rows: {}", totalRows);
152+
log.info("Duration: {}ms", durationMs);
153+
log.info("Throughput: {} rows/sec", throughput);
154+
log.info("");
155+
}
156+
157+
public static void printBenchmarkSummary(
158+
Logger log, TableDataProvider provider, long totalRows, long durationMs, long throughput) {
159+
log.info("=== Benchmark Result ===");
160+
log.info("Fastest provider: {} ({} rows/sec)", provider.getClass().getSimpleName(), throughput);
161+
log.info("");
162+
log.info("Provider Rows Duration(ms) Throughput Status");
163+
log.info("--------------------------------------------------------------------------");
164+
log.info(String.format(
165+
"%-30s %8d %12d %12d r/s SUCCESS",
166+
provider.getClass().getSimpleName(), totalRows, durationMs, throughput));
167+
}
168+
}

ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class RandomTableDataProvider implements TableDataProvider {
5151
.addField("pod_name", DataType.String)
5252
.build();
5353
// Total number of rows to generate, configurable via system property
54-
rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 1_000_000L);
54+
rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 5_000_000L);
5555
}
5656

5757
@Override

ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.greptime.GreptimeDB;
2020
import io.greptime.WriteOp;
21+
import io.greptime.bench.BenchmarkResultPrinter;
2122
import io.greptime.bench.DBConnector;
2223
import io.greptime.bench.TableDataProvider;
2324
import io.greptime.common.util.MetricsUtil;
@@ -57,9 +58,8 @@ public static void main(String[] args) throws Exception {
5758
int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 64 * 1024);
5859
int concurrency = SystemPropertyUtil.getInt("concurrency", 4);
5960

60-
LOG.info("Using zstd compression: {}", zstdCompression);
61-
LOG.info("Batch size: {}", batchSize);
62-
LOG.info("Concurrency: {}", concurrency);
61+
BenchmarkResultPrinter.printBenchmarkHeader(LOG, "Batching");
62+
BenchmarkResultPrinter.printConfiguration(LOG, "Batching", zstdCompression, batchSize, concurrency);
6363

6464
Compression compression = zstdCompression ? Compression.Zstd : Compression.None;
6565
Context ctx = Context.newDefault().withCompression(compression);
@@ -73,20 +73,18 @@ public static void main(String[] args) throws Exception {
7373
Semaphore semaphore = new Semaphore(concurrency);
7474
TableDataProvider tableDataProvider =
7575
ServiceLoader.load(TableDataProvider.class).first();
76-
LOG.info("Table data provider: {}", tableDataProvider.getClass().getName());
7776
tableDataProvider.init();
7877
TableSchema tableSchema = tableDataProvider.tableSchema();
7978
AtomicLong totalRowsWritten = new AtomicLong(0);
79+
AtomicLong batchCounter = new AtomicLong(0);
80+
81+
BenchmarkResultPrinter.printBenchmarkStart(
82+
LOG, "Batching", tableDataProvider, tableSchema, batchSize, concurrency);
8083

8184
try {
8285
Iterator<Object[]> rows = tableDataProvider.rows();
8386

84-
LOG.info(
85-
"Start writing data, table: {}, row count: {}",
86-
tableSchema.getTableName(),
87-
tableDataProvider.rowCount());
88-
89-
long start = System.nanoTime();
87+
long benchmarkStart = System.nanoTime();
9088
do {
9189
Table table = Table.from(tableSchema);
9290
for (int j = 0; j < batchSize; j++) {
@@ -104,36 +102,40 @@ public static void main(String[] args) throws Exception {
104102
// Write the table data to the server
105103
CompletableFuture<Result<WriteOk, Err>> future =
106104
greptimeDB.write(Collections.singletonList(table), WriteOp.Insert, ctx);
107-
long fStart = System.nanoTime();
108105
future.whenComplete((result, error) -> {
109106
semaphore.release();
110107

111-
long costMs = (System.nanoTime() - fStart) / 1000000;
112108
if (error != null) {
113109
LOG.error("Error writing data", error);
114110
return;
115111
}
116112

117113
int numRows = result.mapOr(0, writeOk -> writeOk.getSuccess());
118114
long totalRows = totalRowsWritten.addAndGet(numRows);
119-
long totalElapsedSec = (System.nanoTime() - start) / 1000000000;
120-
long writeRatePerSecond = totalElapsedSec > 0 ? totalRows / totalElapsedSec : 0;
121-
LOG.info(
122-
"Wrote rows: {}, time cost: {}ms, total rows: {}, total elapsed: {}s, write rate: {} rows/sec",
123-
numRows,
124-
costMs,
125-
totalRows,
126-
totalElapsedSec,
127-
writeRatePerSecond);
115+
long batch = batchCounter.incrementAndGet();
116+
long totalElapsedMs = (System.nanoTime() - benchmarkStart) / 1000000;
117+
long writeRatePerSecond = totalElapsedMs > 0 ? (totalRows * 1000) / totalElapsedMs : 0;
118+
BenchmarkResultPrinter.printBatchProgress(LOG, batch, totalRows, writeRatePerSecond);
128119
});
129120
} while (rows.hasNext());
130121

131122
// Wait for all the requests to complete
132123
semaphore.acquire(concurrency);
133124

134-
LOG.info("Completed writing data, time cost: {}s", (System.nanoTime() - start) / 1000000000);
125+
BenchmarkResultPrinter.printCompletionMessages(LOG, "Batching");
126+
127+
long totalDurationMs = (System.nanoTime() - benchmarkStart) / 1000000;
128+
long finalRowCount = totalRowsWritten.get();
129+
long finalThroughput = totalDurationMs > 0 ? (finalRowCount * 1000) / totalDurationMs : 0;
130+
131+
BenchmarkResultPrinter.printFinalResults(LOG, finalRowCount, totalDurationMs, finalThroughput);
132+
BenchmarkResultPrinter.printProviderResults(
133+
LOG, tableDataProvider, finalRowCount, totalDurationMs, finalThroughput);
134+
BenchmarkResultPrinter.printBenchmarkSummary(
135+
LOG, tableDataProvider, finalRowCount, totalDurationMs, finalThroughput);
135136

136137
} finally {
138+
tableDataProvider.close();
137139
greptimeDB.shutdownGracefully();
138140
metricsExporter.shutdownGracefully();
139141
}

ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.greptime.BulkWrite;
2121
import io.greptime.GreptimeDB;
2222
import io.greptime.WriteOp;
23+
import io.greptime.bench.BenchmarkResultPrinter;
2324
import io.greptime.bench.DBConnector;
2425
import io.greptime.bench.TableDataProvider;
2526
import io.greptime.common.util.MetricsUtil;
@@ -60,9 +61,8 @@ public static void main(String[] args) throws Exception {
6061
int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 64 * 1024);
6162
int maxRequestsInFlight = SystemPropertyUtil.getInt("max_requests_in_flight", 4);
6263

63-
LOG.info("Using zstd compression: {}", zstdCompression);
64-
LOG.info("Batch size: {}", batchSize);
65-
LOG.info("Max requests in flight: {}", maxRequestsInFlight);
64+
BenchmarkResultPrinter.printBenchmarkHeader(LOG, "Bulk");
65+
BenchmarkResultPrinter.printConfiguration(LOG, "Bulk", zstdCompression, batchSize, maxRequestsInFlight);
6666

6767
Compression compression = zstdCompression ? Compression.Zstd : Compression.None;
6868
Context ctx = Context.newDefault().withCompression(compression);
@@ -81,19 +81,21 @@ public static void main(String[] args) throws Exception {
8181

8282
TableDataProvider tableDataProvider =
8383
ServiceLoader.load(TableDataProvider.class).first();
84-
LOG.info("Table data provider: {}", tableDataProvider.getClass().getName());
8584
tableDataProvider.init();
8685
TableSchema tableSchema = tableDataProvider.tableSchema();
8786
AtomicLong totalRowsWritten = new AtomicLong(0);
87+
AtomicLong batchCounter = new AtomicLong(0);
88+
89+
BenchmarkResultPrinter.printBenchmarkStart(
90+
LOG, "Bulk", tableDataProvider, tableSchema, batchSize, maxRequestsInFlight);
8891

8992
// Before writing data, ensure the table exists, bulk write API does not create tables.
9093
ensureTableExists(greptimeDB, tableSchema, tableDataProvider, ctx);
9194

92-
LOG.info("Start writing data");
95+
long benchmarkStart = System.nanoTime();
9396
try (BulkStreamWriter writer = greptimeDB.bulkStreamWriter(tableSchema, cfg, ctx)) {
9497
Iterator<Object[]> rows = tableDataProvider.rows();
9598

96-
long start = System.nanoTime();
9799
do {
98100
Table.TableBufferRoot table = writer.tableBufferRoot(1024);
99101
for (int i = 0; i < batchSize; i++) {
@@ -102,7 +104,6 @@ public static void main(String[] args) throws Exception {
102104
}
103105
table.addRow(rows.next());
104106
}
105-
LOG.info("Table bytes used: {}", table.bytesUsed());
106107
// Complete the table; adding rows is no longer permitted.
107108
table.complete();
108109

@@ -117,21 +118,26 @@ public static void main(String[] args) throws Exception {
117118
}
118119

119120
long totalRows = totalRowsWritten.addAndGet(r);
120-
long totalElapsedSec = (System.nanoTime() - start) / 1000000000;
121-
long writeRatePerSecond = totalElapsedSec > 0 ? totalRows / totalElapsedSec : 0;
122-
LOG.info(
123-
"Wrote rows: {}, time cost: {}ms, total rows: {}, total elapsed: {}s, write rate: {} rows/sec",
124-
r,
125-
costMs,
126-
totalRows,
127-
totalElapsedSec,
128-
writeRatePerSecond);
121+
long batch = batchCounter.incrementAndGet();
122+
long totalElapsedMs = (System.nanoTime() - benchmarkStart) / 1000000;
123+
long writeRatePerSecond = totalElapsedMs > 0 ? (totalRows * 1000) / totalElapsedMs : 0;
124+
BenchmarkResultPrinter.printBatchProgress(LOG, batch, totalRows, writeRatePerSecond);
129125
});
130126
} while (rows.hasNext());
131127

132128
writer.completed();
133129

134-
LOG.info("Completed writing data, time cost: {}s", (System.nanoTime() - start) / 1000000000);
130+
BenchmarkResultPrinter.printCompletionMessages(LOG, "Bulk");
131+
132+
long totalDurationMs = (System.nanoTime() - benchmarkStart) / 1000000;
133+
long finalRowCount = totalRowsWritten.get();
134+
long finalThroughput = totalDurationMs > 0 ? (finalRowCount * 1000) / totalDurationMs : 0;
135+
136+
BenchmarkResultPrinter.printFinalResults(LOG, finalRowCount, totalDurationMs, finalThroughput);
137+
BenchmarkResultPrinter.printProviderResults(
138+
LOG, tableDataProvider, finalRowCount, totalDurationMs, finalThroughput);
139+
BenchmarkResultPrinter.printBenchmarkSummary(
140+
LOG, tableDataProvider, finalRowCount, totalDurationMs, finalThroughput);
135141
} finally {
136142
tableDataProvider.close();
137143
}

0 commit comments

Comments
 (0)