From a900a8267ba4c5d5d3cebdb80a87ed2cb0791ee8 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Fri, 18 Apr 2025 19:32:07 +0800 Subject: [PATCH 1/5] fix: minor fix --- .../java/io/greptime/BulkWriteService.java | 32 +++++++++---------- .../io/greptime/bench/BulkWriteBenchmark.java | 2 +- .../src/main/resources/log4j2.xml | 2 +- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java index a96586a..3d5e3a3 100644 --- a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java +++ b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java @@ -338,26 +338,24 @@ public int numInFlight() { @Override public void onNext(PutResult val) { - try (ArrowBuf metadata = val.getApplicationMetadata()) { - if (metadata == null) { - LOG.warn("Received PutResult with null metadata"); - return; - } - String metadataString = - ByteString.copyFrom(metadata.nioBuffer()).toStringUtf8(); - Metadata.ResponseMetadata responseMetadata = Metadata.ResponseMetadata.fromJson(metadataString); + ArrowBuf metadata = val.getApplicationMetadata(); + if (metadata == null) { + LOG.warn("Received PutResult with null metadata"); + return; + } + String metadataString = ByteString.copyFrom(metadata.nioBuffer()).toStringUtf8(); + Metadata.ResponseMetadata responseMetadata = Metadata.ResponseMetadata.fromJson(metadataString); - long requestId = responseMetadata.getRequestId(); - int affectedRows = responseMetadata.getAffectedRows(); + long requestId = responseMetadata.getRequestId(); + int affectedRows = responseMetadata.getAffectedRows(); - LOG.debug("Received response [id={}], affected rows: {}", requestId, affectedRows); + LOG.debug("Received response [id={}], affected rows: {}", requestId, affectedRows); - IdentifiableCompletableFuture future = this.futuresInFlight.get(requestId); - if (future != null) { - future.complete(affectedRows); - } else { - LOG.warn("A timeout response [id={}] finally received", requestId); - } + IdentifiableCompletableFuture future = this.futuresInFlight.get(requestId); + if (future != null) { + future.complete(affectedRows); + } else { + LOG.warn("A timeout response [id={}] finally received", requestId); } } diff --git a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java index eb25387..c94b529 100644 --- a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java @@ -60,7 +60,7 @@ public static void main(String[] args) throws Exception { BulkWrite.Config cfg = BulkWrite.Config.newBuilder() .allocatorInitReservation(0) - .allocatorMaxAllocation(4 * 1024 * 1024 * 1024) + .allocatorMaxAllocation(4 * 1024 * 1024 * 1024L) .timeoutMsPerMessage(10000) .maxRequestsInFlight(8) .build(); diff --git a/ingester-example/src/main/resources/log4j2.xml b/ingester-example/src/main/resources/log4j2.xml index d99eb5b..e159d2b 100644 --- a/ingester-example/src/main/resources/log4j2.xml +++ b/ingester-example/src/main/resources/log4j2.xml @@ -25,7 +25,7 @@ - + From b908c83f0c0ce82bb628fb9bdb0b4210f88d0069 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Fri, 18 Apr 2025 21:01:22 +0800 Subject: [PATCH 2/5] fix: buf realloc --- .../java/io/greptime/BulkWriteService.java | 4 +- .../apache/arrow/flight/BulkFlightClient.java | 38 +++++++++++-------- .../io/greptime/bench/BulkWriteBenchmark.java | 2 +- .../bench/RandomTableDataProvider.java | 4 +- .../src/main/java/io/greptime/BulkWrite.java | 2 +- .../main/java/io/greptime/models/Table.java | 2 +- 6 files changed, 30 insertions(+), 22 deletions(-) diff --git a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java index 3d5e3a3..7ff08c3 100644 --- a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java +++ b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java @@ -159,7 +159,7 @@ public PutStage putNext() { // The buffer will be closed in the putNext method, but if an error occurs during execution, // we need to close it ourselves in the catch block to prevent memory leaks. metadataBuf = this.allocator.buffer(metadata.length); - metadataBuf.setBytes(0, metadata); + metadataBuf.writeBytes(metadata); // Send data to the server LOG.debug("Sending data to server [id={}]", id); @@ -277,7 +277,7 @@ public long getId() { * Listener for handling asynchronous responses from the server during bulk write operations. * Manages the lifecycle of in-flight requests and their associated futures. */ - class AsyncPutListener implements PutListener { + static class AsyncPutListener implements PutListener { private final ConcurrentMap futuresInFlight; private final CompletableFuture completed; diff --git a/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java b/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java index 47cb63c..0ef625c 100644 --- a/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java +++ b/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java @@ -16,7 +16,9 @@ package org.apache.arrow.flight; +import com.codahale.metrics.Timer; import io.greptime.ArrowCompressionType; +import io.greptime.common.util.MetricsUtil; import io.greptime.rpc.TlsOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -86,7 +88,7 @@ public class BulkFlightClient implements AutoCloseable { ManagedChannel channel, List middleware, ArrowCompressionType compressionType) { - this.allocator = incomingAllocator.newChildAllocator("flight-client", 0, Long.MAX_VALUE); + this.allocator = incomingAllocator.newChildAllocator("bulk-flight-client", 0, Long.MAX_VALUE); this.channel = channel; this.middleware = middleware; this.compressionType = compressionType; @@ -326,22 +328,28 @@ public void start(VectorSchemaRoot root, DictionaryProvider dictionaries, IpcOpt @Override protected void waitUntilStreamReady() { - // Check isCancelled as well to avoid inadvertently blocking forever - // (so long as PutListener properly implements it) - while (!super.responseObserver.isReady() && !this.isCancelled.getAsBoolean()) { - if (this.isCompletedExceptionally.getAsBoolean()) { - // Will throw the error immediately - getResult(); - } + Timer.Context timerCtx = MetricsUtil.timer("bulk_flight_client.wait_until_stream_ready") + .time(); + try { + // Check isCancelled as well to avoid inadvertently blocking forever + // (so long as PutListener properly implements it) + while (!super.responseObserver.isReady() && !this.isCancelled.getAsBoolean()) { + if (this.isCompletedExceptionally.getAsBoolean()) { + // Will throw the error immediately + getResult(); + } - // If the stream is not ready, wait for a short time to avoid busy waiting - // This helps reduce CPU usage while still being responsive - try { - this.onStreamReadyHandler.await(10, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while waiting for stream to be ready", e); + // If the stream is not ready, wait for a short time to avoid busy waiting + // This helps reduce CPU usage while still being responsive + try { + this.onStreamReadyHandler.await(10, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for stream to be ready", e); + } } + } finally { + timerCtx.stop(); } } diff --git a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java index c94b529..bd8c5a7 100644 --- a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java @@ -61,7 +61,7 @@ public static void main(String[] args) throws Exception { BulkWrite.Config cfg = BulkWrite.Config.newBuilder() .allocatorInitReservation(0) .allocatorMaxAllocation(4 * 1024 * 1024 * 1024L) - .timeoutMsPerMessage(10000) + .timeoutMsPerMessage(60000) .maxRequestsInFlight(8) .build(); Compression compression = zstdCompression ? Compression.Zstd : Compression.None; diff --git a/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java b/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java index 7cbd10c..609130f 100644 --- a/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java +++ b/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java @@ -97,8 +97,8 @@ public Object[] next() { String traceState = "trace_state_" + random.nextInt(1000); String podName = "pod_" + random.nextInt(1000); timerContext.stop(); - MetricsUtil.counter("random_table_data_provider.log_message_length") - .inc(logMessage.length()); + MetricsUtil.histogram("random_table_data_provider.log_message_length") + .update(logMessage.length()); return new Object[] { logTs, diff --git a/ingester-protocol/src/main/java/io/greptime/BulkWrite.java b/ingester-protocol/src/main/java/io/greptime/BulkWrite.java index 1e3a4bd..203423c 100644 --- a/ingester-protocol/src/main/java/io/greptime/BulkWrite.java +++ b/ingester-protocol/src/main/java/io/greptime/BulkWrite.java @@ -27,7 +27,7 @@ public interface BulkWrite { /** * The default timeout in milliseconds for each message. */ - long DEFAULT_TIMEOUT_MS_PER_MESSAGE = 10000; + long DEFAULT_TIMEOUT_MS_PER_MESSAGE = 60000; /** * The default allocator init reservation bytes. diff --git a/ingester-protocol/src/main/java/io/greptime/models/Table.java b/ingester-protocol/src/main/java/io/greptime/models/Table.java index cd414ab..81101c8 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/Table.java +++ b/ingester-protocol/src/main/java/io/greptime/models/Table.java @@ -381,7 +381,7 @@ public Table addRow(Object... values) { for (int i = 0; i < values.length; i++) { FieldVector vector = this.root.getVector(i); if (vector.getValueCapacity() < rowCount + 1) { - vector.allocateNew(); + vector.reAlloc(); } ArrowHelper.addValue( vector, rowCount, this.dataTypes.get(i), this.dataTypeExtensions.get(i), values[i]); From 364365d3c4822ba46e0edd93d24d2810c842ca8f Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Sat, 19 Apr 2025 01:13:01 +0800 Subject: [PATCH 3/5] chore: add FutureDeadlineExceededException --- .../src/main/java/io/greptime/BulkWriteManager.java | 6 +++--- .../src/main/java/io/greptime/BulkWriteService.java | 8 +++++--- .../org/apache/arrow/flight/BulkFlightClient.java | 4 ---- .../io/greptime/common/TimeoutCompletableFuture.java | 12 ++++++++++-- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteManager.java b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteManager.java index d308a6e..1d83dfc 100644 --- a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteManager.java +++ b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteManager.java @@ -59,15 +59,15 @@ private static class RootAllocatorHolder { private static BufferAllocator createRootAllocator() { // max allocation size in bytes - long allocationLimit = SystemPropertyUtil.getLong(Keys.FLIGHT_ALLOCATION_LIMIT, 1024 * 1024 * 1024); + long allocationLimit = SystemPropertyUtil.getLong(Keys.FLIGHT_ALLOCATION_LIMIT, 4 * 1024 * 1024 * 1024L); BufferAllocator rootAllocator = new RootAllocator(new FlightAllocationListener(), allocationLimit); // Add a shutdown hook to close the root allocator when the JVM exits Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { + LOG.info("Closing root allocator: {}", rootAllocator); AutoCloseables.close(rootAllocator); - } catch (Exception e) { - LOG.error("Failed to close root allocator", e); + } catch (Exception ignored) { } })); diff --git a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java index 7ff08c3..8a53aab 100644 --- a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java +++ b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java @@ -313,9 +313,11 @@ public void attach(long id, IdentifiableCompletableFuture future) { if (t != null) { LOG.error("Put operation failed [id={}]: {}", id, t.getMessage(), t); - // If a put next operation fails, we complete the future with the exception - // and the stream will be terminated immediately to prevent further operations - onError(t); + if (!(t instanceof TimeoutCompletableFuture.FutureDeadlineExceededException)) { + // If a put next operation fails, we complete the future with the exception + // and the stream will be terminated immediately to prevent further operations + onError(t); + } } else { LOG.debug("Put operation succeeded [id={}], affected rows: {}", id, r); } diff --git a/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java b/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java index 0ef625c..123668f 100644 --- a/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java +++ b/ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java @@ -56,8 +56,6 @@ import org.apache.arrow.vector.dictionary.DictionaryProvider; import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider; import org.apache.arrow.vector.ipc.message.IpcOption; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Client for Flight services. @@ -66,8 +64,6 @@ * with some changes to support bulk write. */ public class BulkFlightClient implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(BulkFlightClient.class); - /** The maximum number of trace events to keep on the gRPC Channel. This value disables channel tracing. */ private static final int MAX_CHANNEL_TRACE_EVENTS = 0; diff --git a/ingester-common/src/main/java/io/greptime/common/TimeoutCompletableFuture.java b/ingester-common/src/main/java/io/greptime/common/TimeoutCompletableFuture.java index 29801e7..0d8e90a 100644 --- a/ingester-common/src/main/java/io/greptime/common/TimeoutCompletableFuture.java +++ b/ingester-common/src/main/java/io/greptime/common/TimeoutCompletableFuture.java @@ -54,12 +54,20 @@ public TimeoutCompletableFuture scheduleTimeout() { if (isCancelled() || isDone()) { return; } - completeExceptionally( - new TimeoutException("Operation timed out after " + this.timeout + " " + this.unit)); + completeExceptionally(new FutureDeadlineExceededException( + "Future deadline exceeded, timeout: " + this.timeout + " " + this.unit)); }, this.timeout, this.unit); return this; } + + public static class FutureDeadlineExceededException extends TimeoutException { + private static final long serialVersionUID = 1L; + + public FutureDeadlineExceededException(String message) { + super(message); + } + } } From 6eb9848065468c8c42d2f284e9b25bd7d110ad86 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Sat, 19 Apr 2025 02:56:10 +0800 Subject: [PATCH 4/5] feat: Skip ID 0 as it's reserved for special cases --- .../src/main/java/io/greptime/BulkWriteService.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java index 8a53aab..79adc59 100644 --- a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java +++ b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java @@ -143,7 +143,7 @@ public boolean isStreamReady() { * @return A PutStage object containing the future and the number of in-flight requests */ public PutStage putNext() { - long id = this.idGenerator.incrementAndGet(); + long id = nextId(); long totalRowCount = this.root.getRowCount(); LOG.debug("Starting putNext operation [id={}], total row count: {}", id, totalRowCount); @@ -207,6 +207,14 @@ public void close() throws Exception { AutoCloseables.close(this.root, this.manager); } + private long nextId() { + long id = this.idGenerator.incrementAndGet(); + if (id == 0) { // Skip ID 0 as it's reserved for special cases + id = this.idGenerator.incrementAndGet(); + } + return id; + } + /** * Represents the state of a put operation, containing both the future for tracking * completion and the current count of in-flight requests. @@ -356,7 +364,7 @@ public void onNext(PutResult val) { IdentifiableCompletableFuture future = this.futuresInFlight.get(requestId); if (future != null) { future.complete(affectedRows); - } else { + } else if (requestId != 0) { // 0 is reserved for special cases LOG.warn("A timeout response [id={}] finally received", requestId); } } From 6f2665f9f0c959ee21f9d7946cca20a01d8608bf Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Sat, 19 Apr 2025 03:32:17 +0800 Subject: [PATCH 5/5] chore: minor change --- .../src/main/java/io/greptime/BulkWriteService.java | 6 +++--- .../src/main/java/io/greptime/bench/BulkWriteBenchmark.java | 4 ++-- .../java/io/greptime/bench/RandomTableDataProvider.java | 2 +- .../java/io/greptime/bench/StreamingWriteBenchmark.java | 2 +- .../src/main/java/io/greptime/BulkWriteClient.java | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java index 79adc59..0ec2c41 100644 --- a/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java +++ b/ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java @@ -208,10 +208,10 @@ public void close() throws Exception { } private long nextId() { - long id = this.idGenerator.incrementAndGet(); - if (id == 0) { // Skip ID 0 as it's reserved for special cases + long id; + do { id = this.idGenerator.incrementAndGet(); - } + } while (id == 0); // Skip ID 0 as it's reserved for special cases return id; } diff --git a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java index bd8c5a7..61840ce 100644 --- a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java @@ -47,7 +47,7 @@ public static void main(String[] args) throws Exception { String endpoint = SystemPropertyUtil.get("db_endpoint", "127.0.0.1:4001"); String dbName = SystemPropertyUtil.get("db_name", "public"); boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false); - int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 100 * 1024); + int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 5 * 1024); LOG.info("Connect to db: {}, endpoint: {}", dbName, endpoint); LOG.info("Using zstd compression: {}", zstdCompression); LOG.info("Batch size: {}", batchSize); @@ -62,7 +62,7 @@ public static void main(String[] args) throws Exception { .allocatorInitReservation(0) .allocatorMaxAllocation(4 * 1024 * 1024 * 1024L) .timeoutMsPerMessage(60000) - .maxRequestsInFlight(8) + .maxRequestsInFlight(32) .build(); Compression compression = zstdCompression ? Compression.Zstd : Compression.None; Context ctx = Context.newDefault().withCompression(compression); diff --git a/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java b/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java index 609130f..b9fb671 100644 --- a/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java +++ b/ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java @@ -52,7 +52,7 @@ public class RandomTableDataProvider implements TableDataProvider { .build(); // Total number of rows to generate, configurable via system property // Default is 1 billion rows if not specified - rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 1_000_000_000L); + rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 1_000_000L); } @Override diff --git a/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java index c5b9c6a..b1b9925 100644 --- a/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java @@ -48,7 +48,7 @@ public static void main(String[] args) throws Exception { String endpoint = SystemPropertyUtil.get("db_endpoint", "127.0.0.1:4001"); String dbName = SystemPropertyUtil.get("db_name", "public"); boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false); - int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 100 * 1024); + int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 5 * 1024); int maxPointsPerSecond = SystemPropertyUtil.getInt("max_points_per_second", Integer.MAX_VALUE); LOG.info("Connect to db: {}, endpoint: {}", dbName, endpoint); LOG.info("Using zstd compression: {}", zstdCompression); diff --git a/ingester-protocol/src/main/java/io/greptime/BulkWriteClient.java b/ingester-protocol/src/main/java/io/greptime/BulkWriteClient.java index 833a17e..d045ed2 100644 --- a/ingester-protocol/src/main/java/io/greptime/BulkWriteClient.java +++ b/ingester-protocol/src/main/java/io/greptime/BulkWriteClient.java @@ -218,7 +218,7 @@ public CompletableFuture writeNext() throws Exception { InnerMetricHelper.putTime().update(clock.duration(startCall), TimeUnit.MILLISECONDS); }); - LOG.debug("Write request sent successfully, in-flight requests: {}", stage.numInFlight()); + LOG.info("Write request sent successfully, in-flight requests: {}", stage.numInFlight()); return future; });