diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 512eb79..36e19fd 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -77,3 +77,51 @@ jobs: && (mvn --projects ingester-protocol test || mvn --projects ingester-protocol test || mvn --projects ingester-protocol test) + + integration_tests: + needs: [test_ingester_common, test_ingester_grpc, test_ingester_protocol] + runs-on: ubuntu-latest + timeout-minutes: 10 + steps: + - uses: actions/checkout@v4 + - name: Start GreptimeDB + run: | + docker run -d \ + -p 4000:4000 \ + -p 4001:4001 \ + -p 4002:4002 \ + -p 4003:4003 \ + --name greptimedb \ + greptime/greptimedb:v1.0.0-beta.4 standalone start \ + --http-addr 0.0.0.0:4000 \ + --rpc-bind-addr 0.0.0.0:4001 \ + --mysql-addr 0.0.0.0:4002 \ + --postgres-addr 0.0.0.0:4003 + - name: Wait for GreptimeDB + run: | + for i in {1..30}; do + if curl -sf http://localhost:4000/health; then + echo "GreptimeDB is ready" + exit 0 + fi + echo "Waiting for GreptimeDB... ($i/30)" + sleep 2 + done + echo "GreptimeDB failed to start" + docker logs greptimedb + exit 1 + - uses: actions/setup-java@v4 + with: + java-version: '8' + distribution: 'zulu' + - name: Build + run: mvn clean install -DskipTests -B -V + - name: Integration Tests + run: mvn verify -pl ingester-integration-tests -B + env: + GREPTIMEDB_ENDPOINTS: localhost:4001 + GREPTIMEDB_DATABASE: public + GREPTIMEDB_JDBC_URL: jdbc:mysql://localhost:4002/public + - name: Debug logs + if: failure() + run: docker logs greptimedb diff --git a/ingester-integration-tests/pom.xml b/ingester-integration-tests/pom.xml new file mode 100644 index 0000000..bcf0873 --- /dev/null +++ b/ingester-integration-tests/pom.xml @@ -0,0 +1,99 @@ + + + + 4.0.0 + + io.greptime + greptimedb-ingester + 0.15.0 + + + ingester-integration-tests + ${project.groupId}:${project.artifactId} + Integration tests for GreptimeDB Java Ingester + + + + ${project.groupId} + ingester-all + ${project.version} + + + + + org.apache.logging.log4j + log4j-api + test + + + org.apache.logging.log4j + log4j-core + test + + + org.apache.logging.log4j + log4j-slf4j-impl + test + + + + + com.mysql + mysql-connector-j + 8.4.0 + test + + + + + junit + junit + test + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.2.5 + + true + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + 3.2.5 + + + + integration-test + verify + + + + + + + diff --git a/ingester-integration-tests/src/test/java/io/greptime/BulkWriteIT.java b/ingester-integration-tests/src/test/java/io/greptime/BulkWriteIT.java new file mode 100644 index 0000000..8712ff2 --- /dev/null +++ b/ingester-integration-tests/src/test/java/io/greptime/BulkWriteIT.java @@ -0,0 +1,195 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.greptime; + +import static org.junit.Assert.assertEquals; +import io.greptime.models.DataType; +import io.greptime.models.Table; +import io.greptime.models.TableSchema; +import java.sql.Connection; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration tests for Bulk Write API. + */ +public class BulkWriteIT { + + private static final Logger LOG = LoggerFactory.getLogger(BulkWriteIT.class); + private static final int ROW_COUNT = 100; + + private static GreptimeDB client; + private static Connection jdbcConn; + + private String tableName; + private TableSchema schema; + + @BeforeClass + public static void setupClass() throws Exception { + client = ITHelper.createClient(); + jdbcConn = ITHelper.getJdbcConnection(); + LOG.info("Integration test client initialized"); + } + + @AfterClass + public static void teardownClass() { + if (client != null) { + client.shutdownGracefully(); + } + if (jdbcConn != null) { + try { + jdbcConn.close(); + } catch (Exception e) { + LOG.warn("Failed to close JDBC connection", e); + } + } + } + + @Before + public void setup() throws Exception { + tableName = ITHelper.uniqueTableName("test_bulk_write"); + schema = TableSchema.newBuilder(tableName) + .addTag("host", DataType.String) + .addTimestamp("ts", DataType.TimestampMillisecond) + .addField("cpu_user", DataType.Float64) + .addField("cpu_sys", DataType.Float64) + .build(); + + // Bulk Write requires pre-existing table, create via DDL + String createTableSql = String.format( + "CREATE TABLE %s (" + + "host STRING," + + "ts TIMESTAMP(3) TIME INDEX," + + "cpu_user DOUBLE," + + "cpu_sys DOUBLE," + + "PRIMARY KEY (host)" + + ")", + tableName); + ITHelper.executeUpdate(jdbcConn, createTableSql); + LOG.info("Created table: {}", tableName); + } + + @After + public void teardown() { + ITHelper.dropTableIfExists(jdbcConn, tableName); + } + + @Test + public void testBulkWrite() throws Exception { + BulkWrite.Config cfg = BulkWrite.Config.newBuilder() + .allocatorInitReservation(0) + .allocatorMaxAllocation(64 * 1024 * 1024L) + .timeoutMsPerMessage(30000) + .maxRequestsInFlight(4) + .build(); + + try (BulkStreamWriter writer = client.bulkStreamWriter(schema, cfg)) { + // Prepare test data with deterministic values + Table.TableBufferRoot table = writer.tableBufferRoot(128); + long baseTs = 1700000000000L; + + for (int i = 0; i < ROW_COUNT; i++) { + String host = "host_" + i; + long ts = baseTs + i * 1000; + double cpuUser = i * 0.1; + double cpuSys = i * 0.05; + table.addRow(host, ts, cpuUser, cpuSys); + } + table.complete(); + + // Write data + Integer affectedRows = writer.writeNext().get(); + LOG.info("Bulk write affected rows: {}", affectedRows); + + // Verify exact row count, not just > 0 + assertEquals("Should write exact row count", ROW_COUNT, affectedRows.intValue()); + + writer.completed(); + } + + // Verify row count via JDBC + int count = ITHelper.queryCount(jdbcConn, tableName); + assertEquals("Row count should match", ROW_COUNT, count); + + // Verify actual data content - spot check first, middle, and last rows + ITHelper.verifyRow(jdbcConn, tableName, "host_0", 0.0, 0.0); + ITHelper.verifyRow(jdbcConn, tableName, "host_50", 5.0, 2.5); + ITHelper.verifyRow(jdbcConn, tableName, "host_99", 9.9, 4.95); + + LOG.info("Verified {} rows with correct data in table {}", count, tableName); + } + + @Test + public void testBulkWriteMultipleBatches() throws Exception { + BulkWrite.Config cfg = BulkWrite.Config.newBuilder() + .allocatorInitReservation(0) + .allocatorMaxAllocation(64 * 1024 * 1024L) + .timeoutMsPerMessage(30000) + .maxRequestsInFlight(4) + .build(); + + int batchCount = 5; + int rowsPerBatch = 20; + int totalWritten = 0; + + try (BulkStreamWriter writer = client.bulkStreamWriter(schema, cfg)) { + long baseTs = 1700000000000L; + + for (int batch = 0; batch < batchCount; batch++) { + Table.TableBufferRoot table = writer.tableBufferRoot(64); + + for (int i = 0; i < rowsPerBatch; i++) { + int rowNum = batch * rowsPerBatch + i; + String host = "host_" + rowNum; + long ts = baseTs + rowNum * 1000; + double cpuUser = rowNum * 0.1; + double cpuSys = rowNum * 0.05; + table.addRow(host, ts, cpuUser, cpuSys); + } + table.complete(); + + Integer affectedRows = writer.writeNext().get(); + // Verify each batch writes exact count + assertEquals("Batch " + batch + " should write exact count", rowsPerBatch, affectedRows.intValue()); + totalWritten += affectedRows; + LOG.info("Batch {} wrote {} rows", batch, affectedRows); + } + + writer.completed(); + } + + // Verify total written count + int expectedCount = batchCount * rowsPerBatch; + assertEquals("Total written should match", expectedCount, totalWritten); + + // Verify row count via JDBC + int count = ITHelper.queryCount(jdbcConn, tableName); + assertEquals("Row count should match", expectedCount, count); + + // Verify actual data content across batches + ITHelper.verifyRow(jdbcConn, tableName, "host_0", 0.0, 0.0); // first row of batch 0 + ITHelper.verifyRow(jdbcConn, tableName, "host_40", 4.0, 2.0); // first row of batch 2 + ITHelper.verifyRow(jdbcConn, tableName, "host_99", 9.9, 4.95); // last row + + LOG.info("Verified {} rows with correct data in table {}", count, tableName); + } +} diff --git a/ingester-integration-tests/src/test/java/io/greptime/ITHelper.java b/ingester-integration-tests/src/test/java/io/greptime/ITHelper.java new file mode 100644 index 0000000..b56cefe --- /dev/null +++ b/ingester-integration-tests/src/test/java/io/greptime/ITHelper.java @@ -0,0 +1,167 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.greptime; + +import io.greptime.models.AuthInfo; +import io.greptime.options.GreptimeOptions; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper utilities for integration tests. + */ +public final class ITHelper { + + private static final Logger LOG = LoggerFactory.getLogger(ITHelper.class); + + private static final String DEFAULT_ENDPOINTS = "localhost:4001"; + private static final String DEFAULT_DATABASE = "public"; + private static final String DEFAULT_JDBC_URL = "jdbc:mysql://localhost:4002/public"; + + private ITHelper() {} + + /** + * Gets the gRPC endpoints from environment variable or default. + */ + public static String getEndpoints() { + return getEnvOrDefault("GREPTIMEDB_ENDPOINTS", DEFAULT_ENDPOINTS); + } + + /** + * Gets the database name from environment variable or default. + */ + public static String getDatabase() { + return getEnvOrDefault("GREPTIMEDB_DATABASE", DEFAULT_DATABASE); + } + + /** + * Gets the JDBC URL from environment variable or default. + */ + public static String getJdbcUrl() { + return getEnvOrDefault("GREPTIMEDB_JDBC_URL", DEFAULT_JDBC_URL); + } + + /** + * Creates a GreptimeDB client with configuration from environment variables. + */ + public static GreptimeDB createClient() { + String[] endpoints = getEndpoints().split(","); + String database = getDatabase(); + + GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) + .authInfo(AuthInfo.noAuthorization()) + .build(); + + return GreptimeDB.create(opts); + } + + /** + * Creates a JDBC connection to GreptimeDB. + */ + public static Connection getJdbcConnection() throws SQLException { + try { + Class.forName("com.mysql.cj.jdbc.Driver"); + } catch (ClassNotFoundException e) { + throw new SQLException("MySQL JDBC driver not found", e); + } + return DriverManager.getConnection(getJdbcUrl()); + } + + /** + * Executes a SQL update statement (DDL/DML). + */ + public static void executeUpdate(Connection conn, String sql) throws SQLException { + try (Statement stmt = conn.createStatement()) { + LOG.debug("Executing SQL: {}", sql); + stmt.executeUpdate(sql); + } + } + + /** + * Queries the row count of a table. + */ + public static int queryCount(Connection conn, String tableName) throws SQLException { + String sql = "SELECT COUNT(*) FROM " + tableName; + try (Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(sql)) { + if (rs.next()) { + return rs.getInt(1); + } + return 0; + } + } + + /** + * Verifies that a specific row exists with expected values. + */ + public static void verifyRow( + Connection conn, String tableName, String expectedHost, double expectedCpuUser, double expectedCpuSys) + throws SQLException { + String sql = String.format("SELECT host, cpu_user, cpu_sys FROM %s WHERE host = '%s'", tableName, expectedHost); + try (Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(sql)) { + if (!rs.next()) { + throw new AssertionError("Row not found for host: " + expectedHost); + } + String actualHost = rs.getString("host"); + double actualCpuUser = rs.getDouble("cpu_user"); + double actualCpuSys = rs.getDouble("cpu_sys"); + + if (!expectedHost.equals(actualHost)) { + throw new AssertionError( + String.format("Host mismatch: expected=%s, actual=%s", expectedHost, actualHost)); + } + if (Math.abs(expectedCpuUser - actualCpuUser) > 0.0001) { + throw new AssertionError( + String.format("cpu_user mismatch: expected=%f, actual=%f", expectedCpuUser, actualCpuUser)); + } + if (Math.abs(expectedCpuSys - actualCpuSys) > 0.0001) { + throw new AssertionError( + String.format("cpu_sys mismatch: expected=%f, actual=%f", expectedCpuSys, actualCpuSys)); + } + } + } + + /** + * Drops a table if it exists. + */ + public static void dropTableIfExists(Connection conn, String tableName) { + try { + executeUpdate(conn, "DROP TABLE IF EXISTS " + tableName); + LOG.debug("Dropped table: {}", tableName); + } catch (SQLException e) { + LOG.warn("Failed to drop table {}: {}", tableName, e.getMessage()); + } + } + + /** + * Generates a unique table name with timestamp suffix. + */ + public static String uniqueTableName(String prefix) { + return prefix + "_" + System.currentTimeMillis(); + } + + private static String getEnvOrDefault(String key, String defaultValue) { + String value = System.getenv(key); + return (value != null && !value.isEmpty()) ? value : defaultValue; + } +} diff --git a/ingester-integration-tests/src/test/java/io/greptime/RegularWriteIT.java b/ingester-integration-tests/src/test/java/io/greptime/RegularWriteIT.java new file mode 100644 index 0000000..49bf851 --- /dev/null +++ b/ingester-integration-tests/src/test/java/io/greptime/RegularWriteIT.java @@ -0,0 +1,160 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.greptime; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import io.greptime.models.DataType; +import io.greptime.models.Err; +import io.greptime.models.Result; +import io.greptime.models.Table; +import io.greptime.models.TableSchema; +import io.greptime.models.WriteOk; +import java.sql.Connection; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration tests for Regular Write API. + */ +public class RegularWriteIT { + + private static final Logger LOG = LoggerFactory.getLogger(RegularWriteIT.class); + private static final int ROW_COUNT = 10; + + private static GreptimeDB client; + private static Connection jdbcConn; + + private String tableName; + private TableSchema schema; + + @BeforeClass + public static void setupClass() throws Exception { + client = ITHelper.createClient(); + jdbcConn = ITHelper.getJdbcConnection(); + LOG.info("Integration test client initialized"); + } + + @AfterClass + public static void teardownClass() { + if (client != null) { + client.shutdownGracefully(); + } + if (jdbcConn != null) { + try { + jdbcConn.close(); + } catch (Exception e) { + LOG.warn("Failed to close JDBC connection", e); + } + } + } + + @Before + public void setup() { + tableName = ITHelper.uniqueTableName("test_regular_write"); + schema = TableSchema.newBuilder(tableName) + .addTag("host", DataType.String) + .addTimestamp("ts", DataType.TimestampMillisecond) + .addField("cpu_user", DataType.Float64) + .addField("cpu_sys", DataType.Float64) + .build(); + } + + @After + public void teardown() { + ITHelper.dropTableIfExists(jdbcConn, tableName); + } + + @Test + public void testRegularWrite() throws Exception { + Table table = Table.from(schema); + long baseTs = 1700000000000L; + + for (int i = 0; i < ROW_COUNT; i++) { + table.addRow("host_" + i, baseTs + i * 1000, i * 0.1, i * 0.05); + } + table.complete(); + + Result result = client.write(table).get(); + + assertTrue("Write should succeed", result.isOk()); + assertEquals(ROW_COUNT, result.getOk().getSuccess()); + + assertEquals(ROW_COUNT, ITHelper.queryCount(jdbcConn, tableName)); + ITHelper.verifyRow(jdbcConn, tableName, "host_0", 0.0, 0.0); + ITHelper.verifyRow(jdbcConn, tableName, "host_5", 0.5, 0.25); + ITHelper.verifyRow(jdbcConn, tableName, "host_9", 0.9, 0.45); + } + + @Test + public void testRegularWriteMultipleTables() throws Exception { + String tableName2 = ITHelper.uniqueTableName("test_regular_write_2"); + TableSchema schema2 = TableSchema.newBuilder(tableName2) + .addTag("region", DataType.String) + .addTimestamp("ts", DataType.TimestampMillisecond) + .addField("mem_usage", DataType.Float64) + .build(); + + try { + Table table1 = Table.from(schema); + Table table2 = Table.from(schema2); + long baseTs = 1700000000000L; + + for (int i = 0; i < ROW_COUNT; i++) { + table1.addRow("host_" + i, baseTs + i * 1000, i * 0.1, i * 0.05); + table2.addRow("region_" + i, baseTs + i * 1000, i * 0.2); + } + table1.complete(); + table2.complete(); + + Result result = client.write(table1, table2).get(); + + assertTrue("Write should succeed", result.isOk()); + assertEquals(ROW_COUNT * 2, result.getOk().getSuccess()); + assertEquals(ROW_COUNT, ITHelper.queryCount(jdbcConn, tableName)); + assertEquals(ROW_COUNT, ITHelper.queryCount(jdbcConn, tableName2)); + ITHelper.verifyRow(jdbcConn, tableName, "host_0", 0.0, 0.0); + ITHelper.verifyRow(jdbcConn, tableName, "host_9", 0.9, 0.45); + } finally { + ITHelper.dropTableIfExists(jdbcConn, tableName2); + } + } + + @Test + public void testRegularWriteWithNullValues() throws Exception { + Table table = Table.from(schema); + long baseTs = 1700000000000L; + + table.addRow("host_0", baseTs, 1.0, 0.5); + table.addRow("host_1", baseTs + 1000, null, 0.5); // null cpu_user + table.addRow("host_2", baseTs + 2000, 1.0, null); // null cpu_sys + table.addRow("host_3", baseTs + 3000, null, null); // both null + table.complete(); + + Result result = client.write(table).get(); + + assertTrue("Write should succeed", result.isOk()); + assertEquals(4, result.getOk().getSuccess()); + assertEquals(4, ITHelper.queryCount(jdbcConn, tableName)); + ITHelper.verifyRow(jdbcConn, tableName, "host_0", 1.0, 0.5); + } +} diff --git a/ingester-integration-tests/src/test/java/io/greptime/StreamingWriteIT.java b/ingester-integration-tests/src/test/java/io/greptime/StreamingWriteIT.java new file mode 100644 index 0000000..2c5cfd0 --- /dev/null +++ b/ingester-integration-tests/src/test/java/io/greptime/StreamingWriteIT.java @@ -0,0 +1,171 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.greptime; + +import static org.junit.Assert.assertEquals; +import io.greptime.models.DataType; +import io.greptime.models.Table; +import io.greptime.models.TableSchema; +import io.greptime.models.WriteOk; +import java.sql.Connection; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration tests for Streaming Write API. + */ +public class StreamingWriteIT { + + private static final Logger LOG = LoggerFactory.getLogger(StreamingWriteIT.class); + private static final int ROW_COUNT = 10; + + private static GreptimeDB client; + private static Connection jdbcConn; + + private String tableName; + private TableSchema schema; + + @BeforeClass + public static void setupClass() throws Exception { + client = ITHelper.createClient(); + jdbcConn = ITHelper.getJdbcConnection(); + LOG.info("Integration test client initialized"); + } + + @AfterClass + public static void teardownClass() { + if (client != null) { + client.shutdownGracefully(); + } + if (jdbcConn != null) { + try { + jdbcConn.close(); + } catch (Exception e) { + LOG.warn("Failed to close JDBC connection", e); + } + } + } + + @Before + public void setup() { + tableName = ITHelper.uniqueTableName("test_streaming_write"); + schema = TableSchema.newBuilder(tableName) + .addTag("host", DataType.String) + .addTimestamp("ts", DataType.TimestampMillisecond) + .addField("cpu_user", DataType.Float64) + .addField("cpu_sys", DataType.Float64) + .build(); + } + + @After + public void teardown() { + ITHelper.dropTableIfExists(jdbcConn, tableName); + } + + @Test + public void testStreamingWrite() throws Exception { + // Create stream writer + StreamWriter writer = client.streamWriter(); + + // Prepare and write test data with deterministic values + long baseTs = 1700000000000L; + + for (int i = 0; i < ROW_COUNT; i++) { + Table table = Table.from(schema); + String host = "host_" + i; + long ts = baseTs + i * 1000; + double cpuUser = i * 0.1; + double cpuSys = i * 0.05; + table.addRow(host, ts, cpuUser, cpuSys); + table.complete(); + + writer.write(table); + } + + // Complete the stream + WriteOk result = writer.completed().get(); + + // Verify write result - check exact count, not just > 0 + assertEquals("Should write exact row count", ROW_COUNT, result.getSuccess()); + LOG.info("Stream write result: {}", result); + + // Verify row count via JDBC + int count = ITHelper.queryCount(jdbcConn, tableName); + assertEquals("Row count should match", ROW_COUNT, count); + + // Verify actual data content + ITHelper.verifyRow(jdbcConn, tableName, "host_0", 0.0, 0.0); + ITHelper.verifyRow(jdbcConn, tableName, "host_5", 0.5, 0.25); + ITHelper.verifyRow(jdbcConn, tableName, "host_9", 0.9, 0.45); + + LOG.info("Verified {} rows with correct data in table {}", count, tableName); + } + + @Test + public void testStreamingWriteMultipleTables() throws Exception { + String tableName2 = ITHelper.uniqueTableName("test_streaming_write_2"); + TableSchema schema2 = TableSchema.newBuilder(tableName2) + .addTag("region", DataType.String) + .addTimestamp("ts", DataType.TimestampMillisecond) + .addField("mem_usage", DataType.Float64) + .build(); + + try { + // Create stream writer + StreamWriter writer = client.streamWriter(); + + // Write data to multiple tables with deterministic values + long baseTs = 1700000000000L; + + for (int i = 0; i < ROW_COUNT; i++) { + Table table1 = Table.from(schema); + table1.addRow("host_" + i, baseTs + i * 1000, i * 0.1, i * 0.05); + table1.complete(); + writer.write(table1); + + Table table2 = Table.from(schema2); + table2.addRow("region_" + i, baseTs + i * 1000, i * 0.2); + table2.complete(); + writer.write(table2); + } + + // Complete the stream + WriteOk result = writer.completed().get(); + + // Verify write result - check exact count + assertEquals("Should write exact row count", ROW_COUNT * 2, result.getSuccess()); + LOG.info("Stream write result: {}", result); + + // Verify row counts via JDBC + assertEquals("Table 1 row count", ROW_COUNT, ITHelper.queryCount(jdbcConn, tableName)); + assertEquals("Table 2 row count", ROW_COUNT, ITHelper.queryCount(jdbcConn, tableName2)); + + // Verify actual data content + ITHelper.verifyRow(jdbcConn, tableName, "host_0", 0.0, 0.0); + ITHelper.verifyRow(jdbcConn, tableName, "host_9", 0.9, 0.45); + + LOG.info("Verified {} rows with correct data in each table", ROW_COUNT); + } finally { + ITHelper.dropTableIfExists(jdbcConn, tableName2); + } + } +} diff --git a/ingester-integration-tests/src/test/resources/log4j2.xml b/ingester-integration-tests/src/test/resources/log4j2.xml new file mode 100644 index 0000000..9442e80 --- /dev/null +++ b/ingester-integration-tests/src/test/resources/log4j2.xml @@ -0,0 +1,31 @@ + + + + + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index 9dbda7e..3441aff 100644 --- a/pom.xml +++ b/pom.xml @@ -53,6 +53,7 @@ ingester-all ingester-bulk-protocol ingester-prometheus-metrics + ingester-integration-tests