Skip to content

Commit 14c7cae

Browse files
authored
fix: bench write minor fix (#70)
* fix: minor fix * fix: buf realloc * chore: add FutureDeadlineExceededException * feat: Skip ID 0 as it's reserved for special cases * chore: minor change
1 parent 08afb12 commit 14c7cae

11 files changed

Lines changed: 79 additions & 59 deletions

File tree

ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,15 @@ private static class RootAllocatorHolder {
5959

6060
private static BufferAllocator createRootAllocator() {
6161
// max allocation size in bytes
62-
long allocationLimit = SystemPropertyUtil.getLong(Keys.FLIGHT_ALLOCATION_LIMIT, 1024 * 1024 * 1024);
62+
long allocationLimit = SystemPropertyUtil.getLong(Keys.FLIGHT_ALLOCATION_LIMIT, 4 * 1024 * 1024 * 1024L);
6363
BufferAllocator rootAllocator = new RootAllocator(new FlightAllocationListener(), allocationLimit);
6464

6565
// Add a shutdown hook to close the root allocator when the JVM exits
6666
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
6767
try {
68+
LOG.info("Closing root allocator: {}", rootAllocator);
6869
AutoCloseables.close(rootAllocator);
69-
} catch (Exception e) {
70-
LOG.error("Failed to close root allocator", e);
70+
} catch (Exception ignored) {
7171
}
7272
}));
7373

ingester-bulk-protocol/src/main/java/io/greptime/BulkWriteService.java

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public boolean isStreamReady() {
143143
* @return A PutStage object containing the future and the number of in-flight requests
144144
*/
145145
public PutStage putNext() {
146-
long id = this.idGenerator.incrementAndGet();
146+
long id = nextId();
147147
long totalRowCount = this.root.getRowCount();
148148

149149
LOG.debug("Starting putNext operation [id={}], total row count: {}", id, totalRowCount);
@@ -159,7 +159,7 @@ public PutStage putNext() {
159159
// The buffer will be closed in the putNext method, but if an error occurs during execution,
160160
// we need to close it ourselves in the catch block to prevent memory leaks.
161161
metadataBuf = this.allocator.buffer(metadata.length);
162-
metadataBuf.setBytes(0, metadata);
162+
metadataBuf.writeBytes(metadata);
163163

164164
// Send data to the server
165165
LOG.debug("Sending data to server [id={}]", id);
@@ -207,6 +207,14 @@ public void close() throws Exception {
207207
AutoCloseables.close(this.root, this.manager);
208208
}
209209

210+
/**
 * Returns the next request ID taken from {@code idGenerator}, never returning 0.
 *
 * <p>The generator is incremented repeatedly until it yields a non-zero value, because
 * ID 0 is reserved for special cases and must not be assigned to a regular request.
 * NOTE(review): presumably 0 can only be produced if the counter wraps around or starts
 * below zero; confirm against the declaration of {@code idGenerator}.
 *
 * @return a non-zero request ID
 */
private long nextId() {
211+
long id;
212+
do {
213+
id = this.idGenerator.incrementAndGet();
214+
} while (id == 0); // Skip ID 0 as it's reserved for special cases
215+
return id;
216+
}
217+
210218
/**
211219
* Represents the state of a put operation, containing both the future for tracking
212220
* completion and the current count of in-flight requests.
@@ -277,7 +285,7 @@ public long getId() {
277285
* Listener for handling asynchronous responses from the server during bulk write operations.
278286
* Manages the lifecycle of in-flight requests and their associated futures.
279287
*/
280-
class AsyncPutListener implements PutListener {
288+
static class AsyncPutListener implements PutListener {
281289
private final ConcurrentMap<Long, IdentifiableCompletableFuture> futuresInFlight;
282290
private final CompletableFuture<Void> completed;
283291

@@ -313,9 +321,11 @@ public void attach(long id, IdentifiableCompletableFuture future) {
313321

314322
if (t != null) {
315323
LOG.error("Put operation failed [id={}]: {}", id, t.getMessage(), t);
316-
// If a put next operation fails, we complete the future with the exception
317-
// and the stream will be terminated immediately to prevent further operations
318-
onError(t);
324+
if (!(t instanceof TimeoutCompletableFuture.FutureDeadlineExceededException)) {
325+
// If a put next operation fails, we complete the future with the exception
326+
// and the stream will be terminated immediately to prevent further operations
327+
onError(t);
328+
}
319329
} else {
320330
LOG.debug("Put operation succeeded [id={}], affected rows: {}", id, r);
321331
}
@@ -338,26 +348,24 @@ public int numInFlight() {
338348

339349
@Override
340350
public void onNext(PutResult val) {
341-
try (ArrowBuf metadata = val.getApplicationMetadata()) {
342-
if (metadata == null) {
343-
LOG.warn("Received PutResult with null metadata");
344-
return;
345-
}
346-
String metadataString =
347-
ByteString.copyFrom(metadata.nioBuffer()).toStringUtf8();
348-
Metadata.ResponseMetadata responseMetadata = Metadata.ResponseMetadata.fromJson(metadataString);
351+
ArrowBuf metadata = val.getApplicationMetadata();
352+
if (metadata == null) {
353+
LOG.warn("Received PutResult with null metadata");
354+
return;
355+
}
356+
String metadataString = ByteString.copyFrom(metadata.nioBuffer()).toStringUtf8();
357+
Metadata.ResponseMetadata responseMetadata = Metadata.ResponseMetadata.fromJson(metadataString);
349358

350-
long requestId = responseMetadata.getRequestId();
351-
int affectedRows = responseMetadata.getAffectedRows();
359+
long requestId = responseMetadata.getRequestId();
360+
int affectedRows = responseMetadata.getAffectedRows();
352361

353-
LOG.debug("Received response [id={}], affected rows: {}", requestId, affectedRows);
362+
LOG.debug("Received response [id={}], affected rows: {}", requestId, affectedRows);
354363

355-
IdentifiableCompletableFuture future = this.futuresInFlight.get(requestId);
356-
if (future != null) {
357-
future.complete(affectedRows);
358-
} else {
359-
LOG.warn("A timeout response [id={}] finally received", requestId);
360-
}
364+
IdentifiableCompletableFuture future = this.futuresInFlight.get(requestId);
365+
if (future != null) {
366+
future.complete(affectedRows);
367+
} else if (requestId != 0) { // 0 is reserved for special cases
368+
LOG.warn("A timeout response [id={}] finally received", requestId);
361369
}
362370
}
363371

ingester-bulk-protocol/src/main/java/org/apache/arrow/flight/BulkFlightClient.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package org.apache.arrow.flight;
1818

19+
import com.codahale.metrics.Timer;
1920
import io.greptime.ArrowCompressionType;
21+
import io.greptime.common.util.MetricsUtil;
2022
import io.greptime.rpc.TlsOptions;
2123
import io.grpc.Channel;
2224
import io.grpc.ClientCall;
@@ -54,8 +56,6 @@
5456
import org.apache.arrow.vector.dictionary.DictionaryProvider;
5557
import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
5658
import org.apache.arrow.vector.ipc.message.IpcOption;
57-
import org.slf4j.Logger;
58-
import org.slf4j.LoggerFactory;
5959

6060
/**
6161
* Client for Flight services.
@@ -64,8 +64,6 @@
6464
* with some changes to support bulk write.
6565
*/
6666
public class BulkFlightClient implements AutoCloseable {
67-
private static final Logger LOG = LoggerFactory.getLogger(BulkFlightClient.class);
68-
6967
/** The maximum number of trace events to keep on the gRPC Channel. This value disables channel tracing. */
7068
private static final int MAX_CHANNEL_TRACE_EVENTS = 0;
7169

@@ -86,7 +84,7 @@ public class BulkFlightClient implements AutoCloseable {
8684
ManagedChannel channel,
8785
List<FlightClientMiddleware.Factory> middleware,
8886
ArrowCompressionType compressionType) {
89-
this.allocator = incomingAllocator.newChildAllocator("flight-client", 0, Long.MAX_VALUE);
87+
this.allocator = incomingAllocator.newChildAllocator("bulk-flight-client", 0, Long.MAX_VALUE);
9088
this.channel = channel;
9189
this.middleware = middleware;
9290
this.compressionType = compressionType;
@@ -326,22 +324,28 @@ public void start(VectorSchemaRoot root, DictionaryProvider dictionaries, IpcOpt
326324

327325
@Override
328326
protected void waitUntilStreamReady() {
329-
// Check isCancelled as well to avoid inadvertently blocking forever
330-
// (so long as PutListener properly implements it)
331-
while (!super.responseObserver.isReady() && !this.isCancelled.getAsBoolean()) {
332-
if (this.isCompletedExceptionally.getAsBoolean()) {
333-
// Will throw the error immediately
334-
getResult();
335-
}
327+
Timer.Context timerCtx = MetricsUtil.timer("bulk_flight_client.wait_until_stream_ready")
328+
.time();
329+
try {
330+
// Check isCancelled as well to avoid inadvertently blocking forever
331+
// (so long as PutListener properly implements it)
332+
while (!super.responseObserver.isReady() && !this.isCancelled.getAsBoolean()) {
333+
if (this.isCompletedExceptionally.getAsBoolean()) {
334+
// Will throw the error immediately
335+
getResult();
336+
}
336337

337-
// If the stream is not ready, wait for a short time to avoid busy waiting
338-
// This helps reduce CPU usage while still being responsive
339-
try {
340-
this.onStreamReadyHandler.await(10, TimeUnit.MILLISECONDS);
341-
} catch (InterruptedException e) {
342-
Thread.currentThread().interrupt();
343-
throw new RuntimeException("Interrupted while waiting for stream to be ready", e);
338+
// If the stream is not ready, wait for a short time to avoid busy waiting
339+
// This helps reduce CPU usage while still being responsive
340+
try {
341+
this.onStreamReadyHandler.await(10, TimeUnit.MILLISECONDS);
342+
} catch (InterruptedException e) {
343+
Thread.currentThread().interrupt();
344+
throw new RuntimeException("Interrupted while waiting for stream to be ready", e);
345+
}
344346
}
347+
} finally {
348+
timerCtx.stop();
345349
}
346350
}
347351

ingester-common/src/main/java/io/greptime/common/TimeoutCompletableFuture.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,20 @@ public TimeoutCompletableFuture<T> scheduleTimeout() {
5454
if (isCancelled() || isDone()) {
5555
return;
5656
}
57-
completeExceptionally(
58-
new TimeoutException("Operation timed out after " + this.timeout + " " + this.unit));
57+
completeExceptionally(new FutureDeadlineExceededException(
58+
"Future deadline exceeded, timeout: " + this.timeout + " " + this.unit));
5959
},
6060
this.timeout,
6161
this.unit);
6262

6363
return this;
6464
}
65+
66+
/**
 * Exception used to complete a future exceptionally when its scheduled timeout
 * fires before the future is done or cancelled.
 *
 * <p>It is a subtype of {@link TimeoutException}, so code that handles
 * {@code TimeoutException} also handles this exception, while callers that need to
 * tell a deadline expiry apart from other failures can test for this type specifically.
 */
public static class FutureDeadlineExceededException extends TimeoutException {
67+
private static final long serialVersionUID = 1L;
68+
69+
/**
 * Creates the exception with a human-readable description of the exceeded deadline.
 *
 * @param message the detail message
 */
public FutureDeadlineExceededException(String message) {
70+
super(message);
71+
}
72+
}
6573
}

ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public static void main(String[] args) throws Exception {
4747
String endpoint = SystemPropertyUtil.get("db_endpoint", "127.0.0.1:4001");
4848
String dbName = SystemPropertyUtil.get("db_name", "public");
4949
boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false);
50-
int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 100 * 1024);
50+
int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 5 * 1024);
5151
LOG.info("Connect to db: {}, endpoint: {}", dbName, endpoint);
5252
LOG.info("Using zstd compression: {}", zstdCompression);
5353
LOG.info("Batch size: {}", batchSize);
@@ -60,9 +60,9 @@ public static void main(String[] args) throws Exception {
6060

6161
BulkWrite.Config cfg = BulkWrite.Config.newBuilder()
6262
.allocatorInitReservation(0)
63-
.allocatorMaxAllocation(4 * 1024 * 1024 * 1024)
64-
.timeoutMsPerMessage(10000)
65-
.maxRequestsInFlight(8)
63+
.allocatorMaxAllocation(4 * 1024 * 1024 * 1024L)
64+
.timeoutMsPerMessage(60000)
65+
.maxRequestsInFlight(32)
6666
.build();
6767
Compression compression = zstdCompression ? Compression.Zstd : Compression.None;
6868
Context ctx = Context.newDefault().withCompression(compression);

ingester-example/src/main/java/io/greptime/bench/RandomTableDataProvider.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class RandomTableDataProvider implements TableDataProvider {
5252
.build();
5353
// Total number of rows to generate, configurable via system property
5454
// Default is 1 billion rows if not specified
55-
rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 1_000_000_000L);
55+
rowCount = SystemPropertyUtil.getLong("table_data_provider.row_count", 1_000_000L);
5656
}
5757

5858
@Override
@@ -97,8 +97,8 @@ public Object[] next() {
9797
String traceState = "trace_state_" + random.nextInt(1000);
9898
String podName = "pod_" + random.nextInt(1000);
9999
timerContext.stop();
100-
MetricsUtil.counter("random_table_data_provider.log_message_length")
101-
.inc(logMessage.length());
100+
MetricsUtil.histogram("random_table_data_provider.log_message_length")
101+
.update(logMessage.length());
102102

103103
return new Object[] {
104104
logTs,

ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public static void main(String[] args) throws Exception {
4848
String endpoint = SystemPropertyUtil.get("db_endpoint", "127.0.0.1:4001");
4949
String dbName = SystemPropertyUtil.get("db_name", "public");
5050
boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", false);
51-
int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 100 * 1024);
51+
int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 5 * 1024);
5252
int maxPointsPerSecond = SystemPropertyUtil.getInt("max_points_per_second", Integer.MAX_VALUE);
5353
LOG.info("Connect to db: {}, endpoint: {}", dbName, endpoint);
5454
LOG.info("Using zstd compression: {}", zstdCompression);

ingester-example/src/main/resources/log4j2.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
</Appenders>
2727
<Loggers>
28-
<Root level="debug">
28+
<Root level="info">
2929
<AppenderRef ref="Console"/>
3030
</Root>
3131
</Loggers>

ingester-protocol/src/main/java/io/greptime/BulkWrite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public interface BulkWrite {
2727
/**
2828
* The default timeout in milliseconds for each message.
2929
*/
30-
long DEFAULT_TIMEOUT_MS_PER_MESSAGE = 10000;
30+
long DEFAULT_TIMEOUT_MS_PER_MESSAGE = 60000;
3131

3232
/**
3333
* The default allocator init reservation bytes.

ingester-protocol/src/main/java/io/greptime/BulkWriteClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ public CompletableFuture<Integer> writeNext() throws Exception {
218218
InnerMetricHelper.putTime().update(clock.duration(startCall), TimeUnit.MILLISECONDS);
219219
});
220220

221-
LOG.debug("Write request sent successfully, in-flight requests: {}", stage.numInFlight());
221+
LOG.info("Write request sent successfully, in-flight requests: {}", stage.numInFlight());
222222

223223
return future;
224224
});

0 commit comments

Comments
 (0)