diff --git a/README.md b/README.md index 143ca39..33f1fad 100644 --- a/README.md +++ b/README.md @@ -257,6 +257,13 @@ StreamWriter writer = client.streamWriter(1000); The Bulk Write API provides a high-performance, memory-efficient mechanism for ingesting large volumes of time-series data into GreptimeDB. It leverages off-heap memory management to achieve optimal throughput when writing batches of data. +> **Important**: +> 1. **Manual Table Creation Required**: Bulk API does **not** create tables automatically. You must create the table beforehand using either: +> - Insert API (which supports auto table creation), or +> - SQL DDL statements (CREATE TABLE) +> 2. **Schema Matching**: The table template in bulk API must exactly match the existing table schema. +> 3. **Column Types**: For bulk operations, currently use `addField()` instead of `addTag()`. Tag columns are part of the primary key in GreptimeDB, but bulk operations don't yet support tables with tag columns. This limitation will be addressed in future versions. + This API supports writing to one table per stream and handles large data volumes (up to 200MB per write) with adaptive flow control. Performance advantages include: - Off-heap memory management with Arrow buffers - Efficient binary serialization and data transfer diff --git a/ingester-example/README.md b/ingester-example/README.md index 9c762ff..a67244c 100644 --- a/ingester-example/README.md +++ b/ingester-example/README.md @@ -81,6 +81,13 @@ This API is particularly well-suited for: The Bulk Write API provides a high-performance, memory-efficient mechanism for ingesting large volumes of time-series data into GreptimeDB. It leverages Apache Arrow's Flight protocol and off-heap memory management to achieve optimal throughput when writing batches of data. +> **Important**: +> 1. **Manual Table Creation Required**: Bulk API does **not** create tables automatically. You must create the table beforehand using either: +> - Insert API (which supports auto table creation), or +> - SQL DDL statements (CREATE TABLE) +> 2. **Schema Matching**: The table template in bulk API must exactly match the existing table schema. +> 3. **Column Types**: For bulk operations, currently use `addField()` instead of `addTag()`. Tag columns are part of the primary key in GreptimeDB, but bulk operations don't yet support tables with tag columns. This limitation will be addressed in future versions. + **Important Note**: - This API is designed around streaming connections, which means each stream establishes a connection to only one database node. Unlike the regular write API, it lacks automatic load balancing for individual requests. However, if your use case involves multiple clients establishing multiple streams to the database, this limitation is not a concern. - Unlike regular streaming, this API allows continuous writing to only one table per stream, but can handle very large data volumes (up to 200MB per write). It features sophisticated adaptive flow control mechanisms that automatically adjust to your data throughput requirements. diff --git a/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java index 8b26848..86b2826 100644 --- a/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java @@ -32,7 +32,7 @@ import io.greptime.models.WriteOk; import io.greptime.rpc.Compression; import io.greptime.rpc.Context; -import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; @@ -74,7 +74,7 @@ public static void main(String[] args) throws Exception { LOG.info("Start writing data"); long start = System.nanoTime(); - for (; ; ) { + do { Table table = Table.from(tableSchema); for (int i = 0; i < batchSize; i++) { if (!rows.hasNext()) { @@ -88,16 +88,13 @@ public static void main(String[] args) throws Exception { long fStart = System.nanoTime(); // Write the table data to the server CompletableFuture> future = - greptimeDB.write(Arrays.asList(table), WriteOp.Insert, ctx); - // Wait for the write to complete - int numRows = future.get().mapOr(0, writeOk -> writeOk.getSuccess()); + greptimeDB.write(Collections.singletonList(table), WriteOp.Insert, ctx); + // Wait for the writing to complete + int numRows = future.get().mapOr(0, WriteOk::getSuccess); long costMs = (System.nanoTime() - fStart) / 1000000; LOG.info("Write rows: {}, time cost: {}ms", numRows, costMs); - if (!rows.hasNext()) { - break; - } - } + } while (rows.hasNext()); LOG.info("Completed writing data, time cost: {}s", (System.nanoTime() - start) / 1000000000); diff --git a/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java index 6029824..53e8fbd 100644 --- a/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java @@ -19,6 +19,7 @@ import io.greptime.BulkStreamWriter; import io.greptime.BulkWrite; import io.greptime.GreptimeDB; +import io.greptime.WriteOp; import io.greptime.bench.DBConnector; import io.greptime.bench.TableDataProvider; import io.greptime.common.util.MetricsUtil; @@ -30,6 +31,7 @@ import io.greptime.models.TableSchema; import io.greptime.rpc.Compression; import io.greptime.rpc.Context; +import java.util.Collections; import java.util.Iterator; import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; @@ -41,6 +43,11 @@ * Env: * - batch_size_per_request: the batch size per request * - zstd_compression: whether to use zstd compression + *

+ * IMPORTANT: Unlike the standard write method, + * this bulk writing stream API requires the target table to exist beforehand. It will + * NOT automatically create the table if it does not exist. Please ensure table creation + * before starting a bulk write operation. */ public class BulkWriteBenchmark { @@ -73,12 +80,15 @@ public static void main(String[] args) throws Exception { tableDataProvider.init(); TableSchema tableSchema = tableDataProvider.tableSchema(); + // Before writing data, ensure the table exists, bulk write API does not create tables. + ensureTableExists(greptimeDB, tableSchema, tableDataProvider, ctx); + LOG.info("Start writing data"); try (BulkStreamWriter writer = greptimeDB.bulkStreamWriter(tableSchema, cfg, ctx)) { Iterator rows = tableDataProvider.rows(); long start = System.nanoTime(); - for (; ; ) { + do { Table.TableBufferRoot table = writer.tableBufferRoot(1024); for (int i = 0; i < batchSize; i++) { if (!rows.hasNext()) { @@ -102,10 +112,7 @@ public static void main(String[] args) throws Exception { } }); - if (!rows.hasNext()) { - break; - } - } + } while (rows.hasNext()); writer.completed(); @@ -117,4 +124,35 @@ public static void main(String[] args) throws Exception { greptimeDB.shutdownGracefully(); metricsExporter.shutdownGracefully(); } + + /** + * Ensures that the table exists in the database. + * + * @param greptimeDB the GreptimeDB instance + * @param tableSchema the schema of the table to ensure + * @param ctx the context for the operation + */ + private static void ensureTableExists( + GreptimeDB greptimeDB, TableSchema tableSchema, TableDataProvider tableDataProvider, Context ctx) { + Table initTable = Table.from(tableSchema); + Iterator rows = tableDataProvider.rows(); + if (!rows.hasNext()) { + throw new IllegalStateException("No rows available in table data provider"); + } + // Add an initial row to the table to get the table schema. + initTable.addRow(rows.next()); + try { + // Write an initial row to ensure the table exists. + greptimeDB + .write(Collections.singletonList(initTable), WriteOp.Insert, ctx) + .get(); + // Delete the initial row to leave the table empty. + greptimeDB + .write(Collections.singletonList(initTable), WriteOp.Delete, ctx) + .get(); + LOG.info("Table ensured for benchmark: {}", tableSchema.getTableName()); + } catch (Exception e) { + LOG.error("Table creation may have been skipped if it already exists: {}", e.getMessage()); + } + } } diff --git a/ingester-example/src/main/java/io/greptime/bench/benchmark/StreamingWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/benchmark/StreamingWriteBenchmark.java index 1f9bbe9..934d450 100644 --- a/ingester-example/src/main/java/io/greptime/bench/benchmark/StreamingWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/benchmark/StreamingWriteBenchmark.java @@ -75,7 +75,7 @@ public static void main(String[] args) throws Exception { LOG.info("Start writing data"); long start = System.nanoTime(); - for (; ; ) { + do { Table table = Table.from(tableSchema); for (int i = 0; i < batchSize; i++) { if (!rows.hasNext()) { @@ -89,15 +89,12 @@ public static void main(String[] args) throws Exception { // Write the table data to the server writer.write(table); - if (!rows.hasNext()) { - break; - } - } + } while (rows.hasNext()); // Completes the stream, and the stream will be closed. CompletableFuture future = writer.completed(); - // Now we can get the write result. + // Now we can get the writing result. WriteOk result = future.get(); LOG.info("Completed writing data: {}, time cost: {}s", result, (System.nanoTime() - start) / 1000000000);