diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 150159eb84ae..b0200f73fb25 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -77,6 +77,7 @@ import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.CommandStatus; +import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder; import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Time; @@ -854,6 +855,14 @@ public void addCmdStatus(SCMCommand cmd) { .setStatus(Status.PENDING) .setType(cmd.getType()) .build()); + } else if (cmd.getType() == SCMCommandProto.Type.replicateContainerCommand + || cmd.getType() == SCMCommandProto.Type.reconstructECContainersCommand) { + addCmdStatus(cmd.getId(), + CommandStatusBuilder.newBuilder() + .setCmdId(cmd.getId()) + .setStatus(Status.PENDING) + .setType(cmd.getType()) + .build()); } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java index 7f86fe8894a4..a64da9bd1b3c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCommandInfo.java @@ -42,9 +42,11 @@ public class ECReconstructionCommandInfo { private final ByteString missingContainerIndexes; private final long deadlineMsSinceEpoch; private final long term; + private final long commandId; public ECReconstructionCommandInfo(ReconstructECContainersCommand cmd) { this.containerID = cmd.getContainerID(); + this.commandId = cmd.getId(); this.ecReplicationConfig = cmd.getEcReplicationConfig(); this.missingContainerIndexes = cmd.getMissingContainerIndexes(); this.deadlineMsSinceEpoch = cmd.getDeadline(); @@ -97,4 +99,8 @@ public long getTerm() { return term; } + public long getId() { + return commandId; + } + } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java index e9535c6afe80..8030e4d5632a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java @@ -47,6 +47,11 @@ public ECReconstructionCoordinatorTask( debugString = reconstructionCommandInfo.toString(); } + @Override + public long getCommandId() { + return reconstructionCommandInfo.getId(); + } + @Override public String getMetricName() { return METRIC_NAME; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java index 05932e6edf79..276b8c1ed801 100644 --- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java @@ -66,6 +66,15 @@ public long getContainerId() { return containerId; } + /** + * Returns the id of the SCM command that scheduled this task, or 0 if this + * task is not associated with a tracked command. Subclasses backed by an + * {@link org.apache.hadoop.ozone.protocol.commands.SCMCommand} override this. + */ + public long getCommandId() { + return 0; + } + public Status getStatus() { return status; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 3184fb2ed2e0..bd426ec0b472 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.IntConsumer; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -53,6 +54,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status; import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; +import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,6 +235,9 @@ public 
void addTask(AbstractReplicationTask task) { if (queueHasRoomFor(task)) { initCounters(task); addToQueue(task); + } else { + // Queue is full: drain the PENDING status entry so SCM can reschedule promptly. + updateCommandStatus(task, CommandStatus::markAsFailed); } } @@ -276,7 +281,21 @@ private void addToQueue(AbstractReplicationTask task) { } queuedCounter.get(task.getMetricName()).incrementAndGet(); executor.execute(new TaskRunner(task)); + } else { + // Duplicate: an identical task is already in-flight; the in-flight copy will report the real + // outcome, so drain this command's PENDING entry now to avoid a status-map leak. + updateCommandStatus(task, CommandStatus::markAsExecuted); + } + } + + private void updateCommandStatus(AbstractReplicationTask task, + Consumer updater) { + long cmdId = task.getCommandId(); + if (context == null || cmdId == 0) { + // No SCM context (test) or no tracked command (e.g. reconcile task). + return; } + context.updateCommandStatus(cmdId, updater); } private void decrementTaskCounter(AbstractReplicationTask task) { @@ -395,6 +414,8 @@ public void run() { LOG.info("Ignoring {} since the deadline has passed ({} < {})", this, Instant.ofEpochMilli(deadline), Instant.ofEpochMilli(now)); timeoutCounter.get(task.getMetricName()).incrementAndGet(); + // FAILED drains the PENDING status entry so SCM can clear and reschedule promptly. 
+ updateCommandStatus(task, CommandStatus::markAsFailed); return; } @@ -405,6 +426,7 @@ public void run() { && task.shouldOnlyRunOnInServiceDatanodes()) { LOG.info("Ignoring {} since datanode is not in service ({})", this, dn.getPersistedOpState()); + updateCommandStatus(task, CommandStatus::markAsFailed); return; } @@ -413,6 +435,7 @@ public void run() { if (currentTerm.isPresent() && taskTerm < currentTerm.getAsLong()) { LOG.info("Ignoring {} since SCM leader has new term ({} < {})", this, taskTerm, currentTerm.getAsLong()); + updateCommandStatus(task, CommandStatus::markAsFailed); return; } } @@ -422,17 +445,23 @@ public void run() { if (task.getStatus() == Status.FAILED) { LOG.warn("Failed {}", this); failureCounter.get(task.getMetricName()).incrementAndGet(); + updateCommandStatus(task, CommandStatus::markAsFailed); } else if (task.getStatus() == Status.DONE) { LOG.info("Successful {}", this); successCounter.get(task.getMetricName()).incrementAndGet(); + // Mark EXECUTED (non-PENDING) so CommandStatusReportPublisher drains this entry from the status map. + updateCommandStatus(task, CommandStatus::markAsExecuted); } else if (task.getStatus() == Status.SKIPPED) { LOG.info("Skipped {}", this); skippedCounter.get(task.getMetricName()).incrementAndGet(); + // SKIPPED means the replica already exists; EXECUTED drains the entry. 
+ updateCommandStatus(task, CommandStatus::markAsExecuted); } } catch (Exception e) { task.setStatus(Status.FAILED); LOG.warn("Failed {}", this, e); failureCounter.get(task.getMetricName()).incrementAndGet(); + updateCommandStatus(task, CommandStatus::markAsFailed); } finally { queuedCounter.get(task.getMetricName()).decrementAndGet(); opsLatencyMs.get(task.getMetricName()).add(Time.monotonicNow() - startTime); @@ -441,6 +470,11 @@ public void run() { } } + private void updateCommandStatus(AbstractReplicationTask t, + Consumer updater) { + ReplicationSupervisor.this.updateCommandStatus(t, updater); + } + @Override public String toString() { return task.toString(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java index a32e9b41ab1b..0afc8fe41e88 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java @@ -66,6 +66,11 @@ protected ReplicationTask( replicator); } + @Override + public long getCommandId() { + return cmd.getId(); + } + @Override public String getMetricName() { return METRIC_NAME; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java index 8d79335591b9..9311027b9c76 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java @@ -19,6 +19,7 @@ import static 
com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction.Action.CLOSE; import static org.apache.ozone.test.GenericTestUtils.waitFor; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -66,6 +67,7 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand; +import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; @@ -755,6 +757,18 @@ void keepsExistingTermForCommandWithOlderTerm() throws IOException { assertNull(subject.getNextCommand()); } + @Test + public void addCmdStatusRegistersPendingForReplication() throws IOException { + StateContext ctx = createSubject(); + ReplicateContainerCommand cmd = + ReplicateContainerCommand.fromSources(1L, Collections.emptyList()); + ctx.addCmdStatus(cmd); + CommandStatus status = ctx.getCommandStatusMap().get(cmd.getId()); + assertNotNull(status); + assertEquals(SCMCommandProto.Type.replicateContainerCommand, status.getType()); + assertEquals(PENDING, status.getStatus()); + } + private static StateContext createSubject() throws IOException { OzoneConfiguration conf = new OzoneConfiguration(); DatanodeStateMachine datanodeStateMachineMock = diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index a8b590e671e8..48a45e361fc7 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -24,6 +24,8 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status.EXECUTED; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.LOW; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.NORMAL; import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet; @@ -117,6 +119,7 @@ import org.apache.ozone.test.TestClock; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; /** @@ -844,6 +847,62 @@ public void testReconcileContainerCommandDeduplication() throws Exception { } } + @Test + public void reportsExecutedStatusOnSuccess() { + ReplicationSupervisor supervisor = supervisorWithReplicator( + __ -> task -> task.setStatus(AbstractReplicationTask.Status.DONE)); + ReplicateContainerCommand cmd = + ReplicateContainerCommand.fromSources(1L, Collections.emptyList()); + cmd.setTerm(CURRENT_TERM); + 
context.addCmdStatus(cmd); + supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get())); + assertEquals(EXECUTED, context.getCommandStatusMap().get(cmd.getId()).getStatus()); + } + + @Test + public void reportsFailedStatusOnFailure() { + ReplicateContainerCommand cmd = + ReplicateContainerCommand.fromSources(2L, Collections.emptyList()); + cmd.setTerm(CURRENT_TERM); + context.addCmdStatus(cmd); + ReplicationSupervisor supervisor = supervisorWithReplicator( + __ -> task -> task.setStatus(AbstractReplicationTask.Status.FAILED)); + supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get())); + assertEquals(FAILED, context.getCommandStatusMap().get(cmd.getId()).getStatus()); + } + + @Test + public void reportsExecutedStatusOnSkip() { + // A SKIPPED task (replica already exists) should drain its PENDING status entry to EXECUTED + // so CommandStatusReportPublisher removes it instead of re-sending it forever. + ReplicationSupervisor supervisor = supervisorWithReplicator( + __ -> task -> task.setStatus(AbstractReplicationTask.Status.SKIPPED)); + ReplicateContainerCommand cmd = + ReplicateContainerCommand.fromSources(3L, Collections.emptyList()); + cmd.setTerm(CURRENT_TERM); + context.addCmdStatus(cmd); + supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get())); + assertEquals(EXECUTED, context.getCommandStatusMap().get(cmd.getId()).getStatus()); + } + + @Test + public void reportsFailedStatusWhenDeadlinePassed() { + // A task dropped because its deadline has passed should mark the status FAILED + // so the PENDING entry drains and SCM can reschedule promptly. 
+ ReplicationSupervisor supervisor = supervisorWith( + __ -> task -> task.setStatus(AbstractReplicationTask.Status.DONE), + newDirectExecutorService()); + ReplicateContainerCommand cmd = + ReplicateContainerCommand.fromSources(4L, Collections.emptyList()); + cmd.setTerm(CURRENT_TERM); + cmd.setDeadline(clock.millis() + 5000); + context.addCmdStatus(cmd); + // Advance clock past the deadline + clock.fastForward(10000); + supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get())); + assertEquals(FAILED, context.getCommandStatusMap().get(cmd.getId()).getStatus()); + } + private static class BlockingTask extends AbstractReplicationTask { private final CountDownLatch runningLatch; @@ -1171,6 +1230,61 @@ public void poolSizeCanBeUpdatedByReplicationStreamsLimitReconfiguration() { assertEquals(3, threadPoolSize.get()); } + @Test + public void reportsFailedStatusWhenQueueFull() { + // A task dropped because the queue is full should drain its PENDING entry to FAILED + // so SCM can free the inflight quota and reschedule promptly. + final int maxQueueSize = 1; + DatanodeConfiguration datanodeConfig = new DatanodeConfiguration(); + datanodeConfig.setCommandQueueLimit(maxQueueSize); + ReplicationServer.ReplicationConfig repConf = new ReplicationServer.ReplicationConfig(); + + ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder() + .stateContext(context) + .executor(new DiscardingExecutorService()) + .datanodeConfig(datanodeConfig) + .replicationConfig(repConf) + .clock(clock) + .build(); + + // Fill the queue to capacity with one task so the next addTask is dropped. + ReplicateContainerCommand fillCmd = createCommand(100L); + supervisor.addTask(new ReplicationTask(fillCmd, noopReplicator)); + assertEquals(maxQueueSize, supervisor.getTotalInFlightReplications()); + + // Now register a PENDING status for a second command and try to add it. 
+ ReplicateContainerCommand droppedCmd = createCommand(200L); + context.addCmdStatus(droppedCmd); + supervisor.addTask(new ReplicationTask(droppedCmd, noopReplicator)); + + // The dropped task's status should be FAILED so the PENDING entry is drained. + assertEquals(FAILED, context.getCommandStatusMap().get(droppedCmd.getId()).getStatus()); + } + + @Test + public void reportsExecutedStatusWhenDuplicate() { + // A duplicate task (same container already in-flight) should drain its own PENDING + // status entry to EXECUTED, because the already-queued task will report the real outcome. + ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder() + .stateContext(context) + .executor(new DiscardingExecutorService()) + .clock(clock) + .build(); + + // Add the first task - accepted into the in-flight set. + ReplicateContainerCommand firstCmd = createCommand(300L); + supervisor.addTask(new ReplicationTask(firstCmd, noopReplicator)); + assertEquals(1, supervisor.getTotalInFlightReplications()); + + // Register a PENDING status for a second command with the same container id. + ReplicateContainerCommand dupCmd = createCommand(300L); + context.addCmdStatus(dupCmd); + supervisor.addTask(new ReplicationTask(dupCmd, noopReplicator)); + + // The duplicate command's status should be EXECUTED (entry drained). 
+ assertEquals(EXECUTED, context.getCommandStatusMap().get(dupCmd.getId()).getStatus()); + } + @ContainerLayoutTestInfo.ContainerTest public void testMaxQueueSize() { List datanodes = new ArrayList<>(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java index 2d794ac83a8c..6b8405cd6051 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java @@ -54,16 +54,22 @@ public void onMessage(CommandStatusReportFromDatanode report, // Route command status to its watchers. List deleteBlocksCommandStatus = new ArrayList<>(); + List failedReplicationStatus = new ArrayList<>(); cmdStatusList.forEach(cmdStatus -> { if (LOGGER.isTraceEnabled()) { LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus .getCmdId(), cmdStatus.getType()); } - if (cmdStatus.getType() == SCMCommandProto.Type.deleteBlocksCommand) { + SCMCommandProto.Type type = cmdStatus.getType(); + if (type == SCMCommandProto.Type.deleteBlocksCommand) { deleteBlocksCommandStatus.add(cmdStatus); + } else if ((type == SCMCommandProto.Type.replicateContainerCommand + || type == SCMCommandProto.Type.reconstructECContainersCommand) + && cmdStatus.getStatus() == CommandStatus.Status.FAILED) { + failedReplicationStatus.add(cmdStatus); } else { LOGGER.debug("CommandStatus of type:{} not handled in " + - "CommandStatusReportHandler.", cmdStatus.getType()); + "CommandStatusReportHandler.", type); } }); @@ -77,6 +83,10 @@ public void onMessage(CommandStatusReportFromDatanode report, publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS, new DeleteBlockStatus( deleteBlocksCommandStatus, report.getDatanodeDetails())); } + if (!failedReplicationStatus.isEmpty()) { + 
publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new ReplicationStatus( + failedReplicationStatus, report.getDatanodeDetails())); + } } /** @@ -121,4 +131,21 @@ public DatanodeDetails getDatanodeDetails() { } } + /** + * Wrapper event for failed replication / reconstruction command statuses. + */ + public static class ReplicationStatus extends CommandStatusEvent { + private final DatanodeDetails datanodeDetails; + + public ReplicationStatus(List cmdStatus, + DatanodeDetails datanodeDetails) { + super(cmdStatus); + this.datanodeDetails = datanodeDetails; + } + + public DatanodeDetails getDatanodeDetails() { + return datanodeDetails; + } + } + } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/ReplicationStatusHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/ReplicationStatusHandler.java new file mode 100644 index 000000000000..3f3c3256ecb4 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/ReplicationStatusHandler.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.command; + +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.ReplicationStatus; +import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles REPLICATION_STATUS events by clearing failed pending ops. + * Only the leader SCM processes these events; followers skip them. + */ +public class ReplicationStatusHandler implements EventHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(ReplicationStatusHandler.class); + + private final ContainerReplicaPendingOps containerReplicaPendingOps; + private final SCMContext scmContext; + + public ReplicationStatusHandler(ContainerReplicaPendingOps containerReplicaPendingOps, + SCMContext scmContext) { + this.containerReplicaPendingOps = containerReplicaPendingOps; + this.scmContext = scmContext; + } + + @Override + public void onMessage(ReplicationStatus status, EventPublisher publisher) { + if (!scmContext.isLeader()) { + LOG.info("Skip processing replication status since current SCM is not leader."); + return; + } + status.getCmdStatus().forEach(cmdStatus -> + containerReplicaPendingOps.onReplicationCommandFailed(cmdStatus.getCmdId())); + } + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java index 2905ae4d4a36..535f2d83ebfd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java @@ 
-52,6 +52,11 @@ public class ContainerReplicaPendingOps { private final Clock clock; private final ConcurrentHashMap> pendingOps = new ConcurrentHashMap<>(); + // Maps an in-flight command id back to the container it belongs to, so a + // failed-command report from a datanode can locate and clear the pending op + // without scanning every container. + private final ConcurrentHashMap commandIdToContainer = + new ConcurrentHashMap<>(); private final Striped stripedLock = Striped.readWriteLock(64); private final ReentrantReadWriteLock globalLock = new ReentrantReadWriteLock(); @@ -125,6 +130,7 @@ public void clear() { globalLock.writeLock().lock(); try { pendingOps.clear(); + commandIdToContainer.clear(); resetCounters(); containerSizeScheduled.clear(); } finally { @@ -274,6 +280,9 @@ public void removeExpiredEntries() { releaseScheduledContainerSize(op); } decrementCounter(op.getOpType(), op.getReplicaIndex()); + if (op.getCommand() != null) { + commandIdToContainer.remove(op.getCommand().getId()); + } } expiredOps.add(op); updateTimeoutMetrics(op); @@ -293,6 +302,62 @@ public void removeExpiredEntries() { } } + /** + * Handle a failure report for a failed replication or reconstruction (ADD) + * command from a datanode. This is called by + * {@code CommandStatusReportHandler} only for FAILED + * {@code replicateContainerCommand} and {@code reconstructECContainersCommand} + * statuses; DELETE command IDs are never routed here by the current + * command-status tracking path. + * + *

<p>The matched op is removed from the pending list and its counter + * decremented so the inflight quota is freed immediately instead of waiting + * for the event timeout. For ADD ops the scheduled container size is also + * released. Subscribers are notified with timedOut=true so the + * ReplicationManager re-evaluates the container as it would for an expired op. + * + * @param cmdId the id of the failed command, as reported by the datanode + */ + public void onReplicationCommandFailed(long cmdId) { + ContainerID containerID = commandIdToContainer.get(cmdId); + if (containerID == null) { + // Already completed, already expired, or not a tracked replication op. + return; + } + List failedOps = new ArrayList<>(); + Lock lock = writeLock(containerID); + lock(lock); + try { + List ops = pendingOps.get(containerID); + if (ops == null) { + return; + } + Iterator iterator = ops.listIterator(); + while (iterator.hasNext()) { + ContainerReplicaOp op = iterator.next(); + if (op.getCommand() != null && op.getCommand().getId() == cmdId) { + iterator.remove(); + if (op.getOpType() == ADD) { + releaseScheduledContainerSize(op); + } + decrementCounter(op.getOpType(), op.getReplicaIndex()); + commandIdToContainer.remove(cmdId); + failedOps.add(op); + } + } + if (ops.isEmpty()) { + pendingOps.remove(containerID); + } + } finally { + unlock(lock); + } + if (!failedOps.isEmpty()) { + // Failures reuse timedOut=true so ReplicationManager re-evaluates like an expired op. + // Timeout metrics are not updated here. 
+ notifySubscribers(failedOps, containerID, true); + } + } + private void releaseScheduledContainerSize(ContainerReplicaOp op) { containerSizeScheduled.computeIfPresent(op.getTarget().getID(), (k, v) -> { long newSize = v.getSize() - op.getContainerSize(); @@ -339,6 +404,9 @@ private void addReplica(ContainerReplicaOp.PendingOpType opType, containerID, s -> new ArrayList<>()); ops.add(new ContainerReplicaOp(opType, target, replicaIndex, command, deadlineEpochMillis, containerSize)); + if (command != null) { + commandIdToContainer.put(command.getId(), containerID); + } DatanodeID id = target.getID(); if (opType == ADD) { containerSizeScheduled.compute(id, (k, v) -> { @@ -374,6 +442,9 @@ private boolean completeOp(ContainerReplicaOp.PendingOpType opType, found = true; completedOps.add(op); iterator.remove(); + if (op.getCommand() != null) { + commandIdToContainer.remove(op.getCommand().getId()); + } if (opType == ADD) { containerSizeScheduled.computeIfPresent(target.getID(), (k, v) -> { long newSize = v.getSize() - op.getContainerSize(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 2255fb392593..b5ff46678a35 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -212,6 +212,15 @@ public final class SCMEvents { new TypedEvent<>(CommandStatusReportHandler.DeleteBlockStatus.class, "Delete_Block_Status"); + /** + * This event will be triggered by CommandStatusReportHandler whenever a + * datanode reports a failed replication or reconstruction command. 
+ */ + public static final TypedEvent + REPLICATION_STATUS = + new TypedEvent<>(CommandStatusReportHandler.ReplicationStatus.class, + "Replication_Status"); + public static final TypedEvent REPLICATION_MANAGER_NOTIFY = new TypedEvent<>(DatanodeDetails.class, "Replication_Manager_Notify"); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 69f7973ff1bd..ae3afe855e2b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; +import org.apache.hadoop.hdds.scm.command.ReplicationStatusHandler; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -603,6 +604,8 @@ private void initializeEventHandlers() { eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler); eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS, (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog()); + eventQueue.addHandler(SCMEvents.REPLICATION_STATUS, + new ReplicationStatusHandler(containerReplicaPendingOps, scmContext)); eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler); eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler); eventQueue.addHandler(SCMEvents.RECONCILE_CONTAINER, reconcileContainerEventHandler); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java index 5a540e71c843..1d4aaf088630 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdds.scm.command; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import java.util.ArrayList; import java.util.Collections; @@ -29,6 +31,8 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.ReplicationStatus; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.CommandStatusReportFromDatanode; import org.apache.hadoop.hdds.server.events.Event; @@ -47,10 +51,16 @@ public class TestCommandStatusReportHandler implements EventPublisher { private static final Logger LOG = LoggerFactory .getLogger(TestCommandStatusReportHandler.class); private CommandStatusReportHandler cmdStatusReportHandler; + private ReplicationStatus replicationStatus; @BeforeEach public void setup() { cmdStatusReportHandler = new CommandStatusReportHandler(); + replicationStatus = null; + } + + private ReplicationStatus getReplicationStatus() { + return replicationStatus; } @Test @@ -81,11 +91,25 @@ private CommandStatusReportFromDatanode getStatusReport( dn, report); } + @Test + public void 
replicationFailureFiresReplicationStatusEvent() { + // getCommandStatusList() already includes a FAILED replicateContainerCommand + cmdStatusReportHandler.onMessage(getStatusReport(getCommandStatusList()), this); + assertNotNull(getReplicationStatus()); + assertEquals(1, getReplicationStatus().getCmdStatus().size()); + assertEquals(CommandStatus.Status.FAILED, + getReplicationStatus().getCmdStatus().get(0).getStatus()); + } + @Override + @SuppressWarnings("unchecked") public > void fireEvent(EVENT_TYPE event, PAYLOAD payload) { LOG.info("firing event of type {}, payload {}", event.getName(), payload .toString()); + if (event == SCMEvents.REPLICATION_STATUS) { + replicationStatus = (ReplicationStatus) payload; + } } private List getCommandStatusList() { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestReplicationStatusHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestReplicationStatusHandler.java new file mode 100644 index 000000000000..5c8b0ea8b83c --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestReplicationStatusHandler.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.command; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import org.apache.hadoop.hdds.HddsIdFactory; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.ReplicationStatus; +import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for ReplicationStatusHandler. + */ +public class TestReplicationStatusHandler { + + @Test + public void leaderClearsReplicaPendingOp() { + SCMContext scmContext = mock(SCMContext.class); + when(scmContext.isLeader()).thenReturn(true); + ContainerReplicaPendingOps pendingOps = mock(ContainerReplicaPendingOps.class); + EventPublisher publisher = mock(EventPublisher.class); + + long cmdId = HddsIdFactory.getLongId(); + CommandStatus cmdStatus = CommandStatus.newBuilder() + .setCmdId(cmdId) + .setStatus(CommandStatus.Status.FAILED) + .setType(Type.replicateContainerCommand) + .build(); + ReplicationStatus status = new ReplicationStatus( + Collections.singletonList(cmdStatus), + MockDatanodeDetails.randomDatanodeDetails()); + + ReplicationStatusHandler handler = new ReplicationStatusHandler(pendingOps, scmContext); + handler.onMessage(status, publisher); + + verify(pendingOps).onReplicationCommandFailed(cmdId); + } + + @Test + public void followerSkipsClearingReplicaPendingOp() { + SCMContext scmContext = mock(SCMContext.class); + 
when(scmContext.isLeader()).thenReturn(false); + ContainerReplicaPendingOps pendingOps = mock(ContainerReplicaPendingOps.class); + EventPublisher publisher = mock(EventPublisher.class); + + long cmdId = HddsIdFactory.getLongId(); + CommandStatus cmdStatus = CommandStatus.newBuilder() + .setCmdId(cmdId) + .setStatus(CommandStatus.Status.FAILED) + .setType(Type.replicateContainerCommand) + .build(); + ReplicationStatus status = new ReplicationStatus( + Collections.singletonList(cmdStatus), + MockDatanodeDetails.randomDatanodeDetails()); + + ReplicationStatusHandler handler = new ReplicationStatusHandler(pendingOps, scmContext); + handler.onMessage(status, publisher); + + verifyNoInteractions(pendingOps); + } + +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java index ee813f0942cc..f944838b3a06 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java @@ -533,6 +533,26 @@ public void testScheduledSizeIsCorrectlyTrackedAndExpired() { assertNull(scheduled.get(dn2.getID())); } + @Test + public void failedAddCommandRemovesPendingOpAndDecrementsCount() { + ContainerID containerID = ContainerID.valueOf(1); + SCMCommand cmd = ReplicateContainerCommand.toTarget(1L, dn1); + pendingOps.scheduleAddReplica(containerID, dn1, 0, cmd, + clock.millis() + 60000, 1000L, clock.millis()); + assertEquals(1, pendingOps.getPendingOpCount(ADD)); + + pendingOps.onReplicationCommandFailed(cmd.getId()); + + assertEquals(0, pendingOps.getPendingOpCount(ADD)); + assertTrue(pendingOps.getPendingOps(containerID).isEmpty()); + } + + @Test + public void failedCommandWithUnknownIdIsNoOp() { + 
pendingOps.onReplicationCommandFailed(999999L); + assertEquals(0, pendingOps.getPendingOpCount(ADD)); + } + /** * Tests that only the size of containers with expired ops is reduced from the map tracking size of pending ops. * For example, if target Datanode DN1 has two pending ADD ops 10GB + 15GB, and the first op expires, then only