Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,17 @@ List<HddsProtos.DatanodeUsageInfoProto> getDatanodeUsageInfo(String address,
List<HddsProtos.DatanodeUsageInfoProto> getDatanodeUsageInfo(
boolean mostUsed, int count) throws IOException;

@Deprecated
StatusAndMessages finalizeScmUpgrade(String upgradeClientID)
throws IOException;

@Deprecated
StatusAndMessages queryUpgradeFinalizationProgress(
String upgradeClientID, boolean force, boolean readonly)
throws IOException;

void finalizeUpgrade() throws IOException;
Comment thread
errose28 marked this conversation as resolved.

HddsProtos.UpgradeStatus queryUpgradeStatus() throws IOException;

DecommissionScmResponseProto decommissionScm(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,13 @@ List<HddsProtos.DatanodeUsageInfoProto> getDatanodeUsageInfo(
List<HddsProtos.DatanodeUsageInfoProto> getDatanodeUsageInfo(
boolean mostUsed, int count, int clientVersion) throws IOException;

@Deprecated
StatusAndMessages finalizeScmUpgrade(String upgradeClientID)
throws IOException;

void finalizeUpgrade() throws IOException;

@Deprecated
StatusAndMessages queryUpgradeFinalizationProgress(
String upgradeClientID, boolean force, boolean readonly)
throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.FinalizeScmUpgradeRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.FinalizeScmUpgradeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.FinalizeUpgradeRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerCountRequestProto;
Expand Down Expand Up @@ -1110,6 +1111,7 @@ public List<HddsProtos.DatanodeUsageInfoProto> getDatanodeUsageInfo(
}

@Override
@Deprecated
public StatusAndMessages finalizeScmUpgrade(String upgradeClientID)
throws IOException {
FinalizeScmUpgradeRequestProto req = FinalizeScmUpgradeRequestProto.
Expand All @@ -1129,6 +1131,14 @@ public StatusAndMessages finalizeScmUpgrade(String upgradeClientID)
}

@Override
public void finalizeUpgrade() throws IOException {
FinalizeUpgradeRequestProto req = FinalizeUpgradeRequestProto.newBuilder()
.build();
submitRequest(Type.FinalizeUpgrade, builder -> builder.setFinalizeUpgradeRequest(req));
}

@Override
@Deprecated
public StatusAndMessages queryUpgradeFinalizationProgress(
String upgradeClientID, boolean force, boolean readonly)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ public StatusAndMessages finalize(String upgradeClientID, T service)
}
}

@Override
public void finalize(T service) throws IOException {
UpgradeFinalization.Status status = versionManager.getUpgradeState();
if (isFinalized(status)) {
return;
}
if (status == FINALIZATION_REQUIRED) {
finalizationExecutor.execute(service, this);
}
}

@Override
public synchronized StatusAndMessages reportStatus(
String upgradeClientID, boolean takeover) throws UpgradeException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ public interface UpgradeFinalizer<T> {
StatusAndMessages finalize(String upgradeClientID, T service)
throws IOException;

/**
* Finalize the metadata upgrade. If finalization is not needed or is already underway, this call is a noop.
* @param service the service on which we run finalization.
* @throws IOException if the finalization fails at any stage.
*/
void finalize(T service) throws IOException;

/**
* Finalize the component if needed, and wait until completion.
* @param upgradeClientID the initiating client's identifier.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ message ScmContainerLocationRequest {
optional ReconcileContainerRequestProto reconcileContainerRequest = 49;
optional GetDeletedBlocksTxnSummaryRequestProto getDeletedBlocksTxnSummaryRequest = 50;
optional QueryUpgradeStatusRequestProto queryUpgradeStatusRequest = 51;
optional FinalizeUpgradeRequestProto finalizeUpgradeRequest = 52;
}

message ScmContainerLocationResponse {
Expand Down Expand Up @@ -147,6 +148,7 @@ message ScmContainerLocationResponse {
optional ReconcileContainerResponseProto reconcileContainerResponse = 49;
optional GetDeletedBlocksTxnSummaryResponseProto getDeletedBlocksTxnSummaryResponse = 50;
optional QueryUpgradeStatusResponseProto queryUpgradeStatusResponse = 51;
optional FinalizeUpgradeResponseProto finalizeUpgradeResponse = 52;

enum Status {
OK = 1;
Expand Down Expand Up @@ -205,6 +207,7 @@ enum Type {
ReconcileContainer = 45;
GetDeletedBlocksTransactionSummary = 46;
QueryUpgradeStatus = 47;
FinalizeUpgrade = 48;
}

/**
Expand Down Expand Up @@ -580,6 +583,12 @@ message QueryUpgradeStatusResponseProto {
required hadoop.hdds.UpgradeStatus status = 1;
}

message FinalizeUpgradeRequestProto {
}

message FinalizeUpgradeResponseProto {
}

message ContainerTokenSecretProto {
required string ownerId = 1;
required ContainerID containerId = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,12 @@ public ScmContainerLocationResponse processRequest(
.setStatus(Status.OK)
.setQueryUpgradeStatusResponse(getQueryUpgradeStatus(request.getQueryUpgradeStatusRequest()))
.build();
case FinalizeUpgrade:
impl.finalizeUpgrade();
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.build();
default:
throw new IllegalArgumentException(
"Unknown command type: " + request.getCmdType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
import static org.apache.hadoop.hdds.utils.HddsServerUtil.getRemoteUser;
import static org.apache.hadoop.ozone.upgrade.UpgradeFinalization.Status.ALREADY_FINALIZED;
import static org.apache.hadoop.ozone.upgrade.UpgradeFinalization.Status.STARTING_FINALIZATION;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -1102,31 +1104,38 @@ public ReplicationManagerReport getReplicationManagerReport() {
}

@Override
@Deprecated
public StatusAndMessages finalizeScmUpgrade(String upgradeClientID) throws
IOException {
final Map<String, String> auditMap = Maps.newHashMap();
auditMap.put("upgradeClientID", upgradeClientID);
if (scm.getLayoutVersionManager().getUpgradeState() == ALREADY_FINALIZED) {
return new StatusAndMessages(ALREADY_FINALIZED, Collections.emptyList());
}
finalizeUpgrade();
return new StatusAndMessages(STARTING_FINALIZATION, Collections.emptyList());
}

@Override
public void finalizeUpgrade() throws IOException {
final Map<String, String> auditMap = Collections.emptyMap();
try {
// check admin authorization
getScm().checkAdminAccess(getRemoteUser(), false);
// TODO HDDS-6762: Return to the client once the FINALIZATION_STARTED
// checkpoint has been crossed and continue finalizing asynchronously.
StatusAndMessages result = scm.getFinalizationManager().finalizeUpgrade(upgradeClientID);
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
SCMAction.FINALIZE_SCM_UPGRADE, auditMap));
return result;
scm.getFinalizationManager().finalizeUpgrade();
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(SCMAction.FINALIZE_SCM_UPGRADE, auditMap));
} catch (Exception ex) {
AUDIT.logWriteFailure(buildAuditMessageForFailure(
SCMAction.FINALIZE_SCM_UPGRADE, auditMap, ex));
AUDIT.logWriteFailure(buildAuditMessageForFailure(SCMAction.FINALIZE_SCM_UPGRADE, auditMap, ex));
throw ex;
}

}

@Override
@Deprecated
public StatusAndMessages queryUpgradeFinalizationProgress(
String upgradeClientID, boolean force, boolean readonly)
throws IOException {

// This method, we change to call the queryUpgradeStatus and create a StatusAndMessages object that reflects
// the state.

Map<String, String> auditMap = Maps.newHashMap();
auditMap.put("upgradeClientID", upgradeClientID);
auditMap.put("force", String.valueOf(force));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public interface FinalizationManager {
UpgradeFinalization.StatusAndMessages finalizeUpgrade(String upgradeClientID)
throws IOException;

void finalizeUpgrade() throws IOException;

UpgradeFinalization.StatusAndMessages queryUpgradeFinalizationProgress(
String upgradeClientID, boolean takeover, boolean readonly
) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ public UpgradeFinalization.StatusAndMessages finalizeUpgrade(
return upgradeFinalizer.finalize(upgradeClientID, context);
}

@Override
public void finalizeUpgrade() throws IOException {
Objects.requireNonNull(context, "Cannot finalize upgrade without first building the upgrade context.");
upgradeFinalizer.finalize(context);
}

@Override
public UpgradeFinalization.StatusAndMessages queryUpgradeFinalizationProgress(
String upgradeClientID, boolean takeover, boolean readonly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@
package org.apache.hadoop.hdds.scm.server.upgrade;

import java.io.IOException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
import org.apache.hadoop.ozone.upgrade.BasicUpgradeFinalizer;
import org.apache.hadoop.ozone.upgrade.LayoutFeature;
import org.apache.hadoop.ozone.upgrade.UpgradeException;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizationExecutor;
import org.apache.ratis.protocol.exceptions.NotLeaderException;

/**
* UpgradeFinalizer for the Storage Container Manager service.
Expand Down Expand Up @@ -85,59 +82,4 @@ void replicatedFinalizationSteps(HDDSLayoutFeature lf,
lf.scmAction(),
context.getStorage());
}

@Override
public void postFinalizeUpgrade(SCMUpgradeFinalizationContext context) throws IOException {
waitForDatanodesToFinalize(context);
super.postFinalizeUpgrade(context);
}

/**
* Wait for all HEALTHY datanodes to complete finalization before finishing
* SCM finalization. This ensures that when the client receives a
* FINALIZATION_DONE status, all healthy datanodes have also finalized.
*
* A datanode is considered finalized when its metadata layout version (MLV)
* equals its software layout version (SLV), indicating it has completed
* processing all layout features.
*
* @param context The finalization context containing node manager reference
* @throws SCMException if waiting is interrupted or SCM loses leadership
* @throws NotLeaderException if SCM is no longer the leader
*/
private void waitForDatanodesToFinalize(SCMUpgradeFinalizationContext context)
throws SCMException, NotLeaderException {
NodeManager nodeManager = context.getNodeManager();

LOG.info("Waiting for all HEALTHY datanodes to complete finalization before finishing SCM finalization.");

boolean allDatanodesFinalized = false;
while (!allDatanodesFinalized) {
// Break out of the wait and step down from driving finalization if this
// SCM is no longer the leader by throwing NotLeaderException.
context.getSCMContext().getTermOfLeader();

NodeManager.DatanodeFinalizationCounts datanodeFinalizationCounts = nodeManager.getDatanodeFinalizationCounts();
int finalizedNodes = datanodeFinalizationCounts.getNumFinalizedDatanodes();
int totalHealthyNodes = datanodeFinalizationCounts.getTotalHealthyDatanodes();
allDatanodesFinalized = datanodeFinalizationCounts.allNodesFinalized();

if (!allDatanodesFinalized) {
int unfinalizedNodes = totalHealthyNodes - finalizedNodes;
LOG.info("Waiting for datanodes to finalize. Status: {}/{} healthy " +
"datanodes have finalized ({} remaining).",
finalizedNodes, totalHealthyNodes, unfinalizedNodes);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SCMException("Interrupted while waiting for datanodes to " +
"finalize.", SCMException.ResultCodes.INTERNAL_ERROR);
}
} else {
LOG.info("All {} HEALTHY datanodes have completed finalization.",
totalHealthyNodes);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,11 @@ public StatusAndMessages queryUpgradeFinalizationProgress(
upgradeClientID, force, readonly);
}

@Override
public void finalizeUpgrade() throws IOException {
storageContainerLocationClient.finalizeUpgrade();
}

@Override
public HddsProtos.UpgradeStatus queryUpgradeStatus() throws IOException {
return storageContainerLocationClient.queryUpgradeStatus();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.admin.upgrade;

import java.io.IOException;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.scm.cli.ScmSubcommand;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import picocli.CommandLine;

/**
* Sub command to finalize a cluster upgrade.
*/
@CommandLine.Command(
name = "finalize",
description = "Finalize a cluster upgrade",
mixinStandardHelpOptions = true,
versionProvider = HddsVersionProvider.class)
public class FinalizeSubCommand extends ScmSubcommand {

@Override
public void execute(ScmClient client) throws IOException {
client.finalizeUpgrade();

out().println("Cluster finalization has been started. Monitor progress with `ozone admin upgrade status`");
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
mixinStandardHelpOptions = true,
versionProvider = HddsVersionProvider.class,
subcommands = {
FinalizeSubCommand.class,
StatusSubCommand.class
})
@MetaInfServices(AdminSubcommand.class)
Expand Down
Loading
Loading