From c745ed795906a317353066ec7e90376cc8f12d18 Mon Sep 17 00:00:00 2001 From: Chi-Hsuan Huang Date: Wed, 17 Jun 2026 21:11:17 +0800 Subject: [PATCH 01/10] HDDS-15327. Clear pending replication op when datanode reports command failure --- .../ContainerReplicaPendingOps.java | 66 +++++++++++++++++++ .../TestContainerReplicaPendingOps.java | 20 ++++++ 2 files changed, 86 insertions(+) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java index 2905ae4d4a36..9c5fab5be235 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java @@ -52,6 +52,11 @@ public class ContainerReplicaPendingOps { private final Clock clock; private final ConcurrentHashMap> pendingOps = new ConcurrentHashMap<>(); + // Maps an in-flight command id back to the container it belongs to, so a + // failed-command report from a datanode can locate and clear the pending op + // without scanning every container. + private final ConcurrentHashMap commandIdToContainer = + new ConcurrentHashMap<>(); private final Striped stripedLock = Striped.readWriteLock(64); private final ReentrantReadWriteLock globalLock = new ReentrantReadWriteLock(); @@ -125,6 +130,7 @@ public void clear() { globalLock.writeLock().lock(); try { pendingOps.clear(); + commandIdToContainer.clear(); resetCounters(); containerSizeScheduled.clear(); } finally { @@ -274,6 +280,9 @@ public void removeExpiredEntries() { releaseScheduledContainerSize(op); } decrementCounter(op.getOpType(), op.getReplicaIndex()); + if (op.getCommand() != null) { + commandIdToContainer.remove(op.getCommand().getId()); + } } expiredOps.add(op); updateTimeoutMetrics(op); @@ -293,6 +302,57 @@ public void removeExpiredEntries() { } } + /** + * Handle a failure report for a replication or reconstruction command from a + * datanode. The matching ADD op is removed and its counter decremented so the + * inflight quota is freed immediately instead of waiting for the event + * timeout. DELETE ops are left in place (so they can be resent), mirroring + * {@link #removeExpiredEntries()}. Subscribers are notified with + * timedOut=true so the ReplicationManager treats a failed command the same as + * an expired one. + * + * @param cmdId the id of the failed command, as reported by the datanode + */ + public void onReplicationCommandFailed(long cmdId) { + ContainerID containerID = commandIdToContainer.get(cmdId); + if (containerID == null) { + // Already completed, already expired, or not a tracked replication op. + return; + } + List failedOps = new ArrayList<>(); + Lock lock = writeLock(containerID); + lock(lock); + try { + List ops = pendingOps.get(containerID); + if (ops == null) { + return; + } + Iterator iterator = ops.listIterator(); + while (iterator.hasNext()) { + ContainerReplicaOp op = iterator.next(); + if (op.getCommand() != null && op.getCommand().getId() == cmdId) { + if (op.getOpType() != DELETE) { + iterator.remove(); + if (op.getOpType() == ADD) { + releaseScheduledContainerSize(op); + } + decrementCounter(op.getOpType(), op.getReplicaIndex()); + commandIdToContainer.remove(cmdId); + } + failedOps.add(op); + } + } + if (ops.isEmpty()) { + pendingOps.remove(containerID); + } + } finally { + unlock(lock); + } + if (!failedOps.isEmpty()) { + notifySubscribers(failedOps, containerID, true); + } + } + private void releaseScheduledContainerSize(ContainerReplicaOp op) { containerSizeScheduled.computeIfPresent(op.getTarget().getID(), (k, v) -> { long newSize = v.getSize() - op.getContainerSize(); @@ -339,6 +399,9 @@ private void addReplica(ContainerReplicaOp.PendingOpType opType, containerID, s -> new ArrayList<>()); ops.add(new ContainerReplicaOp(opType, target, replicaIndex, command, deadlineEpochMillis, containerSize)); + if (command != null) { + commandIdToContainer.put(command.getId(), containerID); + } DatanodeID id = target.getID(); if (opType == ADD) { containerSizeScheduled.compute(id, (k, v) -> { @@ -374,6 +437,9 @@ private boolean completeOp(ContainerReplicaOp.PendingOpType opType, found = true; completedOps.add(op); iterator.remove(); + if (op.getCommand() != null) { + commandIdToContainer.remove(op.getCommand().getId()); + } if (opType == ADD) { containerSizeScheduled.computeIfPresent(target.getID(), (k, v) -> { long newSize = v.getSize() - op.getContainerSize(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java index ee813f0942cc..f944838b3a06 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java @@ -533,6 +533,26 @@ public void testScheduledSizeIsCorrectlyTrackedAndExpired() { assertNull(scheduled.get(dn2.getID())); } + @Test + public void failedAddCommandRemovesPendingOpAndDecrementsCount() { + ContainerID containerID = ContainerID.valueOf(1); + SCMCommand cmd = ReplicateContainerCommand.toTarget(1L, dn1); + pendingOps.scheduleAddReplica(containerID, dn1, 0, cmd, + clock.millis() + 60000, 1000L, clock.millis()); + assertEquals(1, pendingOps.getPendingOpCount(ADD)); + + pendingOps.onReplicationCommandFailed(cmd.getId()); + + assertEquals(0, pendingOps.getPendingOpCount(ADD)); + assertTrue(pendingOps.getPendingOps(containerID).isEmpty()); + } + + @Test + public void failedCommandWithUnknownIdIsNoOp() { + pendingOps.onReplicationCommandFailed(999999L); + assertEquals(0, pendingOps.getPendingOpCount(ADD)); + } + /** * Tests that only the size of containers with expired ops is reduced from the map tracking size of pending ops. * For example, if target Datanode DN1 has two pending ADD ops 10GB + 15GB, and the first op expires, then only From 99df4da9dd13b62b0690e251fc9c806c857a53d0 Mon Sep 17 00:00:00 2001 From: Chi-Hsuan Huang Date: Wed, 17 Jun 2026 21:15:42 +0800 Subject: [PATCH 02/10] HDDS-15327. Drop command index entry for failed delete ops --- .../scm/container/replication/ContainerReplicaPendingOps.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java index 9c5fab5be235..8382aa6309db 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java @@ -337,8 +337,9 @@ public void onReplicationCommandFailed(long cmdId) { releaseScheduledContainerSize(op); } decrementCounter(op.getOpType(), op.getReplicaIndex()); - commandIdToContainer.remove(cmdId); } + // Always drop the index entry; DELETE ops stay in the list for resend but no longer need tracking. + commandIdToContainer.remove(cmdId); failedOps.add(op); } } From ffe96f799e9a67186a4318de3b2e28ac005d4b82 Mon Sep 17 00:00:00 2001 From: Chi-Hsuan Huang Date: Wed, 17 Jun 2026 21:19:33 +0800 Subject: [PATCH 03/10] HDDS-15327. Route failed replication command status to ContainerReplicaPendingOps --- .../command/CommandStatusReportHandler.java | 31 +++++++++++++++++-- .../hadoop/hdds/scm/events/SCMEvents.java | 9 ++++++ .../scm/server/StorageContainerManager.java | 4 +++ .../TestCommandStatusReportHandler.java | 24 ++++++++++++++ 4 files changed, 66 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java index 2d794ac83a8c..6b8405cd6051 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java @@ -54,16 +54,22 @@ public void onMessage(CommandStatusReportFromDatanode report, // Route command status to its watchers. List deleteBlocksCommandStatus = new ArrayList<>(); + List failedReplicationStatus = new ArrayList<>(); cmdStatusList.forEach(cmdStatus -> { if (LOGGER.isTraceEnabled()) { LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus .getCmdId(), cmdStatus.getType()); } - if (cmdStatus.getType() == SCMCommandProto.Type.deleteBlocksCommand) { + SCMCommandProto.Type type = cmdStatus.getType(); + if (type == SCMCommandProto.Type.deleteBlocksCommand) { deleteBlocksCommandStatus.add(cmdStatus); + } else if ((type == SCMCommandProto.Type.replicateContainerCommand + || type == SCMCommandProto.Type.reconstructECContainersCommand) + && cmdStatus.getStatus() == CommandStatus.Status.FAILED) { + failedReplicationStatus.add(cmdStatus); } else { LOGGER.debug("CommandStatus of type:{} not handled in " + - "CommandStatusReportHandler.", cmdStatus.getType()); + "CommandStatusReportHandler.", type); } }); @@ -77,6 +83,10 @@ public void onMessage(CommandStatusReportFromDatanode report, publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS, new DeleteBlockStatus( deleteBlocksCommandStatus, report.getDatanodeDetails())); } + if (!failedReplicationStatus.isEmpty()) { + publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new ReplicationStatus( + failedReplicationStatus, report.getDatanodeDetails())); + } } /** @@ -121,4 +131,21 @@ public DatanodeDetails getDatanodeDetails() { } } + /** + * Wrapper event for failed replication / reconstruction command statuses. + */ + public static class ReplicationStatus extends CommandStatusEvent { + private final DatanodeDetails datanodeDetails; + + public ReplicationStatus(List cmdStatus, + DatanodeDetails datanodeDetails) { + super(cmdStatus); + this.datanodeDetails = datanodeDetails; + } + + public DatanodeDetails getDatanodeDetails() { + return datanodeDetails; + } + } + } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 2255fb392593..b5ff46678a35 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -212,6 +212,15 @@ public final class SCMEvents { new TypedEvent<>(CommandStatusReportHandler.DeleteBlockStatus.class, "Delete_Block_Status"); + /** + * This event will be triggered by CommandStatusReportHandler whenever a + * datanode reports a failed replication or reconstruction command. + */ + public static final TypedEvent + REPLICATION_STATUS = + new TypedEvent<>(CommandStatusReportHandler.ReplicationStatus.class, + "Replication_Status"); + public static final TypedEvent REPLICATION_MANAGER_NOTIFY = new TypedEvent<>(DatanodeDetails.class, "Replication_Manager_Notify"); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 69f7973ff1bd..2d2b373f4e45 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -603,6 +603,10 @@ private void initializeEventHandlers() { eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler); eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS, (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog()); + eventQueue.addHandler(SCMEvents.REPLICATION_STATUS, + (status, publisher) -> status.getCmdStatus().forEach(cmdStatus -> + containerReplicaPendingOps.onReplicationCommandFailed( + cmdStatus.getCmdId()))); eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler); eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler); eventQueue.addHandler(SCMEvents.RECONCILE_CONTAINER, reconcileContainerEventHandler); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java index 5a540e71c843..1d4aaf088630 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdds.scm.command; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import java.util.ArrayList; import java.util.Collections; @@ -29,6 +31,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.ReplicationStatus; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.CommandStatusReportFromDatanode; import org.apache.hadoop.hdds.server.events.Event; @@ -47,10 +51,16 @@ public class TestCommandStatusReportHandler implements EventPublisher { private static final Logger LOG = LoggerFactory .getLogger(TestCommandStatusReportHandler.class); private CommandStatusReportHandler cmdStatusReportHandler; + private ReplicationStatus replicationStatus; @BeforeEach public void setup() { cmdStatusReportHandler = new CommandStatusReportHandler(); + replicationStatus = null; + } + + private ReplicationStatus getReplicationStatus() { + return replicationStatus; } @Test @@ -81,11 +91,25 @@ private CommandStatusReportFromDatanode getStatusReport( dn, report); } + @Test + public void replicationFailureFiresReplicationStatusEvent() { + // getCommandStatusList() already includes a FAILED replicateContainerCommand + cmdStatusReportHandler.onMessage(getStatusReport(getCommandStatusList()), this); + assertNotNull(getReplicationStatus()); + assertEquals(1, getReplicationStatus().getCmdStatus().size()); + assertEquals(CommandStatus.Status.FAILED, + getReplicationStatus().getCmdStatus().get(0).getStatus()); + } + @Override + @SuppressWarnings("unchecked") public > void fireEvent(EVENT_TYPE event, PAYLOAD payload) { LOG.info("firing event of type {}, payload {}", event.getName(), payload .toString()); + if (event == SCMEvents.REPLICATION_STATUS) { + replicationStatus = (ReplicationStatus) payload; + } } private List getCommandStatusList() { From 48aeadb86870c201bb640efe61e506213d6913ad Mon Sep 17 00:00:00 2001 From: Chi-Hsuan Huang Date: Wed, 17 Jun 2026 21:27:06 +0800 Subject: [PATCH 04/10] HDDS-15327. Datanode reports replication command success and failure to SCM --- .../common/statemachine/StateContext.java | 9 ++++++ .../ECReconstructionCommandInfo.java | 6 ++++ .../ECReconstructionCoordinatorTask.java | 5 +++ .../replication/AbstractReplicationTask.java | 9 ++++++ .../replication/ReplicationSupervisor.java | 15 +++++++++ .../replication/ReplicationTask.java | 5 +++ .../common/statemachine/TestStateContext.java | 16 ++++++++++ .../TestReplicationSupervisor.java | 31 +++++++++++++++++++ 8 files changed, 96 insertions(+) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 150159eb84ae..b0200f73fb25 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -77,6 +77,7 @@ import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.CommandStatus; +import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder; import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Time; @@ -854,6 +855,14 @@ public void addCmdStatus(SCMCommand cmd) { .setStatus(Status.PENDING) .setType(cmd.getType()) .build()); + } else if (cmd.getType() == SCMCommandProto.Type.replicateContainerCommand + || cmd.getType() == SCMCommandProto.Type.reconstructECContainersCommand) { + addCmdStatus(cmd.getId(), + CommandStatusBuilder.newBuilder() + .setCmdId(cmd.getId()) + .setStatus(Status.PENDING) + .setType(cmd.getType()) + .build()); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java index 7f86fe8894a4..a64da9bd1b3c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java @@ -42,9 +42,11 @@ public class ECReconstructionCommandInfo { private final ByteString missingContainerIndexes; private final long deadlineMsSinceEpoch; private final long term; + private final long commandId; public ECReconstructionCommandInfo(ReconstructECContainersCommand cmd) { this.containerID = cmd.getContainerID(); + this.commandId = cmd.getId(); this.ecReplicationConfig = cmd.getEcReplicationConfig(); this.missingContainerIndexes = cmd.getMissingContainerIndexes(); this.deadlineMsSinceEpoch = cmd.getDeadline(); @@ -97,4 +99,8 @@ public long getTerm() { return term; } + public long getId() { + return commandId; + } + } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java index e9535c6afe80..8030e4d5632a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java @@ -47,6 +47,11 @@ public ECReconstructionCoordinatorTask( debugString = reconstructionCommandInfo.toString(); } + @Override + public long getCommandId() { + return reconstructionCommandInfo.getId(); + } + @Override public String getMetricName() { return METRIC_NAME; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java index 05932e6edf79..276b8c1ed801 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java @@ -66,6 +66,15 @@ public long getContainerId() { return containerId; } + /** + * Returns the id of the SCM command that scheduled this task, or 0 if this + * task is not associated with a tracked command. Subclasses backed by an + * {@link org.apache.hadoop.ozone.protocol.commands.SCMCommand} override this. + */ + public long getCommandId() { + return 0; + } + public Status getStatus() { return status; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 3184fb2ed2e0..9135eb38c09e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.IntConsumer; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -52,6 +53,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status; +import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; import org.apache.hadoop.util.Time; import org.slf4j.Logger; @@ -422,9 +424,11 @@ public void run() { if (task.getStatus() == Status.FAILED) { LOG.warn("Failed {}", this); failureCounter.get(task.getMetricName()).incrementAndGet(); + updateCommandStatus(task, CommandStatus::markAsFailed); } else if (task.getStatus() == Status.DONE) { LOG.info("Successful {}", this); successCounter.get(task.getMetricName()).incrementAndGet(); + updateCommandStatus(task, CommandStatus::markAsExecuted); } else if (task.getStatus() == Status.SKIPPED) { LOG.info("Skipped {}", this); skippedCounter.get(task.getMetricName()).incrementAndGet(); @@ -433,6 +437,7 @@ public void run() { task.setStatus(Status.FAILED); LOG.warn("Failed {}", this, e); failureCounter.get(task.getMetricName()).incrementAndGet(); + updateCommandStatus(task, CommandStatus::markAsFailed); } finally { queuedCounter.get(task.getMetricName()).decrementAndGet(); opsLatencyMs.get(task.getMetricName()).add(Time.monotonicNow() - startTime); @@ -441,6 +446,16 @@ public void run() { } } + private void updateCommandStatus(AbstractReplicationTask t, + Consumer updater) { + long cmdId = t.getCommandId(); + if (context == null || cmdId == 0) { + // No SCM context (test) or no tracked command (e.g. reconcile task). + return; + } + context.updateCommandStatus(cmdId, updater); + } + @Override public String toString() { return task.toString(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java index a32e9b41ab1b..0afc8fe41e88 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java @@ -66,6 +66,11 @@ protected ReplicationTask( replicator); } + @Override + public long getCommandId() { + return cmd.getId(); + } + @Override public String getMetricName() { return METRIC_NAME; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java index 8d79335591b9..49e51d5073b0 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java @@ -66,6 +66,7 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand; +import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; @@ -755,6 +756,21 @@ void keepsExistingTermForCommandWithOlderTerm() throws IOException { assertNull(subject.getNextCommand()); } + @Test + public void addCmdStatusRegistersPendingForReplication() throws IOException { + StateContext ctx = createSubject(); + ReplicateContainerCommand cmd = + ReplicateContainerCommand.fromSources(1L, Collections.emptyList()); + ctx.addCmdStatus(cmd); + CommandStatus status = ctx.getCommandStatusMap().get(cmd.getId()); + assertNotNull(status); + assertEquals(SCMCommandProto.Type.replicateContainerCommand, status.getType()); + assertEquals( + org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING, + status.getStatus()); + } + private static StateContext createSubject() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); DatanodeStateMachine datanodeStateMachineMock = diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index a8b590e671e8..089f34866c28 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -844,6 +844,37 @@ public void testReconcileContainerCommandDeduplication() throws Exception { } } + @org.junit.jupiter.api.Test + public void reportsExecutedStatusOnSuccess() { + ReplicationSupervisor supervisor = supervisorWithReplicator( + __ -> task -> task.setStatus(AbstractReplicationTask.Status.DONE)); + ReplicateContainerCommand cmd = + ReplicateContainerCommand.fromSources(1L, Collections.emptyList()); + cmd.setTerm(CURRENT_TERM); + context.addCmdStatus(cmd); + supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get())); + assertEquals( + org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.CommandStatus.Status.EXECUTED, + context.getCommandStatusMap().get(cmd.getId()).getStatus()); + } + + @org.junit.jupiter.api.Test + public void reportsFailedStatusOnFailure() { + ReplicateContainerCommand cmd = + ReplicateContainerCommand.fromSources(2L, Collections.emptyList()); + cmd.setTerm(CURRENT_TERM); + context.addCmdStatus(cmd); + ReplicationSupervisor supervisor = supervisorWith( + __ -> task -> task.setStatus(AbstractReplicationTask.Status.FAILED), + newDirectExecutorService()); + supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get())); + assertEquals( + org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED, + context.getCommandStatusMap().get(cmd.getId()).getStatus()); + } + private static class BlockingTask extends AbstractReplicationTask { private final CountDownLatch runningLatch; From cd8eaac66a8b0db9cdf5a1e57c4557151e979faf Mon Sep 17 00:00:00 2001 From: Chi-Hsuan Huang Date: Wed, 17 Jun 2026 21:35:24 +0800 Subject: [PATCH 05/10] HDDS-15327. Fix checkstyle and test style for failed replication command handling --- .../replication/ReplicationSupervisor.java | 2 +- .../common/statemachine/TestStateContext.java | 4 ++-- .../replication/TestReplicationSupervisor.java | 13 +++++++------ 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 9135eb38c09e..6b0de7e48439 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -53,8 +53,8 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status; -import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; +import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java index 49e51d5073b0..d7d6317bdda4 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java @@ -19,6 +19,7 @@ import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction.Action.CLOSE; import static org.apache.ozone.test.GenericTestUtils.waitFor; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -766,8 +767,7 @@ public void addCmdStatusRegistersPendingForReplication() throws IOException { assertNotNull(status); assertEquals(SCMCommandProto.Type.replicateContainerCommand, status.getType()); assertEquals( - org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING, + PENDING, status.getStatus()); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 089f34866c28..ee2f39d509b4 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -24,6 +24,8 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status.EXECUTED; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.LOW; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.NORMAL; import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet; @@ -117,6 +119,7 @@ import org.apache.ozone.test.TestClock; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; /** @@ -844,7 +847,7 @@ public void testReconcileContainerCommandDeduplication() throws Exception { } } - @org.junit.jupiter.api.Test + @Test public void reportsExecutedStatusOnSuccess() { ReplicationSupervisor supervisor = supervisorWithReplicator( __ -> task -> task.setStatus(AbstractReplicationTask.Status.DONE)); @@ -854,12 +857,11 @@ public void reportsExecutedStatusOnSuccess() { context.addCmdStatus(cmd); supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get())); assertEquals( - org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.CommandStatus.Status.EXECUTED, + EXECUTED, context.getCommandStatusMap().get(cmd.getId()).getStatus()); } - @org.junit.jupiter.api.Test + @Test public void reportsFailedStatusOnFailure() { ReplicateContainerCommand cmd = ReplicateContainerCommand.fromSources(2L, Collections.emptyList()); @@ -870,8 +872,7 @@ public void reportsFailedStatusOnFailure() { newDirectExecutorService()); supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get())); assertEquals( - org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED, + FAILED, context.getCommandStatusMap().get(cmd.getId()).getStatus()); } From bb83cf0a908fba162b902afb9f5f4000e8de1e64 Mon Sep 17 00:00:00 2001 From: Chi-Hsuan Huang Date: Wed, 17 Jun 2026 21:43:18 +0800 Subject: [PATCH 06/10] HDDS-15327. Clarify failed-command Javadoc and executed-status reporting comment --- .../replication/ReplicationSupervisor.java | 1 + .../ContainerReplicaPendingOps.java | 19 +++++++++++++------ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 6b0de7e48439..03d9b76a9ac4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -428,6 +428,7 @@ public void run() { } else if (task.getStatus() == Status.DONE) { LOG.info("Successful {}", this); successCounter.get(task.getMetricName()).incrementAndGet(); + // Mark EXECUTED (non-PENDING) so CommandStatusReportPublisher drains this entry from the status map. updateCommandStatus(task, CommandStatus::markAsExecuted); } else if (task.getStatus() == Status.SKIPPED) { LOG.info("Skipped {}", this); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java index 8382aa6309db..833a8cbfeca1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java @@ -303,13 +303,20 @@ public void removeExpiredEntries() { } /** - * Handle a failure report for a replication or reconstruction command from a - * datanode. The matching ADD op is removed and its counter decremented so the + * Handle a failure report for a failed replication or reconstruction (ADD) + * command from a datanode. This is called by + * {@code CommandStatusReportHandler} only for FAILED + * {@code replicateContainerCommand} and {@code reconstructECContainersCommand} + * statuses; DELETE command IDs are never routed here by the current + * command-status tracking path. + * + *

The matching ADD op is removed and its counter decremented so the * inflight quota is freed immediately instead of waiting for the event - * timeout. DELETE ops are left in place (so they can be resent), mirroring - * {@link #removeExpiredEntries()}. Subscribers are notified with - * timedOut=true so the ReplicationManager treats a failed command the same as - * an expired one. + * timeout. The DELETE branch below is defensive code that is not reached + * in practice (DELETE ops are left in place to be resent, mirroring + * {@link #removeExpiredEntries()}). Subscribers are notified with + * timedOut=true so the ReplicationManager treats a failed command the same + * as an expired one. * * @param cmdId the id of the failed command, as reported by the datanode */ From 4611edb83989dac2e7dedbd33d578c64baeb77c8 Mon Sep 17 00:00:00 2001 From: Chi-Hsuan Huang Date: Thu, 18 Jun 2026 19:27:57 +0800 Subject: [PATCH 07/10] HDDS-15327. Drain command status on skipped and early-return replication paths --- .../replication/ReplicationSupervisor.java | 6 +++ .../TestReplicationSupervisor.java | 37 +++++++++++++++++++ .../ContainerReplicaPendingOps.java | 25 ++++++------- 3 files changed, 54 insertions(+), 14 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 03d9b76a9ac4..ca91d2d5af46 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -397,6 +397,8 @@ public void run() { LOG.info("Ignoring {} since the deadline has passed ({} < {})", this, Instant.ofEpochMilli(deadline), Instant.ofEpochMilli(now)); timeoutCounter.get(task.getMetricName()).incrementAndGet(); + // FAILED drains the PENDING status entry so SCM can clear and reschedule promptly. + updateCommandStatus(task, CommandStatus::markAsFailed); return; } @@ -407,6 +409,7 @@ public void run() { && task.shouldOnlyRunOnInServiceDatanodes()) { LOG.info("Ignoring {} since datanode is not in service ({})", this, dn.getPersistedOpState()); + updateCommandStatus(task, CommandStatus::markAsFailed); return; } @@ -415,6 +418,7 @@ public void run() { if (currentTerm.isPresent() && taskTerm < currentTerm.getAsLong()) { LOG.info("Ignoring {} since SCM leader has new term ({} < {})", this, taskTerm, currentTerm.getAsLong()); + updateCommandStatus(task, CommandStatus::markAsFailed); return; } } @@ -433,6 +437,8 @@ public void run() { } else if (task.getStatus() == Status.SKIPPED) { LOG.info("Skipped {}", this); skippedCounter.get(task.getMetricName()).incrementAndGet(); + // SKIPPED means the replica already exists; EXECUTED drains the entry. + updateCommandStatus(task, CommandStatus::markAsExecuted); } } catch (Exception e) { task.setStatus(Status.FAILED); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index ee2f39d509b4..ee52fa6f0a1f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -876,6 +876,43 @@ public void reportsFailedStatusOnFailure() { context.getCommandStatusMap().get(cmd.getId()).getStatus()); } + @Test + public void reportsExecutedStatusOnSkip() { + // A SKIPPED task (replica already exists) should drain its PENDING status entry to EXECUTED + // so CommandStatusReportPublisher removes it instead of re-sending it forever. + ReplicationSupervisor supervisor = supervisorWith( + __ -> task -> task.setStatus(AbstractReplicationTask.Status.SKIPPED), + newDirectExecutorService()); + ReplicateContainerCommand cmd = + ReplicateContainerCommand.fromSources(3L, Collections.emptyList()); + cmd.setTerm(CURRENT_TERM); + context.addCmdStatus(cmd); + supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get())); + assertEquals( + EXECUTED, + context.getCommandStatusMap().get(cmd.getId()).getStatus()); + } + + @Test + public void reportsFailedStatusWhenDeadlinePassed() { + // A task dropped because its deadline has passed should mark the status FAILED + // so the PENDING entry drains and SCM can reschedule promptly. + ReplicationSupervisor supervisor = supervisorWith( + __ -> task -> task.setStatus(AbstractReplicationTask.Status.DONE), + newDirectExecutorService()); + ReplicateContainerCommand cmd = + ReplicateContainerCommand.fromSources(4L, Collections.emptyList()); + cmd.setTerm(CURRENT_TERM); + cmd.setDeadline(clock.millis() + 5000); + context.addCmdStatus(cmd); + // Advance clock past the deadline + clock.fastForward(10000); + supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get())); + assertEquals( + FAILED, + context.getCommandStatusMap().get(cmd.getId()).getStatus()); + } + private static class BlockingTask extends AbstractReplicationTask { private final CountDownLatch runningLatch; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java index 833a8cbfeca1..535f2d83ebfd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java @@ -310,13 +310,11 @@ public void removeExpiredEntries() { * statuses; DELETE command IDs are never routed here by the current * command-status tracking path. * - *

The matching ADD op is removed and its counter decremented so the - * inflight quota is freed immediately instead of waiting for the event - * timeout. The DELETE branch below is defensive code that is not reached - * in practice (DELETE ops are left in place to be resent, mirroring - * {@link #removeExpiredEntries()}). Subscribers are notified with - * timedOut=true so the ReplicationManager treats a failed command the same - * as an expired one. + *

The matched op is removed from the pending list and its counter + * decremented so the inflight quota is freed immediately instead of waiting + * for the event timeout. For ADD ops the scheduled container size is also + * released. Subscribers are notified with timedOut=true so the + * ReplicationManager re-evaluates the container as it would for an expired op. * * @param cmdId the id of the failed command, as reported by the datanode */ @@ -338,14 +336,11 @@ public void onReplicationCommandFailed(long cmdId) { while (iterator.hasNext()) { ContainerReplicaOp op = iterator.next(); if (op.getCommand() != null && op.getCommand().getId() == cmdId) { - if (op.getOpType() != DELETE) { - iterator.remove(); - if (op.getOpType() == ADD) { - releaseScheduledContainerSize(op); - } - decrementCounter(op.getOpType(), op.getReplicaIndex()); + iterator.remove(); + if (op.getOpType() == ADD) { + releaseScheduledContainerSize(op); } - // Always drop the index entry; DELETE ops stay in the list for resend but no longer need tracking. + decrementCounter(op.getOpType(), op.getReplicaIndex()); commandIdToContainer.remove(cmdId); failedOps.add(op); } @@ -357,6 +352,8 @@ public void onReplicationCommandFailed(long cmdId) { unlock(lock); } if (!failedOps.isEmpty()) { + // Failures reuse timedOut=true so ReplicationManager re-evaluates like an expired op. + // Timeout metrics are not updated here. notifySubscribers(failedOps, containerID, true); } } From fa2abce87bd50a9bb97a086fcdc0bad698d58f9f Mon Sep 17 00:00:00 2001 From: Chi-Hsuan Huang Date: Thu, 18 Jun 2026 19:33:56 +0800 Subject: [PATCH 08/10] HDDS-15327. Collapse over-wrapped assertions in replication status tests --- .../common/statemachine/TestStateContext.java | 4 +--- .../replication/TestReplicationSupervisor.java | 16 ++++------------ 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java index d7d6317bdda4..9311027b9c76 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java @@ -766,9 +766,7 @@ public void addCmdStatusRegistersPendingForReplication() throws IOException { CommandStatus status = ctx.getCommandStatusMap().get(cmd.getId()); assertNotNull(status); assertEquals(SCMCommandProto.Type.replicateContainerCommand, status.getType()); - assertEquals( - PENDING, - status.getStatus()); + assertEquals(PENDING, status.getStatus()); } private static StateContext createSubject() throws IOException { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index ee52fa6f0a1f..0d567b690606 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -856,9 +856,7 @@ public void reportsExecutedStatusOnSuccess() { cmd.setTerm(CURRENT_TERM); context.addCmdStatus(cmd); supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get())); - assertEquals( - EXECUTED, - context.getCommandStatusMap().get(cmd.getId()).getStatus()); + assertEquals(EXECUTED, context.getCommandStatusMap().get(cmd.getId()).getStatus()); } @Test @@ -871,9 +869,7 @@ public void reportsFailedStatusOnFailure() { __ -> task -> task.setStatus(AbstractReplicationTask.Status.FAILED), newDirectExecutorService()); supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get())); - assertEquals( - FAILED, - context.getCommandStatusMap().get(cmd.getId()).getStatus()); + assertEquals(FAILED, context.getCommandStatusMap().get(cmd.getId()).getStatus()); } @Test @@ -888,9 +884,7 @@ public void reportsExecutedStatusOnSkip() { cmd.setTerm(CURRENT_TERM); context.addCmdStatus(cmd); supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get())); - assertEquals( - EXECUTED, - context.getCommandStatusMap().get(cmd.getId()).getStatus()); + assertEquals(EXECUTED, context.getCommandStatusMap().get(cmd.getId()).getStatus()); } @Test @@ -908,9 +902,7 @@ public void reportsFailedStatusWhenDeadlinePassed() { // Advance clock past the deadline clock.fastForward(10000); supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get())); - assertEquals( - FAILED, - context.getCommandStatusMap().get(cmd.getId()).getStatus()); + assertEquals(FAILED, context.getCommandStatusMap().get(cmd.getId()).getStatus()); } private static class BlockingTask extends AbstractReplicationTask { From 5b87c59011c9a17540ced1173ca8eaf7c3a5b5e9 Mon Sep 17 00:00:00 2001 From: Chi-Hsuan Huang Date: Thu, 18 Jun 2026 19:39:33 +0800 Subject: [PATCH 09/10] HDDS-15327. Extract ReplicationStatusHandler with leader guard Follower SCMs no longer clear pending ops on stray command-status reports, matching the existing leader guard in DeletedBlockLogImpl. --- .../scm/command/ReplicationStatusHandler.java | 56 ++++++++++++ .../scm/server/StorageContainerManager.java | 5 +- .../command/TestReplicationStatusHandler.java | 87 +++++++++++++++++++ 3 files changed, 145 insertions(+), 3 deletions(-) create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/ReplicationStatusHandler.java create mode 100644 hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestReplicationStatusHandler.java diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/ReplicationStatusHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/ReplicationStatusHandler.java new file mode 100644 index 000000000000..3f3c3256ecb4 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/ReplicationStatusHandler.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.command; + +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.ReplicationStatus; +import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles REPLICATION_STATUS events by clearing failed pending ops. + * Only the leader SCM processes these events; followers skip them. + */ +public class ReplicationStatusHandler implements EventHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(ReplicationStatusHandler.class); + + private final ContainerReplicaPendingOps containerReplicaPendingOps; + private final SCMContext scmContext; + + public ReplicationStatusHandler(ContainerReplicaPendingOps containerReplicaPendingOps, + SCMContext scmContext) { + this.containerReplicaPendingOps = containerReplicaPendingOps; + this.scmContext = scmContext; + } + + @Override + public void onMessage(ReplicationStatus status, EventPublisher publisher) { + if (!scmContext.isLeader()) { + LOG.info("Skip processing replication status since current SCM is not leader."); + return; + } + status.getCmdStatus().forEach(cmdStatus -> + containerReplicaPendingOps.onReplicationCommandFailed(cmdStatus.getCmdId())); + } + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 2d2b373f4e45..ae3afe855e2b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; +import org.apache.hadoop.hdds.scm.command.ReplicationStatusHandler; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -604,9 +605,7 @@ private void initializeEventHandlers() { eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS, (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog()); eventQueue.addHandler(SCMEvents.REPLICATION_STATUS, - (status, publisher) -> status.getCmdStatus().forEach(cmdStatus -> - containerReplicaPendingOps.onReplicationCommandFailed( - cmdStatus.getCmdId()))); + new ReplicationStatusHandler(containerReplicaPendingOps, scmContext)); eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler); eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler); eventQueue.addHandler(SCMEvents.RECONCILE_CONTAINER, reconcileContainerEventHandler); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestReplicationStatusHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestReplicationStatusHandler.java new file mode 100644 index 000000000000..5c8b0ea8b83c --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestReplicationStatusHandler.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.command; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import org.apache.hadoop.hdds.HddsIdFactory; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.ReplicationStatus; +import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for ReplicationStatusHandler. + */ +public class TestReplicationStatusHandler { + + @Test + public void leaderClearsReplicaPendingOp() { + SCMContext scmContext = mock(SCMContext.class); + when(scmContext.isLeader()).thenReturn(true); + ContainerReplicaPendingOps pendingOps = mock(ContainerReplicaPendingOps.class); + EventPublisher publisher = mock(EventPublisher.class); + + long cmdId = HddsIdFactory.getLongId(); + CommandStatus cmdStatus = CommandStatus.newBuilder() + .setCmdId(cmdId) + .setStatus(CommandStatus.Status.FAILED) + .setType(Type.replicateContainerCommand) + .build(); + ReplicationStatus status = new ReplicationStatus( + Collections.singletonList(cmdStatus), + MockDatanodeDetails.randomDatanodeDetails()); + + ReplicationStatusHandler handler = new ReplicationStatusHandler(pendingOps, scmContext); + handler.onMessage(status, publisher); + + verify(pendingOps).onReplicationCommandFailed(cmdId); + } + + @Test + public void followerSkipsClearingReplicaPendingOp() { + SCMContext scmContext = mock(SCMContext.class); + when(scmContext.isLeader()).thenReturn(false); + ContainerReplicaPendingOps pendingOps = mock(ContainerReplicaPendingOps.class); + EventPublisher publisher = mock(EventPublisher.class); + + long cmdId = HddsIdFactory.getLongId(); + CommandStatus cmdStatus = CommandStatus.newBuilder() + .setCmdId(cmdId) + .setStatus(CommandStatus.Status.FAILED) + .setType(Type.replicateContainerCommand) + .build(); + ReplicationStatus status = new ReplicationStatus( + Collections.singletonList(cmdStatus), + MockDatanodeDetails.randomDatanodeDetails()); + + ReplicationStatusHandler handler = new ReplicationStatusHandler(pendingOps, scmContext); + handler.onMessage(status, publisher); + + verifyNoInteractions(pendingOps); + } + +} From af5d8a2d1b8eb3e72ed44bada5c2be3b22cf54a8 Mon Sep 17 00:00:00 2001 From: Chi-Hsuan Huang Date: Thu, 18 Jun 2026 20:16:48 +0800 Subject: [PATCH 10/10] HDDS-15327. Drain command status when replication task is dropped before running --- .../replication/ReplicationSupervisor.java | 24 +++++-- .../TestReplicationSupervisor.java | 65 +++++++++++++++++-- 2 files changed, 77 insertions(+), 12 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index ca91d2d5af46..bd426ec0b472 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -235,6 +235,9 @@ public void addTask(AbstractReplicationTask task) { if (queueHasRoomFor(task)) { initCounters(task); addToQueue(task); + } else { + // Queue is full: drain the PENDING status entry so SCM can reschedule promptly. + updateCommandStatus(task, CommandStatus::markAsFailed); } } @@ -278,9 +281,23 @@ private void addToQueue(AbstractReplicationTask task) { } queuedCounter.get(task.getMetricName()).incrementAndGet(); executor.execute(new TaskRunner(task)); + } else { + // Duplicate: an identical task is already in-flight; the in-flight copy will report the real + // outcome, so drain this command's PENDING entry now to avoid a status-map leak. + updateCommandStatus(task, CommandStatus::markAsExecuted); } } + private void updateCommandStatus(AbstractReplicationTask task, + Consumer updater) { + long cmdId = task.getCommandId(); + if (context == null || cmdId == 0) { + // No SCM context (test) or no tracked command (e.g. reconcile task). + return; + } + context.updateCommandStatus(cmdId, updater); + } + private void decrementTaskCounter(AbstractReplicationTask task) { if (task.getPriority() == ReplicationCommandPriority.LOW) { // LOW tasks are not included in the counter, so skip decrementing the @@ -455,12 +472,7 @@ public void run() { private void updateCommandStatus(AbstractReplicationTask t, Consumer updater) { - long cmdId = t.getCommandId(); - if (context == null || cmdId == 0) { - // No SCM context (test) or no tracked command (e.g. reconcile task). - return; - } - context.updateCommandStatus(cmdId, updater); + ReplicationSupervisor.this.updateCommandStatus(t, updater); } @Override diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 0d567b690606..48a45e361fc7 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -865,9 +865,8 @@ public void reportsFailedStatusOnFailure() { ReplicateContainerCommand.fromSources(2L, Collections.emptyList()); cmd.setTerm(CURRENT_TERM); context.addCmdStatus(cmd); - ReplicationSupervisor supervisor = supervisorWith( - __ -> task -> task.setStatus(AbstractReplicationTask.Status.FAILED), - newDirectExecutorService()); + ReplicationSupervisor supervisor = supervisorWithReplicator( + __ -> task -> task.setStatus(AbstractReplicationTask.Status.FAILED)); supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get())); assertEquals(FAILED, context.getCommandStatusMap().get(cmd.getId()).getStatus()); } @@ -876,9 +875,8 @@ public void reportsFailedStatusOnFailure() { public void reportsExecutedStatusOnSkip() { // A SKIPPED task (replica already exists) should drain its PENDING status entry to EXECUTED // so CommandStatusReportPublisher removes it instead of re-sending it forever. - ReplicationSupervisor supervisor = supervisorWith( - __ -> task -> task.setStatus(AbstractReplicationTask.Status.SKIPPED), - newDirectExecutorService()); + ReplicationSupervisor supervisor = supervisorWithReplicator( + __ -> task -> task.setStatus(AbstractReplicationTask.Status.SKIPPED)); ReplicateContainerCommand cmd = ReplicateContainerCommand.fromSources(3L, Collections.emptyList()); cmd.setTerm(CURRENT_TERM); @@ -1232,6 +1230,61 @@ public void poolSizeCanBeUpdatedByReplicationStreamsLimitReconfiguration() { assertEquals(3, threadPoolSize.get()); } + @Test + public void reportsFailedStatusWhenQueueFull() { + // A task dropped because the queue is full should drain its PENDING entry to FAILED + // so SCM can free the inflight quota and reschedule promptly. + final int maxQueueSize = 1; + DatanodeConfiguration datanodeConfig = new DatanodeConfiguration(); + datanodeConfig.setCommandQueueLimit(maxQueueSize); + ReplicationServer.ReplicationConfig repConf = new ReplicationServer.ReplicationConfig(); + + ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder() + .stateContext(context) + .executor(new DiscardingExecutorService()) + .datanodeConfig(datanodeConfig) + .replicationConfig(repConf) + .clock(clock) + .build(); + + // Fill the queue to capacity with one task so the next addTask is dropped. + ReplicateContainerCommand fillCmd = createCommand(100L); + supervisor.addTask(new ReplicationTask(fillCmd, noopReplicator)); + assertEquals(maxQueueSize, supervisor.getTotalInFlightReplications()); + + // Now register a PENDING status for a second command and try to add it. + ReplicateContainerCommand droppedCmd = createCommand(200L); + context.addCmdStatus(droppedCmd); + supervisor.addTask(new ReplicationTask(droppedCmd, noopReplicator)); + + // The dropped task's status should be FAILED so the PENDING entry is drained. + assertEquals(FAILED, context.getCommandStatusMap().get(droppedCmd.getId()).getStatus()); + } + + @Test + public void reportsExecutedStatusWhenDuplicate() { + // A duplicate task (same container already in-flight) should drain its own PENDING + // status entry to EXECUTED, because the already-queued task will report the real outcome. + ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder() + .stateContext(context) + .executor(new DiscardingExecutorService()) + .clock(clock) + .build(); + + // Add the first task - accepted into the in-flight set. + ReplicateContainerCommand firstCmd = createCommand(300L); + supervisor.addTask(new ReplicationTask(firstCmd, noopReplicator)); + assertEquals(1, supervisor.getTotalInFlightReplications()); + + // Register a PENDING status for a second command with the same container id. + ReplicateContainerCommand dupCmd = createCommand(300L); + context.addCmdStatus(dupCmd); + supervisor.addTask(new ReplicationTask(dupCmd, noopReplicator)); + + // The duplicate command's status should be EXECUTED (entry drained). + assertEquals(EXECUTED, context.getCommandStatusMap().get(dupCmd.getId()).getStatus()); + } + @ContainerLayoutTestInfo.ContainerTest public void testMaxQueueSize() { List datanodes = new ArrayList<>();