Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,11 @@ public void updateContainerState(final ContainerID cid,
lock.lock();
try {
if (containerExist(cid)) {
containerStateManager.updateContainerState(protoId, event);
final ContainerInfo info = containerStateManager.getContainer(cid);
if (info != null) {
// Delegate to @Replicate method with current sequenceId
containerStateManager.updateContainerStateWithSequenceId(protoId, event, info.getSequenceId());
}
} else {
throw new ContainerNotFoundException(cid);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,14 @@ void addContainer(ContainerInfoProto containerInfo)
throws IOException;

/**
*
* Updates container state with sequenceId synchronization for HA consistency.
* This method ensures that all SCM nodes have the same sequenceId when
* state transitions occur.
*/
@Replicate
void updateContainerState(HddsProtos.ContainerID id,
HddsProtos.LifeCycleEvent event)
void updateContainerStateWithSequenceId(HddsProtos.ContainerID id,
HddsProtos.LifeCycleEvent event,
Long sequenceId)
throws IOException, InvalidStateTransitionException;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,29 +353,41 @@ public boolean contains(ContainerID id) {
}

@Override
public void updateContainerState(final HddsProtos.ContainerID containerID,
final LifeCycleEvent event)
public void updateContainerStateWithSequenceId(final HddsProtos.ContainerID containerID,
final LifeCycleEvent event,
final Long sequenceId)
throws IOException, InvalidStateTransitionException {
// TODO: Remove the protobuf conversion after fixing ContainerStateMap.
final ContainerID id = ContainerID.getFromProtobuf(containerID);

try (AutoCloseableLock ignored = writeLock(id)) {
Copy link
Copy Markdown
Contributor

@siddhantsangwan siddhantsangwan Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A read lock is acquired in the previous method. Can you check whether there is a possibility of a deadlock or some other problem with acquiring the write lock here while the read lock is held? AFAIK, Java doesn't allow a normal reentrant read-write lock to be upgraded from read to write. I'm not sure whether the striped lock being used here has some handling for this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is no longer used.

if (containers.contains(id)) {
final ContainerInfo oldInfo = containers.getContainerInfo(id);
final LifeCycleState oldState = oldInfo.getState();
final ContainerInfo containerInfo = containers.getContainerInfo(id);

// Synchronize sequenceId first
if (containerInfo.getSequenceId() < sequenceId) {
containerInfo.updateSequenceId(sequenceId);
} else if (containerInfo.getSequenceId() > sequenceId) {
LOG.warn("Container sequenceId is {} greater than the leader container sequenceId {}",
containerInfo.getSequenceId(), sequenceId);
}
Comment thread
ashishkumar50 marked this conversation as resolved.

final LifeCycleState oldState = containerInfo.getState();
final LifeCycleState newState = stateMachine.getNextState(
oldInfo.getState(), event);
oldState, event);
if (newState.getNumber() > oldState.getNumber()) {
ExecutionUtil.create(() -> {
containers.updateState(id, oldState, newState);
transactionBuffer.addToBuffer(containerStore, id,
containers.getContainerInfo(id));
}).onException(() -> {
transactionBuffer.addToBuffer(containerStore, id, oldInfo);
containers.updateState(id, newState, oldState);
ContainerInfo currentInfo = containers.getContainerInfo(id);
transactionBuffer.addToBuffer(containerStore, id, currentInfo);

}).execute();
containerStateChangeActions.getOrDefault(event, info -> { })
.accept(oldInfo);
.accept(containerInfo);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ void setup() throws IOException, InvalidStateTransitionException {

doAnswer(invocation -> {
containerStateManager
.updateContainerState(((ContainerID)invocation
.updateContainerStateWithSequenceId(((ContainerID)invocation
.getArguments()[0]).getProtobuf(),
(HddsProtos.LifeCycleEvent)invocation.getArguments()[1]);
(HddsProtos.LifeCycleEvent)invocation.getArguments()[1], 0L);
return null;
}).when(containerManager).updateContainerState(
any(ContainerID.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand Down Expand Up @@ -196,6 +197,46 @@ public void testTransitionContainerToClosedStateAllowOnlyDeletingOrDeletedContai
}
}

/**
 * Verifies how {@code updateContainerStateWithSequenceId} treats the
 * sequenceId supplied alongside a state transition: a higher incoming
 * sequenceId must replace the stored one, while a lower incoming
 * sequenceId must leave the stored value untouched.
 */
@Test
public void testSequenceIdOnStateUpdate() throws Exception {
  final ContainerID containerID = ContainerID.valueOf(3L);
  final long sequenceId = 100L;

  // Register an OPEN container that starts at the baseline sequenceId.
  final ContainerInfo containerInfo = new ContainerInfo.Builder()
      .setContainerID(containerID.getId())
      .setState(HddsProtos.LifeCycleState.OPEN)
      .setSequenceId(sequenceId)
      .setOwner("scm")
      .setPipelineID(PipelineID.randomId())
      .setReplicationConfig(
          RatisReplicationConfig
              .getInstance(ReplicationFactor.THREE))
      .build();

  containerStateManager.addContainer(containerInfo.getProtobuf());

  // Update with a higher sequenceId: the stored value must move forward.
  containerStateManager.updateContainerStateWithSequenceId(
      containerID.getProtobuf(),
      HddsProtos.LifeCycleEvent.FINALIZE,
      sequenceId + 1);

  final ContainerInfo afterFirst =
      containerStateManager.getContainer(containerID);
  final long currentSequenceId = afterFirst.getSequenceId();
  assertEquals(sequenceId + 1, currentSequenceId);

  // Update with an older sequenceId: the stored value must not go backwards.
  containerStateManager.updateContainerStateWithSequenceId(
      containerID.getProtobuf(),
      HddsProtos.LifeCycleEvent.CLOSE,
      sequenceId - 10);

  // Expected value goes first so a failure message reports the values
  // the right way round.
  final ContainerInfo finalInfo =
      containerStateManager.getContainer(containerID);
  assertEquals(currentSequenceId, finalInfo.getSequenceId());
}

private void addReplica(ContainerInfo cont, DatanodeDetails node) {
ContainerReplica replica = ContainerReplica.newBuilder()
.setContainerID(cont.containerID())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ public void setup() throws IOException, InvalidStateTransitionException,

doAnswer(invocation -> {
containerStateManager
.updateContainerState(((ContainerID)invocation
.updateContainerStateWithSequenceId(((ContainerID)invocation
.getArguments()[0]).getProtobuf(),
(HddsProtos.LifeCycleEvent)invocation.getArguments()[1]);
(HddsProtos.LifeCycleEvent)invocation.getArguments()[1], 0L);
return null;
}).when(containerManager).updateContainerState(
any(ContainerID.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,11 +465,11 @@ public void testCreateRecoveryContainer() throws Exception {
// To create the actual situation, container would have been in closed
// state at SCM.
scm.getContainerManager().getContainerStateManager()
.updateContainerState(container.containerID().getProtobuf(),
HddsProtos.LifeCycleEvent.FINALIZE);
.updateContainerStateWithSequenceId(container.containerID().getProtobuf(),
HddsProtos.LifeCycleEvent.FINALIZE, 0L);
scm.getContainerManager().getContainerStateManager()
.updateContainerState(container.containerID().getProtobuf(),
HddsProtos.LifeCycleEvent.CLOSE);
.updateContainerStateWithSequenceId(container.containerID().getProtobuf(),
HddsProtos.LifeCycleEvent.CLOSE, 0L);

//Create the recovering container in DN.
String encodedToken = cToken.encodeToUrlString();
Expand Down Expand Up @@ -555,11 +555,11 @@ public void testCreateRecoveryContainerAfterDNRestart() throws Exception {
// To create the actual situation, container would have been in closed
// state at SCM.
scm.getContainerManager().getContainerStateManager()
.updateContainerState(container.containerID().getProtobuf(),
HddsProtos.LifeCycleEvent.FINALIZE);
.updateContainerStateWithSequenceId(container.containerID().getProtobuf(),
HddsProtos.LifeCycleEvent.FINALIZE, 0L);
scm.getContainerManager().getContainerStateManager()
.updateContainerState(container.containerID().getProtobuf(),
HddsProtos.LifeCycleEvent.CLOSE);
.updateContainerStateWithSequenceId(container.containerID().getProtobuf(),
HddsProtos.LifeCycleEvent.CLOSE, 0L);

//Create the recovering container in target DN.
String encodedToken = cToken.encodeToUrlString();
Expand Down Expand Up @@ -936,12 +936,12 @@ public void testECReconstructionCoordinatorShouldCleanupContainersOnFailure()
private void closeContainer(long conID)
throws IOException, InvalidStateTransitionException {
//Close the container first.
scm.getContainerManager().getContainerStateManager().updateContainerState(
scm.getContainerManager().getContainerStateManager().updateContainerStateWithSequenceId(
HddsProtos.ContainerID.newBuilder().setId(conID).build(),
HddsProtos.LifeCycleEvent.FINALIZE);
scm.getContainerManager().getContainerStateManager().updateContainerState(
HddsProtos.LifeCycleEvent.FINALIZE, 0L);
scm.getContainerManager().getContainerStateManager().updateContainerStateWithSequenceId(
HddsProtos.ContainerID.newBuilder().setId(conID).build(),
HddsProtos.LifeCycleEvent.CLOSE);
HddsProtos.LifeCycleEvent.CLOSE, 0L);
}

private void checkBlockDataWithRetry(
Expand Down