writer = client.streamWriter(1000);
The Bulk Write API provides a high-performance, memory-efficient mechanism for ingesting large volumes of time-series data into GreptimeDB. It leverages off-heap memory management to achieve optimal throughput when writing batches of data.
+> **Important**:
+> 1. **Manual Table Creation Required**: The Bulk API does **not** create tables automatically. You must create the table beforehand using either:
+>    - the Insert API (which supports auto table creation), or
+>    - SQL DDL statements (`CREATE TABLE`)
+> 2. **Schema Matching**: The table template passed to the Bulk API must exactly match the existing table schema.
+> 3. **Column Types**: For bulk operations, use `addField()` instead of `addTag()` for now. Tag columns are part of the primary key in GreptimeDB, and bulk operations don't yet support tables with tag columns. This limitation will be addressed in future versions.
+
This API supports writing to one table per stream and handles large data volumes (up to 200MB per write) with adaptive flow control. Performance advantages include:
- Off-heap memory management with Arrow buffers
- Efficient binary serialization and data transfer
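
Note on the notice added in the hunk above: because the Bulk API does not create tables, one way to meet the requirement is to run a `CREATE TABLE` statement before opening the stream. The helper below is a hypothetical sketch, not part of this PR: the class `BulkTableDdl`, the table name `bulk_demo`, and its columns are illustrative. It only assembles the DDL string, with a time index and field columns and no `PRIMARY KEY` clause, on the assumption that leaving out tag columns is what the current limitation calls for.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class BulkTableDdl {
    // Builds a CREATE TABLE statement with one time index column and field columns only.
    // No PRIMARY KEY clause is emitted: tag columns form the primary key, and the
    // notice above says bulk writes do not yet support tables that have them.
    public static String createTableSql(String table, String timeIndex, Map<String, String> fields) {
        StringJoiner cols = new StringJoiner(", ");
        cols.add(timeIndex + " TIMESTAMP TIME INDEX");
        for (Map.Entry<String, String> f : fields.entrySet()) {
            cols.add(f.getKey() + " " + f.getValue());
        }
        return "CREATE TABLE IF NOT EXISTS " + table + " (" + cols + ")";
    }

    public static void main(String[] args) {
        // Hypothetical columns; LinkedHashMap keeps them in declaration order.
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("host", "STRING");
        fields.put("cpu", "DOUBLE");
        // prints: CREATE TABLE IF NOT EXISTS bulk_demo (ts TIMESTAMP TIME INDEX, host STRING, cpu DOUBLE)
        System.out.println(createTableSql("bulk_demo", "ts", fields));
    }
}
```

The column order and types in the statement would then need to match the table template given to the bulk writer, per item 2 of the notice.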
diff --git a/ingester-example/README.md b/ingester-example/README.md
index 9c762ff..a67244c 100644
--- a/ingester-example/README.md
+++ b/ingester-example/README.md
@@ -81,6 +81,13 @@ This API is particularly well-suited for:
The Bulk Write API provides a high-performance, memory-efficient mechanism for ingesting large volumes of time-series data into GreptimeDB. It leverages Apache Arrow's Flight protocol and off-heap memory management to achieve optimal throughput when writing batches of data.
+> **Important**:
+> 1. **Manual Table Creation Required**: The Bulk API does **not** create tables automatically. You must create the table beforehand using either:
+>    - the Insert API (which supports auto table creation), or
+>    - SQL DDL statements (`CREATE TABLE`)
+> 2. **Schema Matching**: The table template passed to the Bulk API must exactly match the existing table schema.
+> 3. **Column Types**: For bulk operations, use `addField()` instead of `addTag()` for now. Tag columns are part of the primary key in GreptimeDB, and bulk operations don't yet support tables with tag columns. This limitation will be addressed in future versions.
+
**Important Note**:
- This API is designed around streaming connections, which means each stream establishes a connection to only one database node. Unlike the regular write API, it lacks automatic load balancing for individual requests. However, if your use case involves multiple clients establishing multiple streams to the database, this limitation is not a concern.
- Unlike regular streaming, this API allows continuous writing to only one table per stream, but can handle very large data volumes (up to 200MB per write). It features sophisticated adaptive flow control mechanisms that automatically adjust to your data throughput requirements.
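
Note on the 200MB-per-write figure in the README context above: a caller that assembles batches itself may want to keep each write under a byte budget. The sketch below is an assumption-laden illustration, not the client's own flow control: `SizeBoundedBatcher` is a hypothetical helper, rows are modelled as pre-encoded `byte[]` values, and the real client may measure size differently.

```java
import java.util.ArrayList;
import java.util.List;

public class SizeBoundedBatcher {
    // Groups encoded rows into batches whose total size stays within maxBytes.
    // A single row larger than maxBytes still gets a batch of its own.
    public static List<List<byte[]>> split(List<byte[]> rows, long maxBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        long currentBytes = 0;
        for (byte[] row : rows) {
            // Close the current batch before it would exceed the budget.
            if (!current.isEmpty() && currentBytes + row.length > maxBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(row);
            currentBytes += row.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> rows = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            rows.add(new byte[40]);
        }
        // Five 40-byte rows with a 100-byte budget: batches of 2, 2, and 1 rows.
        System.out.println(split(rows, 100).size()); // prints 3
    }
}
```

For the limit quoted in the README, the budget would be `200L * 1024 * 1024`, assuming the figure refers to the encoded payload size.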
diff --git a/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java
index 8b26848..86b2826 100644
--- a/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java
+++ b/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java
@@ -32,7 +32,7 @@
import io.greptime.models.WriteOk;
import io.greptime.rpc.Compression;
import io.greptime.rpc.Context;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
@@ -74,7 +74,7 @@ public static void main(String[] args) throws Exception {
LOG.info("Start writing data");
long start = System.nanoTime();
- for (; ; ) {
+ do {
Table table = Table.from(tableSchema);
for (int i = 0; i < batchSize; i++) {
if (!rows.hasNext()) {
@@ -88,16 +88,13 @@ public static void main(String[] args) throws Exception {
long fStart = System.nanoTime();
// Write the table data to the server
CompletableFuture<Result<WriteOk, Err>> future =
- greptimeDB.write(Arrays.asList(table), WriteOp.Insert, ctx);
- // Wait for the write to complete
- int numRows = future.get().mapOr(0, writeOk -> writeOk.getSuccess());
+ greptimeDB.write(Collections.singletonList(table), WriteOp.Insert, ctx);
+ // Wait for the writing to complete
+ int numRows = future.get().mapOr(0, WriteOk::getSuccess);
long costMs = (System.nanoTime() - fStart) / 1000000;
LOG.info("Write rows: {}, time cost: {}ms", numRows, costMs);
- if (!rows.hasNext()) {
- break;
- }
- }
+ } while (rows.hasNext());
LOG.info("Completed writing data, time cost: {}s", (System.nanoTime() - start) / 1000000000);
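
Note on the loop change in the hunk above: replacing `for (; ; ) { ... if (!rows.hasNext()) break; }` with `do { ... } while (rows.hasNext());` keeps the same behaviour, including one pass over the body when the iterator is empty. The standalone sketch below mirrors that loop shape with plain integers; `DoWhileBatching` and `drain` are hypothetical names, and collecting a batch into a list stands in for the `greptimeDB.write(...)` call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class DoWhileBatching {
    // Drains the iterator in batches of at most batchSize, using the same
    // do/while structure as the benchmark loop in the hunk.
    public static List<List<Integer>> drain(Iterator<Integer> rows, int batchSize) {
        List<List<Integer>> batches = new ArrayList<>();
        do {
            List<Integer> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                if (!rows.hasNext()) {
                    break;
                }
                batch.add(rows.next());
            }
            batches.add(batch); // stands in for the write call
        } while (rows.hasNext());
        return batches;
    }

    public static void main(String[] args) {
        // prints [[1, 2], [3, 4], [5]]
        System.out.println(drain(Arrays.asList(1, 2, 3, 4, 5).iterator(), 2));
        // An empty iterator still produces one (empty) batch: prints [[]]
        System.out.println(drain(new ArrayList<Integer>().iterator(), 2));
    }
}
```

The empty-batch case matches the pre-change `for (; ; )` loop, which also ran its body once before checking `rows.hasNext()`.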
diff --git a/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java
index 6029824..53e8fbd 100644
--- a/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java
+++ b/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java
@@ -19,6 +19,7 @@
import io.greptime.BulkStreamWriter;
import io.greptime.BulkWrite;
import io.greptime.GreptimeDB;
+import io.greptime.WriteOp;
import io.greptime.bench.DBConnector;
import io.greptime.bench.TableDataProvider;
import io.greptime.common.util.MetricsUtil;
@@ -30,6 +31,7 @@
import io.greptime.models.TableSchema;
import io.greptime.rpc.Compression;
import io.greptime.rpc.Context;
+import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
@@ -41,6 +43,11 @@
* Env:
* - batch_size_per_request: the batch size per request
* - zstd_compression: whether to use zstd compression
+ *
+ * IMPORTANT: Unlike the standard write method,
+ * this bulk writing stream API requires the target table to exist beforehand. It will
+ * NOT automatically create the table if it does not exist. Please ensure table creation
+ * before starting a bulk write operation.
*/
public class BulkWriteBenchmark {
@@ -73,12 +80,15 @@ public static void main(String[] args) throws Exception {
tableDataProvider.init();
TableSchema tableSchema = tableDataProvider.tableSchema();
+ // Before writing data, ensure the table exists; the bulk write API does not create tables.
+ ensureTableExists(greptimeDB, tableSchema, tableDataProvider, ctx);
+
LOG.info("Start writing data");
try (BulkStreamWriter writer = greptimeDB.bulkStreamWriter(tableSchema, cfg, ctx)) {
Iterator