3434import java .util .Collections ;
3535import java .util .Iterator ;
3636import java .util .concurrent .CompletableFuture ;
37+ import java .util .concurrent .atomic .AtomicLong ;
3738import org .slf4j .Logger ;
3839import org .slf4j .LoggerFactory ;
3940
4041/**
4142 * BulkWriteBenchmark is a benchmark for the bulk write API of GreptimeDB.
4243 *
4344 * Env:
44- * - batch_size_per_request: the batch size per request
4545 * - zstd_compression: whether to use zstd compression
46+ * - batch_size_per_request: the batch size per request
47+ * - max_requests_in_flight: the max number of requests in flight
4648 * <p>
4749 * <b>IMPORTANT:</b> Unlike the standard write method,
4850 * this bulk writing stream API requires the target table to exist beforehand. It will
@@ -56,29 +58,33 @@ public class BulkWriteBenchmark {
5658 public static void main (String [] args ) throws Exception {
5759 boolean zstdCompression = SystemPropertyUtil .getBool ("zstd_compression" , true );
5860 int batchSize = SystemPropertyUtil .getInt ("batch_size_per_request" , 64 * 1024 );
61+ int maxRequestsInFlight = SystemPropertyUtil .getInt ("max_requests_in_flight" , 4 );
62+
5963 LOG .info ("Using zstd compression: {}" , zstdCompression );
6064 LOG .info ("Batch size: {}" , batchSize );
65+ LOG .info ("Max requests in flight: {}" , maxRequestsInFlight );
66+
67+ Compression compression = zstdCompression ? Compression .Zstd : Compression .None ;
68+ Context ctx = Context .newDefault ().withCompression (compression );
6169
6270 // Start a metrics exporter
6371 MetricsExporter metricsExporter = new MetricsExporter (MetricsUtil .metricRegistry ());
6472 metricsExporter .init (ExporterOptions .newDefault ());
6573
6674 GreptimeDB greptimeDB = DBConnector .connect ();
67-
6875 BulkWrite .Config cfg = BulkWrite .Config .newBuilder ()
6976 .allocatorInitReservation (0 )
7077 .allocatorMaxAllocation (4 * 1024 * 1024 * 1024L )
7178 .timeoutMsPerMessage (60000 )
72- .maxRequestsInFlight (4 )
79+ .maxRequestsInFlight (maxRequestsInFlight )
7380 .build ();
74- Compression compression = zstdCompression ? Compression .Zstd : Compression .None ;
75- Context ctx = Context .newDefault ().withCompression (compression );
7681
7782 TableDataProvider tableDataProvider =
7883 ServiceLoader .load (TableDataProvider .class ).first ();
7984 LOG .info ("Table data provider: {}" , tableDataProvider .getClass ().getName ());
8085 tableDataProvider .init ();
8186 TableSchema tableSchema = tableDataProvider .tableSchema ();
87+ AtomicLong totalRowsWritten = new AtomicLong (0 );
8288
8389 // Before writing data, ensure the table exists, bulk write API does not create tables.
8490 ensureTableExists (greptimeDB , tableSchema , tableDataProvider , ctx );
@@ -107,11 +113,20 @@ public static void main(String[] args) throws Exception {
107113 long costMs = (System .nanoTime () - fStart ) / 1000000 ;
108114 if (t != null ) {
109115 LOG .error ("Error writing data, time cost: {}ms" , costMs , t );
110- } else {
111- LOG .info ("Wrote rows: {}, time cost: {}ms" , r , costMs );
116+ return ;
112117 }
113- });
114118
119+ long totalRows = totalRowsWritten .addAndGet (r );
120+ long totalElapsedSec = (System .nanoTime () - start ) / 1000000000 ;
121+ long writeRatePerSecond = totalElapsedSec > 0 ? totalRows / totalElapsedSec : 0 ;
122+ LOG .info (
123+ "Wrote rows: {}, time cost: {}ms, total rows: {}, total elapsed: {}s, write rate: {} rows/sec" ,
124+ r ,
125+ costMs ,
126+ totalRows ,
127+ totalElapsedSec ,
128+ writeRatePerSecond );
129+ });
115130 } while (rows .hasNext ());
116131
117132 writer .completed ();
0 commit comments