diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index a90bacc9fee2..a2fcc555f18a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -269,6 +269,11 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_STALENODE_INTERVAL_DEFAULT = "5m"; + public static final String OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL = + "ozone.scm.pending.container.roll.interval"; + public static final String OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT = + "5m"; + public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT = "ozone.scm.heartbeat.rpc-timeout"; public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT = diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 2cb1f52ede15..e8581661c5c0 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1037,6 +1037,17 @@ balances the amount of metadata. + + ozone.scm.pending.container.roll.interval + 5m + OZONE, SCM, PERFORMANCE, MANAGEMENT + + The interval at which the two-window tumbling bucket for pending + container allocations rolls over per DataNode. Pending containers + that have not been confirmed within two intervals are automatically + aged out. Default is 5 minutes. 
+ + ozone.scm.container.lock.stripes 512 diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index 15a566f6421b..3234222f86c7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -238,16 +238,17 @@ private ContainerInfo createContainer(Pipeline pipeline, String owner) private ContainerInfo allocateContainer(final Pipeline pipeline, final String owner) throws IOException { - if (!pipelineManager.hasEnoughSpace(pipeline)) { - LOG.debug("Cannot allocate a new container because pipeline {} does not have enough space.", pipeline); - return null; - } - final long uniqueId = sequenceIdGen.getNextId(CONTAINER_ID); Preconditions.checkState(uniqueId > 0, "Cannot allocate container, negative container id" + " generated. 
%s.", uniqueId); final ContainerID containerID = ContainerID.valueOf(uniqueId); + + if (!pipelineManager.checkSpaceAndRecordAllocation(pipeline, containerID)) { + LOG.debug("Cannot allocate a new container because pipeline {} does not have enough space.", pipeline); + return null; + } + final ContainerInfoProto.Builder containerInfoBuilder = ContainerInfoProto .newBuilder() .setState(LifeCycleState.OPEN) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 0cebcb10ef2c..4053abed19b1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -175,6 +175,9 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode, if (!alreadyInDn) { // This is a new Container not in the nodeManager -> dn map yet getNodeManager().addContainer(datanodeDetails, cid); + // Remove from pending tracker when container is added to DN + // This container was just confirmed for the first time on this DN + getNodeManager().removePendingAllocationForDatanode(datanodeDetails.getID(), cid); } if (container == null || ContainerReportValidator .validate(container, datanodeDetails, replica)) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java index 247e3667d9ef..7beb7469c4e4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java @@ -103,6 +103,7 @@ protected void 
processICR(IncrementalContainerReportFromDatanode report, } if (ContainerReportValidator.validate(container, dd, replicaProto)) { processContainerReplica(dd, container, replicaProto, publisher, detailsForLogging); + getNodeManager().removePendingAllocationForDatanode(dd.getID(), id); } success = true; } catch (ContainerNotFoundException e) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index 4fb7f84394f3..153ac1bb36ea 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -197,6 +197,25 @@ default int getAllNodeCount() { */ void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID); + /** + * Atomically checks if the datanode has space for a new container and records the allocation + * if space is available. This prevents race conditions where multiple threads check space + * concurrently and over-allocate. + * + * @param datanodeID the ID of the DataNode receiving the allocation + * @param containerID the container being allocated + * @return true if space was available and allocation was recorded, false otherwise + */ + boolean checkSpaceAndRecordAllocation(DatanodeID datanodeID, ContainerID containerID); + + /** + * Removes a pending container allocation from a datanode. + * + * @param datanodeID the ID of the DataNode + * @param containerID the container to remove from pending + */ + void removePendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID); + /** * Return the node stat of the specified datanode. * @param datanodeDetails DatanodeDetails. 
@@ -435,4 +454,9 @@ default void removeNode(DatanodeDetails datanodeDetails) throws NodeNotFoundExce } int openContainerLimit(List datanodes); + + /** + * SCM-side tracker for container allocations not yet reported by datanodes. + */ + PendingContainerTracker getPendingContainerTracker(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index 9e4b96999df0..57ee8ef9adb6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; @@ -124,7 +125,7 @@ public class NodeStateManager implements Runnable, Closeable { */ private final long deadNodeIntervalMs; - private final long containerRollIntervalMs = 5 * 60 * 1000; //TODO + private final long containerRollIntervalMs; /** * The future is used to pause/unpause the scheduled checks. 
@@ -214,6 +215,11 @@ public NodeStateManager(ConfigurationSource conf, scmContext.getFinalizationCheckpoint()) && !layoutMatchCondition.test(layout); + containerRollIntervalMs = conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL, + ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + scheduleNextHealthCheck(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java index fc7bbc238192..11d908129480 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.node; +import com.google.common.annotations.VisibleForTesting; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -189,6 +190,47 @@ public boolean hasEffectiveAllocatableSpaceForNewContainer(DatanodeInfo datanode return false; } + /** + * Atomically checks if the datanode has space for a new container and records the allocation + * if space is available. This prevents race conditions where multiple threads check space + * concurrently and over-allocate. 
+ * + * @param datanodeInfo storage reports for the datanode + * @param containerID The container being allocated + * @return true if space was available and allocation was recorded, false otherwise + */ + public boolean checkSpaceAndRecordAllocation(DatanodeInfo datanodeInfo, ContainerID containerID) { + Objects.requireNonNull(datanodeInfo, "datanodeInfo == null"); + Objects.requireNonNull(containerID, "containerID == null"); + + TwoWindowBucket bucket = datanodeInfo.getPendingContainerAllocations(); + synchronized (bucket) { + long pendingAllocationSize = bucket.getCount() * maxContainerSize; + List storageReports = datanodeInfo.getStorageReports(); + Objects.requireNonNull(storageReports, "storageReports == null"); + if (storageReports.isEmpty()) { + return false; + } + long effectiveAllocatableSpace = 0L; + for (StorageReportProto report : storageReports) { + long usableSpace = VolumeUsage.getUsableSpace(report); + long containersOnThisDisk = usableSpace / maxContainerSize; + effectiveAllocatableSpace += containersOnThisDisk * maxContainerSize; + if (effectiveAllocatableSpace - pendingAllocationSize >= maxContainerSize) { + boolean added = bucket.add(containerID); + if (added && metrics != null) { + metrics.incNumPendingContainersAdded(); + } + return true; + } + } + if (metrics != null) { + metrics.incNumSkippedFullNodeContainerAllocation(); + } + return false; + } + } + /** * Record a pending container allocation for a single DataNode. * Container is added to the current window. 
@@ -222,4 +264,9 @@ public void removePendingAllocation(TwoWindowBucket bucket, ContainerID containe metrics.incNumPendingContainersRemoved(); } } + + @VisibleForTesting + public SCMNodeMetrics getMetrics() { + return metrics; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index ad392a247d53..386ff0e04d50 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -192,7 +192,10 @@ public SCMNodeManager( this.pendingContainerTracker = new PendingContainerTracker( (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES), - 5 * 60 * 1000, // TODO + conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL, + ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS), this.metrics); this.clusterMap = networkTopology; this.nodeResolver = nodeResolver; @@ -231,6 +234,11 @@ private void unregisterMXBean() { } } + @Override + public PendingContainerTracker getPendingContainerTracker() { + return pendingContainerTracker; + } + protected NodeStateManager getNodeStateManager() { return nodeStateManager; } @@ -1095,6 +1103,27 @@ public void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerI pendingContainerTracker.recordPendingAllocationForDatanode(datanodeInfo, containerID); } + @Override + public boolean checkSpaceAndRecordAllocation(DatanodeID datanodeID, ContainerID containerID) { + DatanodeInfo datanodeInfo = getNode(datanodeID); + if (datanodeInfo == null) { + LOG.warn("DatanodeInfo not found for node {}", datanodeID); + return false; + } + return pendingContainerTracker.checkSpaceAndRecordAllocation(datanodeInfo, containerID); + } + + 
@Override + public void removePendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) { + DatanodeInfo datanodeInfo = getNode(datanodeID); + if (datanodeInfo == null) { + LOG.warn("DatanodeInfo not found for node {}", datanodeID); + return; + } + pendingContainerTracker.removePendingAllocation( + datanodeInfo.getPendingContainerAllocations(), containerID); + } + /** * Return the node stat of the specified datanode. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java index 0014936a80db..0b90247ae8ab 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java @@ -136,6 +136,14 @@ void incNumPendingContainersRemoved() { numPendingContainersRemoved.incr(); } + public long getNumPendingContainersAdded() { + return numPendingContainersAdded.value(); + } + + public long getNumPendingContainersRemoved() { + return numPendingContainersRemoved.value(); + } + void incNumSkippedFullNodeContainerAllocation() { numSkippedFullNodeContainerAllocation.incr(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 739b0c058ec8..326bb6b882ed 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -232,6 +232,17 @@ void reinitialize(Table pipelineStore) */ boolean hasEnoughSpace(Pipeline pipeline); + /** + * Atomically checks if all datanodes in the pipeline have space for a new container + * and records the allocation if space is available. 
This prevents race conditions + * where multiple threads check space concurrently and over-allocate. + * + * @param pipeline the pipeline whose nodes will be checked and recorded + * @param containerID the container being allocated + * @return true if all nodes had space and allocation was recorded, false otherwise + */ + boolean checkSpaceAndRecordAllocation(Pipeline pipeline, ContainerID containerID); + int openContainerLimit(List datanodes); /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index c4375e3f20ea..150e18ba97e7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -23,6 +23,7 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -650,6 +651,21 @@ public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) } } + @Override + public boolean checkSpaceAndRecordAllocation(Pipeline pipeline, ContainerID containerID) { + List successfulNodes = new ArrayList<>(); + for (DatanodeDetails dn : pipeline.getNodes()) { + if (!nodeManager.checkSpaceAndRecordAllocation(dn.getID(), containerID)) { + for (DatanodeDetails rollbackNode : successfulNodes) { + nodeManager.removePendingAllocationForDatanode(rollbackNode.getID(), containerID); + } + return false; + } + successfulNodes.add(dn); + } + return true; + } + /** * Schedules a fixed interval job to create pipelines. 
*/ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 156eed688d81..38cacb1c6086 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -255,6 +255,13 @@ public ContainerWithPipeline allocateContainer(ReplicationConfig replicationConf getScm().checkAdminAccess(getRemoteUser(), false); final ContainerInfo container = scm.getContainerManager() .allocateContainer(replicationConfig, owner); + if (container == null) { + throw new SCMException( + "Could not allocate container for replication " + replicationConfig + + ", owner=" + owner + + ": no suitable open pipeline with enough space", + ResultCodes.FAILED_TO_ALLOCATE_CONTAINER); + } final Pipeline pipeline = scm.getPipelineManager() .getPipeline(container.getPipelineID()); ContainerWithPipeline cp = new ContainerWithPipeline(container, pipeline); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 57d38ece3dd6..65ebcc7b74bd 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -462,6 +462,30 @@ public void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerI pendingContainerTracker.recordPendingAllocationForDatanode(info, containerID); } + @Override + public boolean checkSpaceAndRecordAllocation(DatanodeID datanodeID, ContainerID containerID) { + DatanodeDetails dd = nodeMetricMap.keySet().stream() + .filter(d -> d.getID().equals(datanodeID)) + 
.findFirst().orElse(null); + DatanodeInfo info = getDatanodeInfo(dd); + if (info == null) { + return false; + } + return pendingContainerTracker.checkSpaceAndRecordAllocation(info, containerID); + } + + @Override + public void removePendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) { + DatanodeDetails dd = nodeMetricMap.keySet().stream() + .filter(d -> d.getID().equals(datanodeID)) + .findFirst().orElse(null); + DatanodeInfo info = getDatanodeInfo(dd); + if (info != null) { + pendingContainerTracker.removePendingAllocation( + info.getPendingContainerAllocations(), containerID); + } + } + /** * Return the node stat of the specified datanode. * @param datanodeDetails - datanode details. @@ -943,6 +967,11 @@ public int openContainerLimit(List datanodes) { return 9; } + @Override + public PendingContainerTracker getPendingContainerTracker() { + return pendingContainerTracker; + } + @Override public long getLastHeartbeat(DatanodeDetails datanodeDetails) { return -1; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java index f2da8fd2878b..9ba73327847b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; @@ -63,6 +64,7 @@ public class SimpleMockNodeManager implements NodeManager { private Map nodeMap = new 
ConcurrentHashMap<>(); private Map> pipelineMap = new ConcurrentHashMap<>(); private Map> containerMap = new ConcurrentHashMap<>(); + private PendingContainerTracker pendingContainerTracker; public void register(DatanodeDetails dd, NodeStatus status) { dd.setPersistedOpState(status.getOperationalState()); @@ -253,6 +255,15 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) { public void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) { } + @Override + public boolean checkSpaceAndRecordAllocation(DatanodeID datanodeID, ContainerID containerID) { + return true; + } + + @Override + public void removePendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) { + } + @Override public boolean hasSpaceForNewContainerAllocation(DatanodeID datanodeID) { return true; @@ -445,4 +456,12 @@ public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) { return false; } + @Override + public PendingContainerTracker getPendingContainerTracker() { + int rollIntervalMs = 5 * 60 * 1000; + if (pendingContainerTracker == null) { + pendingContainerTracker = new PendingContainerTracker(5L * 1024 * 1024 * 1024, rollIntervalMs, null); + } + return pendingContainerTracker; + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java index 68dcc634a5e3..c64db89290aa 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java @@ -102,7 +102,9 @@ void setUp() throws Exception { pipelineManager = spy(base); // Default: allow allocation in tests unless a test overrides it. 
- doReturn(true).when(pipelineManager).hasEnoughSpace(any(Pipeline.class)); + // Allocation uses checkSpaceAndRecordAllocation + doReturn(true).when(pipelineManager) + .checkSpaceAndRecordAllocation(any(Pipeline.class), any(ContainerID.class)); pipelineManager.createPipeline(RatisReplicationConfig.getInstance( ReplicationFactor.THREE)); @@ -140,11 +142,12 @@ void testAllocateContainer() throws Exception { */ @Test public void testGetMatchingContainerReturnsNullWhenNotEnoughSpaceInDatanodes() throws IOException { - doReturn(false).when(pipelineManager).hasEnoughSpace(any()); + doReturn(false).when(pipelineManager) + .checkSpaceAndRecordAllocation(any(Pipeline.class), any(ContainerID.class)); long sizeRequired = 256 * 1024 * 1024; // 256 MB Pipeline pipeline = pipelineManager.getPipelines().iterator().next(); - // MockPipelineManager#hasEnoughSpace always returns false + // checkSpaceAndRecordAllocation is stubbed above to always return false // the pipeline has no existing containers, so a new container gets allocated in getMatchingContainer ContainerInfo container = containerManager .getMatchingContainer(sizeRequired, "test", pipeline, Collections.emptySet()); @@ -162,10 +165,10 @@ public void testGetMatchingContainerReturnsNullWhenNotEnoughSpaceInDatanodes() t public void testGetMatchingContainerReturnsContainerWhenEnoughSpaceInDatanodes() throws IOException { long sizeRequired = 256 * 1024 * 1024; // 256 MB - // create a spy to mock hasEnoughSpace to always return true + // create a spy to mock checkSpaceAndRecordAllocation to always return true PipelineManager spyPipelineManager = spy(pipelineManager); doReturn(true).when(spyPipelineManager) - .hasEnoughSpace(any(Pipeline.class)); + .checkSpaceAndRecordAllocation(any(Pipeline.class), any(ContainerID.class)); // create a new ContainerManager using the spy File tempDir = new File(testDir, "tempDir"); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index 4dbe79fc1351..681344531fa0 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl; @@ -69,6 +70,7 @@ import org.apache.hadoop.hdds.scm.net.NodeSchemaManager; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; import org.apache.hadoop.hdds.server.events.EventQueue; @@ -159,7 +161,8 @@ SCMNodeManager createNodeManager(OzoneConfiguration config) { ContainerManager createContainerManager() throws IOException { pipelineManager = spy(pipelineManager); - doReturn(true).when(pipelineManager).hasEnoughSpace(any()); + doReturn(true).when(pipelineManager) + .checkSpaceAndRecordAllocation(any(Pipeline.class), any(ContainerID.class)); return new ContainerManagerImpl(conf, scmhaManager, sequenceIdGen, pipelineManager, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java index 69b1e24dfe3b..db594e560c57 100644 --- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java @@ -335,7 +335,12 @@ public boolean isPipelineCreationFrozen() { @Override public boolean hasEnoughSpace(Pipeline pipeline) { - return false; + for (DatanodeDetails node : pipeline.getNodes()) { + if (!nodeManager.hasSpaceForNewContainerAllocation(node.getID())) { + return false; + } + } + return true; } @Override @@ -345,6 +350,21 @@ public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) } } + @Override + public boolean checkSpaceAndRecordAllocation(Pipeline pipeline, ContainerID containerID) { + List<DatanodeDetails> nodes = pipeline.getNodes(); + for (int i = 0; i < nodes.size(); i++) { + if (!nodeManager.checkSpaceAndRecordAllocation(nodes.get(i).getID(), containerID)) { + // Roll back the nodes recorded so far, mirroring PipelineManagerImpl#checkSpaceAndRecordAllocation. + for (int j = 0; j < i; j++) { + nodeManager.removePendingAllocationForDatanode(nodes.get(j).getID(), containerID); + } + return false; + } + } + return true; + } + @Override public int openContainerLimit(List datanodes) { // For tests that do not care about this limit, return a large value. diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTrackerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTrackerIntegration.java new file mode 100644 index 000000000000..2b84ce901c08 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTrackerIntegration.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.function.BooleanSupplier; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.scm.node.DatanodeInfo; +import org.apache.hadoop.hdds.scm.node.PendingContainerTracker; +import org.apache.hadoop.hdds.scm.node.SCMNodeManager; +import org.apache.hadoop.hdds.scm.node.SCMNodeMetrics; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.TestDataUtil; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration tests for PendingContainerTracker. 
+ */ +@Timeout(300) +public class TestPendingContainerTrackerIntegration { + + private static final Logger LOG = + LoggerFactory.getLogger(TestPendingContainerTrackerIntegration.class); + private MiniOzoneCluster cluster; + private StorageContainerManager scm; + private OzoneClient client; + private ContainerManager containerManager; + private PendingContainerTracker pendingTracker; + private SCMNodeMetrics metrics; + private OzoneBucket bucket; + private SCMNodeManager nodeManager; + + @BeforeEach + public void setup() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + + conf.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "60s"); + + // Reduce heartbeat interval for faster container reports + conf.set(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, "10s"); + + conf.set("ozone.scm.container.size", "100MB"); + conf.set("ozone.scm.pipeline.owner.container.count", "1"); + conf.set("ozone.scm.pipeline.per.metadata.disk", "1"); + conf.set("ozone.scm.datanode.pipeline.limit", "1"); + + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(3) + .build(); + cluster.waitForClusterToBeReady(); + cluster.waitTobeOutOfSafeMode(); + + scm = cluster.getStorageContainerManager(); + containerManager = scm.getContainerManager(); + client = cluster.newClient(); + + // Create bucket for testing + bucket = TestDataUtil.createVolumeAndBucket(client); + + nodeManager = (SCMNodeManager) scm.getScmNodeManager(); + assertNotNull(nodeManager); + pendingTracker = nodeManager.getPendingContainerTracker(); + assertNotNull(pendingTracker, "PendingContainerTracker should be initialized"); + metrics = pendingTracker.getMetrics(); + + LOG.info("Test setup complete - container report interval: 60s, heartbeat interval: 10s"); + } + + @AfterEach + public void cleanup() throws Exception { + if (client != null) { + client.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Test: Write key → Container allocation → Pending tracked → ICR → Pending removed. 
+ */ + @Test + public void testKeyWriteRecordsPendingAndICRRemovesIt() throws Exception { + long initialAdded = metrics.getNumPendingContainersAdded(); + long initialRemoved = metrics.getNumPendingContainersRemoved(); + + // Allocate a container directly + containerManager.allocateContainer( + RatisReplicationConfig.getInstance(ReplicationFactor.THREE), + "omServiceIdDefault"); + + // Verify the added metric increased, meaning pending was recorded + GenericTestUtils.waitFor( + (BooleanSupplier) () -> metrics.getNumPendingContainersAdded() > initialAdded, + 100, 5000); + + long afterAdded = metrics.getNumPendingContainersAdded(); + assertThat(afterAdded).isGreaterThan(initialAdded); + + LOG.info("Pending tracked successfully. Waiting for ICR to remove pending..."); + + // Write a key so datanodes send ICRs + String keyName = "testKey1"; + byte[] data = "Testing Pending Container Tracker".getBytes(UTF_8); + + LOG.info("Writing key: {}", keyName); + try (OzoneOutputStream out = bucket.createKey(keyName, data.length, + RatisReplicationConfig.getInstance(ReplicationFactor.THREE), + new java.util.HashMap<>())) { + out.write(data); + } + LOG.info("Key written successfully"); + + // Wait for ICRs to be processed and removed metric to increase + GenericTestUtils.waitFor( + (BooleanSupplier) () -> metrics.getNumPendingContainersRemoved() > initialRemoved, + 100, 5000); + + long afterRemoved = metrics.getNumPendingContainersRemoved(); + assertThat(afterRemoved).isGreaterThan(initialRemoved); + + LOG.info("After added={}, removed={}", afterAdded, afterRemoved); + } + + /** + * Test: Verify idempotency - adding the same container twice does not double-count in metrics. 
+ */ + @Test + public void testIdempotentPendingTracking() throws Exception { + // Allocate a container directly + ContainerInfo container = containerManager.allocateContainer( + RatisReplicationConfig.getInstance(ReplicationFactor.THREE), + "omServiceIdDefault"); + + Pipeline pipeline = scm.getPipelineManager().getPipeline(container.getPipelineID()); + DatanodeDetails firstNode = pipeline.getFirstNode(); + DatanodeInfo firstNodeInfo = nodeManager.getDatanodeInfo(firstNode); + + // Capture the added metric after initial allocation + long addedAfterAllocation = metrics.getNumPendingContainersAdded(); + + LOG.info("Pending added metric after allocation: {}", addedAfterAllocation); + + // Try adding the same container again (simulates retry or duplicate allocation) + pendingTracker.recordPendingAllocationForDatanode(firstNodeInfo, container.containerID()); + + // The metric should not have increased since it was a duplicate + assertEquals(addedAfterAllocation, metrics.getNumPendingContainersAdded(), + "Pending added metric should not change when adding duplicate container"); + } + + /** + * Test: Verify metrics are updated correctly. 
+ */ + @Test + public void testMetricsUpdateThroughLifecycle() throws Exception { + long initialAdded = metrics.getNumPendingContainersAdded(); + long initialRemoved = metrics.getNumPendingContainersRemoved(); + + LOG.info("Initial metrics: added={}, removed={}", initialAdded, initialRemoved); + + // Write multiple keys + for (int i = 0; i < 3; i++) { + String keyName = "metricsTestKey" + i; + byte[] data = ("Metrics test " + i).getBytes(UTF_8); + + try (OzoneOutputStream out = bucket.createKey(keyName, data.length, + RatisReplicationConfig.getInstance(ReplicationFactor.THREE), + new java.util.HashMap<>())) { + out.write(data); + } + } + + // addedMetrics should increase as containers are allocated + GenericTestUtils.waitFor( + (BooleanSupplier) () -> metrics.getNumPendingContainersAdded() > initialAdded, + 100, 5000); + + // Removed metric should increase after ICR processing + GenericTestUtils.waitFor( + (BooleanSupplier) () -> metrics.getNumPendingContainersRemoved() > initialRemoved, + 100, 5000); + } +}