diff --git a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java index a8863046f6ee..3ff0a636d814 100644 --- a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java +++ b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java @@ -23,10 +23,13 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; import javax.ws.rs.core.Response; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; +import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.client.BucketArgs; import org.apache.hadoop.ozone.client.ObjectStore; @@ -36,11 +39,15 @@ import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.recon.api.ContainerEndpoint; import org.apache.hadoop.ozone.recon.api.types.KeyMetadata; import org.apache.hadoop.ozone.recon.api.types.KeysResponse; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; +import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl; +import org.apache.hadoop.ozone.recon.tasks.ContainerKeyMapperHelper; import org.apache.hadoop.ozone.recon.tasks.ReconTaskControllerImpl; import org.apache.ozone.test.GenericTestUtils; import org.junit.jupiter.api.AfterEach; @@ -60,6 +67,9 @@ public class TestReconContainerEndpoint { @BeforeEach public void init() throws Exception { + // ContainerKeyMapper tasks share static maps/flags across the JVM; reset so a + // prior test method cannot break mapper state for this cluster instance. + ContainerKeyMapperHelper.clearSharedContainerCountMap(); OzoneConfiguration conf = new OzoneConfiguration(); conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT, OMConfigKeys.OZONE_BUCKET_LAYOUT_FILE_SYSTEM_OPTIMIZED); @@ -76,13 +86,9 @@ public void init() throws Exception { } @AfterEach - public void shutdown() throws IOException { - if (client != null) { - client.close(); - } - if (cluster != null) { - cluster.shutdown(); - } + public void shutdown() { + IOUtils.closeQuietly(client, cluster); + ContainerKeyMapperHelper.clearSharedContainerCountMap(); } @Test @@ -117,6 +123,9 @@ public void testContainerEndpointForFSOLayout() throws Exception { CompletableFuture completableFuture = omMetaManagerUtils.waitForEventBufferEmpty(reconTaskController.getEventBuffer()); GenericTestUtils.waitFor(completableFuture::isDone, 100, 30000); + completableFuture.join(); + waitUntilReconIndexesKeysForPaths(volName, bucketName, + nestedDirKey, singleFileKey); //Search for the bucket from the bucket table and verify its FSO OmBucketInfo bucketInfo = cluster.getOzoneManager().getBucketInfo(volName, bucketName); @@ -124,8 +133,7 @@ public void testContainerEndpointForFSOLayout() throws Exception { assertEquals(BucketLayout.FILE_SYSTEM_OPTIMIZED, bucketInfo.getBucketLayout()); - // Assuming a known container ID that these keys have been written into - long testContainerID = 1L; + long testContainerID = getContainerIdForKey(volName, bucketName, nestedDirKey); // Query the ContainerEndpoint for the keys in the specified container Response response = getContainerEndpointResponse(testContainerID); @@ -145,7 +153,7 @@ public void testContainerEndpointForFSOLayout() throws Exception { assertEquals("file1", keyMetadata.getKey()); assertEquals("testvol/fsobucket/dir1/dir2/dir3/file1", keyMetadata.getCompletePath()); - testContainerID = 2L; + testContainerID = getContainerIdForKey(volName, bucketName, singleFileKey); response = getContainerEndpointResponse(testContainerID); data = (KeysResponse) response.getEntity(); keyMetadataList = data.getKeys(); @@ -186,14 +194,17 @@ public void testContainerEndpointForOBSBucket() throws Exception { CompletableFuture completableFuture = omMetaManagerUtils.waitForEventBufferEmpty(reconTaskController.getEventBuffer()); GenericTestUtils.waitFor(completableFuture::isDone, 100, 30000); + completableFuture.join(); + waitUntilReconIndexesKeysForPaths(volumeName, obsBucketName, obsSingleFileKey); // Search for the bucket from the bucket table and verify its OBS OmBucketInfo bucketInfo = cluster.getOzoneManager().getBucketInfo(volumeName, obsBucketName); assertNotNull(bucketInfo); assertEquals(BucketLayout.OBJECT_STORE, bucketInfo.getBucketLayout()); - // Initialize the ContainerEndpoint - long containerId = 1L; + long containerId = getContainerIdForKey(volumeName, obsBucketName, + obsSingleFileKey); + Response response = getContainerEndpointResponse(containerId); assertNotNull(response, "Response should not be null."); @@ -226,6 +237,40 @@ private Response getContainerEndpointResponse(long containerId) { return containerEndpoint.getKeysForContainer(containerId, 10, ""); } + /** + * Wait until Recon's container-key index reflects all written keys (by container id). + * The OM event queue can be empty while a batch is still being processed. + */ + private void waitUntilReconIndexesKeysForPaths(String volumeName, + String bucketName, String... keyPaths) + throws Exception { + Map requiredCountByContainer = new HashMap<>(); + for (String keyPath : keyPaths) { + long containerId = + getContainerIdForKey(volumeName, bucketName, keyPath); + requiredCountByContainer.merge(containerId, 1, Integer::sum); + } + ReconContainerMetadataManager mgr = + recon.getReconServer().getReconContainerMetadataManager(); + TestReconOmMetaManagerUtils.waitUntilReconKeyCounts(mgr, requiredCountByContainer); + } + + private long getContainerIdForKey(String volumeName, String bucketName, + String keyName) throws IOException { + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .build(); + OmKeyLocationInfo location = cluster.getOzoneManager() + .lookupKey(keyArgs) + .getKeyLocationVersions() + .get(0) + .getBlocksLatestVersionOnly() + .get(0); + return location.getContainerID(); + } + private void writeTestData(String volumeName, String bucketName, String keyPath, String data) throws Exception { try (OzoneOutputStream out = client.getObjectStore().getVolume(volumeName) diff --git a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconOmMetaManagerUtils.java b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconOmMetaManagerUtils.java index 4ef84f2e6d9b..8aa32ac40baa 100644 --- a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconOmMetaManagerUtils.java +++ b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconOmMetaManagerUtils.java @@ -17,7 +17,10 @@ package org.apache.hadoop.ozone.recon; +import java.io.IOException; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager; import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBuffer; import org.apache.ozone.test.GenericTestUtils; @@ -43,4 +46,34 @@ public CompletableFuture waitForEventBufferEmpty(OMUpdateEventBuffer event } }); } + + /** + * Waits until Recon's container-key index reports at least the given number of keys + * per container id. Use after OM sync when the event buffer can be empty while a + * dequeued batch is still being processed. + *

+ * IO failures from {@code mgr} reads (including temporary {@code RocksDatabaseException} + * while Recon applies updates) are treated as "not ready yet"; the wait repeats until the + * timeout if counts never converge. + * + * @param mgr Recon container metadata manager + * @param minimumCountPerContainer map of container ID to minimum inclusive key count + * @throws Exception if the condition is not met within the timeout or on interrupt + */ + public static void waitUntilReconKeyCounts(ReconContainerMetadataManager mgr, + Map minimumCountPerContainer) throws Exception { + GenericTestUtils.waitFor(() -> { + try { + for (Map.Entry e : minimumCountPerContainer.entrySet()) { + if (mgr.getKeyCountForContainer(e.getKey()) < e.getValue()) { + return false; + } + } + return true; + } catch (IOException ex) { + // Retry: concurrent Recon indexing can transiently expose a closed Rocks handle. + return false; + } + }, 1000, 90000); + } }