Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
import picocli.CommandLine;

/**
Expand Down Expand Up @@ -84,12 +86,20 @@ public class ContainerToKeyMapping extends AbstractSubcommand implements Callabl
description = "Only display file names without full path")
private boolean onlyFileNames;

@CommandLine.Option(names = {"--in-progress"},
defaultValue = "false",
description = "Includes in-progress open files/keys and multipart uploads")
private boolean inProgress;

private DBStore omDbStore;
private Table<String, OmVolumeArgs> volumeTable;
private Table<String, OmBucketInfo> bucketTable;
private Table<String, OmDirectoryInfo> directoryTable;
private Table<String, OmKeyInfo> fileTable;
private Table<String, OmKeyInfo> keyTable;
private Table<String, OmKeyInfo> openFileTable;
private Table<String, OmKeyInfo> openKeyTable;
private Table<String, OmMultipartKeyInfo> multipartInfoTable;
private DBStore dirTreeDbStore;
private Table<Long, String> dirTreeTable;
// Cache volume IDs to avoid repeated lookups
Expand All @@ -98,7 +108,6 @@ public class ContainerToKeyMapping extends AbstractSubcommand implements Callabl

@Override
public Void call() throws Exception {

String dbPath = parent.getDbPath();
// Parse container IDs
Set<Long> containerIDs = Arrays.stream(containers.split(","))
Expand Down Expand Up @@ -126,6 +135,9 @@ public Void call() throws Exception {
directoryTable = OMDBDefinition.DIRECTORY_TABLE_DEF.getTable(omDbStore, CacheType.NO_CACHE);
fileTable = OMDBDefinition.FILE_TABLE_DEF.getTable(omDbStore, CacheType.NO_CACHE);
keyTable = OMDBDefinition.KEY_TABLE_DEF.getTable(omDbStore, CacheType.NO_CACHE);
openFileTable = OMDBDefinition.OPEN_FILE_TABLE_DEF.getTable(omDbStore, CacheType.NO_CACHE);
openKeyTable = OMDBDefinition.OPEN_KEY_TABLE_DEF.getTable(omDbStore, CacheType.NO_CACHE);
multipartInfoTable = OMDBDefinition.MULTIPART_INFO_TABLE_DEF.getTable(omDbStore, CacheType.NO_CACHE);

retrieve(dbPath, writer, containerIDs);
} catch (Exception e) {
Expand Down Expand Up @@ -181,20 +193,29 @@ private void retrieve(String dbPath, PrintWriter writer, Set<Long> containerIds)

// Map to collect keys per container
Map<Long, List<String>> containerToKeysMap = new HashMap<>();
// Map to collect open keys per container
Map<Long, List<String>> containerToOpenKeysMap = inProgress ? new HashMap<>() : null;
// Track unreferenced keys count per container (FSO only)
Map<Long, Long> unreferencedCountMap = new HashMap<>();
for (Long containerId : containerIds) {
containerToKeysMap.put(containerId, new ArrayList<>());
unreferencedCountMap.put(containerId, 0L);
}

// Process open file and open key tables
if (inProgress) {
for (Long containerId : containerIds) {
containerToOpenKeysMap.put(containerId, new ArrayList<>());
}
processOpenFiles(containerIds, containerToOpenKeysMap);
processOpenKeys(containerIds, containerToOpenKeysMap);
processMultipartUpload(containerIds, containerToOpenKeysMap);
}
// Process FSO keys (fileTable)
processFSOKeys(containerIds, containerToKeysMap, unreferencedCountMap, bucketVolMap);

// Process OBS keys (keyTable)
processOBSKeys(containerIds, containerToKeysMap);

jsonOutput(writer, containerToKeysMap, unreferencedCountMap);
jsonOutput(writer, containerToKeysMap, containerToOpenKeysMap, unreferencedCountMap);
}

private void processFSOKeys(Set<Long> containerIds, Map<Long, List<String>> containerToKeysMap,
Expand Down Expand Up @@ -251,6 +272,69 @@ private void processOBSKeys(Set<Long> containerIds, Map<Long, List<String>> cont
}
}

/**
 * Walks every entry of {@code openFileTable} and hands its DB key and key info to
 * {@link #addOpenKeyToContainerMap}, which decides whether the entry belongs to a requested container.
 * Any exception raised during the walk is printed to the error stream and ends the walk.
 *
 * @param containerIds containers being searched for
 * @param containerToOpenKeysMap per-container lists that matching DB keys are appended to
 */
private void processOpenFiles(Set<Long> containerIds, Map<Long, List<String>> containerToOpenKeysMap) {
  try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> openFiles =
      openFileTable.iterator()) {
    while (openFiles.hasNext()) {
      Table.KeyValue<String, OmKeyInfo> openFile = openFiles.next();
      String dbKey = openFile.getKey();
      OmKeyInfo fileInfo = openFile.getValue();
      addOpenKeyToContainerMap(dbKey, fileInfo, containerIds, containerToOpenKeysMap);
    }
  } catch (Exception ex) {
    err().println("Exception occurred reading openFileTable (FSO keys), " + ex);
  }
}

/**
 * Walks every entry of {@code openKeyTable} and hands its DB key and key info to
 * {@link #addOpenKeyToContainerMap}, which decides whether the entry belongs to a requested container.
 * Any exception raised during the walk is printed to the error stream and ends the walk.
 *
 * @param containerIds containers being searched for
 * @param containerToOpenKeysMap per-container lists that matching DB keys are appended to
 */
private void processOpenKeys(Set<Long> containerIds, Map<Long, List<String>> containerToOpenKeysMap) {
  try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> openKeys =
      openKeyTable.iterator()) {
    while (openKeys.hasNext()) {
      Table.KeyValue<String, OmKeyInfo> openKey = openKeys.next();
      String dbKey = openKey.getKey();
      OmKeyInfo openKeyInfo = openKey.getValue();
      addOpenKeyToContainerMap(dbKey, openKeyInfo, containerIds, containerToOpenKeysMap);
    }
  } catch (Exception ex) {
    err().println("Exception occurred reading openKeyTable (OBS keys), " + ex);
  }
}

/**
 * Appends {@code dbKey} to the open-key list of every container returned by
 * {@code getKeyContainers(keyInfo, containerIds)}.
 *
 * @param dbKey table key of the open entry; this is the value recorded in the lists
 * @param keyInfo key info passed to {@code getKeyContainers} to find the matching containers
 * @param containerIds containers being searched for
 * @param containerToOpenKeysMap per-container lists; must already hold a list for each matched container
 */
private void addOpenKeyToContainerMap(String dbKey, OmKeyInfo keyInfo, Set<Long> containerIds,
    Map<Long, List<String>> containerToOpenKeysMap) {
  // An empty match set simply means the loop body never runs.
  for (Long matchedId : getKeyContainers(keyInfo, containerIds)) {
    List<String> openKeys = containerToOpenKeysMap.get(matchedId);
    openKeys.add(dbKey);
  }
}

private void processMultipartUpload(Set<Long> containerIds, Map<Long, List<String>> containerToOpenKeysMap) {
try (TableIterator<String, ? extends Table.KeyValue<String, OmMultipartKeyInfo>> mpuIterator =
multipartInfoTable.iterator()) {

while (mpuIterator.hasNext()) {
Table.KeyValue<String, OmMultipartKeyInfo> entry = mpuIterator.next();
Comment thread
sarvekshayr marked this conversation as resolved.
String dbKey = entry.getKey();
OmMultipartKeyInfo mpuInfo = entry.getValue();

// Collect all target containers that have parts of this MPU
Set<Long> matchedContainers = new HashSet<>();
for (PartKeyInfo partKeyInfo : mpuInfo.getPartKeyInfoMap()) {
OmKeyInfo partKey = OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
matchedContainers.addAll(getKeyContainers(partKey, containerIds));
}

if (!matchedContainers.isEmpty()) {
for (Long containerId : matchedContainers) {
containerToOpenKeysMap.get(containerId).add(dbKey);
}
}
}
} catch (Exception e) {
err().println("Exception occurred reading multipartInfoTable, " + e);
}
}

private Set<Long> getKeyContainers(OmKeyInfo keyInfo, Set<Long> targetContainerIds) {
Set<Long> keyContainers = new HashSet<>();
keyInfo.getKeyLocationVersions().forEach(
Expand Down Expand Up @@ -317,11 +401,11 @@ private String reconstructFullPath(OmKeyInfo keyInfo, Map<Long, Pair<Long, Strin
// Check dir tree
Pair<Long, String> nameParentPair = getFromDirTree(prvParent);
if (nameParentPair == null) {
// If parent is not found, mark the key as unreferenced and increment its count
// If parent is not found (maybe in deletion process), increment unreferenced count
for (Long containerId : keyContainers) {
unreferencedCountMap.put(containerId, unreferencedCountMap.get(containerId) + 1);
}
return "[unreferenced] " + keyInfo.getKeyName();
return null;
}
sb.insert(0, nameParentPair.getValue() + OM_KEY_PREFIX);
prvParent = nameParentPair.getKey();
Expand Down Expand Up @@ -372,7 +456,7 @@ private void addToDirTree(Long objectId, Long parentId, String name) throws IOEx
}

private void jsonOutput(PrintWriter writer, Map<Long, List<String>> containerToKeysMap,
Map<Long, Long> unreferencedCountMap) {
Map<Long, List<String>> containerToOpenKeysMap, Map<Long, Long> unreferencedCountMap) {
try {
ObjectMapper mapper = new ObjectMapper();
ObjectNode root = mapper.createObjectNode();
Expand All @@ -388,13 +472,30 @@ private void jsonOutput(PrintWriter writer, Map<Long, List<String>> containerToK
}

containerNode.set("keys", keysArray);
containerNode.put("totalKeys", entry.getValue().size()); // includes unreferenced keys

// Add open keys array only if --in-progress flag is set
long totalKeys = entry.getValue().size();
if (containerToOpenKeysMap != null) {
ArrayNode openKeysArray = mapper.createArrayNode();
List<String> openKeys = containerToOpenKeysMap.get(containerId);
if (openKeys != null) {
for (String key : openKeys) {
openKeysArray.add(key);
}
totalKeys += openKeys.size();
}
containerNode.set("openKeys", openKeysArray);
}

// Add unreferenced count if > 0
long unreferencedCount = unreferencedCountMap.get(containerId);
if (unreferencedCount > 0) {
containerNode.put("unreferencedKeys", unreferencedCount);
totalKeys += unreferencedCount;
}

// Total keys = committed keys + open keys + unreferenced keys
containerNode.put("totalKeys", totalKeys);

containersNode.set(containerId.toString(), containerNode);
}
Expand All @@ -407,4 +508,3 @@ private void jsonOutput(PrintWriter writer, Map<Long, List<String>> containerToK
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -72,8 +75,14 @@ public class TestContainerToKeyMapping {
private static final long CONTAINER_ID_1 = 1L;
private static final long CONTAINER_ID_2 = 2L;
private static final long CONTAINER_ID_3 = 3L;
private static final long CONTAINER_ID_4 = 4L;
private static final long UNREFERENCED_FILE_ID = 500L;
private static final long MISSING_DIR_ID = 999L; // Non-existent parent
private static final long OPEN_FILE_ID = 600L;
private static final long OPEN_KEY_ID = 610L;
private static final long MPU_KEY_ID = 700L;
private static final long MPU_PART1_ID = 710L;
private static final long MPU_PART2_ID = 720L;

@BeforeEach
public void setup() throws Exception {
Expand Down Expand Up @@ -137,6 +146,51 @@ public void testContainerToKeyMappingWithOnlyFileNames() {
}

/**
 * Runs the command with {@code --in-progress} for containers 1 and 2 and checks that the output
 * mentions both container IDs, the {@code openKeys} field and the table keys of the open entries.
 */
@Test
public void testContainerToKeyMappingWithOpenKeys() {
  String requestedContainers = CONTAINER_ID_1 + "," + CONTAINER_ID_2;
  assertEquals(0, execute("--containers", requestedContainers, "--in-progress"));

  String json = outWriter.toString();

  // Container 1, the openKeys field and the open FSO file's table key must all appear
  assertThat(json)
      .contains("\"" + CONTAINER_ID_1 + "\"")
      .contains("\"openKeys\"")
      .contains("/100/200/300/openFile/1");

  // Container 2, the openKeys field and the open OBS key's table key must all appear
  assertThat(json)
      .contains("\"" + CONTAINER_ID_2 + "\"")
      .contains("\"openKeys\"")
      .contains("/vol1/obs-bucket/openKey");
}

/**
 * Runs the command with {@code --in-progress} for container 4 and checks that the output
 * mentions the container ID, the {@code openKeys} field and the multipart upload's table key.
 */
@Test
public void testContainerToKeyMappingWithMPU() {
  String requestedContainer = String.valueOf(CONTAINER_ID_4);
  assertEquals(0, execute("--containers", requestedContainer, "--in-progress"));

  String json = outWriter.toString();

  // The multipart upload is reported through the openKeys field
  assertThat(json)
      .contains("\"" + CONTAINER_ID_4 + "\"")
      .contains("\"openKeys\"")
      .contains("/vol1/obs-bucket/mpuKey/test-upload-id");
}

/**
 * Same check as the plain MPU test, but with {@code --onlyFileNames} added: the full
 * multipart table key is still expected in the output.
 */
@Test
public void testContainerToKeyMappingWithMPUOnlyFileNames() {
  String requestedContainer = String.valueOf(CONTAINER_ID_4);
  assertEquals(0, execute("--containers", requestedContainer, "--in-progress", "--onlyFileNames"));

  String json = outWriter.toString();

  // Open keys and MPU entries are expected as table keys even with --onlyFileNames
  assertThat(json)
      .contains("\"" + CONTAINER_ID_4 + "\"")
      .contains("\"openKeys\"")
      .contains("/vol1/obs-bucket/mpuKey/test-upload-id");
}

@Test

public void testNonExistentContainer() {
long nonExistentContainerId = 999L;

Expand Down Expand Up @@ -221,6 +275,68 @@ private void createTestData() throws Exception {
String unreferencedFileKey = omMetadataManager.getOzonePathKey(
VOLUME_ID, FSO_BUCKET_ID, MISSING_DIR_ID, "unreferencedFile");
omMetadataManager.getFileTable().put(unreferencedFileKey, unreferencedKey);

// Create open FSO file with a block in container 1
OmKeyInfo openFileInfo = createKeyInfo(
"openFile", OPEN_FILE_ID, DIR_ID, CONTAINER_ID_1);
String openFileKey = omMetadataManager.getOpenFileName(
VOLUME_ID, FSO_BUCKET_ID, DIR_ID, "openFile", 1L);
omMetadataManager.getOpenKeyTable(BucketLayout.FILE_SYSTEM_OPTIMIZED).put(openFileKey, openFileInfo);

// Create open OBS key with a block in container 2
OmKeyInfo openKeyInfo = createOBSKeyInfo(
"openKey", OPEN_KEY_ID, CONTAINER_ID_2);
String openKey = omMetadataManager.getOzoneKey(
VOLUME_NAME, OBS_BUCKET_NAME, "openKey");
omMetadataManager.getOpenKeyTable(BucketLayout.OBJECT_STORE).put(openKey, openKeyInfo);

// Create MPU (multipart upload) for OBS bucket with parts in container 4
createMultipartUpload();
}

/**
 * Inserts one multipart upload ("mpuKey" / "test-upload-id") into the multipart info table.
 * The upload has two parts, numbered 1 and 2, each backed by a key info created for CONTAINER_ID_4.
 */
private void createMultipartUpload() throws Exception {
  final String mpuKeyName = "mpuKey";
  final String uploadId = "test-upload-id";
  final String partNamePrefix = mpuKeyName + "/" + uploadId + "/part-";

  // Object IDs of part 1 and part 2, in part-number order
  final long[] partObjectIds = {MPU_PART1_ID, MPU_PART2_ID};
  final PartKeyInfo[] parts = new PartKeyInfo[partObjectIds.length];
  for (int i = 0; i < parts.length; i++) {
    int partNumber = i + 1;
    String partName = partNamePrefix + partNumber;
    OmKeyInfo partInfo = createOBSKeyInfo(partName, partObjectIds[i], CONTAINER_ID_4);
    KeyInfo partProto = partInfo.getProtobuf(true, 0);
    parts[i] = PartKeyInfo.newBuilder()
        .setPartName(partName)
        .setPartNumber(partNumber)
        .setPartKeyInfo(partProto)
        .build();
  }

  // For OBS the parentID is 0 (not the bucket ID)
  OmMultipartKeyInfo mpuInfo = new OmMultipartKeyInfo.Builder()
      .setUploadID(uploadId)
      .setCreationTime(System.currentTimeMillis())
      .setReplicationConfig(StandaloneReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE))
      .addPartKeyInfoList(1, parts[0])
      .addPartKeyInfoList(2, parts[1])
      .setObjectID(MPU_KEY_ID)
      .setParentID(0)
      .setUpdateID(1)
      .build();

  // Store the upload under its multipart table key
  String mpuDbKey = omMetadataManager.getMultipartKey(
      VOLUME_NAME, OBS_BUCKET_NAME, mpuKeyName, uploadId);
  omMetadataManager.getMultipartInfoTable().put(mpuDbKey, mpuInfo);
}

/**
Expand Down Expand Up @@ -281,4 +397,3 @@ private int execute(String... args) {
return cmd.execute(argList.toArray(new String[0]));
}
}