diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index d255bc9a672d..c93383e40db6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -281,7 +281,11 @@ public void updateContainerState(final ContainerID cid, lock.lock(); try { if (containerExist(cid)) { - containerStateManager.updateContainerState(protoId, event); + final ContainerInfo info = containerStateManager.getContainer(cid); + if (info != null) { + // Delegate to @Replicate method with current sequenceId + containerStateManager.updateContainerStateWithSequenceId(protoId, event, info.getSequenceId()); + } } else { throw new ContainerNotFoundException(cid); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 0194e65fe00a..f5a2334b7cd2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -160,11 +160,14 @@ void addContainer(ContainerInfoProto containerInfo) throws IOException; /** - * + * Updates container state with sequenceId synchronization for HA consistency. + * This method ensures that all SCM nodes have the same sequenceId when + * state transitions occur. */ @Replicate - void updateContainerState(HddsProtos.ContainerID id, - HddsProtos.LifeCycleEvent event) + void updateContainerStateWithSequenceId(HddsProtos.ContainerID id, + HddsProtos.LifeCycleEvent event, + Long sequenceId) throws IOException, InvalidStateTransitionException; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java index 4b4578894a61..d971b19c406c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java @@ -353,29 +353,41 @@ public boolean contains(ContainerID id) { } @Override - public void updateContainerState(final HddsProtos.ContainerID containerID, - final LifeCycleEvent event) + public void updateContainerStateWithSequenceId(final HddsProtos.ContainerID containerID, + final LifeCycleEvent event, + final Long sequenceId) throws IOException, InvalidStateTransitionException { // TODO: Remove the protobuf conversion after fixing ContainerStateMap. final ContainerID id = ContainerID.getFromProtobuf(containerID); try (AutoCloseableLock ignored = writeLock(id)) { if (containers.contains(id)) { - final ContainerInfo oldInfo = containers.getContainerInfo(id); - final LifeCycleState oldState = oldInfo.getState(); + final ContainerInfo containerInfo = containers.getContainerInfo(id); + + // Synchronize sequenceId first + if (containerInfo.getSequenceId() < sequenceId) { + containerInfo.updateSequenceId(sequenceId); + } else if (containerInfo.getSequenceId() > sequenceId) { + LOG.warn("Container sequenceId is {} greater than the leader container sequenceId {}", + containerInfo.getSequenceId(), sequenceId); + } + + final LifeCycleState oldState = containerInfo.getState(); final LifeCycleState newState = stateMachine.getNextState( - oldInfo.getState(), event); + oldState, event); if (newState.getNumber() > oldState.getNumber()) { ExecutionUtil.create(() -> { containers.updateState(id, oldState, newState); transactionBuffer.addToBuffer(containerStore, id, containers.getContainerInfo(id)); }).onException(() -> { - transactionBuffer.addToBuffer(containerStore, id, oldInfo); containers.updateState(id, newState, oldState); + ContainerInfo currentInfo = containers.getContainerInfo(id); + transactionBuffer.addToBuffer(containerStore, id, currentInfo); + }).execute(); containerStateChangeActions.getOrDefault(event, info -> { }) - .accept(oldInfo); + .accept(containerInfo); } } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index 61001b9d38fc..8e5d9b161a4d 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -132,9 +132,9 @@ void setup() throws IOException, InvalidStateTransitionException { doAnswer(invocation -> { containerStateManager - .updateContainerState(((ContainerID)invocation + .updateContainerStateWithSequenceId(((ContainerID)invocation .getArguments()[0]).getProtobuf(), - (HddsProtos.LifeCycleEvent)invocation.getArguments()[1]); + (HddsProtos.LifeCycleEvent)invocation.getArguments()[1], 0L); return null; }).when(containerManager).updateContainerState( any(ContainerID.class), diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java index 0f5b3b6adcd5..51c459f5575b 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -196,6 +197,46 @@ public void testTransitionContainerToClosedStateAllowOnlyDeletingOrDeletedContai } } + @Test + public void testSequenceIdOnStateUpdate() throws Exception { + ContainerID containerID = ContainerID.valueOf(3L); + long sequenceId = 100L; + + ContainerInfo containerInfo = new ContainerInfo.Builder() + .setContainerID(containerID.getId()) + .setState(HddsProtos.LifeCycleState.OPEN) + .setSequenceId(sequenceId) + .setOwner("scm") + .setPipelineID(PipelineID.randomId()) + .setReplicationConfig( + RatisReplicationConfig + .getInstance(ReplicationFactor.THREE)) + .build(); + + containerStateManager.addContainer(containerInfo.getProtobuf()); + + // Try to update with a higher sequenceId + containerStateManager.updateContainerStateWithSequenceId( + containerID.getProtobuf(), + HddsProtos.LifeCycleEvent.FINALIZE, + sequenceId + 1); + + ContainerInfo afterFirst = containerStateManager.getContainer(containerID); + long currentSequenceId = afterFirst.getSequenceId(); + // Sequence id should be updated with latest sequence id + assertEquals(sequenceId + 1, currentSequenceId); + + // Try updating with older sequenceId + containerStateManager.updateContainerStateWithSequenceId( + containerID.getProtobuf(), + HddsProtos.LifeCycleEvent.CLOSE, + sequenceId - 10); // Older sequenceId + + // Assert - SequenceId should not change + ContainerInfo finalInfo = containerStateManager.getContainer(containerID); + assertEquals(finalInfo.getSequenceId(), currentSequenceId); + } + private void addReplica(ContainerInfo cont, DatanodeDetails node) { ContainerReplica replica = ContainerReplica.newBuilder() .setContainerID(cont.containerID()) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java index d9ecaba4935e..188783b95903 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java @@ -165,9 +165,9 @@ public void setup() throws IOException, InvalidStateTransitionException, doAnswer(invocation -> { containerStateManager - .updateContainerState(((ContainerID)invocation + .updateContainerStateWithSequenceId(((ContainerID)invocation .getArguments()[0]).getProtobuf(), - (HddsProtos.LifeCycleEvent)invocation.getArguments()[1]); + (HddsProtos.LifeCycleEvent)invocation.getArguments()[1], 0L); return null; }).when(containerManager).updateContainerState( any(ContainerID.class), diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java index 7e6047744707..7ce4f9319db2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java @@ -465,11 +465,11 @@ public void testCreateRecoveryContainer() throws Exception { // To create the actual situation, container would have been in closed // state at SCM. scm.getContainerManager().getContainerStateManager() - .updateContainerState(container.containerID().getProtobuf(), - HddsProtos.LifeCycleEvent.FINALIZE); + .updateContainerStateWithSequenceId(container.containerID().getProtobuf(), + HddsProtos.LifeCycleEvent.FINALIZE, 0L); scm.getContainerManager().getContainerStateManager() - .updateContainerState(container.containerID().getProtobuf(), - HddsProtos.LifeCycleEvent.CLOSE); + .updateContainerStateWithSequenceId(container.containerID().getProtobuf(), + HddsProtos.LifeCycleEvent.CLOSE, 0L); //Create the recovering container in DN. String encodedToken = cToken.encodeToUrlString(); @@ -555,11 +555,11 @@ public void testCreateRecoveryContainerAfterDNRestart() throws Exception { // To create the actual situation, container would have been in closed // state at SCM. scm.getContainerManager().getContainerStateManager() - .updateContainerState(container.containerID().getProtobuf(), - HddsProtos.LifeCycleEvent.FINALIZE); + .updateContainerStateWithSequenceId(container.containerID().getProtobuf(), + HddsProtos.LifeCycleEvent.FINALIZE, 0L); scm.getContainerManager().getContainerStateManager() - .updateContainerState(container.containerID().getProtobuf(), - HddsProtos.LifeCycleEvent.CLOSE); + .updateContainerStateWithSequenceId(container.containerID().getProtobuf(), + HddsProtos.LifeCycleEvent.CLOSE, 0L); //Create the recovering container in target DN. String encodedToken = cToken.encodeToUrlString(); @@ -936,12 +936,12 @@ public void testECReconstructionCoordinatorShouldCleanupContainersOnFailure() private void closeContainer(long conID) throws IOException, InvalidStateTransitionException { //Close the container first. - scm.getContainerManager().getContainerStateManager().updateContainerState( + scm.getContainerManager().getContainerStateManager().updateContainerStateWithSequenceId( HddsProtos.ContainerID.newBuilder().setId(conID).build(), - HddsProtos.LifeCycleEvent.FINALIZE); - scm.getContainerManager().getContainerStateManager().updateContainerState( + HddsProtos.LifeCycleEvent.FINALIZE, 0L); + scm.getContainerManager().getContainerStateManager().updateContainerStateWithSequenceId( HddsProtos.ContainerID.newBuilder().setId(conID).build(), - HddsProtos.LifeCycleEvent.CLOSE); + HddsProtos.LifeCycleEvent.CLOSE, 0L); } private void checkBlockDataWithRetry(