diff --git a/ingester-all/pom.xml b/ingester-all/pom.xml
index 8545d2e..7fb314b 100644
--- a/ingester-all/pom.xml
+++ b/ingester-all/pom.xml
@@ -47,5 +47,9 @@
${project.groupId}
ingester-bulk-protocol
+
+ ${project.groupId}
+ ingester-prometheus-metrics
+
diff --git a/ingester-example/src/main/java/io/greptime/BulkWriteApiQuickStart.java b/ingester-example/src/main/java/io/greptime/BulkWriteApiQuickStart.java
index 4e0961e..114282a 100644
--- a/ingester-example/src/main/java/io/greptime/BulkWriteApiQuickStart.java
+++ b/ingester-example/src/main/java/io/greptime/BulkWriteApiQuickStart.java
@@ -118,6 +118,8 @@ public static void main(String[] args) throws Exception {
bulkStreamWriter.completed();
}
+
+ greptimeDB.shutdownGracefully();
}
private static Object[] generateOneRow(int cardinality) {
diff --git a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java
index b4c8f65..cb6facc 100644
--- a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java
+++ b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java
@@ -19,8 +19,10 @@
import io.greptime.BulkStreamWriter;
import io.greptime.BulkWrite;
import io.greptime.GreptimeDB;
+import io.greptime.common.util.MetricsUtil;
import io.greptime.common.util.ServiceLoader;
import io.greptime.common.util.SystemPropertyUtil;
+import io.greptime.metrics.MetricsExporter;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.rpc.Compression;
@@ -52,6 +54,10 @@ public static void main(String[] args) throws Exception {
LOG.info("Using zstd compression: {}", zstdCompression);
LOG.info("Batch size: {}", batchSize);
+ // Start a metrics exporter
+ MetricsExporter metricsExporter = new MetricsExporter(8080, MetricsUtil.metricRegistry());
+ metricsExporter.init(null);
+
GreptimeDB greptimeDB = DBConnector.connectTo(new String[] {endpoint}, dbName);
BulkWrite.Config cfg = BulkWrite.Config.newBuilder()
@@ -111,5 +117,6 @@ public static void main(String[] args) throws Exception {
}
greptimeDB.shutdownGracefully();
+ metricsExporter.shutdownGracefully();
}
}
diff --git a/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java
index a5e3ada..7d99220 100644
--- a/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java
+++ b/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java
@@ -18,8 +18,10 @@
import io.greptime.GreptimeDB;
import io.greptime.StreamWriter;
+import io.greptime.common.util.MetricsUtil;
import io.greptime.common.util.ServiceLoader;
import io.greptime.common.util.SystemPropertyUtil;
+import io.greptime.metrics.MetricsExporter;
import io.greptime.models.Table;
import io.greptime.models.TableSchema;
import io.greptime.models.WriteOk;
@@ -55,6 +57,10 @@ public static void main(String[] args) throws Exception {
LOG.info("Batch size: {}", batchSize);
LOG.info("Max points per second: {}", maxPointsPerSecond);
+ // Start a metrics exporter
+ MetricsExporter metricsExporter = new MetricsExporter(8080, MetricsUtil.metricRegistry());
+ metricsExporter.init(null);
+
GreptimeDB greptimeDB = DBConnector.connectTo(new String[] {endpoint}, dbName);
Compression compression = zstdCompression ? Compression.Zstd : Compression.None;
@@ -100,5 +106,6 @@ public static void main(String[] args) throws Exception {
greptimeDB.shutdownGracefully();
tableDataProvider.close();
+ metricsExporter.shutdownGracefully();
}
}
diff --git a/ingester-prometheus-metrics/pom.xml b/ingester-prometheus-metrics/pom.xml
new file mode 100644
index 0000000..aef29c9
--- /dev/null
+++ b/ingester-prometheus-metrics/pom.xml
@@ -0,0 +1,49 @@
+
+
+
+ 4.0.0
+
+ io.greptime
+ greptimedb-ingester
+ 0.14.2
+
+ ingester-prometheus-metrics
+
+
+ 0.16.0
+
+
+
+
+ io.greptime
+ ingester-common
+
+
+
+ io.prometheus
+ simpleclient_dropwizard
+ ${prometheus.version}
+
+
+ io.prometheus
+ simpleclient_httpserver
+ ${prometheus.version}
+
+
+
diff --git a/ingester-prometheus-metrics/src/main/java/io/greptime/metrics/MetricsExporter.java b/ingester-prometheus-metrics/src/main/java/io/greptime/metrics/MetricsExporter.java
new file mode 100644
index 0000000..0c161de
--- /dev/null
+++ b/ingester-prometheus-metrics/src/main/java/io/greptime/metrics/MetricsExporter.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2023 Greptime Team
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.greptime.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import io.greptime.common.Lifecycle;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.dropwizard.DropwizardExports;
+import io.prometheus.client.exporter.HTTPServer;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetricsExporter implements Lifecycle {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MetricsExporter.class);
+
+ private final CollectorRegistry prometheusMetricRegistry;
+
+ private final int port;
+ private HTTPServer server;
+
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
+ public MetricsExporter(int port, MetricRegistry dropwizardMetricRegistry) {
+ this.port = port;
+ this.prometheusMetricRegistry = new CollectorRegistry();
+ this.prometheusMetricRegistry.register(new DropwizardExports(dropwizardMetricRegistry));
+ }
+
+ @Override
+ public boolean init(Void opts) {
+ if (this.started.compareAndSet(false, true)) {
+ try {
+ this.server = new HTTPServer(new InetSocketAddress(this.port), this.prometheusMetricRegistry);
+ LOG.info("Metrics exporter started at `http://localhost:{}/metrics`", this.port);
+ return true;
+ } catch (IOException e) {
+ this.started.set(false);
+ LOG.error("Failed to start metrics exporter", e);
+ throw new RuntimeException("Failed to start metrics exporter", e);
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void shutdownGracefully() {
+ if (this.started.compareAndSet(true, false)) {
+ if (this.server != null) {
+ this.server.close();
+ LOG.info("Metrics exporter stopped");
+ }
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index dd7240b..462a136 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
ingester-example
ingester-all
ingester-bulk-protocol
+ ingester-prometheus-metrics
@@ -122,6 +123,11 @@
ingester-grpc
${project.version}
+
+ ${project.groupId}
+ ingester-prometheus-metrics
+ ${project.version}
+
${project.groupId}
ingester-protocol