Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private Response getContainerEndpointResponse(long containerId) {
null, // ContainerHealthSchemaManager - not needed for this test
recon.getReconServer().getReconNamespaceSummaryManager(),
recon.getReconServer().getReconContainerMetadataManager(),
omMetadataManagerInstance);
omMetadataManagerInstance, null);
return containerEndpoint.getKeysForContainer(containerId, 10, "");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.recon.api.ExportJobManager;
import org.apache.hadoop.ozone.recon.heatmap.HeatMapServiceImpl;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
import org.apache.hadoop.ozone.recon.persistence.DataSourceConfiguration;
Expand Down Expand Up @@ -110,6 +111,7 @@ protected void configure() {
bind(OMMetadataManager.class).to(ReconOmMetadataManagerImpl.class);

bind(ContainerHealthSchemaManager.class).in(Singleton.class);
bind(ExportJobManager.class).in(Singleton.class);
bind(ReconContainerMetadataManager.class)
.to(ReconContainerMetadataManagerImpl.class).in(Singleton.class);
bind(ReconFileMetadataManager.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,40 @@ public final class ReconServerConfigKeys {
"ozone.recon.scm.container.id.batch.size";
public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT = 1_000_000;

/**
 * JDBC fetch size used when streaming unhealthy-container rows for CSV
 * exports. Larger values mean fewer database round trips but more memory
 * held per fetch.
 * Default: 10,000 rows per fetch.
 */
public static final String OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE =
"ozone.recon.unhealthy.container.fetch.size";
public static final int OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE_DEFAULT = 10_000;

/**
 * Worker thread pool size for async CSV exports.
 * Kept single-threaded so export jobs never access the database
 * concurrently with each other.
 * Default: 1.
 */
public static final String OZONE_RECON_EXPORT_WORKER_THREADS =
"ozone.recon.export.worker.threads";
public static final int OZONE_RECON_EXPORT_WORKER_THREADS_DEFAULT = 1;

/**
 * Maximum number of export jobs allowed in the queue (global limit across
 * all users); submissions beyond this limit are rejected.
 * Default: 10.
 */
public static final String OZONE_RECON_EXPORT_MAX_JOBS_TOTAL =
"ozone.recon.export.max.jobs.total";
public static final int OZONE_RECON_EXPORT_MAX_JOBS_TOTAL_DEFAULT = 10;

/**
 * Directory where generated export CSV/TAR files are written.
 * NOTE(review): the default lives under /tmp, which many distros clean
 * periodically — completed exports may disappear before download.
 * Default: /tmp/recon/exports.
 */
public static final String OZONE_RECON_EXPORT_DIRECTORY =
"ozone.recon.export.directory";
public static final String OZONE_RECON_EXPORT_DIRECTORY_DEFAULT = "/tmp/recon/exports";

/**
* Private constructor for utility class.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_MIN_CONTAINER_ID;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_PREVKEY;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
Expand All @@ -39,15 +42,18 @@
import java.util.UUID;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
Expand All @@ -67,6 +73,7 @@
import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
import org.apache.hadoop.ozone.recon.api.types.ContainersResponse;
import org.apache.hadoop.ozone.recon.api.types.DeletedContainerInfo;
import org.apache.hadoop.ozone.recon.api.types.ExportJob;
import org.apache.hadoop.ozone.recon.api.types.KeyMetadata;
import org.apache.hadoop.ozone.recon.api.types.KeyMetadata.ContainerBlockMetadata;
import org.apache.hadoop.ozone.recon.api.types.KeysResponse;
Expand All @@ -75,6 +82,7 @@
import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainerMetadata;
import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersResponse;
import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersSummary;
import org.apache.hadoop.ozone.recon.api.ExportJobManager;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
import org.apache.hadoop.ozone.recon.persistence.ContainerHistory;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
Expand Down Expand Up @@ -104,6 +112,7 @@ public class ContainerEndpoint {
private final ContainerHealthSchemaManager containerHealthSchemaManager;
private final ReconNamespaceSummaryManager reconNamespaceSummaryManager;
private final OzoneStorageContainerManager reconSCM;
private final ExportJobManager exportJobManager;
private static final Logger LOG =
LoggerFactory.getLogger(ContainerEndpoint.class);
private BucketLayout layout = BucketLayout.DEFAULT;
Expand Down Expand Up @@ -145,7 +154,8 @@ public ContainerEndpoint(OzoneStorageContainerManager reconSCM,
ContainerHealthSchemaManager containerHealthSchemaManager,
ReconNamespaceSummaryManager reconNamespaceSummaryManager,
ReconContainerMetadataManager reconContainerMetadataManager,
ReconOMMetadataManager omMetadataManager) {
ReconOMMetadataManager omMetadataManager,
ExportJobManager exportJobManager) {
this.containerManager =
(ReconContainerManager) reconSCM.getContainerManager();
this.pipelineManager = reconSCM.getPipelineManager();
Expand All @@ -154,6 +164,7 @@ public ContainerEndpoint(OzoneStorageContainerManager reconSCM,
this.reconSCM = reconSCM;
this.reconContainerMetadataManager = reconContainerMetadataManager;
this.omMetadataManager = omMetadataManager;
this.exportJobManager = exportJobManager;
}

/**
Expand Down Expand Up @@ -502,6 +513,134 @@ public Response getUnhealthyContainers(
minContainerId);
}

/**
 * Start an async CSV export job for unhealthy containers.
 * Returns immediately with a job ID that the client can poll via
 * {@code GET /unhealthy/export/{jobId}}.
 *
 * @param state The container state (required: MISSING, UNDER_REPLICATED, etc.)
 * @param userId User ID for rate limiting (defaults to "anonymous")
 * @return 200 with the created {@link ExportJob} (carries the jobId);
 *         400 when {@code state} is missing or not a known unhealthy state;
 *         429 with a JSON error body when the job queue is full
 */
@POST
@Path("/unhealthy/export")
@Produces(MediaType.APPLICATION_JSON)
public Response startExport(
    @QueryParam("state") String state,
    @DefaultValue("anonymous") @QueryParam("userId") String userId) {

  if (StringUtils.isEmpty(state)) {
    throw new WebApplicationException("state query parameter is required",
        Response.Status.BAD_REQUEST);
  }

  // Reject values that are not members of the unhealthy-state enum
  // (valueOf is case-sensitive, so callers must pass e.g. "MISSING").
  try {
    ContainerSchemaDefinition.UnHealthyContainerStates.valueOf(state);
  } catch (IllegalArgumentException e) {
    throw new WebApplicationException("Invalid state: " + state, Response.Status.BAD_REQUEST);
  }

  try {
    // assumes -1/0 mean "no row limit / no offset" — TODO confirm against
    // ExportJobManager.submitJob's contract.
    String jobId = exportJobManager.submitJob(userId, state, -1, 0);
    ExportJob job = exportJobManager.getJob(jobId);
    return Response.ok(job).build();
  } catch (IllegalStateException e) {
    // Queue is full. Return a JSON error body instead of the container's
    // default HTML error page. Use the numeric 429 here:
    // Response.Status.TOO_MANY_REQUESTS does not exist in the javax.ws.rs
    // 2.x Response.Status enum (it was only added in Jakarta REST 3.1),
    // so referencing it fails to compile against the javax API.
    Map<String, String> errorResponse = new HashMap<>();
    errorResponse.put("error", "Too Many Requests");
    errorResponse.put("message", e.getMessage());
    return Response.status(429)
        .entity(errorResponse)
        .type(MediaType.APPLICATION_JSON)
        .build();
  }
}

/**
 * Get the status of an export job.
 *
 * @param jobId The job ID returned by startExport
 * @return Response containing the ExportJob with current status/progress
 */
@GET
@Path("/unhealthy/export/{jobId}")
@Produces(MediaType.APPLICATION_JSON)
public Response getExportStatus(@PathParam("jobId") String jobId) {
  // Unknown job id -> 404.
  final ExportJob job = exportJobManager.getJob(jobId);
  if (job == null) {
    throw new WebApplicationException("Job not found", Response.Status.NOT_FOUND);
  }

  // While the job is still waiting, surface the caller's place in line.
  final boolean stillQueued = ExportJob.JobStatus.QUEUED == job.getStatus();
  if (stillQueued) {
    final int position = exportJobManager.getQueuePosition(jobId);
    job.setQueuePosition(position);
  }

  return Response.ok(job).build();
}

/**
 * Download a completed export TAR file.
 *
 * @param jobId The job ID returned by startExport
 * @return 200 streaming the TAR file; 404 when the job or its file is
 *         missing; 409 when the job has not completed yet
 */
@GET
@Path("/unhealthy/export/{jobId}/download")
@Produces("application/x-tar")
public Response downloadExport(@PathParam("jobId") String jobId) {
  ExportJob job = exportJobManager.getJob(jobId);
  if (job == null) {
    throw new WebApplicationException("Job not found", Response.Status.NOT_FOUND);
  }
  if (job.getStatus() != ExportJob.JobStatus.COMPLETED) {
    throw new WebApplicationException("Job not completed yet", Response.Status.CONFLICT);
  }

  File file = new File(job.getFilePath());
  if (!file.exists()) {
    throw new WebApplicationException("Export file not found", Response.Status.NOT_FOUND);
  }

  // Lazily stream the file when JAX-RS writes the response body;
  // InputStream.transferTo replaces the hand-rolled buffer/copy loop.
  StreamingOutput stream = outputStream -> {
    try (FileInputStream fis = new FileInputStream(file);
         BufferedOutputStream bos = new BufferedOutputStream(outputStream, 256 * 1024)) {
      fis.transferTo(bos);
      bos.flush();
    }
  };

  // Use File.getName() rather than substring-after-'/' — getName() is
  // separator-aware, so paths with '\\' separators also yield the bare
  // filename (e.g. export_missing_webui_d5854cc2.tar).
  String filename = file.getName();

  return Response.ok(stream)
      .header("Content-Disposition", "attachment; filename=\"" + filename + "\"")
      .header("Content-Type", "application/x-tar")
      .build();
}

/**
 * Cancel a running export job.
 *
 * @param jobId The job ID
 * @return Response with 200 if successful
 */
@DELETE
@Path("/unhealthy/export/{jobId}")
public Response cancelExport(@PathParam("jobId") String jobId) {
  try {
    exportJobManager.cancelJob(jobId);
  } catch (IllegalStateException e) {
    // Manager signals an uncancellable/unknown job via IllegalStateException;
    // surface it to the client as a 404 with the manager's message.
    throw new WebApplicationException(e.getMessage(), Response.Status.NOT_FOUND);
  }
  return Response.ok().build();
}

/**
* This API will return all DELETED containers in SCM in below JSON format.
* {
Expand Down
Loading