diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 5d36eb3b8f29..5eb3c2601de3 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -2055,6 +2055,15 @@ file. Unit could be defined with postfix (ns,ms,s,m,h,d) + + ozone.om.snapshot.directory.metrics.update.interval + 5m + OZONE, OM + Time interval used to update the space consumption stats of the + Ozone Manager snapshot directories. Background thread periodically calculates + and updates these stats. Unit could be defined with postfix (ns,ms,s,m,h,d) + + ozone.security.enabled false diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index ce00ec86b929..69e62d427857 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -167,6 +167,10 @@ public final class OMConfigKeys { public static final boolean OZONE_OM_SNAPSHOT_ROCKSDB_METRICS_ENABLED_DEFAULT = false; + public static final String OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL = + "ozone.om.snapshot.directory.metrics.update.interval"; + public static final String OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT = "5m"; + /** * OM Ratis related configurations. */ diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java index 1fb49e5e7fd2..6ec97869a95c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.DBCheckpointMetrics; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.annotation.Metric; @@ -26,6 +27,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.ozone.om.snapshot.OMSnapshotDirectoryMetrics; /** * This class is for maintaining Ozone Manager statistics. @@ -245,6 +247,7 @@ public class OMMetrics implements OmMetadataReaderMetrics { private @Metric MutableCounterLong ecBucketCreateTotal; private @Metric MutableCounterLong ecBucketCreateFailsTotal; private final DBCheckpointMetrics dbCheckpointMetrics; + private OMSnapshotDirectoryMetrics snapshotDirectoryMetrics; public OMMetrics() { dbCheckpointMetrics = DBCheckpointMetrics.create("OM Metrics"); @@ -261,6 +264,28 @@ public DBCheckpointMetrics getDBCheckpointMetrics() { return dbCheckpointMetrics; } + /** + * Starts periodic updates for snapshot directory metrics. + * Creates the metrics instance if it doesn't exist. + * + * @param configuration OzoneConfiguration for reading update interval + * @param metadataManager OMMetadataManager for accessing snapshot directories + */ + public void startSnapshotDirectoryMetrics(OzoneConfiguration configuration, + OMMetadataManager metadataManager) { + if (snapshotDirectoryMetrics == null) { + snapshotDirectoryMetrics = OMSnapshotDirectoryMetrics.create(configuration, + "OM Metrics", metadataManager); + } + snapshotDirectoryMetrics.start(); + } + + public void stopSnapshotDirectoryMetrics() { + if (snapshotDirectoryMetrics != null) { + snapshotDirectoryMetrics.stop(); + } + } + public void incNumS3BucketCreates() { numBucketOps.incr(); numBucketS3Creates.incr(); @@ -1502,6 +1527,9 @@ public void unRegister() { if (dbCheckpointMetrics != null) { dbCheckpointMetrics.unRegister(); } + if (snapshotDirectoryMetrics != null) { + snapshotDirectoryMetrics.unRegister(); + } MetricsSystem ms = DefaultMetricsSystem.instance(); ms.unregisterSource(SOURCE_NAME); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 3f7f979e9bf2..82cdd1d16597 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -1868,6 +1868,10 @@ public void start() throws IOException { metricsTimer = new Timer(); metricsTimer.schedule(scheduleOMMetricsWriteTask, 0, period); + LOG.info("Starting OmSnapshotDirectoryMetrics"); + + metrics.startSnapshotDirectoryMetrics(configuration, getMetadataManager()); + try { scmTopologyClient.start(configuration); } catch (IOException ex) { @@ -1950,6 +1954,10 @@ public void restart() throws IOException { metricsTimer = new Timer(); metricsTimer.schedule(scheduleOMMetricsWriteTask, 0, period); + // Restart snapshot directory metrics updates + metrics.stopSnapshotDirectoryMetrics(); + metrics.startSnapshotDirectoryMetrics(configuration, getMetadataManager()); + initializeRatisServer(false); if (omRatisServer != null) { omRatisServer.start(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMPeriodicMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMPeriodicMetrics.java new file mode 100644 index 000000000000..2e4b63c4fbc9 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMPeriodicMetrics.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ha; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Generic framework for metrics that need to get updated on a specified interval. + * A single threaded scheduled thread pool executor is created. + * The implementing class should only define the logic in updateMetrics() + */ +public abstract class OMPeriodicMetrics { + private static final Logger LOG = + LoggerFactory.getLogger(OMPeriodicMetrics.class); + private final AtomicLong lastUpdateTime = new AtomicLong(0); + private ScheduledExecutorService updateExecutor; + private ScheduledFuture updateTask; + private final String metricsTaskName; + private final long updateInterval; + private volatile boolean started = false; + + protected OMPeriodicMetrics(String metricsTaskName, long updateInterval) { + if (metricsTaskName == null || metricsTaskName.isEmpty()) { + throw new IllegalArgumentException("metricsTaskName cannot be null or empty"); + } + if (updateInterval <= 0) { + throw new IllegalArgumentException("updateInterval must be positive"); + } + this.metricsTaskName = metricsTaskName; + this.updateInterval = updateInterval; + } + + public void start() { + if (started) { + LOG.warn("Periodic metrics '{}' already started, ignoring duplicate start()", + metricsTaskName); + return; + } + updateExecutor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, metricsTaskName); + t.setDaemon(true); + return t; + }); + // Schedule periodic updates + updateTask = updateExecutor.scheduleWithFixedDelay(() -> { + try { + boolean success = updateMetrics(); + if (success) { + lastUpdateTime.set(System.currentTimeMillis()); + } + } catch (Exception e) { + LOG.error("Failed to update metrics for periodic metrics", e); + } + }, 0, updateInterval, TimeUnit.MILLISECONDS); + started = true; + } + + /** + * Updates the metrics periodically. This method is called by the framework + * at the configured interval after {@link #start()} is called. + *

+ * Implementations should perform the actual metrics calculation and update + * logic here. The method should be thread-safe as it may be called from + * the scheduled executor thread. + * + * @return {@code true} if the metrics update was successful, + * {@code false} if the update should be considered unsuccessful + * (e.g., due to missing prerequisites or non-fatal errors). + * When {@code false} is returned, {@link #getLastUpdateTime()} + * will not be updated. + */ + protected abstract boolean updateMetrics(); + + /** + * Stops the periodic metrics update task. + */ + public void stop() { + if (!started) { + return; // Already stopped or never started + } + if (updateTask != null) { + updateTask.cancel(false); // Don't interrupt if running + updateTask = null; + } + + if (updateExecutor != null) { + updateExecutor.shutdown(); + try { + // Wait for any running updateMetrics() to complete (with timeout) + if (!updateExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + LOG.warn("Metrics update executor did not terminate in time, forcing shutdown"); + updateExecutor.shutdownNow(); + // Wait a bit more for cancellation to take effect + if (!updateExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.error("Metrics update executor did not terminate after force shutdown"); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + updateExecutor.shutdownNow(); + } + updateExecutor = null; + } + started = false; // Reset + } + + public long getLastUpdateTime() { + return lastUpdateTime.get(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java new file mode 100644 index 000000000000..9e485b769eb5 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OMSnapshotDirectoryMetrics.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.snapshot; + +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.hdds.annotation.InterfaceAudience; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.utils.IOUtils; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.ha.OMPeriodicMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Metrics for tracking db.snapshots directory space usage and SST file counts. + * Provides aggregate metrics. + * Metrics are updated asynchronously to avoid blocking operations. + */ +@InterfaceAudience.Private +@Metrics(about = "OM Snapshot Directory Metrics", context = OzoneConsts.OZONE) +public final class OMSnapshotDirectoryMetrics extends OMPeriodicMetrics implements MetricsSource { + private static final Logger LOG = + LoggerFactory.getLogger(OMSnapshotDirectoryMetrics.class); + private static final String SOURCE_NAME = + OMSnapshotDirectoryMetrics.class.getSimpleName(); + + // Aggregate metrics + private @Metric MutableGaugeLong dbSnapshotsDirSize; + private @Metric MutableGaugeLong totalSstFilesCount; + private @Metric MutableGaugeLong numSnapshots; + + private final OMMetadataManager metadataManager; + private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME); + + OMSnapshotDirectoryMetrics(ConfigurationSource conf, + OMMetadataManager metadataManager) { + super("OMSnapshotDirectoryMetrics", + conf.getTimeDuration(OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL, + OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS)); + this.metadataManager = metadataManager; + } + + public static OMSnapshotDirectoryMetrics create(ConfigurationSource conf, + String parent, OMMetadataManager metadataManager) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(SOURCE_NAME, parent, + new OMSnapshotDirectoryMetrics(conf, metadataManager)); + } + + /** + * @return if the update was successful. + * Updates aggregate metrics synchronously. + */ + @Override + protected boolean updateMetrics() { + DBStore store = metadataManager.getStore(); + if (!(store instanceof RDBStore)) { + LOG.debug("Store is not RDBStore, skipping snapshot directory metrics update"); + resetMetrics(); + return false; + } + + String snapshotsParentDir = store.getSnapshotsParentDir(); + + if (snapshotsParentDir == null) { + resetMetrics(); + return false; + } + + File snapshotsDir = new File(snapshotsParentDir); + if (!snapshotsDir.exists() || !snapshotsDir.isDirectory()) { + resetMetrics(); + return false; + } + + try { + // Calculate aggregate metrics + calculateAndUpdateMetrics(snapshotsDir); + } catch (Exception e) { + LOG.warn("Error calculating snapshot directory metrics", e); + resetMetrics(); + return false; + } + return true; + } + + /** + * Calculates & updates directory size metrics accounting for hardlinks. + * (only counts each inode once). + * Uses Files.getAttribute to get the inode number and tracks visited inodes. + * + * @param directory the directory containing all checkpointDirs. + */ + private void calculateAndUpdateMetrics(File directory) throws IOException { + Set visitedInodes = new HashSet<>(); + long totalSize = 0; + long sstFileCount = 0; + int snapshotCount = 0; + try (Stream checkpointDirs = Files.list(directory.toPath())) { + for (Path checkpointDir : checkpointDirs.collect(Collectors.toList())) { + if (Files.isDirectory(checkpointDir)) { + snapshotCount++; + try (Stream files = Files.list(checkpointDir)) { + for (Path path : files.collect(Collectors.toList())) { + if (Files.isRegularFile(path)) { + try { + // Get inode number + Object fileKey = IOUtils.getINode(path); + if (fileKey == null) { + // Fallback: use file path + size as unique identifier + fileKey = path.toAbsolutePath() + ":" + Files.size(path); + } + // Only count this file if we haven't seen this inode before + if (visitedInodes.add(fileKey)) { + if (path.toFile().getName().endsWith(ROCKSDB_SST_SUFFIX)) { + sstFileCount++; + } + totalSize += Files.size(path); + } + } catch (UnsupportedOperationException | IOException e) { + // Fallback: if we can't get inode, just count the file size. + LOG.warn("Could not get inode for {}, using file size directly: {}", + path, e.getMessage()); + totalSize += Files.size(path); + if (path.toFile().getName().endsWith(ROCKSDB_SST_SUFFIX)) { + sstFileCount++; + } + } + } + } + } + } + } + } + numSnapshots.set(snapshotCount); + totalSstFilesCount.set(sstFileCount); + dbSnapshotsDirSize.set(totalSize); + + if (LOG.isDebugEnabled()) { + LOG.debug("Updated snapshot directory metrics: size={}, sstFiles={}, snapshots={}", + totalSize, sstFileCount, snapshotCount); + } + } + + /** + * Resets all metrics to zero. + */ + private void resetMetrics() { + dbSnapshotsDirSize.set(0); + totalSstFilesCount.set(0); + numSnapshots.set(0); + } + + /** + * Implements MetricsSource to provide metrics. + * Reads from cached values updated by updateMetrics(). + */ + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + // Add aggregate metrics + collector.addRecord(SOURCE_NAME) + .setContext("Snapshot Directory Metrics") + .addGauge(SnapshotMetricsInfo.DbSnapshotsDirSize, dbSnapshotsDirSize.value()) + .addGauge(SnapshotMetricsInfo.TotalSstFilesCount, totalSstFilesCount.value()) + .addGauge(SnapshotMetricsInfo.NumSnapshots, numSnapshots.value()) + .addGauge(SnapshotMetricsInfo.LastUpdateTime, getLastUpdateTime()); + } + + @VisibleForTesting + public long getDbSnapshotsDirSize() { + return dbSnapshotsDirSize.value(); + } + + @VisibleForTesting + public long getTotalSstFilesCount() { + return totalSstFilesCount.value(); + } + + @VisibleForTesting + public long getNumSnapshots() { + return numSnapshots.value(); + } + + public void unRegister() { + stop(); + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(SOURCE_NAME); + } + + /** + * Metrics info enum for snapshot directory metrics. + */ + enum SnapshotMetricsInfo implements MetricsInfo { + // Aggregate metrics + DbSnapshotsDirSize("Total size of db.snapshots directory in bytes"), + TotalSstFilesCount("Total number of SST files across all snapshots"), + NumSnapshots("Total number of snapshot checkpoint directories"), + LastUpdateTime("Time stamp when the snapshot directory metrics were last updated"); + + private final String desc; + + SnapshotMetricsInfo(String desc) { + this.desc = desc; + } + + @Override + public String description() { + return desc; + } + } +}