diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index a90bacc9fee2..a2fcc555f18a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -269,6 +269,11 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_STALENODE_INTERVAL_DEFAULT =
"5m";
+ public static final String OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL =
+ "ozone.scm.pending.container.roll.interval";
+ public static final String OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT =
+ "5m";
+
public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT =
"ozone.scm.heartbeat.rpc-timeout";
public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2cb1f52ede15..e8581661c5c0 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1037,6 +1037,17 @@
balances the amount of metadata.
+
+ ozone.scm.pending.container.roll.interval
+ 5m
+ OZONE, SCM, PERFORMANCE, MANAGEMENT
+
+ The interval at which the two-window tumbling bucket for pending
+ container allocations rolls over per DataNode. Pending containers
+ that have not been confirmed within two intervals are automatically
+ aged out. Default is 5 minutes.
+
+
ozone.scm.container.lock.stripes
512
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index 15a566f6421b..3234222f86c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -238,16 +238,17 @@ private ContainerInfo createContainer(Pipeline pipeline, String owner)
private ContainerInfo allocateContainer(final Pipeline pipeline,
final String owner)
throws IOException {
- if (!pipelineManager.hasEnoughSpace(pipeline)) {
- LOG.debug("Cannot allocate a new container because pipeline {} does not have enough space.", pipeline);
- return null;
- }
-
final long uniqueId = sequenceIdGen.getNextId(CONTAINER_ID);
Preconditions.checkState(uniqueId > 0,
"Cannot allocate container, negative container id" +
" generated. %s.", uniqueId);
final ContainerID containerID = ContainerID.valueOf(uniqueId);
+
+ if (!pipelineManager.checkSpaceAndRecordAllocation(pipeline, containerID)) {
+ LOG.debug("Cannot allocate a new container because pipeline {} does not have enough space.", pipeline);
+ return null;
+ }
+
final ContainerInfoProto.Builder containerInfoBuilder = ContainerInfoProto
.newBuilder()
.setState(LifeCycleState.OPEN)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 0cebcb10ef2c..4053abed19b1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -175,6 +175,9 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
if (!alreadyInDn) {
// This is a new Container not in the nodeManager -> dn map yet
getNodeManager().addContainer(datanodeDetails, cid);
+ // Remove from pending tracker when container is added to DN
+ // This container was just confirmed for the first time on this DN
+ getNodeManager().removePendingAllocationForDatanode(datanodeDetails.getID(), cid);
}
if (container == null || ContainerReportValidator
.validate(container, datanodeDetails, replica)) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
index 247e3667d9ef..7beb7469c4e4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java
@@ -103,6 +103,7 @@ protected void processICR(IncrementalContainerReportFromDatanode report,
}
if (ContainerReportValidator.validate(container, dd, replicaProto)) {
processContainerReplica(dd, container, replicaProto, publisher, detailsForLogging);
+ getNodeManager().removePendingAllocationForDatanode(dd.getID(), id);
}
success = true;
} catch (ContainerNotFoundException e) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 4fb7f84394f3..153ac1bb36ea 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -197,6 +197,25 @@ default int getAllNodeCount() {
*/
void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID);
+ /**
+ * Atomically checks if the datanode has space for a new container and records the allocation
+ * if space is available. This prevents race conditions where multiple threads check space
+ * concurrently and over-allocate.
+ *
+ * @param datanodeID the ID of the DataNode receiving the allocation
+ * @param containerID the container being allocated
+ * @return true if space was available and allocation was recorded, false otherwise
+ */
+ boolean checkSpaceAndRecordAllocation(DatanodeID datanodeID, ContainerID containerID);
+
+ /**
+ * Removes a pending container allocation from a datanode.
+ *
+ * @param datanodeID the ID of the DataNode
+ * @param containerID the container to remove from pending
+ */
+ void removePendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID);
+
/**
* Return the node stat of the specified datanode.
* @param datanodeDetails DatanodeDetails.
@@ -435,4 +454,9 @@ default void removeNode(DatanodeDetails datanodeDetails) throws NodeNotFoundExce
}
int openContainerLimit(List datanodes);
+
+ /**
+ * SCM-side tracker for container allocations not yet reported by datanodes.
+ */
+ PendingContainerTracker getPendingContainerTracker();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index 9e4b96999df0..57ee8ef9adb6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -124,7 +125,7 @@ public class NodeStateManager implements Runnable, Closeable {
*/
private final long deadNodeIntervalMs;
- private final long containerRollIntervalMs = 5 * 60 * 1000; //TODO
+ private final long containerRollIntervalMs;
/**
* The future is used to pause/unpause the scheduled checks.
@@ -214,6 +215,11 @@ public NodeStateManager(ConfigurationSource conf,
scmContext.getFinalizationCheckpoint()) &&
!layoutMatchCondition.test(layout);
+ containerRollIntervalMs = conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL,
+ ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
scheduleNextHealthCheck();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java
index fc7bbc238192..11d908129480 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm.node;
+import com.google.common.annotations.VisibleForTesting;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@@ -189,6 +190,47 @@ public boolean hasEffectiveAllocatableSpaceForNewContainer(DatanodeInfo datanode
return false;
}
+ /**
+ * Atomically checks if the datanode has space for a new container and records the allocation
+ * if space is available. This prevents race conditions where multiple threads check space
+ * concurrently and over-allocate.
+ *
+   * @param datanodeInfo the datanode whose storage reports and pending allocations are consulted
+ * @param containerID The container being allocated
+ * @return true if space was available and allocation was recorded, false otherwise
+ */
+ public boolean checkSpaceAndRecordAllocation(DatanodeInfo datanodeInfo, ContainerID containerID) {
+ Objects.requireNonNull(datanodeInfo, "datanodeInfo == null");
+ Objects.requireNonNull(containerID, "containerID == null");
+
+ TwoWindowBucket bucket = datanodeInfo.getPendingContainerAllocations();
+ synchronized (bucket) {
+ long pendingAllocationSize = bucket.getCount() * maxContainerSize;
+ List storageReports = datanodeInfo.getStorageReports();
+ Objects.requireNonNull(storageReports, "storageReports == null");
+ if (storageReports.isEmpty()) {
+ return false;
+ }
+ long effectiveAllocatableSpace = 0L;
+ for (StorageReportProto report : storageReports) {
+ long usableSpace = VolumeUsage.getUsableSpace(report);
+ long containersOnThisDisk = usableSpace / maxContainerSize;
+ effectiveAllocatableSpace += containersOnThisDisk * maxContainerSize;
+ if (effectiveAllocatableSpace - pendingAllocationSize >= maxContainerSize) {
+ boolean added = bucket.add(containerID);
+ if (added && metrics != null) {
+ metrics.incNumPendingContainersAdded();
+ }
+ return true;
+ }
+ }
+ if (metrics != null) {
+ metrics.incNumSkippedFullNodeContainerAllocation();
+ }
+ return false;
+ }
+ }
+
/**
* Record a pending container allocation for a single DataNode.
* Container is added to the current window.
@@ -222,4 +264,9 @@ public void removePendingAllocation(TwoWindowBucket bucket, ContainerID containe
metrics.incNumPendingContainersRemoved();
}
}
+
+ @VisibleForTesting
+ public SCMNodeMetrics getMetrics() {
+ return metrics;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index ad392a247d53..386ff0e04d50 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -192,7 +192,10 @@ public SCMNodeManager(
this.pendingContainerTracker = new PendingContainerTracker(
(long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES),
- 5 * 60 * 1000, // TODO
+ conf.getTimeDuration(
+ ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL,
+ ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS),
this.metrics);
this.clusterMap = networkTopology;
this.nodeResolver = nodeResolver;
@@ -231,6 +234,11 @@ private void unregisterMXBean() {
}
}
+ @Override
+ public PendingContainerTracker getPendingContainerTracker() {
+ return pendingContainerTracker;
+ }
+
protected NodeStateManager getNodeStateManager() {
return nodeStateManager;
}
@@ -1095,6 +1103,27 @@ public void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerI
pendingContainerTracker.recordPendingAllocationForDatanode(datanodeInfo, containerID);
}
+ @Override
+ public boolean checkSpaceAndRecordAllocation(DatanodeID datanodeID, ContainerID containerID) {
+ DatanodeInfo datanodeInfo = getNode(datanodeID);
+ if (datanodeInfo == null) {
+ LOG.warn("DatanodeInfo not found for node {}", datanodeID);
+ return false;
+ }
+ return pendingContainerTracker.checkSpaceAndRecordAllocation(datanodeInfo, containerID);
+ }
+
+ @Override
+ public void removePendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) {
+ DatanodeInfo datanodeInfo = getNode(datanodeID);
+ if (datanodeInfo == null) {
+ LOG.warn("DatanodeInfo not found for node {}", datanodeID);
+ return;
+ }
+ pendingContainerTracker.removePendingAllocation(
+ datanodeInfo.getPendingContainerAllocations(), containerID);
+ }
+
/**
* Return the node stat of the specified datanode.
*
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
index 0014936a80db..0b90247ae8ab 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
@@ -136,6 +136,14 @@ void incNumPendingContainersRemoved() {
numPendingContainersRemoved.incr();
}
+ public long getNumPendingContainersAdded() {
+ return numPendingContainersAdded.value();
+ }
+
+ public long getNumPendingContainersRemoved() {
+ return numPendingContainersRemoved.value();
+ }
+
void incNumSkippedFullNodeContainerAllocation() {
numSkippedFullNodeContainerAllocation.incr();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 739b0c058ec8..326bb6b882ed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -232,6 +232,17 @@ void reinitialize(Table pipelineStore)
*/
boolean hasEnoughSpace(Pipeline pipeline);
+ /**
+ * Atomically checks if all datanodes in the pipeline have space for a new container
+ * and records the allocation if space is available. This prevents race conditions
+ * where multiple threads check space concurrently and over-allocate.
+ *
+ * @param pipeline the pipeline whose nodes will be checked and recorded
+ * @param containerID the container being allocated
+ * @return true if all nodes had space and allocation was recorded, false otherwise
+ */
+ boolean checkSpaceAndRecordAllocation(Pipeline pipeline, ContainerID containerID);
+
int openContainerLimit(List datanodes);
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index c4375e3f20ea..150e18ba97e7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -23,6 +23,7 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -650,6 +651,21 @@ public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID)
}
}
+ @Override
+ public boolean checkSpaceAndRecordAllocation(Pipeline pipeline, ContainerID containerID) {
+ List successfulNodes = new ArrayList<>();
+ for (DatanodeDetails dn : pipeline.getNodes()) {
+ if (!nodeManager.checkSpaceAndRecordAllocation(dn.getID(), containerID)) {
+ for (DatanodeDetails rollbackNode : successfulNodes) {
+ nodeManager.removePendingAllocationForDatanode(rollbackNode.getID(), containerID);
+ }
+ return false;
+ }
+ successfulNodes.add(dn);
+ }
+ return true;
+ }
+
/**
* Schedules a fixed interval job to create pipelines.
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 156eed688d81..38cacb1c6086 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -255,6 +255,13 @@ public ContainerWithPipeline allocateContainer(ReplicationConfig replicationConf
getScm().checkAdminAccess(getRemoteUser(), false);
final ContainerInfo container = scm.getContainerManager()
.allocateContainer(replicationConfig, owner);
+ if (container == null) {
+ throw new SCMException(
+ "Could not allocate container for replication " + replicationConfig
+ + ", owner=" + owner
+ + ": no suitable open pipeline with enough space",
+ ResultCodes.FAILED_TO_ALLOCATE_CONTAINER);
+ }
final Pipeline pipeline = scm.getPipelineManager()
.getPipeline(container.getPipelineID());
ContainerWithPipeline cp = new ContainerWithPipeline(container, pipeline);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 57d38ece3dd6..65ebcc7b74bd 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -462,6 +462,30 @@ public void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerI
pendingContainerTracker.recordPendingAllocationForDatanode(info, containerID);
}
+ @Override
+ public boolean checkSpaceAndRecordAllocation(DatanodeID datanodeID, ContainerID containerID) {
+ DatanodeDetails dd = nodeMetricMap.keySet().stream()
+ .filter(d -> d.getID().equals(datanodeID))
+ .findFirst().orElse(null);
+ DatanodeInfo info = getDatanodeInfo(dd);
+ if (info == null) {
+ return false;
+ }
+ return pendingContainerTracker.checkSpaceAndRecordAllocation(info, containerID);
+ }
+
+ @Override
+ public void removePendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) {
+ DatanodeDetails dd = nodeMetricMap.keySet().stream()
+ .filter(d -> d.getID().equals(datanodeID))
+ .findFirst().orElse(null);
+ DatanodeInfo info = getDatanodeInfo(dd);
+ if (info != null) {
+ pendingContainerTracker.removePendingAllocation(
+ info.getPendingContainerAllocations(), containerID);
+ }
+ }
+
/**
* Return the node stat of the specified datanode.
* @param datanodeDetails - datanode details.
@@ -943,6 +967,11 @@ public int openContainerLimit(List datanodes) {
return 9;
}
+ @Override
+ public PendingContainerTracker getPendingContainerTracker() {
+ return pendingContainerTracker;
+ }
+
@Override
public long getLastHeartbeat(DatanodeDetails datanodeDetails) {
return -1;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
index f2da8fd2878b..9ba73327847b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.PendingContainerTracker;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -63,6 +64,7 @@ public class SimpleMockNodeManager implements NodeManager {
private Map nodeMap = new ConcurrentHashMap<>();
private Map> pipelineMap = new ConcurrentHashMap<>();
private Map> containerMap = new ConcurrentHashMap<>();
+ private PendingContainerTracker pendingContainerTracker;
public void register(DatanodeDetails dd, NodeStatus status) {
dd.setPersistedOpState(status.getOperationalState());
@@ -253,6 +255,15 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) {
public void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) {
}
+ @Override
+ public boolean checkSpaceAndRecordAllocation(DatanodeID datanodeID, ContainerID containerID) {
+ return true;
+ }
+
+ @Override
+ public void removePendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) {
+ }
+
@Override
public boolean hasSpaceForNewContainerAllocation(DatanodeID datanodeID) {
return true;
@@ -445,4 +456,12 @@ public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) {
return false;
}
+ @Override
+ public PendingContainerTracker getPendingContainerTracker() {
+ int rollIntervalMs = 5 * 60 * 1000;
+ if (pendingContainerTracker == null) {
+ pendingContainerTracker = new PendingContainerTracker(5L * 1024 * 1024 * 1024, rollIntervalMs, null);
+ }
+ return pendingContainerTracker;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
index 68dcc634a5e3..c64db89290aa 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
@@ -102,7 +102,9 @@ void setUp() throws Exception {
pipelineManager = spy(base);
// Default: allow allocation in tests unless a test overrides it.
- doReturn(true).when(pipelineManager).hasEnoughSpace(any(Pipeline.class));
+ // Allocation uses checkSpaceAndRecordAllocation
+ doReturn(true).when(pipelineManager)
+ .checkSpaceAndRecordAllocation(any(Pipeline.class), any(ContainerID.class));
pipelineManager.createPipeline(RatisReplicationConfig.getInstance(
ReplicationFactor.THREE));
@@ -140,11 +142,12 @@ void testAllocateContainer() throws Exception {
*/
@Test
public void testGetMatchingContainerReturnsNullWhenNotEnoughSpaceInDatanodes() throws IOException {
- doReturn(false).when(pipelineManager).hasEnoughSpace(any());
+ doReturn(false).when(pipelineManager)
+ .checkSpaceAndRecordAllocation(any(Pipeline.class), any(ContainerID.class));
long sizeRequired = 256 * 1024 * 1024; // 256 MB
Pipeline pipeline = pipelineManager.getPipelines().iterator().next();
- // MockPipelineManager#hasEnoughSpace always returns false
+    // checkSpaceAndRecordAllocation is stubbed on the spied pipelineManager to always return false
// the pipeline has no existing containers, so a new container gets allocated in getMatchingContainer
ContainerInfo container = containerManager
.getMatchingContainer(sizeRequired, "test", pipeline, Collections.emptySet());
@@ -162,10 +165,10 @@ public void testGetMatchingContainerReturnsNullWhenNotEnoughSpaceInDatanodes() t
public void testGetMatchingContainerReturnsContainerWhenEnoughSpaceInDatanodes() throws IOException {
long sizeRequired = 256 * 1024 * 1024; // 256 MB
- // create a spy to mock hasEnoughSpace to always return true
+ // create a spy to mock checkSpaceAndRecordAllocation to always return true
PipelineManager spyPipelineManager = spy(pipelineManager);
doReturn(true).when(spyPipelineManager)
- .hasEnoughSpace(any(Pipeline.class));
+ .checkSpaceAndRecordAllocation(any(Pipeline.class), any(ContainerID.class));
// create a new ContainerManager using the spy
File tempDir = new File(testDir, "tempDir");
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 4dbe79fc1351..681344531fa0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -50,6 +50,7 @@
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
@@ -69,6 +70,7 @@
import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -159,7 +161,8 @@ SCMNodeManager createNodeManager(OzoneConfiguration config) {
ContainerManager createContainerManager()
throws IOException {
pipelineManager = spy(pipelineManager);
- doReturn(true).when(pipelineManager).hasEnoughSpace(any());
+ doReturn(true).when(pipelineManager)
+ .checkSpaceAndRecordAllocation(any(Pipeline.class), any(ContainerID.class));
return new ContainerManagerImpl(conf,
scmhaManager, sequenceIdGen, pipelineManager,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 69b1e24dfe3b..db594e560c57 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -335,7 +335,12 @@ public boolean isPipelineCreationFrozen() {
@Override
public boolean hasEnoughSpace(Pipeline pipeline) {
- return false;
+ for (DatanodeDetails node : pipeline.getNodes()) {
+ if (!nodeManager.hasSpaceForNewContainerAllocation(node.getID())) {
+ return false;
+ }
+ }
+ return true;
}
@Override
@@ -345,6 +350,16 @@ public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID)
}
}
+ @Override
+ public boolean checkSpaceAndRecordAllocation(Pipeline pipeline, ContainerID containerID) {
+ for (DatanodeDetails dn : pipeline.getNodes()) {
+ if (!nodeManager.checkSpaceAndRecordAllocation(dn.getID(), containerID)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
@Override
public int openContainerLimit(List datanodes) {
// For tests that do not care about this limit, return a large value.
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTrackerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTrackerIntegration.java
new file mode 100644
index 000000000000..2b84ce901c08
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTrackerIntegration.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.function.BooleanSupplier;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.node.PendingContainerTracker;
+import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
+import org.apache.hadoop.hdds.scm.node.SCMNodeMetrics;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration tests for PendingContainerTracker.
+ */
+@Timeout(300)
+public class TestPendingContainerTrackerIntegration {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestPendingContainerTrackerIntegration.class);
+ private MiniOzoneCluster cluster;
+ private StorageContainerManager scm;
+ private OzoneClient client;
+ private ContainerManager containerManager;
+ private PendingContainerTracker pendingTracker;
+ private SCMNodeMetrics metrics;
+ private OzoneBucket bucket;
+ private SCMNodeManager nodeManager;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+
+ conf.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "60s");
+
+ // Reduce heartbeat interval for faster container reports
+ conf.set(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, "10s");
+
+ conf.set("ozone.scm.container.size", "100MB");
+ conf.set("ozone.scm.pipeline.owner.container.count", "1");
+ conf.set("ozone.scm.pipeline.per.metadata.disk", "1");
+ conf.set("ozone.scm.datanode.pipeline.limit", "1");
+
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(3)
+ .build();
+ cluster.waitForClusterToBeReady();
+ cluster.waitTobeOutOfSafeMode();
+
+ scm = cluster.getStorageContainerManager();
+ containerManager = scm.getContainerManager();
+ client = cluster.newClient();
+
+ // Create bucket for testing
+ bucket = TestDataUtil.createVolumeAndBucket(client);
+
+ nodeManager = (SCMNodeManager) scm.getScmNodeManager();
+ assertNotNull(nodeManager);
+ pendingTracker = nodeManager.getPendingContainerTracker();
+ assertNotNull(pendingTracker, "PendingContainerTracker should be initialized");
+ metrics = pendingTracker.getMetrics();
+
+    LOG.info("Test setup complete - container report interval: 60s, heartbeat interval: 10s");
+ }
+
+ @AfterEach
+ public void cleanup() throws Exception {
+ if (client != null) {
+ client.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test: Write key → Container allocation → Pending tracked → ICR → Pending removed.
+ */
+ @Test
+ public void testKeyWriteRecordsPendingAndICRRemovesIt() throws Exception {
+ long initialAdded = metrics.getNumPendingContainersAdded();
+ long initialRemoved = metrics.getNumPendingContainersRemoved();
+
+ // Allocate a container directly
+ containerManager.allocateContainer(
+ RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+ "omServiceIdDefault");
+
+ // Verify the added metric increased, meaning pending was recorded
+ GenericTestUtils.waitFor(
+ (BooleanSupplier) () -> metrics.getNumPendingContainersAdded() > initialAdded,
+ 100, 5000);
+
+ long afterAdded = metrics.getNumPendingContainersAdded();
+ assertThat(afterAdded).isGreaterThan(initialAdded);
+
+ LOG.info("Pending tracked successfully. Waiting for ICR to remove pending...");
+
+ // Write a key so datanodes send ICRs
+ String keyName = "testKey1";
+ byte[] data = "Testing Pending Container Tracker".getBytes(UTF_8);
+
+ LOG.info("Writing key: {}", keyName);
+ try (OzoneOutputStream out = bucket.createKey(keyName, data.length,
+ RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+ new java.util.HashMap<>())) {
+ out.write(data);
+ }
+ LOG.info("Key written successfully");
+
+ // Wait for ICRs to be processed and removed metric to increase
+ GenericTestUtils.waitFor(
+ (BooleanSupplier) () -> metrics.getNumPendingContainersRemoved() > initialRemoved,
+ 100, 5000);
+
+ long afterRemoved = metrics.getNumPendingContainersRemoved();
+ assertThat(afterRemoved).isGreaterThan(initialRemoved);
+
+ LOG.info("After added={}, removed={}", afterAdded, afterRemoved);
+ }
+
+ /**
+ * Test: Verify idempotency - adding the same container twice does not double-count in metrics.
+ */
+ @Test
+ public void testIdempotentPendingTracking() throws Exception {
+ // Allocate a container directly
+ ContainerInfo container = containerManager.allocateContainer(
+ RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+ "omServiceIdDefault");
+
+ Pipeline pipeline = scm.getPipelineManager().getPipeline(container.getPipelineID());
+ DatanodeDetails firstNode = pipeline.getFirstNode();
+ DatanodeInfo firstNodeInfo = nodeManager.getDatanodeInfo(firstNode);
+
+ // Capture the added metric after initial allocation
+ long addedAfterAllocation = metrics.getNumPendingContainersAdded();
+
+ LOG.info("Pending added metric after allocation: {}", addedAfterAllocation);
+
+ // Try adding the same container again (simulates retry or duplicate allocation)
+ pendingTracker.recordPendingAllocationForDatanode(firstNodeInfo, container.containerID());
+
+ // The metric should not have increased since it was a duplicate
+ assertEquals(addedAfterAllocation, metrics.getNumPendingContainersAdded(),
+ "Pending added metric should not change when adding duplicate container");
+ }
+
+ /**
+ * Test: Verify metrics are updated correctly.
+ */
+ @Test
+ public void testMetricsUpdateThroughLifecycle() throws Exception {
+ long initialAdded = metrics.getNumPendingContainersAdded();
+ long initialRemoved = metrics.getNumPendingContainersRemoved();
+
+ LOG.info("Initial metrics: added={}, removed={}", initialAdded, initialRemoved);
+
+ // Write multiple keys
+ for (int i = 0; i < 3; i++) {
+ String keyName = "metricsTestKey" + i;
+ byte[] data = ("Metrics test " + i).getBytes(UTF_8);
+
+ try (OzoneOutputStream out = bucket.createKey(keyName, data.length,
+ RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+ new java.util.HashMap<>())) {
+ out.write(data);
+ }
+ }
+
+    // The added metric should increase as containers are allocated
+ GenericTestUtils.waitFor(
+ (BooleanSupplier) () -> metrics.getNumPendingContainersAdded() > initialAdded,
+ 100, 5000);
+
+ // Removed metric should increase after ICR processing
+ GenericTestUtils.waitFor(
+ (BooleanSupplier) () -> metrics.getNumPendingContainersRemoved() > initialRemoved,
+ 100, 5000);
+ }
+}