From ee8aaed12ff5566e0fe045bed6c4db3cd66f9d9b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 15 Oct 2025 00:02:36 +0000 Subject: [PATCH] Add NFS TTL cleanup workflow with dual-cluster support - Add workflow to clean up old directories from NFS storage based on TTL - Support for two clusters: exa-cluster (PVC mount) and cirrascale (direct NFS mount) - Configurable TTL (default: 4 weeks / 28 days) - Daily scheduled execution at midnight UTC - Dry run mode for testing before actual deletion - Comprehensive documentation in README The workflow scans directories and deletes those where all files haven't been accessed within the TTL period. Two launch plans are configured for the different clusters with appropriate NFS mounting and node selection. Co-Authored-By: carlos@exa.ai --- examples/NFS_TTL_CLEANUP_README.md | 174 +++++++++++++ examples/nfs_ttl_cleanup.py | 392 +++++++++++++++++++++++++++++ 2 files changed, 566 insertions(+) create mode 100644 examples/NFS_TTL_CLEANUP_README.md create mode 100644 examples/nfs_ttl_cleanup.py diff --git a/examples/NFS_TTL_CLEANUP_README.md b/examples/NFS_TTL_CLEANUP_README.md new file mode 100644 index 0000000000..e3189e6128 --- /dev/null +++ b/examples/NFS_TTL_CLEANUP_README.md @@ -0,0 +1,174 @@ +# NFS TTL Cleanup Workflow + +This example demonstrates a Flyte workflow that cleans up old directories from NFS storage based on a Time-To-Live (TTL) period. + +## Overview + +The workflow scans directories on NFS mounts and deletes directories where **all files** haven't been accessed within the specified TTL period (default: 4 weeks). This helps manage storage space by removing stale data. 
+ +## Features + +- **Two Cluster Support**: + - `exa-cluster`: Uses NFS PVC (nfs-pvc) + - `cirrascale`: Uses NFS direct mount (172.18.72.200:/export/metaphor) + +- **Scheduled Execution**: Runs daily at midnight UTC + +- **Configurable TTL**: Default is 28 days (4 weeks), but can be customized + +- **Dry Run Mode**: Test the cleanup without actually deleting files + +- **Safe Deletion Logic**: Only deletes directories where ALL files are older than TTL + +## Architecture + +### Tasks + +- `cleanup_nfs_ttl_exa`: Task configured for exa-cluster with PVC mounting +- `cleanup_nfs_ttl_cirrascale`: Task configured for cirrascale with direct NFS mounting + +Each task: +- Mounts the appropriate NFS storage +- Selects the correct cluster via node selector +- Configures resource requirements (4 CPU, 8Gi memory) + +### Workflows + +- `nfs_ttl_cleanup_exa_workflow`: Workflow for exa-cluster +- `nfs_ttl_cleanup_cirrascale_workflow`: Workflow for cirrascale + +### Launch Plans + +Two launch plans are configured to run daily: + +- `nfs_ttl_cleanup_exa_daily`: Runs on exa-cluster daily at midnight UTC +- `nfs_ttl_cleanup_cirrascale_daily`: Runs on cirrascale daily at midnight UTC + +Both launch plans have default inputs: +- `base_path`: "/mnt/nfs" +- `ttl_days`: 28 (4 weeks) +- `dry_run`: False + +## Usage + +### Register the Workflow + +```bash +pyflyte register --project my-project --domain development nfs_ttl_cleanup.py +``` + +### Manual Execution + +You can manually execute the workflow with custom parameters: + +```bash +# Dry run to see what would be deleted +pyflyte run nfs_ttl_cleanup.py nfs_ttl_cleanup_exa_workflow \ + --base_path /mnt/nfs \ + --ttl_days 28 \ + --dry_run True + +# Actual cleanup with custom TTL (7 days) +pyflyte run nfs_ttl_cleanup.py nfs_ttl_cleanup_exa_workflow \ + --base_path /mnt/nfs \ + --ttl_days 7 \ + --dry_run False +``` + +### Launch Plan Execution + +The launch plans will run automatically on schedule, but you can also trigger them manually: + 
+```bash +# Using flytectl +flytectl create execution --project my-project --domain production \ + -p nfs_ttl_cleanup_exa_daily +``` + +## How It Works + +1. **Directory Scanning**: The task scans all top-level directories in the base path + +2. **Access Time Check**: For each directory, it walks through all files and checks their last access time (`st_atime`) + +3. **TTL Evaluation**: If ALL files in a directory have not been accessed within the TTL period, the directory is marked for deletion + +4. **Safe Deletion**: Only directories that meet the criteria are deleted using `shutil.rmtree()` + +5. **Statistics**: Returns counts of deleted and skipped directories + +## Configuration + +### Changing the Schedule + +Modify the `daily_schedule` to change when the workflow runs: + +```python +# Run every 12 hours +daily_schedule = CronSchedule(schedule="0 */12 * * *") + +# Run weekly on Sunday at midnight +weekly_schedule = CronSchedule(schedule="0 0 * * 0") +``` + +### Adjusting Resources + +Modify the `V1ResourceRequirements` in the pod templates: + +```python +resources=V1ResourceRequirements( + requests={ + "cpu": "8", + "memory": "16Gi" + }, + limits={ + "cpu": "8", + "memory": "16Gi" + } +) +``` + +### Customizing Base Path + +You can change the default `base_path` in the launch plan: + +```python +default_inputs={ + "base_path": "/mnt/nfs/custom/path", + "ttl_days": 28, + "dry_run": False +} +``` + +## Safety Considerations + +1. **File Access Time**: The workflow uses `st_atime` (last access time). Note that some filesystems may not update this accurately depending on mount options (e.g., `noatime`). + +2. **Permissions**: The cirrascale task runs as root (UID 0) to ensure proper NFS access. The exa-cluster task uses the default pod user. + +3. **Dry Run First**: Always test with `dry_run=True` before running actual cleanup. + +4. **Top-Level Only**: The workflow only considers top-level directories in the base path, not nested subdirectories. + +5. 
def should_delete_directory(dir_path: str, ttl_seconds: float) -> bool:
    """
    Check whether nothing under a directory has been used within the TTL period.

    The tree rooted at ``dir_path`` is walked and every file's last access
    time (``st_atime``) is compared against ``now - ttl_seconds``. The check
    is deliberately conservative: any error while listing a directory or
    stat-ing a file makes the whole tree count as "still active".

    A tree that contains no files at all is judged by the modification times
    of its directories instead, so that a freshly created (still empty)
    directory is not treated as stale.

    Args:
        dir_path: Path to the directory to check
        ttl_seconds: TTL in seconds

    Returns:
        True if the tree is readable and nothing in it is newer than the
        TTL cutoff, False otherwise (including on any filesystem error)
    """
    cutoff_time = time.time() - ttl_seconds

    def _abort_walk(error: OSError) -> None:
        # os.walk() ignores directory-listing errors unless an onerror
        # callback is supplied; re-raise so an unreadable or vanished
        # (sub)directory is reported instead of being silently skipped.
        raise error

    saw_file = False
    newest_dir_mtime = 0.0

    try:
        for root, _dirs, files in os.walk(dir_path, onerror=_abort_walk):
            # Only consulted when the tree turns out to contain no files.
            newest_dir_mtime = max(newest_dir_mtime, os.stat(root).st_mtime)

            for file_name in files:
                saw_file = True
                file_path = os.path.join(root, file_name)
                try:
                    last_access = os.stat(file_path).st_atime
                except OSError as e:
                    print(f"Warning: Could not stat {file_path}: {e}")
                    return False
                if last_access > cutoff_time:
                    return False
    except OSError as e:
        print(f"Warning: Could not walk directory {dir_path}: {e}")
        return False

    if not saw_file and newest_dir_mtime > cutoff_time:
        # No files to judge by, and a directory in the tree was modified
        # recently: treat the tree as active.
        return False

    return True


def delete_directory(dir_path: str) -> bool:
    """
    Delete a directory and all its contents.

    Failures are reported on stdout rather than raised, so one bad directory
    does not abort a whole cleanup run.

    Args:
        dir_path: Path to the directory to delete

    Returns:
        True if the directory was removed, False if removal failed
    """
    import shutil

    try:
        shutil.rmtree(dir_path)
    except OSError as e:
        print(f"Error deleting directory {dir_path}: {e}")
        return False

    print(f"Deleted directory: {dir_path}")
    return True
def _cleanup_stale_directories(
    base_path: str,
    ttl_days: int,
    dry_run: bool,
    cluster_name: str,
) -> dict:
    """
    Scan the top-level directories of ``base_path`` and remove the stale ones.

    Shared implementation behind the per-cluster tasks. A directory is
    eligible when ``should_delete_directory`` reports it as stale for the
    given TTL.

    Args:
        base_path: Base path to scan for directories
        ttl_days: Time-to-live in days; must be positive
        dry_run: If True, only report what would be deleted without deleting
        cluster_name: Cluster label used in the log output

    Returns:
        Dictionary with ``deleted_count``, ``skipped_count`` and
        ``deleted_dirs`` (in dry-run mode these describe what would be
        deleted), plus an ``error`` message when the run could not be
        completed
    """
    ttl_seconds = ttl_days * 24 * 60 * 60

    print(f"Starting NFS cleanup on {cluster_name}")
    print(f"Base path: {base_path}")
    print(f"TTL: {ttl_days} days ({ttl_seconds} seconds)")
    print(f"Dry run: {dry_run}")

    if ttl_days <= 0:
        # A zero or negative TTL puts the cutoff at "now" or in the future,
        # which would make every directory eligible for deletion.
        message = f"ttl_days must be a positive number of days, got {ttl_days}"
        print(message)
        return {
            "deleted_count": 0,
            "skipped_count": 0,
            "deleted_dirs": [],
            "error": message,
        }

    if not os.path.exists(base_path):
        print(f"Base path {base_path} does not exist")
        return {
            "deleted_count": 0,
            "skipped_count": 0,
            "deleted_dirs": [],
            "error": f"Base path {base_path} does not exist",
        }

    deleted_count = 0
    skipped_count = 0
    deleted_dirs: List[str] = []

    try:
        entries = os.listdir(base_path)
        print(f"Found {len(entries)} entries in {base_path}")

        for entry in entries:
            full_path = os.path.join(base_path, entry)

            # os.path.isdir() follows symlinks, but shutil.rmtree() refuses
            # to remove one, so a linked directory can never be cleaned up
            # here; leave it alone instead of reporting it as deleted.
            if os.path.islink(full_path) or not os.path.isdir(full_path):
                continue

            if not should_delete_directory(full_path, ttl_seconds):
                skipped_count += 1
                continue

            print(f"Directory eligible for deletion: {full_path}")
            if not dry_run:
                delete_directory(full_path)
                # delete_directory() reports failures only via stdout, so
                # confirm the removal before counting it.
                if os.path.lexists(full_path):
                    print(f"Directory still present after deletion attempt: {full_path}")
                    skipped_count += 1
                    continue

            deleted_count += 1
            deleted_dirs.append(entry)

    except Exception as e:
        # Task boundary: report the failure in the result together with the
        # progress made so far instead of raising.
        print(f"Error scanning base path: {e}")
        return {
            "deleted_count": deleted_count,
            "skipped_count": skipped_count,
            "deleted_dirs": deleted_dirs,
            "error": str(e),
        }

    print(f"Cleanup complete. Deleted: {deleted_count}, Skipped: {skipped_count}")
    return {
        "deleted_count": deleted_count,
        "skipped_count": skipped_count,
        "deleted_dirs": deleted_dirs,
    }


@task(pod_template=exa_cluster_pod_template)
def cleanup_nfs_ttl_exa(
    base_path: str = "/mnt/nfs",
    ttl_days: int = 28,
    dry_run: bool = False,
) -> dict:
    """
    Clean up old directories on exa-cluster NFS.

    Args:
        base_path: Base path to scan for directories
        ttl_days: Time-to-live in days (default 28 days = 4 weeks)
        dry_run: If True, only report what would be deleted without deleting

    Returns:
        Dictionary with cleanup statistics
    """
    return _cleanup_stale_directories(
        base_path, ttl_days, dry_run, cluster_name="exa-cluster"
    )


@task(pod_template=cirrascale_pod_template)
def cleanup_nfs_ttl_cirrascale(
    base_path: str = "/mnt/nfs",
    ttl_days: int = 28,
    dry_run: bool = False,
) -> dict:
    """
    Clean up old directories on cirrascale NFS.

    Args:
        base_path: Base path to scan for directories
        ttl_days: Time-to-live in days (default 28 days = 4 weeks)
        dry_run: If True, only report what would be deleted without deleting

    Returns:
        Dictionary with cleanup statistics
    """
    return _cleanup_stale_directories(
        base_path, ttl_days, dry_run, cluster_name="cirrascale"
    )
@workflow
def nfs_ttl_cleanup_workflow(
    base_path: str = "/mnt/nfs",
    ttl_days: int = 28,
    dry_run: bool = False,
) -> dict:
    """
    Run the TTL-based NFS cleanup.

    As written, this workflow delegates to the exa-cluster task, exactly
    like ``nfs_ttl_cleanup_exa_workflow``.

    Args:
        base_path: Base path to scan for directories
        ttl_days: Time-to-live in days (default 28 days = 4 weeks)
        dry_run: If True, only report what would be deleted without deleting

    Returns:
        Dictionary with cleanup statistics
    """
    stats = cleanup_nfs_ttl_exa(
        base_path=base_path,
        ttl_days=ttl_days,
        dry_run=dry_run,
    )
    return stats


@workflow
def nfs_ttl_cleanup_exa_workflow(
    base_path: str = "/mnt/nfs",
    ttl_days: int = 28,
    dry_run: bool = False,
) -> dict:
    """
    Run the TTL-based NFS cleanup through the exa-cluster task.

    Args:
        base_path: Base path to scan for directories
        ttl_days: Time-to-live in days (default 28 days = 4 weeks)
        dry_run: If True, only report what would be deleted without deleting

    Returns:
        Dictionary with cleanup statistics
    """
    stats = cleanup_nfs_ttl_exa(
        base_path=base_path,
        ttl_days=ttl_days,
        dry_run=dry_run,
    )
    return stats


@workflow
def nfs_ttl_cleanup_cirrascale_workflow(
    base_path: str = "/mnt/nfs",
    ttl_days: int = 28,
    dry_run: bool = False,
) -> dict:
    """
    Run the TTL-based NFS cleanup through the cirrascale task.

    Args:
        base_path: Base path to scan for directories
        ttl_days: Time-to-live in days (default 28 days = 4 weeks)
        dry_run: If True, only report what would be deleted without deleting

    Returns:
        Dictionary with cleanup statistics
    """
    stats = cleanup_nfs_ttl_cirrascale(
        base_path=base_path,
        ttl_days=ttl_days,
        dry_run=dry_run,
    )
    return stats


# Cron expression: minute 0, hour 0, every day.
daily_schedule = CronSchedule(schedule="0 0 * * *")

# Inputs applied to scheduled runs; each launch plan receives its own copy.
_SCHEDULED_INPUTS = {
    "base_path": "/mnt/nfs",
    "ttl_days": 28,  # 4 weeks
    "dry_run": False,
}

exa_cluster_launch_plan = LaunchPlan.get_or_create(
    workflow=nfs_ttl_cleanup_exa_workflow,
    name="nfs_ttl_cleanup_exa_daily",
    schedule=daily_schedule,
    default_inputs=dict(_SCHEDULED_INPUTS),
)

cirrascale_launch_plan = LaunchPlan.get_or_create(
    workflow=nfs_ttl_cleanup_cirrascale_workflow,
    name="nfs_ttl_cleanup_cirrascale_daily",
    schedule=daily_schedule,
    default_inputs=dict(_SCHEDULED_INPUTS),
)


if __name__ == "__main__":
    # Local smoke test: dry run against a scratch directory.
    print("Testing exa-cluster workflow...")
    result = nfs_ttl_cleanup_exa_workflow(
        base_path="/tmp/test",
        ttl_days=1,
        dry_run=True,
    )
    print(f"Result: {result}")