diff --git a/README.fr.md b/README.fr.md
index 1e82afe..e5ab221 100644
--- a/README.fr.md
+++ b/README.fr.md
@@ -100,7 +100,7 @@ Gaspillage minimum estimé : ~$25 944/mois
 - Détecte le gaspillage IA/ML coûteux : SageMaker, AML, Vertex AI — ressources GPU signalées comme candidats à risque plus élevé (500–23 000 $/mois)
 - Fonctionne sur AWS, Azure et GCP en un seul outil
 - S'exécute entièrement dans votre environnement — aucun agent, pas de SaaS, aucun credential stocké
-- 47 règles de détection sélectives et haut signal, conçues pour éviter les faux positifs en environnements IaC
+- 48 règles de détection sélectives et haut signal, conçues pour éviter les faux positifs en environnements IaC
 - Prêt pour CI/CD — codes de sortie d'application + sorties JSON/CSV/markdown
 
 ### Ce que CleanCloud ne fait PAS
@@ -153,6 +153,7 @@ L'infrastructure IA/ML inactive est la source de gaspillage cloud invisible à l
 | Studio Apps SageMaker (KernelGateway/JupyterLab/CodeEditor) | 42 – 1 600+ $ / mois |
 | Domaine SageMaker (stockage EFS inactif) | Charges EFS continues |
 | Training Job SageMaker (job GPU runaway/bloqué) | 670 – 2 360+ $ / jour |
+| Processing Job SageMaker (bloqué/stuck) | 670 – 2 360+ $ / jour |
 | Cluster AML Compute Azure (GPU) | 600 – 15 000 $ / mois |
 | Instance de calcul Azure ML (GPU) | 600 – 15 000+ $ / mois |
 | Endpoint en ligne Azure ML (GPU) | 200 – 2 600+ $ / mois |
@@ -166,7 +167,7 @@ L'infrastructure IA/ML inactive est la source de gaspillage cloud invisible à l
 CleanCloud détecte les endpoints à zéro invocation / zéro prédiction, l'activité de contrôle inactive sur les notebooks et apps managés, ainsi que les training jobs managés anormalement longs sur les 3 clouds. Les outils natifs montrent la facture — ils ne nomment pas la ressource concrète à examiner.
 
 ```bash
-cleancloud scan --provider aws --category ai # PTUs Bedrock + endpoints + notebooks + domaines + Studio apps SageMaker + training jobs SageMaker + EC2 GPU
+cleancloud scan --provider aws --category ai # PTUs Bedrock + endpoints + notebooks + domaines + Studio apps SageMaker + training jobs + processing jobs SageMaker + EC2 GPU
 cleancloud scan --provider azure --category ai # clusters AML + instances ML + endpoints en ligne + AI Search + PTUs OpenAI
 cleancloud scan --provider gcp --category ai # endpoints Vertex AI + Workbench + training jobs + Cloud TPU + Feature Stores
 cleancloud scan --provider aws --category all # hygiène + IA/ML ensemble
@@ -433,7 +434,7 @@ Oui. CleanCloud n'a besoin d'accès réseau qu'aux endpoints API de votre cloud
 
 ## Ce que CleanCloud détecte
 
-47 règles pour AWS, Azure et GCP — conservatrices, haut signal, conçues pour éviter les faux positifs en environnements IaC.
+48 règles pour AWS, Azure et GCP — conservatrices, haut signal, conçues pour éviter les faux positifs en environnements IaC.
 
 **AWS :**
 - Compute : instances arrêtées 30+ jours (charges EBS continuent)
@@ -442,7 +443,7 @@ Oui. CleanCloud n'a besoin d'accès réseau qu'aux endpoints API de votre cloud
 - Plateforme : instances RDS inactives (HIGH)
 - Observabilité : logs CloudWatch à rétention infinie
 - Gouvernance : ressources sans tags, security groups inutilisés
-- IA/ML *(opt-in : `--category ai`)* : Bedrock Provisioned Throughput (Model Units) inactifs avec zéro invocation depuis 7+ jours ; endpoints SageMaker sans trafic `InvokeEndpoint` observé depuis 14+ jours ; instances Notebook SageMaker avec timestamps de contrôle inactifs depuis 14+ jours ; Domaines SageMaker sans apps en cours d'exécution sur tous les profils et espaces depuis 30+ jours (coût de stockage EFS continu) ; Studio Apps SageMaker (`KernelGateway`/`JupyterLab`/`CodeEditor`) sans signal d'activité récent exploitable depuis 7+ jours ; training jobs SageMaker toujours `InProgress` au-delà du seuil de 24h
+- IA/ML *(opt-in : `--category ai`)* : Bedrock Provisioned Throughput (Model Units) inactifs avec zéro invocation depuis 7+ jours ; endpoints SageMaker sans trafic `InvokeEndpoint` observé depuis 14+ jours ; instances Notebook SageMaker avec timestamps de contrôle inactifs depuis 14+ jours ; Domaines SageMaker sans apps en cours d'exécution sur tous les profils et espaces depuis 30+ jours (coût de stockage EFS continu) ; Studio Apps SageMaker (`KernelGateway`/`JupyterLab`/`CodeEditor`) sans signal d'activité récent exploitable depuis 7+ jours ; training jobs SageMaker toujours `InProgress` au-delà du seuil de 24h ; processing jobs SageMaker toujours `InProgress` au-delà du seuil de 24h
 
 **Azure :**
 - Compute : VMs arrêtées (non désallouées) (HIGH)
diff --git a/README.md b/README.md
index 53a45a2..86bf498 100644
--- a/README.md
+++ b/README.md
@@ -100,7 +100,7 @@ Minimum estimated waste: ~$25,944/month
 - Catches expensive idle AI/ML waste: SageMaker, AML, Vertex AI — GPU-backed resources flagged as higher-risk review candidates ($500–$23K/month)
 - Works across AWS, Azure, and GCP in one tool
 - Runs entirely in your environment — no agents, no SaaS, no credentials stored
-- 46 curated, high-signal detection rules designed to avoid false positives in IaC environments
+- 48 curated, high-signal detection rules designed to avoid false positives in IaC environments
 - CI/CD-ready — enforcement exit codes + JSON/CSV/markdown output
 
 ### What CleanCloud does NOT do
@@ -153,6 +153,7 @@ Idle AI/ML infrastructure is the fastest-growing source of invisible cloud spend
 | SageMaker Studio Apps (KernelGateway/JupyterLab/CodeEditor) | $42 – $1,600+ / month |
 | SageMaker Domain (idle EFS storage) | Continuous EFS charges |
 | SageMaker Training Job (runaway/hung GPU job) | $670 – $2,360+ / day |
+| SageMaker Processing Job (hung/stuck) | $670 – $2,360+ / day |
 | Azure AML compute cluster (GPU) | $600 – $15,000 / month |
 | Azure ML Compute Instance (GPU) | $600 – $15,000+ / month |
 | Azure ML Online Endpoint (GPU-backed) | $200 – $2,600+ / month |
@@ -166,7 +167,7 @@ Idle AI/ML infrastructure is the fastest-growing source of invisible cloud spend
 CleanCloud detects zero-invocation / zero-prediction endpoints, stale managed notebook and app activity, and long-running managed training jobs across all three clouds. Native cost tools show the bill — they do not name the specific resource to review.
 
 ```bash
-cleancloud scan --provider aws --category ai # Bedrock PTUs + SageMaker endpoints + notebooks + domains + Studio apps + training jobs + idle GPU EC2
+cleancloud scan --provider aws --category ai # Bedrock PTUs + SageMaker endpoints + notebooks + domains + Studio apps + training jobs + processing jobs + idle GPU EC2
 cleancloud scan --provider azure --category ai # AML compute + ML instances + online endpoints + AI Search + OpenAI PTUs
 cleancloud scan --provider gcp --category ai # Vertex AI endpoints + Workbench + training jobs + Cloud TPU + Feature Stores
 cleancloud scan --provider aws --category all # hygiene + AI/ML together
@@ -433,7 +434,7 @@ Yes. CleanCloud only needs network access to your cloud provider's API endpoints
 
 ## What CleanCloud Detects
 
-47 rules across AWS, Azure, and GCP — conservative, high-signal, designed to avoid false positives in IaC environments.
+48 rules across AWS, Azure, and GCP — conservative, high-signal, designed to avoid false positives in IaC environments.
 
 **AWS:**
 - Compute: stopped instances 30+ days (EBS charges continue)
@@ -442,7 +443,7 @@ Yes. CleanCloud only needs network access to your cloud provider's API endpoints
 - Platform: idle RDS instances (HIGH)
 - Observability: infinite retention CloudWatch Logs
 - Governance: untagged resources, unused security groups
-- AI/ML *(opt-in: `--category ai`)*: idle Bedrock Provisioned Throughput (Model Units) with zero invocations 7+ days; idle SageMaker endpoints with no observed `InvokeEndpoint` traffic 14+ days; SageMaker Notebook Instances with stale control-plane timestamps 14+ days; SageMaker Domains with no running apps across all user profiles and spaces 30+ days (continuous EFS storage cost); SageMaker Studio apps (`KernelGateway`/`JupyterLab`/`CodeEditor`) with no usable recent activity signal 7+ days; SageMaker training jobs still `InProgress` beyond the 24h threshold
+- AI/ML *(opt-in: `--category ai`)*: idle Bedrock Provisioned Throughput (Model Units) with zero invocations 7+ days; idle SageMaker endpoints with no observed `InvokeEndpoint` traffic 14+ days; SageMaker Notebook Instances with stale control-plane timestamps 14+ days; SageMaker Domains with no running apps across all user profiles and spaces 30+ days (continuous EFS storage cost); SageMaker Studio apps (`KernelGateway`/`JupyterLab`/`CodeEditor`) with no usable recent activity signal 7+ days; SageMaker training jobs still `InProgress` beyond the 24h threshold; SageMaker processing jobs still `InProgress` beyond the 24h threshold
 
 **Azure:**
 - Compute: stopped (not deallocated) VMs (HIGH)
diff --git a/cleancloud/doctor/aws.py b/cleancloud/doctor/aws.py
index dc12fea..73fc70b 100644
--- a/cleancloud/doctor/aws.py
+++ b/cleancloud/doctor/aws.py
@@ -856,6 +856,35 @@ def run_aws_ai_doctor(profile: Optional[str], region: Optional[str] = None) -> N
         permissions_failed.append(("sagemaker:DescribeTrainingJob", str(e)))
         warn(f"sagemaker:DescribeTrainingJob - {e}")
 
+    # --- sagemaker:ListProcessingJobs + sagemaker:DescribeProcessingJob (aws.sagemaker.processing_job.long_running) ---
+    try:
+        sagemaker.list_processing_jobs(MaxResults=1)
+        permissions_tested.append("sagemaker:ListProcessingJobs")
+        success("sagemaker:ListProcessingJobs")
+    except Exception as e:
+        permissions_failed.append(("sagemaker:ListProcessingJobs", str(e)))
+        warn(f"sagemaker:ListProcessingJobs - {e}")
+
+    try:
+        _pj_paginator = sagemaker.get_paginator("list_processing_jobs")
+        _target_job = None
+        for _pj_page in _pj_paginator.paginate(PaginationConfig={"PageSize": 20}):
+            for _pj in _pj_page.get("ProcessingJobSummaries", []):
+                _target_job = _pj
+                break
+            if _target_job:
+                break
+
+        if _target_job:
+            sagemaker.describe_processing_job(ProcessingJobName=_target_job["ProcessingJobName"])
+            permissions_tested.append("sagemaker:DescribeProcessingJob")
+            success("sagemaker:DescribeProcessingJob")
+        else:
+            info("sagemaker:DescribeProcessingJob - not tested (no processing job found to probe)")
+    except Exception as e:
+        permissions_failed.append(("sagemaker:DescribeProcessingJob", str(e)))
+        warn(f"sagemaker:DescribeProcessingJob - {e}")
+
     try:
         cloudwatch = session.client("cloudwatch", region_name=region)
         now = datetime.now(timezone.utc)
diff --git a/cleancloud/providers/aws/errors.py b/cleancloud/providers/aws/errors.py
new file mode 100644
index 0000000..9f34a7d
--- /dev/null
+++ b/cleancloud/providers/aws/errors.py
@@ -0,0 +1,21 @@
+"""Shared AWS error classification helpers."""
+
+from botocore.exceptions import ClientError
+
+# Error codes that indicate a permission/authorization failure.
+# Covers the common SageMaker, EC2, IAM, and service-specific variants.
+_PERMISSION_ERROR_CODES = frozenset(
+    {
+        "AccessDenied",
+        "AccessDeniedException",
+        "UnauthorizedOperation",
+        "UnauthorizedException",
+        "Client.UnauthorizedOperation",
+    }
+)
+
+
+def is_permission_error(exc: ClientError) -> bool:
+    """Return True when a ClientError represents an authorization failure."""
+    code = exc.response.get("Error", {}).get("Code", "")
+    return code in _PERMISSION_ERROR_CODES
diff --git a/cleancloud/providers/aws/rules/ai/sagemaker_processing_job_long_running.py b/cleancloud/providers/aws/rules/ai/sagemaker_processing_job_long_running.py
new file mode 100644
index 0000000..cfef738
--- /dev/null
+++ b/cleancloud/providers/aws/rules/ai/sagemaker_processing_job_long_running.py
@@ -0,0 +1,363 @@
+"""
+Rule: aws.sagemaker.processing_job.long_running
+
+  (spec — docs/specs/aws/ai/sagemaker_processing_job_long_running.md)
+
+Intent:
+  Detect SageMaker processing jobs that are still InProgress and have remained
+  active longer than the configured review threshold, so they can be reviewed
+  as possible hung, stuck, or forgotten jobs.
+
+  This is a CleanCloud-derived review heuristic based on SageMaker
+  processing-job metadata, not an AWS-native long-running finding. It is a
+  read-only review-candidate rule — not a stop-safe rule.
+
+Exclusions:
+  - processing_job_name or processing_job_arn absent (malformed inventory item)
+  - list or describe status absent or not "InProgress"
+  - CreationTime absent, naive, or future beyond clock_skew_tolerance_seconds
+  - ProcessingStartTime future beyond clock_skew_tolerance_seconds
+  - ProcessingStartTime < CreationTime beyond clock_skew_tolerance_seconds
+  - elapsed_runtime_hours < long_running_hours_threshold
+
+Detection:
+  - InProgress processing job
+  - elapsed_runtime_hours >= long_running_hours_threshold
+  - runtime anchor: ProcessingStartTime when present, else CreationTime
+
+Key rules:
+  - ListProcessingJobs paginated WITHOUT StatusEquals (filtered client-side as
+    a conservative completeness choice)
+  - resource_id = ProcessingJobArn from describe; falls back to list-level ARN
+  - estimated_monthly_cost_usd = None
+  - Confidence: HIGH when exceeded_applicable_runtime_limit; MEDIUM otherwise
+  - Risk: HIGH for accelerator-backed (ml.g*, ml.p*, ml.inf*, ml.trn*);
+    MEDIUM otherwise
+  - configured_runtime_limit_seconds: MaxRuntimeInSeconds when present; the
+    value becomes applicable only after ProcessingStartTime exists
+  - ListProcessingJobs failure → FAIL RULE; permission failure → FAIL RULE
+  - DescribeProcessingJob permission failure → FAIL RULE
+  - DescribeProcessingJob non-permission failure → SKIP ITEM
+  - clock_skew_tolerance_seconds = 300
+
+Blind spots:
+  - Actual processing-container progress state
+  - Exact price impact or savings impact
+  - Whether long pending time is actively billable compute
+
+APIs:
+  - sagemaker:ListProcessingJobs
+  - sagemaker:DescribeProcessingJob
+"""
+
+from datetime import datetime, timedelta, timezone
+from typing import List, Optional
+
+import boto3
+from botocore.exceptions import BotoCoreError, ClientError
+
+from cleancloud.core.confidence import ConfidenceLevel
+from cleancloud.core.evidence import Evidence
+from cleancloud.core.finding import Finding
+from cleancloud.core.risk import RiskLevel
+from cleancloud.providers.aws.errors import is_permission_error
+
+_DEFAULT_LONG_RUNNING_HOURS_THRESHOLD = 24
+_ELIGIBLE_STATUS = "InProgress"
+_CLOCK_SKEW_TOLERANCE_SECONDS = 300
+_ACCELERATOR_PREFIXES = ("ml.g", "ml.p", "ml.inf", "ml.trn")
+
+_FINDING_TITLE = "Long-running SageMaker processing job review candidate"
+_FINDING_REASON = (
+    "InProgress SageMaker processing job has exceeded the configured long-running threshold"
+)
+
+_SIGNALS_NOT_CHECKED = (
+    "Actual processing-container progress state",
+    "Exact price impact or savings impact",
+    "Whether long pending time is actively billable compute",
+)
+
+RULE_METADATA = {
+    "id": "aws.sagemaker.processing_job.long_running",
+    "category": "ai",
+    "service": "sagemaker",
+    "cost_impact": "high",
+}
+
+
+def _str(value: object) -> Optional[str]:
+    """Return value as str only when it is a non-empty string; else None."""
+    return value if isinstance(value, str) and value else None
+
+
+def _is_accelerator_backed(instance_type: Optional[str]) -> bool:
+    """Return True when the instance type is an accelerator (ml.g*, ml.p*, ml.inf*, ml.trn*)."""
+    if not instance_type:
+        return False
+    return any(instance_type.startswith(prefix) for prefix in _ACCELERATOR_PREFIXES)
+
+
+def _normalize_list_item(item: object, now_utc: datetime) -> Optional[dict]:
+    """Normalize a raw ListProcessingJobs item to canonical list-level fields."""
+    if not isinstance(item, dict):
+        return None
+
+    skew_tol = timedelta(seconds=_CLOCK_SKEW_TOLERANCE_SECONDS)
+
+    processing_job_name = _str(item.get("ProcessingJobName"))
+    if processing_job_name is None:
+        return None
+
+    processing_job_arn = _str(item.get("ProcessingJobArn"))
+    if processing_job_arn is None:
+        return None
+
+    list_status = _str(item.get("ProcessingJobStatus"))
+    if list_status is None:
+        return None
+
+    raw_ct = item.get("CreationTime")
+    if not isinstance(raw_ct, datetime) or raw_ct.tzinfo is None:
+        return None
+    creation_time_utc = raw_ct.astimezone(timezone.utc)
+    if creation_time_utc > now_utc + skew_tol:
+        return None
+
+    job_age_seconds = max(0, (now_utc - creation_time_utc).total_seconds())
+    job_age_hours = int(job_age_seconds // 3600)
+
+    last_modified_time_utc = None
+    raw_lmt = item.get("LastModifiedTime")
+    if isinstance(raw_lmt, datetime) and raw_lmt.tzinfo is not None:
+        lmt = raw_lmt.astimezone(timezone.utc)
+        if lmt <= now_utc + skew_tol:
+            last_modified_time_utc = lmt
+
+    return {
+        "processing_job_name": processing_job_name,
+        "processing_job_arn": processing_job_arn,
+        "list_status": list_status,
+        "creation_time_utc": creation_time_utc,
+        "job_age_hours": job_age_hours,
+        "last_modified_time_utc": last_modified_time_utc,
+    }
+
+
+def _normalize_describe(response: object, now_utc: datetime) -> Optional[dict]:
+    """Normalize a DescribeProcessingJob response to canonical describe-level fields."""
+    if not isinstance(response, dict):
+        return None
+
+    skew_tol = timedelta(seconds=_CLOCK_SKEW_TOLERANCE_SECONDS)
+
+    resource_id = _str(response.get("ProcessingJobArn"))
+
+    describe_status = _str(response.get("ProcessingJobStatus"))
+    if describe_status is None:
+        return None
+
+    processing_start_time_utc = None
+    raw_pst = response.get("ProcessingStartTime")
+    if isinstance(raw_pst, datetime):
+        if raw_pst.tzinfo is None:
+            pass
+        else:
+            pst = raw_pst.astimezone(timezone.utc)
+            if pst > now_utc + skew_tol:
+                return None
+            processing_start_time_utc = pst
+
+    stopping = response.get("StoppingCondition")
+    if not isinstance(stopping, dict):
+        stopping = {}
+
+    raw_configured_limit = stopping.get("MaxRuntimeInSeconds")
+    configured_runtime_limit_seconds = (
+        raw_configured_limit
+        if isinstance(raw_configured_limit, int)
+        and not isinstance(raw_configured_limit, bool)
+        and raw_configured_limit > 0
+        else None
+    )
+
+    processing_resources = response.get("ProcessingResources")
+    if not isinstance(processing_resources, dict):
+        processing_resources = {}
+    cluster_config = processing_resources.get("ClusterConfig")
+    if not isinstance(cluster_config, dict):
+        cluster_config = {}
+
+    instance_type = _str(cluster_config.get("InstanceType"))
+    raw_instance_count = cluster_config.get("InstanceCount")
+    instance_count = raw_instance_count if isinstance(raw_instance_count, int) else None
+
+    return {
+        "resource_id": resource_id,
+        "describe_status": describe_status,
+        "processing_start_time_utc": processing_start_time_utc,
+        "configured_runtime_limit_seconds": configured_runtime_limit_seconds,
+        "instance_type": instance_type,
+        "instance_count": instance_count,
+    }
+
+
+def find_long_running_sagemaker_processing_jobs(
+    session: boto3.Session,
+    region: str,
+    long_running_hours_threshold: int = _DEFAULT_LONG_RUNNING_HOURS_THRESHOLD,
+) -> List[Finding]:
+    sagemaker = session.client("sagemaker", region_name=region)
+
+    # Spec 8: paginate without StatusEquals and filter InProgress client-side.
+    try:
+        paginator = sagemaker.get_paginator("list_processing_jobs")
+        page_iterator = paginator.paginate()
+    except ClientError as exc:
+        if is_permission_error(exc):
+            raise PermissionError(
+                "Missing required IAM permission: sagemaker:ListProcessingJobs"
+            ) from exc
+        raise
+    except BotoCoreError:
+        raise
+
+    now = datetime.now(timezone.utc)
+    evaluation_window_start = now - timedelta(seconds=long_running_hours_threshold * 3600)
+    skew_tol = timedelta(seconds=_CLOCK_SKEW_TOLERANCE_SECONDS)
+    findings: List[Finding] = []
+
+    for page in page_iterator:
+        for raw_item in page.get("ProcessingJobSummaries", []):
+            nl = _normalize_list_item(raw_item, now)
+            if nl is None:
+                continue
+            if nl["list_status"] != _ELIGIBLE_STATUS:
+                continue
+
+            try:
+                raw_describe = sagemaker.describe_processing_job(
+                    ProcessingJobName=nl["processing_job_name"]
+                )
+            except ClientError as exc:
+                if is_permission_error(exc):
+                    raise PermissionError(
+                        "Missing required IAM permission: sagemaker:DescribeProcessingJob"
+                    ) from exc
+                continue
+            except BotoCoreError:
+                continue
+
+            nd = _normalize_describe(raw_describe, now)
+            if nd is None:
+                continue
+            if nd["describe_status"] != _ELIGIBLE_STATUS:
+                continue
+
+            processing_start_time = nd["processing_start_time_utc"]
+            if processing_start_time is not None:
+                if processing_start_time < nl["creation_time_utc"] - skew_tol:
+                    continue
+
+            resource_id = nd["resource_id"] or nl["processing_job_arn"]
+            processing_job_arn = resource_id
+
+            if processing_start_time is not None:
+                runtime_anchor_type = "processing_start_time"
+                runtime_anchor_label = "ProcessingStartTime"
+                active_seconds = max(0, (now - processing_start_time).total_seconds())
+                active_processing_hours = int(active_seconds // 3600)
+                elapsed_runtime_hours = active_processing_hours
+                elapsed_runtime_seconds = active_seconds
+            else:
+                runtime_anchor_type = "creation_time"
+                runtime_anchor_label = "CreationTime"
+                active_processing_hours = None
+                elapsed_runtime_hours = nl["job_age_hours"]
+                elapsed_runtime_seconds = max(0, (now - nl["creation_time_utc"]).total_seconds())
+
+            applicable_runtime_limit_seconds = (
+                nd["configured_runtime_limit_seconds"]
+                if processing_start_time is not None
+                else None
+            )
+            unbounded_runtime_limit = applicable_runtime_limit_seconds is None
+            exceeded_applicable_runtime_limit = (
+                applicable_runtime_limit_seconds is not None
+                and elapsed_runtime_seconds > applicable_runtime_limit_seconds
+            )
+
+            if elapsed_runtime_hours < long_running_hours_threshold:
+                continue
+
+            is_accelerator_backed = _is_accelerator_backed(nd["instance_type"])
+            risk = RiskLevel.HIGH if is_accelerator_backed else RiskLevel.MEDIUM
+            confidence = (
+                ConfidenceLevel.HIGH
+                if exceeded_applicable_runtime_limit
+                else ConfidenceLevel.MEDIUM
+            )
+
+            signals_used = [
+                f"Processing job primary status is '{_ELIGIBLE_STATUS}'",
+                f"elapsed_runtime_hours ({elapsed_runtime_hours}h) met or exceeded the "
+                f"configured threshold ({long_running_hours_threshold}h)",
+                f"Runtime anchor used: {runtime_anchor_label}",
+                (
+                    "Job exceeded the SageMaker MaxRuntimeInSeconds limit"
+                    if exceeded_applicable_runtime_limit
+                    else "SageMaker MaxRuntimeInSeconds limit was not exceeded or was not yet applicable"
+                ),
+            ]
+
+            findings.append(
+                Finding(
+                    provider="aws",
+                    rule_id="aws.sagemaker.processing_job.long_running",
+                    resource_type="aws.sagemaker.processing_job",
+                    resource_id=resource_id,
+                    region=region,
+                    estimated_monthly_cost_usd=None,
+                    title=_FINDING_TITLE,
+                    summary=(
+                        f"SageMaker processing job '{nl['processing_job_name']}' has been "
+                        f"InProgress for {elapsed_runtime_hours} hours, exceeding the "
+                        f"{long_running_hours_threshold}-hour threshold"
+                    ),
+                    reason=_FINDING_REASON,
+                    risk=risk,
+                    confidence=confidence,
+                    detected_at=now,
+                    evidence=Evidence(
+                        signals_used=signals_used,
+                        signals_not_checked=list(_SIGNALS_NOT_CHECKED),
+                        time_window=f"{long_running_hours_threshold} hours",
+                    ),
+                    details={
+                        "evaluation_path": "long-running-sagemaker-processing-job-review-candidate",
+                        "processing_job_arn": processing_job_arn,
+                        "processing_job_name": nl["processing_job_name"],
+                        "normalized_status": _ELIGIBLE_STATUS,
+                        "creation_time": nl["creation_time_utc"].isoformat(),
+                        "processing_start_time": (
+                            processing_start_time.isoformat() if processing_start_time else None
+                        ),
+                        "runtime_anchor_type": runtime_anchor_type,
+                        "elapsed_runtime_hours": elapsed_runtime_hours,
+                        "job_age_hours": nl["job_age_hours"],
+                        "active_processing_hours": active_processing_hours,
+                        "long_running_hours_threshold": long_running_hours_threshold,
+                        "evaluation_window_start": evaluation_window_start.isoformat(),
+                        "evaluation_window_end": now.isoformat(),
+                        "configured_runtime_limit_seconds": nd["configured_runtime_limit_seconds"],
+                        "applicable_runtime_limit_seconds": applicable_runtime_limit_seconds,
+                        "unbounded_runtime_limit": unbounded_runtime_limit,
+                        "exceeded_applicable_runtime_limit": exceeded_applicable_runtime_limit,
+                        "instance_type": nd["instance_type"],
+                        "instance_count": nd["instance_count"],
+                        "is_accelerator_backed": is_accelerator_backed,
}, + ) + ) + + return findings diff --git a/cleancloud/providers/aws/scan.py b/cleancloud/providers/aws/scan.py index 03eead4..31bed8b 100644 --- a/cleancloud/providers/aws/scan.py +++ b/cleancloud/providers/aws/scan.py @@ -21,6 +21,9 @@ from cleancloud.providers.aws.rules.ai.sagemaker_notebook_idle import ( find_idle_sagemaker_notebooks, ) +from cleancloud.providers.aws.rules.ai.sagemaker_processing_job_long_running import ( + find_long_running_sagemaker_processing_jobs, +) from cleancloud.providers.aws.rules.ai.sagemaker_studio_app_idle import ( find_idle_sagemaker_studio_apps, ) @@ -76,6 +79,7 @@ "aws.bedrock.provisioned_throughput.idle": find_idle_bedrock_provisioned_throughputs, "aws.sagemaker.studio_app.idle": find_idle_sagemaker_studio_apps, "aws.sagemaker.training_job.long_running": find_long_running_sagemaker_training_jobs, + "aws.sagemaker.processing_job.long_running": find_long_running_sagemaker_processing_jobs, } AWS_RULES: List[Callable] = list(dict.fromkeys(AWS_RULE_MAP.values())) @@ -144,7 +148,7 @@ def scan_aws_with_region_selection( if include_ai: click.echo( " (Regions with EBS volumes, snapshots, logs, Elastic IPs, ENIs, RDS, " - "NAT Gateways, ELBs, SageMaker endpoints/notebooks, or Bedrock provisioned throughputs)" + "NAT Gateways, ELBs, SageMaker AI resources, or Bedrock provisioned throughputs)" ) else: click.echo( @@ -286,20 +290,29 @@ def _region_has_cleancloud_resources( # Wrapped individually so a missing permission for one service doesn't # prevent the other from being checked. if include_ai: - # 9. 
Check SageMaker endpoints try: sagemaker = session.client("sagemaker", region_name=region, config=BOTO_CONFIG) - endpoints = sagemaker.list_endpoints(MaxResults=1) - if endpoints.get("Endpoints"): - return True, None - notebooks = sagemaker.list_notebook_instances(MaxResults=1) - if notebooks.get("NotebookInstances"): - return True, None - apps = sagemaker.list_apps(MaxResults=1) - if apps.get("Apps"): - return True, None except Exception: - pass # no SageMaker perms or service not available in region — skip + sagemaker = None + + if sagemaker is not None: + sagemaker_probes = ( + lambda: sagemaker.list_endpoints(MaxResults=1).get("Endpoints"), + lambda: sagemaker.list_notebook_instances(MaxResults=1).get( + "NotebookInstances" + ), + lambda: sagemaker.list_apps(MaxResults=1).get("Apps"), + lambda: sagemaker.list_training_jobs(MaxResults=1).get("TrainingJobSummaries"), + lambda: sagemaker.list_processing_jobs(MaxResults=1).get( + "ProcessingJobSummaries" + ), + ) + for probe in sagemaker_probes: + try: + if probe(): + return True, None + except Exception: + pass # missing per-call permission or service issue — continue probing # 10. Check Bedrock provisioned throughputs try: diff --git a/deploy/cloudformation/cleancloud-role.yaml b/deploy/cloudformation/cleancloud-role.yaml index c874957..12af5cf 100644 --- a/deploy/cloudformation/cleancloud-role.yaml +++ b/deploy/cloudformation/cleancloud-role.yaml @@ -28,7 +28,7 @@ Parameters: Default: "false" AllowedValues: ["true", "false"] Description: > - Set to true to attach the AI/ML policy (Bedrock Provisioned Throughput, SageMaker endpoints/notebooks/Studio apps, EC2 GPU). + Set to true to attach the AI/ML policy (Bedrock Provisioned Throughput, SageMaker endpoints/notebooks/Studio apps/training jobs/processing jobs, EC2 GPU). 
       Required for: cleancloud scan --category ai
       See: security/aws/ai-readonly.json
 
@@ -143,6 +143,8 @@ Resources:
                   - sagemaker:DescribeApp
                   - sagemaker:ListTrainingJobs
                   - sagemaker:DescribeTrainingJob
+                  - sagemaker:ListProcessingJobs
+                  - sagemaker:DescribeProcessingJob
                 Resource: "*"
               - Sid: EC2GPUReadOnly
                 Effect: Allow
diff --git a/deploy/terraform/aws/main.tf b/deploy/terraform/aws/main.tf
index d89083d..c1d2a1a 100644
--- a/deploy/terraform/aws/main.tf
+++ b/deploy/terraform/aws/main.tf
@@ -65,6 +65,8 @@ resource "aws_iam_role_policy" "cleancloud_ai" {
           "sagemaker:DescribeApp",
           "sagemaker:ListTrainingJobs",
           "sagemaker:DescribeTrainingJob",
+          "sagemaker:ListProcessingJobs",
+          "sagemaker:DescribeProcessingJob",
         ]
         Resource = "*"
       },
diff --git a/docs/aws.md b/docs/aws.md
index 6feccef..235dd5d 100644
--- a/docs/aws.md
+++ b/docs/aws.md
@@ -309,7 +309,7 @@ For the complete production workflow with enforcement flags, scheduling, and art
 > |------|----------|---------------|
 > | `base-readonly.json` | `sts:GetCallerIdentity`, `cloudwatch:GetMetricStatistics` | **Always — every scan, every category** |
 > | `hygiene-readonly.json` | EC2, RDS, ELB, S3, logs | `--category hygiene` (default) |
-> | `ai-readonly.json` | Bedrock Provisioned Throughput, SageMaker endpoints/notebooks/domains/Studio apps/training jobs, EC2 GPU instances, CloudWatch metrics | `--category ai` |
+> | `ai-readonly.json` | Bedrock Provisioned Throughput, SageMaker endpoints/notebooks/domains/Studio apps/training jobs/processing jobs, EC2 GPU instances, CloudWatch metrics | `--category ai` |
 >
 > `base-readonly.json` must be attached alongside any category file. It provides `sts:GetCallerIdentity` (used at startup and by `doctor` to verify credentials) and shared CloudWatch metric access. Attach `hygiene-readonly.json` for the default scan path, and `ai-readonly.json` for `--category ai`.
 
@@ -409,7 +409,7 @@ Attach this policy to your IAM role or user for the default hygiene scan path (c
 - Safe for production accounts
 - Compatible with security-reviewed pipelines
 
-For AI/ML scans, also attach [`security/aws/ai-readonly.json`](../security/aws/ai-readonly.json). It adds permissions for Bedrock Provisioned Throughput, SageMaker endpoints, notebook instances, SageMaker Domains (`sagemaker:ListDomains`, `sagemaker:DescribeDomain`), SageMaker Studio apps (`sagemaker:ListApps`, `sagemaker:DescribeApp`), SageMaker training jobs (`sagemaker:ListTrainingJobs`, `sagemaker:DescribeTrainingJob`), EC2 GPU instances, and `cloudwatch:ListMetrics` for GPU metric discovery.
+For AI/ML scans, also attach [`security/aws/ai-readonly.json`](../security/aws/ai-readonly.json). It adds permissions for Bedrock Provisioned Throughput, SageMaker endpoints, notebook instances, SageMaker Domains (`sagemaker:ListDomains`, `sagemaker:DescribeDomain`), SageMaker Studio apps (`sagemaker:ListApps`, `sagemaker:DescribeApp`), SageMaker training jobs (`sagemaker:ListTrainingJobs`, `sagemaker:DescribeTrainingJob`), SageMaker processing jobs (`sagemaker:ListProcessingJobs`, `sagemaker:DescribeProcessingJob`), EC2 GPU instances, and `cloudwatch:ListMetrics` for GPU metric discovery.
 
 ---
 
@@ -923,7 +923,7 @@ Permissions Tested: 17/17 passed
 ======================================================================
 ```
 
-**What the AI doctor adds:** Bedrock Provisioned Throughput, SageMaker endpoints, notebook instances, SageMaker Domains, SageMaker Studio apps, SageMaker training jobs, EC2 GPU inventory, and the CloudWatch permissions those AI rules need. Run it before `cleancloud scan --provider aws --category ai`.
+**What the AI doctor adds:** Bedrock Provisioned Throughput, SageMaker endpoints, notebook instances, SageMaker Domains, SageMaker Studio apps, SageMaker training jobs, SageMaker processing jobs, EC2 GPU inventory, and the CloudWatch permissions those AI rules need. Run it before `cleancloud scan --provider aws --category ai`.
 
 ---
 
diff --git a/docs/rules/aws.md b/docs/rules/aws.md
index 814efe1..648d4e0 100644
--- a/docs/rules/aws.md
+++ b/docs/rules/aws.md
@@ -314,3 +314,16 @@
 **Exclusions:** completed/stopped jobs; spot jobs use `MaxWaitTimeInSeconds`, not `MaxRuntimeInSeconds`
 
 **Spec:** [specs/aws/ai/sagemaker_training_job_long_running.md](../specs/aws/ai/sagemaker_training_job_long_running.md)
+
+#### `aws.sagemaker.processing_job.long_running`
+**Detects:** SageMaker processing jobs still `InProgress` beyond `long_running_hours_threshold`
+
+**Confidence / Risk:** HIGH (elapsed time exceeds `MaxRuntimeInSeconds` after processing has started); MEDIUM (threshold exceeded without an exceeded applicable runtime limit) / HIGH (accelerator-backed: `ml.g*`, `ml.p*`, `ml.inf*`, `ml.trn*`); MEDIUM (other)
+
+**Permissions:** `sagemaker:ListProcessingJobs`, `sagemaker:DescribeProcessingJob`
+
+**Params:** `long_running_hours_threshold` (default: 24)
+
+**Exclusions:** completed/stopped jobs; malformed required timestamps; `ProcessingStartTime` future beyond skew; `ProcessingStartTime < CreationTime` beyond skew tolerance
+
+**Spec:** [specs/aws/ai/sagemaker_processing_job_long_running.md](../specs/aws/ai/sagemaker_processing_job_long_running.md)
diff --git a/docs/specs/aws/ai/sagemaker_processing_job_long_running.md b/docs/specs/aws/ai/sagemaker_processing_job_long_running.md
new file mode 100644
index 0000000..ebf7ceb
--- /dev/null
+++ b/docs/specs/aws/ai/sagemaker_processing_job_long_running.md
@@ -0,0 +1,358 @@
+# aws.sagemaker.processing_job.long_running — Canonical Rule Specification
+
+## 1. Intent
+
+Detect SageMaker processing jobs that are still `InProgress` and have remained active longer than
+the configured review threshold, so they can be reviewed as possible hung, stuck, or forgotten
+jobs.
+ +This is a **CleanCloud-derived review heuristic** based on SageMaker processing-job metadata, not +an AWS-native long-running finding. It is a **read-only review-candidate rule** — not a stop-safe +rule. + +--- + +## 2. AWS API Grounding + +Based on official SageMaker processing-job API and documentation. + +### Key facts + +1. `ListProcessingJobs` is the canonical inventory API for SageMaker processing jobs and supports + pagination (max 100 per page). +2. AWS documents a `StatusEquals` / `MaxResults` interaction caveat for `ListTrainingJobs` where + `MaxResults` jobs are retrieved first and only then filtered by status. The + `ListProcessingJobs` documentation does not carry this same explicit warning. As a + conservative design choice, this rule fully paginates without `StatusEquals` and filters + client-side, to avoid any risk of silently missing `InProgress` jobs. +3. `ListProcessingJobs` returns `ProcessingJobSummary` objects including `ProcessingJobName`, + `ProcessingJobArn`, `CreationTime`, `ProcessingJobStatus`, `ProcessingEndTime`, + `LastModifiedTime`, `ExitMessage`, and `FailureReason`. +4. `ProcessingStartTime` is **only** available in `DescribeProcessingJob`, not in the list summary. +5. `DescribeProcessingJob` returns `ProcessingJobArn`, `ProcessingJobName`, + `ProcessingJobStatus`, `CreationTime`, `LastModifiedTime`, `ProcessingStartTime`, + `ProcessingEndTime`, `StoppingCondition`, `ProcessingResources`, `AppSpecification`, + `RoleArn`, `ExitMessage`, `FailureReason`, `NetworkConfig`, `Environment`, + `ExperimentConfig`, `AutoMLJobArn`, `MonitoringScheduleArn`, and `TrainingJobArn`. +6. `ProcessingStartTime` is when the processing job starts on instances. +7. `StoppingCondition.MaxRuntimeInSeconds` is the **only** stopping-condition field for processing + jobs. Min: 1, Max: 777600 (9 days). There is no `MaxWaitTimeInSeconds` or + `MaxPendingTimeInSeconds`. +8. 
`ProcessingResources.ClusterConfig` contains `InstanceType`, `InstanceCount`, `VolumeSizeInGB`, + and optional `VolumeKmsKeyId`. There are no `InstanceGroups` (heterogeneous clusters). +9. Processing jobs do not support managed Spot training, warm pools, serverless execution, or + secondary status. +10. `ProcessingJobStatus` values: `InProgress`, `Completed`, `Failed`, `Stopping`, `Stopped`. +11. Fixed monthly USD cost estimates are not canonical from the fetched AWS docs. + +### Implications + +- Inventory must be built by fully paginating `ListProcessingJobs` and filtering + `ProcessingJobStatus` client-side; this rule must not depend on `StatusEquals="InProgress"` as + complete inventory. +- `DescribeProcessingJob` is required for `ProcessingStartTime` (not available in list) and + authoritative runtime-state enrichment. +- `ProcessingStartTime` is the canonical active-processing runtime anchor when present. +- `CreationTime` is the canonical submission / wall-clock anchor and is used for pending or + pre-start jobs where `ProcessingStartTime` is absent. +- `MaxRuntimeInSeconds` is the configured runtime limit and becomes applicable when + `ProcessingStartTime` is present. +- `estimated_monthly_cost_usd = null`. + +--- + +## 3. Scope and Terminology + +- **Processing job** — an item returned by `ListProcessingJobs`. +- **Eligible primary status** — `ProcessingJobStatus == "InProgress"`. +- `long_running_hours_threshold` — operator-configurable integer >= 1, default 24. +- `clock_skew_tolerance_seconds` — implementation tolerance for future timestamps, recommended 300. +- **job_age_hours** — `floor((now_utc - creation_time_utc) / 3600 seconds)`. +- **active_processing_hours** — `floor((now_utc - processing_start_time_utc) / 3600 seconds)` when + `ProcessingStartTime` is present. 
+- **runtime_anchor**: + - `processing_start_time_utc` when present + - otherwise `creation_time_utc` +- **runtime_anchor_type**: + - `"processing_start_time"` when `ProcessingStartTime` is present + - `"creation_time"` otherwise +- **configured_runtime_limit_seconds** — `MaxRuntimeInSeconds` from `StoppingCondition` when + present and valid; otherwise `null`. This is the raw configured value, independent of whether + the job has started processing. +- **applicable_runtime_limit_seconds**: + - `configured_runtime_limit_seconds` when `ProcessingStartTime` is present (the limit applies + once processing has started) + - `null` when `ProcessingStartTime` is absent (the job has not started, so the limit is not + yet in effect) +- **unbounded_runtime_limit** — `true` when `applicable_runtime_limit_seconds == null` +- `evaluation_window_start_utc = now_utc - long_running_hours_threshold × 3600 seconds` +- `evaluation_window_end_utc = now_utc` + +### Explicit scope boundary + +This rule applies only to processing jobs whose primary status is currently `InProgress`. + +Out of scope: + +- `Completed` +- `Failed` +- `Stopping` +- `Stopped` +- exact price estimation, accrued USD estimation, or savings estimation + +--- + +## 4. Canonical Rule Statement + +A SageMaker processing job is eligible only when **all** of the following are true: + +- stable processing-job identity exists +- primary status is `InProgress` +- `CreationTime` is valid +- `runtime_anchor` is valid +- `elapsed_runtime_hours >= long_running_hours_threshold`, where: + - `elapsed_runtime_hours = active_processing_hours` when `ProcessingStartTime` is present + - otherwise `elapsed_runtime_hours = job_age_hours` + +No additional predicate may be required for baseline eligibility, including instance type, instance +count, or static cost heuristics. + +--- + +## 5. Normalization Contract + +All rule logic must operate on normalized fields only. 
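As a rough illustration of logic written purely against normalized fields, the Section 4 eligibility check might look like the sketch below. `NormalizedJob`, `elapsed_runtime_hours`, and `is_eligible` are hypothetical names chosen for this example, not the rule's actual implementation, and the sketch leaves out the identity and clock-skew checks.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass(frozen=True)
class NormalizedJob:
    """Hypothetical normalized record (illustration only)."""

    status: str
    creation_time_utc: datetime
    processing_start_time_utc: Optional[datetime] = None


def elapsed_runtime_hours(job: NormalizedJob, now_utc: datetime) -> int:
    # Runtime anchor: ProcessingStartTime when present, otherwise CreationTime.
    if job.processing_start_time_utc is not None:
        anchor = job.processing_start_time_utc
    else:
        anchor = job.creation_time_utc
    # Floor to whole hours, matching job_age_hours / active_processing_hours.
    return int((now_utc - anchor).total_seconds() // 3600)


def is_eligible(
    job: NormalizedJob,
    now_utc: datetime,
    long_running_hours_threshold: int = 24,
) -> bool:
    # Only InProgress jobs are in scope; the threshold comparison is inclusive.
    if job.status != "InProgress":
        return False
    return elapsed_runtime_hours(job, now_utc) >= long_running_hours_threshold
```

Note that a job created 50 hours ago that started processing 12 hours ago is not eligible under a 24-hour threshold, because the anchor switches to `ProcessingStartTime` once it is present.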
+ +### 5.1 List-Level Fields + +| Canonical field | Source field | Absent / invalid | +|---|---|---| +| `processing_job_name` | `ProcessingJobName` | skip item | +| `processing_job_arn` | `ProcessingJobArn` | skip item | +| `list_status` | `ProcessingJobStatus` | skip item | +| `creation_time_utc` | `CreationTime` (tz-aware UTC) | skip item | +| `last_modified_time_utc` | `LastModifiedTime` (tz-aware UTC) | null | + +### 5.2 Describe-Level Fields + +| Canonical field | Source field | Absent / invalid | +|---|---|---| +| `resource_id` | `ProcessingJobArn` | fall back to normalized `processing_job_arn` | +| `describe_status` | `ProcessingJobStatus` | skip item | +| `processing_start_time_utc` | `ProcessingStartTime` (tz-aware UTC) | null | +| `configured_runtime_limit_seconds` | `StoppingCondition.MaxRuntimeInSeconds` | null | +| `instance_type` | `ProcessingResources.ClusterConfig.InstanceType` | null | +| `instance_count` | `ProcessingResources.ClusterConfig.InstanceCount` | null | + +### 5.3 Derived Fields + +| Canonical field | Derivation | +|---|---| +| `job_age_hours` | floor((now_utc - creation_time_utc) / 3600) | +| `active_processing_hours` | floor((now_utc - processing_start_time_utc) / 3600) when processing start exists | +| `runtime_anchor_type` | `"processing_start_time"` when processing start exists, else `"creation_time"` | +| `elapsed_runtime_hours` | `active_processing_hours` when processing start exists, else `job_age_hours` | +| `applicable_runtime_limit_seconds` | `configured_runtime_limit_seconds` when processing start exists; else `null` | +| `unbounded_runtime_limit` | `true` when `applicable_runtime_limit_seconds` is `null` | +| `elapsed_runtime_seconds` | `(now_utc - runtime_anchor).total_seconds()` — used for runtime-limit comparison (avoids floor rounding) | +| `exceeded_applicable_runtime_limit` | `true` when `applicable_runtime_limit_seconds` is non-null and `elapsed_runtime_seconds > applicable_runtime_limit_seconds` | + +Normalization 
requirements: + +- String-valued fields: normalize only from non-empty strings. +- Timestamp fields: must be timezone-aware UTC before use; naive timestamps must skip the item + when required and normalize to `null` when optional. +- Future `CreationTime` or `ProcessingStartTime` beyond `clock_skew_tolerance_seconds` must skip + the item. +- `LastModifiedTime` must never cause a skip; if absent, naive, or future beyond skew tolerance, + normalize to `null`. +- `ProcessingStartTime < CreationTime` by more than `clock_skew_tolerance_seconds` must skip the + item as inconsistent timestamp state. +- `ProcessingResources` and `StoppingCondition` must degrade safely to `null` fields when absent + or malformed; optional context must not crash evaluation. + +--- + +## 6. Runtime Signal Contract + +This rule evaluates **elapsed runtime**, not true processing progress. + +### 6.1 Primary elapsed-runtime rule + +- If `ProcessingStartTime` is present, use `ProcessingStartTime` as the runtime anchor. +- If `ProcessingStartTime` is absent, use `CreationTime` as the runtime anchor. +- If `elapsed_runtime_hours >= long_running_hours_threshold`, the job is a long-running review + candidate. + +### 6.2 Stopping-condition interpretation + +- `MaxRuntimeInSeconds` is the configured runtime limit. It becomes the applicable runtime upper + bound only when `ProcessingStartTime` is present (the limit governs active processing time). +- If `ProcessingStartTime` is absent, the configured limit exists but is not yet in effect — the + job has not started actual processing. +- Runtime-limit comparison must use `elapsed_runtime_seconds` (not floored hours) to avoid a job + that is slightly over the limit appearing equal. +- If no applicable runtime limit exists, treat the job as unbounded and rely on the + threshold-only signal. +- Exceeding the applicable runtime limit is a stronger signal than threshold age alone. 
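The seconds-versus-floored-hours point in 6.2 can be sketched as follows. This is a minimal illustration under assumed names (`exceeded_applicable_runtime_limit` as a standalone function is hypothetical), not the rule's actual code; it treats a missing limit or a missing start time as "not exceeded".

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def exceeded_applicable_runtime_limit(
    now_utc: datetime,
    processing_start_time_utc: Optional[datetime],
    configured_runtime_limit_seconds: Optional[int],
) -> bool:
    # The configured limit only becomes applicable once processing has started.
    if processing_start_time_utc is None or configured_runtime_limit_seconds is None:
        return False  # assumption of this sketch: no applicable limit means not exceeded
    elapsed_runtime_seconds = (now_utc - processing_start_time_utc).total_seconds()
    # Strict comparison on raw seconds, not on floored hours.
    return elapsed_runtime_seconds > configured_runtime_limit_seconds


now = datetime(2024, 1, 2, 0, 0, 1, tzinfo=timezone.utc)
start = now - timedelta(hours=24, seconds=1)

# Floored hours alone make this job look exactly equal to a 24-hour limit.
floored_hours = int((now - start).total_seconds() // 3600)
print(floored_hours)  # 24
print(exceeded_applicable_runtime_limit(now, start, 86_400))  # True
```

A job one second past a 86,400-second limit floors to 24 hours, so an hour-based comparison would report it as equal to the limit rather than over it.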
+ +### 6.3 Explicit blind spots + +This rule does **not** prove: + +- that the processing container is hung or making no progress +- that the elapsed runtime is financially wasteful +- that a long pending state is definitely billable compute + +--- + +## 7. Pricing / Cost Boundary + +- `estimated_monthly_cost_usd = null` +- Do not hardcode instance-price tables, accrued USD estimates, or regional billing assumptions. + +--- + +## 8. Deterministic Evaluation Order + +1. Retrieve and fully paginate `ListProcessingJobs` **without** relying on `StatusEquals` for + completeness. +2. Normalize each list item. +3. For each normalized item: + - identity absent → **SKIP ITEM** + - list status absent → **SKIP ITEM** + - list status != `InProgress` → **SKIP ITEM** + - invalid / naive / future `creation_time_utc` → **SKIP ITEM** +4. Call `DescribeProcessingJob` for the candidate item. +5. Permission failure → **FAIL RULE**. +6. Non-permission describe failure (for example resource vanished between list and describe) → + **SKIP ITEM**. +7. Normalize describe fields. +8. Re-check describe status; if not `InProgress` → **SKIP ITEM**. +9. invalid / future `processing_start_time_utc` beyond skew tolerance → **SKIP ITEM**. +10. `processing_start_time_utc < creation_time_utc` beyond skew tolerance → **SKIP ITEM**. +11. Compute `elapsed_runtime_hours` from the canonical runtime anchor. +12. `elapsed_runtime_hours < long_running_hours_threshold` → **SKIP ITEM**. +13. Otherwise → **EMIT**. + +No raw AWS field access after normalization. + +--- + +## 9. Exclusion Rules + +1. identity absent (`processing_job_name` or `processing_job_arn`) → malformed inventory item +2. list or describe status absent → missing primary state +3. status not `InProgress` → out of scope +4. `CreationTime` absent / naive / future beyond skew tolerance → missing or invalid runtime anchor +5. `ProcessingStartTime` future beyond skew tolerance → invalid runtime anchor +6. 
`ProcessingStartTime < CreationTime` beyond skew tolerance → inconsistent timestamp state +7. `elapsed_runtime_hours < long_running_hours_threshold` → not long-running enough + +--- + +## 10. Failure Model + +**Rule-level failures (FAIL RULE):** + +- `ListProcessingJobs` request or pagination failure +- `DescribeProcessingJob` permission failure +- permission failure for required APIs + +**Item-level skips (SKIP ITEM):** + +- malformed identity or missing required timestamps +- non-`InProgress` status +- invalid timestamp relationships +- non-permission `DescribeProcessingJob` failure +- candidate below threshold + +--- + +## 11. Evidence / Details Contract + +### Required details fields + +``` +evaluation_path = "long-running-sagemaker-processing-job-review-candidate" +processing_job_arn +processing_job_name +normalized_status = "InProgress" +creation_time +processing_start_time +runtime_anchor_type +elapsed_runtime_hours +job_age_hours +active_processing_hours +long_running_hours_threshold +evaluation_window_start +evaluation_window_end +configured_runtime_limit_seconds +applicable_runtime_limit_seconds +unbounded_runtime_limit +exceeded_applicable_runtime_limit +``` + +### Optional context fields + +``` +instance_type +instance_count +is_accelerator_backed +``` + +### Required evidence wording + +**Signals used** must state: + +- processing job primary status is `InProgress` +- `elapsed_runtime_hours` met or exceeded the configured threshold +- which runtime anchor was used (`ProcessingStartTime` or `CreationTime`) +- whether the job exceeded the SageMaker `MaxRuntimeInSeconds` limit + +**Signals not checked** must state major blind spots: + +- actual processing-container progress state +- exact price impact or savings impact +- whether long pending time is actively billable compute + +--- + +## 12. 
Confidence Model + +| Condition | Confidence | +|---|---| +| `exceeded_applicable_runtime_limit = true` | `HIGH` | +| all other emitted findings | `MEDIUM` | + +No LOW finding should be emitted. + +--- + +## 13. Risk Model + +| Condition | Risk | +|---|---| +| `instance_type` is accelerator-backed (`ml.g*`, `ml.p*`, `ml.inf*`, `ml.trn*`) | `HIGH` | +| all other emitted findings | `MEDIUM` | + +Risk is about likely waste severity, not proof of safe interruption. + +--- + +## 14. Title and Reason Contract + +| Condition | Title | Reason | +|---|---|---| +| Long-running processing job finding | `"Long-running SageMaker processing job review candidate"` | `"InProgress SageMaker processing job has exceeded the configured long-running threshold"` | + +--- + +## 15. Non-Goals + +This rule does **not**: + +- infer exact billing from static price tables +- cover MonitoringSchedule-spawned processing jobs differently from standalone ones +- cover AutoML-spawned processing jobs differently from standalone ones +- determine whether a long-running job should be stopped automatically diff --git a/security/aws/ai-readonly.json b/security/aws/ai-readonly.json index 1e2ecae..536c843 100644 --- a/security/aws/ai-readonly.json +++ b/security/aws/ai-readonly.json @@ -41,6 +41,15 @@ ], "Resource": "*" }, + { + "Sid": "ProcessingJobReadOnly", + "Effect": "Allow", + "Action": [ + "sagemaker:ListProcessingJobs", + "sagemaker:DescribeProcessingJob" + ], + "Resource": "*" + }, { "Sid": "EC2GPUReadOnly", "Effect": "Allow", diff --git a/tests/cleancloud/providers/aws/ai/test_aws_sagemaker_processing_job_long_running.py b/tests/cleancloud/providers/aws/ai/test_aws_sagemaker_processing_job_long_running.py new file mode 100644 index 0000000..54385fb --- /dev/null +++ b/tests/cleancloud/providers/aws/ai/test_aws_sagemaker_processing_job_long_running.py @@ -0,0 +1,689 @@ +from datetime import datetime, timedelta, timezone +from unittest.mock import MagicMock + +import pytest +from 
botocore.exceptions import BotoCoreError, ClientError + +from cleancloud.providers.aws.rules.ai.sagemaker_processing_job_long_running import ( + RULE_METADATA, + _is_accelerator_backed, + _normalize_describe, + _normalize_list_item, + find_long_running_sagemaker_processing_jobs, +) + +_REGION = "us-east-1" +_ARN_PREFIX = "arn:aws:sagemaker:us-east-1:123456789012:processing-job" + + +def _make_list_job( + name="processing-job-1", + arn=None, + age_hours=48, + status="InProgress", + lmt_hours=None, +): + now = datetime.now(timezone.utc) + item = { + "ProcessingJobName": name, + "ProcessingJobArn": arn or f"{_ARN_PREFIX}/{name}", + "ProcessingJobStatus": status, + "CreationTime": now - timedelta(hours=age_hours), + } + if lmt_hours is not None: + item["LastModifiedTime"] = now - timedelta(hours=lmt_hours) + return item + + +def _make_describe( + name="processing-job-1", + arn=None, + status="InProgress", + processing_start_hours=None, + max_runtime_seconds=None, + instance_type="ml.m5.xlarge", + instance_count=1, +): + now = datetime.now(timezone.utc) + result = { + "ProcessingJobArn": arn or f"{_ARN_PREFIX}/{name}", + "ProcessingJobName": name, + "ProcessingJobStatus": status, + "ProcessingResources": { + "ClusterConfig": { + "InstanceType": instance_type, + "InstanceCount": instance_count, + } + }, + "StoppingCondition": {}, + } + if processing_start_hours is not None: + result["ProcessingStartTime"] = now - timedelta(hours=processing_start_hours) + if max_runtime_seconds is not None: + result["StoppingCondition"]["MaxRuntimeInSeconds"] = max_runtime_seconds + return result + + +def _make_session( + jobs=None, + describe_response=None, + describe_side_effect=None, + pages=None, + list_side_effect=None, +): + sagemaker = MagicMock() + + paginator = MagicMock() + if list_side_effect is not None: + paginator.paginate.side_effect = list_side_effect + else: + paginator.paginate.return_value = pages or [{"ProcessingJobSummaries": jobs or []}] + 
sagemaker.get_paginator.return_value = paginator + + if describe_side_effect is not None: + sagemaker.describe_processing_job.side_effect = describe_side_effect + elif describe_response is not None: + sagemaker.describe_processing_job.return_value = describe_response + else: + sagemaker.describe_processing_job.return_value = _make_describe() + + session = MagicMock() + session.client.return_value = sagemaker + return session, sagemaker + + +def _auth_error(code="AccessDeniedException"): + return ClientError({"Error": {"Code": code, "Message": "Access denied"}}, "op") + + +def _transient_error(code="ThrottlingException"): + return ClientError({"Error": {"Code": code, "Message": "Throttled"}}, "op") + + +class TestMustEmit: + def test_basic_long_running_job_detected(self): + job = _make_list_job(age_hours=48) + desc = _make_describe(processing_start_hours=48) + session, _ = _make_session(jobs=[job], describe_response=desc) + + findings = find_long_running_sagemaker_processing_jobs( + session, _REGION, long_running_hours_threshold=24 + ) + + assert len(findings) == 1 + f = findings[0] + assert f.rule_id == "aws.sagemaker.processing_job.long_running" + assert f.resource_type == "aws.sagemaker.processing_job" + assert f.provider == "aws" + assert f.region == _REGION + + def test_exact_threshold_emits(self): + job = _make_list_job(age_hours=24) + desc = _make_describe(processing_start_hours=24) + session, _ = _make_session(jobs=[job], describe_response=desc) + + findings = find_long_running_sagemaker_processing_jobs( + session, _REGION, long_running_hours_threshold=24 + ) + + assert len(findings) == 1 + + def test_creation_time_anchor_when_no_processing_start(self): + job = _make_list_job(age_hours=30) + desc = _make_describe() + desc.pop("ProcessingStartTime", None) + session, _ = _make_session(jobs=[job], describe_response=desc) + + findings = find_long_running_sagemaker_processing_jobs( + session, _REGION, long_running_hours_threshold=24 + ) + + assert len(findings) 
== 1 + assert findings[0].details["runtime_anchor_type"] == "creation_time" + assert findings[0].details["processing_start_time"] is None + + def test_processing_start_time_anchor(self): + job = _make_list_job(age_hours=50) + desc = _make_describe(processing_start_hours=26) + session, _ = _make_session(jobs=[job], describe_response=desc) + + findings = find_long_running_sagemaker_processing_jobs( + session, _REGION, long_running_hours_threshold=24 + ) + + assert len(findings) == 1 + f = findings[0] + assert f.details["runtime_anchor_type"] == "processing_start_time" + assert f.details["processing_start_time"] is not None + assert f.details["active_processing_hours"] == 26 + + def test_resource_id_falls_back_to_list_arn_when_describe_arn_absent(self): + list_arn = f"{_ARN_PREFIX}/fallback-job" + job = _make_list_job(name="fallback-job", age_hours=48, arn=list_arn) + desc = _make_describe() + desc.pop("ProcessingJobArn", None) + session, _ = _make_session(jobs=[job], describe_response=desc) + + findings = find_long_running_sagemaker_processing_jobs(session, _REGION) + + assert findings[0].resource_id == list_arn + + def test_estimated_monthly_cost_is_none(self): + job = _make_list_job(age_hours=48) + session, _ = _make_session(jobs=[job]) + + findings = find_long_running_sagemaker_processing_jobs(session, _REGION) + + assert findings[0].estimated_monthly_cost_usd is None + + def test_paginated_results_across_multiple_pages(self): + job1 = _make_list_job(name="job-1", age_hours=48) + job2 = _make_list_job(name="job-2", age_hours=6) + job3 = _make_list_job(name="job-3", age_hours=72) + pages = [ + {"ProcessingJobSummaries": [job1]}, + {"ProcessingJobSummaries": [job2, job3]}, + ] + desc1 = _make_describe(name="job-1", processing_start_hours=48) + desc2 = _make_describe(name="job-2", processing_start_hours=6) + desc3 = _make_describe(name="job-3", processing_start_hours=72) + + def _describe(**kwargs): + return { + "job-1": desc1, + "job-2": desc2, + "job-3": desc3, + 
}[kwargs["ProcessingJobName"]] + + session, sagemaker = _make_session(pages=pages) + sagemaker.describe_processing_job.side_effect = _describe + + findings = find_long_running_sagemaker_processing_jobs( + session, _REGION, long_running_hours_threshold=24 + ) + + assert [f.details["processing_job_name"] for f in findings] == ["job-1", "job-3"] + sagemaker.get_paginator.return_value.paginate.assert_called_once_with() + + +class TestMustSkipListLevel: + def test_name_absent_skipped(self): + job = _make_list_job(age_hours=48) + del job["ProcessingJobName"] + session, _ = _make_session(jobs=[job]) + assert find_long_running_sagemaker_processing_jobs(session, _REGION) == [] + + def test_arn_absent_skipped(self): + job = _make_list_job(age_hours=48) + del job["ProcessingJobArn"] + session, _ = _make_session(jobs=[job]) + assert find_long_running_sagemaker_processing_jobs(session, _REGION) == [] + + def test_status_absent_skipped(self): + job = _make_list_job(age_hours=48) + del job["ProcessingJobStatus"] + session, _ = _make_session(jobs=[job]) + assert find_long_running_sagemaker_processing_jobs(session, _REGION) == [] + + def test_non_inprogress_status_skipped(self): + for status in ("Completed", "Failed", "Stopping", "Stopped"): + job = _make_list_job(age_hours=48, status=status) + session, _ = _make_session(jobs=[job]) + assert find_long_running_sagemaker_processing_jobs(session, _REGION) == [] + + def test_creation_time_absent_skipped(self): + job = _make_list_job(age_hours=48) + del job["CreationTime"] + session, _ = _make_session(jobs=[job]) + assert find_long_running_sagemaker_processing_jobs(session, _REGION) == [] + + def test_creation_time_naive_skipped(self): + job = _make_list_job(age_hours=48) + job["CreationTime"] = datetime.now() - timedelta(hours=48) + session, _ = _make_session(jobs=[job]) + assert find_long_running_sagemaker_processing_jobs(session, _REGION) == [] + + def test_creation_time_future_beyond_skew_skipped(self): + job = 
_make_list_job(age_hours=48) + job["CreationTime"] = datetime.now(timezone.utc) + timedelta(seconds=600) + session, _ = _make_session(jobs=[job]) + assert find_long_running_sagemaker_processing_jobs(session, _REGION) == [] + + def test_last_modified_time_future_beyond_skew_becomes_null_not_skip(self): + job = _make_list_job(age_hours=48) + job["LastModifiedTime"] = datetime.now(timezone.utc) + timedelta(seconds=600) + session, _ = _make_session(jobs=[job]) + + findings = find_long_running_sagemaker_processing_jobs(session, _REGION) + + assert len(findings) == 1 + + def test_short_job_below_threshold_skipped(self): + job = _make_list_job(age_hours=6) + desc = _make_describe() + session, _ = _make_session(jobs=[job], describe_response=desc) + + assert ( + find_long_running_sagemaker_processing_jobs( + session, _REGION, long_running_hours_threshold=24 + ) + == [] + ) + + def test_item_not_dict_skipped(self): + session, sagemaker = _make_session() + sagemaker.get_paginator.return_value.paginate.return_value = [ + {"ProcessingJobSummaries": ["not-a-dict"]} + ] + assert find_long_running_sagemaker_processing_jobs(session, _REGION) == [] + + +class TestMustSkipDescribeLevel: + def test_describe_status_absent_skipped(self): + job = _make_list_job(age_hours=48) + desc = _make_describe() + del desc["ProcessingJobStatus"] + session, _ = _make_session(jobs=[job], describe_response=desc) + assert find_long_running_sagemaker_processing_jobs(session, _REGION) == [] + + def test_describe_status_not_inprogress_skipped(self): + job = _make_list_job(age_hours=48) + desc = _make_describe(status="Completed") + session, _ = _make_session(jobs=[job], describe_response=desc) + assert find_long_running_sagemaker_processing_jobs(session, _REGION) == [] + + def test_processing_start_time_naive_treated_as_null_uses_creation_time(self): + job = _make_list_job(age_hours=48) + desc = _make_describe() + desc["ProcessingStartTime"] = datetime.now() - timedelta(hours=46) + session, _ = 
_make_session(jobs=[job], describe_response=desc) + + findings = find_long_running_sagemaker_processing_jobs(session, _REGION) + + assert len(findings) == 1 + assert findings[0].details["runtime_anchor_type"] == "creation_time" + + def test_processing_start_time_future_beyond_skew_skipped(self): + job = _make_list_job(age_hours=48) + desc = _make_describe() + desc["ProcessingStartTime"] = datetime.now(timezone.utc) + timedelta(seconds=600) + session, _ = _make_session(jobs=[job], describe_response=desc) + assert find_long_running_sagemaker_processing_jobs(session, _REGION) == [] + + def test_processing_start_before_creation_beyond_skew_skipped(self): + now = datetime.now(timezone.utc) + job = _make_list_job(age_hours=48) + job["CreationTime"] = now - timedelta(hours=48) + desc = _make_describe() + desc["ProcessingStartTime"] = now - timedelta(hours=49) + session, _ = _make_session(jobs=[job], describe_response=desc) + assert find_long_running_sagemaker_processing_jobs(session, _REGION) == [] + + def test_processing_start_within_skew_of_creation_emits(self): + now = datetime.now(timezone.utc) + job = _make_list_job(age_hours=48) + job["CreationTime"] = now - timedelta(hours=48) + desc = _make_describe() + desc["ProcessingStartTime"] = (now - timedelta(hours=48)) - timedelta(seconds=200) + session, _ = _make_session(jobs=[job], describe_response=desc) + + findings = find_long_running_sagemaker_processing_jobs(session, _REGION) + + assert len(findings) == 1 + + def test_elapsed_below_threshold_after_describe_skipped(self): + job = _make_list_job(age_hours=50) + desc = _make_describe(processing_start_hours=12) + session, _ = _make_session(jobs=[job], describe_response=desc) + + assert ( + find_long_running_sagemaker_processing_jobs( + session, _REGION, long_running_hours_threshold=24 + ) + == [] + ) + + +class TestMustFailRule: + def test_list_permission_error_access_denied_raises(self): + session, sagemaker = _make_session() + 
sagemaker.get_paginator.return_value.paginate.side_effect = _auth_error() + + with pytest.raises(PermissionError, match="sagemaker:ListProcessingJobs"): + find_long_running_sagemaker_processing_jobs(session, _REGION) + + def test_list_non_permission_error_reraises(self): + session, sagemaker = _make_session() + sagemaker.get_paginator.return_value.paginate.side_effect = _transient_error() + + with pytest.raises(ClientError): + find_long_running_sagemaker_processing_jobs(session, _REGION) + + def test_list_botocore_error_reraises(self): + session, sagemaker = _make_session() + sagemaker.get_paginator.return_value.paginate.side_effect = BotoCoreError() + + with pytest.raises(BotoCoreError): + find_long_running_sagemaker_processing_jobs(session, _REGION) + + def test_describe_permission_error_raises(self): + job = _make_list_job(age_hours=48) + session, _ = _make_session( + jobs=[job], describe_side_effect=_auth_error("AccessDeniedException") + ) + + with pytest.raises(PermissionError, match="sagemaker:DescribeProcessingJob"): + find_long_running_sagemaker_processing_jobs(session, _REGION) + + +class TestDescribeSkipItem: + def test_describe_transient_error_skips_job(self): + job1 = _make_list_job(name="job-1", age_hours=48) + job2 = _make_list_job(name="job-2", age_hours=48) + desc2 = _make_describe(name="job-2", processing_start_hours=48) + + def _describe(**kwargs): + if kwargs["ProcessingJobName"] == "job-1": + raise _transient_error() + return desc2 + + session, sagemaker = _make_session(jobs=[job1, job2]) + sagemaker.describe_processing_job.side_effect = _describe + + findings = find_long_running_sagemaker_processing_jobs(session, _REGION) + + assert len(findings) == 1 + assert findings[0].details["processing_job_name"] == "job-2" + + def test_describe_resource_not_found_skips_job(self): + job = _make_list_job(age_hours=48) + err = ClientError({"Error": {"Code": "ResourceNotFound", "Message": "not found"}}, "op") + session, _ = _make_session(jobs=[job], 
describe_side_effect=err)
+        assert find_long_running_sagemaker_processing_jobs(session, _REGION) == []
+
+    def test_describe_botocore_error_skips_job(self):
+        job = _make_list_job(age_hours=48)
+        session, _ = _make_session(jobs=[job], describe_side_effect=BotoCoreError())
+        assert find_long_running_sagemaker_processing_jobs(session, _REGION) == []
+
+    def test_describe_returns_non_dict_skips_job(self):
+        job = _make_list_job(age_hours=48)
+        session, sagemaker = _make_session(jobs=[job])
+        sagemaker.describe_processing_job.return_value = "bad-response"
+        assert find_long_running_sagemaker_processing_jobs(session, _REGION) == []
+
+
+class TestRuntimeLimit:
+    def test_configured_limit_captured_but_not_applicable_before_processing_starts(self):
+        job = _make_list_job(age_hours=30)
+        desc = _make_describe(max_runtime_seconds=86_400)
+        desc.pop("ProcessingStartTime", None)
+        session, _ = _make_session(jobs=[job], describe_response=desc)
+
+        findings = find_long_running_sagemaker_processing_jobs(session, _REGION)
+
+        assert findings[0].details["configured_runtime_limit_seconds"] == 86_400
+        assert findings[0].details["applicable_runtime_limit_seconds"] is None
+        assert findings[0].details["unbounded_runtime_limit"] is True
+
+    def test_processing_start_uses_configured_limit_as_applicable_limit(self):
+        job = _make_list_job(age_hours=50)
+        desc = _make_describe(processing_start_hours=30, max_runtime_seconds=86_400)
+        session, _ = _make_session(jobs=[job], describe_response=desc)
+
+        findings = find_long_running_sagemaker_processing_jobs(session, _REGION)
+
+        assert findings[0].details["configured_runtime_limit_seconds"] == 86_400
+        assert findings[0].details["applicable_runtime_limit_seconds"] == 86_400
+
+    def test_no_stopping_condition_is_unbounded(self):
+        job = _make_list_job(age_hours=48)
+        desc = _make_describe(processing_start_hours=48)
+        desc["StoppingCondition"] = {}
+        session, _ = _make_session(jobs=[job], describe_response=desc)
+
+        findings = find_long_running_sagemaker_processing_jobs(session, _REGION)
+
+        assert findings[0].details["applicable_runtime_limit_seconds"] is None
+        assert findings[0].details["unbounded_runtime_limit"] is True
+
+    def test_stopping_condition_non_dict_degrades_safely(self):
+        job = _make_list_job(age_hours=48)
+        desc = _make_describe(processing_start_hours=48)
+        desc["StoppingCondition"] = "bad"
+        session, _ = _make_session(jobs=[job], describe_response=desc)
+
+        findings = find_long_running_sagemaker_processing_jobs(session, _REGION)
+
+        assert len(findings) == 1
+        assert findings[0].details["configured_runtime_limit_seconds"] is None
+
+    def test_exceeded_runtime_limit_uses_seconds_not_floored_hours(self):
+        job = _make_list_job(age_hours=30)
+        desc = _make_describe(max_runtime_seconds=86_400)
+        desc["ProcessingStartTime"] = datetime.now(timezone.utc) - timedelta(hours=24, seconds=1)
+        session, _ = _make_session(jobs=[job], describe_response=desc)
+
+        findings = find_long_running_sagemaker_processing_jobs(
+            session, _REGION, long_running_hours_threshold=24
+        )
+
+        assert findings[0].details["active_processing_hours"] == 24
+        assert findings[0].details["exceeded_applicable_runtime_limit"] is True
+
+
+class TestConfidenceModel:
+    def test_high_confidence_when_exceeded_limit(self):
+        job = _make_list_job(age_hours=30)
+        desc = _make_describe(processing_start_hours=30, max_runtime_seconds=86_400)
+        session, _ = _make_session(jobs=[job], describe_response=desc)
+
+        findings = find_long_running_sagemaker_processing_jobs(session, _REGION)
+
+        assert findings[0].confidence.value == "high"
+
+    def test_medium_confidence_when_no_applicable_limit(self):
+        job = _make_list_job(age_hours=30)
+        desc = _make_describe()
+        desc.pop("ProcessingStartTime", None)
+        session, _ = _make_session(jobs=[job], describe_response=desc)
+
+        findings = find_long_running_sagemaker_processing_jobs(session, _REGION)
+
+        assert findings[0].confidence.value == "medium"
+
+    def test_medium_confidence_when_limit_not_exceeded(self):
+        job = _make_list_job(age_hours=30)
+        desc = _make_describe(processing_start_hours=30, max_runtime_seconds=604_800)
+        session, _ = _make_session(jobs=[job], describe_response=desc)
+
+        findings = find_long_running_sagemaker_processing_jobs(session, _REGION)
+
+        assert findings[0].confidence.value == "medium"
+        assert findings[0].details["exceeded_applicable_runtime_limit"] is False
+
+
+class TestRiskModel:
+    def test_high_risk_gpu_instance(self):
+        job = _make_list_job(age_hours=48)
+        desc = _make_describe(processing_start_hours=48, instance_type="ml.p3.16xlarge")
+        session, _ = _make_session(jobs=[job], describe_response=desc)
+        findings = find_long_running_sagemaker_processing_jobs(session, _REGION)
+        assert findings[0].risk.value == "high"
+
+    def test_high_risk_inf_instance(self):
+        job = _make_list_job(age_hours=48)
+        desc = _make_describe(processing_start_hours=48, instance_type="ml.inf1.xlarge")
+        session, _ = _make_session(jobs=[job], describe_response=desc)
+        findings = find_long_running_sagemaker_processing_jobs(session, _REGION)
+        assert findings[0].risk.value == "high"
+
+    def test_medium_risk_cpu_instance(self):
+        job = _make_list_job(age_hours=48)
+        desc = _make_describe(processing_start_hours=48, instance_type="ml.m5.xlarge")
+        session, _ = _make_session(jobs=[job], describe_response=desc)
+        findings = find_long_running_sagemaker_processing_jobs(session, _REGION)
+        assert findings[0].risk.value == "medium"
+
+
+class TestNormalizeListItem:
+    def _now(self):
+        return datetime.now(timezone.utc)
+
+    def test_valid_item_normalizes(self):
+        now = self._now()
+        item = {
+            "ProcessingJobName": "job-1",
+            "ProcessingJobArn": f"{_ARN_PREFIX}/job-1",
+            "ProcessingJobStatus": "InProgress",
+            "CreationTime": now - timedelta(hours=24),
+        }
+        result = _normalize_list_item(item, now)
+        assert result is not None
+        assert result["processing_job_name"] == "job-1"
+        assert result["list_status"] == "InProgress"
+        assert result["job_age_hours"] == 24
+
+    def test_non_dict_returns_none(self):
+        assert _normalize_list_item("bad", datetime.now(timezone.utc)) is None
+
+    def test_naive_creation_time_returns_none(self):
+        now = self._now()
+        item = {
+            "ProcessingJobName": "job-1",
+            "ProcessingJobArn": f"{_ARN_PREFIX}/job-1",
+            "ProcessingJobStatus": "InProgress",
+            "CreationTime": now.replace(tzinfo=None) - timedelta(hours=24),
+        }
+        assert _normalize_list_item(item, now) is None
+
+    def test_naive_last_modified_time_is_null(self):
+        now = self._now()
+        item = {
+            "ProcessingJobName": "job-1",
+            "ProcessingJobArn": f"{_ARN_PREFIX}/job-1",
+            "ProcessingJobStatus": "InProgress",
+            "CreationTime": now - timedelta(hours=24),
+            "LastModifiedTime": (now - timedelta(hours=12)).replace(tzinfo=None),
+        }
+        result = _normalize_list_item(item, now)
+        assert result is not None
+        assert result["last_modified_time_utc"] is None
+
+    def test_future_last_modified_time_is_null(self):
+        now = self._now()
+        item = {
+            "ProcessingJobName": "job-1",
+            "ProcessingJobArn": f"{_ARN_PREFIX}/job-1",
+            "ProcessingJobStatus": "InProgress",
+            "CreationTime": now - timedelta(hours=24),
+            "LastModifiedTime": now + timedelta(seconds=600),
+        }
+        result = _normalize_list_item(item, now)
+        assert result is not None
+        assert result["last_modified_time_utc"] is None
+
+
+class TestNormalizeDescribe:
+    def _now(self):
+        return datetime.now(timezone.utc)
+
+    def test_valid_response_normalizes(self):
+        now = self._now()
+        resp = {
+            "ProcessingJobArn": f"{_ARN_PREFIX}/job-1",
+            "ProcessingJobStatus": "InProgress",
+            "StoppingCondition": {"MaxRuntimeInSeconds": 3600},
+            "ProcessingResources": {
+                "ClusterConfig": {"InstanceType": "ml.m5.xlarge", "InstanceCount": 1}
+            },
+        }
+        result = _normalize_describe(resp, now)
+        assert result is not None
+        assert result["describe_status"] == "InProgress"
+        assert result["configured_runtime_limit_seconds"] == 3600
+        assert result["instance_type"] == "ml.m5.xlarge"
+
+    def test_non_dict_returns_none(self):
+        assert _normalize_describe("bad", datetime.now(timezone.utc)) is None
+
+    def test_missing_status_returns_none(self):
+        now = self._now()
+        resp = {"ProcessingJobArn": f"{_ARN_PREFIX}/job-1"}
+        assert _normalize_describe(resp, now) is None
+
+    def test_naive_processing_start_treated_as_null(self):
+        now = self._now()
+        resp = {
+            "ProcessingJobArn": f"{_ARN_PREFIX}/job-1",
+            "ProcessingJobStatus": "InProgress",
+            "ProcessingStartTime": (now - timedelta(hours=24)).replace(tzinfo=None),
+        }
+        result = _normalize_describe(resp, now)
+        assert result is not None
+        assert result["processing_start_time_utc"] is None
+
+    def test_future_processing_start_returns_none(self):
+        now = self._now()
+        resp = {
+            "ProcessingJobArn": f"{_ARN_PREFIX}/job-1",
+            "ProcessingJobStatus": "InProgress",
+            "ProcessingStartTime": now + timedelta(seconds=600),
+        }
+        assert _normalize_describe(resp, now) is None
+
+    def test_processing_resources_non_dict_degrades_to_null_fields(self):
+        now = self._now()
+        resp = {
+            "ProcessingJobArn": f"{_ARN_PREFIX}/job-1",
+            "ProcessingJobStatus": "InProgress",
+            "ProcessingResources": "bad",
+        }
+        result = _normalize_describe(resp, now)
+        assert result is not None
+        assert result["instance_type"] is None
+        assert result["instance_count"] is None
+
+    def test_zero_max_runtime_treated_as_null(self):
+        now = self._now()
+        resp = {
+            "ProcessingJobArn": f"{_ARN_PREFIX}/job-1",
+            "ProcessingJobStatus": "InProgress",
+            "StoppingCondition": {"MaxRuntimeInSeconds": 0},
+        }
+        result = _normalize_describe(resp, now)
+        assert result["configured_runtime_limit_seconds"] is None
+
+
+class TestIsAcceleratorBacked:
+    def test_g_prefix_is_accelerator(self):
+        assert _is_accelerator_backed("ml.g5.xlarge") is True
+
+    def test_p_prefix_is_accelerator(self):
+        assert _is_accelerator_backed("ml.p3.16xlarge") is True
+
+    def test_inf_prefix_is_accelerator(self):
+        assert _is_accelerator_backed("ml.inf1.xlarge") is True
+
+    def test_trn_prefix_is_accelerator(self):
+        assert _is_accelerator_backed("ml.trn1.32xlarge") is True
+
+    def test_m_prefix_not_accelerator(self):
+        assert _is_accelerator_backed("ml.m5.xlarge") is False
+
+    def test_none_not_accelerator(self):
+        assert _is_accelerator_backed(None) is False
+
+
+class TestRuleMetadata:
+    def test_rule_id(self):
+        assert RULE_METADATA["id"] == "aws.sagemaker.processing_job.long_running"
+
+    def test_category(self):
+        assert RULE_METADATA["category"] == "ai"
+
+    def test_service(self):
+        assert RULE_METADATA["service"] == "sagemaker"
+
+    def test_cost_impact(self):
+        assert RULE_METADATA["cost_impact"] == "high"
diff --git a/tests/cleancloud/safety/aws/test_aws_iam_policy_parity.py b/tests/cleancloud/safety/aws/test_aws_iam_policy_parity.py
index be84955..477764d 100644
--- a/tests/cleancloud/safety/aws/test_aws_iam_policy_parity.py
+++ b/tests/cleancloud/safety/aws/test_aws_iam_policy_parity.py
@@ -72,6 +72,9 @@
     # aws.sagemaker.training_job.long_running
     "sagemaker:ListTrainingJobs",
     "sagemaker:DescribeTrainingJob",
+    # aws.sagemaker.processing_job.long_running
+    "sagemaker:ListProcessingJobs",
+    "sagemaker:DescribeProcessingJob",
     # aws.bedrock.provisioned_throughput.idle
     "bedrock:ListProvisionedModelThroughputs",
     # aws.ec2.gpu.idle
diff --git a/tests/e2e/aws/test_aws_ai_rules_smoke.py b/tests/e2e/aws/test_aws_ai_rules_smoke.py
index 1db603c..76b1b8a 100644
--- a/tests/e2e/aws/test_aws_ai_rules_smoke.py
+++ b/tests/e2e/aws/test_aws_ai_rules_smoke.py
@@ -17,6 +17,9 @@
 from cleancloud.providers.aws.rules.ai.sagemaker_notebook_idle import (
     find_idle_sagemaker_notebooks,
 )
+from cleancloud.providers.aws.rules.ai.sagemaker_processing_job_long_running import (
+    find_long_running_sagemaker_processing_jobs,
+)
 from cleancloud.providers.aws.rules.ai.sagemaker_studio_app_idle import (
     find_idle_sagemaker_studio_apps,
 )
@@ -32,6 +35,7 @@
     "aws.bedrock.provisioned_throughput.idle",
     "aws.sagemaker.studio_app.idle",
     "aws.sagemaker.training_job.long_running",
+    "aws.sagemaker.processing_job.long_running",
 }
 
 
@@ -49,6 +53,7 @@ def test_aws_ai_rules_run_without_error():
         find_idle_bedrock_provisioned_throughputs,
         find_idle_sagemaker_studio_apps,
         find_long_running_sagemaker_training_jobs,
+        find_long_running_sagemaker_processing_jobs,
     ]
 
     all_results = []
@@ -191,10 +196,39 @@ def test_sagemaker_training_job_long_running_returns_list_of_findings():
         assert f.region == "us-east-1"
         assert f.detected_at and isinstance(f.detected_at, datetime)
         assert f.confidence.value in ("high", "medium")
-        assert f.risk.value in ("critical", "high", "medium")
-        assert "job_name" in f.details
+        assert f.risk.value in ("high", "medium")
+        assert "training_job_name" in f.details
         assert "instance_type" in f.details
         assert "instance_count" in f.details
-        assert "duration_hours" in f.details
-        assert "accrued_cost_usd" in f.details
-        assert "cost_basis" in f.details
+        assert "elapsed_runtime_hours" in f.details
+        assert "applicable_runtime_limit_seconds" in f.details
+        assert "exceeded_applicable_runtime_limit" in f.details
+
+
+@pytest.mark.e2e
+@pytest.mark.aws
+def test_sagemaker_processing_job_long_running_returns_list_of_findings():
+    """Smoke test: rule runs without error and returns typed findings."""
+    session = boto3.Session()
+    try:
+        findings = find_long_running_sagemaker_processing_jobs(session, "us-east-1")
+    except PermissionError as e:
+        pytest.fail(f"Missing IAM permissions: {e}")
+
+    assert isinstance(findings, list)
+    for f in findings:
+        assert isinstance(f, Finding)
+        assert f.rule_id == "aws.sagemaker.processing_job.long_running"
+        assert f.resource_type == "aws.sagemaker.processing_job"
+        assert f.provider == "aws"
+        assert f.resource_id
+        assert f.region == "us-east-1"
+        assert f.detected_at and isinstance(f.detected_at, datetime)
+        assert f.confidence.value in ("high", "medium")
+        assert f.risk.value in ("high", "medium")
+        assert "processing_job_name" in f.details
+        assert "instance_type" in f.details
+        assert "instance_count" in f.details
+        assert "elapsed_runtime_hours" in f.details
+        assert "applicable_runtime_limit_seconds" in f.details
+        assert "exceeded_applicable_runtime_limit" in f.details