Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,13 @@ StreamWriter<Table, WriteOk> writer = client.streamWriter(1000);

The Bulk Write API provides a high-performance, memory-efficient mechanism for ingesting large volumes of time-series data into GreptimeDB. It leverages off-heap memory management to achieve optimal throughput when writing batches of data.

> **Important**:
> 1. **Manual Table Creation Required**: Bulk API does **not** create tables automatically. You must create the table beforehand using either:
> - Insert API (which supports auto table creation), or
> - SQL DDL statements (CREATE TABLE)
> 2. **Schema Matching**: The table template in bulk API must exactly match the existing table schema.
> 3. **Column Types**: For bulk operations, currently use `addField()` instead of `addTag()`. Tag columns are part of the primary key in GreptimeDB, but bulk operations don't yet support tables with tag columns. This limitation will be addressed in future versions.

This API supports writing to one table per stream and handles large data volumes (up to 200MB per write) with adaptive flow control. Performance advantages include:
- Off-heap memory management with Arrow buffers
- Efficient binary serialization and data transfer
Expand Down
7 changes: 7 additions & 0 deletions ingester-example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ This API is particularly well-suited for:

The Bulk Write API provides a high-performance, memory-efficient mechanism for ingesting large volumes of time-series data into GreptimeDB. It leverages Apache Arrow's Flight protocol and off-heap memory management to achieve optimal throughput when writing batches of data.

> **Important**:
> 1. **Manual Table Creation Required**: Bulk API does **not** create tables automatically. You must create the table beforehand using either:
> - Insert API (which supports auto table creation), or
> - SQL DDL statements (CREATE TABLE)
> 2. **Schema Matching**: The table template in bulk API must exactly match the existing table schema.
> 3. **Column Types**: For bulk operations, currently use `addField()` instead of `addTag()`. Tag columns are part of the primary key in GreptimeDB, but bulk operations don't yet support tables with tag columns. This limitation will be addressed in future versions.

**Important Note**:
- This API is designed around streaming connections, which means each stream establishes a connection to only one database node. Unlike the regular write API, it lacks automatic load balancing for individual requests. However, if your use case involves multiple clients establishing multiple streams to the database, this limitation is not a concern.
- Unlike regular streaming, this API allows continuous writing to only one table per stream, but can handle very large data volumes (up to 200MB per write). It features sophisticated adaptive flow control mechanisms that automatically adjust to your data throughput requirements.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import io.greptime.models.WriteOk;
import io.greptime.rpc.Compression;
import io.greptime.rpc.Context;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
Expand Down Expand Up @@ -74,7 +74,7 @@ public static void main(String[] args) throws Exception {

LOG.info("Start writing data");
long start = System.nanoTime();
for (; ; ) {
do {
Table table = Table.from(tableSchema);
for (int i = 0; i < batchSize; i++) {
if (!rows.hasNext()) {
Expand All @@ -88,16 +88,13 @@ public static void main(String[] args) throws Exception {
long fStart = System.nanoTime();
// Write the table data to the server
CompletableFuture<Result<WriteOk, Err>> future =
greptimeDB.write(Arrays.asList(table), WriteOp.Insert, ctx);
// Wait for the write to complete
int numRows = future.get().mapOr(0, writeOk -> writeOk.getSuccess());
greptimeDB.write(Collections.singletonList(table), WriteOp.Insert, ctx);
// Wait for the writing to complete
int numRows = future.get().mapOr(0, WriteOk::getSuccess);
long costMs = (System.nanoTime() - fStart) / 1000000;
LOG.info("Write rows: {}, time cost: {}ms", numRows, costMs);

if (!rows.hasNext()) {
break;
}
}
} while (rows.hasNext());

LOG.info("Completed writing data, time cost: {}s", (System.nanoTime() - start) / 1000000000);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.greptime.BulkStreamWriter;
import io.greptime.BulkWrite;
import io.greptime.GreptimeDB;
import io.greptime.WriteOp;
import io.greptime.bench.DBConnector;
import io.greptime.bench.TableDataProvider;
import io.greptime.common.util.MetricsUtil;
Expand All @@ -30,6 +31,7 @@
import io.greptime.models.TableSchema;
import io.greptime.rpc.Compression;
import io.greptime.rpc.Context;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
Expand All @@ -41,6 +43,11 @@
* Env:
* - batch_size_per_request: the batch size per request
* - zstd_compression: whether to use zstd compression
* <p>
* <b>IMPORTANT:</b> Unlike the standard write method,
* this bulk writing stream API requires the target table to exist beforehand. It will
* NOT automatically create the table if it does not exist. Please ensure table creation
* before starting a bulk write operation.
*/
public class BulkWriteBenchmark {

Expand Down Expand Up @@ -73,12 +80,15 @@ public static void main(String[] args) throws Exception {
tableDataProvider.init();
TableSchema tableSchema = tableDataProvider.tableSchema();

// Before writing data, ensure the table exists, bulk write API does not create tables.
ensureTableExists(greptimeDB, tableSchema, tableDataProvider, ctx);

LOG.info("Start writing data");
try (BulkStreamWriter writer = greptimeDB.bulkStreamWriter(tableSchema, cfg, ctx)) {
Iterator<Object[]> rows = tableDataProvider.rows();

long start = System.nanoTime();
for (; ; ) {
do {
Table.TableBufferRoot table = writer.tableBufferRoot(1024);
for (int i = 0; i < batchSize; i++) {
if (!rows.hasNext()) {
Expand All @@ -102,10 +112,7 @@ public static void main(String[] args) throws Exception {
}
});

if (!rows.hasNext()) {
break;
}
}
} while (rows.hasNext());

writer.completed();

Expand All @@ -117,4 +124,35 @@ public static void main(String[] args) throws Exception {
greptimeDB.shutdownGracefully();
metricsExporter.shutdownGracefully();
}

/**
* Ensures that the table exists in the database.
*
* @param greptimeDB the GreptimeDB instance
* @param tableSchema the schema of the table to ensure
* @param ctx the context for the operation
*/
private static void ensureTableExists(
GreptimeDB greptimeDB, TableSchema tableSchema, TableDataProvider tableDataProvider, Context ctx) {
Table initTable = Table.from(tableSchema);
Iterator<Object[]> rows = tableDataProvider.rows();
if (!rows.hasNext()) {
throw new IllegalStateException("No rows available in table data provider");
}
// Add an initial row to the table to get the table schema.
initTable.addRow(rows.next());
try {
// Write an initial row to ensure the table exists.
greptimeDB
.write(Collections.singletonList(initTable), WriteOp.Insert, ctx)
.get();
// Delete the initial row to leave the table empty.
greptimeDB
.write(Collections.singletonList(initTable), WriteOp.Delete, ctx)
.get();
LOG.info("Table ensured for benchmark: {}", tableSchema.getTableName());
} catch (Exception e) {
LOG.error("Table creation may have been skipped if it already exists: {}", e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static void main(String[] args) throws Exception {

LOG.info("Start writing data");
long start = System.nanoTime();
for (; ; ) {
do {
Table table = Table.from(tableSchema);
for (int i = 0; i < batchSize; i++) {
if (!rows.hasNext()) {
Expand All @@ -89,15 +89,12 @@ public static void main(String[] args) throws Exception {
// Write the table data to the server
writer.write(table);

if (!rows.hasNext()) {
break;
}
}
} while (rows.hasNext());

// Completes the stream, and the stream will be closed.
CompletableFuture<WriteOk> future = writer.completed();

// Now we can get the write result.
// Now we can get the writing result.
WriteOk result = future.get();

LOG.info("Completed writing data: {}, time cost: {}s", result, (System.nanoTime() - start) / 1000000000);
Expand Down