diff --git a/examples/NFS_TTL_CLEANUP_README.md b/examples/NFS_TTL_CLEANUP_README.md new file mode 100644 index 0000000000..e3189e6128 --- /dev/null +++ b/examples/NFS_TTL_CLEANUP_README.md @@ -0,0 +1,174 @@ +# NFS TTL Cleanup Workflow + +This example demonstrates a Flyte workflow that cleans up old directories from NFS storage based on a Time-To-Live (TTL) period. + +## Overview + +The workflow scans directories on NFS mounts and deletes directories where **all files** haven't been accessed within the specified TTL period (default: 4 weeks). This helps manage storage space by removing stale data. + +## Features + +- **Two Cluster Support**: + - `exa-cluster`: Uses NFS PVC (nfs-pvc) + - `cirrascale`: Uses NFS direct mount (172.18.72.200:/export/metaphor) + +- **Scheduled Execution**: Runs daily at midnight UTC + +- **Configurable TTL**: Default is 28 days (4 weeks), but can be customized + +- **Dry Run Mode**: Test the cleanup without actually deleting files + +- **Safe Deletion Logic**: Only deletes directories where ALL files are older than TTL + +## Architecture + +### Tasks + +- `cleanup_nfs_ttl_exa`: Task configured for exa-cluster with PVC mounting +- `cleanup_nfs_ttl_cirrascale`: Task configured for cirrascale with direct NFS mounting + +Each task: +- Mounts the appropriate NFS storage +- Selects the correct cluster via node selector +- Configures resource requirements (4 CPU, 8Gi memory) + +### Workflows + +- `nfs_ttl_cleanup_exa_workflow`: Workflow for exa-cluster +- `nfs_ttl_cleanup_cirrascale_workflow`: Workflow for cirrascale + +### Launch Plans + +Two launch plans are configured to run daily: + +- `nfs_ttl_cleanup_exa_daily`: Runs on exa-cluster daily at midnight UTC +- `nfs_ttl_cleanup_cirrascale_daily`: Runs on cirrascale daily at midnight UTC + +Both launch plans have default inputs: +- `base_path`: "/mnt/nfs" +- `ttl_days`: 28 (4 weeks) +- `dry_run`: False + +## Usage + +### Register the Workflow + +```bash +pyflyte register --project my-project --domain development nfs_ttl_cleanup.py +``` + +### Manual Execution + +You can manually execute the workflow with custom parameters: + +```bash +# Dry run to see what would be deleted +pyflyte run nfs_ttl_cleanup.py nfs_ttl_cleanup_exa_workflow \ + --base_path /mnt/nfs \ + --ttl_days 28 \ + --dry_run True + +# Actual cleanup with custom TTL (7 days) +pyflyte run nfs_ttl_cleanup.py nfs_ttl_cleanup_exa_workflow \ + --base_path /mnt/nfs \ + --ttl_days 7 \ + --dry_run False +``` + +### Launch Plan Execution + +The launch plans will run automatically on schedule, but you can also trigger them manually: + +```bash +# Using flytectl +flytectl create execution --project my-project --domain production \ + -p nfs_ttl_cleanup_exa_daily +``` + +## How It Works + +1. **Directory Scanning**: The task scans all top-level directories in the base path + +2. **Access Time Check**: For each directory, it walks through all files and checks their last access time (`st_atime`) + +3. **TTL Evaluation**: If ALL files in a directory have not been accessed within the TTL period, the directory is marked for deletion + +4. **Safe Deletion**: Only directories that meet the criteria are deleted using `shutil.rmtree()` + +5. **Statistics**: Returns counts of deleted and skipped directories + +## Configuration + +### Changing the Schedule + +Modify the `daily_schedule` to change when the workflow runs: + +```python +# Run every 12 hours +daily_schedule = CronSchedule(schedule="0 */12 * * *") + +# Run weekly on Sunday at midnight +weekly_schedule = CronSchedule(schedule="0 0 * * 0") +``` + +### Adjusting Resources + +Modify the `V1ResourceRequirements` in the pod templates: + +```python +resources=V1ResourceRequirements( + requests={ + "cpu": "8", + "memory": "16Gi" + }, + limits={ + "cpu": "8", + "memory": "16Gi" + } +) +``` + +### Customizing Base Path + +You can change the default `base_path` in the launch plan: + +```python +default_inputs={ + "base_path": "/mnt/nfs/custom/path", + "ttl_days": 28, + "dry_run": False +} +``` + +## Safety Considerations + +1. **File Access Time**: The workflow uses `st_atime` (last access time). Note that some filesystems may not update this accurately depending on mount options (e.g., `noatime`). + +2. **Permissions**: The cirrascale task runs as root (UID 0) to ensure proper NFS access. The exa-cluster task uses the default pod user. + +3. **Dry Run First**: Always test with `dry_run=True` before running actual cleanup. + +4. **Top-Level Only**: The workflow only considers top-level directories in the base path, not nested subdirectories. + +5. **Error Handling**: If a file cannot be accessed, the directory is considered active and not deleted. + +## Monitoring + +Check the execution logs to see: +- Number of directories scanned +- Number of directories deleted +- Number of directories skipped +- List of deleted directory names +- Any errors encountered + +## Return Value + +The workflow returns a dictionary with: +```python +{ + "deleted_count": int, # Number of directories deleted + "skipped_count": int, # Number of directories skipped + "deleted_dirs": List[str], # Names of deleted directories + "error": str # Error message if any (optional) +} +``` diff --git a/examples/nfs_ttl_cleanup.py b/examples/nfs_ttl_cleanup.py new file mode 100644 index 0000000000..a523d2f43e --- /dev/null +++ b/examples/nfs_ttl_cleanup.py @@ -0,0 +1,392 @@ +""" +NFS TTL Cleanup Workflow + +This workflow deletes directories from NFS storage where all files haven't been +accessed in the specified TTL period. It supports two clusters: +- exa-cluster: Uses NFS PVC (nfs-pvc) +- cirrascale: Uses NFS direct mount (172.18.72.200:/export/metaphor) + +The workflow runs daily and defaults to a 4-week TTL period. +""" + +import os +import time +from datetime import timedelta +from typing import List + +from flytekit import LaunchPlan, PodTemplate, task, workflow +from flytekit.core.schedule import CronSchedule +from kubernetes.client import ( + V1Container, + V1NFSVolumeSource, + V1PersistentVolumeClaimVolumeSource, + V1PodSpec, + V1ResourceRequirements, + V1SecurityContext, + V1Volume, + V1VolumeMount, +) + + +def should_delete_directory(dir_path: str, ttl_seconds: float) -> bool: + """ + Check if all files in a directory haven't been accessed within TTL period. + + Args: + dir_path: Path to the directory to check + ttl_seconds: TTL in seconds + + Returns: + True if all files haven't been accessed within TTL, False otherwise + """ + current_time = time.time() + cutoff_time = current_time - ttl_seconds + + try: + for root, dirs, files in os.walk(dir_path): + for file in files: + file_path = os.path.join(root, file) + try: + stat_info = os.stat(file_path) + if stat_info.st_atime > cutoff_time: + return False + except (OSError, PermissionError) as e: + print(f"Warning: Could not stat {file_path}: {e}") + return False + + return True + except (OSError, PermissionError) as e: + print(f"Warning: Could not walk directory {dir_path}: {e}") + return False + + +def delete_directory(dir_path: str) -> None: + """ + Delete a directory and all its contents. + + Args: + dir_path: Path to the directory to delete + """ + try: + import shutil + shutil.rmtree(dir_path) + print(f"Deleted directory: {dir_path}") + except Exception as e: + print(f"Error deleting directory {dir_path}: {e}") + + +exa_cluster_pod_template = PodTemplate( + pod_spec=V1PodSpec( + node_selector={"cluster": "exa-cluster"}, + volumes=[ + V1Volume( + name="nfs-pvc", + persistent_volume_claim=V1PersistentVolumeClaimVolumeSource( + claim_name="nfs-pvc", + ), + ), + ], + containers=[ + V1Container( + name="primary", + volume_mounts=[ + V1VolumeMount( + name="nfs-pvc", + mount_path="/mnt/nfs", + ), + ], + resources=V1ResourceRequirements( + requests={ + "cpu": "4", + "memory": "8Gi" + }, + limits={ + "cpu": "4", + "memory": "8Gi" + } + ), + ) + ], + ) +) + + +cirrascale_pod_template = PodTemplate( + pod_spec=V1PodSpec( + node_selector={"cluster": "cirrascale"}, + volumes=[ + V1Volume( + name="nfs-volume", + nfs=V1NFSVolumeSource( + server="172.18.72.200", + path="/export/metaphor" + ), + ), + ], + containers=[ + V1Container( + name="primary", + volume_mounts=[ + V1VolumeMount( + name="nfs-volume", + mount_path="/mnt/nfs", + ), + ], + resources=V1ResourceRequirements( + requests={ + "cpu": "4", + "memory": "8Gi" + }, + limits={ + "cpu": "4", + "memory": "8Gi" + } + ), + security_context=V1SecurityContext( + run_as_user=0, + run_as_group=0, + ) + ) + ], + ) +) + + +@task(pod_template=exa_cluster_pod_template) +def cleanup_nfs_ttl_exa( + base_path: str = "/mnt/nfs", + ttl_days: int = 28, + dry_run: bool = False +) -> dict: + """ + Clean up old directories on exa-cluster NFS. + + Args: + base_path: Base path to scan for directories + ttl_days: Time-to-live in days (default 28 days = 4 weeks) + dry_run: If True, only report what would be deleted without deleting + + Returns: + Dictionary with cleanup statistics + """ + ttl_seconds = ttl_days * 24 * 60 * 60 + deleted_count = 0 + skipped_count = 0 + deleted_dirs: List[str] = [] + + print(f"Starting NFS cleanup on exa-cluster") + print(f"Base path: {base_path}") + print(f"TTL: {ttl_days} days ({ttl_seconds} seconds)") + print(f"Dry run: {dry_run}") + + if not os.path.exists(base_path): + print(f"Base path {base_path} does not exist") + return { + "deleted_count": 0, + "skipped_count": 0, + "deleted_dirs": [], + "error": f"Base path {base_path} does not exist" + } + + try: + entries = os.listdir(base_path) + print(f"Found {len(entries)} entries in {base_path}") + + for entry in entries: + full_path = os.path.join(base_path, entry) + + if not os.path.isdir(full_path): + continue + + if should_delete_directory(full_path, ttl_seconds): + print(f"Directory eligible for deletion: {full_path}") + if not dry_run: + delete_directory(full_path) + deleted_count += 1 + deleted_dirs.append(entry) + else: + skipped_count += 1 + + except Exception as e: + print(f"Error scanning base path: {e}") + return { + "deleted_count": deleted_count, + "skipped_count": skipped_count, + "deleted_dirs": deleted_dirs, + "error": str(e) + } + + print(f"Cleanup complete. Deleted: {deleted_count}, Skipped: {skipped_count}") + return { + "deleted_count": deleted_count, + "skipped_count": skipped_count, + "deleted_dirs": deleted_dirs + } + + +@task(pod_template=cirrascale_pod_template) +def cleanup_nfs_ttl_cirrascale( + base_path: str = "/mnt/nfs", + ttl_days: int = 28, + dry_run: bool = False +) -> dict: + """ + Clean up old directories on cirrascale NFS. + + Args: + base_path: Base path to scan for directories + ttl_days: Time-to-live in days (default 28 days = 4 weeks) + dry_run: If True, only report what would be deleted without deleting + + Returns: + Dictionary with cleanup statistics + """ + ttl_seconds = ttl_days * 24 * 60 * 60 + deleted_count = 0 + skipped_count = 0 + deleted_dirs: List[str] = [] + + print(f"Starting NFS cleanup on cirrascale") + print(f"Base path: {base_path}") + print(f"TTL: {ttl_days} days ({ttl_seconds} seconds)") + print(f"Dry run: {dry_run}") + + if not os.path.exists(base_path): + print(f"Base path {base_path} does not exist") + return { + "deleted_count": 0, + "skipped_count": 0, + "deleted_dirs": [], + "error": f"Base path {base_path} does not exist" + } + + try: + entries = os.listdir(base_path) + print(f"Found {len(entries)} entries in {base_path}") + + for entry in entries: + full_path = os.path.join(base_path, entry) + + if not os.path.isdir(full_path): + continue + + if should_delete_directory(full_path, ttl_seconds): + print(f"Directory eligible for deletion: {full_path}") + if not dry_run: + delete_directory(full_path) + deleted_count += 1 + deleted_dirs.append(entry) + else: + skipped_count += 1 + + except Exception as e: + print(f"Error scanning base path: {e}") + return { + "deleted_count": deleted_count, + "skipped_count": skipped_count, + "deleted_dirs": deleted_dirs, + "error": str(e) + } + + print(f"Cleanup complete. Deleted: {deleted_count}, Skipped: {skipped_count}") + return { + "deleted_count": deleted_count, + "skipped_count": skipped_count, + "deleted_dirs": deleted_dirs + } + + +@workflow +def nfs_ttl_cleanup_workflow( + base_path: str = "/mnt/nfs", + ttl_days: int = 28, + dry_run: bool = False +) -> dict: + """ + Workflow to clean up old directories from NFS based on TTL. + + This is a generic workflow that will be specialized by launch plans + for different clusters. + + Args: + base_path: Base path to scan for directories + ttl_days: Time-to-live in days (default 28 days = 4 weeks) + dry_run: If True, only report what would be deleted without deleting + + Returns: + Dictionary with cleanup statistics + """ + return cleanup_nfs_ttl_exa(base_path=base_path, ttl_days=ttl_days, dry_run=dry_run) + + +@workflow +def nfs_ttl_cleanup_exa_workflow( + base_path: str = "/mnt/nfs", + ttl_days: int = 28, + dry_run: bool = False +) -> dict: + """ + Workflow specifically for exa-cluster NFS cleanup. + + Args: + base_path: Base path to scan for directories + ttl_days: Time-to-live in days (default 28 days = 4 weeks) + dry_run: If True, only report what would be deleted without deleting + + Returns: + Dictionary with cleanup statistics + """ + return cleanup_nfs_ttl_exa(base_path=base_path, ttl_days=ttl_days, dry_run=dry_run) + + +@workflow +def nfs_ttl_cleanup_cirrascale_workflow( + base_path: str = "/mnt/nfs", + ttl_days: int = 28, + dry_run: bool = False +) -> dict: + """ + Workflow specifically for cirrascale NFS cleanup. + + Args: + base_path: Base path to scan for directories + ttl_days: Time-to-live in days (default 28 days = 4 weeks) + dry_run: If True, only report what would be deleted without deleting + + Returns: + Dictionary with cleanup statistics + """ + return cleanup_nfs_ttl_cirrascale(base_path=base_path, ttl_days=ttl_days, dry_run=dry_run) + + +daily_schedule = CronSchedule( + schedule="0 0 * * *", # Run at midnight every day +) + +exa_cluster_launch_plan = LaunchPlan.get_or_create( + workflow=nfs_ttl_cleanup_exa_workflow, + name="nfs_ttl_cleanup_exa_daily", + schedule=daily_schedule, + default_inputs={ + "base_path": "/mnt/nfs", + "ttl_days": 28, # 4 weeks default + "dry_run": False + } +) + +cirrascale_launch_plan = LaunchPlan.get_or_create( + workflow=nfs_ttl_cleanup_cirrascale_workflow, + name="nfs_ttl_cleanup_cirrascale_daily", + schedule=daily_schedule, + default_inputs={ + "base_path": "/mnt/nfs", + "ttl_days": 28, # 4 weeks default + "dry_run": False + } +) + + +if __name__ == "__main__": + print("Testing exa-cluster workflow...") + result = nfs_ttl_cleanup_exa_workflow(base_path="/tmp/test", ttl_days=1, dry_run=True) + print(f"Result: {result}")