From 86ea7b3bcbf3a00b23b69f90716379ea65292e98 Mon Sep 17 00:00:00 2001 From: arafat Date: Thu, 30 Apr 2026 14:45:27 +0530 Subject: [PATCH] HDDS-14913. Implement Scalable CSV Export for Unhealthy Containers in Recon UI. --- .../recon/TestReconContainerEndpoint.java | 2 +- .../ozone/recon/ReconControllerModule.java | 2 + .../ozone/recon/ReconServerConfigKeys.java | 34 ++ .../ozone/recon/api/ContainerEndpoint.java | 141 ++++++- .../ozone/recon/api/ExportJobManager.java | 360 ++++++++++++++++++ .../ozone/recon/api/types/ExportJob.java | 186 +++++++++ .../ContainerHealthSchemaManager.java | 77 ++++ .../src/v2/pages/containers/containers.tsx | 157 +++++++- 8 files changed, 944 insertions(+), 15 deletions(-) create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ExportJobManager.java create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ExportJob.java diff --git a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java index a8863046f6ee..17b4b7946a2d 100644 --- a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java +++ b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java @@ -222,7 +222,7 @@ private Response getContainerEndpointResponse(long containerId) { null, // ContainerHealthSchemaManager - not needed for this test recon.getReconServer().getReconNamespaceSummaryManager(), recon.getReconServer().getReconContainerMetadataManager(), - omMetadataManagerInstance); + omMetadataManagerInstance, null); return containerEndpoint.getKeysForContainer(containerId, 10, ""); } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java index 9a9dfb48e74b..2827ff6cc86e 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java @@ -41,6 +41,7 @@ import org.apache.hadoop.ozone.om.protocolPB.OmTransport; import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB; +import org.apache.hadoop.ozone.recon.api.ExportJobManager; import org.apache.hadoop.ozone.recon.heatmap.HeatMapServiceImpl; import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager; import org.apache.hadoop.ozone.recon.persistence.DataSourceConfiguration; @@ -110,6 +111,7 @@ protected void configure() { bind(OMMetadataManager.class).to(ReconOmMetadataManagerImpl.class); bind(ContainerHealthSchemaManager.class).in(Singleton.class); + bind(ExportJobManager.class).in(Singleton.class); bind(ReconContainerMetadataManager.class) .to(ReconContainerMetadataManagerImpl.class).in(Singleton.class); bind(ReconFileMetadataManager.class) diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index b4da42d8f03a..38b382c0c8de 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -253,6 +253,40 @@ public final class ReconServerConfigKeys { "ozone.recon.scm.container.id.batch.size"; public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT = 1_000_000; + /** + * JDBC fetch size for CSV exports. + * Default: 10,000 rows per fetch + */ + public static final String OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE = + "ozone.recon.unhealthy.container.fetch.size"; + public static final int OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE_DEFAULT = 10_000; + + /** + * Worker thread pool size for async CSV exports. + * Single-threaded to avoid concurrent database access. + * Default: 1 + */ + public static final String OZONE_RECON_EXPORT_WORKER_THREADS = + "ozone.recon.export.worker.threads"; + public static final int OZONE_RECON_EXPORT_WORKER_THREADS_DEFAULT = 1; + + /** + * Max export jobs in queue (global limit). + * Jobs beyond this limit will be rejected. + * Default: 10 + */ + public static final String OZONE_RECON_EXPORT_MAX_JOBS_TOTAL = + "ozone.recon.export.max.jobs.total"; + public static final int OZONE_RECON_EXPORT_MAX_JOBS_TOTAL_DEFAULT = 10; + + /** + * Directory to store export CSV files. + * Default: /tmp/recon/exports + */ + public static final String OZONE_RECON_EXPORT_DIRECTORY = + "ozone.recon.export.directory"; + public static final String OZONE_RECON_EXPORT_DIRECTORY_DEFAULT = "/tmp/recon/exports"; + /** * Private constructor for utility class. */ diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java index 4cf6ca85f6f7..6ec5ab4315e3 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java @@ -26,6 +26,9 @@ import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_MIN_CONTAINER_ID; import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_PREVKEY; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.UncheckedIOException; import java.time.Instant; @@ -39,8 +42,10 @@ import java.util.UUID; import java.util.stream.Collectors; import javax.inject.Inject; +import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; @@ -48,6 +53,7 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.ContainerID; @@ -67,6 +73,7 @@ import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata; import org.apache.hadoop.ozone.recon.api.types.ContainersResponse; import org.apache.hadoop.ozone.recon.api.types.DeletedContainerInfo; +import org.apache.hadoop.ozone.recon.api.types.ExportJob; import org.apache.hadoop.ozone.recon.api.types.KeyMetadata; import org.apache.hadoop.ozone.recon.api.types.KeyMetadata.ContainerBlockMetadata; import org.apache.hadoop.ozone.recon.api.types.KeysResponse; @@ -75,6 +82,7 @@ import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainerMetadata; import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersResponse; import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersSummary; +import org.apache.hadoop.ozone.recon.api.ExportJobManager; import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager; import org.apache.hadoop.ozone.recon.persistence.ContainerHistory; import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; @@ -104,6 +112,7 @@ public class ContainerEndpoint { private final ContainerHealthSchemaManager containerHealthSchemaManager; private final ReconNamespaceSummaryManager reconNamespaceSummaryManager; private final OzoneStorageContainerManager reconSCM; + private final ExportJobManager exportJobManager; private static final Logger LOG = LoggerFactory.getLogger(ContainerEndpoint.class); private BucketLayout layout = BucketLayout.DEFAULT; @@ -145,7 +154,8 @@ public ContainerEndpoint(OzoneStorageContainerManager reconSCM, ContainerHealthSchemaManager containerHealthSchemaManager, ReconNamespaceSummaryManager reconNamespaceSummaryManager, ReconContainerMetadataManager reconContainerMetadataManager, - ReconOMMetadataManager omMetadataManager) { + ReconOMMetadataManager omMetadataManager, + ExportJobManager exportJobManager) { this.containerManager = (ReconContainerManager) reconSCM.getContainerManager(); this.pipelineManager = reconSCM.getPipelineManager(); @@ -154,6 +164,7 @@ public ContainerEndpoint(OzoneStorageContainerManager reconSCM, this.reconSCM = reconSCM; this.reconContainerMetadataManager = reconContainerMetadataManager; this.omMetadataManager = omMetadataManager; + this.exportJobManager = exportJobManager; } /** @@ -502,6 +513,134 @@ public Response getUnhealthyContainers( minContainerId); } + /** + * Start an async CSV export job for unhealthy containers. + * Returns immediately with a job ID that the client can poll. + * + * @param state The container state (required: MISSING, UNDER_REPLICATED, etc.) + * @param userId User ID for rate limiting (defaults to "anonymous") + * @return Response containing ExportJob with jobId + */ + @POST + @Path("/unhealthy/export") + @Produces(MediaType.APPLICATION_JSON) + public Response startExport( + @QueryParam("state") String state, + @DefaultValue("anonymous") @QueryParam("userId") String userId) { + + if (StringUtils.isEmpty(state)) { + throw new WebApplicationException("state query parameter is required", + Response.Status.BAD_REQUEST); + } + + // Validate state parameter + try { + ContainerSchemaDefinition.UnHealthyContainerStates.valueOf(state); + } catch (IllegalArgumentException e) { + throw new WebApplicationException("Invalid state: " + state, Response.Status.BAD_REQUEST); + } + + try { + String jobId = exportJobManager.submitJob(userId, state, -1, 0); + ExportJob job = exportJobManager.getJob(jobId); + return Response.ok(job).build(); + } catch (IllegalStateException e) { + // Return JSON error response instead of HTML + Map errorResponse = new HashMap<>(); + errorResponse.put("error", "Too Many Requests"); + errorResponse.put("message", e.getMessage()); + return Response.status(Response.Status.TOO_MANY_REQUESTS) + .entity(errorResponse) + .type(MediaType.APPLICATION_JSON) + .build(); + } + } + + /** + * Get the status of an export job. + * + * @param jobId The job ID returned by startExport + * @return Response containing the ExportJob with current status/progress + */ + @GET + @Path("/unhealthy/export/{jobId}") + @Produces(MediaType.APPLICATION_JSON) + public Response getExportStatus(@PathParam("jobId") String jobId) { + ExportJob job = exportJobManager.getJob(jobId); + if (job == null) { + throw new WebApplicationException("Job not found", Response.Status.NOT_FOUND); + } + + // Calculate and set queue position if QUEUED + if (job.getStatus() == ExportJob.JobStatus.QUEUED) { + int position = exportJobManager.getQueuePosition(jobId); + job.setQueuePosition(position); + } + + return Response.ok(job).build(); + } + + /** + * Download a completed export TAR file. + * + * @param jobId The job ID + * @return Response with TAR file stream + */ + @GET + @Path("/unhealthy/export/{jobId}/download") + @Produces("application/x-tar") + public Response downloadExport(@PathParam("jobId") String jobId) { + ExportJob job = exportJobManager.getJob(jobId); + if (job == null) { + throw new WebApplicationException("Job not found", Response.Status.NOT_FOUND); + } + if (job.getStatus() != ExportJob.JobStatus.COMPLETED) { + throw new WebApplicationException("Job not completed yet", Response.Status.CONFLICT); + } + + File file = new File(job.getFilePath()); + if (!file.exists()) { + throw new WebApplicationException("Export file not found", Response.Status.NOT_FOUND); + } + + StreamingOutput stream = outputStream -> { + try (FileInputStream fis = new FileInputStream(file); + BufferedOutputStream bos = new BufferedOutputStream(outputStream, 256 * 1024)) { + byte[] buffer = new byte[8192]; + int bytesRead; + while ((bytesRead = fis.read(buffer)) != -1) { + bos.write(buffer, 0, bytesRead); + } + bos.flush(); + } + }; + + // Extract filename from full path (e.g., /tmp/recon/exports/export_missing_webui_d5854cc2.tar) + String filename = job.getFilePath().substring(job.getFilePath().lastIndexOf('/') + 1); + + return Response.ok(stream) + .header("Content-Disposition", "attachment; filename=\"" + filename + "\"") + .header("Content-Type", "application/x-tar") + .build(); + } + + /** + * Cancel a running export job. + * + * @param jobId The job ID + * @return Response with 200 if successful + */ + @DELETE + @Path("/unhealthy/export/{jobId}") + public Response cancelExport(@PathParam("jobId") String jobId) { + try { + exportJobManager.cancelJob(jobId); + return Response.ok().build(); + } catch (IllegalStateException e) { + throw new WebApplicationException(e.getMessage(), Response.Status.NOT_FOUND); + } + } + /** * This API will return all DELETED containers in SCM in below JSON format. * { diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ExportJobManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ExportJobManager.java new file mode 100644 index 000000000000..0649b46a52ca --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ExportJobManager.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.Archiver; +import org.apache.hadoop.ozone.recon.ReconServerConfigKeys; +import org.apache.hadoop.ozone.recon.api.types.ExportJob; +import org.apache.hadoop.ozone.recon.api.types.ExportJob.JobStatus; +import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager; +import org.apache.ozone.recon.schema.ContainerSchemaDefinition; +import org.apache.ozone.recon.schema.generated.tables.records.UnhealthyContainersRecord; +import org.jooq.Cursor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages asynchronous CSV export jobs. + */ +@Singleton +public class ExportJobManager { + private static final Logger LOG = LoggerFactory.getLogger(ExportJobManager.class); + private static final int MAX_QUEUE_SIZE = 10; + + private final Map jobTracker = new ConcurrentHashMap<>(); + private final LinkedHashMap jobQueue = new LinkedHashMap<>(); + private final Map> runningTasks = new ConcurrentHashMap<>(); + private final ExecutorService workerPool; + private final ContainerHealthSchemaManager containerHealthSchemaManager; + private final String exportDirectory; + + @Inject + public ExportJobManager(ContainerHealthSchemaManager containerHealthSchemaManager, + OzoneConfiguration conf) { + this.containerHealthSchemaManager = containerHealthSchemaManager; + + // Use single thread executor for sequential processing (no concurrent DB access) + this.workerPool = Executors.newSingleThreadExecutor(); + + this.exportDirectory = conf.get( + ReconServerConfigKeys.OZONE_RECON_EXPORT_DIRECTORY, + ReconServerConfigKeys.OZONE_RECON_EXPORT_DIRECTORY_DEFAULT); + + // Create export directory if it doesn't exist + try { + Files.createDirectories(Paths.get(exportDirectory)); + } catch (IOException e) { + LOG.error("Failed to create export directory: {}", exportDirectory, e); + } + + LOG.info("ExportJobManager initialized with single-threaded queue (max {} jobs)", MAX_QUEUE_SIZE); + } + + public synchronized String submitJob(String userId, String state, int limit, long prevKey) { + // Check queue size limit + synchronized (jobQueue) { + if (jobQueue.size() >= MAX_QUEUE_SIZE) { + throw new IllegalStateException( + "Export queue is full (max " + MAX_QUEUE_SIZE + " jobs). Please try again later."); + } + } + + String jobId = UUID.randomUUID().toString(); + ExportJob job = new ExportJob(jobId, userId, state, limit, prevKey); + // Filename format: export_{state}_{userId}_{shortJobId}.tar + String shortJobId = jobId.substring(0, 8); + String filePath = exportDirectory + "/export_" + state.toLowerCase() + "_" + userId + "_" + shortJobId + ".tar"; + job.setFilePath(filePath); + + jobTracker.put(jobId, job); + + // Add to queue (LinkedHashMap maintains insertion order) + synchronized (jobQueue) { + jobQueue.put(jobId, job); + } + + // Submit to single-threaded worker pool + Future future = workerPool.submit(() -> executeExport(job)); + runningTasks.put(jobId, future); + + int queuePosition = getQueuePosition(jobId); + LOG.info("Submitted export job {} for user {} (state={}, limit={}, queue position={})", + jobId, userId, state, limit, queuePosition); + + return jobId; + } + + public ExportJob getJob(String jobId) { + return jobTracker.get(jobId); + } + + /** + * Get the queue position for a job (1-indexed). + * Returns 0 if job is not in queue (running, completed, or not found). + */ + public int getQueuePosition(String jobId) { + synchronized (jobQueue) { + if (!jobQueue.containsKey(jobId)) { + return 0; + } + + int position = 1; + for (String id : jobQueue.keySet()) { + if (id.equals(jobId)) { + return position; + } + position++; + } + return 0; + } + } + + public void cancelJob(String jobId) { + ExportJob job = jobTracker.get(jobId); + if (job == null) { + throw new IllegalStateException("Job not found: " + jobId); + } + + if (job.getStatus() == JobStatus.COMPLETED || job.getStatus() == JobStatus.FAILED) { + throw new IllegalStateException("Job already completed or failed"); + } + + // Remove from queue if still queued + synchronized (jobQueue) { + jobQueue.remove(jobId); + } + + Future future = runningTasks.get(jobId); + if (future != null) { + future.cancel(true); + runningTasks.remove(jobId); + } + + job.setStatus(JobStatus.FAILED); + job.setErrorMessage("Cancelled by user"); + + // Delete partial files/directory + deleteDirectory(Paths.get(exportDirectory + "/" + jobId)); + deleteFileQuietly(job.getFilePath()); + + LOG.info("Cancelled export job {}", jobId); + } + + private void executeExport(ExportJob job) { + String jobDirectory = exportDirectory + "/" + job.getJobId(); + Path jobDir = Paths.get(jobDirectory); + String tarFilePath = job.getFilePath(); // Use the filename set in submitJob + + try { + // Create job-specific directory for CSV files + Files.createDirectories(jobDir); + + // Remove from queue and mark as running + synchronized (jobQueue) { + jobQueue.remove(job.getJobId()); + } + job.setStatus(JobStatus.RUNNING); + LOG.info("Starting export job {}", job.getJobId()); + + ContainerSchemaDefinition.UnHealthyContainerStates internalState = + ContainerSchemaDefinition.UnHealthyContainerStates.valueOf(job.getState()); + + // Get total count first for progress tracking + long estimatedTotal = containerHealthSchemaManager.getUnhealthyContainersCount( + internalState, job.getLimit(), job.getPrevKey()); + job.setEstimatedTotal(estimatedTotal); + LOG.info("Export job {} will process approximately {} records", job.getJobId(), estimatedTotal); + + // Open database cursor + try (Cursor cursor = + containerHealthSchemaManager.getUnhealthyContainersCursor( + internalState, job.getLimit(), job.getPrevKey())) { + + int fileIndex = 1; + long totalRecords = 0; + long recordsInCurrentFile = 0; + final int CHUNK_SIZE = 500_000; + + BufferedWriter writer = null; + FileOutputStream fos = null; + + try { + while (cursor.hasNext()) { + // Check for cancellation + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Job cancelled"); + } + + // Start new CSV file if needed + if (recordsInCurrentFile == 0) { + // Close previous file if exists + if (writer != null) { + writer.flush(); + writer.close(); + } + + String csvFileName = String.format("%s/unhealthy_containers_%s_part%03d.csv", + jobDirectory, job.getState().toLowerCase(), fileIndex); + fos = new FileOutputStream(csvFileName); + writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8)); + + // Write CSV header + writer.write("container_id,container_state,in_state_since," + + "expected_replica_count,actual_replica_count,replica_delta\n"); + + LOG.info("Created CSV file: part{}", fileIndex); + } + + // Fetch and write record + UnhealthyContainersRecord rec = cursor.fetchNext(); + StringBuilder sb = new StringBuilder(128); + sb.append(rec.getContainerId()).append(',') + .append(rec.getContainerState()).append(',') + .append(rec.getInStateSince()).append(',') + .append(rec.getExpectedReplicaCount()).append(',') + .append(rec.getActualReplicaCount()).append(',') + .append(rec.getReplicaDelta()).append('\n'); + writer.write(sb.toString()); + + totalRecords++; + recordsInCurrentFile++; + job.setTotalRecords(totalRecords); + + // Move to next file if chunk limit reached + if (recordsInCurrentFile >= CHUNK_SIZE) { + writer.flush(); + writer.close(); + writer = null; + recordsInCurrentFile = 0; + fileIndex++; + } + + // Flush every 10K rows + if (recordsInCurrentFile > 0 && recordsInCurrentFile % 10000 == 0) { + writer.flush(); + } + } + + // Close last file + if (writer != null) { + writer.flush(); + writer.close(); + } + + } finally { + if (writer != null) { + try { + writer.close(); + } catch (IOException e) { + LOG.warn("Error closing writer", e); + } + } + } + + LOG.info("Export job {} wrote {} records across {} files", + job.getJobId(), totalRecords, fileIndex); + + // Create TAR archive + File tarFile = new File(tarFilePath); + Archiver.create(tarFile, jobDir); + LOG.info("Created TAR archive: {}", tarFilePath); + + // Delete CSV files and job directory + deleteDirectory(jobDir); + LOG.info("Deleted temporary CSV files for job {}", job.getJobId()); + + // Update job with TAR file path + job.setFilePath(tarFilePath); + job.setStatus(JobStatus.COMPLETED); + LOG.info("Completed export job {} ({} records)", job.getJobId(), totalRecords); + + } catch (InterruptedException e) { + job.setStatus(JobStatus.FAILED); + job.setErrorMessage("Job was cancelled"); + deleteDirectory(jobDir); + deleteFileQuietly(tarFilePath); + LOG.info("Export job {} was cancelled", job.getJobId()); + Thread.currentThread().interrupt(); + } + + } catch (Exception e) { + job.setStatus(JobStatus.FAILED); + job.setErrorMessage(e.getMessage()); + deleteDirectory(jobDir); + deleteFileQuietly(tarFilePath); + LOG.error("Export job {} failed", job.getJobId(), e); + } finally { + runningTasks.remove(job.getJobId()); + } + } + + private void deleteDirectory(Path directory) { + try { + if (Files.exists(directory)) { + Files.walk(directory) + .sorted(Comparator.reverseOrder()) + .map(Path::toFile) + .forEach(File::delete); + } + } catch (IOException e) { + LOG.warn("Failed to delete directory: {}", directory, e); + } + } + + private void deleteFileQuietly(String filePath) { + try { + Files.deleteIfExists(Paths.get(filePath)); + } catch (IOException e) { + LOG.warn("Failed to delete file: {}", filePath, e); + } + } + + @PreDestroy + public void shutdown() { + LOG.info("Shutting down ExportJobManager"); + workerPool.shutdownNow(); + try { + workerPool.awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("Timeout waiting for executor shutdown", e); + Thread.currentThread().interrupt(); + } + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ExportJob.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ExportJob.java new file mode 100644 index 000000000000..f1bba5ce8fe4 --- /dev/null +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ExportJob.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.api.types; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Represents an asynchronous CSV export job. + */ +public class ExportJob { + + public enum JobStatus { + QUEUED, // Waiting for worker thread + RUNNING, // Actively exporting + COMPLETED, // File ready for download + FAILED // Error occurred + } + + @JsonProperty("jobId") + private String jobId; + + @JsonProperty("userId") + private String userId; + + @JsonProperty("state") + private String state; + + @JsonProperty("limit") + private int limit; + + @JsonProperty("prevKey") + private long prevKey; + + @JsonProperty("status") + private JobStatus status; + + @JsonProperty("submittedAt") + private long submittedAt; + + @JsonProperty("startedAt") + private long startedAt; + + @JsonProperty("completedAt") + private long completedAt; + + @JsonProperty("totalRecords") + private long totalRecords; + + @JsonProperty("estimatedTotal") + private long estimatedTotal; + + @JsonProperty("filePath") + private String filePath; + + @JsonProperty("errorMessage") + private String errorMessage; + + @JsonProperty("progressPercent") + private int progressPercent; + + @JsonProperty("queuePosition") + private int queuePosition; + + public ExportJob(String jobId, String userId, String state, int limit, long prevKey) { + this.jobId = jobId; + this.userId = userId; + this.state = state; + this.limit = limit; + this.prevKey = prevKey; + this.status = JobStatus.QUEUED; + this.submittedAt = System.currentTimeMillis(); + this.totalRecords = 0; + this.estimatedTotal = -1; + } + + public String getJobId() { + return jobId; + } + + public String getUserId() { + return userId; + } + + public String getState() { + return state; + } + + public int getLimit() { + return limit; + } + + public long getPrevKey() { + return prevKey; + } + + public JobStatus getStatus() { + return status; + } + + public void setStatus(JobStatus status) { + this.status = status; + if (status == JobStatus.RUNNING && startedAt == 0) { + startedAt = System.currentTimeMillis(); + } else if ((status == JobStatus.COMPLETED || status == JobStatus.FAILED) && completedAt == 0) { + completedAt = System.currentTimeMillis(); + } + } + + public long getSubmittedAt() { + return submittedAt; + } + + public long getStartedAt() { + return startedAt; + } + + public long getCompletedAt() { + return completedAt; + } + + public long getTotalRecords() { + return totalRecords; + } + + public void setTotalRecords(long totalRecords) { + this.totalRecords = totalRecords; + } + + public void incrementTotalRecords() { + this.totalRecords++; + } + + public long getEstimatedTotal() { + return estimatedTotal; + } + + public void setEstimatedTotal(long estimatedTotal) { + this.estimatedTotal = estimatedTotal; + } + + public String getFilePath() { + return filePath; + } + + public void setFilePath(String filePath) { + this.filePath = filePath; + } + + public String getErrorMessage() { + return errorMessage; + } + + public void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + + public int getProgressPercent() { + if (estimatedTotal > 0 && totalRecords > 0) { + return (int) ((totalRecords * 100) / estimatedTotal); + } + return 0; + } + + public int getQueuePosition() { + return queuePosition; + } + + public void setQueuePosition(int queuePosition) { + this.queuePosition = queuePosition; + } +} diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java index ac1e91350cc6..48f5734fbc32 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java @@ -36,6 +36,7 @@ import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates; import org.apache.ozone.recon.schema.generated.tables.records.UnhealthyContainersRecord; import org.jooq.Condition; +import org.jooq.Cursor; import org.jooq.DSLContext; import org.jooq.OrderField; import org.jooq.Record; @@ -395,6 +396,82 @@ public void clearAllUnhealthyContainerRecords() { } } + /** + * Returns a streaming cursor over unhealthy container records for a given state. + * Caller MUST close the cursor. + * + * Generated SQL example (50,000 MISSING containers, starting after container ID 12345): + * + * SELECT * FROM unhealthy_containers + * WHERE container_state = 'MISSING' + * AND container_id > 12345 + * ORDER BY container_id ASC + * LIMIT 50000 + * + * @param state filter by state (required) + * @param limit max records to return, -1 = unlimited + * @param prevKey previous container ID to skip, for cursor-based pagination + * @return Cursor returning UnhealthyContainersRecord + */ + /** + * Get the total count of unhealthy containers for a given state. + * + * @param state The container health state to filter by + * @param limit Maximum number of records to count (-1 for unlimited) + * @param prevKey Container ID offset for cursor-based pagination + * @return Total count of matching containers + */ + public long getUnhealthyContainersCount( + UnHealthyContainerStates state, int limit, long prevKey) { + DSLContext dslContext = containerSchemaDefinition.getDSLContext(); + + Condition whereCondition = UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString()); + + if (prevKey > 0) { + whereCondition = whereCondition.and(UNHEALTHY_CONTAINERS.CONTAINER_ID.gt(prevKey)); + } + + long totalCount = dslContext.selectCount() + .from(UNHEALTHY_CONTAINERS) + .where(whereCondition) + .fetchOne(0, long.class); + + // If limit is set and less than total, return the limit as estimated total + if (limit > 0 && limit < totalCount) { + return limit; + } + + return totalCount; + } + + public Cursor getUnhealthyContainersCursor( + UnHealthyContainerStates state, int limit, long prevKey) { + DSLContext dslContext = containerSchemaDefinition.getDSLContext(); + SelectQuery query = dslContext.selectFrom(UNHEALTHY_CONTAINERS).getQuery(); + + // WHERE container_state = ? + query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString())); + + if (prevKey > 0) { + // AND container_id > ? (cursor-based pagination) + query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_ID.gt(prevKey)); + } + + // ORDER BY container_id ASC — matches composite index (state, container_id), + // so Derby walks it in order with no sort step. + query.addOrderBy(UNHEALTHY_CONTAINERS.CONTAINER_ID.asc()); + + if (limit > 0) { + query.addLimit(limit); + } + + // Controls how many rows Derby returns per JDBC round-trip. + // Default is 10,000 rows. + query.fetchSize(10000); + + return query.fetchLazy(); + } + /** * POJO representing a record in UNHEALTHY_CONTAINERS table. */ diff --git a/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/containers/containers.tsx b/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/containers/containers.tsx index 2b1ca3d24994..e1ffb44ef3de 100644 --- a/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/containers/containers.tsx +++ b/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/containers/containers.tsx @@ -18,7 +18,8 @@ import React, { useState, useEffect } from "react"; import moment from "moment"; -import { Card, Row, Tabs } from "antd"; +import { Button, Card, message, Modal, Row, Tabs, Tooltip } from "antd"; +import { DownloadOutlined } from "@ant-design/icons"; import { ValueType } from "react-select/src/types"; import Search from "@/v2/components/search/search"; @@ -99,6 +100,7 @@ const Containers: React.FC<{}> = () => { const [searchTerm, setSearchTerm] = useState(''); const [selectedTab, setSelectedTab] = useState('1'); const [searchColumn, setSearchColumn] = useState<'containerID' | 'pipelineID'>('containerID'); + const [exportInProgress, setExportInProgress] = useState(false); const debouncedSearch = useDebounce(searchTerm, 300); @@ -265,6 +267,123 @@ const Containers: React.FC<{}> = () => { clusterState.refetch(); }; + // Helper function to trigger browser download + const downloadFile = (jobId: string, filename: string) => { + const downloadUrl = `/api/v1/containers/unhealthy/export/${jobId}/download`; + const link = document.createElement('a'); + link.href = downloadUrl; + link.download = filename; + document.body.appendChild(link); + link.click(); + document.body.removeChild(link); + }; + + // Export handler with async polling + const handleExportCsv = async () => { + const state = TAB_STATE_MAP[selectedTab]; + const label = ['Missing','Under-Replicated','Over-Replicated','Mis-Replicated','Mismatched Replicas'][Number(selectedTab)-1]; + + setExportInProgress(true); + let hideLoadingMsg: (() => void) | null = message.loading(`Starting export of ${label} containers...`, 0); + + try { + // 1. Submit export job + const startResponse = await fetch(`/api/v1/containers/unhealthy/export?state=${state}&userId=webui`, { + method: 'POST' + }); + if (!startResponse.ok) { + if (hideLoadingMsg) hideLoadingMsg(); + let errorMsg = `Failed to start export: ${startResponse.status}`; + try { + const errorJson = await startResponse.json(); + errorMsg = errorJson.message || errorJson.error || errorMsg; + } catch { + // If not JSON, try text + const errorText = await startResponse.text(); + if (errorText && !errorText.includes('')) { + errorMsg = errorText; + } + } + throw new Error(errorMsg); + } + const job = await startResponse.json(); + const jobId = job.jobId; + + // 2. Poll for completion + let attempts = 0; + const maxAttempts = 120; // 10 minutes with 5s intervals + while (attempts < maxAttempts) { + await new Promise(resolve => setTimeout(resolve, 5000)); // 5s poll interval + + const statusResponse = await fetch(`/api/v1/containers/unhealthy/export/${jobId}`); + if (!statusResponse.ok) { + throw new Error(`Failed to check job status: ${statusResponse.status}`); + } + const statusJob = await statusResponse.json(); + + // Handle different statuses + if (statusJob.status === 'QUEUED') { + const position = statusJob.queuePosition || 0; + if (hideLoadingMsg) { + hideLoadingMsg(); + hideLoadingMsg = null; + } + message.info(`Export queued. Position: #${position} in queue`, 2); + } else if (statusJob.status === 'RUNNING') { + if (hideLoadingMsg) { + hideLoadingMsg(); + hideLoadingMsg = null; + } + const percent = statusJob.progressPercent || 0; + const records = statusJob.totalRecords.toLocaleString(); + const total = statusJob.estimatedTotal > 0 ? statusJob.estimatedTotal.toLocaleString() : '?'; + message.loading(`Processing export... ${percent}% (${records} / ${total} records)`, 2); + } else if (statusJob.status === 'COMPLETED') { + if (hideLoadingMsg) hideLoadingMsg(); + + // Extract filename from full path + const filename = statusJob.filePath ? statusJob.filePath.split('/').pop() : 'export file'; + + Modal.success({ + title: 'Export Completed!', + content: ( +
+

{statusJob.totalRecords.toLocaleString()} records exported successfully.

+ +
+ ), + okText: 'Close', + width: 500 + }); + + setExportInProgress(false); + return; + } else if (statusJob.status === 'FAILED') { + if (hideLoadingMsg) hideLoadingMsg(); + throw new Error(statusJob.errorMessage || 'Export failed'); + } + + attempts++; + } + + // Timeout + if (hideLoadingMsg) hideLoadingMsg(); + throw new Error('Export timed out after 10 minutes'); + + } catch (error: any) { + if (hideLoadingMsg) hideLoadingMsg(); + console.error("Export failed:", error); + message.error(`Export failed: ${error.message || error}`); + setExportInProgress(false); + } + }; + const autoReload = useAutoReload(loadContainersData); const { @@ -348,18 +467,30 @@ const Containers: React.FC<{}> = () => { onTagClose={() => { }} columnLength={columnOptions.length} /> - ) => setSearchTerm(e.target.value) - } - onChange={(value) => { - setSearchTerm(''); - setSearchColumn(value as 'containerID' | 'pipelineID'); - }} /> +
+ ) => setSearchTerm(e.target.value) + } + onChange={(value) => { + setSearchTerm(''); + setSearchColumn(value as 'containerID' | 'pipelineID'); + }} /> + + + +
handleTabChange(activeKey)}>