Skip to content

Commit 4f2efd7

Browse files
authored
fix(bulk-write): ensure table exists before bulk write (#90)
* fix(bulk-write): ensure table exists before bulk write - Add logic to ensure the target table exists before starting bulk write - Update README with important notes about bulk write API usage - Refactor write loops in benchmark classes for better readability * fix(bulk-write): ensure table exists before bulk write - Add logic to ensure the target table exists before starting bulk write - Update README with important notes about bulk write API usage - Refactor write loops in benchmark classes for better readability * fix(bulk-write): ensure table exists before bulk write - Add logic to ensure the target table exists before starting bulk write - Update README with important notes about bulk write API usage - Refactor write loops in benchmark classes for better readability * fix(ingester-example): prevent writing empty row to database - Add a check to ensure there are rows available in the table data provider - Remove the conditional statement and directly call `rows.next()` to add the initial row - Improve error handling by throwing an `IllegalStateException` if no rows are available
1 parent 5a17e3a commit 4f2efd7

5 files changed

Lines changed: 66 additions & 20 deletions

File tree

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,13 @@ StreamWriter<Table, WriteOk> writer = client.streamWriter(1000);
257257

258258
The Bulk Write API provides a high-performance, memory-efficient mechanism for ingesting large volumes of time-series data into GreptimeDB. It leverages off-heap memory management to achieve optimal throughput when writing batches of data.
259259

260+
> **Important**:
261+
> 1. **Manual Table Creation Required**: Bulk API does **not** create tables automatically. You must create the table beforehand using either:
262+
> - Insert API (which supports auto table creation), or
263+
> - SQL DDL statements (CREATE TABLE)
264+
> 2. **Schema Matching**: The table template in bulk API must exactly match the existing table schema.
265+
> 3. **Column Types**: For bulk operations, currently use `addField()` instead of `addTag()`. Tag columns are part of the primary key in GreptimeDB, but bulk operations don't yet support tables with tag columns. This limitation will be addressed in future versions.
266+
260267
This API supports writing to one table per stream and handles large data volumes (up to 200MB per write) with adaptive flow control. Performance advantages include:
261268
- Off-heap memory management with Arrow buffers
262269
- Efficient binary serialization and data transfer

ingester-example/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,13 @@ This API is particularly well-suited for:
8181

8282
The Bulk Write API provides a high-performance, memory-efficient mechanism for ingesting large volumes of time-series data into GreptimeDB. It leverages Apache Arrow's Flight protocol and off-heap memory management to achieve optimal throughput when writing batches of data.
8383

84+
> **Important**:
85+
> 1. **Manual Table Creation Required**: Bulk API does **not** create tables automatically. You must create the table beforehand using either:
86+
> - Insert API (which supports auto table creation), or
87+
> - SQL DDL statements (CREATE TABLE)
88+
> 2. **Schema Matching**: The table template in bulk API must exactly match the existing table schema.
89+
> 3. **Column Types**: For bulk operations, currently use `addField()` instead of `addTag()`. Tag columns are part of the primary key in GreptimeDB, but bulk operations don't yet support tables with tag columns. This limitation will be addressed in future versions.
90+
8491
**Important Note**:
8592
- This API is designed around streaming connections, which means each stream establishes a connection to only one database node. Unlike the regular write API, it lacks automatic load balancing for individual requests. However, if your use case involves multiple clients establishing multiple streams to the database, this limitation is not a concern.
8693
- Unlike regular streaming, this API allows continuous writing to only one table per stream, but can handle very large data volumes (up to 200MB per write). It features sophisticated adaptive flow control mechanisms that automatically adjust to your data throughput requirements.

ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import io.greptime.models.WriteOk;
3333
import io.greptime.rpc.Compression;
3434
import io.greptime.rpc.Context;
35-
import java.util.Arrays;
35+
import java.util.Collections;
3636
import java.util.Iterator;
3737
import java.util.concurrent.CompletableFuture;
3838
import org.slf4j.Logger;
@@ -74,7 +74,7 @@ public static void main(String[] args) throws Exception {
7474

7575
LOG.info("Start writing data");
7676
long start = System.nanoTime();
77-
for (; ; ) {
77+
do {
7878
Table table = Table.from(tableSchema);
7979
for (int i = 0; i < batchSize; i++) {
8080
if (!rows.hasNext()) {
@@ -88,16 +88,13 @@ public static void main(String[] args) throws Exception {
8888
long fStart = System.nanoTime();
8989
// Write the table data to the server
9090
CompletableFuture<Result<WriteOk, Err>> future =
91-
greptimeDB.write(Arrays.asList(table), WriteOp.Insert, ctx);
92-
// Wait for the write to complete
93-
int numRows = future.get().mapOr(0, writeOk -> writeOk.getSuccess());
91+
greptimeDB.write(Collections.singletonList(table), WriteOp.Insert, ctx);
92+
// Wait for the writing to complete
93+
int numRows = future.get().mapOr(0, WriteOk::getSuccess);
9494
long costMs = (System.nanoTime() - fStart) / 1000000;
9595
LOG.info("Write rows: {}, time cost: {}ms", numRows, costMs);
9696

97-
if (!rows.hasNext()) {
98-
break;
99-
}
100-
}
97+
} while (rows.hasNext());
10198

10299
LOG.info("Completed writing data, time cost: {}s", (System.nanoTime() - start) / 1000000000);
103100

ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.greptime.BulkStreamWriter;
2020
import io.greptime.BulkWrite;
2121
import io.greptime.GreptimeDB;
22+
import io.greptime.WriteOp;
2223
import io.greptime.bench.DBConnector;
2324
import io.greptime.bench.TableDataProvider;
2425
import io.greptime.common.util.MetricsUtil;
@@ -30,6 +31,7 @@
3031
import io.greptime.models.TableSchema;
3132
import io.greptime.rpc.Compression;
3233
import io.greptime.rpc.Context;
34+
import java.util.Collections;
3335
import java.util.Iterator;
3436
import java.util.concurrent.CompletableFuture;
3537
import org.slf4j.Logger;
@@ -41,6 +43,11 @@
4143
* Env:
4244
* - batch_size_per_request: the batch size per request
4345
* - zstd_compression: whether to use zstd compression
46+
* <p>
47+
* <b>IMPORTANT:</b> Unlike the standard write method,
48+
* this bulk writing stream API requires the target table to exist beforehand. It will
49+
* NOT automatically create the table if it does not exist. Please ensure table creation
50+
* before starting a bulk write operation.
4451
*/
4552
public class BulkWriteBenchmark {
4653

@@ -73,12 +80,15 @@ public static void main(String[] args) throws Exception {
7380
tableDataProvider.init();
7481
TableSchema tableSchema = tableDataProvider.tableSchema();
7582

83+
// Before writing data, ensure the table exists, bulk write API does not create tables.
84+
ensureTableExists(greptimeDB, tableSchema, tableDataProvider, ctx);
85+
7686
LOG.info("Start writing data");
7787
try (BulkStreamWriter writer = greptimeDB.bulkStreamWriter(tableSchema, cfg, ctx)) {
7888
Iterator<Object[]> rows = tableDataProvider.rows();
7989

8090
long start = System.nanoTime();
81-
for (; ; ) {
91+
do {
8292
Table.TableBufferRoot table = writer.tableBufferRoot(1024);
8393
for (int i = 0; i < batchSize; i++) {
8494
if (!rows.hasNext()) {
@@ -102,10 +112,7 @@ public static void main(String[] args) throws Exception {
102112
}
103113
});
104114

105-
if (!rows.hasNext()) {
106-
break;
107-
}
108-
}
115+
} while (rows.hasNext());
109116

110117
writer.completed();
111118

@@ -117,4 +124,35 @@ public static void main(String[] args) throws Exception {
117124
greptimeDB.shutdownGracefully();
118125
metricsExporter.shutdownGracefully();
119126
}
127+
128+
/**
129+
* Ensures that the table exists in the database.
130+
*
131+
* @param greptimeDB the GreptimeDB instance
132+
* @param tableSchema the schema of the table to ensure
133+
* @param ctx the context for the operation
134+
*/
135+
private static void ensureTableExists(
136+
GreptimeDB greptimeDB, TableSchema tableSchema, TableDataProvider tableDataProvider, Context ctx) {
137+
Table initTable = Table.from(tableSchema);
138+
Iterator<Object[]> rows = tableDataProvider.rows();
139+
if (!rows.hasNext()) {
140+
throw new IllegalStateException("No rows available in table data provider");
141+
}
142+
// Add an initial row to the table to get the table schema.
143+
initTable.addRow(rows.next());
144+
try {
145+
// Write an initial row to ensure the table exists.
146+
greptimeDB
147+
.write(Collections.singletonList(initTable), WriteOp.Insert, ctx)
148+
.get();
149+
// Delete the initial row to leave the table empty.
150+
greptimeDB
151+
.write(Collections.singletonList(initTable), WriteOp.Delete, ctx)
152+
.get();
153+
LOG.info("Table ensured for benchmark: {}", tableSchema.getTableName());
154+
} catch (Exception e) {
155+
LOG.error("Table creation may have been skipped if it already exists: {}", e.getMessage());
156+
}
157+
}
120158
}

ingester-example/src/main/java/io/greptime/bench/benchmark/StreamingWriteBenchmark.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public static void main(String[] args) throws Exception {
7575

7676
LOG.info("Start writing data");
7777
long start = System.nanoTime();
78-
for (; ; ) {
78+
do {
7979
Table table = Table.from(tableSchema);
8080
for (int i = 0; i < batchSize; i++) {
8181
if (!rows.hasNext()) {
@@ -89,15 +89,12 @@ public static void main(String[] args) throws Exception {
8989
// Write the table data to the server
9090
writer.write(table);
9191

92-
if (!rows.hasNext()) {
93-
break;
94-
}
95-
}
92+
} while (rows.hasNext());
9693

9794
// Completes the stream, and the stream will be closed.
9895
CompletableFuture<WriteOk> future = writer.completed();
9996

100-
// Now we can get the write result.
97+
// Now we can get the writing result.
10198
WriteOk result = future.get();
10299

103100
LOG.info("Completed writing data: {}, time cost: {}s", result, (System.nanoTime() - start) / 1000000000);

0 commit comments

Comments
 (0)