Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.hadoop.ozone.container.common.states.datanode.RunningDatanodeState;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
Expand Down Expand Up @@ -854,6 +855,14 @@ public void addCmdStatus(SCMCommand<?> cmd) {
.setStatus(Status.PENDING)
.setType(cmd.getType())
.build());
} else if (cmd.getType() == SCMCommandProto.Type.replicateContainerCommand
|| cmd.getType() == SCMCommandProto.Type.reconstructECContainersCommand) {
addCmdStatus(cmd.getId(),
CommandStatusBuilder.newBuilder()
.setCmdId(cmd.getId())
.setStatus(Status.PENDING)
.setType(cmd.getType())
.build());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ public class ECReconstructionCommandInfo {
private final ByteString missingContainerIndexes;
private final long deadlineMsSinceEpoch;
private final long term;
private final long commandId;

public ECReconstructionCommandInfo(ReconstructECContainersCommand cmd) {
this.containerID = cmd.getContainerID();
this.commandId = cmd.getId();
this.ecReplicationConfig = cmd.getEcReplicationConfig();
this.missingContainerIndexes = cmd.getMissingContainerIndexes();
this.deadlineMsSinceEpoch = cmd.getDeadline();
Expand Down Expand Up @@ -97,4 +99,8 @@ public long getTerm() {
return term;
}

public long getId() {
return commandId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public ECReconstructionCoordinatorTask(
debugString = reconstructionCommandInfo.toString();
}

@Override
public long getCommandId() {
return reconstructionCommandInfo.getId();
}

@Override
public String getMetricName() {
return METRIC_NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ public long getContainerId() {
return containerId;
}

/**
* Returns the id of the SCM command that scheduled this task, or 0 if this
* task is not associated with a tracked command. Subclasses backed by an
* {@link org.apache.hadoop.ozone.protocol.commands.SCMCommand} override this.
*/
public long getCommandId() {
return 0;
}

public Status getStatus() {
return status;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Expand All @@ -53,6 +54,7 @@
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -233,6 +235,9 @@ public void addTask(AbstractReplicationTask task) {
if (queueHasRoomFor(task)) {
initCounters(task);
addToQueue(task);
} else {
// Queue is full: drain the PENDING status entry so SCM can reschedule promptly.
updateCommandStatus(task, CommandStatus::markAsFailed);
}
}

Expand Down Expand Up @@ -276,7 +281,21 @@ private void addToQueue(AbstractReplicationTask task) {
}
queuedCounter.get(task.getMetricName()).incrementAndGet();
executor.execute(new TaskRunner(task));
} else {
// Duplicate: an identical task is already in-flight; the in-flight copy will report the real
// outcome, so drain this command's PENDING entry now to avoid a status-map leak.
updateCommandStatus(task, CommandStatus::markAsExecuted);
}
}

private void updateCommandStatus(AbstractReplicationTask task,
Consumer<CommandStatus> updater) {
long cmdId = task.getCommandId();
if (context == null || cmdId == 0) {
// No SCM context (test) or no tracked command (e.g. reconcile task).
return;
}
context.updateCommandStatus(cmdId, updater);
}

private void decrementTaskCounter(AbstractReplicationTask task) {
Expand Down Expand Up @@ -395,6 +414,8 @@ public void run() {
LOG.info("Ignoring {} since the deadline has passed ({} < {})",
this, Instant.ofEpochMilli(deadline), Instant.ofEpochMilli(now));
timeoutCounter.get(task.getMetricName()).incrementAndGet();
// FAILED drains the PENDING status entry so SCM can clear and reschedule promptly.
updateCommandStatus(task, CommandStatus::markAsFailed);
return;
}

Expand All @@ -405,6 +426,7 @@ public void run() {
&& task.shouldOnlyRunOnInServiceDatanodes()) {
LOG.info("Ignoring {} since datanode is not in service ({})",
this, dn.getPersistedOpState());
updateCommandStatus(task, CommandStatus::markAsFailed);
return;
}

Expand All @@ -413,6 +435,7 @@ public void run() {
if (currentTerm.isPresent() && taskTerm < currentTerm.getAsLong()) {
LOG.info("Ignoring {} since SCM leader has new term ({} < {})",
this, taskTerm, currentTerm.getAsLong());
updateCommandStatus(task, CommandStatus::markAsFailed);
return;
}
}
Expand All @@ -422,17 +445,23 @@ public void run() {
if (task.getStatus() == Status.FAILED) {
LOG.warn("Failed {}", this);
failureCounter.get(task.getMetricName()).incrementAndGet();
updateCommandStatus(task, CommandStatus::markAsFailed);
} else if (task.getStatus() == Status.DONE) {
LOG.info("Successful {}", this);
successCounter.get(task.getMetricName()).incrementAndGet();
// Mark EXECUTED (non-PENDING) so CommandStatusReportPublisher drains this entry from the status map.
updateCommandStatus(task, CommandStatus::markAsExecuted);
} else if (task.getStatus() == Status.SKIPPED) {
LOG.info("Skipped {}", this);
skippedCounter.get(task.getMetricName()).incrementAndGet();
// SKIPPED means the replica already exists; EXECUTED drains the entry.
updateCommandStatus(task, CommandStatus::markAsExecuted);
}
} catch (Exception e) {
task.setStatus(Status.FAILED);
LOG.warn("Failed {}", this, e);
failureCounter.get(task.getMetricName()).incrementAndGet();
updateCommandStatus(task, CommandStatus::markAsFailed);
} finally {
queuedCounter.get(task.getMetricName()).decrementAndGet();
opsLatencyMs.get(task.getMetricName()).add(Time.monotonicNow() - startTime);
Expand All @@ -441,6 +470,11 @@ public void run() {
}
}

private void updateCommandStatus(AbstractReplicationTask t,
Consumer<CommandStatus> updater) {
ReplicationSupervisor.this.updateCommandStatus(t, updater);
}

@Override
public String toString() {
return task.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ protected ReplicationTask(
replicator);
}

@Override
public long getCommandId() {
return cmd.getId();
}

@Override
public String getMetricName() {
return METRIC_NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status.PENDING;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction.Action.CLOSE;
import static org.apache.ozone.test.GenericTestUtils.waitFor;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -66,6 +67,7 @@
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
Expand Down Expand Up @@ -755,6 +757,18 @@ void keepsExistingTermForCommandWithOlderTerm() throws IOException {
assertNull(subject.getNextCommand());
}

@Test
public void addCmdStatusRegistersPendingForReplication() throws IOException {
StateContext ctx = createSubject();
ReplicateContainerCommand cmd =
ReplicateContainerCommand.fromSources(1L, Collections.emptyList());
ctx.addCmdStatus(cmd);
CommandStatus status = ctx.getCommandStatusMap().get(cmd.getId());
assertNotNull(status);
assertEquals(SCMCommandProto.Type.replicateContainerCommand, status.getType());
assertEquals(PENDING, status.getStatus());
}

private static StateContext createSubject() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
DatanodeStateMachine datanodeStateMachineMock =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status.EXECUTED;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status.FAILED;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.LOW;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.NORMAL;
import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
Expand Down Expand Up @@ -117,6 +119,7 @@
import org.apache.ozone.test.TestClock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
Expand Down Expand Up @@ -844,6 +847,62 @@ public void testReconcileContainerCommandDeduplication() throws Exception {
}
}

@Test
public void reportsExecutedStatusOnSuccess() {
ReplicationSupervisor supervisor = supervisorWithReplicator(
__ -> task -> task.setStatus(AbstractReplicationTask.Status.DONE));
ReplicateContainerCommand cmd =
ReplicateContainerCommand.fromSources(1L, Collections.emptyList());
cmd.setTerm(CURRENT_TERM);
context.addCmdStatus(cmd);
supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get()));
assertEquals(EXECUTED, context.getCommandStatusMap().get(cmd.getId()).getStatus());
}

@Test
public void reportsFailedStatusOnFailure() {
ReplicateContainerCommand cmd =
ReplicateContainerCommand.fromSources(2L, Collections.emptyList());
cmd.setTerm(CURRENT_TERM);
context.addCmdStatus(cmd);
ReplicationSupervisor supervisor = supervisorWithReplicator(
__ -> task -> task.setStatus(AbstractReplicationTask.Status.FAILED));
supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get()));
assertEquals(FAILED, context.getCommandStatusMap().get(cmd.getId()).getStatus());
}

@Test
public void reportsExecutedStatusOnSkip() {
// A SKIPPED task (replica already exists) should drain its PENDING status entry to EXECUTED
// so CommandStatusReportPublisher removes it instead of re-sending it forever.
ReplicationSupervisor supervisor = supervisorWithReplicator(
__ -> task -> task.setStatus(AbstractReplicationTask.Status.SKIPPED));
ReplicateContainerCommand cmd =
ReplicateContainerCommand.fromSources(3L, Collections.emptyList());
cmd.setTerm(CURRENT_TERM);
context.addCmdStatus(cmd);
supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get()));
assertEquals(EXECUTED, context.getCommandStatusMap().get(cmd.getId()).getStatus());
}

@Test
public void reportsFailedStatusWhenDeadlinePassed() {
// A task dropped because its deadline has passed should mark the status FAILED
// so the PENDING entry drains and SCM can reschedule promptly.
ReplicationSupervisor supervisor = supervisorWith(
__ -> task -> task.setStatus(AbstractReplicationTask.Status.DONE),
newDirectExecutorService());
ReplicateContainerCommand cmd =
ReplicateContainerCommand.fromSources(4L, Collections.emptyList());
cmd.setTerm(CURRENT_TERM);
cmd.setDeadline(clock.millis() + 5000);
context.addCmdStatus(cmd);
// Advance clock past the deadline
clock.fastForward(10000);
supervisor.addTask(new ReplicationTask(cmd, replicatorRef.get()));
assertEquals(FAILED, context.getCommandStatusMap().get(cmd.getId()).getStatus());
}

private static class BlockingTask extends AbstractReplicationTask {

private final CountDownLatch runningLatch;
Expand Down Expand Up @@ -1171,6 +1230,61 @@ public void poolSizeCanBeUpdatedByReplicationStreamsLimitReconfiguration() {
assertEquals(3, threadPoolSize.get());
}

@Test
public void reportsFailedStatusWhenQueueFull() {
// A task dropped because the queue is full should drain its PENDING entry to FAILED
// so SCM can free the inflight quota and reschedule promptly.
final int maxQueueSize = 1;
DatanodeConfiguration datanodeConfig = new DatanodeConfiguration();
datanodeConfig.setCommandQueueLimit(maxQueueSize);
ReplicationServer.ReplicationConfig repConf = new ReplicationServer.ReplicationConfig();

ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder()
.stateContext(context)
.executor(new DiscardingExecutorService())
.datanodeConfig(datanodeConfig)
.replicationConfig(repConf)
.clock(clock)
.build();

// Fill the queue to capacity with one task so the next addTask is dropped.
ReplicateContainerCommand fillCmd = createCommand(100L);
supervisor.addTask(new ReplicationTask(fillCmd, noopReplicator));
assertEquals(maxQueueSize, supervisor.getTotalInFlightReplications());

// Now register a PENDING status for a second command and try to add it.
ReplicateContainerCommand droppedCmd = createCommand(200L);
context.addCmdStatus(droppedCmd);
supervisor.addTask(new ReplicationTask(droppedCmd, noopReplicator));

// The dropped task's status should be FAILED so the PENDING entry is drained.
assertEquals(FAILED, context.getCommandStatusMap().get(droppedCmd.getId()).getStatus());
}

@Test
public void reportsExecutedStatusWhenDuplicate() {
// A duplicate task (same container already in-flight) should drain its own PENDING
// status entry to EXECUTED, because the already-queued task will report the real outcome.
ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder()
.stateContext(context)
.executor(new DiscardingExecutorService())
.clock(clock)
.build();

// Add the first task - accepted into the in-flight set.
ReplicateContainerCommand firstCmd = createCommand(300L);
supervisor.addTask(new ReplicationTask(firstCmd, noopReplicator));
assertEquals(1, supervisor.getTotalInFlightReplications());

// Register a PENDING status for a second command with the same container id.
ReplicateContainerCommand dupCmd = createCommand(300L);
context.addCmdStatus(dupCmd);
supervisor.addTask(new ReplicationTask(dupCmd, noopReplicator));

// The duplicate command's status should be EXECUTED (entry drained).
assertEquals(EXECUTED, context.getCommandStatusMap().get(dupCmd.getId()).getStatus());
}

@ContainerLayoutTestInfo.ContainerTest
public void testMaxQueueSize() {
List<DatanodeDetails> datanodes = new ArrayList<>();
Expand Down
Loading