From 3ccaff975db8ab7609e6c9c90af1d4ecc2a6aaaf Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 13 May 2025 16:25:35 +0800 Subject: [PATCH] feat: prometheus metrics exporter --- ingester-all/pom.xml | 4 ++ .../io/greptime/BulkWriteApiQuickStart.java | 2 + .../io/greptime/bench/BulkWriteBenchmark.java | 7 ++ .../bench/StreamingWriteBenchmark.java | 7 ++ ingester-prometheus-metrics/pom.xml | 49 +++++++++++++ .../io/greptime/metrics/MetricsExporter.java | 72 +++++++++++++++++++ pom.xml | 6 ++ 7 files changed, 147 insertions(+) create mode 100644 ingester-prometheus-metrics/pom.xml create mode 100644 ingester-prometheus-metrics/src/main/java/io/greptime/metrics/MetricsExporter.java diff --git a/ingester-all/pom.xml b/ingester-all/pom.xml index 8545d2e..7fb314b 100644 --- a/ingester-all/pom.xml +++ b/ingester-all/pom.xml @@ -47,5 +47,9 @@ ${project.groupId} ingester-bulk-protocol + + ${project.groupId} + ingester-prometheus-metrics + diff --git a/ingester-example/src/main/java/io/greptime/BulkWriteApiQuickStart.java b/ingester-example/src/main/java/io/greptime/BulkWriteApiQuickStart.java index 4e0961e..114282a 100644 --- a/ingester-example/src/main/java/io/greptime/BulkWriteApiQuickStart.java +++ b/ingester-example/src/main/java/io/greptime/BulkWriteApiQuickStart.java @@ -118,6 +118,8 @@ public static void main(String[] args) throws Exception { bulkStreamWriter.completed(); } + + greptimeDB.shutdownGracefully(); } private static Object[] generateOneRow(int cardinality) { diff --git a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java index b4c8f65..cb6facc 100644 --- a/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java @@ -19,8 +19,10 @@ import io.greptime.BulkStreamWriter; import io.greptime.BulkWrite; import io.greptime.GreptimeDB; +import io.greptime.common.util.MetricsUtil; import io.greptime.common.util.ServiceLoader; import io.greptime.common.util.SystemPropertyUtil; +import io.greptime.metrics.MetricsExporter; import io.greptime.models.Table; import io.greptime.models.TableSchema; import io.greptime.rpc.Compression; @@ -52,6 +54,10 @@ public static void main(String[] args) throws Exception { LOG.info("Using zstd compression: {}", zstdCompression); LOG.info("Batch size: {}", batchSize); + // Start a metrics exporter + MetricsExporter metricsExporter = new MetricsExporter(8080, MetricsUtil.metricRegistry()); + metricsExporter.init(null); + GreptimeDB greptimeDB = DBConnector.connectTo(new String[] {endpoint}, dbName); BulkWrite.Config cfg = BulkWrite.Config.newBuilder() @@ -111,5 +117,6 @@ public static void main(String[] args) throws Exception { } greptimeDB.shutdownGracefully(); + metricsExporter.shutdownGracefully(); } } diff --git a/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java b/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java index a5e3ada..7d99220 100644 --- a/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java +++ b/ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java @@ -18,8 +18,10 @@ import io.greptime.GreptimeDB; import io.greptime.StreamWriter; +import io.greptime.common.util.MetricsUtil; import io.greptime.common.util.ServiceLoader; import io.greptime.common.util.SystemPropertyUtil; +import io.greptime.metrics.MetricsExporter; import io.greptime.models.Table; import io.greptime.models.TableSchema; import io.greptime.models.WriteOk; @@ -55,6 +57,10 @@ public static void main(String[] args) throws Exception { LOG.info("Batch size: {}", batchSize); LOG.info("Max points per second: {}", maxPointsPerSecond); + // Start a metrics exporter + MetricsExporter metricsExporter = new MetricsExporter(8080, MetricsUtil.metricRegistry()); + metricsExporter.init(null); + GreptimeDB greptimeDB = DBConnector.connectTo(new String[] {endpoint}, dbName); Compression compression = zstdCompression ? Compression.Zstd : Compression.None; @@ -100,5 +106,6 @@ public static void main(String[] args) throws Exception { greptimeDB.shutdownGracefully(); tableDataProvider.close(); + metricsExporter.shutdownGracefully(); } } diff --git a/ingester-prometheus-metrics/pom.xml b/ingester-prometheus-metrics/pom.xml new file mode 100644 index 0000000..aef29c9 --- /dev/null +++ b/ingester-prometheus-metrics/pom.xml @@ -0,0 +1,49 @@ + + + + 4.0.0 + + io.greptime + greptimedb-ingester + 0.14.2 + + ingester-prometheus-metrics + + + 0.16.0 + + + + + io.greptime + ingester-common + + + + io.prometheus + simpleclient_dropwizard + ${prometheus.version} + + + io.prometheus + simpleclient_httpserver + ${prometheus.version} + + + diff --git a/ingester-prometheus-metrics/src/main/java/io/greptime/metrics/MetricsExporter.java b/ingester-prometheus-metrics/src/main/java/io/greptime/metrics/MetricsExporter.java new file mode 100644 index 0000000..0c161de --- /dev/null +++ b/ingester-prometheus-metrics/src/main/java/io/greptime/metrics/MetricsExporter.java @@ -0,0 +1,72 @@ +/* + * Copyright 2023 Greptime Team + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.greptime.metrics; + +import com.codahale.metrics.MetricRegistry; +import io.greptime.common.Lifecycle; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.dropwizard.DropwizardExports; +import io.prometheus.client.exporter.HTTPServer; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsExporter implements Lifecycle { + + private static final Logger LOG = LoggerFactory.getLogger(MetricsExporter.class); + + private final CollectorRegistry prometheusMetricRegistry; + + private final int port; + private HTTPServer server; + + private final AtomicBoolean started = new AtomicBoolean(false); + + public MetricsExporter(int port, MetricRegistry dropwizardMetricRegistry) { + this.port = port; + this.prometheusMetricRegistry = new CollectorRegistry(); + this.prometheusMetricRegistry.register(new DropwizardExports(dropwizardMetricRegistry)); + } + + @Override + public boolean init(Void opts) { + if (this.started.compareAndSet(false, true)) { + try { + this.server = new HTTPServer(new InetSocketAddress(this.port), this.prometheusMetricRegistry); + LOG.info("Metrics exporter started at `http://localhost:{}/metrics`", this.port); + return true; + } catch (IOException e) { + this.started.set(false); + LOG.error("Failed to start metrics exporter", e); + throw new RuntimeException("Failed to start metrics exporter", e); + } + } + return false; + } + + @Override + public void shutdownGracefully() { + if (this.started.compareAndSet(true, false)) { + if (this.server != null) { + this.server.close(); + LOG.info("Metrics exporter stopped"); + } + } + } +} diff --git a/pom.xml b/pom.xml index dd7240b..462a136 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ ingester-example ingester-all ingester-bulk-protocol + ingester-prometheus-metrics @@ -122,6 +123,11 @@ ingester-grpc ${project.version} + + ${project.groupId} + ingester-prometheus-metrics + ${project.version} + ${project.groupId} ingester-protocol