Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2055,6 +2055,15 @@
file. Unit could be defined with postfix (ns,ms,s,m,h,d)
</description>
</property>
<property>
<name>ozone.om.snapshot.directory.metrics.update.interval</name>
<value>5m</value>
Comment thread
sadanand48 marked this conversation as resolved.
<tag>OZONE, OM</tag>
<description>Time interval used to update the space consumption stats of the
Ozone Manager snapshot directories. Background thread periodically calculates
and updates these stats. Unit could be defined with postfix (ns,ms,s,m,h,d)
</description>
</property>
<property>
<name>ozone.security.enabled</name>
<value>false</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ public final class OMConfigKeys {
public static final boolean
OZONE_OM_SNAPSHOT_ROCKSDB_METRICS_ENABLED_DEFAULT = false;

public static final String OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL =
"ozone.om.snapshot.directory.metrics.update.interval";
public static final String OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT = "5m";

/**
* OM Ratis related configurations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.DBCheckpointMetrics;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.ozone.om.snapshot.OMSnapshotDirectoryMetrics;

/**
* This class is for maintaining Ozone Manager statistics.
Expand Down Expand Up @@ -245,6 +247,7 @@ public class OMMetrics implements OmMetadataReaderMetrics {
private @Metric MutableCounterLong ecBucketCreateTotal;
private @Metric MutableCounterLong ecBucketCreateFailsTotal;
private final DBCheckpointMetrics dbCheckpointMetrics;
private OMSnapshotDirectoryMetrics snapshotDirectoryMetrics;

public OMMetrics() {
dbCheckpointMetrics = DBCheckpointMetrics.create("OM Metrics");
Expand All @@ -261,6 +264,28 @@ public DBCheckpointMetrics getDBCheckpointMetrics() {
return dbCheckpointMetrics;
}

/**
* Starts periodic updates for snapshot directory metrics.
* Creates the metrics instance if it doesn't exist.
*
* @param configuration OzoneConfiguration for reading update interval
* @param metadataManager OMMetadataManager for accessing snapshot directories
*/
public void startSnapshotDirectoryMetrics(OzoneConfiguration configuration,
OMMetadataManager metadataManager) {
if (snapshotDirectoryMetrics == null) {
snapshotDirectoryMetrics = OMSnapshotDirectoryMetrics.create(configuration,
"OM Metrics", metadataManager);
}
snapshotDirectoryMetrics.start();
}

public void stopSnapshotDirectoryMetrics() {
if (snapshotDirectoryMetrics != null) {
snapshotDirectoryMetrics.stop();
}
}

public void incNumS3BucketCreates() {
numBucketOps.incr();
numBucketS3Creates.incr();
Expand Down Expand Up @@ -1502,6 +1527,9 @@ public void unRegister() {
if (dbCheckpointMetrics != null) {
dbCheckpointMetrics.unRegister();
}
if (snapshotDirectoryMetrics != null) {
snapshotDirectoryMetrics.unRegister();
}
MetricsSystem ms = DefaultMetricsSystem.instance();
ms.unregisterSource(SOURCE_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1868,6 +1868,10 @@ public void start() throws IOException {
metricsTimer = new Timer();
metricsTimer.schedule(scheduleOMMetricsWriteTask, 0, period);

LOG.info("Starting OmSnapshotDirectoryMetrics");

metrics.startSnapshotDirectoryMetrics(configuration, getMetadataManager());

try {
scmTopologyClient.start(configuration);
} catch (IOException ex) {
Expand Down Expand Up @@ -1950,6 +1954,10 @@ public void restart() throws IOException {
metricsTimer = new Timer();
metricsTimer.schedule(scheduleOMMetricsWriteTask, 0, period);

// Restart snapshot directory metrics updates
metrics.stopSnapshotDirectoryMetrics();
metrics.startSnapshotDirectoryMetrics(configuration, getMetadataManager());

initializeRatisServer(false);
if (omRatisServer != null) {
omRatisServer.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.ha;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Generic framework for metrics that need to get updated on a specified interval.
* A single threaded scheduled thread pool executor is created.
* The implementing class should only define the logic in updateMetrics()
*/
public abstract class OMPeriodicMetrics {
private static final Logger LOG =
LoggerFactory.getLogger(OMPeriodicMetrics.class);
private final AtomicLong lastUpdateTime = new AtomicLong(0);
private ScheduledExecutorService updateExecutor;
private ScheduledFuture<?> updateTask;
private final String metricsTaskName;
private final long updateInterval;
private volatile boolean started = false;

protected OMPeriodicMetrics(String metricsTaskName, long updateInterval) {
if (metricsTaskName == null || metricsTaskName.isEmpty()) {
throw new IllegalArgumentException("metricsTaskName cannot be null or empty");
}
if (updateInterval <= 0) {
throw new IllegalArgumentException("updateInterval must be positive");
}
this.metricsTaskName = metricsTaskName;
this.updateInterval = updateInterval;
}

public void start() {
if (started) {
LOG.warn("Periodic metrics '{}' already started, ignoring duplicate start()",
metricsTaskName);
return;
}
updateExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, metricsTaskName);
t.setDaemon(true);
return t;
});
// Schedule periodic updates
updateTask = updateExecutor.scheduleWithFixedDelay(() -> {
try {
boolean success = updateMetrics();
if (success) {
lastUpdateTime.set(System.currentTimeMillis());
}
} catch (Exception e) {
LOG.error("Failed to update metrics for periodic metrics", e);
}
}, 0, updateInterval, TimeUnit.MILLISECONDS);
started = true;
}

/**
* Updates the metrics periodically. This method is called by the framework
* at the configured interval after {@link #start()} is called.
* <p>
* Implementations should perform the actual metrics calculation and update
* logic here. The method should be thread-safe as it may be called from
* the scheduled executor thread.
*
* @return {@code true} if the metrics update was successful,
* {@code false} if the update should be considered unsuccessful
* (e.g., due to missing prerequisites or non-fatal errors).
* When {@code false} is returned, {@link #getLastUpdateTime()}
* will not be updated.
*/
protected abstract boolean updateMetrics();

/**
* Stops the periodic metrics update task.
*/
public void stop() {
if (!started) {
return; // Already stopped or never started
}
if (updateTask != null) {
updateTask.cancel(false); // Don't interrupt if running
updateTask = null;
}

if (updateExecutor != null) {
updateExecutor.shutdown();
try {
// Wait for any running updateMetrics() to complete (with timeout)
if (!updateExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
LOG.warn("Metrics update executor did not terminate in time, forcing shutdown");
updateExecutor.shutdownNow();
// Wait a bit more for cancellation to take effect
if (!updateExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.error("Metrics update executor did not terminate after force shutdown");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
updateExecutor.shutdownNow();
}
updateExecutor = null;
}
started = false; // Reset
}

public long getLastUpdateTime() {
return lastUpdateTime.get();
}
}
Comment thread
sadanand48 marked this conversation as resolved.
Loading