Skip to content

Commit bb01f4c

Browse files
committed
chore: refactor bench
1 parent 64da7c8 commit bb01f4c

5 files changed

Lines changed: 86 additions & 42 deletions

File tree

ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,12 @@
3737
public class MultiProducerTableDataProvider extends RandomTableDataProvider {
3838

3939
private final int producerCount;
40-
private final long rowCount;
4140
private final ExecutorService executorService;
4241
private final BlockingQueue<Object[]> buffer = new ArrayBlockingQueue<>(100000);
4342

4443
{
4544
this.producerCount = SystemPropertyUtil.getInt("multi_producer_table_data_provider.producer_count", 10);
4645
// Total number of rows to generate, configurable via system property
47-
this.rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 10_000_000L);
4846
this.executorService = ThreadPoolUtil.newBuilder()
4947
.poolName("multi-producer-table-data-provider")
5048
.enableMetric(true)
@@ -62,7 +60,7 @@ public void init() {
6260
AtomicLong rowIndex = new AtomicLong(0);
6361
for (int i = 0; i < producerCount; i++) {
6462
this.executorService.execute(() -> {
65-
while (rowIndex.getAndIncrement() < rowCount) {
63+
while (rowIndex.getAndIncrement() < rowCount()) {
6664
Object[] row = nextRow();
6765
try {
6866
buffer.put(row);
@@ -82,7 +80,7 @@ public Iterator<Object[]> rows() {
8280

8381
@Override
8482
public boolean hasNext() {
85-
return index < rowCount;
83+
return index < rowCount();
8684
}
8785

8886
@Override

ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class RandomTableDataProvider implements TableDataProvider {
5151
.addField("pod_name", DataType.String)
5252
.build();
5353
// Total number of rows to generate, configurable via system property
54-
rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 10_000_000L);
54+
rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 1_000_000L);
5555
}
5656

5757
@Override
@@ -133,4 +133,9 @@ public Object[] next() {
133133

134134
@Override
135135
public void close() throws Exception {}
136+
137+
@Override
138+
public long rowCount() {
139+
return rowCount;
140+
}
136141
}

ingester-example/src/main/java/io/greptime/bench/TableDataProvider.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,9 @@ public interface TableDataProvider extends AutoCloseable {
3838
* Returns the iterator of the rows.
3939
*/
4040
Iterator<Object[]> rows();
41+
42+
/**
43+
* Returns the total number of rows.
44+
*/
45+
long rowCount();
4146
}

ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java

Lines changed: 72 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,71 +35,107 @@
3535
import java.util.Collections;
3636
import java.util.Iterator;
3737
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.Semaphore;
39+
import java.util.concurrent.atomic.AtomicLong;
3840
import org.slf4j.Logger;
3941
import org.slf4j.LoggerFactory;
4042

4143
/**
4244
* BatchingWriteBenchmark is a benchmark for the batching write API of GreptimeDB.
4345
*
4446
* Env:
45-
* - batch_size_per_request: the batch size per request
4647
* - zstd_compression: whether to use zstd compression
47-
* - max_points_per_second: the max number of points that can be written per second, exceeding which may cause blockage
48+
* - batch_size_per_request: the batch size per request
49+
* - concurrency: the number of concurrent writers
4850
*/
4951
public class BatchingWriteBenchmark {
5052

5153
private static final Logger LOG = LoggerFactory.getLogger(BatchingWriteBenchmark.class);
5254

5355
public static void main(String[] args) throws Exception {
54-
boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", true);
56+
boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false);
5557
int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 64 * 1024);
58+
int concurrency = SystemPropertyUtil.getInt("concurrency", 4);
5659

5760
LOG.info("Using zstd compression: {}", zstdCompression);
5861
LOG.info("Batch size: {}", batchSize);
62+
LOG.info("Concurrency: {}", concurrency);
63+
64+
Compression compression = zstdCompression ? Compression.Zstd : Compression.None;
65+
Context ctx = Context.newDefault().withCompression(compression);
5966

6067
// Start a metrics exporter
6168
MetricsExporter metricsExporter = new MetricsExporter(MetricsUtil.metricRegistry());
6269
metricsExporter.init(ExporterOptions.newDefault());
63-
GreptimeDB greptimeDB = DBConnector.connect();
6470

65-
Compression compression = zstdCompression ? Compression.Zstd : Compression.None;
66-
Context ctx = Context.newDefault().withCompression(compression);
71+
GreptimeDB greptimeDB = DBConnector.connect();
6772

73+
Semaphore semaphore = new Semaphore(concurrency);
6874
TableDataProvider tableDataProvider =
6975
ServiceLoader.load(TableDataProvider.class).first();
7076
LOG.info("Table data provider: {}", tableDataProvider.getClass().getName());
7177
tableDataProvider.init();
7278
TableSchema tableSchema = tableDataProvider.tableSchema();
73-
Iterator<Object[]> rows = tableDataProvider.rows();
74-
75-
LOG.info("Start writing data");
76-
long start = System.nanoTime();
77-
do {
78-
Table table = Table.from(tableSchema);
79-
for (int i = 0; i < batchSize; i++) {
80-
if (!rows.hasNext()) {
81-
break;
79+
AtomicLong totalRowsWritten = new AtomicLong(0);
80+
81+
try {
82+
Iterator<Object[]> rows = tableDataProvider.rows();
83+
84+
LOG.info(
85+
"Start writing data, table: {}, row count: {}",
86+
tableSchema.getTableName(),
87+
tableDataProvider.rowCount());
88+
89+
long start = System.nanoTime();
90+
do {
91+
Table table = Table.from(tableSchema);
92+
for (int j = 0; j < batchSize; j++) {
93+
if (!rows.hasNext()) {
94+
break;
95+
}
96+
table.addRow(rows.next());
8297
}
83-
table.addRow(rows.next());
84-
}
85-
LOG.info("Table bytes used: {}", table.bytesUsed());
86-
// Complete the table; adding rows is no longer permitted.
87-
table.complete();
88-
long fStart = System.nanoTime();
89-
// Write the table data to the server
90-
CompletableFuture<Result<WriteOk, Err>> future =
91-
greptimeDB.write(Collections.singletonList(table), WriteOp.Insert, ctx);
92-
// Wait for the writing to complete
93-
int numRows = future.get().mapOr(0, WriteOk::getSuccess);
94-
long costMs = (System.nanoTime() - fStart) / 1000000;
95-
LOG.info("Write rows: {}, time cost: {}ms", numRows, costMs);
96-
97-
} while (rows.hasNext());
98-
99-
LOG.info("Completed writing data, time cost: {}s", (System.nanoTime() - start) / 1000000000);
100-
101-
greptimeDB.shutdownGracefully();
102-
tableDataProvider.close();
103-
metricsExporter.shutdownGracefully();
98+
99+
// Complete the table; adding rows is no longer permitted.
100+
table.complete();
101+
102+
semaphore.acquire();
103+
104+
// Write the table data to the server
105+
CompletableFuture<Result<WriteOk, Err>> future =
106+
greptimeDB.write(Collections.singletonList(table), WriteOp.Insert, ctx);
107+
long fStart = System.nanoTime();
108+
future.whenComplete((result, error) -> {
109+
semaphore.release();
110+
111+
long costMs = (System.nanoTime() - fStart) / 1000000;
112+
if (error != null) {
113+
LOG.error("Error writing data", error);
114+
return;
115+
}
116+
117+
int numRows = result.mapOr(0, writeOk -> writeOk.getSuccess());
118+
long totalRows = totalRowsWritten.addAndGet(numRows);
119+
long totalElapsedSec = (System.nanoTime() - start) / 1000000000;
120+
long writeRatePerSecond = totalElapsedSec > 0 ? totalRows / totalElapsedSec : 0;
121+
LOG.info(
122+
"Wrote rows: {}, time cost: {}ms, total rows: {}, total elapsed: {}s, write rate: {} rows/sec",
123+
numRows,
124+
costMs,
125+
totalRows,
126+
totalElapsedSec,
127+
writeRatePerSecond);
128+
});
129+
} while (rows.hasNext());
130+
131+
// Wait for all the requests to complete
132+
semaphore.acquire(concurrency);
133+
134+
LOG.info("Completed writing data, time cost: {}s", (System.nanoTime() - start) / 1000000000);
135+
136+
} finally {
137+
greptimeDB.shutdownGracefully();
138+
metricsExporter.shutdownGracefully();
139+
}
104140
}
105141
}

ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class BulkWriteBenchmark {
5656
private static final Logger LOG = LoggerFactory.getLogger(BulkWriteBenchmark.class);
5757

5858
public static void main(String[] args) throws Exception {
59-
boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", true);
59+
boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false);
6060
int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 64 * 1024);
6161
int maxRequestsInFlight = SystemPropertyUtil.getInt("max_requests_in_flight", 4);
6262

0 commit comments

Comments
 (0)