From b862429c14829204c5e7cd09dfe61464c88e30fe Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 22 Apr 2025 17:03:55 +0800 Subject: [PATCH 1/5] feat: real pipeline write and new bench code --- .../java/io/greptime/BulkWriteManager.java | 14 +- .../java/io/greptime/BulkWriteService.java | 17 +- .../apache/arrow/flight/BulkFlightClient.java | 39 ++++- .../io/greptime/bench/BulkWriteBenchmark.java | 25 +-- .../java/io/greptime/bench/LogTextHelper.java | 128 +++++++++++++++ .../bench/MultiProducerTableDataProvider.java | 152 ++++++++++++++++++ .../bench/RandomTableDataProvider.java | 108 +------------ .../bench/StreamingWriteBenchmark.java | 15 +- .../io.greptime.bench.TableDataProvider | 1 + .../src/main/java/io/greptime/BulkWrite.java | 2 +- .../java/io/greptime/BulkWriteClient.java | 4 +- .../main/java/io/greptime/models/Table.java | 22 +++ 12 files changed, 381 insertions(+), 146 deletions(-) create mode 100644 ingester-example/src/main/java/io/greptime/bench/LogTextHelper.java create mode 100644 ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java diff --git a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteManager.java b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteManager.java index 1d83dfc..f837ce3 100644 --- a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteManager.java +++ b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteManager.java @@ -135,20 +135,26 @@ public static BulkWriteManager create( * @param table the name of the target table * @param schema the Arrow schema defining the structure of the data to be written * @param timeoutMs the timeout in milliseconds for the write operation + * @param maxRequestsInFlight the max in-flight requests in the stream * @param options optional RPC-layer hints to configure the underlying Flight client call * @return a BulkStreamWriter instance that manages the data transfer process */ - public BulkWriteService intoBulkWriteStream(String table, Schema schema, long timeoutMs, CallOption... options) { + public BulkWriteService intoBulkWriteStream( + String table, Schema schema, long timeoutMs, int maxRequestsInFlight, CallOption... options) { FlightDescriptor descriptor = FlightDescriptor.path(table); - return new BulkWriteService(this, this.allocator, schema, descriptor, timeoutMs, options); + return new BulkWriteService(this, this.allocator, schema, descriptor, timeoutMs, maxRequestsInFlight, options); } VectorSchemaRoot createSchemaRoot(Schema schema) { return VectorSchemaRoot.create(schema, this.allocator); } - ClientStreamListener startPut(FlightDescriptor descriptor, PutListener metadataListener, CallOption... options) { - return this.flightClient.startPut(descriptor, metadataListener, options); + ClientStreamListener startPut( + FlightDescriptor descriptor, + PutListener metadataListener, + long maxRequestsInFlight, + CallOption... options) { + return this.flightClient.startPut(descriptor, metadataListener, maxRequestsInFlight, options); } DictionaryProvider newDefaultDictionaryProvider() { diff --git a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java index 0ec2c41..0aeef20 100644 --- a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java +++ b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java @@ -61,7 +61,7 @@ public class BulkWriteService implements AutoCloseable { private final ClientStreamListener listener; private final AsyncPutListener metadataListener; private final long timeoutMs; - + /** * Constructs a new BulkWriteService. * @@ -70,6 +70,7 @@ public class BulkWriteService implements AutoCloseable { * @param schema The Arrow schema defining the data structure * @param descriptor The FlightDescriptor identifying the data stream * @param timeoutMs The timeout in milliseconds for operations + * @param maxRequestsInFlight the max in-flight requests in the stream * @param options Additional call options for the Flight client */ public BulkWriteService( @@ -78,12 +79,13 @@ public BulkWriteService( Schema schema, FlightDescriptor descriptor, long timeoutMs, + int maxRequestsInFlight, CallOption... options) { this.manager = manager; this.allocator = allocator; this.root = manager.createSchemaRoot(schema); this.metadataListener = new AsyncPutListener(); - this.listener = manager.startPut(descriptor, this.metadataListener, options); + this.listener = manager.startPut(descriptor, this.metadataListener, maxRequestsInFlight, options); this.timeoutMs = timeoutMs; } @@ -154,11 +156,8 @@ public PutStage putNext() { // Prepare metadata buffer byte[] metadata = new Metadata.RequestMetadata(id).toJsonBytesUtf8(); - ArrowBuf metadataBuf = null; try { - // The buffer will be closed in the putNext method, but if an error occurs during execution, - // we need to close it ourselves in the catch block to prevent memory leaks. - metadataBuf = this.allocator.buffer(metadata.length); + ArrowBuf metadataBuf = this.allocator.buffer(metadata.length); metadataBuf.writeBytes(metadata); // Send data to the server @@ -169,12 +168,6 @@ public PutStage putNext() { LOG.debug("Data sent successfully [id={}], in-flight requests: {}", id, inFlightCount); return new PutStage(future, inFlightCount); - } catch (Throwable t) { - // Close the metadata buffer on error - if (metadataBuf != null) { - metadataBuf.close(); - } - throw t; } finally { // Clear the root to prepare for next batch this.root.clear(); diff --git a/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java b/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java index 123668f..b297962 100644 --- a/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java +++ b/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java @@ -113,12 +113,17 @@ public void addClientMiddleware(FlightClientMiddleware.Factory factory) { * @param root VectorSchemaRoot the root containing data * @param metadataListener A handler for metadata messages from the server. This will be passed buffers that will be * freed after {@link StreamListener#onNext(Object)} is called! + * @param maxRequestsInFlight the max in-flight requests in the stream * @param options RPC-layer hints for this call. * @return ClientStreamListener an interface to control uploading data */ public ClientStreamListener startPut( - FlightDescriptor descriptor, VectorSchemaRoot root, PutListener metadataListener, CallOption... options) { - return startPut(descriptor, root, new MapDictionaryProvider(), metadataListener, options); + FlightDescriptor descriptor, + VectorSchemaRoot root, + PutListener metadataListener, + long maxRequestsInFlight, + CallOption... options) { + return startPut(descriptor, root, new MapDictionaryProvider(), metadataListener, maxRequestsInFlight, options); } /** @@ -128,6 +133,7 @@ public ClientStreamListener startPut( * @param root VectorSchemaRoot the root containing data * @param provider A dictionary provider for the root. * @param metadataListener A handler for metadata messages from the server. + * @param maxRequestsInFlight the max in-flight requests in the stream * @param options RPC-layer hints for this call. * @return ClientStreamListener an interface to control uploading data. * {@link ClientStreamListener#start(VectorSchemaRoot, DictionaryProvider)} will already have been called. @@ -137,10 +143,11 @@ public ClientStreamListener startPut( VectorSchemaRoot root, DictionaryProvider provider, PutListener metadataListener, + long maxRequestsInFlight, CallOption... options) { Preconditions.checkNotNull(root, "root must not be null"); Preconditions.checkNotNull(provider, "provider must not be null"); - ClientStreamListener writer = startPut(descriptor, metadataListener, options); + ClientStreamListener writer = startPut(descriptor, metadataListener, maxRequestsInFlight, options); writer.start(root, provider); return writer; } @@ -150,18 +157,22 @@ public ClientStreamListener startPut( * * @param descriptor FlightDescriptor the descriptor for the data * @param metadataListener A handler for metadata messages from the server. + * @param maxRequestsInFlight the max in-flight requests in the stream * @param options RPC-layer hints for this call. * @return ClientStreamListener an interface to control uploading data. * {@link ClientStreamListener#start(VectorSchemaRoot, DictionaryProvider)} will NOT already have been called. */ public ClientStreamListener startPut( - FlightDescriptor descriptor, PutListener metadataListener, CallOption... options) { + FlightDescriptor descriptor, + PutListener metadataListener, + long maxRequestsInFlight, + CallOption... options) { Preconditions.checkNotNull(descriptor, "descriptor must not be null"); Preconditions.checkNotNull(metadataListener, "metadataListener must not be null"); try { ClientCall call = asyncStubNewCall(this.doPutDescriptor, options); - OnStreamReadyHandler onStreamReadyHandler = new OnStreamReadyHandler(); + OnStreamReadyHandler onStreamReadyHandler = new OnStreamReadyHandler((int) maxRequestsInFlight); SetStreamObserver resultObserver = new SetStreamObserver(this.allocator, metadataListener, onStreamReadyHandler); ClientCallStreamObserver observer = @@ -184,11 +195,20 @@ public DictionaryProvider newDefaultDictionaryProvider() { } private static class OnStreamReadyHandler implements Runnable { - private final Semaphore semaphore = new Semaphore(0); + private final int maxRequestsInFlight; + private final Semaphore semaphore; + + OnStreamReadyHandler(int maxRequestsInFlight) { + this.maxRequestsInFlight = maxRequestsInFlight; + this.semaphore = new Semaphore(maxRequestsInFlight); + } @Override public void run() { - this.semaphore.release(); + int mayReleasePermits = this.maxRequestsInFlight - this.semaphore.availablePermits(); + if (mayReleasePermits > 0) { + this.semaphore.release(mayReleasePermits); + } } /** @@ -338,7 +358,10 @@ protected void waitUntilStreamReady() { // If the stream is not ready, wait for a short time to avoid busy waiting // This helps reduce CPU usage while still being responsive try { - this.onStreamReadyHandler.await(10, TimeUnit.MILLISECONDS); + if (this.onStreamReadyHandler.await(10, TimeUnit.MILLISECONDS)) { + // Allow some in-flight requests to be sent + break; + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Interrupted while waiting for stream to be ready", e); diff --git a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java index 61840ce..c8e7fe1 100644 --- a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java @@ -46,27 +46,29 @@ public class BulkWriteBenchmark { public static void main(String[] args) throws Exception { String endpoint = SystemPropertyUtil.get("db_endpoint", "127.0.0.1:4001"); String dbName = SystemPropertyUtil.get("db_name", "public"); - boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false); - int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 5 * 1024); + boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", true); + int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 64 * 1024); LOG.info("Connect to db: {}, endpoint: {}", dbName, endpoint); LOG.info("Using zstd compression: {}", zstdCompression); LOG.info("Batch size: {}", batchSize); GreptimeDB greptimeDB = DBConnector.connectTo(new String[] {endpoint}, dbName); - TableDataProvider tableDataProvider = - ServiceLoader.load(TableDataProvider.class).first(); - tableDataProvider.init(); - TableSchema tableSchema = tableDataProvider.tableSchema(); BulkWrite.Config cfg = BulkWrite.Config.newBuilder() .allocatorInitReservation(0) .allocatorMaxAllocation(4 * 1024 * 1024 * 1024L) .timeoutMsPerMessage(60000) - .maxRequestsInFlight(32) + .maxRequestsInFlight(4) .build(); Compression compression = zstdCompression ? Compression.Zstd : Compression.None; Context ctx = Context.newDefault().withCompression(compression); + TableDataProvider tableDataProvider = + ServiceLoader.load(TableDataProvider.class).first(); + LOG.info("Table data provider: {}", tableDataProvider.getClass().getName()); + tableDataProvider.init(); + TableSchema tableSchema = tableDataProvider.tableSchema(); + LOG.info("Start writing data"); try (BulkStreamWriter writer = greptimeDB.bulkStreamWriter(tableSchema, cfg, ctx)) { Iterator rows = tableDataProvider.rows(); @@ -80,16 +82,19 @@ public static void main(String[] args) throws Exception { } table.addRow(rows.next()); } + LOG.info("Table bytes used: {}", table.bytesUsed()); // Complete the table; adding rows is no longer permitted. table.complete(); // Write the table data to the server CompletableFuture future = writer.writeNext(); + long fStart = System.nanoTime(); future.whenComplete((r, t) -> { + long costMs = (System.nanoTime() - fStart) / 1000000; if (t != null) { - LOG.error("Error writing data", t); + LOG.error("Error writing data, time cost: {}ms", costMs, t); } else { - LOG.info("Wrote rows: {}", r); + LOG.info("Wrote rows: {}, time cost: {}ms", r, costMs); } }); @@ -101,6 +106,8 @@ public static void main(String[] args) throws Exception { writer.completed(); LOG.info("Completed writing data, time cost: {}s", (System.nanoTime() - start) / 1000000000); + } finally { + tableDataProvider.close(); } greptimeDB.shutdownGracefully(); diff --git a/ingester-example/src/main/java/io/greptime/bench/LogTextHelper.java b/ingester-example/src/main/java/io/greptime/bench/LogTextHelper.java new file mode 100644 index 0000000..18a4bbd --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/bench/LogTextHelper.java @@ -0,0 +1,128 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.greptime.bench; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * Helper class for generating log messages. + */ +public class LogTextHelper { + + private static final String[] LOG_LEVELS = {"INFO", "WARN", "ERROR", "DEBUG"}; + + // Log templates as static constants to avoid recreating them for each call + private static final String[] LOG_TEMPLATES = { + "INFO: Request processed successfully. RequestID: %s, Duration: %dms, UserID: %s", + "WARN: Slow query detected. QueryID: %s, Duration: %dms, SQL: %s", + "ERROR: Failed to connect to database. Attempt: %d, Error: %s, Host: %s", + "DEBUG: Cache hit ratio: %d%%, Keys: %d, Misses: %d, Memory usage: %dMB", + "INFO: User authentication successful. UserID: %s, IP: %s, LoginTime: %s" + }; + + // Stack trace frames as static constants + private static final String[] STACK_FRAMES = { + "at io.greptime.service.DatabaseConnector.connect(DatabaseConnector.java:142)", + "at io.greptime.service.QueryExecutor.execute(QueryExecutor.java:85)", + "at io.greptime.api.RequestHandler.processQuery(RequestHandler.java:213)", + "at io.greptime.server.GrpcService.handleRequest(GrpcService.java:178)", + "at io.greptime.server.HttpEndpoint.doPost(HttpEndpoint.java:95)", + "at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)", + "at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)" + }; + + // Context keys as static constants + private static final String[] CONTEXT_KEYS = { + "client_version", "server_version", "cluster_id", "node_id", "region", "datacenter" + }; + + // Pre-computed stack trace and context strings to avoid concatenation in hot path + private static final String STACK_TRACE_PREFIX = "\nStack trace:\n"; + private static final String CONTEXT_PREFIX = "\nAdditional context: "; + private static final String CONTEXT_SEPARATOR = ", "; + private static final String CONTEXT_EQUALS = "="; + + public static String randomLogLevel(ThreadLocalRandom random) { + return LOG_LEVELS[random.nextInt(LOG_LEVELS.length)]; + } + + public static String generate2kText(ThreadLocalRandom random, long logTs) { + StringBuilder buf = new StringBuilder(2000); + + // Choose one of several log templates to make it more realistic + String template = LOG_TEMPLATES[random.nextInt(LOG_TEMPLATES.length)]; + + // Format the template with the prepared values + String formattedLog; + if (template.startsWith("INFO: Request")) { + String requestId = "req_" + random.nextLong(1000000); + int duration = random.nextInt(1, 5000); + String userId = "user_" + random.nextInt(10000); + formattedLog = String.format(template, requestId, duration, userId); + } else if (template.startsWith("WARN:")) { + String queryId = "query_" + random.nextInt(50000); + int duration = random.nextInt(1, 5000); + formattedLog = String.format(template, queryId, duration, "SELECT * FROM table_" + random.nextInt(100)); + } else if (template.startsWith("ERROR:")) { + formattedLog = String.format(template, random.nextInt(5), "Connection refused", "db-" + random.nextInt(10)); + } else if (template.startsWith("DEBUG:")) { + int count = random.nextInt(10000); + int percentage = random.nextInt(100); + formattedLog = String.format(template, percentage, count, random.nextInt(1000), random.nextInt(512)); + } else { // Second INFO template + String userId = "user_" + random.nextInt(10000); + formattedLog = String.format( + template, userId, "192.168." + random.nextInt(256) + "." + random.nextInt(256), logTs); + } + + buf.append(formattedLog); + + // Target length between 1600-1900 characters + int targetLength = random.nextInt(1600, 1900); + + // Add stack trace for error logs to reach desired size + if (formattedLog.startsWith("ERROR")) { + buf.append(STACK_TRACE_PREFIX); + + // Add stack frames until we reach target length + int frameIndex = 0; + int framesCount = STACK_FRAMES.length; + + while (buf.length() < targetLength && frameIndex < 100) { // Limit iterations + buf.append(STACK_FRAMES[frameIndex % framesCount]).append('\n'); + frameIndex++; + } + } else { + // For non-error logs, pad with additional context information + buf.append(CONTEXT_PREFIX); + + // Add key-value pairs until we reach desired length + int keyIndex = 0; + int keysCount = CONTEXT_KEYS.length; + + while (buf.length() < targetLength && keyIndex < 100) { // Limit iterations + buf.append(CONTEXT_KEYS[keyIndex % keysCount]) + .append(CONTEXT_EQUALS) + .append(random.nextInt(1000)) + .append(CONTEXT_SEPARATOR); + keyIndex++; + } + } + + return buf.toString(); + } +} diff --git a/ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java b/ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java new file mode 100644 index 0000000..46ff75c --- /dev/null +++ b/ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java @@ -0,0 +1,152 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.greptime.bench; + +import com.codahale.metrics.Timer; +import io.greptime.common.SPI; +import io.greptime.common.util.ExecutorServiceHelper; +import io.greptime.common.util.MetricsUtil; +import io.greptime.common.util.NamedThreadFactory; +import io.greptime.common.util.SystemPropertyUtil; +import io.greptime.common.util.ThreadPoolUtil; +import java.util.Iterator; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SPI( + name = "multi_producer_table_data_provider", + priority = 10 /* newer implementation can use higher priority to override the old one */) +public class MultiProducerTableDataProvider extends RandomTableDataProvider { + + private static final Logger LOG = LoggerFactory.getLogger(MultiProducerTableDataProvider.class); + + private final int producerCount; + private final long rowCount; + private final ExecutorService executorService; + private final BlockingQueue buffer = new ArrayBlockingQueue<>(100000); + + { + this.producerCount = SystemPropertyUtil.getInt("multi_producer_table_data_provider.producer_count", 10); + // Total number of rows to generate, configurable via system property + this.rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 10_000_000L); + this.executorService = ThreadPoolUtil.newBuilder() + .poolName("multi-producer-table-data-provider") + .enableMetric(true) + .coreThreads(this.producerCount) + .maximumThreads(this.producerCount) + .keepAliveSeconds(60L) + .workQueue(new ArrayBlockingQueue<>(512)) + .threadFactory(new NamedThreadFactory("multi-producer-table-data-provider")) + .rejectedHandler(new ThreadPoolExecutor.CallerRunsPolicy()) + .build(); + } + + @Override + public void init() { + AtomicLong rowIndex = new AtomicLong(0); + for (int i = 0; i < producerCount; i++) { + this.executorService.execute(() -> { + while (rowIndex.getAndIncrement() < rowCount) { + Object[] row = nextRow(); + try { + buffer.put(row); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + } + } + + @Override + public Iterator rows() { + + return new Iterator() { + private long index = 0; + + @Override + public boolean hasNext() { + return index < rowCount; + } + + @Override + public Object[] next() { + index++; + Object[] row = buffer.poll(); + if (row == null) { + try { + LOG.info("Waiting for row from buffer"); + row = buffer.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return row; + } + }; + } + + @Override + public void close() throws Exception { + ExecutorServiceHelper.shutdownAndAwaitTermination(this.executorService); + } + + public Object[] nextRow() { + ThreadLocalRandom random = ThreadLocalRandom.current(); + Timer.Context timerContext = + MetricsUtil.timer("multi_producer_table_data_provider.next_row").time(); + long logTs = System.currentTimeMillis(); + String businessName = "business_" + random.nextInt(10); + String appName = "app_" + random.nextInt(100); + String hostName = "host_" + random.nextInt(1000); + String logMessage = LogTextHelper.generate2kText(random, logTs); + String logLevel = LogTextHelper.randomLogLevel(random); + String logName = "log_name_" + random.nextInt(100); + String uri = "http://example.com/path_" + random.nextInt(1000); + String traceId = "trace_" + random.nextLong(1000000); + String spanId = "span_" + random.nextLong(1000000); + String errno = "errno_" + random.nextInt(1000); + String traceFlags = "trace_flags_" + random.nextInt(1000); + String traceState = "trace_state_" + random.nextInt(1000); + String podName = "pod_" + random.nextInt(1000); + timerContext.stop(); + MetricsUtil.histogram("random_table_data_provider.log_message_length").update(logMessage.length()); + + return new Object[] { + logTs, + businessName, + appName, + hostName, + logMessage, + logLevel, + logName, + uri, + traceId, + spanId, + errno, + traceFlags, + traceState, + podName + }; + } +} diff --git a/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java b/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java index b9fb671..187074b 100644 --- a/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java +++ b/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java @@ -22,8 +22,8 @@ import io.greptime.common.util.SystemPropertyUtil; import io.greptime.models.DataType; import io.greptime.models.TableSchema; -import io.grpc.netty.shaded.io.netty.util.internal.ThreadLocalRandom; import java.util.Iterator; +import java.util.concurrent.ThreadLocalRandom; @SPI( name = "random_table_data_provider", @@ -51,8 +51,7 @@ public class RandomTableDataProvider implements TableDataProvider { .addField("pod_name", DataType.String) .build(); // Total number of rows to generate, configurable via system property - // Default is 1 billion rows if not specified - rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 1_000_000L); + rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 10_000_000L); } @Override @@ -86,8 +85,8 @@ public Object[] next() { String businessName = "business_" + random.nextInt(10); String appName = "app_" + random.nextInt(100); String hostName = "host_" + random.nextInt(1000); - String logMessage = generate2kText(random, logTs); - String logLevel = LOG_LEVELS[random.nextInt(LOG_LEVELS.length)]; + String logMessage = LogTextHelper.generate2kText(random, logTs); + String logLevel = LogTextHelper.randomLogLevel(random); String logName = "log_name_" + random.nextInt(100); String uri = "http://example.com/path_" + random.nextInt(1000); String traceId = "trace_" + random.nextLong(1000000); @@ -122,103 +121,4 @@ public Object[] next() { @Override public void close() throws Exception {} - - private static final String[] LOG_LEVELS = {"INFO", "WARN", "ERROR", "DEBUG"}; - - // Log templates as static constants to avoid recreating them for each call - private static final String[] LOG_TEMPLATES = { - "INFO: Request processed successfully. RequestID: %s, Duration: %dms, UserID: %s", - "WARN: Slow query detected. QueryID: %s, Duration: %dms, SQL: %s", - "ERROR: Failed to connect to database. Attempt: %d, Error: %s, Host: %s", - "DEBUG: Cache hit ratio: %d%%, Keys: %d, Misses: %d, Memory usage: %dMB", - "INFO: User authentication successful. UserID: %s, IP: %s, LoginTime: %s" - }; - - // Stack trace frames as static constants - private static final String[] STACK_FRAMES = { - "at io.greptime.service.DatabaseConnector.connect(DatabaseConnector.java:142)", - "at io.greptime.service.QueryExecutor.execute(QueryExecutor.java:85)", - "at io.greptime.api.RequestHandler.processQuery(RequestHandler.java:213)", - "at io.greptime.server.GrpcService.handleRequest(GrpcService.java:178)", - "at io.greptime.server.HttpEndpoint.doPost(HttpEndpoint.java:95)", - "at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)", - "at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)" - }; - - // Context keys as static constants - private static final String[] CONTEXT_KEYS = { - "client_version", "server_version", "cluster_id", "node_id", "region", "datacenter" - }; - - // Pre-computed stack trace and context strings to avoid concatenation in hot path - private static final String STACK_TRACE_PREFIX = "\nStack trace:\n"; - private static final String CONTEXT_PREFIX = "\nAdditional context: "; - private static final String CONTEXT_SEPARATOR = ", "; - private static final String CONTEXT_EQUALS = "="; - - static String generate2kText(ThreadLocalRandom random, long logTs) { - StringBuilder buf = new StringBuilder(2000); - - // Choose one of several log templates to make it more realistic - String template = LOG_TEMPLATES[random.nextInt(LOG_TEMPLATES.length)]; - - // Format the template with the prepared values - String formattedLog; - if (template.startsWith("INFO: Request")) { - String requestId = "req_" + random.nextLong(1000000); - int duration = random.nextInt(1, 5000); - String userId = "user_" + random.nextInt(10000); - formattedLog = String.format(template, requestId, duration, userId); - } else if (template.startsWith("WARN:")) { - String queryId = "query_" + random.nextInt(50000); - int duration = random.nextInt(1, 5000); - formattedLog = String.format(template, queryId, duration, "SELECT * FROM table_" + random.nextInt(100)); - } else if (template.startsWith("ERROR:")) { - formattedLog = String.format(template, random.nextInt(5), "Connection refused", "db-" + random.nextInt(10)); - } else if (template.startsWith("DEBUG:")) { - int count = random.nextInt(10000); - int percentage = random.nextInt(100); - formattedLog = String.format(template, percentage, count, random.nextInt(1000), random.nextInt(512)); - } else { // Second INFO template - String userId = "user_" + random.nextInt(10000); - formattedLog = String.format( - template, userId, "192.168." + random.nextInt(256) + "." + random.nextInt(256), logTs); - } - - buf.append(formattedLog); - - // Target length between 1600-1900 characters - int targetLength = random.nextInt(1600, 1900); - - // Add stack trace for error logs to reach desired size - if (formattedLog.startsWith("ERROR")) { - buf.append(STACK_TRACE_PREFIX); - - // Add stack frames until we reach target length - int frameIndex = 0; - int framesCount = STACK_FRAMES.length; - - while (buf.length() < targetLength && frameIndex < 100) { // Limit iterations - buf.append(STACK_FRAMES[frameIndex % framesCount]).append('\n'); - frameIndex++; - } - } else { - // For non-error logs, pad with additional context information - buf.append(CONTEXT_PREFIX); - - // Add key-value pairs until we reach desired length - int keyIndex = 0; - int keysCount = CONTEXT_KEYS.length; - - while (buf.length() < targetLength && keyIndex < 100) { // Limit iterations - buf.append(CONTEXT_KEYS[keyIndex % keysCount]) - .append(CONTEXT_EQUALS) - .append(random.nextInt(1000)) - .append(CONTEXT_SEPARATOR); - keyIndex++; - } - } - - return buf.toString(); - } } diff --git a/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java index b1b9925..a5e3ada 100644 --- a/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java @@ -47,8 +47,8 @@ public class StreamingWriteBenchmark { public static void main(String[] args) throws Exception { String endpoint = SystemPropertyUtil.get("db_endpoint", "127.0.0.1:4001"); String dbName = SystemPropertyUtil.get("db_name", "public"); - boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false); - int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 5 * 1024); + boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", true); + int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 64 * 1024); int maxPointsPerSecond = SystemPropertyUtil.getInt("max_points_per_second", Integer.MAX_VALUE); LOG.info("Connect to db: {}, endpoint: {}", dbName, endpoint); LOG.info("Using zstd compression: {}", zstdCompression); @@ -56,16 +56,17 @@ public static void main(String[] args) throws Exception { LOG.info("Max points per second: {}", maxPointsPerSecond); GreptimeDB greptimeDB = DBConnector.connectTo(new String[] {endpoint}, dbName); - TableDataProvider tableDataProvider = - ServiceLoader.load(TableDataProvider.class).first(); - tableDataProvider.init(); - TableSchema tableSchema = tableDataProvider.tableSchema(); Compression compression = zstdCompression ? Compression.Zstd : Compression.None; Context ctx = Context.newDefault().withCompression(compression); StreamWriter writer = greptimeDB.streamWriter(maxPointsPerSecond, ctx); + TableDataProvider tableDataProvider = + ServiceLoader.load(TableDataProvider.class).first(); + LOG.info("Table data provider: {}", tableDataProvider.getClass().getName()); + tableDataProvider.init(); + TableSchema tableSchema = tableDataProvider.tableSchema(); Iterator rows = tableDataProvider.rows(); LOG.info("Start writing data"); @@ -78,6 +79,7 @@ public static void main(String[] args) throws Exception { } table.addRow(rows.next()); } + LOG.info("Table bytes used: {}", table.bytesUsed()); // Complete the table; adding rows is no longer permitted. table.complete(); // Write the table data to the server @@ -97,5 +99,6 @@ public static void main(String[] args) throws Exception { LOG.info("Completed writing data: {}, time cost: {}s", result, (System.nanoTime() - start) / 1000000000); greptimeDB.shutdownGracefully(); + tableDataProvider.close(); } } diff --git a/ingester-example/src/main/resources/META-INF/services/io.greptime.bench.TableDataProvider b/ingester-example/src/main/resources/META-INF/services/io.greptime.bench.TableDataProvider index c838d18..41d097f 100644 --- a/ingester-example/src/main/resources/META-INF/services/io.greptime.bench.TableDataProvider +++ b/ingester-example/src/main/resources/META-INF/services/io.greptime.bench.TableDataProvider @@ -1 +1,2 @@ io.greptime.bench.RandomTableDataProvider +io.greptime.bench.MultiProducerTableDataProvider diff --git a/ingester-protocol/src/main/java/io/greptime/BulkWrite.java b/ingester-protocol/src/main/java/io/greptime/BulkWrite.java index 203423c..1519691 100644 --- a/ingester-protocol/src/main/java/io/greptime/BulkWrite.java +++ b/ingester-protocol/src/main/java/io/greptime/BulkWrite.java @@ -45,7 +45,7 @@ public interface BulkWrite { * This value should be determined based on the size of each request packet. A higher value means more in-flight requests, * which could potentially saturate network bandwidth or exceed the actual processing capacity of the database. */ - int DEFAULT_MAX_REQUESTS_IN_FLIGHT = 8; + int DEFAULT_MAX_REQUESTS_IN_FLIGHT = 4; static class Config { private long allocatorInitReservation = DEFAULT_ALLOCATOR_INIT_RESERVATION; diff --git a/ingester-protocol/src/main/java/io/greptime/BulkWriteClient.java b/ingester-protocol/src/main/java/io/greptime/BulkWriteClient.java index d045ed2..bd4992f 100644 --- a/ingester-protocol/src/main/java/io/greptime/BulkWriteClient.java +++ b/ingester-protocol/src/main/java/io/greptime/BulkWriteClient.java @@ -135,8 +135,8 @@ private BulkStreamWriter bulkStreamWriteTo( HeaderCallOption headerOption = new HeaderCallOption(headers); AsyncExecCallOption execOption = new AsyncExecCallOption(this.asyncPool); - BulkWriteService writer = - manager.intoBulkWriteStream(table, arrowSchema, timeoutMsPerMessage, headerOption, execOption); + BulkWriteService writer = manager.intoBulkWriteStream( + table, arrowSchema, timeoutMsPerMessage, maxRequestsInFlight, headerOption, execOption); writer.start(); if (this.opts.isUseZeroCopyWrite()) { writer.tryUseZeroCopyWrite(); diff --git a/ingester-protocol/src/main/java/io/greptime/models/Table.java b/ingester-protocol/src/main/java/io/greptime/models/Table.java index 81101c8..f95b119 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/Table.java +++ b/ingester-protocol/src/main/java/io/greptime/models/Table.java @@ -85,6 +85,11 @@ default int pointCount() { return rowCount() * columnCount(); } + /** + * The bytes used by the table. + */ + long bytesUsed(); + /** * Insets one row with all columns. * @@ -245,6 +250,16 @@ public int columnCount() { return this.columnSchemas.size(); } + /** + *

+ * This is an expensive operation, only used for testing + *

+ */ + @Override + public long bytesUsed() { + return this.rows.stream().mapToLong(row -> row.getSerializedSize()).sum(); + } + @Override public Table addRow(Object... values) { Ensures.ensure( @@ -369,6 +384,13 @@ public int columnCount() { return this.root.getSchema().getFields().size(); } + @Override + public long bytesUsed() { + return this.root.getFieldVectors().stream() + .mapToLong(vector -> vector.getBufferSize()) + .sum(); + } + @Override public Table addRow(Object... values) { Ensures.ensure( From f93f4dbfea2f67309e4616540edc6ddf6cc18bbe Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 22 Apr 2025 17:07:53 +0800 Subject: [PATCH 2/5] release: 0.14.0 --- ingester-all/pom.xml | 2 +- ingester-bulk-protocol/pom.xml | 2 +- ingester-common/pom.xml | 2 +- ingester-example/pom.xml | 2 +- ingester-grpc/pom.xml | 2 +- ingester-protocol/pom.xml | 2 +- ingester-protocol/src/test/java/io/greptime/UtilTest.java | 2 +- ingester-rpc/pom.xml | 2 +- pom.xml | 2 +- 9 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ingester-all/pom.xml b/ingester-all/pom.xml index e647af4..a06d4cc 100644 --- a/ingester-all/pom.xml +++ b/ingester-all/pom.xml @@ -21,7 +21,7 @@ io.greptime greptimedb-ingester - 0.12.2 + 0.14.0 ingester-all diff --git a/ingester-bulk-protocol/pom.xml b/ingester-bulk-protocol/pom.xml index 1b01325..7b13278 100644 --- a/ingester-bulk-protocol/pom.xml +++ b/ingester-bulk-protocol/pom.xml @@ -21,7 +21,7 @@ io.greptime greptimedb-ingester - 0.12.2 + 0.14.0 ingester-bulk-protocol diff --git a/ingester-common/pom.xml b/ingester-common/pom.xml index 0f5d883..098b4e4 100644 --- a/ingester-common/pom.xml +++ b/ingester-common/pom.xml @@ -21,7 +21,7 @@ io.greptime greptimedb-ingester - 0.12.2 + 0.14.0 ingester-common diff --git a/ingester-example/pom.xml b/ingester-example/pom.xml index 5bc9bf6..0a5b135 100644 --- a/ingester-example/pom.xml +++ b/ingester-example/pom.xml @@ -21,7 +21,7 @@ io.greptime greptimedb-ingester - 0.12.2 + 0.14.0 ingester-example diff --git a/ingester-grpc/pom.xml b/ingester-grpc/pom.xml index 4d06df8..29c78e0 100644 --- a/ingester-grpc/pom.xml +++ b/ingester-grpc/pom.xml @@ -21,7 +21,7 @@ io.greptime greptimedb-ingester - 0.12.2 + 0.14.0 ingester-grpc diff --git a/ingester-protocol/pom.xml b/ingester-protocol/pom.xml index 7867ea7..6112f83 100644 --- a/ingester-protocol/pom.xml +++ b/ingester-protocol/pom.xml @@ -21,7 +21,7 @@ io.greptime greptimedb-ingester - 0.12.2 + 0.14.0 ingester-protocol diff --git a/ingester-protocol/src/test/java/io/greptime/UtilTest.java b/ingester-protocol/src/test/java/io/greptime/UtilTest.java index c2d3772..044e23d 100644 --- a/ingester-protocol/src/test/java/io/greptime/UtilTest.java +++ b/ingester-protocol/src/test/java/io/greptime/UtilTest.java @@ -27,6 +27,6 @@ public class UtilTest { @Test public void testClientVersion() { String ver = Util.clientVersion(); - Assert.assertEquals("0.12.2", ver); + Assert.assertEquals("0.14.0", ver); } } diff --git a/ingester-rpc/pom.xml b/ingester-rpc/pom.xml index 4817bf0..88154bc 100644 --- a/ingester-rpc/pom.xml +++ b/ingester-rpc/pom.xml @@ -21,7 +21,7 @@ io.greptime greptimedb-ingester - 0.12.2 + 0.14.0 ingester-rpc diff --git a/pom.xml b/pom.xml index 2510917..1c0c945 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ io.greptime greptimedb-ingester - 0.12.2 + 0.14.0 pom ${project.groupId}:${project.artifactId} From ec5e45350bd32c572bd48a2b3dce93c25d481a9b Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 22 Apr 2025 17:29:01 +0800 Subject: [PATCH 3/5] chore: minor change --- .../java/io/greptime/BulkWriteService.java | 2 +- .../apache/arrow/flight/BulkFlightClient.java | 5 +---- .../io/greptime/bench/BulkWriteBenchmark.java | 2 +- .../bench/MultiProducerTableDataProvider.java | 20 ++++++------------- .../bench/RandomTableDataProvider.java | 6 +++--- .../src/main/java/io/greptime/BulkWrite.java | 2 +- 6 files changed, 13 insertions(+), 24 deletions(-) diff --git a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java index 0aeef20..610b97a 100644 --- a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java +++ b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java @@ -61,7 +61,7 @@ public class BulkWriteService implements AutoCloseable { private final ClientStreamListener listener; private final AsyncPutListener metadataListener; private final long timeoutMs; - + /** * Constructs a new BulkWriteService. * diff --git a/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java b/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java index b297962..ae747db 100644 --- a/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java +++ b/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java @@ -205,10 +205,7 @@ private static class OnStreamReadyHandler implements Runnable { @Override public void run() { - int mayReleasePermits = this.maxRequestsInFlight - this.semaphore.availablePermits(); - if (mayReleasePermits > 0) { - this.semaphore.release(mayReleasePermits); - } + this.semaphore.release(this.maxRequestsInFlight); } /** diff --git a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java index c8e7fe1..16f5911 100644 --- a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java @@ -58,7 +58,7 @@ public static void main(String[] args) throws Exception { .allocatorInitReservation(0) .allocatorMaxAllocation(4 * 1024 * 1024 * 1024L) .timeoutMsPerMessage(60000) - .maxRequestsInFlight(4) + .maxRequestsInFlight(8) .build(); Compression compression = zstdCompression ? Compression.Zstd : Compression.None; Context ctx = Context.newDefault().withCompression(compression); diff --git a/ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java b/ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java index 46ff75c..c7bf6db 100644 --- a/ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java +++ b/ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java @@ -30,16 +30,12 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @SPI( name = "multi_producer_table_data_provider", priority = 10 /* newer implementation can use higher priority to override the old one */) public class MultiProducerTableDataProvider extends RandomTableDataProvider { - private static final Logger LOG = LoggerFactory.getLogger(MultiProducerTableDataProvider.class); - private final int producerCount; private final long rowCount; private final ExecutorService executorService; @@ -91,17 +87,13 @@ public boolean hasNext() { @Override public Object[] next() { - index++; - Object[] row = buffer.poll(); - if (row == null) { - try { - LOG.info("Waiting for row from buffer"); - row = buffer.take(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + try { + Object[] row = buffer.take(); + index++; + return row; + } catch (InterruptedException e) { + throw new RuntimeException(e); } - return row; } }; } diff --git a/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java b/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java index 187074b..3ee3ea3 100644 --- a/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java +++ b/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java @@ -36,9 +36,9 @@ public class RandomTableDataProvider implements TableDataProvider { { tableSchema = TableSchema.newBuilder("my_bench_table") .addTimestamp("log_ts", DataType.TimestampMillisecond) - .addTag("business_name", DataType.String) - .addTag("app_name", DataType.String) - .addTag("host_name", DataType.String) + .addField("business_name", DataType.String) + .addField("app_name", DataType.String) + .addField("host_name", DataType.String) .addField("log_message", DataType.String) // 2K .addField("log_level", DataType.String) .addField("log_name", DataType.String) diff --git a/ingester-protocol/src/main/java/io/greptime/BulkWrite.java b/ingester-protocol/src/main/java/io/greptime/BulkWrite.java index 1519691..203423c 100644 --- a/ingester-protocol/src/main/java/io/greptime/BulkWrite.java +++ b/ingester-protocol/src/main/java/io/greptime/BulkWrite.java @@ -45,7 +45,7 @@ public interface BulkWrite { * This value should be determined based on the size of each request packet. A higher value means more in-flight requests, * which could potentially saturate network bandwidth or exceed the actual processing capacity of the database. */ - int DEFAULT_MAX_REQUESTS_IN_FLIGHT = 4; + int DEFAULT_MAX_REQUESTS_IN_FLIGHT = 8; static class Config { private long allocatorInitReservation = DEFAULT_ALLOCATOR_INIT_RESERVATION; From 88d2e0a383df08458321125740c808eb10932984 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 22 Apr 2025 19:10:50 +0800 Subject: [PATCH 4/5] chore: improve prepare data perf --- .../io/greptime/bench/BulkWriteBenchmark.java | 2 +- .../bench/MultiProducerTableDataProvider.java | 2 +- .../bench/RandomTableDataProvider.java | 2 +- .../java/io/greptime/BulkStreamWriter.java | 4 +- .../java/io/greptime/BulkWriteClient.java | 15 +- .../java/io/greptime/models/ArrowHelper.java | 280 ++++++++++++++++++ .../main/java/io/greptime/models/Table.java | 91 ++++-- .../main/resources/client_version.properties | 2 +- 8 files changed, 372 insertions(+), 26 deletions(-) diff --git a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java index 16f5911..b4c8f65 100644 --- a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java @@ -75,7 +75,7 @@ public static void main(String[] args) throws Exception { long start = System.nanoTime(); for (; ; ) { - Table.TableBufferRoot table = writer.tableBufferRoot(); + Table.TableBufferRoot table = writer.tableBufferRoot(1024); for (int i = 0; i < batchSize; i++) { if (!rows.hasNext()) { break; diff --git a/ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java b/ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java index c7bf6db..c1ccf9d 100644 --- a/ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java +++ b/ingester-example/src/main/java/io/greptime/bench/MultiProducerTableDataProvider.java @@ -33,7 +33,7 @@ @SPI( name = "multi_producer_table_data_provider", - priority = 10 /* newer implementation can use higher priority to override the old one */) + priority = 1 /* newer implementation can use higher priority to override the old one */) public class MultiProducerTableDataProvider extends RandomTableDataProvider { private final int producerCount; diff --git a/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java b/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java index 3ee3ea3..a55c2c9 100644 --- a/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java +++ b/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java @@ -27,7 +27,7 @@ @SPI( name = "random_table_data_provider", - priority = 1 /* newer implementation can use higher priority to override the old one */) + priority = 10 /* newer implementation can use higher priority to override the old one */) public class RandomTableDataProvider implements TableDataProvider { private final TableSchema tableSchema; diff --git a/ingester-protocol/src/main/java/io/greptime/BulkStreamWriter.java b/ingester-protocol/src/main/java/io/greptime/BulkStreamWriter.java index 20bbae5..6a1ffdf 100644 --- a/ingester-protocol/src/main/java/io/greptime/BulkStreamWriter.java +++ b/ingester-protocol/src/main/java/io/greptime/BulkStreamWriter.java @@ -66,11 +66,13 @@ public interface BulkStreamWriter extends AutoCloseable { * The `TableBufferRoot` provides direct access to the underlying memory * where table data is stored for efficient bulk operations. * + * @param columnBufferSize the buffer size for each column + * * @see Table.TableBufferRoot * * @return a table buffer root */ - Table.TableBufferRoot tableBufferRoot(); + Table.TableBufferRoot tableBufferRoot(int columnBufferSize); /** * Writes currenttable data to the stream. diff --git a/ingester-protocol/src/main/java/io/greptime/BulkWriteClient.java b/ingester-protocol/src/main/java/io/greptime/BulkWriteClient.java index bd4992f..a9496d7 100644 --- a/ingester-protocol/src/main/java/io/greptime/BulkWriteClient.java +++ b/ingester-protocol/src/main/java/io/greptime/BulkWriteClient.java @@ -39,6 +39,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.arrow.flight.FlightCallHeaders; import org.apache.arrow.flight.HeaderCallOption; import org.apache.arrow.vector.types.pojo.Schema; @@ -185,6 +186,7 @@ static class DefaultBulkStreamWriter implements BulkStreamWriter { private final BulkWriteLimiter pipelineWriteLimiter; private final BulkWriteService writer; private final TableSchema tableSchema; + private final AtomicReference current = new AtomicReference<>(); public DefaultBulkStreamWriter(BulkWriteService writer, TableSchema tableSchema, int maxRequestsInFlight) { this.writer = writer; @@ -193,12 +195,21 @@ public DefaultBulkStreamWriter(BulkWriteService writer, TableSchema tableSchema, } @Override - public Table.TableBufferRoot tableBufferRoot() { - return Table.tableBufferRoot(this.tableSchema, this.writer.getRoot()); + public Table.TableBufferRoot tableBufferRoot(int columnBufferSize) { + Table.TableBufferRoot table = + Table.tableBufferRoot(this.tableSchema, this.writer.getRoot(), columnBufferSize); + this.current.set(table); + return table; } @Override public CompletableFuture writeNext() throws Exception { + Table.TableBufferRoot table = this.current.getAndSet(null); + if (table != null) { + // make sure the table is completed + table.complete(); + } + // Check if the stream is ready if (!isStreamReady()) { LOG.debug( diff --git a/ingester-protocol/src/main/java/io/greptime/models/ArrowHelper.java b/ingester-protocol/src/main/java/io/greptime/models/ArrowHelper.java index beb37b3..87ce8d9 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/ArrowHelper.java +++ b/ingester-protocol/src/main/java/io/greptime/models/ArrowHelper.java @@ -22,6 +22,7 @@ import io.greptime.v1.Common; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; @@ -114,6 +115,285 @@ public static Schema createSchema(TableSchema tableSchema) { return new Schema(fields); } + public static void addValues( + FieldVector vector, + int startRowIndex, + Common.ColumnDataType dataType, + Common.ColumnDataTypeExtension dataTypeExtension, + Iterator values) { + switch (dataType) { + case INT8: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + ((TinyIntVector) vector).setSafe(startRowIndex++, (int) value); + } + } + break; + case INT16: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + ((SmallIntVector) vector).setSafe(startRowIndex++, (int) value); + } + } + break; + case INT32: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + ((IntVector) vector).setSafe(startRowIndex++, (int) value); + } + } + break; + case INT64: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + ((BigIntVector) vector).setSafe(startRowIndex++, (long) value); + } + } + break; + case UINT8: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + ((UInt1Vector) vector).setSafe(startRowIndex++, (int) value); + } + } + break; + case UINT16: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + ((UInt2Vector) vector).setSafe(startRowIndex++, (int) value); + } + } + break; + case UINT32: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + ((UInt4Vector) vector).setSafe(startRowIndex++, ((Long) value).intValue()); + } + } + break; + case UINT64: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + ((UInt8Vector) vector).setSafe(startRowIndex++, (long) value); + } + } + break; + case FLOAT32: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + ((Float4Vector) vector).setSafe(startRowIndex++, (float) value); + } + } + break; + case FLOAT64: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + ((Float8Vector) vector).setSafe(startRowIndex++, (double) value); + } + } + break; + case BOOLEAN: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + ((BitVector) vector).setSafe(startRowIndex++, (boolean) value ? 1 : 0); + } + } + break; + case BINARY: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + ((VarBinaryVector) vector).setSafe(startRowIndex++, (byte[]) value); + } + } + break; + case STRING: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + ((VarCharVector) vector) + .setSafe(startRowIndex++, ((String) value).getBytes(StandardCharsets.UTF_8)); + } + } + break; + case DATE: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + ((DateDayVector) vector).setSafe(startRowIndex++, ValueUtil.getDateValue(value)); + } + } + break; + case TIMESTAMP_SECOND: { + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + TimeStampSecHolder holder = new TimeStampSecHolder(); + holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.SECONDS); + ((TimeStampSecVector) vector).setSafe(startRowIndex++, holder); + } + } + break; + } + case TIMESTAMP_MILLISECOND: { + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + TimeStampMilliHolder holder = new TimeStampMilliHolder(); + holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.MILLISECONDS); + ((TimeStampMilliVector) vector).setSafe(startRowIndex++, holder); + } + } + break; + } + case TIMESTAMP_MICROSECOND: { + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + TimeStampMicroHolder holder = new TimeStampMicroHolder(); + holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.MICROSECONDS); + ((TimeStampMicroVector) vector).setSafe(startRowIndex++, holder); + } + } + break; + } + case TIMESTAMP_NANOSECOND: { + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + TimeStampNanoHolder holder = new TimeStampNanoHolder(); + holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.NANOSECONDS); + ((TimeStampNanoVector) vector).setSafe(startRowIndex++, holder); + } + } + break; + } + case TIME_SECOND: { + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + TimeSecHolder holder = new TimeSecHolder(); + holder.value = (int) ValueUtil.getLongValue(value); + ((TimeSecVector) vector).setSafe(startRowIndex++, holder); + } + } + break; + } + case TIME_MILLISECOND: { + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + TimeMilliHolder holder = new TimeMilliHolder(); + holder.value = (int) ValueUtil.getLongValue(value); + ((TimeMilliVector) vector).setSafe(startRowIndex++, holder); + } + } + break; + } + case TIME_MICROSECOND: { + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + TimeMicroHolder holder = new TimeMicroHolder(); + holder.value = ValueUtil.getLongValue(value); + ((TimeMicroVector) vector).setSafe(startRowIndex++, holder); + } + } + break; + } + case TIME_NANOSECOND: { + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + TimeNanoHolder holder = new TimeNanoHolder(); + holder.value = ValueUtil.getLongValue(value); + ((TimeNanoVector) vector).setSafe(startRowIndex++, holder); + } + } + break; + } + case DECIMAL128: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + byte[] bytes = ValueUtil.getDecimal128BigEndianBytes(dataTypeExtension, value); + ((DecimalVector) vector).setBigEndianSafe(startRowIndex++, bytes); + } + } + break; + case JSON: + while (values.hasNext()) { + Object value = values.next(); + if (value == null) { + vector.setNull(startRowIndex++); + } else { + byte[] jsonBytes = ValueUtil.getJsonString(value).getBytes(StandardCharsets.UTF_8); + ((VarCharVector) vector).setSafe(startRowIndex++, jsonBytes); + } + } + break; + default: + throw new IllegalArgumentException("Unsupported data type: " + dataType); + } + } + public static void addValue( FieldVector vector, int rowIndex, diff --git a/ingester-protocol/src/main/java/io/greptime/models/Table.java b/ingester-protocol/src/main/java/io/greptime/models/Table.java index f95b119..aaf9f7f 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/Table.java +++ b/ingester-protocol/src/main/java/io/greptime/models/Table.java @@ -22,7 +22,9 @@ import io.greptime.v1.Database; import io.greptime.v1.RowData; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; @@ -103,6 +105,12 @@ default int pointCount() { * Completes the table data construction and prevents further row additions. * After calling this method, the table will be immutable and ready to be written * to the database. + * + *

+ * This method must be called after all rows have been added to ensure any buffered + * rows are properly flushed to the underlying storage. Failure to call this method + * may result in data loss, particularly for implementations that use internal buffering. + *

*/ Table complete(); @@ -159,10 +167,11 @@ static Table from(TableSchema tableSchema) { * * @param tableSchema the table schema * @param root the vector schema root + * @param columnBufferSize the buffer size for each column * @return a table buffer root */ - static TableBufferRoot tableBufferRoot(TableSchema tableSchema, VectorSchemaRoot root) { - return new BulkTableBuilder(tableSchema, root).build(); + static TableBufferRoot tableBufferRoot(TableSchema tableSchema, VectorSchemaRoot root, int columnBufferSize) { + return new BulkTableBuilder(tableSchema, root, columnBufferSize).build(); } class Builder { @@ -324,10 +333,12 @@ public boolean isCompleted() { class BulkTableBuilder { private final TableSchema tableSchema; private final VectorSchemaRoot root; + private final int columnBufferSize; - public BulkTableBuilder(TableSchema tableSchema, VectorSchemaRoot root) { + public BulkTableBuilder(TableSchema tableSchema, VectorSchemaRoot root, int columnBufferSize) { this.tableSchema = tableSchema; this.root = root; + this.columnBufferSize = columnBufferSize; } public BulkTable build() { @@ -344,29 +355,37 @@ public BulkTable build() { Ensures.ensure( columnCount == dataTypeExtensions.size(), "Column data types size not equal to data type extensions size"); + Ensures.ensure( + columnCount == this.root.getSchema().getFields().size(), + "Column count not equal to root schema fields size"); - return new BulkTable(tableName, dataTypes, dataTypeExtensions, this.root); + return new BulkTable(tableName, dataTypes, dataTypeExtensions, this.root, this.columnBufferSize); } } class BulkTable implements TableBufferRoot { - private volatile boolean completed = false; + private volatile AtomicBoolean completed = new AtomicBoolean(false); private final String tableName; private final List dataTypes; private final List dataTypeExtensions; private final VectorSchemaRoot root; + private final int columnBufferSize; + private final List buffer; public BulkTable( String tableName, List dataTypes, List dataTypeExtensions, - VectorSchemaRoot root) { + VectorSchemaRoot root, + int columnBufferSize) { this.tableName = tableName; this.dataTypes = dataTypes; this.dataTypeExtensions = dataTypeExtensions; this.root = root; + this.columnBufferSize = columnBufferSize; + this.buffer = new ArrayList<>(columnBufferSize); } @Override @@ -381,7 +400,7 @@ public int rowCount() { @Override public int columnCount() { - return this.root.getSchema().getFields().size(); + return this.dataTypes.size(); } @Override @@ -394,21 +413,18 @@ public long bytesUsed() { @Override public Table addRow(Object... values) { Ensures.ensure( - !this.completed, + !this.completed.get(), "Table data construction has been completed. Cannot add more rows. Please create a new table instance."); checkNumValues(values.length); - int rowCount = this.root.getRowCount(); - for (int i = 0; i < values.length; i++) { - FieldVector vector = this.root.getVector(i); - if (vector.getValueCapacity() < rowCount + 1) { - vector.reAlloc(); - } - ArrowHelper.addValue( - vector, rowCount, this.dataTypes.get(i), this.dataTypeExtensions.get(i), values[i]); + this.buffer.add(values); + if (this.buffer.size() < this.columnBufferSize) { + return this; } - this.root.setRowCount(rowCount + 1); + + addRowInner(); + return this; } @@ -419,13 +435,50 @@ public Table subRange(int fromIndex, int toIndex) { @Override public Table complete() { - this.completed = true; + if (this.completed.compareAndSet(false, true)) { + if (!this.buffer.isEmpty()) { + addRowInner(); + } + } return this; } @Override public boolean isCompleted() { - return this.completed; + return this.completed.get(); + } + + private void addRowInner() { + int rowCount = this.root.getRowCount(); + int rowCountToAdd = this.buffer.size(); + int columnCount = columnCount(); + for (int i = 0; i < columnCount; i++) { + FieldVector vector = this.root.getVector(i); + while (vector.getValueCapacity() < rowCount + rowCountToAdd) { + vector.reAlloc(); + } + final int colIndex = i; + ArrowHelper.addValues( + vector, + rowCount, + this.dataTypes.get(colIndex), + this.dataTypeExtensions.get(colIndex), + new Iterator() { + private int index = 0; + + @Override + public boolean hasNext() { + return index < rowCountToAdd; + } + + @Override + public Object next() { + return buffer.get(index++)[colIndex]; + } + }); + } + this.buffer.clear(); + this.root.setRowCount(rowCount + rowCountToAdd); } } } diff --git a/ingester-protocol/src/main/resources/client_version.properties b/ingester-protocol/src/main/resources/client_version.properties index 312c655..00fca95 100644 --- a/ingester-protocol/src/main/resources/client_version.properties +++ b/ingester-protocol/src/main/resources/client_version.properties @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -client.version=0.12.2 +client.version=0.14.0 From 8dcf6d30b499ec07349aff7b4ad9e4bb459611af Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 22 Apr 2025 23:02:53 +0800 Subject: [PATCH 5/5] fix: json interal type --- .../io/greptime/BulkWriteApiQuickStart.java | 18 ++- .../bench/RandomTableDataProvider.java | 16 ++- .../java/io/greptime/models/ArrowHelper.java | 121 +----------------- .../java/io/greptime/models/RowHelper.java | 2 +- .../main/java/io/greptime/models/Table.java | 7 +- .../java/io/greptime/models/TableSchema.java | 17 ++- .../java/io/greptime/models/ValueUtil.java | 19 ++- .../io/greptime/models/ArrowHelperTest.java | 2 +- 8 files changed, 67 insertions(+), 135 deletions(-) diff --git a/ingester-example/src/main/java/io/greptime/BulkWriteApiQuickStart.java b/ingester-example/src/main/java/io/greptime/BulkWriteApiQuickStart.java index 8957c8d..4e0961e 100644 --- a/ingester-example/src/main/java/io/greptime/BulkWriteApiQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/BulkWriteApiQuickStart.java @@ -82,19 +82,25 @@ public static void main(String[] args) throws Exception { Config cfg = Config.newBuilder() .allocatorInitReservation(0) - .allocatorMaxAllocation(1024 * 1024 * 1024) - .timeoutMsPerMessage(10000) + .allocatorMaxAllocation(1024 * 1024 * 1024L) + .timeoutMsPerMessage(30000) .maxRequestsInFlight(8) .build(); Context ctx = Context.newDefault().withCompression(Compression.None); + // Bulk write api cannot auto create table + Table toCreate = Table.from(schema); + toCreate.addRow(generateOneRow(100000)); + toCreate.complete(); + greptimeDB.write(toCreate).get(); + try (BulkStreamWriter bulkStreamWriter = greptimeDB.bulkStreamWriter(schema, cfg, ctx)) { - // Write 100 times, each time write 100000 rows + // Write 100 times, each time write 10000 rows for (int i = 0; i < 100; i++) { long start = System.currentTimeMillis(); - Table.TableBufferRoot table = bulkStreamWriter.tableBufferRoot(); - for (int j = 0; j < 100000; j++) { + Table.TableBufferRoot table = bulkStreamWriter.tableBufferRoot(1024); + for (int j = 0; j < 10000; j++) { // with 100000 cardinality Object[] row = generateOneRow(100000); table.addRow(row); @@ -150,7 +156,7 @@ private static Object[] generateOneRow(int cardinality) { System.currentTimeMillis(), // ts random.nextInt(127), // field_int8 random.nextInt(32767), // field_int16 - random.nextInt(), // field_int32 + null, // field_int32 random.nextLong(), // field_int64 random.nextInt(255), // field_uint8 random.nextInt(65535), // field_uint16 diff --git a/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java b/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java index a55c2c9..2602883 100644 --- a/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java +++ b/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java @@ -92,8 +92,20 @@ public Object[] next() { String traceId = "trace_" + random.nextLong(1000000); String spanId = "span_" + random.nextLong(1000000); String errno = "errno_" + random.nextInt(1000); - String traceFlags = "trace_flags_" + random.nextInt(1000); - String traceState = "trace_state_" + random.nextInt(1000); + String traceFlags; + int flags = random.nextInt(1000); + if (flags % 2 == 0) { + traceFlags = "trace_flags_" + flags; + } else { + traceFlags = null; + } + int state = random.nextInt(1000); + String traceState; + if (state % 3 == 0) { + traceState = "trace_state_" + state; + } else { + traceState = null; + } String podName = "pod_" + random.nextInt(1000); timerContext.stop(); MetricsUtil.histogram("random_table_data_provider.log_message_length") diff --git a/ingester-protocol/src/main/java/io/greptime/models/ArrowHelper.java b/ingester-protocol/src/main/java/io/greptime/models/ArrowHelper.java index 87ce8d9..cbbef3f 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/ArrowHelper.java +++ b/ingester-protocol/src/main/java/io/greptime/models/ArrowHelper.java @@ -188,7 +188,7 @@ public static void addValues( if (value == null) { vector.setNull(startRowIndex++); } else { - ((UInt4Vector) vector).setSafe(startRowIndex++, ((Long) value).intValue()); + ((UInt4Vector) vector).setSafe(startRowIndex++, ValueUtil.getIntValue(value)); } } break; @@ -385,7 +385,7 @@ public static void addValues( vector.setNull(startRowIndex++); } else { byte[] jsonBytes = ValueUtil.getJsonString(value).getBytes(StandardCharsets.UTF_8); - ((VarCharVector) vector).setSafe(startRowIndex++, jsonBytes); + ((VarBinaryVector) vector).setSafe(startRowIndex++, jsonBytes); } } break; @@ -394,121 +394,6 @@ public static void addValues( } } - public static void addValue( - FieldVector vector, - int rowIndex, - Common.ColumnDataType dataType, - Common.ColumnDataTypeExtension dataTypeExtension, - Object value) { - if (value == null) { - vector.setNull(rowIndex); - return; - } - - switch (dataType) { - case INT8: - ((TinyIntVector) vector).setSafe(rowIndex, (int) value); - break; - case INT16: - ((SmallIntVector) vector).setSafe(rowIndex, (int) value); - break; - case INT32: - ((IntVector) vector).setSafe(rowIndex, (int) value); - break; - case INT64: - ((BigIntVector) vector).setSafe(rowIndex, (long) value); - break; - case UINT8: - ((UInt1Vector) vector).setSafe(rowIndex, (int) value); - break; - case UINT16: - ((UInt2Vector) vector).setSafe(rowIndex, (int) value); - break; - case UINT32: - ((UInt4Vector) vector).setSafe(rowIndex, ((Long) value).intValue()); - break; - case UINT64: - ((UInt8Vector) vector).setSafe(rowIndex, (long) value); - break; - case FLOAT32: - ((Float4Vector) vector).setSafe(rowIndex, (float) value); - break; - case FLOAT64: - ((Float8Vector) vector).setSafe(rowIndex, (double) value); - break; - case BOOLEAN: - ((BitVector) vector).setSafe(rowIndex, (boolean) value ? 1 : 0); - break; - case BINARY: - ((VarBinaryVector) vector).setSafe(rowIndex, (byte[]) value); - break; - case STRING: - ((VarCharVector) vector).setSafe(rowIndex, ((String) value).getBytes(StandardCharsets.UTF_8)); - break; - case DATE: - ((DateDayVector) vector).setSafe(rowIndex, ValueUtil.getDateValue(value)); - break; - case TIMESTAMP_SECOND: { - TimeStampSecHolder holder = new TimeStampSecHolder(); - holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.SECONDS); - ((TimeStampSecVector) vector).setSafe(rowIndex, holder); - break; - } - case TIMESTAMP_MILLISECOND: { - TimeStampMilliHolder holder = new TimeStampMilliHolder(); - holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.MILLISECONDS); - ((TimeStampMilliVector) vector).setSafe(rowIndex, holder); - break; - } - case TIMESTAMP_MICROSECOND: { - TimeStampMicroHolder holder = new TimeStampMicroHolder(); - holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.MICROSECONDS); - ((TimeStampMicroVector) vector).setSafe(rowIndex, holder); - break; - } - case TIMESTAMP_NANOSECOND: { - TimeStampNanoHolder holder = new TimeStampNanoHolder(); - holder.value = ValueUtil.getTimestamp(value, java.util.concurrent.TimeUnit.NANOSECONDS); - ((TimeStampNanoVector) vector).setSafe(rowIndex, holder); - break; - } - case TIME_SECOND: { - TimeSecHolder holder = new TimeSecHolder(); - holder.value = (int) ValueUtil.getLongValue(value); - ((TimeSecVector) vector).setSafe(rowIndex, holder); - break; - } - case TIME_MILLISECOND: { - TimeMilliHolder holder = new TimeMilliHolder(); - holder.value = (int) ValueUtil.getLongValue(value); - ((TimeMilliVector) vector).setSafe(rowIndex, holder); - break; - } - case TIME_MICROSECOND: { - TimeMicroHolder holder = new TimeMicroHolder(); - holder.value = ValueUtil.getLongValue(value); - ((TimeMicroVector) vector).setSafe(rowIndex, holder); - break; - } - case TIME_NANOSECOND: { - TimeNanoHolder holder = new TimeNanoHolder(); - holder.value = ValueUtil.getLongValue(value); - ((TimeNanoVector) vector).setSafe(rowIndex, holder); - break; - } - case DECIMAL128: - byte[] bytes = ValueUtil.getDecimal128BigEndianBytes(dataTypeExtension, value); - ((DecimalVector) vector).setBigEndianSafe(rowIndex, bytes); - break; - case JSON: - byte[] jsonBytes = ValueUtil.getJsonString(value).getBytes(StandardCharsets.UTF_8); - ((VarCharVector) vector).setSafe(rowIndex, jsonBytes); - break; - default: - throw new IllegalArgumentException("Unsupported data type: " + dataType); - } - } - static ArrowType convertToArrowType( Common.ColumnDataType dataType, Common.ColumnDataTypeExtension dataTypeExtension) { switch (dataType) { @@ -565,7 +450,7 @@ static ArrowType convertToArrowType( Ensures.ensureNonNull(decimalTypeExtension, "decimalTypeExtension is null"); return new ArrowType.Decimal(decimalTypeExtension.getPrecision(), decimalTypeExtension.getScale(), 128); case JSON: - return new ArrowType.Utf8(); + return new ArrowType.Binary(); default: throw new IllegalArgumentException("Unsupported data type: " + dataType); } diff --git a/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java b/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java index cc65288..b3442b1 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java +++ b/ingester-protocol/src/main/java/io/greptime/models/RowHelper.java @@ -57,7 +57,7 @@ public static void addValue( valueBuilder.setU16Value((int) value); break; case UINT32: - valueBuilder.setU32Value((int) value); + valueBuilder.setU32Value(ValueUtil.getIntValue(value)); break; case UINT64: valueBuilder.setU64Value(ValueUtil.getLongValue(value)); diff --git a/ingester-protocol/src/main/java/io/greptime/models/Table.java b/ingester-protocol/src/main/java/io/greptime/models/Table.java index aaf9f7f..0d0b6d8 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/Table.java +++ b/ingester-protocol/src/main/java/io/greptime/models/Table.java @@ -218,8 +218,11 @@ private static Table buildTable( RowData.ColumnSchema.Builder builder = RowData.ColumnSchema.newBuilder(); builder.setColumnName(columnNames.get(i)) .setSemanticType(semanticTypes.get(i)) - .setDatatype(dataTypes.get(i)) - .setDatatypeExtension(dataTypeExtensions.get(i)); + .setDatatype(dataTypes.get(i)); + Common.ColumnDataTypeExtension ext = dataTypeExtensions.get(i); + if (ext != null) { + builder.setDatatypeExtension(ext); + } table.columnSchemas.add(builder.build()); } return table; diff --git a/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java b/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java index e83402e..af43301 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java +++ b/ingester-protocol/src/main/java/io/greptime/models/TableSchema.java @@ -189,14 +189,23 @@ public Builder addColumn( this.columnNames.add(name); this.semanticTypes.add(semanticType.toProtoValue()); this.dataTypes.add(dataType.toProtoValue()); - if (decimalTypeExtension == null) { - this.dataTypeExtensions.add(Common.ColumnDataTypeExtension.getDefaultInstance()); - } else { - Ensures.ensure(dataType == DataType.Decimal128, "Only decimal type can have decimal type extension"); + + if (dataType == DataType.Json) { + Common.ColumnDataTypeExtension ext = Common.ColumnDataTypeExtension.newBuilder() + .setJsonType(Common.JsonTypeExtension.JSON_BINARY) + .build(); + this.dataTypeExtensions.add(ext); + } else if (dataType == DataType.Decimal128) { + if (decimalTypeExtension == null) { + decimalTypeExtension = DataType.DecimalTypeExtension.DEFAULT; + } Common.ColumnDataTypeExtension ext = Common.ColumnDataTypeExtension.newBuilder() .setDecimalType(decimalTypeExtension.into()) .build(); this.dataTypeExtensions.add(ext); + } else { + Ensures.ensure(decimalTypeExtension == null, "Only decimal type can have decimal type extension"); + this.dataTypeExtensions.add(null); } return this; } diff --git a/ingester-protocol/src/main/java/io/greptime/models/ValueUtil.java b/ingester-protocol/src/main/java/io/greptime/models/ValueUtil.java index 1d8b221..2973997 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/ValueUtil.java +++ b/ingester-protocol/src/main/java/io/greptime/models/ValueUtil.java @@ -34,6 +34,23 @@ public class ValueUtil { static int ONE_DAY_IN_SECONDS = 86400; + static int getIntValue(Object value) { + if (value instanceof Integer) { + return (int) value; + } + + if (value instanceof Long) { + return ((Long) value).intValue(); + } + + if (value instanceof Number) { + return ((Number) value).intValue(); + } + + // Not null + throw new IllegalArgumentException("Cannot convert value of type " + value.getClass() + " to int"); + } + static long getLongValue(Object value) { if (value instanceof Integer) { return (int) value; @@ -48,7 +65,7 @@ static long getLongValue(Object value) { } // Not null - throw new IllegalArgumentException("Unsupported value type: " + value.getClass()); + throw new IllegalArgumentException("Cannot convert value of type " + value.getClass() + " to long"); } static int getDateValue(Object value) { diff --git a/ingester-protocol/src/test/java/io/greptime/models/ArrowHelperTest.java b/ingester-protocol/src/test/java/io/greptime/models/ArrowHelperTest.java index e7ab895..eaae8bc 100644 --- a/ingester-protocol/src/test/java/io/greptime/models/ArrowHelperTest.java +++ b/ingester-protocol/src/test/java/io/greptime/models/ArrowHelperTest.java @@ -147,7 +147,7 @@ public void testCreateSchema() { new ArrowType.Decimal(38, 18, 128).getTypeID(), schema.getFields().get(21).getType().getTypeID()); Assert.assertEquals( - new ArrowType.Utf8().getTypeID(), + new ArrowType.Binary().getTypeID(), schema.getFields().get(22).getType().getTypeID()); } }