Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public final class OMSnapshotDirectoryMetrics extends OMPeriodicMetrics implemen
private @Metric MutableGaugeLong dbSnapshotsDirSize;
private @Metric MutableGaugeLong totalSstFilesCount;
private @Metric MutableGaugeLong numSnapshots;
private @Metric MutableGaugeLong sstBackupDirSize;
private @Metric MutableGaugeLong sstBackupSstFilesCount;

private final OMMetadataManager metadataManager;
private final MetricsRegistry registry = new MetricsRegistry(SOURCE_NAME);
Expand All @@ -79,6 +81,21 @@ public final class OMSnapshotDirectoryMetrics extends OMPeriodicMetrics implemen
OZONE_OM_SNAPSHOT_DIRECTORY_METRICS_UPDATE_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS));
this.metadataManager = metadataManager;
this.dbSnapshotsDirSize = registry.newGauge(
SnapshotMetricsInfo.DbSnapshotsDirSize.name(),
SnapshotMetricsInfo.DbSnapshotsDirSize.description(), 0L);
this.totalSstFilesCount = registry.newGauge(
SnapshotMetricsInfo.TotalSstFilesCount.name(),
SnapshotMetricsInfo.TotalSstFilesCount.description(), 0L);
this.numSnapshots = registry.newGauge(
SnapshotMetricsInfo.NumSnapshots.name(),
SnapshotMetricsInfo.NumSnapshots.description(), 0L);
this.sstBackupDirSize = registry.newGauge(
SnapshotMetricsInfo.SstBackupDirSize.name(),
SnapshotMetricsInfo.SstBackupDirSize.description(), 0L);
this.sstBackupSstFilesCount = registry.newGauge(
SnapshotMetricsInfo.SstBackupSstFilesCount.name(),
SnapshotMetricsInfo.SstBackupSstFilesCount.description(), 0L);
}

public static OMSnapshotDirectoryMetrics create(ConfigurationSource conf,
Expand Down Expand Up @@ -116,7 +133,10 @@ protected boolean updateMetrics() {

try {
// Calculate aggregate metrics
calculateAndUpdateMetrics(snapshotsDir);
String sstBackupDir = store.getRocksDBCheckpointDiffer() != null
? store.getRocksDBCheckpointDiffer().getSSTBackupDir() : null;
calculateAndUpdateMetrics(snapshotsDir,
sstBackupDir != null ? new File(sstBackupDir) : null);
} catch (Exception e) {
LOG.warn("Error calculating snapshot directory metrics", e);
resetMetrics();
Expand All @@ -126,60 +146,100 @@ protected boolean updateMetrics() {
}

/**
* Calculates & updates directory size metrics accounting for hardlinks.
* (only counts each inode once).
* Calculates & updates directory size metrics accounting for snapshot and
* backup SST Directory. (only counts each inode once).
* Uses Files.getAttribute to get the inode number and tracks visited inodes.
*
* @param directory the directory containing all checkpointDirs.
* @param snapshotsDir the directory containing all checkpointDirs.
* @param sstBackupDir the backup SST directory
*/
private void calculateAndUpdateMetrics(File directory) throws IOException {
Set<Object> visitedInodes = new HashSet<>();
long totalSize = 0;
long sstFileCount = 0;
private void calculateAndUpdateMetrics(File snapshotsDir,
File sstBackupDir) throws IOException {
Set<Object> visitedSnapshotsInodes = new HashSet<>();
long snapshotsTotalSize = 0;
long snapshotsSstFileCount = 0;
int snapshotCount = 0;
try (Stream<Path> checkpointDirs = Files.list(directory.toPath())) {
for (Path checkpointDir : checkpointDirs.collect(Collectors.toList())) {
if (Files.isDirectory(checkpointDir)) {
snapshotCount++;
try (Stream<Path> files = Files.list(checkpointDir)) {
for (Path path : files.collect(Collectors.toList())) {
if (Files.isRegularFile(path)) {
try {
// Get inode number
Object fileKey = IOUtils.getINode(path);
if (fileKey == null) {
// Fallback: use file path + size as unique identifier
fileKey = path.toAbsolutePath() + ":" + Files.size(path);
}
// Only count this file if we haven't seen this inode before
if (visitedInodes.add(fileKey)) {
if (path.toFile().getName().endsWith(ROCKSDB_SST_SUFFIX)) {
sstFileCount++;
}
totalSize += Files.size(path);
}
} catch (UnsupportedOperationException | IOException e) {
// Fallback: if we can't get inode, just count the file size.
LOG.warn("Could not get inode for {}, using file size directly: {}",
path, e.getMessage());
totalSize += Files.size(path);
if (path.toFile().getName().endsWith(ROCKSDB_SST_SUFFIX)) {
sstFileCount++;
}
}
}
}

if (snapshotsDir != null && snapshotsDir.exists() && snapshotsDir.isDirectory()) {
try (Stream<Path> checkpointDirs = Files.list(snapshotsDir.toPath())) {
for (Path checkpointDir : checkpointDirs.collect(Collectors.toList())) {
if (Files.isDirectory(checkpointDir)) {
snapshotCount++;
SizeAndCount sizeAndCount =
calculateDirSize(checkpointDir, visitedSnapshotsInodes);
snapshotsTotalSize += sizeAndCount.size;
snapshotsSstFileCount += sizeAndCount.count;
}
}
}
}

long backupDirSize = 0;
long backupSstFileCount = 0;
if (sstBackupDir != null && sstBackupDir.exists() &&
sstBackupDir.isDirectory()) {
SizeAndCount sizeAndCount =
calculateDirSize(sstBackupDir.toPath(), new HashSet<>());
backupDirSize = sizeAndCount.size;
backupSstFileCount = sizeAndCount.count;
}

numSnapshots.set(snapshotCount);
totalSstFilesCount.set(sstFileCount);
dbSnapshotsDirSize.set(totalSize);
totalSstFilesCount.set(snapshotsSstFileCount);
dbSnapshotsDirSize.set(snapshotsTotalSize);
sstBackupDirSize.set(backupDirSize);
sstBackupSstFilesCount.set(backupSstFileCount);

if (LOG.isDebugEnabled()) {
LOG.debug("Updated snapshot directory metrics: size={}, sstFiles={}, snapshots={}",
totalSize, sstFileCount, snapshotCount);
LOG.debug("Updated snapshot directory metrics: size={}, sstFiles={}, " +
"snapshots={}, backupSize={}, backupSstFiles={}",
snapshotsTotalSize, snapshotsSstFileCount, snapshotCount,
backupDirSize, backupSstFileCount);
}
}

/**
 * Sums the byte size and SST-file count of the regular files directly under
 * {@code directory}, skipping any file whose inode key is already present in
 * {@code visitedInodes} so hardlinked files are counted only once.
 *
 * @param directory the directory whose immediate files are measured.
 * @param visitedInodes inode keys counted so far; updated as a side effect.
 * @return aggregate size and SST count of the newly seen files.
 * @throws IOException if listing the directory (or a fallback size probe) fails.
 */
private SizeAndCount calculateDirSize(Path directory, Set<Object> visitedInodes) throws IOException {
  long totalBytes = 0;
  long sstFiles = 0;
  try (Stream<Path> entries = Files.list(directory)) {
    for (Path entry : entries.collect(Collectors.toList())) {
      if (!Files.isRegularFile(entry)) {
        continue;
      }
      boolean isSstFile = entry.toFile().getName().endsWith(ROCKSDB_SST_SUFFIX);
      try {
        // Prefer the inode number as the dedup key; fall back to an
        // absolute-path + size identifier when no inode is reported.
        Object inodeKey = IOUtils.getINode(entry);
        if (inodeKey == null) {
          inodeKey = entry.toAbsolutePath() + ":" + Files.size(entry);
        }
        // Count the file only the first time its inode is seen.
        if (visitedInodes.add(inodeKey)) {
          totalBytes += Files.size(entry);
          if (isSstFile) {
            sstFiles++;
          }
        }
      } catch (UnsupportedOperationException | IOException e) {
        // No inode available: count the file unconditionally.
        LOG.warn("Could not get inode for {}, using file size directly: {}",
            entry, e.getMessage());
        totalBytes += Files.size(entry);
        if (isSstFile) {
          sstFiles++;
        }
      }
    }
  }
  return new SizeAndCount(totalBytes, sstFiles);
}

/**
 * Immutable value holder pairing an aggregate byte size with an SST-file
 * count, returned by {@code calculateDirSize} for a single directory.
 */
private static final class SizeAndCount {
private final long size;
private final long count;

private SizeAndCount(long size, long count) {
this.size = size;
this.count = count;
}
}

Expand All @@ -190,6 +250,8 @@ private void resetMetrics() {
dbSnapshotsDirSize.set(0);
totalSstFilesCount.set(0);
numSnapshots.set(0);
sstBackupDirSize.set(0);
sstBackupSstFilesCount.set(0);
}

/**
Expand All @@ -204,6 +266,8 @@ public void getMetrics(MetricsCollector collector, boolean all) {
.addGauge(SnapshotMetricsInfo.DbSnapshotsDirSize, dbSnapshotsDirSize.value())
.addGauge(SnapshotMetricsInfo.TotalSstFilesCount, totalSstFilesCount.value())
.addGauge(SnapshotMetricsInfo.NumSnapshots, numSnapshots.value())
.addGauge(SnapshotMetricsInfo.SstBackupDirSize, sstBackupDirSize.value())
.addGauge(SnapshotMetricsInfo.SstBackupSstFilesCount, sstBackupSstFilesCount.value())
.addGauge(SnapshotMetricsInfo.LastUpdateTime, getLastUpdateTime());
}

Expand All @@ -222,6 +286,16 @@ public long getNumSnapshots() {
return numSnapshots.value();
}

/** Returns the last computed total size, in bytes, of the SST backup directory. */
@VisibleForTesting
public long getSstBackupDirSize() {
return sstBackupDirSize.value();
}

/** Returns the last computed number of SST files in the SST backup directory. */
@VisibleForTesting
public long getSstBackupSstFilesCount() {
return sstBackupSstFilesCount.value();
}

public void unRegister() {
stop();
MetricsSystem ms = DefaultMetricsSystem.instance();
Expand All @@ -236,6 +310,8 @@ enum SnapshotMetricsInfo implements MetricsInfo {
DbSnapshotsDirSize("Total size of db.snapshots directory in bytes"),
TotalSstFilesCount("Total number of SST files across all snapshots"),
NumSnapshots("Total number of snapshot checkpoint directories"),
SstBackupDirSize("Total size of backup SST directory in bytes"),
SstBackupSstFilesCount("Total number of SST files in backup SST directory"),
LastUpdateTime("Time stamp when the snapshot directory metrics were last updated");

private final String desc;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.om.snapshot;

import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;

/**
 * Tests for OMSnapshotDirectoryMetrics: snapshot dir size/count gauges,
 * hardlink deduplication, and SST backup dir gauges.
 */
public class TestOMSnapshotDirectoryMetrics {

  @TempDir
  private Path tempDir;

  private OMMetadataManager metadataManager;
  private OMSnapshotDirectoryMetrics metrics;

  @BeforeEach
  public void setup() throws IOException {
    // Mock just enough of the metadata manager for the metrics class to
    // resolve the snapshots parent dir and the SST backup dir.
    metadataManager = Mockito.mock(OMMetadataManager.class);
    RDBStore store = Mockito.mock(RDBStore.class);
    RocksDBCheckpointDiffer differ = Mockito.mock(RocksDBCheckpointDiffer.class);

    when(metadataManager.getStore()).thenReturn(store);
    when(store.getRocksDBCheckpointDiffer()).thenReturn(differ);

    Path snapshotsDir = tempDir.resolve("snapshots");
    Files.createDirectories(snapshotsDir);
    when(store.getSnapshotsParentDir()).thenReturn(snapshotsDir.toString());

    Path backupDir = tempDir.resolve("backup");
    Files.createDirectories(backupDir);
    when(differ.getSSTBackupDir()).thenReturn(backupDir.toString());

    OzoneConfiguration conf = new OzoneConfiguration();
    metrics = new OMSnapshotDirectoryMetrics(conf, metadataManager);
  }

  @Test
  public void testMetrics() throws IOException {
    Path snapshotsDir = Paths.get(metadataManager.getStore().getSnapshotsParentDir());
    Path backupDir = Paths.get(metadataManager.getStore().getRocksDBCheckpointDiffer().getSSTBackupDir());

    // Create a snapshot directory with some files
    Path snap1 = snapshotsDir.resolve(UUID.randomUUID().toString());
    Files.createDirectories(snap1);
    Path file1 = snap1.resolve("file1" + ROCKSDB_SST_SUFFIX);
    Files.write(file1, "data1".getBytes(StandardCharsets.UTF_8));
    long size1 = Files.size(file1);

    // Create backup directory with some files
    Path backupFile1 = backupDir.resolve("backup1" + ROCKSDB_SST_SUFFIX);
    Files.write(backupFile1, "backupData1".getBytes(StandardCharsets.UTF_8));
    long backupSize1 = Files.size(backupFile1);

    Path backupFile2 = backupDir.resolve("backup2" + ROCKSDB_SST_SUFFIX);
    Files.write(backupFile2, "backupData22".getBytes(StandardCharsets.UTF_8));
    long backupSize2 = Files.size(backupFile2);

    metrics.updateMetrics();

    assertEquals(1, metrics.getNumSnapshots());
    assertEquals(1, metrics.getTotalSstFilesCount());
    assertEquals(size1, metrics.getDbSnapshotsDirSize());
    assertEquals(2, metrics.getSstBackupSstFilesCount());
    assertEquals(backupSize1 + backupSize2, metrics.getSstBackupDirSize());

    // Add another snapshot whose SST is a hardlink of snap1's file; a
    // hardlinked inode must only be counted once across snapshots.
    Path snap2 = snapshotsDir.resolve(UUID.randomUUID().toString());
    Files.createDirectories(snap2);
    Path file2 = snap2.resolve("file1" + ROCKSDB_SST_SUFFIX);
    boolean hardLinked = true;
    try {
      Files.createLink(file2, file1);
    } catch (UnsupportedOperationException | IOException e) {
      // Some file systems report missing hardlink support via IOException
      // (e.g. FileSystemException), not only UnsupportedOperationException.
      // Fall back to a real copy, which has its own inode.
      hardLinked = false;
      Files.write(file2, "data1".getBytes(StandardCharsets.UTF_8));
    }

    metrics.updateMetrics();

    assertEquals(2, metrics.getNumSnapshots());
    if (hardLinked) {
      // The linked file shares file1's inode, so totals are unchanged.
      assertEquals(1, metrics.getTotalSstFilesCount());
      assertEquals(size1, metrics.getDbSnapshotsDirSize());
    } else {
      // The copy has a distinct inode and is counted in addition to file1.
      assertEquals(2, metrics.getTotalSstFilesCount());
      assertEquals(2 * size1, metrics.getDbSnapshotsDirSize());
    }

    // A non-SST file adds to the backup dir size but not to the SST count.
    Path backupFile3 = backupDir.resolve("notSst.txt");
    Files.write(backupFile3, "notSstData".getBytes(StandardCharsets.UTF_8));

    metrics.updateMetrics();
    assertEquals(2, metrics.getSstBackupSstFilesCount());
    assertEquals(backupSize1 + backupSize2 + Files.size(backupFile3), metrics.getSstBackupDirSize());
  }
}