diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 512eb79..36e19fd 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -77,3 +77,51 @@ jobs:
&& (mvn --projects ingester-protocol test
|| mvn --projects ingester-protocol test
|| mvn --projects ingester-protocol test)
+
+ integration_tests:
+ needs: [test_ingester_common, test_ingester_grpc, test_ingester_protocol]
+ runs-on: ubuntu-latest
+ timeout-minutes: 10
+ steps:
+ - uses: actions/checkout@v4
+ - name: Start GreptimeDB
+ run: |
+ docker run -d \
+ -p 4000:4000 \
+ -p 4001:4001 \
+ -p 4002:4002 \
+ -p 4003:4003 \
+ --name greptimedb \
+ greptime/greptimedb:v1.0.0-beta.4 standalone start \
+ --http-addr 0.0.0.0:4000 \
+ --rpc-bind-addr 0.0.0.0:4001 \
+ --mysql-addr 0.0.0.0:4002 \
+ --postgres-addr 0.0.0.0:4003
+ - name: Wait for GreptimeDB
+ run: |
+ for i in {1..30}; do
+ if curl -sf http://localhost:4000/health; then
+ echo "GreptimeDB is ready"
+ exit 0
+ fi
+ echo "Waiting for GreptimeDB... ($i/30)"
+ sleep 2
+ done
+ echo "GreptimeDB failed to start"
+ docker logs greptimedb
+ exit 1
+ - uses: actions/setup-java@v4
+ with:
+ java-version: '8'
+ distribution: 'zulu'
+ - name: Build
+ run: mvn clean install -DskipTests -B -V
+ - name: Integration Tests
+ run: mvn verify -pl ingester-integration-tests -B
+ env:
+ GREPTIMEDB_ENDPOINTS: localhost:4001
+ GREPTIMEDB_DATABASE: public
+ GREPTIMEDB_JDBC_URL: jdbc:mysql://localhost:4002/public
+ - name: Debug logs
+ if: failure()
+ run: docker logs greptimedb
diff --git a/ingester-integration-tests/pom.xml b/ingester-integration-tests/pom.xml
new file mode 100644
index 0000000..bcf0873
--- /dev/null
+++ b/ingester-integration-tests/pom.xml
@@ -0,0 +1,99 @@
+
+
+
+ 4.0.0
+
+ io.greptime
+ greptimedb-ingester
+ 0.15.0
+
+
+ ingester-integration-tests
+ ${project.groupId}:${project.artifactId}
+ Integration tests for GreptimeDB Java Ingester
+
+
+
+ ${project.groupId}
+ ingester-all
+ ${project.version}
+
+
+
+
+ org.apache.logging.log4j
+ log4j-api
+ test
+
+
+ org.apache.logging.log4j
+ log4j-core
+ test
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ test
+
+
+
+
+ com.mysql
+ mysql-connector-j
+ 8.4.0
+ test
+
+
+
+
+ junit
+ junit
+ test
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 3.2.5
+
+ true
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+ 3.2.5
+
+
+
+ integration-test
+ verify
+
+
+
+
+
+
+
diff --git a/ingester-integration-tests/src/test/java/io/greptime/BulkWriteIT.java b/ingester-integration-tests/src/test/java/io/greptime/BulkWriteIT.java
new file mode 100644
index 0000000..8712ff2
--- /dev/null
+++ b/ingester-integration-tests/src/test/java/io/greptime/BulkWriteIT.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2023 Greptime Team
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.greptime;
+
+import static org.junit.Assert.assertEquals;
+import io.greptime.models.DataType;
+import io.greptime.models.Table;
+import io.greptime.models.TableSchema;
+import java.sql.Connection;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration tests for Bulk Write API.
+ */
+public class BulkWriteIT {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BulkWriteIT.class);
+ private static final int ROW_COUNT = 100;
+
+ private static GreptimeDB client;
+ private static Connection jdbcConn;
+
+ private String tableName;
+ private TableSchema schema;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ client = ITHelper.createClient();
+ jdbcConn = ITHelper.getJdbcConnection();
+ LOG.info("Integration test client initialized");
+ }
+
+ @AfterClass
+ public static void teardownClass() {
+ if (client != null) {
+ client.shutdownGracefully();
+ }
+ if (jdbcConn != null) {
+ try {
+ jdbcConn.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close JDBC connection", e);
+ }
+ }
+ }
+
+ @Before
+ public void setup() throws Exception {
+ tableName = ITHelper.uniqueTableName("test_bulk_write");
+ schema = TableSchema.newBuilder(tableName)
+ .addTag("host", DataType.String)
+ .addTimestamp("ts", DataType.TimestampMillisecond)
+ .addField("cpu_user", DataType.Float64)
+ .addField("cpu_sys", DataType.Float64)
+ .build();
+
+ // Bulk Write requires pre-existing table, create via DDL
+ String createTableSql = String.format(
+ "CREATE TABLE %s ("
+ + "host STRING,"
+ + "ts TIMESTAMP(3) TIME INDEX,"
+ + "cpu_user DOUBLE,"
+ + "cpu_sys DOUBLE,"
+ + "PRIMARY KEY (host)"
+ + ")",
+ tableName);
+ ITHelper.executeUpdate(jdbcConn, createTableSql);
+ LOG.info("Created table: {}", tableName);
+ }
+
+ @After
+ public void teardown() {
+ ITHelper.dropTableIfExists(jdbcConn, tableName);
+ }
+
+ @Test
+ public void testBulkWrite() throws Exception {
+ BulkWrite.Config cfg = BulkWrite.Config.newBuilder()
+ .allocatorInitReservation(0)
+ .allocatorMaxAllocation(64 * 1024 * 1024L)
+ .timeoutMsPerMessage(30000)
+ .maxRequestsInFlight(4)
+ .build();
+
+ try (BulkStreamWriter writer = client.bulkStreamWriter(schema, cfg)) {
+ // Prepare test data with deterministic values
+ Table.TableBufferRoot table = writer.tableBufferRoot(128);
+ long baseTs = 1700000000000L;
+
+ for (int i = 0; i < ROW_COUNT; i++) {
+ String host = "host_" + i;
+ long ts = baseTs + i * 1000;
+ double cpuUser = i * 0.1;
+ double cpuSys = i * 0.05;
+ table.addRow(host, ts, cpuUser, cpuSys);
+ }
+ table.complete();
+
+ // Write data
+ Integer affectedRows = writer.writeNext().get();
+ LOG.info("Bulk write affected rows: {}", affectedRows);
+
+ // Verify exact row count, not just > 0
+ assertEquals("Should write exact row count", ROW_COUNT, affectedRows.intValue());
+
+ writer.completed();
+ }
+
+ // Verify row count via JDBC
+ int count = ITHelper.queryCount(jdbcConn, tableName);
+ assertEquals("Row count should match", ROW_COUNT, count);
+
+ // Verify actual data content - spot check first, middle, and last rows
+ ITHelper.verifyRow(jdbcConn, tableName, "host_0", 0.0, 0.0);
+ ITHelper.verifyRow(jdbcConn, tableName, "host_50", 5.0, 2.5);
+ ITHelper.verifyRow(jdbcConn, tableName, "host_99", 9.9, 4.95);
+
+ LOG.info("Verified {} rows with correct data in table {}", count, tableName);
+ }
+
+ @Test
+ public void testBulkWriteMultipleBatches() throws Exception {
+ BulkWrite.Config cfg = BulkWrite.Config.newBuilder()
+ .allocatorInitReservation(0)
+ .allocatorMaxAllocation(64 * 1024 * 1024L)
+ .timeoutMsPerMessage(30000)
+ .maxRequestsInFlight(4)
+ .build();
+
+ int batchCount = 5;
+ int rowsPerBatch = 20;
+ int totalWritten = 0;
+
+ try (BulkStreamWriter writer = client.bulkStreamWriter(schema, cfg)) {
+ long baseTs = 1700000000000L;
+
+ for (int batch = 0; batch < batchCount; batch++) {
+ Table.TableBufferRoot table = writer.tableBufferRoot(64);
+
+ for (int i = 0; i < rowsPerBatch; i++) {
+ int rowNum = batch * rowsPerBatch + i;
+ String host = "host_" + rowNum;
+ long ts = baseTs + rowNum * 1000;
+ double cpuUser = rowNum * 0.1;
+ double cpuSys = rowNum * 0.05;
+ table.addRow(host, ts, cpuUser, cpuSys);
+ }
+ table.complete();
+
+ Integer affectedRows = writer.writeNext().get();
+ // Verify each batch writes exact count
+ assertEquals("Batch " + batch + " should write exact count", rowsPerBatch, affectedRows.intValue());
+ totalWritten += affectedRows;
+ LOG.info("Batch {} wrote {} rows", batch, affectedRows);
+ }
+
+ writer.completed();
+ }
+
+ // Verify total written count
+ int expectedCount = batchCount * rowsPerBatch;
+ assertEquals("Total written should match", expectedCount, totalWritten);
+
+ // Verify row count via JDBC
+ int count = ITHelper.queryCount(jdbcConn, tableName);
+ assertEquals("Row count should match", expectedCount, count);
+
+ // Verify actual data content across batches
+ ITHelper.verifyRow(jdbcConn, tableName, "host_0", 0.0, 0.0); // first row of batch 0
+ ITHelper.verifyRow(jdbcConn, tableName, "host_40", 4.0, 2.0); // first row of batch 2
+ ITHelper.verifyRow(jdbcConn, tableName, "host_99", 9.9, 4.95); // last row
+
+ LOG.info("Verified {} rows with correct data in table {}", count, tableName);
+ }
+}
diff --git a/ingester-integration-tests/src/test/java/io/greptime/ITHelper.java b/ingester-integration-tests/src/test/java/io/greptime/ITHelper.java
new file mode 100644
index 0000000..b56cefe
--- /dev/null
+++ b/ingester-integration-tests/src/test/java/io/greptime/ITHelper.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2023 Greptime Team
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.greptime;
+
+import io.greptime.models.AuthInfo;
+import io.greptime.options.GreptimeOptions;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper utilities for integration tests.
+ */
+public final class ITHelper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ITHelper.class);
+
+ private static final String DEFAULT_ENDPOINTS = "localhost:4001";
+ private static final String DEFAULT_DATABASE = "public";
+ private static final String DEFAULT_JDBC_URL = "jdbc:mysql://localhost:4002/public";
+
+ private ITHelper() {}
+
+ /**
+ * Gets the gRPC endpoints from environment variable or default.
+ */
+ public static String getEndpoints() {
+ return getEnvOrDefault("GREPTIMEDB_ENDPOINTS", DEFAULT_ENDPOINTS);
+ }
+
+ /**
+ * Gets the database name from environment variable or default.
+ */
+ public static String getDatabase() {
+ return getEnvOrDefault("GREPTIMEDB_DATABASE", DEFAULT_DATABASE);
+ }
+
+ /**
+ * Gets the JDBC URL from environment variable or default.
+ */
+ public static String getJdbcUrl() {
+ return getEnvOrDefault("GREPTIMEDB_JDBC_URL", DEFAULT_JDBC_URL);
+ }
+
+ /**
+ * Creates a GreptimeDB client with configuration from environment variables.
+ */
+ public static GreptimeDB createClient() {
+ String[] endpoints = getEndpoints().split(",");
+ String database = getDatabase();
+
+ GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database)
+ .authInfo(AuthInfo.noAuthorization())
+ .build();
+
+ return GreptimeDB.create(opts);
+ }
+
+ /**
+ * Creates a JDBC connection to GreptimeDB.
+ */
+ public static Connection getJdbcConnection() throws SQLException {
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ } catch (ClassNotFoundException e) {
+ throw new SQLException("MySQL JDBC driver not found", e);
+ }
+ return DriverManager.getConnection(getJdbcUrl());
+ }
+
+ /**
+ * Executes a SQL update statement (DDL/DML).
+ */
+ public static void executeUpdate(Connection conn, String sql) throws SQLException {
+ try (Statement stmt = conn.createStatement()) {
+ LOG.debug("Executing SQL: {}", sql);
+ stmt.executeUpdate(sql);
+ }
+ }
+
+ /**
+ * Queries the row count of a table.
+ */
+ public static int queryCount(Connection conn, String tableName) throws SQLException {
+ String sql = "SELECT COUNT(*) FROM " + tableName;
+ try (Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(sql)) {
+ if (rs.next()) {
+ return rs.getInt(1);
+ }
+ return 0;
+ }
+ }
+
+ /**
+ * Verifies that a specific row exists with expected values.
+ */
+ public static void verifyRow(
+ Connection conn, String tableName, String expectedHost, double expectedCpuUser, double expectedCpuSys)
+ throws SQLException {
+ String sql = String.format("SELECT host, cpu_user, cpu_sys FROM %s WHERE host = '%s'", tableName, expectedHost);
+ try (Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(sql)) {
+ if (!rs.next()) {
+ throw new AssertionError("Row not found for host: " + expectedHost);
+ }
+ String actualHost = rs.getString("host");
+ double actualCpuUser = rs.getDouble("cpu_user");
+ double actualCpuSys = rs.getDouble("cpu_sys");
+
+ if (!expectedHost.equals(actualHost)) {
+ throw new AssertionError(
+ String.format("Host mismatch: expected=%s, actual=%s", expectedHost, actualHost));
+ }
+ if (Math.abs(expectedCpuUser - actualCpuUser) > 0.0001) {
+ throw new AssertionError(
+ String.format("cpu_user mismatch: expected=%f, actual=%f", expectedCpuUser, actualCpuUser));
+ }
+ if (Math.abs(expectedCpuSys - actualCpuSys) > 0.0001) {
+ throw new AssertionError(
+ String.format("cpu_sys mismatch: expected=%f, actual=%f", expectedCpuSys, actualCpuSys));
+ }
+ }
+ }
+
+ /**
+ * Drops a table if it exists.
+ */
+ public static void dropTableIfExists(Connection conn, String tableName) {
+ try {
+ executeUpdate(conn, "DROP TABLE IF EXISTS " + tableName);
+ LOG.debug("Dropped table: {}", tableName);
+ } catch (SQLException e) {
+ LOG.warn("Failed to drop table {}: {}", tableName, e.getMessage());
+ }
+ }
+
+ /**
+ * Generates a unique table name with timestamp suffix.
+ */
+ public static String uniqueTableName(String prefix) {
+ return prefix + "_" + System.currentTimeMillis();
+ }
+
+ private static String getEnvOrDefault(String key, String defaultValue) {
+ String value = System.getenv(key);
+ return (value != null && !value.isEmpty()) ? value : defaultValue;
+ }
+}
diff --git a/ingester-integration-tests/src/test/java/io/greptime/RegularWriteIT.java b/ingester-integration-tests/src/test/java/io/greptime/RegularWriteIT.java
new file mode 100644
index 0000000..49bf851
--- /dev/null
+++ b/ingester-integration-tests/src/test/java/io/greptime/RegularWriteIT.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2023 Greptime Team
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.greptime;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import io.greptime.models.DataType;
+import io.greptime.models.Err;
+import io.greptime.models.Result;
+import io.greptime.models.Table;
+import io.greptime.models.TableSchema;
+import io.greptime.models.WriteOk;
+import java.sql.Connection;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration tests for Regular Write API.
+ */
+public class RegularWriteIT {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RegularWriteIT.class);
+ private static final int ROW_COUNT = 10;
+
+ private static GreptimeDB client;
+ private static Connection jdbcConn;
+
+ private String tableName;
+ private TableSchema schema;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ client = ITHelper.createClient();
+ jdbcConn = ITHelper.getJdbcConnection();
+ LOG.info("Integration test client initialized");
+ }
+
+ @AfterClass
+ public static void teardownClass() {
+ if (client != null) {
+ client.shutdownGracefully();
+ }
+ if (jdbcConn != null) {
+ try {
+ jdbcConn.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close JDBC connection", e);
+ }
+ }
+ }
+
+ @Before
+ public void setup() {
+ tableName = ITHelper.uniqueTableName("test_regular_write");
+ schema = TableSchema.newBuilder(tableName)
+ .addTag("host", DataType.String)
+ .addTimestamp("ts", DataType.TimestampMillisecond)
+ .addField("cpu_user", DataType.Float64)
+ .addField("cpu_sys", DataType.Float64)
+ .build();
+ }
+
+ @After
+ public void teardown() {
+ ITHelper.dropTableIfExists(jdbcConn, tableName);
+ }
+
+ @Test
+ public void testRegularWrite() throws Exception {
+ Table table = Table.from(schema);
+ long baseTs = 1700000000000L;
+
+ for (int i = 0; i < ROW_COUNT; i++) {
+ table.addRow("host_" + i, baseTs + i * 1000, i * 0.1, i * 0.05);
+ }
+ table.complete();
+
+ Result result = client.write(table).get();
+
+ assertTrue("Write should succeed", result.isOk());
+ assertEquals(ROW_COUNT, result.getOk().getSuccess());
+
+ assertEquals(ROW_COUNT, ITHelper.queryCount(jdbcConn, tableName));
+ ITHelper.verifyRow(jdbcConn, tableName, "host_0", 0.0, 0.0);
+ ITHelper.verifyRow(jdbcConn, tableName, "host_5", 0.5, 0.25);
+ ITHelper.verifyRow(jdbcConn, tableName, "host_9", 0.9, 0.45);
+ }
+
+ @Test
+ public void testRegularWriteMultipleTables() throws Exception {
+ String tableName2 = ITHelper.uniqueTableName("test_regular_write_2");
+ TableSchema schema2 = TableSchema.newBuilder(tableName2)
+ .addTag("region", DataType.String)
+ .addTimestamp("ts", DataType.TimestampMillisecond)
+ .addField("mem_usage", DataType.Float64)
+ .build();
+
+ try {
+ Table table1 = Table.from(schema);
+ Table table2 = Table.from(schema2);
+ long baseTs = 1700000000000L;
+
+ for (int i = 0; i < ROW_COUNT; i++) {
+ table1.addRow("host_" + i, baseTs + i * 1000, i * 0.1, i * 0.05);
+ table2.addRow("region_" + i, baseTs + i * 1000, i * 0.2);
+ }
+ table1.complete();
+ table2.complete();
+
+ Result result = client.write(table1, table2).get();
+
+ assertTrue("Write should succeed", result.isOk());
+ assertEquals(ROW_COUNT * 2, result.getOk().getSuccess());
+ assertEquals(ROW_COUNT, ITHelper.queryCount(jdbcConn, tableName));
+ assertEquals(ROW_COUNT, ITHelper.queryCount(jdbcConn, tableName2));
+ ITHelper.verifyRow(jdbcConn, tableName, "host_0", 0.0, 0.0);
+ ITHelper.verifyRow(jdbcConn, tableName, "host_9", 0.9, 0.45);
+ } finally {
+ ITHelper.dropTableIfExists(jdbcConn, tableName2);
+ }
+ }
+
+ @Test
+ public void testRegularWriteWithNullValues() throws Exception {
+ Table table = Table.from(schema);
+ long baseTs = 1700000000000L;
+
+ table.addRow("host_0", baseTs, 1.0, 0.5);
+ table.addRow("host_1", baseTs + 1000, null, 0.5); // null cpu_user
+ table.addRow("host_2", baseTs + 2000, 1.0, null); // null cpu_sys
+ table.addRow("host_3", baseTs + 3000, null, null); // both null
+ table.complete();
+
+ Result result = client.write(table).get();
+
+ assertTrue("Write should succeed", result.isOk());
+ assertEquals(4, result.getOk().getSuccess());
+ assertEquals(4, ITHelper.queryCount(jdbcConn, tableName));
+ ITHelper.verifyRow(jdbcConn, tableName, "host_0", 1.0, 0.5);
+ }
+}
diff --git a/ingester-integration-tests/src/test/java/io/greptime/StreamingWriteIT.java b/ingester-integration-tests/src/test/java/io/greptime/StreamingWriteIT.java
new file mode 100644
index 0000000..2c5cfd0
--- /dev/null
+++ b/ingester-integration-tests/src/test/java/io/greptime/StreamingWriteIT.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2023 Greptime Team
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.greptime;
+
+import static org.junit.Assert.assertEquals;
+import io.greptime.models.DataType;
+import io.greptime.models.Table;
+import io.greptime.models.TableSchema;
+import io.greptime.models.WriteOk;
+import java.sql.Connection;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration tests for Streaming Write API.
+ */
+public class StreamingWriteIT {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingWriteIT.class);
+ private static final int ROW_COUNT = 10;
+
+ private static GreptimeDB client;
+ private static Connection jdbcConn;
+
+ private String tableName;
+ private TableSchema schema;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ client = ITHelper.createClient();
+ jdbcConn = ITHelper.getJdbcConnection();
+ LOG.info("Integration test client initialized");
+ }
+
+ @AfterClass
+ public static void teardownClass() {
+ if (client != null) {
+ client.shutdownGracefully();
+ }
+ if (jdbcConn != null) {
+ try {
+ jdbcConn.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close JDBC connection", e);
+ }
+ }
+ }
+
+ @Before
+ public void setup() {
+ tableName = ITHelper.uniqueTableName("test_streaming_write");
+ schema = TableSchema.newBuilder(tableName)
+ .addTag("host", DataType.String)
+ .addTimestamp("ts", DataType.TimestampMillisecond)
+ .addField("cpu_user", DataType.Float64)
+ .addField("cpu_sys", DataType.Float64)
+ .build();
+ }
+
+ @After
+ public void teardown() {
+ ITHelper.dropTableIfExists(jdbcConn, tableName);
+ }
+
+ @Test
+ public void testStreamingWrite() throws Exception {
+ // Create stream writer
+ StreamWriter writer = client.streamWriter();
+
+ // Prepare and write test data with deterministic values
+ long baseTs = 1700000000000L;
+
+ for (int i = 0; i < ROW_COUNT; i++) {
+ Table table = Table.from(schema);
+ String host = "host_" + i;
+ long ts = baseTs + i * 1000;
+ double cpuUser = i * 0.1;
+ double cpuSys = i * 0.05;
+ table.addRow(host, ts, cpuUser, cpuSys);
+ table.complete();
+
+ writer.write(table);
+ }
+
+ // Complete the stream
+ WriteOk result = writer.completed().get();
+
+ // Verify write result - check exact count, not just > 0
+ assertEquals("Should write exact row count", ROW_COUNT, result.getSuccess());
+ LOG.info("Stream write result: {}", result);
+
+ // Verify row count via JDBC
+ int count = ITHelper.queryCount(jdbcConn, tableName);
+ assertEquals("Row count should match", ROW_COUNT, count);
+
+ // Verify actual data content
+ ITHelper.verifyRow(jdbcConn, tableName, "host_0", 0.0, 0.0);
+ ITHelper.verifyRow(jdbcConn, tableName, "host_5", 0.5, 0.25);
+ ITHelper.verifyRow(jdbcConn, tableName, "host_9", 0.9, 0.45);
+
+ LOG.info("Verified {} rows with correct data in table {}", count, tableName);
+ }
+
+ @Test
+ public void testStreamingWriteMultipleTables() throws Exception {
+ String tableName2 = ITHelper.uniqueTableName("test_streaming_write_2");
+ TableSchema schema2 = TableSchema.newBuilder(tableName2)
+ .addTag("region", DataType.String)
+ .addTimestamp("ts", DataType.TimestampMillisecond)
+ .addField("mem_usage", DataType.Float64)
+ .build();
+
+ try {
+ // Create stream writer
+ StreamWriter writer = client.streamWriter();
+
+ // Write data to multiple tables with deterministic values
+ long baseTs = 1700000000000L;
+
+ for (int i = 0; i < ROW_COUNT; i++) {
+ Table table1 = Table.from(schema);
+ table1.addRow("host_" + i, baseTs + i * 1000, i * 0.1, i * 0.05);
+ table1.complete();
+ writer.write(table1);
+
+ Table table2 = Table.from(schema2);
+ table2.addRow("region_" + i, baseTs + i * 1000, i * 0.2);
+ table2.complete();
+ writer.write(table2);
+ }
+
+ // Complete the stream
+ WriteOk result = writer.completed().get();
+
+ // Verify write result - check exact count
+ assertEquals("Should write exact row count", ROW_COUNT * 2, result.getSuccess());
+ LOG.info("Stream write result: {}", result);
+
+ // Verify row counts via JDBC
+ assertEquals("Table 1 row count", ROW_COUNT, ITHelper.queryCount(jdbcConn, tableName));
+ assertEquals("Table 2 row count", ROW_COUNT, ITHelper.queryCount(jdbcConn, tableName2));
+
+ // Verify actual data content
+ ITHelper.verifyRow(jdbcConn, tableName, "host_0", 0.0, 0.0);
+ ITHelper.verifyRow(jdbcConn, tableName, "host_9", 0.9, 0.45);
+
+ LOG.info("Verified {} rows with correct data in each table", ROW_COUNT);
+ } finally {
+ ITHelper.dropTableIfExists(jdbcConn, tableName2);
+ }
+ }
+}
diff --git a/ingester-integration-tests/src/test/resources/log4j2.xml b/ingester-integration-tests/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..9442e80
--- /dev/null
+++ b/ingester-integration-tests/src/test/resources/log4j2.xml
@@ -0,0 +1,31 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/pom.xml b/pom.xml
index 9dbda7e..3441aff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,6 +53,7 @@
ingester-all
ingester-bulk-protocol
ingester-prometheus-metrics
+ ingester-integration-tests