Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,11 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_STALENODE_INTERVAL_DEFAULT =
"5m";

public static final String OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL =
"ozone.scm.pending.container.roll.interval";
public static final String OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT =
"5m";

public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT =
"ozone.scm.heartbeat.rpc-timeout";
public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT =
Expand Down
11 changes: 11 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,17 @@
balances the amount of metadata.
</description>
</property>
<property>
<name>ozone.scm.pending.container.roll.interval</name>
<value>5m</value>
<tag>OZONE, SCM, PERFORMANCE, MANAGEMENT</tag>
<description>
The interval at which the two-window tumbling bucket for pending
container allocations rolls over per DataNode. Pending containers
that have not been confirmed within two intervals are automatically
aged out. Default is 5 minutes.
</description>
</property>
<property>
<name>ozone.scm.container.lock.stripes</name>
<value>512</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,16 +238,17 @@ private ContainerInfo createContainer(Pipeline pipeline, String owner)
private ContainerInfo allocateContainer(final Pipeline pipeline,
final String owner)
throws IOException {
if (!pipelineManager.hasEnoughSpace(pipeline)) {
LOG.debug("Cannot allocate a new container because pipeline {} does not have enough space.", pipeline);
return null;
}

final long uniqueId = sequenceIdGen.getNextId(CONTAINER_ID);
Preconditions.checkState(uniqueId > 0,
"Cannot allocate container, negative container id" +
" generated. %s.", uniqueId);
final ContainerID containerID = ContainerID.valueOf(uniqueId);

if (!pipelineManager.checkSpaceAndRecordAllocation(pipeline, containerID)) {
LOG.debug("Cannot allocate a new container because pipeline {} does not have enough space.", pipeline);
return null;
}

final ContainerInfoProto.Builder containerInfoBuilder = ContainerInfoProto
.newBuilder()
.setState(LifeCycleState.OPEN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
if (!alreadyInDn) {
// This is a new Container not in the nodeManager -> dn map yet
getNodeManager().addContainer(datanodeDetails, cid);
// Remove from pending tracker when container is added to DN
// This container was just confirmed for the first time on this DN
getNodeManager().removePendingAllocationForDatanode(datanodeDetails.getID(), cid);
}
if (container == null || ContainerReportValidator
.validate(container, datanodeDetails, replica)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ protected void processICR(IncrementalContainerReportFromDatanode report,
}
if (ContainerReportValidator.validate(container, dd, replicaProto)) {
processContainerReplica(dd, container, replicaProto, publisher, detailsForLogging);
getNodeManager().removePendingAllocationForDatanode(dd.getID(), id);
}
success = true;
} catch (ContainerNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,25 @@ default int getAllNodeCount() {
*/
void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID);

/**
* Atomically checks if the datanode has space for a new container and records the allocation
* if space is available. This prevents race conditions where multiple threads check space
* concurrently and over-allocate.
*
* @param datanodeID the ID of the DataNode receiving the allocation
* @param containerID the container being allocated
* @return true if space was available and allocation was recorded, false otherwise
*/
boolean checkSpaceAndRecordAllocation(DatanodeID datanodeID, ContainerID containerID);

/**
* Removes a pending container allocation from a datanode.
*
* @param datanodeID the ID of the DataNode
* @param containerID the container to remove from pending
*/
void removePendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID);

/**
* Return the node stat of the specified datanode.
* @param datanodeDetails DatanodeDetails.
Expand Down Expand Up @@ -435,4 +454,9 @@ default void removeNode(DatanodeDetails datanodeDetails) throws NodeNotFoundExce
}

int openContainerLimit(List<DatanodeDetails> datanodes);

/**
* SCM-side tracker for container allocations not yet reported by datanodes.
*/
PendingContainerTracker getPendingContainerTracker();
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
Expand Down Expand Up @@ -124,7 +125,7 @@ public class NodeStateManager implements Runnable, Closeable {
*/
private final long deadNodeIntervalMs;

private final long containerRollIntervalMs = 5 * 60 * 1000; //TODO
private final long containerRollIntervalMs;

/**
* The future is used to pause/unpause the scheduled checks.
Expand Down Expand Up @@ -214,6 +215,11 @@ public NodeStateManager(ConfigurationSource conf,
scmContext.getFinalizationCheckpoint()) &&
!layoutMatchCondition.test(layout);

containerRollIntervalMs = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL,
ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);

scheduleNextHealthCheck();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.hdds.scm.node;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -189,6 +190,47 @@ public boolean hasEffectiveAllocatableSpaceForNewContainer(DatanodeInfo datanode
return false;
}

/**
* Atomically checks if the datanode has space for a new container and records the allocation
* if space is available. This prevents race conditions where multiple threads check space
* concurrently and over-allocate.
*
* @param datanodeInfo storage reports for the datanode
* @param containerID The container being allocated
* @return true if space was available and allocation was recorded, false otherwise
*/
public boolean checkSpaceAndRecordAllocation(DatanodeInfo datanodeInfo, ContainerID containerID) {
Objects.requireNonNull(datanodeInfo, "datanodeInfo == null");
Objects.requireNonNull(containerID, "containerID == null");

TwoWindowBucket bucket = datanodeInfo.getPendingContainerAllocations();
synchronized (bucket) {
long pendingAllocationSize = bucket.getCount() * maxContainerSize;
List<StorageReportProto> storageReports = datanodeInfo.getStorageReports();
Objects.requireNonNull(storageReports, "storageReports == null");
if (storageReports.isEmpty()) {
return false;
}
long effectiveAllocatableSpace = 0L;
for (StorageReportProto report : storageReports) {
long usableSpace = VolumeUsage.getUsableSpace(report);
long containersOnThisDisk = usableSpace / maxContainerSize;
effectiveAllocatableSpace += containersOnThisDisk * maxContainerSize;
if (effectiveAllocatableSpace - pendingAllocationSize >= maxContainerSize) {
boolean added = bucket.add(containerID);
if (added && metrics != null) {
metrics.incNumPendingContainersAdded();
}
return true;
}
}
if (metrics != null) {
metrics.incNumSkippedFullNodeContainerAllocation();
}
return false;
}
}

/**
* Record a pending container allocation for a single DataNode.
* Container is added to the current window.
Expand Down Expand Up @@ -222,4 +264,9 @@ public void removePendingAllocation(TwoWindowBucket bucket, ContainerID containe
metrics.incNumPendingContainersRemoved();
}
}

@VisibleForTesting
public SCMNodeMetrics getMetrics() {
return metrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ public SCMNodeManager(
this.pendingContainerTracker = new PendingContainerTracker(
(long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES),
5 * 60 * 1000, // TODO
conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL,
ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS),
this.metrics);
this.clusterMap = networkTopology;
this.nodeResolver = nodeResolver;
Expand Down Expand Up @@ -231,6 +234,11 @@ private void unregisterMXBean() {
}
}

@Override
public PendingContainerTracker getPendingContainerTracker() {
return pendingContainerTracker;
}

protected NodeStateManager getNodeStateManager() {
return nodeStateManager;
}
Expand Down Expand Up @@ -1095,6 +1103,27 @@ public void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerI
pendingContainerTracker.recordPendingAllocationForDatanode(datanodeInfo, containerID);
}

@Override
public boolean checkSpaceAndRecordAllocation(DatanodeID datanodeID, ContainerID containerID) {
DatanodeInfo datanodeInfo = getNode(datanodeID);
if (datanodeInfo == null) {
LOG.warn("DatanodeInfo not found for node {}", datanodeID);
return false;
}
return pendingContainerTracker.checkSpaceAndRecordAllocation(datanodeInfo, containerID);
}

@Override
public void removePendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) {
DatanodeInfo datanodeInfo = getNode(datanodeID);
Copy link
Copy Markdown
Contributor

@szetszwo szetszwo May 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ashishkumar50 , we should get datanodeInfo outside the loop instead looking the same datanodeInfo up for each container.

if (datanodeInfo == null) {
LOG.warn("DatanodeInfo not found for node {}", datanodeID);
return;
}
pendingContainerTracker.removePendingAllocation(
datanodeInfo.getPendingContainerAllocations(), containerID);
}

/**
* Return the node stat of the specified datanode.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ void incNumPendingContainersRemoved() {
numPendingContainersRemoved.incr();
}

public long getNumPendingContainersAdded() {
return numPendingContainersAdded.value();
}

public long getNumPendingContainersRemoved() {
return numPendingContainersRemoved.value();
}

void incNumSkippedFullNodeContainerAllocation() {
numSkippedFullNodeContainerAllocation.incr();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,17 @@ void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
*/
boolean hasEnoughSpace(Pipeline pipeline);

/**
* Atomically checks if all datanodes in the pipeline have space for a new container
* and records the allocation if space is available. This prevents race conditions
* where multiple threads check space concurrently and over-allocate.
*
* @param pipeline the pipeline whose nodes will be checked and recorded
* @param containerID the container being allocated
* @return true if all nodes had space and allocation was recorded, false otherwise
*/
boolean checkSpaceAndRecordAllocation(Pipeline pipeline, ContainerID containerID);

int openContainerLimit(List<DatanodeDetails> datanodes);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -650,6 +651,21 @@ public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID)
}
}

@Override
public boolean checkSpaceAndRecordAllocation(Pipeline pipeline, ContainerID containerID) {
List<DatanodeDetails> successfulNodes = new ArrayList<>();
for (DatanodeDetails dn : pipeline.getNodes()) {
if (!nodeManager.checkSpaceAndRecordAllocation(dn.getID(), containerID)) {
for (DatanodeDetails rollbackNode : successfulNodes) {
nodeManager.removePendingAllocationForDatanode(rollbackNode.getID(), containerID);
}
return false;
}
successfulNodes.add(dn);
}
return true;
}

/**
* Schedules a fixed interval job to create pipelines.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,13 @@ public ContainerWithPipeline allocateContainer(ReplicationConfig replicationConf
getScm().checkAdminAccess(getRemoteUser(), false);
final ContainerInfo container = scm.getContainerManager()
.allocateContainer(replicationConfig, owner);
if (container == null) {
throw new SCMException(
"Could not allocate container for replication " + replicationConfig
+ ", owner=" + owner
+ ": no suitable open pipeline with enough space",
ResultCodes.FAILED_TO_ALLOCATE_CONTAINER);
}
final Pipeline pipeline = scm.getPipelineManager()
.getPipeline(container.getPipelineID());
ContainerWithPipeline cp = new ContainerWithPipeline(container, pipeline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,30 @@ public void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerI
pendingContainerTracker.recordPendingAllocationForDatanode(info, containerID);
}

@Override
public boolean checkSpaceAndRecordAllocation(DatanodeID datanodeID, ContainerID containerID) {
DatanodeDetails dd = nodeMetricMap.keySet().stream()
.filter(d -> d.getID().equals(datanodeID))
.findFirst().orElse(null);
DatanodeInfo info = getDatanodeInfo(dd);
if (info == null) {
return false;
}
return pendingContainerTracker.checkSpaceAndRecordAllocation(info, containerID);
}

@Override
public void removePendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) {
DatanodeDetails dd = nodeMetricMap.keySet().stream()
.filter(d -> d.getID().equals(datanodeID))
.findFirst().orElse(null);
DatanodeInfo info = getDatanodeInfo(dd);
if (info != null) {
pendingContainerTracker.removePendingAllocation(
info.getPendingContainerAllocations(), containerID);
}
}

/**
* Return the node stat of the specified datanode.
* @param datanodeDetails - datanode details.
Expand Down Expand Up @@ -943,6 +967,11 @@ public int openContainerLimit(List<DatanodeDetails> datanodes) {
return 9;
}

@Override
public PendingContainerTracker getPendingContainerTracker() {
return pendingContainerTracker;
}

@Override
public long getLastHeartbeat(DatanodeDetails datanodeDetails) {
return -1;
Expand Down
Loading