diff --git a/README.md b/README.md index d9371de..25654ae 100644 --- a/README.md +++ b/README.md @@ -4,17 +4,11 @@ ![License](https://img.shields.io/badge/license-Apache--2.0-green.svg) [![Maven Central](https://img.shields.io/maven-central/v/io.greptime/greptimedb-ingester.svg?label=maven%20central)](https://central.sonatype.com/search?q=io.greptime&name=ingester-all) -A Java ingester for GreptimeDB, which is compatible with GreptimeDB protocol and lightweight. +A lightweight Java ingester for GreptimeDB designed for high-throughput data writing. It utilizes the gRPC protocol to provide a non-blocking, purely asynchronous API that is easy to use and integrate into your applications. -## Features +## Documentation +- [API Reference](https://javadoc.io/doc/io.greptime/ingester-protocol/latest/index.html) +- [Examples](https://github.com/GreptimeTeam/greptimedb-ingester-java/tree/main/ingester-example) -- SPI-based extensible network transport layer; provides the default implementation by using the - gRPC framework -- Non-blocking, purely asynchronous API, easy to use -- Automatically collects various performance metrics by default. Users can then configure them and - write to local files -- Users can take in-memory snapshots of critical objects, configure them, and write to local files. - This is helpful when troubleshooting complex issues - -## Javadoc -- [ingester-protocol](https://javadoc.io/doc/io.greptime/ingester-protocol/latest/index.html) +# Features +TODO diff --git a/ingester-common/src/main/java/io/greptime/common/util/ExecutorServiceHelper.java b/ingester-common/src/main/java/io/greptime/common/util/ExecutorServiceHelper.java index a59ce3d..44bc312 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/ExecutorServiceHelper.java +++ b/ingester-common/src/main/java/io/greptime/common/util/ExecutorServiceHelper.java @@ -30,6 +30,9 @@ public final class ExecutorServiceHelper { /** * @see #shutdownAndAwaitTermination(ExecutorService, long) + * + * @param pool the executor service to shutdown + * @return true if the executor service is shutdown successfully, false otherwise */ public static boolean shutdownAndAwaitTermination(ExecutorService pool) { return shutdownAndAwaitTermination(pool, 1000); @@ -40,6 +43,10 @@ public static boolean shutdownAndAwaitTermination(ExecutorService pool) { * phases, first by calling {@code shutdown} to reject incoming tasks, * and then calling {@code shutdownNow}, if necessary, to cancel any * lingering tasks. + * + * @param pool the executor service to shutdown + * @param timeoutMillis the timeout in milliseconds + * @return true if the executor service is shutdown successfully, false otherwise */ public static boolean shutdownAndAwaitTermination(ExecutorService pool, long timeoutMillis) { if (pool == null) { diff --git a/ingester-common/src/main/java/io/greptime/common/util/Files.java b/ingester-common/src/main/java/io/greptime/common/util/Files.java index 666b651..c937fe7 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/Files.java +++ b/ingester-common/src/main/java/io/greptime/common/util/Files.java @@ -50,6 +50,7 @@ public static void fsync(File file) throws IOException { * Creates the directory named by this pathname if not exists. * * @param path pathname + * @throws IOException if an I/O error occurs */ public static void mkdirIfNotExists(String path) throws IOException { File dir = Paths.get(path).toFile().getAbsoluteFile(); diff --git a/ingester-common/src/main/java/io/greptime/common/util/MetricsUtil.java b/ingester-common/src/main/java/io/greptime/common/util/MetricsUtil.java index 2463a91..cd8beaa 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/MetricsUtil.java +++ b/ingester-common/src/main/java/io/greptime/common/util/MetricsUtil.java @@ -82,71 +82,97 @@ public static void reportImmediately() { } /** - * Return the global registry of metric instances. + * Gets the global registry of metric instances. + * + * @return the global registry of metric instances */ public static MetricRegistry metricRegistry() { return METRIC_REGISTRY; } /** - * Return the {@link Meter} registered under this name; or create + * Gets the {@link Meter} registered under this name; or create * and register a new {@link Meter} if none is registered. + * + * @param name the name of the metric + * @return the {@link Meter} registered under this name */ public static Meter meter(Object name) { return METRIC_REGISTRY.meter(named(name)); } /** - * Return the {@link Meter} registered under this name; or create + * Gets the {@link Meter} registered under this name; or create * and register a new {@link Meter} if none is registered. + * + * @param names the names of the metric + * @return the {@link Meter} registered under this name */ public static Meter meter(Object... names) { return METRIC_REGISTRY.meter(named(names)); } /** - * Return the {@link Timer} registered under this name; or create + * Gets the {@link Timer} registered under this name; or create * and register a new {@link Timer} if none is registered. + * + * @param name the name of the metric + * @return the {@link Timer} registered under this name */ public static Timer timer(Object name) { return METRIC_REGISTRY.timer(named(name)); } /** - * Return the {@link Timer} registered under this name; or create + * Gets the {@link Timer} registered under this name; or create * and register a new {@link Timer} if none is registered. + * + * @param names the names of the metric + * @return the {@link Timer} registered under this name */ public static Timer timer(Object... names) { return METRIC_REGISTRY.timer(named(names)); } /** - * Return the {@link Counter} registered under this name; or create + * Gets the {@link Counter} registered under this name; or create * and register a new {@link Counter} if none is registered. + * + * @param name the name of the metric + * @return the {@link Counter} registered under this name */ public static Counter counter(Object name) { return METRIC_REGISTRY.counter(named(name)); } /** - * Return the {@link Counter} registered under this name; or create + * Gets the {@link Counter} registered under this name; or create * and register a new {@link Counter} if none is registered. + * + * @param names the names of the metric + * @return the {@link Counter} registered under this name */ public static Counter counter(Object... names) { return METRIC_REGISTRY.counter(named(names)); } /** - * Return the {@link Histogram} registered under this name; or create + * Gets the {@link Histogram} registered under this name; or create * and register a new {@link Histogram} if none is registered. + * + * @param name the name of the metric + * @return the {@link Histogram} registered under this name */ public static Histogram histogram(Object name) { return METRIC_REGISTRY.histogram(named(name)); } /** - * Return the {@link Histogram} registered under this name; or create + * Gets the {@link Histogram} registered under this name; or create * and register a new {@link Histogram} if none is registered. + * + * @param names the names of the metric + * @return the {@link Histogram} registered under this name */ public static Histogram histogram(Object... names) { return METRIC_REGISTRY.histogram(named(names)); diff --git a/ingester-common/src/main/java/io/greptime/common/util/ObjectPool.java b/ingester-common/src/main/java/io/greptime/common/util/ObjectPool.java index cc9e1b4..1db4acf 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/ObjectPool.java +++ b/ingester-common/src/main/java/io/greptime/common/util/ObjectPool.java @@ -22,27 +22,37 @@ public interface ObjectPool { /** - * Get an object from the pool. + * Gets an object from the pool. + * + * @return an object from the pool */ T getObject(); /** - * Return the object to the pool. The caller should not use the object beyond this point. + * Returns the object to the pool. The caller should not use the object beyond this point. + * + * @param returned the object to return to the pool */ void returnObject(T returned); /** * Defines a resource, and the way to create and destroy instances of it. + * + * @param the type of the resource */ interface Resource { /** - * Create a new instance of the resource. + * Creates a new instance of the resource. + * + * @return a new instance of the resource */ T create(); /** - * Destroy the given instance. + * Destroys the given instance. + * + * @param instance the instance to destroy */ void close(T instance); } diff --git a/ingester-common/src/main/java/io/greptime/common/util/Platform.java b/ingester-common/src/main/java/io/greptime/common/util/Platform.java index d194657..2739254 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/Platform.java +++ b/ingester-common/src/main/java/io/greptime/common/util/Platform.java @@ -35,14 +35,18 @@ public class Platform { private static final boolean IS_MAC = isMac0(); /** - * Return {@code true} if the JVM is running on Windows + * Checks if the JVM is running on Windows. + * + * @return {@code true} if the JVM is running on Windows */ public static boolean isWindows() { return IS_WINDOWS; } /** - * Return {@code true} if the JVM is running on Mac OSX + * Checks if the JVM is running on Mac OSX. + * + * @return {@code true} if the JVM is running on Mac OSX */ public static boolean isMac() { return IS_MAC; diff --git a/ingester-common/src/main/java/io/greptime/common/util/SerializingExecutor.java b/ingester-common/src/main/java/io/greptime/common/util/SerializingExecutor.java index 624e27d..0fbc754 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/SerializingExecutor.java +++ b/ingester-common/src/main/java/io/greptime/common/util/SerializingExecutor.java @@ -66,6 +66,8 @@ public SerializingExecutor(String name, Thread.UncaughtExceptionHandler uncaught * under a lock of your own, but don't want the tasks to be run under * your lock (for fear of deadlock). You can call this method in the * lock, and call {@link #drain} outside the lock. + * + * @param task the task to add */ public final void executeLater(Runnable task) { this.queue.add(Ensures.ensureNonNull(task, "null `task`")); diff --git a/ingester-common/src/main/java/io/greptime/common/util/SharedScheduledPool.java b/ingester-common/src/main/java/io/greptime/common/util/SharedScheduledPool.java index 106c09f..3584108 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/SharedScheduledPool.java +++ b/ingester-common/src/main/java/io/greptime/common/util/SharedScheduledPool.java @@ -19,7 +19,7 @@ import java.util.concurrent.ScheduledExecutorService; /** - * Like rust: pub type SharedScheduledPool = RcObjectPool + * Like rust: {@code pub type SharedScheduledPool = RcObjectPool} */ public class SharedScheduledPool extends RcObjectPool { diff --git a/ingester-common/src/main/java/io/greptime/common/util/SharedThreadPool.java b/ingester-common/src/main/java/io/greptime/common/util/SharedThreadPool.java index edcb46b..35fe3c9 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/SharedThreadPool.java +++ b/ingester-common/src/main/java/io/greptime/common/util/SharedThreadPool.java @@ -19,7 +19,7 @@ import java.util.concurrent.ExecutorService; /** - * Like rust: pub type SharedThreadPool = RcObjectPool + * Like rust: {@code pub type SharedThreadPool = RcObjectPool} */ public class SharedThreadPool extends RcObjectPool { diff --git a/ingester-common/src/main/java/io/greptime/common/util/Strings.java b/ingester-common/src/main/java/io/greptime/common/util/Strings.java index 776ba68..7fcc2aa 100644 --- a/ingester-common/src/main/java/io/greptime/common/util/Strings.java +++ b/ingester-common/src/main/java/io/greptime/common/util/Strings.java @@ -33,21 +33,30 @@ public static String toString(Object obj) { } /** - * Returns the given string if it is non-null; the empty string otherwise. + * Converts a null string to an empty string. + * + * @param string the string to check + * @return the given string if it is non-null; the empty string otherwise */ public static String nullToEmpty(String string) { return (string == null) ? "" : string; } /** - * Returns the given string if it is nonempty; {@code null} otherwise. + * Converts an empty string to a null string. + * + * @param string the string to check + * @return the given string if it is nonempty; {@code null} otherwise */ public static String emptyToNull(String string) { return isNullOrEmpty(string) ? null : string; } /** - * Returns {@code true} if the given string is null or is the empty string. + * Checks if the given string is null or is the empty string. + * + * @param str the string to check + * @return {@code true} if the given string is null or is the empty string */ public static boolean isNullOrEmpty(String str) { return str == null || str.isEmpty(); @@ -63,6 +72,9 @@ public static boolean isNullOrEmpty(String str) { * Strings.isBlank("bob") = false * Strings.isBlank(" bob ") = false * ``` + * + * @param str the string to check + * @return {@code true} if the given string is null, empty or whitespace only */ public static boolean isBlank(String str) { int strLen; @@ -86,6 +98,9 @@ public static boolean isBlank(String str) { * Strings.isNotBlank("bob") = true * Strings.isNotBlank(" bob ") = true * ``` + * + * @param str the string to check + * @return {@code true} if the given string is not null, empty or whitespace only */ public static boolean isNotBlank(String str) { return !isBlank(str); @@ -104,6 +119,10 @@ public static boolean isNotBlank(String str) { * Strings.split("a:b:c", '.') = ["a:b:c"] * Strings.split("a b c", ' ') = ["a", "b", "c"] * ``` + * + * @param str the string to split + * @param separator the separator + * @return an array of strings */ public static String[] split(String str, char separator) { return split(str, separator, false); @@ -129,6 +148,11 @@ public static String[] split(String str, char separator) { * Strings.split(" a b c", ' ', true) = ["", "", a", "b", "c"] * Strings.split(" a b c ", ' ', true) = ["", a", "b", "c", ""] * ``` + * + * @param str the string to split + * @param separator the separator + * @param preserveAllTokens {@code true} if all tokens should be preserved, including empty tokens created by adjacent separators + * @return an array of strings */ public static String[] split(String str, char separator, boolean preserveAllTokens) { if (str == null) { diff --git a/ingester-protocol/src/main/java/io/greptime/BulkStreamWriter.java b/ingester-protocol/src/main/java/io/greptime/BulkStreamWriter.java index 6a1ffdf..c0fd07e 100644 --- a/ingester-protocol/src/main/java/io/greptime/BulkStreamWriter.java +++ b/ingester-protocol/src/main/java/io/greptime/BulkStreamWriter.java @@ -20,21 +20,21 @@ import java.util.concurrent.CompletableFuture; /** - * `BulkStreamWriter` is a specialized interface for efficiently writing data to the server in bulk operations. + * {@code BulkStreamWriter} is a specialized interface for efficiently writing data to the server in bulk operations. * - * Each `BulkStreamWriter` is associated with a `TableBufferRoot` instance that manages off-heap memory. - * The workflow involves first obtaining the `TableBufferRoot` instance and populating it with data. - * This data resides in off-heap memory and is only transmitted to the server when `writeNext()` - * is called. Upon calling `writeNext()`, all data in the `TableBufferRoot` is sent as a batch, - * and the `TableBufferRoot` is automatically cleared for the next set of data. + *

Each {@code BulkStreamWriter} is associated with a {@code TableBufferRoot} instance that manages off-heap memory. + * The workflow involves first obtaining the {@code TableBufferRoot} instance and populating it with data. + * This data resides in off-heap memory and is only transmitted to the server when {@code writeNext()} + * is called. Upon calling {@code writeNext()}, all data in the {@code TableBufferRoot} is sent as a batch, + * and the {@code TableBufferRoot} is automatically cleared for the next set of data. * - * As a streaming interface, `BulkStreamWriter` allows you to repeat this cycle (populate data, - * call `writeNext()`) multiple times for efficient batch processing. When all data has been - * transmitted, you must call `completed()` to properly close the stream and ensure any server-side - * errors are properly reported. Additionally, you should call the `close()` method to ensure all + *

As a streaming interface, {@code BulkStreamWriter} allows you to repeat this cycle (populate data, + * call {@code writeNext()}) multiple times for efficient batch processing. When all data has been + * transmitted, you must call {@code completed()} to properly close the stream and ensure any server-side + * errors are properly reported. Additionally, you should call the {@code close()} method to ensure all * related resources are properly released, though this happens automatically when using try-with-resources. * - * Example usage: + *

Example usage: *

{@code
  * try (BulkStreamWriter bulkStreamWriter = greptimeDB.bulkStreamWriter(schema)) { // auto close in try-with-resources
  *     // Write 1000 times, each time write 100000 rows
@@ -58,26 +58,26 @@
  *     }
  *
  *     bulkStreamWriter.completed();
+ * }
  * }
*/ public interface BulkStreamWriter extends AutoCloseable { /** - * Returns the `TableBufferRoot` instance associated with this writer. - * The `TableBufferRoot` provides direct access to the underlying memory + * Returns the {@code TableBufferRoot} instance associated with this writer. + * The {@code TableBufferRoot} provides direct access to the underlying memory * where table data is stored for efficient bulk operations. * * @param columnBufferSize the buffer size for each column * - * @see Table.TableBufferRoot - * * @return a table buffer root */ Table.TableBufferRoot tableBufferRoot(int columnBufferSize); /** - * Writes currenttable data to the stream. + * Writes current table data to the stream. * - * @return a future that completes with the number of rows affected. + * @return a future that completes with the number of rows affected + * @throws Exception if an error occurs */ CompletableFuture writeNext() throws Exception; @@ -86,11 +86,13 @@ public interface BulkStreamWriter extends AutoCloseable { * and waits for the server to finish processing the data. This method * must be called to ensure all data is properly written and to receive * any errors that may have occurred during the operation. + * + * @throws Exception if an error occurs */ void completed() throws Exception; /** - * If the stream is not ready, calling `writeNext()` will block until the stream + * If the stream is not ready, calling {@code writeNext()} will block until the stream * becomes ready for writing, potentially using a busy-wait mechanism. * * @return true if the stream is ready to write data, false otherwise diff --git a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java index 093afc5..e1422af 100644 --- a/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java +++ b/ingester-protocol/src/main/java/io/greptime/GreptimeDB.java @@ -75,12 +75,21 @@ public class GreptimeDB implements Write, WriteObject, BulkWrite, Health, Lifecy private BulkWriteClient bulkWriteClient; /** - * Returns all instances of {@link GreptimeDB}. + * Gets all instances of {@link GreptimeDB}. + * + * @return all instances of {@link GreptimeDB} */ public static List instances() { return new ArrayList<>(INSTANCES.values()); } + /** + * Creates a new {@link GreptimeDB} instance. + * + * @param opts the options for the GreptimeDB client + * @return a new {@link GreptimeDB} instance + * @throws RuntimeException if the GreptimeDB client fails to start + */ public static GreptimeDB create(GreptimeOptions opts) { GreptimeDB greptimeDB = new GreptimeDB(); if (!greptimeDB.init(opts)) { diff --git a/ingester-protocol/src/main/java/io/greptime/RouterClient.java b/ingester-protocol/src/main/java/io/greptime/RouterClient.java index 0c39384..cc9ab5b 100644 --- a/ingester-protocol/src/main/java/io/greptime/RouterClient.java +++ b/ingester-protocol/src/main/java/io/greptime/RouterClient.java @@ -126,7 +126,9 @@ public void shutdownGracefully() { } /** - * Get the current routing table. + * Gets the current routing table. + * + * @return the current routing table */ public CompletableFuture route() { return this.router.routeFor(null); @@ -134,6 +136,13 @@ public CompletableFuture route() { /** * @see #invoke(Endpoint, Object, Context, long) + * + * @param endpoint the endpoint to invoke + * @param request the request to send + * @param ctx the context + * @return the response future + * @param request type + * @param response type */ public CompletableFuture invoke(Endpoint endpoint, Req request, Context ctx) { return invoke(endpoint, request, ctx, -1 /* use default rpc timeout */); diff --git a/ingester-protocol/src/main/java/io/greptime/Status.java b/ingester-protocol/src/main/java/io/greptime/Status.java index 946acf3..c0289ee 100644 --- a/ingester-protocol/src/main/java/io/greptime/Status.java +++ b/ingester-protocol/src/main/java/io/greptime/Status.java @@ -138,28 +138,35 @@ public enum Status { } /** - * Returns the status code. + * Gets the status code. + * + * @return the status code */ public int getStatusCode() { return statusCode; } /** - * Returns {@code true} if the status code is {@link #Success}. + * Checks if the status code is {@link #Success}. + * + * @param statusCode the status code + * @return {@code true} if the status code is {@link #Success} */ public static boolean isSuccess(int statusCode) { return statusCode == Success.getStatusCode(); } /** - * Returns {@code true} if the status code represents a retry-needed error. + * Checks if the status code represents a retry-needed error. + * + * @return {@code true} if the status code represents a retry-needed error */ public boolean isShouldRetry() { return shouldRetry; } /** - * Returns the {@link Status} for the specified status code. + * Gets the {@link Status} for the specified status code. * * @param statusCode the status code * @return the {@link Status} diff --git a/ingester-protocol/src/main/java/io/greptime/StreamWriter.java b/ingester-protocol/src/main/java/io/greptime/StreamWriter.java index d8bc988..d0909ae 100644 --- a/ingester-protocol/src/main/java/io/greptime/StreamWriter.java +++ b/ingester-protocol/src/main/java/io/greptime/StreamWriter.java @@ -28,6 +28,9 @@ public interface StreamWriter { /** * @see #write(Object, WriteOp) + * + * @param val data value + * @return this */ default StreamWriter write(V val) { return write(val, WriteOp.Insert); diff --git a/ingester-protocol/src/main/java/io/greptime/Write.java b/ingester-protocol/src/main/java/io/greptime/Write.java index 2ba794e..1a0738f 100644 --- a/ingester-protocol/src/main/java/io/greptime/Write.java +++ b/ingester-protocol/src/main/java/io/greptime/Write.java @@ -31,6 +31,9 @@ public interface Write { /** * @see #write(Collection, WriteOp, Context) + * + * @param tables the tables to write + * @return a future that completes with the write result */ default CompletableFuture> write(Table... tables) { return write(Arrays.asList(tables)); @@ -38,6 +41,9 @@ default CompletableFuture> write(Table... tables) { /** * @see #write(Collection, WriteOp, Context) + * + * @param tables the tables to write + * @return a future that completes with the write result */ default CompletableFuture> write(Collection tables) { return write(tables, WriteOp.Insert, Context.newDefault()); @@ -45,6 +51,10 @@ default CompletableFuture> write(Collection
tables) /** * @see #write(Collection, WriteOp, Context) + * + * @param tables the tables to write + * @param writeOp the write operation + * @return a future that completes with the write result */ default CompletableFuture> write(Collection
tables, WriteOp writeOp) { return write(tables, writeOp, Context.newDefault()); @@ -62,6 +72,8 @@ default CompletableFuture> write(Collection
tables, /** * @see #streamWriter(int, Context) + * + * @return a stream writer instance */ default StreamWriter streamWriter() { return streamWriter(-1); @@ -69,6 +81,9 @@ default StreamWriter streamWriter() { /** * @see #streamWriter(int, Context) + * + * @param maxPointsPerSecond the max number of points that can be written per second, exceeding which may cause blockage + * @return a stream writer instance */ default StreamWriter streamWriter(int maxPointsPerSecond) { return streamWriter(maxPointsPerSecond, Context.newDefault()); diff --git a/ingester-protocol/src/main/java/io/greptime/WriteObject.java b/ingester-protocol/src/main/java/io/greptime/WriteObject.java index 70d6c84..8a1640f 100644 --- a/ingester-protocol/src/main/java/io/greptime/WriteObject.java +++ b/ingester-protocol/src/main/java/io/greptime/WriteObject.java @@ -31,12 +31,18 @@ public interface WriteObject { /** * @see #writeObjects(Collection, WriteOp, Context) + * + * @param objects the objects to write + * @return a future that completes with the write result */ default CompletableFuture> writeObjects(List... objects) { return writeObjects(Arrays.asList(objects)); } /** * @see #writeObjects(Collection, WriteOp, Context) + * + * @param objects the objects to write + * @return a future that completes with the write result */ default CompletableFuture> writeObjects(Collection> objects) { return writeObjects(objects, WriteOp.Insert, Context.newDefault()); @@ -44,6 +50,10 @@ default CompletableFuture> writeObjects(Collection> /** * @see #writeObjects(Collection, WriteOp, Context) + * + * @param objects the objects to write + * @param writeOp the write operation + * @return a future that completes with the write result */ default CompletableFuture> writeObjects(Collection> objects, WriteOp writeOp) { return writeObjects(objects, writeOp, Context.newDefault()); @@ -61,6 +71,8 @@ default CompletableFuture> writeObjects(Collection> /** * @see #objectsStreamWriter(int, Context) + * + * @return a stream writer instance */ default StreamWriter, WriteOk> objectsStreamWriter() { return objectsStreamWriter(-1); @@ -68,6 +80,9 @@ default StreamWriter, WriteOk> objectsStreamWriter() { /** * @see #objectsStreamWriter(int, Context) + * + * @param maxPointsPerSecond the max number of points that can be written per second, exceeding which may cause blockage + * @return a stream writer instance */ default StreamWriter, WriteOk> objectsStreamWriter(int maxPointsPerSecond) { return objectsStreamWriter(maxPointsPerSecond, Context.newDefault()); diff --git a/ingester-protocol/src/main/java/io/greptime/limit/Limiter.java b/ingester-protocol/src/main/java/io/greptime/limit/Limiter.java index 931360c..fec9dbc 100644 --- a/ingester-protocol/src/main/java/io/greptime/limit/Limiter.java +++ b/ingester-protocol/src/main/java/io/greptime/limit/Limiter.java @@ -46,6 +46,9 @@ public interface Limiter { /** * @see #tryAcquire(int, long, TimeUnit) + * + * @param permits the number of permits to acquire + * @return {@code true} if the permits were acquired, {@code false} otherwise. */ default boolean tryAcquire(int permits) { return tryAcquire(permits, 0, TimeUnit.NANOSECONDS); diff --git a/ingester-protocol/src/main/java/io/greptime/limit/WriteLimiter.java b/ingester-protocol/src/main/java/io/greptime/limit/WriteLimiter.java index 3ec9024..c1e4919 100644 --- a/ingester-protocol/src/main/java/io/greptime/limit/WriteLimiter.java +++ b/ingester-protocol/src/main/java/io/greptime/limit/WriteLimiter.java @@ -23,7 +23,7 @@ import java.util.Collection; /** - * Like rust: pub type WriteLimiter = AbstractLimiter> + * Like rust: {@code pub type WriteLimiter = AbstractLimiter>} */ public abstract class WriteLimiter extends AbstractLimiter, Result> { diff --git a/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java b/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java index f4f5f8d..f8ed5b4 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java +++ b/ingester-protocol/src/main/java/io/greptime/models/AuthInfo.java @@ -31,16 +31,29 @@ public class AuthInfo implements Into { /** * Create AuthInfo from username/password. + * + * @param username the username + * @param password the password */ public AuthInfo(String username, String password) { this.username = username; this.password = password; } + /** + * Creates an AuthInfo with no authorization. + * + * @return an AuthInfo with no authorization + */ public static AuthInfo noAuthorization() { return null; } + /** + * Converts the AuthInfo to a base64 encoded string. + * + * @return the base64 encoded string + */ public String toBase64() { String authInfoStr = String.format("%s:%s", this.username, this.password); return Base64.getEncoder().encodeToString(authInfoStr.getBytes(StandardCharsets.UTF_8)); diff --git a/ingester-protocol/src/main/java/io/greptime/models/Column.java b/ingester-protocol/src/main/java/io/greptime/models/Column.java index b15bdd7..01dcf2e 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/Column.java +++ b/ingester-protocol/src/main/java/io/greptime/models/Column.java @@ -34,6 +34,8 @@ * It is strongly recommended to use snake case naming convention and avoid * using camel case. This is because GreptimeDB treats column names as * case-insensitive, which can cause confusion when querying with camel case. + * + * @return the name of the column in the table */ String name(); diff --git a/ingester-protocol/src/main/java/io/greptime/models/Err.java b/ingester-protocol/src/main/java/io/greptime/models/Err.java index fd318d5..2cf03aa 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/Err.java +++ b/ingester-protocol/src/main/java/io/greptime/models/Err.java @@ -30,28 +30,37 @@ public class Err { private Endpoint errTo; /** - * Returns the error code. + * Gets the error code. + * + * @return the error code */ public int getCode() { return code; } /** - * Returns the error. + * Gets the error. + * + * @return the error */ public Throwable getError() { return error; } /** - * Returns the server address where the error occurred. + * Gets the server address where the error occurred. + * + * @return the server address where the error occurred */ public Endpoint getErrTo() { return errTo; } /** - * Returns a {@link Result} containing this error. + * Maps this error to a {@link Result}. + * + * @param the type of the result + * @return a {@link Result} containing this error */ public Result mapToResult() { return Result.err(this); diff --git a/ingester-protocol/src/main/java/io/greptime/models/Result.java b/ingester-protocol/src/main/java/io/greptime/models/Result.java index b714bee..901e885 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/Result.java +++ b/ingester-protocol/src/main/java/io/greptime/models/Result.java @@ -34,7 +34,7 @@ public final class Result { * Creates a new `Result` from the given value. * * @param ok the value - * @return `Result` + * @return a new `Result` from the given value * @param the value type * @param the error type */ @@ -47,7 +47,7 @@ public static Result ok(Ok ok) { * Creates a new `Result` from the given error. * * @param err the error - * @return `Result` + * @return a new `Result` from the given error * @param the value type * @param the error type */ @@ -62,35 +62,42 @@ private Result(Ok ok, Err err) { } /** - * Returns `true` if the result is [`Ok`]. + * Checks if the result is [`Ok`]. + * + * @return {@code true} if the result is [`Ok`] */ public boolean isOk() { return this.ok != null && this.err == null; } /** - * Returns the [`Ok`] value. + * Gets the [`Ok`] value. + * + * @return the [`Ok`] value */ public Ok getOk() { return Ensures.ensureNonNull(this.ok, "null `ok`"); } /** - * Returns the [`Err`]. + * Gets the [`Err`]. + * + * @return the [`Err`] */ public Err getErr() { return Ensures.ensureNonNull(this.err, "null `err`"); } /** - * Maps a `Result` to `Result` by applying a function to + * Maps a {@code Result} to {@code Result} by applying a function to * a contained [`Ok`] value, leaving an [`Err`] value untouched. *

* This function can be used to compose the results of two functions. * * @param mapper a function to a contained [`Ok`] value * @param the [`Ok`] value type to map to - * @return `Result` + * @return a new `Result` by applying the mapper function to the [`Ok`] value, + * or a `Result` containing the original error if this result is an [`Err`] */ public Result map(Function mapper) { return isOk() ? Result.ok(mapper.apply(getOk())) : Result.err(getErr()); @@ -115,7 +122,7 @@ public U mapOr(U defaultVal, Function mapper) { } /** - * Maps a `Result` to `U` by applying a fallback function to a + * Maps a {@code Result} to {@code U} by applying a fallback function to a * contained [`Err`] value, or a default function to a contained [`Ok`] * value. *

@@ -125,7 +132,7 @@ public U mapOr(U defaultVal, Function mapper) { * @param fallbackMapper a fallback function to a contained [`Err`] value * @param mapper a function to a contained [`Ok`] value * @param the value type to map to - * @return `U` by applying a fallback function to a contained [`Err`] value, + * @return {@code U} by applying a fallback function to a contained [`Err`] value, * or a default function to a contained [`Ok`] value. */ public U mapOrElse(Function fallbackMapper, Function mapper) { @@ -133,12 +140,13 @@ public U mapOrElse(Function fallbackMapper, Function mapper) } /** - * Maps a `Result` to `Result` by applying a function to a + * Maps a {@code Result} to {@code Result} by applying a function to a * contained [`Err`] value, leaving an [`Ok`] value untouched. * * @param mapper a function to a contained [`Err`] value * @param the error type to map to - * @return `Result` + * @return a new `Result` by applying the mapper function to the [`Err`] value, + * or a `Result` containing the original [`Ok`] value if this result is an [`Ok`] */ public Result mapErr(Function mapper) { return isOk() ? Result.ok(getOk()) : Result.err(mapper.apply(getErr())); @@ -149,7 +157,8 @@ public Result mapErr(Function mapper) { * * @param mapper a function to a contained [`Ok`] value * @param the value type witch mapped to - * @return `Result` + * @return a new `Result` by applying the mapper function to the [`Ok`] value, + * or a `Result` containing the original [`Err`] value if this result is an [`Err`] */ public Result andThen(Function> mapper) { return isOk() ? mapper.apply(getOk()) : Result.err(getErr()); @@ -160,7 +169,8 @@ public Result andThen(Function> mapper) { * * @param mapper a function to a contained [`Err`] value * @param the error type to map to - * @return `Result` + * @return a new `Result` by applying the mapper function to the [`Err`] value, + * or a `Result` containing the original [`Ok`] value if this result is an [`Ok`] */ public Result orElse(Function> mapper) { return isOk() ? Result.ok(getOk()) : mapper.apply(getErr()); diff --git a/ingester-protocol/src/main/java/io/greptime/models/SemanticType.java b/ingester-protocol/src/main/java/io/greptime/models/SemanticType.java index 9f4c97d..64f4eb7 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/SemanticType.java +++ b/ingester-protocol/src/main/java/io/greptime/models/SemanticType.java @@ -28,6 +28,8 @@ public enum SemanticType { /** * Converts to the corresponding proto value. + * + * @return the corresponding proto value */ public Common.SemanticType toProtoValue() { switch (this) { diff --git a/ingester-protocol/src/main/java/io/greptime/models/Table.java b/ingester-protocol/src/main/java/io/greptime/models/Table.java index 0d0b6d8..efcac71 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/Table.java +++ b/ingester-protocol/src/main/java/io/greptime/models/Table.java @@ -66,39 +66,61 @@ public interface Table { /** - * The table name to write. + * Gets the table name to write. + * + * @return the table name */ String tableName(); - /**ยทยท - * The rows count to write. + /** + * Gets the rows count to write. + * + * @return the rows count */ int rowCount(); /** - * The columns count to write. + * Gets the columns count to write. + * + * @return the columns count */ int columnCount(); /** - * The points count to write. + * Gets the points count to write. + * + * @return the points count */ default int pointCount() { return rowCount() * columnCount(); } /** - * The bytes used by the table. + * Gets the bytes used by the table. + * + * @return the bytes used by the table */ long bytesUsed(); /** - * Insets one row with all columns. + * Inserts one row with all columns. * + *

* The order of the values must be the same as the order of the schema. + *

+ * + * @param values the values to insert + * @return this */ Table addRow(Object... values); + /** + * Gets a sub-range of the table. + * + * @param fromIndex the start index + * @param toIndex the end index + * @return a sub-range of the table + */ Table subRange(int fromIndex, int toIndex); /** @@ -111,16 +133,20 @@ default int pointCount() { * rows are properly flushed to the underlying storage. Failure to call this method * may result in data loss, particularly for implementations that use internal buffering. *

+ * + * @return this */ Table complete(); /** - * Returns true if the table has been completed. + * Checks if the table has been completed. + * + * @return true if the table has been completed */ boolean isCompleted(); /** - * Convert to {@link Database.RowInsertRequest}. + * Converts to {@link Database.RowInsertRequest}. * * @return {@link Database.RowInsertRequest} */ @@ -129,7 +155,7 @@ default Database.RowInsertRequest intoRowInsertRequest() { } /** - * Convert to {@link Database.RowDeleteRequest}. + * Converts to {@link Database.RowDeleteRequest}. * * @return {@link Database.RowDeleteRequest} */ diff --git a/ingester-protocol/src/main/java/io/greptime/models/WriteOk.java b/ingester-protocol/src/main/java/io/greptime/models/WriteOk.java index 0969eec..39c0598 100644 --- a/ingester-protocol/src/main/java/io/greptime/models/WriteOk.java +++ b/ingester-protocol/src/main/java/io/greptime/models/WriteOk.java @@ -25,14 +25,18 @@ public class WriteOk { private int failure; /** - * Returns the number of successful writes. + * Gets the number of successful writes. + * + * @return the number of successful writes */ public int getSuccess() { return success; } /** - * Returns the number of failed writes. + * Gets the number of failed writes. + * + * @return the number of failed writes */ public int getFailure() { return failure; @@ -51,14 +55,20 @@ public String toString() { } /** - * Returns an empty {@link WriteOk}. + * Creates an empty {@link WriteOk}. + * + * @return an empty {@link WriteOk} */ public static WriteOk emptyOk() { return ok(0, 0); } /** - * Creates a new {@link WriteOk} from the given value. + * Creates a new {@link WriteOk} from the given values. + * + * @param success the number of successful writes + * @param failure the number of failed writes + * @return a new {@link WriteOk} */ public static WriteOk ok(int success, int failure) { WriteOk ok = new WriteOk();