From c7402d49b59c61d7e83ed94face4b2c2789d2b61 Mon Sep 17 00:00:00 2001 From: Logic Date: Mon, 14 Jul 2025 20:33:17 +0800 Subject: [PATCH 1/4] fix(bulk-write): ensure table exists before bulk write - Add logic to ensure the target table exists before starting bulk write - Update README with important notes about bulk write API usage - Refactor write loops in benchmark classes for better readability --- README.md | 7 ++++ ingester-example/README.md | 7 ++++ .../benchmark/BatchingWriteBenchmark.java | 15 +++---- .../bench/benchmark/BulkWriteBenchmark.java | 40 ++++++++++++++++--- .../benchmark/StreamingWriteBenchmark.java | 9 ++--- 5 files changed, 58 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 143ca39..33f1fad 100644 --- a/README.md +++ b/README.md @@ -257,6 +257,13 @@ StreamWriter writer = client.streamWriter(1000); The Bulk Write API provides a high-performance, memory-efficient mechanism for ingesting large volumes of time-series data into GreptimeDB. It leverages off-heap memory management to achieve optimal throughput when writing batches of data. +> **Important**: +> 1. **Manual Table Creation Required**: Bulk API does **not** create tables automatically. You must create the table beforehand using either: +> - Insert API (which supports auto table creation), or +> - SQL DDL statements (CREATE TABLE) +> 2. **Schema Matching**: The table template in bulk API must exactly match the existing table schema. +> 3. **Column Types**: For bulk operations, currently use `addField()` instead of `addTag()`. Tag columns are part of the primary key in GreptimeDB, but bulk operations don't yet support tables with tag columns. This limitation will be addressed in future versions. + This API supports writing to one table per stream and handles large data volumes (up to 200MB per write) with adaptive flow control. Performance advantages include: - Off-heap memory management with Arrow buffers - Efficient binary serialization and data transfer diff --git a/ingester-example/README.md b/ingester-example/README.md index 9c762ff..1df3b79 100644 --- a/ingester-example/README.md +++ b/ingester-example/README.md @@ -81,6 +81,13 @@ This API is particularly well-suited for: The Bulk Write API provides a high-performance, memory-efficient mechanism for ingesting large volumes of time-series data into GreptimeDB. It leverages Apache Arrow's Flight protocol and off-heap memory management to achieve optimal throughput when writing batches of data. +> **Important**: +> 1. **Manual Table Creation Required**: Bulk API does **not** create tables automatically. You must create the table beforehand using either: + > - Insert API (which supports auto table creation), or +> - SQL DDL statements (CREATE TABLE) +> 2. **Schema Matching**: The table template in bulk API must exactly match the existing table schema. +> 3. **Column Types**: For bulk operations, currently use `addField()` instead of `addTag()`. Tag columns are part of the primary key in GreptimeDB, but bulk operations don't yet support tables with tag columns. This limitation will be addressed in future versions. + **Important Note**: - This API is designed around streaming connections, which means each stream establishes a connection to only one database node. Unlike the regular write API, it lacks automatic load balancing for individual requests. However, if your use case involves multiple clients establishing multiple streams to the database, this limitation is not a concern. - Unlike regular streaming, this API allows continuous writing to only one table per stream, but can handle very large data volumes (up to 200MB per write). It features sophisticated adaptive flow control mechanisms that automatically adjust to your data throughput requirements. diff --git a/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java index 8b26848..86b2826 100644 --- a/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java @@ -32,7 +32,7 @@ import io.greptime.models.WriteOk; import io.greptime.rpc.Compression; import io.greptime.rpc.Context; -import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; @@ -74,7 +74,7 @@ public static void main(String[] args) throws Exception { LOG.info("Start writing data"); long start = System.nanoTime(); - for (; ; ) { + do { Table table = Table.from(tableSchema); for (int i = 0; i < batchSize; i++) { if (!rows.hasNext()) { @@ -88,16 +88,13 @@ public static void main(String[] args) throws Exception { long fStart = System.nanoTime(); // Write the table data to the server CompletableFuture> future = - greptimeDB.write(Arrays.asList(table), WriteOp.Insert, ctx); - // Wait for the write to complete - int numRows = future.get().mapOr(0, writeOk -> writeOk.getSuccess()); + greptimeDB.write(Collections.singletonList(table), WriteOp.Insert, ctx); + // Wait for the writing to complete + int numRows = future.get().mapOr(0, WriteOk::getSuccess); long costMs = (System.nanoTime() - fStart) / 1000000; LOG.info("Write rows: {}, time cost: {}ms", numRows, costMs); - if (!rows.hasNext()) { - break; - } - } + } while (rows.hasNext()); LOG.info("Completed writing data, time cost: {}s", (System.nanoTime() - start) / 1000000000); diff --git a/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java index 6029824..0954893 100644 --- a/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java @@ -19,6 +19,7 @@ import io.greptime.BulkStreamWriter; import io.greptime.BulkWrite; import io.greptime.GreptimeDB; +import io.greptime.WriteOp; import io.greptime.bench.DBConnector; import io.greptime.bench.TableDataProvider; import io.greptime.common.util.MetricsUtil; @@ -30,6 +31,7 @@ import io.greptime.models.TableSchema; import io.greptime.rpc.Compression; import io.greptime.rpc.Context; +import java.util.Collections; import java.util.Iterator; import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; @@ -41,6 +43,11 @@ * Env: * - batch_size_per_request: the batch size per request * - zstd_compression: whether to use zstd compression + *

+ * IMPORTANT: Unlike the standard write method, + * this bulk writing stream API requires the target table to exist beforehand. It will + * NOT automatically create the table if it does not exist. Please ensure table creation + * before starting a bulk write operation. */ public class BulkWriteBenchmark { @@ -73,12 +80,15 @@ public static void main(String[] args) throws Exception { tableDataProvider.init(); TableSchema tableSchema = tableDataProvider.tableSchema(); + // Before writing data, ensure the table exists, bulk write API does not create tables. + ensureTableExists(greptimeDB, tableSchema, tableDataProvider, ctx); + LOG.info("Start writing data"); try (BulkStreamWriter writer = greptimeDB.bulkStreamWriter(tableSchema, cfg, ctx)) { Iterator rows = tableDataProvider.rows(); long start = System.nanoTime(); - for (; ; ) { + do { Table.TableBufferRoot table = writer.tableBufferRoot(1024); for (int i = 0; i < batchSize; i++) { if (!rows.hasNext()) { @@ -102,10 +112,7 @@ public static void main(String[] args) throws Exception { } }); - if (!rows.hasNext()) { - break; - } - } + } while (rows.hasNext()); writer.completed(); @@ -117,4 +124,27 @@ public static void main(String[] args) throws Exception { greptimeDB.shutdownGracefully(); metricsExporter.shutdownGracefully(); } + + /** + * Ensures that the table exists in the database. + * + * @param greptimeDB the GreptimeDB instance + * @param tableSchema the schema of the table to ensure + * @param ctx the context for the operation + */ + private static void ensureTableExists(GreptimeDB greptimeDB, TableSchema tableSchema, TableDataProvider tableDataProvider, Context ctx) { + Table initTable = Table.from(tableSchema); + Iterator rows = tableDataProvider.rows(); + // Add an initial row to the table to get the table schema. + initTable.addRow(rows.hasNext() ? rows.next() : new Object[0]); + try { + // Write an initial row to ensure the table exists. + greptimeDB.write(Collections.singletonList(initTable), WriteOp.Insert, ctx).get(); + // Delete the initial row to leave the table empty. + greptimeDB.write(Collections.singletonList(initTable), WriteOp.Delete, ctx).get(); + LOG.info("Table ensured for benchmark: {}", tableSchema.getTableName()); + } catch (Exception e) { + LOG.error("Table creation may have been skipped if it already exists: {}", e.getMessage()); + } + } } diff --git a/ingester-example/src/main/java/io/greptime/bench/benchmark/StreamingWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/benchmark/StreamingWriteBenchmark.java index 1f9bbe9..934d450 100644 --- a/ingester-example/src/main/java/io/greptime/bench/benchmark/StreamingWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/benchmark/StreamingWriteBenchmark.java @@ -75,7 +75,7 @@ public static void main(String[] args) throws Exception { LOG.info("Start writing data"); long start = System.nanoTime(); - for (; ; ) { + do { Table table = Table.from(tableSchema); for (int i = 0; i < batchSize; i++) { if (!rows.hasNext()) { @@ -89,15 +89,12 @@ public static void main(String[] args) throws Exception { // Write the table data to the server writer.write(table); - if (!rows.hasNext()) { - break; - } - } + } while (rows.hasNext()); // Completes the stream, and the stream will be closed. CompletableFuture future = writer.completed(); - // Now we can get the write result. + // Now we can get the writing result. WriteOk result = future.get(); LOG.info("Completed writing data: {}, time cost: {}s", result, (System.nanoTime() - start) / 1000000000); From 1da2de017210a9ed4bfd2acfd9b170815cd185f5 Mon Sep 17 00:00:00 2001 From: Logic Date: Mon, 14 Jul 2025 20:37:05 +0800 Subject: [PATCH 2/4] fix(bulk-write): ensure table exists before bulk write - Add logic to ensure the target table exists before starting bulk write - Update README with important notes about bulk write API usage - Refactor write loops in benchmark classes for better readability --- .../greptime/bench/benchmark/BulkWriteBenchmark.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java index 0954893..22add20 100644 --- a/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java @@ -132,16 +132,21 @@ public static void main(String[] args) throws Exception { * @param tableSchema the schema of the table to ensure * @param ctx the context for the operation */ - private static void ensureTableExists(GreptimeDB greptimeDB, TableSchema tableSchema, TableDataProvider tableDataProvider, Context ctx) { + private static void ensureTableExists( + GreptimeDB greptimeDB, TableSchema tableSchema, TableDataProvider tableDataProvider, Context ctx) { Table initTable = Table.from(tableSchema); Iterator rows = tableDataProvider.rows(); // Add an initial row to the table to get the table schema. initTable.addRow(rows.hasNext() ? rows.next() : new Object[0]); try { // Write an initial row to ensure the table exists. - greptimeDB.write(Collections.singletonList(initTable), WriteOp.Insert, ctx).get(); + greptimeDB + .write(Collections.singletonList(initTable), WriteOp.Insert, ctx) + .get(); // Delete the initial row to leave the table empty. - greptimeDB.write(Collections.singletonList(initTable), WriteOp.Delete, ctx).get(); + greptimeDB + .write(Collections.singletonList(initTable), WriteOp.Delete, ctx) + .get(); LOG.info("Table ensured for benchmark: {}", tableSchema.getTableName()); } catch (Exception e) { LOG.error("Table creation may have been skipped if it already exists: {}", e.getMessage()); From 1b33b0f12d37120eb55e56d01b47df70be839ad5 Mon Sep 17 00:00:00 2001 From: Logic Date: Mon, 14 Jul 2025 20:41:11 +0800 Subject: [PATCH 3/4] fix(bulk-write): ensure table exists before bulk write - Add logic to ensure the target table exists before starting bulk write - Update README with important notes about bulk write API usage - Refactor write loops in benchmark classes for better readability --- ingester-example/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingester-example/README.md b/ingester-example/README.md index 1df3b79..a67244c 100644 --- a/ingester-example/README.md +++ b/ingester-example/README.md @@ -83,7 +83,7 @@ The Bulk Write API provides a high-performance, memory-efficient mechanism for i > **Important**: > 1. **Manual Table Creation Required**: Bulk API does **not** create tables automatically. You must create the table beforehand using either: - > - Insert API (which supports auto table creation), or +> - Insert API (which supports auto table creation), or > - SQL DDL statements (CREATE TABLE) > 2. **Schema Matching**: The table template in bulk API must exactly match the existing table schema. > 3. **Column Types**: For bulk operations, currently use `addField()` instead of `addTag()`. Tag columns are part of the primary key in GreptimeDB, but bulk operations don't yet support tables with tag columns. This limitation will be addressed in future versions. From 637498e0b3e00eb0401113134ed362b0f798a755 Mon Sep 17 00:00:00 2001 From: Logic Date: Tue, 15 Jul 2025 15:48:37 +0800 Subject: [PATCH 4/4] fix(ingester-example): prevent writing empty row to database - Add a check to ensure there are rows available in the table data provider - Remove the conditional statement and directly call `rows.next()` to add the initial row - Improve error handling by throwing an `IllegalStateException` if no rows are available --- .../java/io/greptime/bench/benchmark/BulkWriteBenchmark.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java index 22add20..53e8fbd 100644 --- a/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java @@ -136,8 +136,11 @@ private static void ensureTableExists( GreptimeDB greptimeDB, TableSchema tableSchema, TableDataProvider tableDataProvider, Context ctx) { Table initTable = Table.from(tableSchema); Iterator rows = tableDataProvider.rows(); + if (!rows.hasNext()) { + throw new IllegalStateException("No rows available in table data provider"); + } // Add an initial row to the table to get the table schema. - initTable.addRow(rows.hasNext() ? rows.next() : new Object[0]); + initTable.addRow(rows.next()); try { // Write an initial row to ensure the table exists. greptimeDB